2021.04.07

GCPのCloud TasksとCloud Pub/Subの違い

こんにちは。次世代システム研究室のM.Mです。

前回のブログでは、はじめてのGoogle App Engineということで、Google App EngineでWEBアプリを作った場合、ログはどのように確認するのか、redisやMySQLとはどのように連携するのかについて紹介しました。
また、WEBアプリは前回実施したKVSやRDBMSとの連携以外にも、他サービスが提供するAPIとの連携も多く行われるので、Google App Engineで作られたWEBアプリからGoogle Cloud Platformにて提供されているサービスを利用して非同期で他のAPIを実行する方法について確認しました。
(以下Google App EngineをGAE、Google Cloud PlatformをGCPと記載します)

1. Cloud TasksとCloud Pub/Sub

GCPにて提供されている非同期処理を実現するサービスとしては、Cloud TasksとCloud Pub/Subがあります。

どちらも非同期でAPIを実行させることができます。
ただし、違いも多く、どのような用途で非同期処理をさせるのか把握したうえで選択する必要はありそうです。違いについては以下にもまとめられていました。
Pub/Sub または Cloud Tasks の選択
ただ、GCPのConsole画面でどのような操作や確認ができるのか実際に使ってみたほうが理解が深まると思い実際に実装して確認することにしました。

GAEの構成はすべてPython3.8、Flaskを利用します。利用しているファイルは以下のみです。

  • app.yml
  • main.py
  • requirements.txt

2. Cloud Tasksでの非同期処理(GAE -> Cloud Tasks -> GAE)

システム構成は以下の通りです。

Cloud Tasksのキューを作成します

キューの作成自体はドキュメント(Cloud Tasks キューのクイックスタート)を参照すれば簡単にできました。
以下の図はキュー名をtest02-queueとして作成後のGCPのConsole画面になります。

現在のタスクの数などの確認ができ、表示されているキュー名を選択することで、さらにキューに登録されているタスクについても確認できるようになっています。

キューにタスクを登録するWEBアプリをGAEで作成します

上記、構成図にあるmm-gae-test02-frontappのGAEの部分です。
ソースコードは以下のようになっています。

■app.yml

service: mm-gae-test02-frontapp
runtime: python38

タスクを登録するGAEのサービスとしてmm-gae-test02-frontappを指定しています。

■requirements.txt

Flask==1.1.2
google-cloud-tasks==2.1.0

google-cloud-tasks==2.1.0にてCloud Tasksのモジュールを利用できるようにします。

■main.py

import time
import json
from flask import Flask, request
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2

app = Flask(__name__)

@app.route('/send_task', methods=['GET'])
def send_task():

    project = 'xxxxxxxxxxxxxxxxxxxxxxx'
    queue = 'test02-queue'
    location = 'asia-northeast1'

    client = tasks_v2.CloudTasksClient()
    parent = client.queue_path(project, location, queue)

    payload = {'test_param_name': 'test_value'}

    task = {
        'app_engine_http_request': {
            'http_method': tasks_v2.HttpMethod.POST,
            'headers': {"Content-type": "application/json"},
            'relative_uri': '/execute_task',
            'app_engine_routing': {
                'service': 'mm-gae-test02-backapp'
            }
        }
    }
    payload = json.dumps(payload)
    converted_payload = payload.encode()
    task['app_engine_http_request']['body'] = converted_payload

    schedule_time = time.time() + 300
    seconds = int(schedule_time)
    timestamp = timestamp_pb2.Timestamp(seconds=seconds)
    task['schedule_time'] = timestamp

    response = client.create_task(parent=parent, task=task)

    print('Created task {}'.format(response.name))
    return 'Created task {}'.format(response.name)

今回は、キューに登録されたタスクから実行するAPIもGAEで作成されたWEBアプリ(上記、構成図のmm-gae-test02-backappの部分)にしているので、作成するタスクをapp_engine_http_requestとして作成しています。(22行目の部分)
また、Cloud Tasksでは実行する時間も指定することができるので、5分後に実行するように設定しています。(35行目の部分)
送信するデータは{‘test_param_name’: ‘test_value’}としています。(19行目の部分)

