2020.07.01
Kudu+Spark+Impalaを手元で試す環境を構築する
こんにちは。次世代システム研究室のS.T.です。
前回に引き続き,大規模データ関連ネタです。今回は,分散データストア「Kudu」を主軸に,SparkとImpalaを使ったアプリケーションの構成を簡単に実験してみたいと思います。
この記事でやること
Kudu(分散ストレージ)+Spark(分散処理フレームワーク)+Impala(クエリエンジン)を使ったシステムの開発・検証に使えるような環境をローカルに構築し,各コンポーネント間で連携ができることを確認します。
- ローカル上のVMにKudu,Spark,Impalaをセットアップ(自前でビルド)
- Impalaとの連携の確認
- Sparkからテーブルの更新
Kuduとは
分散データストアと言って真っ先に思い浮かぶのはHDFSだと思います。Hiveメタストアを使うことで表形式のデータを扱うことができ,クエリエンジンでSQLを使うこともできます。
Kuduもまた同じように表形式でデータを扱うデータストアですが,HDFSとか「読み書きするデータ」の狙うところが異なります。
HDFSでは大きなデータをまとめて格納・読み出しをすることを主眼に置いており,そのようなユースケースで高いパフォーマンスを発揮します。
一方,Kuduは「大量のデータをリアルタイムに更新する」ことを狙っています。HDFSではリアルタイムに書き込まれるログを随時分析するというような使い方は難しいですが,Kuduはこのような利用方法を念頭に置いた設計・実装になっており,短い間隔でのバッチ処理や人の手による直近のデータを用いた分析が可能になっています。
もちろん一長一短なので,KuduはHDFSに比べて単純なバルクリード・ライトの性能は劣りますし,リアルタイム性ではHBaseに劣ります。良いとこどりとも中途半端とも言えるアプリケーションですが,デメリットを許容できるのであれば,非常に力を発揮するのではないでしょうか。よく例として紹介される「データをリアルタイムに更新し,直近のデータを人間が分析する」というようなユースケースでは,まさにうってつけです。
具体的なアーキテクチャなどは私がここに書き記すよりも,Cloudera社の資料が非常に詳細に書かれているので,そちらを参照されるのがよろしいかと思います
構築手順
環境
今回の検証では以下の環境を使用しました。Kuduはメタデータやレプリケーション,各ノードの状態を管理するMaster serverと,実データを管理するTablet serverから構成されるので,VMも2台準備します。
- ホストマシン:CPU AMD Ryzen 7 1700X / RAM 32GB / SATA SSD
- VM
- hdp01:kudu-master用 2 Core 8GB
- hdp02:kudu-tablet用 2 Core 8GB
- imp01:Impala用 2 Core 8GB
各VMはhdp01,hdp02,imp01をhostsに追加しておきます。
Kuduのセットアップ
Kuduは公式にビルド手順が記載されている
コマンドや必要なパッケージは公式を参照して最新の情報を使うのが良いと思うので,ここには概要だけ記しておきます。ビルドはmasterサーバで行いました。
-
- 必要なパッケージをyumで入れる
- (古いCentOS用の手順はスキップ)
- (NVMサポートは使わないためスキップ)
- (ドキュメントのビルドは行わないためスキップ)
- git clone
- ツールセットのインストールスクリプトを実行 enable_devtoolset.sh(LLVMのビルドにかなり時間がかかります)
- make!!(結構時間かかります)
- DESTDIRを/opt/kuduに設定してmake install
- /opt/kudu/bin,/opt/kudu/sbinにパスを通す
これでKuduのビルド,インストールは完了です。あとはコピーするなりディスクを繋ぎ変えるなりして,Tabletサーバにもインストールをします。
以下のコマンドでmaster,tabletを起動します。
hdp01$ mkdir -p /opt/{data,meta,wal,log} hdp01$ kudu-master --fs_data_dirs /opt/kudu/data/ --fs_metadata_dir /opt/kudu/meta/ --fs_wal_dir /opt/kudu/wal/ --log_dir /opt/kudu/log/ hdp02$ mkdir -p /opt/{data,meta,wal,log} hdp02$ kudu-tserver --fs_data_dirs /opt/kudu/data/ --fs_metadata_dir /opt/kudu/meta/ --fs_wal_dir /opt/kudu/wal/ --log_dir /opt/kudu/log/ --tserver_master_addrs hdp01:7051
動作チェックのために,テーブル作成・データ投入・テーブル削除の一連の流れをテストする付属のサンプルアプリケーションを実行します。今回はMaster,Tabletともに1ノードなので,レプリケーションを1に設定するために,若干コードを修正します(デフォルトは3なので,そのまま起動すると失敗する)。
example/cpp/example.ccの100行目付近でテーブルを作成しているので,num_replicasを1にしておきます。
// Set the schema and range partition columns. KuduTableCreator* table_creator = client->NewTableCreator(); table_creator->table_name(table_name) .schema(&schema) .num_replicas(1) .set_range_partition_columns(column_names);
修正が終わったら,ビルドして実行します。今回はkudu-masterがhdp01なので,引数にhdp01を指定します。
cd example/cpp make ./example hdp01
うまく動くと,テーブル作成,スキーマ変更,データ挿入,データ参照,テーブル削除の一連の流れをテストしてくれます。動かない場合は,各サーバでKuduが起動しているかどうか,hostsにhdp01が指定されているかどうかを確認してください。
Impalaのセットアップ
Impalaは公式のビルド手順
起動前にKUDU_MASTER_HOSTSを設定しておくことで,付属のものではなく今回ビルドして構築したKuduを使うことができます。
export KUDU_MASTER_HOSTS=hdp01
起動したらimpala-shellに入り,kudu上にテーブルを作ってみます。この場合もTABLEPROPERTIESでreplicasを1にしないと作成できません。
-- テーブル作成 [imp01:21000] default> CREATE TABLE hello_kudu > ( > id BIGINT, > name STRING, > PRIMARY KEY(id) > ) > STORED AS KUDU > TBLPROPERTIES('kudu.num_tablet_replicas' = '1'); Query: CREATE TABLE hello_kudu ( id BIGINT, name STRING, PRIMARY KEY(id) ) STORED AS KUDU TBLPROPERTIES('kudu.num_tablet_replicas' = '1') +-------------------------+ | summary | +-------------------------+ | Table has been created. | +-------------------------+ WARNINGS: Unpartitioned Kudu tables are inefficient for large data sizes. Fetched 1 row(s) in 4.64s -- データ投入 [imp01:21000] default> INSERT INTO hello_kudu VALUES(1, 'Alice'); Query: INSERT INTO hello_kudu VALUES(1, 'Alice') Query submitted at: 2020-06-30 14:04:49 (Coordinator: http://imp01:25000) Query progress can be monitored at: http://imp01:25000/query_plan?query_id=1f4162beef225671:0ed5a29400000000 Modified 1 row(s), 0 row error(s) in 5.27s [imp01:21000] default> INSERT INTO hello_kudu VALUES(2, 'Bob'); Query: INSERT INTO hello_kudu VALUES(2, 'Bob') Query submitted at: 2020-06-30 14:05:00 (Coordinator: http://imp01:25000) Query progress can be monitored at: http://imp01:25000/query_plan?query_id=3c47b1acabb2c5d8:ba21edc500000000 Modified 1 row(s), 0 row error(s) in 0.11s -- 確認 [imp01:21000] default> SELECT * FROM hello_kudu ORDER BY id; Query: SELECT * FROM hello_kudu ORDER BY id Query submitted at: 2020-06-30 14:05:15 (Coordinator: http://imp01:25000) Query progress can be monitored at: http://imp01:25000/query_plan?query_id=0e47a46a94ca4f45:8385e57e00000000 +----+-------+ | id | name | +----+-------+ | 1 | Alice | | 2 | Bob | +----+-------+ Fetched 2 row(s) in 2.12s
うまくいったみたいです。
Sparkのセットアップ
今回はシングルノードでお試しするだけであるため,アーカイブをダウンロード,展開するだけで準備完了です。今回はhdp02上に配置して,1台構成で
検証時はまだSpark 3.0.0がリリースされていなかったため,2.4.6を使用しています。Spark 3.0.0からKuduのVARCHARやDATETIMEといった新しくサポートされた型に対応しているので,新しい機能を試す場合は3.0.0を使ってみてください。
yum install java wget https://ftp.riken.jp/net/apache/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz tar xvfz spark-2.4.6-bin-hadoop2.7.tgz mv spark-2.4.6-bin-hadoop2.7 /opt/spark export SPARK_HOME=/opt/spark export PATH=$SPARK_HOME/bin:$PATH spark-shell --version
Sparkからテーブルの更新をしてImpalaから確認する
今回は,Sparkでデータを投入(更新)し,Impalaで人間(または接続しているBIツール)が分析を行うという想定で,SparkからKudu上のテーブルのデータを更新してみます。
といっても,今回はSpark Streamingを使って……といった凝ったことはせず,単純にデーブルをDataFrameとして取り出して,加工した新しいDataFrameで更新をかけるのみの,単純にプログラムで実験してみます。
まずはImpalaでテーブルを作成します。ちょっとだけ実際の状況に近づけるため,HASH PARTITIONを使ってidカラムで16パーティションに切っていますが,今回はそんなにデータは入れません。
[imp01:21000] default> CREATE TABLE kudu_count ( id BIGINT, name STRING, count BIGINT, PRIMARY KEY(id) ) PARTITION BY HASH PARTITIONS 16 STORED AS KUDU TBLPROPERTIES ('kudu.num_tablet_replicas' = '1'); Query: CREATE TABLE kudu_count ( id BIGINT, name STRING, count BIGINT, PRIMARY KEY(id) ) PARTITION BY HASH PARTITIONS 16 STORED AS KUDU TBLPROPERTIES ('kudu.num_tablet_replicas' = '1') +-------------------------+ | summary | +-------------------------+ | Table has been created. | +-------------------------+ Fetched 1 row(s) in 4.20s
初期データとして,いくつかデータを入れておきます。
[imp01:21000] default> INSERT INTO kudu_count VALUES(1, 'TEST001', 0); Query: insert into kudu_count values(1, 'TEST001', 0) Query submitted at: 2020-06-30 08:50:59 (Coordinator: http://imp01:25000) Query progress can be monitored at: http://imp01:25000/query_plan?query_id=28450889313a1d7e:6d52757e00000000 Modified 1 row(s), 0 row error(s) in 1.09s [imp01:21000] default> INSERT INTO kudu_count VALUES(2, 'TEST002', 10); Query: insert into kudu_count values(2, 'TEST002', 10) Query submitted at: 2020-06-30 08:51:07 (Coordinator: http://imp01:25000) Query progress can be monitored at: http://imp01:25000/query_plan?query_id=3f4a2febdc16feb6:ab34278f00000000 Modified 1 row(s), 0 row error(s) in 0.11s .... [imp01:21000] default> SELECT * FROM kudu_count ORDER BY id; Query: select * from kudu_count order by id Query submitted at: 2020-06-30 08:51:43 (Coordinator: http://imp01:25000) Query progress can be monitored at: http://imp01:25000/query_plan?query_id=8245e18a42a77d90:752785fb00000000 +----+---------+-------+ | id | name | count | +----+---------+-------+ | 1 | TEST001 | 0 | | 2 | TEST002 | 10 | | 3 | TEST003 | 50 | | 4 | TEST004 | 100 | | 5 | TEST005 | 200 | +----+---------+-------+ Fetched 5 row(s) in 1.22s
準備ができました。RDBMSではSELECTする際にORDER BYを指定しないとPRIMARY KEY順に自動でソートされるものもありますが,Kudu+Impalaの場合は明示的に指定しないと順番はバラバラになります。
次に,Sparkからこのデータを更新してみます。今回はお手軽にspark-shellを使ってローカル1台構成で実行してみます。以下のコマンドでKuduのパッケージを指定したしてspark-shellを起動します。
spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.10.0
うまく起動したら,下記のコードを貼り付けて実行してみます。
import org.apache.kudu.spark.kudu._ import org.apache.kudu.client._ val df = spark.read.options(Map("kudu.master" -> "hdp01:7051", "kudu.table" -> "impala::default.kudu_count")).kudu val u = udf(((x: Long) => x + 1)) val new_df = df.withColumn("count", u(col("count"))) val kuduContext = new KuduContext("hdp01:7051", spark.sparkContext) kuduContext.updateRows(new_df, "impala::default.kudu_count")
KuduのテーブルからDataFrameに読み込み,UDFを適用してcountをインクリメントしたDataFrameを作成し,それを使ってKuduのテーブルを更新します。無事に実行できれば成功しているはずです。
最後にImpalaから確認をしてみます。
[imp01:21000] default> SELECT * FROM kudu_count ORDER BY id; Query: select * from kudu_count order by id Query submitted at: 2020-06-30 10:32:05 (Coordinator: http://imp01:25000) Query progress can be monitored at: http://imp01:25000/query_plan?query_id=2942a821e2dd057b:0d4e4a2b00000000 +----+---------+-------+ | id | name | count | +----+---------+-------+ | 1 | TEST001 | 1 | | 2 | TEST002 | 11 | | 3 | TEST003 | 51 | | 4 | TEST004 | 101 | | 5 | TEST005 | 201 | +----+---------+-------+ Fetched 5 row(s) in 4.59s
更新されました!
ローカルのVMに無理に押し込んでいるのでパフォーマンスは良くないですが,Kudu上のテーブルをSparkで操作してImpalaで確認する一連の流れを試すことができました。
所感とまとめ
Kuduはインストールガイドがしっかりとメンテナンスされており,特に詰まるところなくビルドすることができました(幾度となくストレージ不足に陥りましたが。。)。しかし,KuduもImpalaもビルドする手順は「テストのため」といった記述があるため,プロダクション利用ではHadoopディストリビューションの関連コンポーネントとしてインストールすることを推奨しているのでしょうか。Kuduは,Kuduそのもののパラメータのチューニングというよりも,テーブルの設計やパーティショニングが肝になってくるため,ディストリビューションを用いていても大規模なオンライン処理で高いパフォーマンスを発揮するためには(他のコンポーネント同様)内部の仕組みやアーキテクチャの知識をある程度身につける必要があるように感じます。
今回はローカルに環境を準備してみたよ,という程度の内容であるため,SparkからKudu上のテーブルを操作するうまみがあまり感じられませんが,ストリーミングデータの処理などを行うと,スナップショットなどが効いてきて強みが活かせるのではないかと思います。このあたりは引き続き検証が必要です。
せっかく環境を準備したので,次回はこの環境を使って,ストリーミングデータを処理するアプリケーションを作ってみたいと思います。
最後に
次世代システム研究室では、データサイエンティスト/機械学習エンジニアを募集しています。ビッグデータの解析業務など次世代システム研究室にご興味を持って頂ける方がいらっしゃいましたら、ぜひ 募集職種一覧 からご応募をお願いします。
皆さんのご応募をお待ちしています。
グループ研究開発本部の最新情報をTwitterで配信中です。ぜひフォローください。
Follow @GMO_RD