Galapagos Tech Blog

株式会社ガラパゴスのメンバーによる技術ブログです。

AWS Lambda 非同期呼び出しのご紹介(aioboto3)

Python に入門しなきゃと思いつつ半年が経ってしまった AIR Design for Marketing 事業部バックエンドエンジニアの成田です。

AWS Lambda の呼び出しを aioboto3 を使って非同期化してみたところ、呼び出し完了までが 100 倍速 ほどになり大変捗ったのでご紹介します。

結論どんな感じのコードを書けばよさそうか

同期/非同期それぞれのサンプルをまず書いておきます。

同期的なコード

session = boto3.Session()
lambda_client = session.client("lambda")


def invoke_lambda_X(batch):
    res = lambda_client.invoke(
        FunctionName="LAMBDA_FUNCTION_X",
        InvocationType="RequestResponse",
        Payload=batch
    )
    return res["Payload"].read()


results = []
for batch in batches:
    results.append(invoke_lambda_X(batch))

非同期的なコード

import asyncio

import aioboto3
from aioboto3.session import AioConfig

max_connections = 200 # 実際に動かして調整してください


async def invoke_lambda_X(batch, lambda_client):
    res = await lambda_client.invoke(
        FunctionName="LAMBDA_FUNCTION_X",
        InvocationType="RequestResponse",
        Payload=batch
    )
    return await res["Payload"].read()


async def main():
    session = aioboto3.Session()
    async with session.client(
        "lambda", config=AioConfig(
            max_pool_connections=max_connections)
    ) as lambda_client:
        return await asyncio.gather(
            *(invoke_lambda_X(batch, lambda_client))
            for batch in batches)


results = asyncio.run(main())

目的としているシチュエーション

改めて、上のコードを書く目的から始めようと思います。

まず AWS Lambda の関数は、1 つ 1 つはシンプルな単位に切り出しておいて、これらを組み合わせて複雑な処理を実現する、といった使い方ができます。

そこで、1 つの処理を AWS Lambda の関数 X として切り出し、別の Lambda に書かれたメインループからバッチ処理的に X を複数呼び出して結果を受け取るといった処理を考えてみます。

f:id:glpgsinc:20220202133725p:plain:w300
システムイメージ

results = []
for batch in batches:
    results.append(invoke_lambda_X(batch))

各バッチの処理の中でメインループは Lambda と通信しますが、Lambda の計算結果を利用するためこの通信自体は Lambda の完了を待つ必要があります。上記のコードのように同期的なループを回している場合、バッチの数だけ全体の処理時間が詰み重なります。仮に各バッチが 1 秒で終わるとしても、高々 1000 バッチ程度で Lambda の制限時間 15 分を超える処理になってしまう計算です。

メインループはその大半の時間を Lambda の応答待ちに消費しています。そのため非同期処理により各バッチを同時に走らせることで、全体の処理時間短縮が狙えます。

非同期処理の実装

今回は aioboto3 というライブラリを選びました。boto3 とインタフェースが近しいので、あまり迷わず使えます。実際 aioboto3 は boto3 と aio-libs/aiobotocore を組み合わせたラッパーです。(aio-libs は async まわりの高品質なライブラリを充実させているようです。)

使用する場合、メインループで Lambda クライアントを 1 つ作成し、各バッチのコルーチンで共有させるのがよいようです。

# コルーチンにクライアントを渡す
async def invoke_lambda_X(batch, lambda_client):
    ...

async with session.client(
        "lambda", config=AioConfig(...)
    ) as lambda_client:
    await asyncio.gather(*(invoke_lambda_X ...))

デフォルトだと Lambda クライアントは最大 10 個の通信を同時に稼動させます。同時接続数を増やしたい場合は、クライアント作成時に設定してやります。

from aiobotocore.config import AioConfig

# 同時接続数を 200 にする。
async with session.client(
        "lambda", config=AioConfig(
            max_pool_connections=200)
    ) as lambda_client:
    ...

同時接続数が多い方が全体の処理は速くなりますが、同時接続数をどの程度まで増やせるかは実行環境によると思いますので、都度試すのがよいと考えています。

どれくらい速くなるか

1 バッチごとの処理時間が十分長い場合、オーバヘッドを無視できるので、同時接続数の分だけ速くなります。例えば同時接続数が 100 なら 100 倍速くなります。(もちろん呼び出した Lambda の分の料金は変わらないですが。。。)

今回実際に適用したケースでは、1 バッチごとに 2 秒ほどかかる処理で、またマシンリソースに余裕がある状態でしたが、約 60 倍ほど速くメインループを完了させられました。

まとめ

今回のユースケースについては、 aioboto3, aiobotocore のドキュメントがあまり充実していないようで、動かすまでに割と時間がかかりました。(背景知識がしっかりあれば困らないのでしょうが。。。)

機会がありましたら是非お試しください。

参考にさせていただいた記事

qiita.com

最後に

弊社ではいっしょに働くエンジニアを募集しています! ご興味のある方はぜひご応募いただけますと嬉しいです。

www.wantedly.com