こんにちは、アスクルのユウです。
Embulkを用いてデータ移行をしたのですが、データ抽出のクエリが複雑になってしまい思ったようにパフォーマンスがでませんでした。
今回対応した方法を記載します。
背景
アスクルで商品の単価情報を管理しているシステムを新しいシステムに刷新するというPJがありました。
スケジュールがタイトですべての機能を新システムに実装してリリースというのが難しく、
一部機能は旧システムで利用し続けるという方針になりました。
そのため、新システムから旧システムにデータ同期をする必要があり、毎時でデータ移行をする必要がありました。
データ移行には過去に利用したことがあったEmbulkを採用しました。
新システムはPostgreSQL、旧システムではDB2を使用していました。
環境
- PostgreSQL 15.4
- DB2(バージョンは割愛)
- embulk-input-postgresql
- embulk-output-jdbc
移行対象データ
旧システムと新システムではテーブルの正規化によりデータの構造が変わったため、移行元のクエリが長くなりました。
また、Embulkではコンフィグの query
に移行元クエリを定義する必要があり、1つのSQLで記述する必要がありました。
その結果、次のような500行超えSQLになりました。
with target as ( select upload_id, status from upload_status where uploaded_at between current_timestamp - interval '1 hours' and now() ) select item_code, item_price, -- 省略 from item_price inner join target on item.upload_id = target.upload_id where item_price.is_active = true and target.status = 1 union all select item_code, item_price, -- 省略 from item_price inner join target on item.upload_id = target.upload_id where item_price.is_active = true and target.status = 2 union all select item_code, item_price, -- 省略
発生した問題
移行対象データが多くなると、移行処理が終わらなくなりました。
ログを確認していたところ、embulk-input-postgresql では fetch_rows
に指定した行数をcursor変数に展開しているようでした。
responseが遅いクエリの場合に、cursor変数に展開しようとすると返ってこなくなっているようでした。
実行計画をとってクエリを改善しようとしても、union all
で複数クエリをつなげて取得しているため難しい状況でした。
対応方法
embulk-input-postgresql 側のコンフィグで何か対策できないかを見ていたところ、
before_setup
というコンフィグがあることに気づきました。
before_setup
は query
で定義したクエリが実行される前に行う処理を定義します。
responseが遅いクエリだとcursor変数に展開しようとすると遅延するため、query
に定義したクエリでデータが高速に取得できれば改善できると考えました。
そのため、before_setup
で移行用のworkテーブルにデータ挿入して、queryではworkテーブルから参照するように変更しました。
変更後イメージ
before_setup: | truncate table main.migration_work; insert into main.migration_work with target as ( select upload_id, status from upload_status where uploaded_at between current_timestamp - interval '1 hours' and now() ) select item_code, item_price, -- 省略 from item_price inner join target on item.upload_id = target.upload_id where item_price.is_active = true and target.status = 1 ; insert into main.migration_work with target as ( select upload_id, status from upload_status where uploaded_at between current_timestamp - interval '1 hours' and now() ) select item_code, item_price, -- 省略 from item_price inner join target on item.upload_id = target.upload_id where item_price.is_active = true and target.status = 2 ; query: select * from main.migration_work;
この変更により、データ抽出部分がシンプルになり無事にデータ移行できました。
変更前は200万レコードの連携で1時間経っても処理が終わりませんでしたが、変更後は20分ぐらいで終わるようになりました。
最後に
Embulkで頑張らなくても、データ移行用のテーブルを他のバッチ処理で作成するなどの方法もあるかと思います。
今回は旧システムが近い未来なくなるので、凝った仕組みを作りたくなかったためEmbulkですべて対応しました。
私の経験がどなたかの役にたてば幸いです。
それでは。