Embulkで複雑なクエリを処理してデータ移行する

こんにちは、アスクルのユウです。
Embulkを用いてデータ移行をしたのですが、データ抽出のクエリが複雑になってしまい思ったようにパフォーマンスがでませんでした。
今回対応した方法を記載します。

背景

アスクルで商品の単価情報を管理しているシステムを新しいシステムに刷新するというPJがありました。
スケジュールがタイトですべての機能を新システムに実装してリリースというのが難しく、
一部機能は旧システムで利用し続けるという方針になりました。
そのため、新システムから旧システムにデータ同期をする必要があり、毎時でデータ移行をする必要がありました。
データ移行には過去に利用したことがあったEmbulkを採用しました。
新システムはPostgreSQL、旧システムではDB2を使用していました。

環境

移行対象データ

旧システムと新システムではテーブルの正規化によりデータの構造が変わったため、移行元のクエリが長くなりました。
また、Embulkではコンフィグの query に移行元クエリを定義する必要があり、1つのSQLで記述する必要がありました。
その結果、次のような500行超えSQLになりました。

with target as (
  select
    upload_id,
    status
  from
    upload_status
  where
    uploaded_at between current_timestamp - interval '1 hours' and now()
)
select
  item_code,
  item_price,
  -- 省略
from
  item_price inner join target
    on item.upload_id = target.upload_id
where
  item_price.is_active = true
  and target.status = 1
union all
select
  item_code,
  item_price,
  -- 省略
from
  item_price inner join target
    on item.upload_id = target.upload_id
where
  item_price.is_active = true
  and target.status = 2
union all
select
  item_code,
  item_price,
  -- 省略

発生した問題

移行対象データが多くなると、移行処理が終わらなくなりました。
ログを確認していたところ、embulk-input-postgresql では fetch_rows に指定した行数をcursor変数に展開しているようでした。
responseが遅いクエリの場合に、cursor変数に展開しようとすると返ってこなくなっているようでした。
実行計画をとってクエリを改善しようとしても、union all で複数クエリをつなげて取得しているため難しい状況でした。

対応方法

embulk-input-postgresql 側のコンフィグで何か対策できないかを見ていたところ、
before_setup というコンフィグがあることに気づきました。
before_setupquery で定義したクエリが実行される前に行う処理を定義します。
responseが遅いクエリだとcursor変数に展開しようとすると遅延するため、query に定義したクエリでデータが高速に取得できれば改善できると考えました。
そのため、before_setup で移行用のworkテーブルにデータ挿入して、queryではworkテーブルから参照するように変更しました。

変更後イメージ

before_setup: |
  truncate table main.migration_work;
  insert into main.migration_work
    with target as (
      select
        upload_id,
        status
      from 
        upload_status
      where 
        uploaded_at between current_timestamp - interval '1 hours' and now()
    )
    select
      item_code,
      item_price,
      -- 省略
    from
      item_price inner join target
      on item.upload_id = target.upload_id
    where
      item_price.is_active = true
      and target.status = 1
  ;
  insert into main.migration_work
    with target as (
    select
      upload_id,
      status
    from 
      upload_status
    where 
      uploaded_at between current_timestamp - interval '1 hours' and now()
    )
    select
      item_code,
      item_price,
      -- 省略
    from
      item_price inner join target
      on item.upload_id = target.upload_id
    where
      item_price.is_active = true
      and target.status = 2
  ;

query: select * from main.migration_work;

この変更により、データ抽出部分がシンプルになり無事にデータ移行できました。
変更前は200万レコードの連携で1時間経っても処理が終わりませんでしたが、変更後は20分ぐらいで終わるようになりました。

最後に

Embulkで頑張らなくても、データ移行用のテーブルを他のバッチ処理で作成するなどの方法もあるかと思います。
今回は旧システムが近い未来なくなるので、凝った仕組みを作りたくなかったためEmbulkですべて対応しました。
私の経験がどなたかの役にたてば幸いです。
それでは。

ASKUL Engineering BLOG

2021 © ASKUL Corporation. All rights reserved.