2023.07.06

Composer ローカル開発CLIツールを使ってみた

こんにちは。次世代システム研究室のM.Mです。

現在担当している案件では、Google Cloud Composerを利用しており、主にBigQueryと連携したデータ分析用途ではありますが、バッチサーバーと連携して、バッチサーバー上のスクリプトを依存関係を維持しつつ実行するといったこともしています。
ただ、私自身はGoogle Cloud Composerの構築にあまり関わっておらず、出来上がった環境をルールに従って利用しているだけの状況になっていたため、Cloud Composerの理解を深めるため、ローカル環境にComposerを構築していろいろ試せないかと考えておりました。
そこで、Cloud Composerのドキュメントを確認すると、

という資料があり、試してみることにしました。

今回は、Composerローカル開発CLIツールにてComposer環境の作成、他のサーバー上のスクリプト実行を試してみます。

1. Composer ローカル開発CLIツールでローカルComposer環境構築

前提

以下が利用できる環境であること
  • Python(私は3.9を使いました)
  • Docker
  • git
  • google-cloud-sdk

注意点

GCPと連携するための事前準備として、以下のコマンドを実行します。
gcloud init
gcloud auth application-default login
実行すると、~/.config/gcloud/application_default_credentials.jsonが作成されるので、以下のように権限を変更します。
chmod +r ~/.config/gcloud/application_default_credentials.json
この権限設定をしておかないと、BigQueryと連携するようなDAGを作ると実行時に以下のエラーがでます。
PermissionError: [Errno 13] Permission denied: '/home/airflow/.config/gcloud/application_default_credentials.json'
では、Composer ローカル開発CLIツールのインストールに進みます。

Composer ローカル開発CLIツールのインストール

作業ディレクトリはどこでもよいですが、今回は検証なので、以下のインストール手順はrootユーザーで/root配下にcloudcomposerディレクトリを作成して、
/root/cloudcomposer配下で作業を行っています。

1. Composer ローカル開発CLIツールのソースコードを取得します

cd /root/cloudcomposer/
git clone https://github.com/GoogleCloudPlatform/composer-local-dev.git

2. 必要となるモジュールをインストールします

cd composer-local-dev/
pip install .

3. インストールするComposerのバージョンを確認します

cd /root/cloudcomposer
composer-dev list-available-versions --include-past-releases --limit 20
実行すると以下のようなバージョンの一覧が表示されます。
  Image version                 │ Release Date  
╶───────────────────────────────┼──────────────╴
  composer-2.3.2-airflow-2.5.1  │ 26/06/2023    
  composer-2.3.2-airflow-2.4.3  │ 26/06/2023    
  composer-2.3.1-airflow-2.5.1  │ 21/06/2023    
  ...
  composer-2.1.12-airflow-2.4.3 │ 17/04/2023    
  composer-2.1.12-airflow-2.3.4 │ 17/04/2023    
  composer-2.1.11-airflow-2.4.3 │ 31/03/2023    
表示された一覧から、インストールするComposerのバージョンを決めます。

4. インストールしたいバージョンを指定してComposer環境を作成します

composer-dev create --from-image-version composer-2.1.11-airflow-2.4.3 mmtestcomposer
(今回は、composer-2.1.11-airflow-2.4.3を選択し、mmtestcomposerという名前で生成しました。)
実行すると/root/cloudcomposer/composer/mmtestcomposerにcomposer環境が作られます。

5. 作成したComposer環境を起動します

composer-dev start mmtestcomposer
Composerのdockerコンテナが起動します。
そして起動が完了すると、以下のようなメッセージが表示されます。
  ...
  Started mmtestcomposer environment.

  1. You can put your DAGs in /root/cloudcomposer/composer/mmtestcomposer/dags
  2. Access Airflow at http://localhost:8080

6.Composer環境のGUI画面を確認します

http://localhost:8080にアクセスすると以下の図のような画面が表示されます。
まだDAGを登録していないので、DAGs一覧には何も表示されていない状態になっています。
起動時に出力されたメッセージの通り、
/root/cloudcomposer/composer/mmtestcomposer/dagsにDAGファイルを配置すれば、自動で読み込まれ一覧に表示されるようになります。

これでローカルのComposer環境ができあがりました。
続いて、DAGファイルを作成して、
/root/cloudcomposer/composer/mmtestcomposer/dags配下に配置していくことになりますが、まずDAGファイルを作成する前に、すでにローカルのComposer環境で利用できるようになっているPythonパッケージを確認しておくとよいと思います。
記載がないPythonパッケージを利用したい場合は、
/root/cloudcomposer/composer/mmtestcomposer/requirements.txtに追加して再起動すれば利用できるようになります。

では、ローカルComposer環境を利用していきたいと思います。

2. sshで他のサーバーのスクリプトを実行

sshで対象サーバーに接続できるようにConnectionsの設定を行います

