ようへいの日々精進XP

よかろうもん

DynamoDB の Scan でテーブルのデータを漏れなく取得する為のメモ

これは

qiita.com

初老丸 Advent Calendar 2017 6 日目の記事になる予定です.

これはやらかしの記録である

一年前に作った DynamoDB テーブルの Scan した結果を解析処理して REST API で返すだけのシンプルなシステム. ところが, 最近になって返却する値がとても不安定(意図した結果が返ってくる時と返ってこない時がある)になってしまい調査したところ...

Scan した結果に全てのレコードが含まれていないことを確認...

勢いでドキュメントを読むと...

結果セットに 1MB 制限がある

docs.aws.amazon.com

DynamoDB の API 呼び出しにおいて, Query や Scan の結果セットは, 呼び出しあたり 1MB という制限がある為, 1MB を超える場合には, レスポンスから LastEvaluatedKey を利用して 1MB 以上の結果を取得する必要がある.

ということで, 以下にダメ(1MB 制限を考慮していない)な実装と 1MB 制限を考慮した実装例を掲示する.

実装例

環境

$ python -V
Python 2.7.11

$ pip freeze | grep boto
...
boto==2.42.0
boto3==1.4.4
botocore==1.7.36
...

ダメなやつ

従来は以下のように何の変哲も無い scan() しているだけ.

import boto3
from boto3.session import Session

session = Session(profile_name='my-profile', region_name='ap-northeast-1')
dynamodb = session.resource('dynamodb')
dynamodb_table = dynamodb.Table('my-teble')


def scan_table_dame():
    response = dynamodb_table.scan()
    data = response['Items']
    return data

これだと, Scan の結果が 1MB を超えた場合に全ての結果を取得出来ないという悲しい状況になる.

1MB 以上の結果を取得するやつ

冒頭に掲載したドキュメントの通り LastEvaluatedKey を利用して, 以下のようにレスポンスに LastEvaluatedKey が含まれなくなるまでループさせてみると...全てのデータを取得することが出来る.

import boto3
from boto3.session import Session

session = Session(profile_name='my-profile', region_name='ap-northeast-1')
dynamodb = session.resource('dynamodb')
dynamodb_table = dynamodb.Table('my-teble')


def scan_table_ok():
    response = dynamodb_table.scan()
    data = response['Items']
    # レスポンスが 1MB になることを想定して,
    # レスポンスに LastEvaluatedKey が含まれなくなるまでループする
    while 'LastEvaluatedKey' in response:
        response = dynamodb_table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
        data.extend(response['Items'])

    return data

ちょっと冗長というか, エレガントな書き方では無いような気がする.

1MB 以上の結果を取得するやつ(2)

DynamoDB を Python から扱う場合には boto3 を利用するのが一般的だが, boto3 では以下のようなインターフェースが用意されている.

機能 クラス名 概要
Client DynamoDB.Client 低レベルな操作が可能なインターフェース
Paginators DynamoDB.Paginator.* 自動的なページングを提供
Waiters DynamoDB.Waiter.* 特定の状態に達するまでのブロックを提供
Service Resource DynamoDB.ServiceResource 高レベルのオブジェクト指向インタフェース
Table DynamoDB.Table テーブル操作

先述の 2 つの操作には Table インターフェースを利用していたわけだが, Pagenators というインターフェースも用意されていて, これを使うと以下のようにちょっとシンプルに書ける

import boto3
from boto3.session import Session

session = Session(profile_name='my-profile', region_name='ap-northeast-1')
dynamodb = session.client('dynamodb')


def scan_table_ok():
    paginator = dynamodb.get_paginator('scan')

    data = []
    # PaginationConfig で {'PageSize': 1} としておくと動作確認し易い
    # for page in paginator.paginate(TableName='my-table', PaginationConfig={'PageSize': 1}):
    for page in paginator.paginate(TableName='my-table'):
        data.extend(page['Items'])

    return data

この Pagenators を使えばええやんと思うのだが, 以下のように Service Resource で Scan した結果とフォーマットが若干異なるので注意が必要. 特に, Items キーの結果に SN 等の DataType が含まれているので, その後の加工処理をガチャガチャする必要があると思う.

  • Table インターフェースで Scan した場合
{
    'Items': [
        {
            'string': 'string'|123|Binary(b'bytes')|True|None|set(['string'])|set([123])|set([Binary(b'bytes')])|[]|{}
        },
    ],
...
  • Pagenators インターフェースで Scan した場合
{
    'Items': [
        {
            'string': {
                'S': 'string',
                'N': 'string',
                'B': b'bytes',
                'SS': [
                    'string',
                ],
                'NS': [
                    'string',
                ],
                'BS': [
                    b'bytes',
                ],
                'M': {
                    'string': {'... recursive ...'}
                },
                'L': [
                    {'... recursive ...'},
                ],
                'NULL': True|False,
                'BOOL': True|False
            }
        },
    ],
...

ということで

教訓

実装前にドキュメントを熟読する. 特に「制限」について書かれているドキュメントについては, 隅々まで読んでから実装する.

自分を殴りたい

一年越しで発見したバグを直したメモでした...一年前の自分を殴りたい.