こんにちは。ASKULのほかほかごはんです。最近は商品データ管理の開発を担当しています。
ASKULではBtoB, BtoCのお客様に最適な商品を提供するために大量の商品データを管理しています。 一方で、データを加工する夜間バッチの処理速度に問題を抱えていました。
今回、商品データ管理にDynamoDBを利用することで、バッチ処理におけるパフォーマンスの大幅更新にチャレンジしました 🚀
処理速度は上がった。しかし...
いろいろ苦労して (省略 😅) DynamoDBを導入できました。
結果としてバッチ処理速度は大幅更新! 数時間かかっていた処理が数10分で終わるようになるような劇的な改善も見られるように 🌟
しかし、処理データ量が増えるにつれ、次のようなExceptionが発生するようになりました...。
ThrottlingException: Throughput exceeds the current capacity for one or more global secondary indexes. DynamoDB is automatically scaling your index so please try again shortly.
global secondary indexesでThrottlingException 🤔 これは一体...。
DynamoDBにおけるワークロードの分割
DynamoDBはPartition Keyでデータを格納する論理パーティションを決定します (詳細はリンクの記事参照) 。 そのため、Partition Key Valueの数を増やし、I/Oを均等に分配する必要があります。
ところで、今回ThrottlingExceptionが頻発していたグローバルセカンダリインデックスのPartition Keyは「商品の有効開始日 (uuuuMMdd)」でした。
前述のAWSの記事で均一性「不良」の代表にあげられとるやないかーい 😢
- グローバルセカンダリインデックスのPartition Key Valueが少なかったこと
- 特定の日付について書き込み処理を行ったことで1つの論理パーティションに書き込みが集中したこと
がエラー原因でした。 テーブル自体のPartition Keyには気を使っていましたが、グローバルセカンダリインデックスの定義でもPartition Key Valueの数を意識することが大切だと学びました。
書き込みシャーディングによる対策
日付のようなPartition Keyを用いる場合、書き込みシャーディングを利用することが有効な解決策になります。
今回は対策として、AWSの記事のように日付に0 - 199のサフィックスをつけることでワークロードを分散させることにしました。
具体的には次のような手順でプログラムを修正しています。
- Dateに0 - 199のサフィックスをランダムに付与したDateWithSuffix属性をテーブルに追加する
- DateWithSuffixをPartition KeyにしたIndexを作成する
- プログラムで利用するIndexを切り替える
- DateをPartition KeyにしたIndexを削除する
Indexの修正
修正前のIndex
DateIndex
Partition Key |
---|
Date |
Dateには 20211123 などの日付が値として設定される。
修正後のIndex
DateWithSuffixIndex
Partition Key |
---|
DateWithSuffix |
DateWithSuffixには 20211123.0 〜 20211123.199 という分散した値が設定される。
プログラムの修正
プログラムはGoを利用しています。DateWithSuffixIndexを利用するためにQueryのコードを修正しました。
修正前のコード
func (service QueryService) Fetch(date string) <-chan entity.ItemEntity { return service.query(date) }
修正後のコード
後続処理に影響を与えないように次のように修正しました。すべてのサフィックスに対してQueryを実行し、Channelをマージしています。
func (service QueryService) Fetch(date string) <-chan entity.ItemEntity { cs := make([]<-chan entity.ItemEntity, 200) for i := 0; i < 200; i++ { cs[i] = service.query(fmt.Sprintf("%s.%d", date, i)) } return merge(cs) } // merge Channel配列を1つのChannelにmergeする func merge(cs []<-chan entity.ItemEntity) <-chan entity.ItemEntity { out := make(chan entity.ItemEntity) var wg sync.WaitGroup wg.Add(len(cs)) for _, c := range cs { go func(c <-chan entity.ItemEntity) { defer wg.Done() for e := range c { out <- e } }(c) } go func() { wg.Wait() close(out) }() return out }
プログラム更新後、ThrottlingExceptionを起こしていたDateIndexを削除することで例外が発生しなくなりました ☺️
まとめ
- DynamoDB Tableを作成する際にはワークロードを分散させるためにPartition Key Valueの数が多くなるPartition Keyを利用する
- GSIについても同様の考慮が必要
今後も開発で得た知見を共有していきます 💪
We are hiring !!
ASKUL 商品チームでは一緒に働く仲間を募集しています! 次のリンクよりお気軽にご応募ください。