以下の図にようAdmin -> Connectionsを選択します。
すると、以下の図のようにConnectionの一覧が表示されます。
上記画面の左上にある+ボタンをクリックして、今回接続するサーバーへの新しい接続を作成します。
Connection Idにはどのサーバーに接続するか分かるような名前になっていればよいと思います。
Connection TypeにはSSHを選択します。
Hostには接続するサーバーのIP、Usernameにはサーバーへ接続するユーザー、Extraには指定したユーザーが利用する秘密鍵を設定します。

Testボタンを押して接続確認を行い保存します。
そして、再度Connectionsの一覧を見ると登録したssh-dev-batchが追加されていることが確認できます。
ただ、以下の図のようにis Extra Encryptedという項目がFalseになっているのが気なります。

Securing Connectionsの設定をする

1. cryptoパッケージが必要になるので、以下のようにcryptoパッケージを利用できるようにします

/root/cloudcomposer/composer/mmtestcomposer/requirements.txtに以下を追加します。
apache-airflow[crypto]==2.4.3
ローカルComposerを再起動します。

2. fernet_keyを生成します

まず、ローカルComposer環境にアクセスします。ローカルComposer環境はDockerコンテナなので、以下のようにアクセスできます。
docker exec -it composer-local-dev-mmtestcomposer /bin/sh
続いて以下のようにpythonを使ってfernet_keyを生成します。
from cryptography.fernet import Fernet
fernet_key= Fernet.generate_key()
print(fernet_key.decode())
実行するとfernet_keyが生成されるのでメモしてDockerコンテナからログアウトします。

3. 変数設定を行います。

/root/cloudcomposer/composer/mmtestcomposer/variables.envに上記にてメモしたfernet_keyを設定します。
AIRFLOW__CORE__FERNET_KEY={上記にてメモしたfernet_keyの値}
ローカルComposerを再起動し、Connectionsの設定を再度確認します。
すると以下の図のようにis Extra EncryptedがTrueに変わりました。
これで、接続設定が終わりました。

ただ、編集画面に秘密鍵の値がそのまま見えているのが気になります。
connectionsを利用しないで、秘密鍵をGCPのSecret Managerに登録し、DAGファイル内の実装にて、Secret Managerに登録された秘密鍵を取得するような実装もできるようですが、DAGファイルの実装は極力シンプルにしたいので、今回は上記の設定にて進めます。

では、作成した接続設定を利用してSSH接続して、コマンドを実行するDAGファイル作成に進みます。

DAGファイルを作成する

今回は、適当にsshtest.pyというファイル名で以下のようなDAGファイルを作成しました。
import datetime

import airflow
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.ssh.hooks.ssh import SSHHook

with airflow.DAG(
        'mmtest_sshtest',
        start_date=datetime.datetime(2023, 6, 29)
        ) as dag:

    ssh_task = SSHOperator(
        task_id='mmtest_sshtest_task',
        ssh_hook=SSHHook(
            ssh_conn_id='ssh-dev-batch'
        ),
        command='echo "COMPOSER SSH TEST" > /tmp/sshtest.log',
        dag=dag)
SSH接続して他のサーバー上でコマンドを実行するには、SSHOperator、SSHHookを利用すれば可能です。
ここで、SSHHookのssh_conn_idに上記にて作成した接続設定(ssh-dev-batch)を指定します。
また上記では定期的に実行するバッチを想定してstart_dateを設定しています。
start_dateが過去日になっている状態で実行すると過去分も含めて実行されるので注意が必要です。
/root/cloudcomposer/composer/mmtestcomposer/dagsに配置すると自動で読み込まれ、以下の図のように一覧に表示されます。
ではActionsの下にある実行ボタンをクリックして手動で実行します。
実行が完了し成功しました。
ssh-dev-batchに設定したサーバー上で、17行目のコマンドが実行され/tmp/sshtest.logにCOMPOSER SSH TESTと出力もされていました。

これで、SSHで他のサーバー上のコマンドを実行できることが確認できました。

3. 依存のあるスクリプトを実行

まず出力内容が異なるだけの以下3つのスクリプトtest_a.sh、test_b.sh、test_c.shを用意します。

test_a.sh

#!/bin/sh

echo "TEST A"
exit 0

test_b.sh

#!/bin/sh

echo "TEST B"
exit 0

test_c.sh

#!/bin/sh

echo "TEST C"
exit 0
出力内容がTEST A, TEST B, TEST Cになっているだけの違いです。
今回は適当に/usr/local/bin/mmtest/配下に作りました。
続いて、以下のように依存関係を設定したDAGファイルを作成します。
import datetime

import airflow
from airflow.operators.dummy import DummyOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.ssh.hooks.ssh import SSHHook