キューに登録したタスクから実行されるAPIをGAEで作成します

上記、構成図にあるmm-gae-test02-backappのGAEの部分です。
ソースコードは以下のようになっています。

■app.yml

service: mm-gae-test02-backapp
runtime: python38

タスクから実行されるAPIもGAEで実装するので、サービスとしてmm-gae-test02-backappと指定して、先ほどのタスクを登録するGAEのWEBアプリ(mm-gae-test02-frontapp)とは別のサービスとして作成します。

■requirements.txt

Flask==1.1.2

タスクから実行されるAPI自体はCloud Tasksの操作はしないので、Flaskだけで問題ありません。

■main.py

from flask import Flask, request

app = Flask(__name__)

@app.route('/execute_task', methods=['POST'])
def execute_task():
    payload = request.get_data(as_text=True) or '(empty payload)'
    print('Received task with payload: {}'.format(payload))
    return 'Printed task payload: {}'.format(payload)

今回はタスクから実行されて、意図したデータを受け取れているかどうかの連携確認ができればいいだけなので、受け取ったデータをログで確認できるように出力しているだけです。

実行と確認

キューにタスクを登録するURLにアクセス

デプロイしたGAEのサービス(mm-gae-test02-frontapp)に用意したタスク登録用URLにアクセスします。
今回、タスク登録するパスはsend_taskとしたので、以下のようなURLにアクセスします。

https://mm-gae-test02-frontapp-dot-xxxxxxxxxxxxxxxx.an.r.appspot.com/send_task

タスクの登録確認

GCPのConsole画面によりキューへのタスク登録状況を確認します。
以下の図のようにタスクが登録されていることが確認でき、またETAが作成日時の5分後になっていることも確認できます。

ETAの時間になると作成したGAEのWEBアプリ(mm-gae-test02-backapp)のexecute_taskにリクエストが送信されます。
送信されたかどうかはGCPのColsole画面のロギングから確認することができます。

3. Cloud Tasksでの非同期処理(GAE -> Cloud Tasks -> 外部サイトAPI)

システム構成は以下の通りです。

外部サイトAPIと連携する実装は以下のソースコードとなります。
app.ymlとrequirements.txtは上記mm-gae-test02-frontappで作成したGAEと同じです。

■main.py

import time
import json
from flask import Flask, request
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2

app = Flask(__name__)

@app.route('/send_task', methods=['GET'])
def send_task():

    project = 'xxxxxxxxxxxxxxxxxxxxxxx'
    queue = 'test02-queue'
    location = 'asia-northeast1'

    client = tasks_v2.CloudTasksClient()
    parent = client.queue_path(project, location, queue)

    task = {
        'http_request': {
            'http_method': tasks_v2.HttpMethod.GET,
            'url': 'https://xxxx.example.com',
        }
    }

    schedule_time = time.time() + 300
    seconds = int(schedule_time)
    timestamp = timestamp_pb2.Timestamp(seconds=seconds)
    task['schedule_time'] = timestamp

    response = client.create_task(parent=parent, task=task)

    print('Created task {}'.format(response.name))
    return 'Created task {}'.format(response.name)

違いは20-22行目の部分のみです。
キューにタスクを登録する際に、前回のapp_engine_http_requestとしてタスクを作るのではなく、20行目にあるようにhttp_requestとして登録すればよいです。

実行や確認については、GAE -> Cloud Tasks -> GAEの構成の場合と同じなのでここでは記載しませんが、GCPのColsole画面での違いもなく、問題なく実行されることの確認もできました。

4. Cloud Tasksでの多重実行制御

