2024.07.03

SNS から Step Functions を呼び出す:実践ガイド


はじめに

みなさんこんにちは、グループ研究開発本部 AI研究開発室のA.F(海外の出身)です。
最近、仕事で AWS を使い始め、Step Functions の実行で特定の問題に遭遇しました。そこで、今回のブログ記事では、その経験を共有したいと思います。

AWS Step Functions は、ワークフローをオーケストレーションするための強力なサーバーレスフレームワークを提供します。これは、さまざまな AWS サービスをイベント駆動型の「ステップ」に連結して、複雑なプロセスを自動化することを可能にします。また、統合されたアーキテクチャ可視化図を使用して、ワークフローを簡単に設計することができます。しかし、これらのワークフローを開始するにはどうすればよいでしょうか?コンソールから手動で Step Functions を実行することはできますが、より簡単なイベントやサービスを使用してワークフローをトリガーする方がはるかに効率的です。

ここで、Amazon Simple Notification Service (SNS) が登場します。目標は、SNS メッセージを送信することで、Step Functions のワークフローを自動的にトリガーすることです。SNS を使用すると、メール、アプリケーション、AWS Lambda などのさまざまなエンドポイントやサービスにメッセージを公開できます。ただし、SNS と Step Functions を直接統合することはできません。これを達成するためには、両者をつなぐ仲介サービスが必要です。
さまざまなアプローチを試した結果、SNS から Step Functions を呼び出すには、次の 2 つの簡単な方法があるという結論に至りました。

  1. Lambda を SNS と Step Functions の間の仲介役として使用します。
  2. API Gateway を使用して Step Functions 用の REST API を提供し、SNS と REST API URL を統合します。

方法 1:Lambda を仲介として活用する

この方法は、SNS から Step Functions を実行する最も簡単な方法かもしれません。実装方法は次のとおりです。

1. SNS トピックの作成

SNS コンソールページにアクセスし、新しいトピックを作成します。私の場合、FIFO ではなく Standard を使用しました。
create_sns_topic

2. Lambda 関数の作成と SNS のパブリッシュイベントを処理するよう設定

新しい Lambda 関数を作成するか、既存の関数を使用できます。どちらにしても、SNS イベントを正しく処理するようにしてください。

  • Lambda コンソールページにアクセスし、右上の「関数の作成」をクリックし、ケースに適した「ランタイム」と「アーキテクチャ」を選択します。
  • 次に、「ハンドラー」関数で、関連する「キー」を使用して SNS メッセージを取得し、必要な情報を抽出する必要があります。以下のコードを参考にしてください。stateMachineArn を Step Functions の ARN に置き換えてください。また、Step Functions の入力パラメータに応じて、Step Functions に送信する「入力キー」を調整する必要があります。
    
    import json
    import base64
    import boto3
    
    def lambda_handler(event, context):
        sns = boto3.client('sns')
        stepfunctions = boto3.client('stepfunctions')
    
        message = json.loads(event['Records'][0]['Sns']['Message'])
        # Extract relevant information from the message
    
        # Invoke Step Function
        response = stepfunctions.start_execution(
            stateMachineArn='YOUR_STEP_FUNCTION_ARN', # replace this with your ARN
            input=json.dumps({'data': message}) # adjust the 'data' part to your Step Function input parameter
        )
    
        return {
            'statusCode': 200,
            'body': json.dumps('Step Function invoked successfully.')
        }
    

3. Lambda を呼び出すよう SNS を設定

Lambda が準備できたので、トピックに対して Lambda 関数のサブスクリプションを作成します。

  • SNS コンソールページにアクセスし、トピックを選択して、「サブスクリプションの作成」をクリックします。
  • 「プロトコル」セクションで、Lambda を選択します。
  • 「エンドポイント」に、Lambda 関数の ARN を入力します。
  • 「サブスクリプションの作成」をクリックすると、サブスクリプションが自動的に確認されます。

4. 最後に、SNS メッセージを発行して統合をテストします

私の場合、SNS メッセージは CLI コマンドを使用して発行します。以下は、私のシェルスクリプトの例です。

SNS_TOPIC_ARN = "arn:aws:sns:ap-northeast-1:000000000000:InvokeStepFunction"
S3_BUCKET_URI = "s3://bucket/key/20240528/10/00.log"
AWS_REGION = "$(aws configure get region)"

log_date = "20240528"
sns_message = "{\"s3_bucket_uri\":\"$S3_BUCKET_URI\", \"log_date\":\"$log_date\"}"

aws sns publish --topic_arn $SNS_TOPIC_ARN --region $AWS_REGION --message $sns_message

これで、Step Functions コンソールに移動して、実行されているかどうかを確認できます。すべて正しく設定されていれば、以下のように、実行ペインに「実行中」ステータスが表示されます。

方法 2:API Gateway と REST API を活用する

この方法は、API Gateway を使用して Step Functions とやり取りする REST API を作成することを含みます。その後、SNS を設定してこの API エンドポイントをトリガーできます。

