DynamoDBのThrottlingExceptionを書き込みシャーディングで対策した記録

こんにちは。ASKULのほかほかごはんです。最近は商品データ管理の開発を担当しています。

ASKULではBtoB, BtoCのお客様に最適な商品を提供するために大量の商品データを管理しています。 一方で、データを加工する夜間バッチの処理速度に問題を抱えていました。

今回、商品データ管理にDynamoDBを利用することで、バッチ処理におけるパフォーマンスの大幅更新にチャレンジしました 🚀

処理速度は上がった。しかし...

いろいろ苦労して (省略 😅) DynamoDBを導入できました。

結果としてバッチ処理速度は大幅更新! 数時間かかっていた処理が数10分で終わるようになるような劇的な改善も見られるように 🌟

しかし、処理データ量が増えるにつれ、次のようなExceptionが発生するようになりました...。

ThrottlingException: Throughput exceeds the current capacity for one or more global secondary indexes. DynamoDB is automatically scaling your index so please try again shortly.

global secondary indexesでThrottlingException 🤔 これは一体...。

DynamoDBにおけるワークロードの分割

DynamoDBはPartition Keyでデータを格納する論理パーティションを決定します (詳細はリンクの記事参照) 。 そのため、Partition Key Valueの数を増やし、I/Oを均等に分配する必要があります。

ところで、今回ThrottlingExceptionが頻発していたグローバルセカンダリインデックスのPartition Keyは「商品の有効開始日 (uuuuMMdd)」でした。

前述のAWSの記事で均一性「不良」の代表にあげられとるやないかーい 😢

  • グローバルセカンダリインデックスのPartition Key Valueが少なかったこと
  • 特定の日付について書き込み処理を行ったことで1つの論理パーティションに書き込みが集中したこと

がエラー原因でした。 テーブル自体のPartition Keyには気を使っていましたが、グローバルセカンダリインデックスの定義でもPartition Key Valueの数を意識することが大切だと学びました。

書き込みシャーディングによる対策

日付のようなPartition Keyを用いる場合、書き込みシャーディングを利用することが有効な解決策になります。

今回は対策として、AWSの記事のように日付に0 - 199のサフィックスをつけることでワークロードを分散させることにしました。

具体的には次のような手順でプログラムを修正しています。

  • Dateに0 - 199のサフィックスをランダムに付与したDateWithSuffix属性をテーブルに追加する
  • DateWithSuffixをPartition KeyにしたIndexを作成する
  • プログラムで利用するIndexを切り替える
  • DateをPartition KeyにしたIndexを削除する

Indexの修正

修正前のIndex

DateIndex

Partition Key
Date

Dateには 20211123 などの日付が値として設定される。

修正後のIndex

DateWithSuffixIndex

Partition Key
DateWithSuffix

DateWithSuffixには 20211123.0 〜 20211123.199 という分散した値が設定される。

プログラムの修正

プログラムはGoを利用しています。DateWithSuffixIndexを利用するためにQueryのコードを修正しました。

修正前のコード

func (service QueryService) Fetch(date string) <-chan entity.ItemEntity {
    return service.query(date)
}

修正後のコード

後続処理に影響を与えないように次のように修正しました。すべてのサフィックスに対してQueryを実行し、Channelをマージしています。

func (service QueryService) Fetch(date string) <-chan entity.ItemEntity {
    cs := make([]<-chan entity.ItemEntity, 200)
    for i := 0; i < 200; i++ {
        cs[i] = service.query(fmt.Sprintf("%s.%d", date, i))
    }
    return merge(cs)
}

// merge Channel配列を1つのChannelにmergeする
func merge(cs []<-chan entity.ItemEntity) <-chan entity.ItemEntity {
    out := make(chan entity.ItemEntity)

    var wg sync.WaitGroup
    wg.Add(len(cs))

    for _, c := range cs {
        go func(c <-chan entity.ItemEntity) {
            defer wg.Done()
            for e := range c {
                out <- e
            }
        }(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

プログラム更新後、ThrottlingExceptionを起こしていたDateIndexを削除することで例外が発生しなくなりました ☺️

まとめ

  • DynamoDB Tableを作成する際にはワークロードを分散させるためにPartition Key Valueの数が多くなるPartition Keyを利用する
  • GSIについても同様の考慮が必要

今後も開発で得た知見を共有していきます 💪

We are hiring !!

ASKUL 商品チームでは一緒に働く仲間を募集しています! 次のリンクよりお気軽にご応募ください。

hrmos.co

参考

パーティションキーを設計してワークロードを均等に分散する - Amazon DynamoDB

書き込みシャーディングを使用してワークロードを均等に分散させる - Amazon DynamoDB

ASKUL Engineering BLOG

2021 © ASKUL Corporation. All rights reserved.