Cloud Tasksの特徴として多重実行の制御があります。
上記にて紹介した例ではタスクIDを指定してタスクを作成していませんでしたが、タスクIDを指定してタスクを作成することで、同じタスクIDのタスクが登録済みであれば、再登録できないようにすることができます。
割り当てと上限のタスクの重複排除の期間によると、タスクが実行または削除されてから1時間は同じタスクIDでのタスクが作成できないとのこと。
同じタスクIDでタスクを作成しようとすると以下のようなエラーが発生します。

google.api_core.exceptins.AlreadyExists: 409 Requested entity already exists

タスクIDを指定した場合のソースコードは以下になります。

■main.py

import time
import datetime
import json
from flask import Flask, request
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2
from google.api_core.exceptions import AlreadyExists

app = Flask(__name__)

@app.route('/send_task', methods=['GET'])
def send_task():

    project = 'xxxxxxxxxxxxxxxxxxxxxxx'
    queue = 'test02-queue'
    location = 'asia-northeast1'

    client = tasks_v2.CloudTasksClient()
    parent = client.queue_path(project, location, queue)

    payload = {'test_param_name': 'test_value'}

    task_id = 'test' + datetime.datetime.now().strftime('%Y%m%d%H%M')
    task_name = parent + '/tasks/{}'.format(task_id)

    task = {
        'name' : task_name,
        'app_engine_http_request': {
            'http_method': tasks_v2.HttpMethod.POST,
            'headers': {"Content-type": "application/json"},
            'relative_uri': '/execute_task',
            'app_engine_routing': {
                'service': 'mm-gae-test02-backapp'
            }
        }
    }
    payload = json.dumps(payload)
    converted_payload = payload.encode()
    task['app_engine_http_request']['body'] = converted_payload

    schedule_time = time.time() + 300
    seconds = int(schedule_time)
    timestamp = timestamp_pb2.Timestamp(seconds=seconds)
    task['schedule_time'] = timestamp

    try:
        response = client.create_task(parent=parent, task=task)
        print('Created task {}'.format(response.name))
        return 'Created task {}'.format(response.name)
    except AlreadyExists:
        return 'Requested entity already exists'
    except:
        return 'Unexpected Error'

23行目にてタスクIDを決めて、27行目でタスクIDを含めた形でタスク名を設定しています。23行目で作成したタスクIDには年月日時分を付けているので、1分間に1回しか登録できないタスクとなっています。

5. Cloud Tasksでのリトライ処理

キューに登録されたタスクが正常に処理できなかった場合、設定したルールに基づいてリトライされます。
今回はわざとエラーを返すAPIを作って試そうと思います。
構成は、 上記にて実施したGAE -> Cloud Tasks -> GAEと同様の構成で、GAE(mm-gae-test2-backapp)は必ず400エラー返すように修正します。
以下のように必ず400エラーを返すように修正して、再度デプロイします。

■main.py

from flask import Flask, request, make_response

app = Flask(__name__)

@app.route('/execute_task', methods=['POST'])
def execute_task():
    return make_response('', 400)

再度タスクを登録し、以下はタスクが登録された状態となっています。(キュー名はtest02-queueからmm-gae-test02-queueに変更しています)

上の図はまだ実行されていない状態です。
しばらく放置すると以下の図のように再試行数、実行数が増えていきます。

また、タスク部分をクリックすると以下のように再試行の理由も確認でき、意図したとおりHTTP status code 400で失敗していることがわかります。

現状、必ず400を返すようにしていて、成功することはないので、いったんタスクを削除します。

では、リトライの設定を以下の値に変更して、意図したとおりリトライされているか試してみます。

  • 最大試行回数: 10
  • 最小間隔: 10s
  • 最大間隔: 300s
  • 最大倍増回数: 5
  • 最大試行時間: 無制限

以下はタスクの実行ログになります。

意図した間隔で実行されています。
最小間隔を10sに設定したので、まず10秒後に実行されて、次は倍の20秒後、次はさらに倍の40秒後、次もさらに倍の80秒後、・・といったように最大倍増回数の5回まで倍増していきます。
ただ、最大間隔を300sにしているので、倍増しても300s以上にならないといった具合です。