1. API Gateway で REST API を作成する

  • AWS API Gateway コンソールで、「API の作成」をクリックし、「REST API」を選択します。
  • API 名を入力したら、API エンドポイントタイプを選択します。私の場合は、リージョンタイプを選択し、「ビルド」をクリックします。
  • 「メソッドの作成」プロセスで、「POST」をメソッドタイプとして選択します。
  • 「統合タイプ」で、「AWS サービス」を選択します。
  • AWS サービスに「Step Functions」を選択し、アクション名に「ExecuteStateMachine」を選択します。
  • 実行ロールについては、Step Functions を実行する権限を持つロール(IAM ロール経由)を作成します。以下は、Step Functions を実行するための権限ポリシーの例です。
    {
      "Version": "2012-10-17",
      "Statement":[
        {
          "Sid": "APItoExecuteStepFunction",
          "Effect": "Allow",
          "Action": "states:StartExecution",
          "Resource": "your-step-function-arn"
        }
      ]
    }
    
  • スムーズなデバッグと監視のために、使用しているロールに AmazonAPIGatewayPushToCloudWatchLogs 権限を追加します。また、以下の例のように、信頼エンティティを含めるように「信頼関係」を変更してください。

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "apigateway.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }
    
  • これで、API をデプロイできます。新しいステージ名(例:invoke-step-function)を付ける必要があります。これは、REST API の URL のサフィックスとして使用されます。例えば、私の場合は次のようになります。
    https://xxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/invoke-step-function
  • 最後に、REST API のロギングを有効にするには、API Gateway コンソールに移動し、左側のサイドバーからメニューの一番下にある「設定」に移動します。次に、ロギング情報を編集して、ロールの ARN を入力します。

2. API Gateway エンドポイントをトリガーするように SNS を設定する

  • SNS コンソールに移動し、既存の SNS トピックを開くか、新しいトピックを作成します(上記の方法 1 を参照)。
  • トピックに新しいサブスクリプションを作成します。プロトコルは、Lambda ではなく HTTPS を選択し、エンドポイントには REST API URL を入力します。

Lambda サブスクリプションは自動的に確認されますが、HTTPS プロトコルでは、SNS サブスクリプションを手動で確認する必要があります。一般的に、SNS サブスクリプションを確認するには、次の 2 つの方法があります。(1)SNS から送信されたサブスクリプション URL にアクセスする、または(2)AWS SDK または CLI の confirm-subscription を使用して、SNS トピックの ARN とサブスクリプショントークンを送信する。オプション 1 は最も簡単ですが、私の場合、サブスクリプション URL の完全なものが取得できなかったので、オプション 2 を選択しました。サブスクリプショントークンはどのように取得するのでしょうか?ここで、CloudWatch ロギングの出番です。

SNS トピックへのサブスクリプションを作成すると、その SNS サブスクリプションイベントは、API Gateway のロググループの下の CloudWatch に記録されます。CloudWatch コンソールに移動し、API Gateway の名前を使用して正しいロググループを検索し、以下のような「SubscriptionConfirmation」メッセージを含むログを見つけ、”Token” の値を取得します。

トークンを取得したら、以下のように AWS CLI コマンドを使用して、サブスクリプションを確認できます。

aws sns confirm-subscription --topic-arn arn:aws:sns:ap-northeast-2:123456789012:InvokeStepFunction \
--token 2336412f37fb687f5d51e6e241d7700ae02f7124d8268910b858cb4db727ceeb2474bb937929d3bdd7ce5d0cce19325d036bc858d3c217426bcafa9c501a2cace93b83f1dd3797627467553dc438a8c974119496fc3eff026eaa5d14472ded6f9a5c43aec62d83ef5f49109da7176391

または、boto3 と AWS SDK 関数を使用して、以下のように確認できます。

import boto3

sns = boto3.client('sns')
topic_arn = 'arn:aws:sns:ap-northeast-2:123456789012:InvokeStepFunction'
token = '2336412f37fb687f5d51e6e241d7700ae02f7124d8268910b858cb4db727ceeb2474bb937929d3bdd7ce5d0cce19325d036bc858d3c217426bcafa9c501a2cace93b83f1dd3797627467553dc438a8c974119496fc3eff026eaa5d14472ded6f9a5c43aec62d83ef5f49109da7176391'

sns.confirm_subscription(
  TopicArn=topic_arn,
  Token=token
)

どちらかのコマンドを実行すると、SNS サブスクリプションが確認されます。
最後に、前の方法 1 と同様に、SNS メッセージを送信して、SNS、API Gateway、Step Functions をテストする必要があります。

まとめ

このブログ記事では、SNS から Step Functions を呼び出すための 2 つの実際的な方法を紹介しました。特定のニーズとプロジェクトのアーキテクチャに合わせて、最適な方法を選択することができます。SNS メッセージから取得したメッセージのフォーマットと、Step Functions で使用しているフォーマットが互換性があることを確認してください。また、使用しているロールに必要な権限ポリシーと信頼関係を含める必要があります。

最後に

グループ研究開発本部 AI研究開発室では、データサイエンティスト/機械学習エンジニアを募集しています。ビッグデータの解析業務などAI研究開発室にご興味を持って頂ける方がいらっしゃいましたら、ぜひ募集要項一覧からご応募をお願いします。 一緒に勉強しながら楽しく働きたい方のご応募をお待ちしております。

  • Twitter
  • Facebook
  • はてなブックマークに追加

グループ研究開発本部の最新情報をTwitterで配信中です。ぜひフォローください。

 
  • AI研究開発室
  • 大阪研究開発グループ

関連記事