with airflow.DAG(
        'mmtest_sshtest_sometasks',
        start_date=datetime.datetime(2023, 6, 29)
        ) as dag:

    start = DummyOperator(
        task_id='start'
    )

    ssh_task_a = SSHOperator(
        task_id='mmtest_sshtest_task_a',
        ssh_hook=SSHHook(
            ssh_conn_id='ssh-dev-batch'
        ),
        command='/usr/local/bin/mmtest/test_a.sh >> /tmp/mmtest.log'
    )

    ssh_task_b = SSHOperator(
        task_id='mmtest_sshtest_task_b',
        ssh_hook=SSHHook(
            ssh_conn_id='ssh-dev-batch'
        ),
        command='/usr/local/bin/mmtest/test_b.sh >> /tmp/mmtest.log',
    )

    ssh_task_c = SSHOperator(
        task_id='mmtest_sshtest_task_c',
        ssh_hook=SSHHook(
            ssh_conn_id='ssh-dev-batch'
        ),
        command='/usr/local/bin/mmtest/test_c.sh >> /tmp/mmtest.log',
    )

    end = DummyOperator(
        task_id='end'
    )

    start >> [ssh_task_a, ssh_task_b] >> ssh_task_c >> end
13行目と41行目にDummyOperatorが設定されていますが、これは何も実行されません。
依存関係をグラフで確認する際に見やすくするために設定しています。
17行目でtest_a.sh、25行目でtest_b.sh、33行目でtest_c.shを実行するSSHOperatorを定義しています。
そして45行目にて依存関係の設定をしています。
ssh_task_cはssh_task_a、ssh_task_bの両方が完了していないと実行されないようになっています。

上記DAGファイルを配置して、対象のDAGの情報を見ると、以下の図のような依存関係が分かるグラフを確認することができます。

続いて、手動実行して実行結果を確認します。
以下の図のように各タスクの実行結果が分かります。またここでは記載しませんが、他にも実行ログなどの詳細を確認することもできます。

本当に依存関係が保たれて実行されるのか?
わざとスクリプトエラーを発生させて確認、および再実行させます。

4. スクリプトエラー時の再実行

では、意図的にtest_b.shをエラーになるようにして実行します。
test_b.shを以下のように変更します。

test_b.sh

#!/bin/sh

echo "TEST B"
exit 1
※exit 0だったものをexit 1にするだけです。
実行すると、以下の図のように、mmtest_sshtest_task_b(test_b.sh)が赤くなり、後続は実行されていない状況になります。
mmtest_sshtest_task_a(test_a.sh)は成功しているが、mmtest_sshtest_task_b(test_b.sh)は失敗しており、後続のmmtest_sshtest_task_c(test_c.sh)、endは実行されていないことが分かります。
各スクリプトが出力する/tmp/mmtest.logにも、TEST Cは出力されておらず、依存関係が無視されて、mmtest_sshtest_task_c(test_c.sh)が実行されていないことも確認できました。

では、意図的にエラーにしたtest_b.shを元に戻して、test_b.shから再実行させます。
エラーになっているタスクをクリックすると以下の図のようなダイアログが表示されます。
上記図にあるClearボタンをクリックするとエラーになった個所から再実行することができます。
Clearボタンから再実行させると以下の図のようにmmtest_sshtest_task_b(test_b.sh)から実行が開始します。
途中から再実行するのがClearボタンというのは分かりにくいと思いましたが、エラーが発生した個所から画面から再実行できるのはありがたいです。

5. まとめ

Google Cloud Composerはフルマネージドサービスなので、利用状況に関わらず料金がかかってしまいます。
そのため、本格的に利用する前の検証段階で、いきなりGoogle Cloud Composerを利用するのは難しいと思っていましたが、Composerローカル開発CLIツールにて簡単にローカル環境にて検証できるのはありがたいと思いました。
実際はデータ分析としてBigQueryと連携するような用途での利用が多いと思いますが、私は主にサーバーサイド開発・運用を担当しているので、今回はsshで他のサーバーのスクリプト実行を試しました。
特に画面上で実行状況の確認や、エラー時に再実行できるのは運用をするエンジニアにとっては便利なのではないかと思いました。
最近は外部サービスと連携するような処理も多く、外部依存でスクリプトがエラーになるケースもあると思います。
そのような場合、単純にスクリプトを再実行すればよいだけかもしれませんが、いちいちサーバーに入って手動で再実行とか面倒ですし、外出していてすぐに再実行できないこともありますしね。

最後に、次世代システム研究室では、グループ全体のインテグレーションを支援してくれるアーキテクトを募集しています。アプリケーション開発の方、次世代システム研究室にご興味を持って頂ける方がいらっしゃいましたら、ぜひ募集職種一覧からご応募をお願いします。

皆さんのご応募をお待ちしています。

  • Twitter
  • Facebook
  • はてなブックマークに追加

グループ研究開発本部の最新情報をTwitterで配信中です。ぜひフォローください。

 
  • AI研究開発室
  • 大阪研究開発グループ

関連記事