なお、各タスクのログ確認ですが、以下の図のタスクの右端にあるViewをクリックすると上記の図のような対象タスクのログを確認することができるようになっています。

6. Cloud Pub/Subでの非同期処理(GAE -> Cloud Pub/Sub -> GAE)

システム構成は以下の通りです。

GCPのConsole画面からCloud Pub/Subのトピックとサブスクリプションを作成します

mm-gae-test02-topicというトピック名で作成します。

トピックを作成するとmm-gae-test02-topic-subというサブスクリプションも以下のように作成されていました。

ただし配信タイプがpull型になっていたので、push型に変更してエンドポイントを設定します。

エンドポイントURLは上記、構成図のGAE(mm-gae-test02-backapp)に用意するタスク実行用URLを指定します。
今回もCloud Tasksの時と同様、タスク実行するURLのパスはexecute_taskとするので、以下のようなURLを指定します。
https://mm-gae-test02-backapp-dot-xxxxxxxxxxxxxxxx.an.r.appspot.com/execute_task?token=hogehoge
(?token=hogehogeといったようなパラメータを付けて実行させることも可能です)

トピックにメッセージを登録するWEBアプリをGAEで作成します

上記、構成図にあるmm-gae-test02-frontappのGAEの部分です。
ソースコードは以下のようになっています。

■app.yml

service: mm-gae-test02-frontapp
runtime: python38

トピックにメッセージを登録するGAEのサービスとしてmm-gae-test02-frontapp指定しています。

■requirements.txt

Flask==1.1.2
google-cloud-pubsub==2.2.0

google-cloud-pubsub==2.2.0にてCloud Pub/Subのモジュールを利用できるようにします。

■main.py

import json
from flask import Flask, request
from google.cloud import pubsub_v1

app = Flask(__name__)
publisher = pubsub_v1.PublisherClient()

@app.route('/send_task', methods=['GET'])
def send_task():

    project = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
    pubsub_topic = 'mm-gae-test02-topic'

    params = {"test_id": 1, "test_msg": "テスト"}
    data = json.dumps(params).encode('utf-8')

    topic_path = publisher.topic_path(project, pubsub_topic)
    publisher.publish(topic_path, data=data)

    return 'OK', 200

17行目で対象のトピックを指定して、18行目で送りたいメッセージを送信しています。
送信するデータは14行目にある{“test_id”: 1, “test_msg”: “テスト”}としています。

サブスクリプションから実行されるAPIをGAEで作成します

上記、構成図にあるmm-gae-test02-backappのGAEの部分です。
ソースコードは以下のようになっています。

■app.yml

service: mm-gae-test02-backapp
runtime: python38

サブスクリプションにて実行されるAPIもGAEで実装するので、サービスとしてmm-gae-test02-backappと指定して、先ほどのタスクを登録するGAEのWEBアプリ(mm-gae-test02-frontapp)とは別のサービスとして作成します。

■requirements.txt

Flask==1.1.2

サブスクリプションから実行されるAPI自体はCloud Pub/Subの操作はしないので、Flaskだけで問題ありません。

■main.py

import json
from flask import Flask, request

app = Flask(__name__)

@app.route('/execute_task', methods=['POST'])
def execute_task():

    if (request.args.get('token', '') != 'hogehoge'):
        return 'Invalid request', 400

    print(request.data.decode('utf-8'))
    return 'OK', 200

Cloud Tasksの時と同様、サブスクリプションから実行されて、意図したデータを受け取れているかの連携確認ができればいいだけなので、受け取ったデータをログで確認できるように出力しているだけです。

実行と確認

トピックにメッセージを登録するURLにアクセス

デプロイしたGAEのサービス(mm-gae-test02-frontapp)に用意したメッセージ登録用URLにアクセスします。
今回もメッセージを登録するパスはsend_taskとしたので、以下のようなURLにアクセスします。
https://mm-gae-test02-frontapp-dot-xxxxxxxxxxxxxxxx.an.r.appspot.com/send_task

メッセージの登録確認

GCPのConsole画面によりメッセージの状況を確認します。
以下の図のようにACT処理されていないメッセージ数が増えて、メッセージが登録されたのがわかります。

ただGAEの設定ミスで、しばらく放置していても、以下の図のようにACK処理されていないメッセージ数が減らず、メッセージが送信されない状態が続いてしまいました。

メッセージが送信できないと、再試行ポリシーで設定したポリシーに沿って再試行が行われ、メッセージ保持期間が経過すると削除されました。
メッセージが送信できなかった原因は単純にGAEにIPアドレス制限を入れていて、IPアドレスが許可されていなかったからでした。
IPアドレスを許可するとACK処理されていないメッセージ数も0になりました。
無事にメッセージを送信することができたので、メッセージを受け取ったかログを確認します。

サブスクリプションからの実行確認

GCPのConsole画面のロギングよりGAE(mm-gae-test02-backapp)のログを確認

送信したメッセージ{“test_id”: 1, “test_msg”: “テスト”}は表示されていませんが、メッセージはjsonPayload.message.dataの部分にbase64encodeされて設定されています。
jsonPayload.message.dataの値(eyJ0ZXN0X2lkIjogMSwgInRlc3RfbXNnIjogIlx1MzBjNlx1MzBiOVx1MzBjOCJ9)をbase64decodeすると意図したメッセージになっていました。

7. Cloud TasksとCloud Pub/Subの違いについて

タスク実行するHTTPエンドポイントを指定する場所の違い

  • Cloud TasksはWEBアプリにてタスク作成時にHTTPエンドポイントを指定する。
  • Cloud Pub/SubはサブスクリプションにHTTPエンドポイントを指定する。

Cloud Tasksの方が、ソースコードにタスク実行するエンドポイント設定をしていて、タスクを実行する側との依存が強く、タスクを実行する側のエンドポイントが変わったら、タスクを登録するWEBアプリも修正しないといけなくなる。
一方、Cloud Pub/Subの場合、WEBアプリはトピックに登録するだけで、あとはサブスクリプションをどう設定するかのみ。
トピックに対して複数のサブスクリプションも登録でき、タスクを実行する側と連携しやすい印象を受けました。

登録されたタスク/メッセージに対してできることの違い

  • Cloud Tasksはタスクの2重実行防止ができたり、リトライ方法など細かい設定ができる。またタスク単位で状況の確認ができたり、タスク単位で削除することができる。
  • Cloud Pub/Subは登録された1つのメッセージに対して個別に状況の確認や削除ができない。リトライもCloud Tasksほど詳細な設定はできない。

登録されたタスク/メッセージに対して細かく管理できるのは、Cloud Tasks。

個人的な印象ではありますが、Cloud Pub/SubはWEBアプリからトピックに登録すればよいだけで、トピックへの登録後に処理された結果とそのWEBアプリ自体が強い結びつきがないようなサービス連携に向いていて、Cloud TasksはWEBアプリから非同期でバックエンドのAPIでデータ更新するいったような、バックエンドで処理された結果、WEBアプリと関連があるデータが更新されるといった機能的に強く結びつているようなサービス連携に向いていると思えました。

最後に、次世代システム研究室では、グループ全体のインテグレーションを支援してくれるアーキテクトを募集しています。アプリケーション開発の方、次世代システム研究室にご興味を持って頂ける方がいらっしゃいましたら、ぜひ募集職種一覧からご応募をお願いします。

皆さんのご応募をお待ちしています。

  • Twitter
  • Facebook
  • はてなブックマークに追加

グループ研究開発本部の最新情報をTwitterで配信中です。ぜひフォローください。

 
  • AI研究開発室
  • 大阪研究開発グループ

関連記事