2017.01.11
GreenplumDB と Hive on tez でデータのロードとクエリを試してみました
こんにちは。次世代システム研究室のデータベース・Hadoop (MySQL/MariaDB/PerconaServer, PostgreSQL, Hive, HBase, etc..) 担当のM.K.です。
今回は以前に書いた「CentOS 7 に Greenplum DB クラスタを構築してみた」の続きで、大規模データのロードとクエリの実行を試してみました。
GreenplumDB だけでなく Hadoop クラスタとも比較してみました。
GreenplumDB にデータをロード
テスト用のデータについて
兎にも角にもまずテストデータの準備ですが、色々探した結果、今回 Kaggle にあげられていた Outbrain 社のクリック予測のための巨大なサンプルデータを使うことにしました。
サンプルデータのうち、以下の2つを利用しました。
- page_views.csv
- documents_meta.csv
gpfdist と gpload
では GreenplumDB へのデータロードから行います。
GreenplumDB のロードは gpfdist と gpload という機能を組み合わせて行います。
gpfdist は、ロードを並列処理させるためのバックエンドプロセスを常駐させる機能。
gpload は、gpfdist のバックエンドプロセスと、GreenplumDB の外部テーブルの仕組みを応用し、データを実際にロードする機能(参考: Pivotal ドキュメント「ETL Hosts for Data Loading」)。
外部テーブルの作成を手動で行う必要はありません。gpload が自動的に作成してくれます。
まず、gpfdist コマンドを全セグメントサーバーで実施します。
gpadmin ユーザーで gpssh コマンドを使うととても便利なので利用します。
以下はコマンドのサンプルです。
su - gpadmin gpssh -f {セグメントサーバーのホスト一覧ファイル} -e 'gpfdist -d {gpfdist用ローカルディレクトリのフルパス} -p 8081 -l {gpfdistログのフルパス} &'
gpfdist 用のローカルディレクトリは予め作成しておきます。
※gpfdist のバックエンドプロセス用に 8081 ポートを当てていますが、別のポートでも構いません。
ロード技術の仕組みについて、少し古い記事ですが、ITpro Activeのこちらの記事「Vol2 世界最高速のデータロード性能を実現するGreenplumのScatter/Gather Streaming技術」が参考になると思います。
通常のテーブルとカラムナのテーブル
続いて、格納先のテーブルを作成します。
GreenplumDB には、通常の heap-storage テーブルと、append-optimized (追記専用)テーブルの2種類あり、追記専用テーブルのときに行指向かカラム指向かを選択できます。
今回は heap-storage テーブルと、append-optimized テーブルを両方使って gpload を試しました。
カラム定義はサンプルデータの内容から定義しています。
append-optimized テーブルはカラム指向にしました。
heap-storage テーブル作成文サンプル
CREATE TABLE page_views ( uuid varchar(20), document_id int, pv_timestamp bigint, platform smallint, geo_location varchar(20), traffic_source smallint ) DISTRIBUTED BY (uuid) ;
append-optimized テーブル(カラム指向) サンプル
CREATE TABLE page_views_columnar ( uuid varchar(20), document_id int, pv_timestamp bigint, platform smallint, geo_location varchar(20), traffic_source smallint ) WITH ( APPENDONLY=TRUE, ORIENTATION=COLUMN ) DISTRIBUTED BY (uuid) ;
テーブル定義の詳細は、Pivotal ドキュメント「CREATE TABLE」を参考にしてください。
GreenplumDB のテーブルは DISTRIBUTED BY 句で分散のキーになるカラムの指定が求められます。
上記のサンプルは page_views.csv 用ですが、その中でユニークなキーの uuid を指定しました。
gpload 用のコントロールファイル (yaml)
テーブルを作成したら、今度は gpload 用のコントロールファイルを作成します。
このコントロールファイルは gpload の詳細な内容を記述するものです。yaml 形式で定義します。
ロード用yamlのサンプル(page_views.csv)
VERSION: 1.0.0.1 DATABASE: gpdb USER: gpadmin HOST: {マスターのホスト} PORT: {マスターホストのポート} GPLOAD: INPUT: - SOURCE: LOCAL_HOSTNAME: - {作業ホスト} PORT: 8081 FILE: - {作業ホストにあるロード対象データファイルのフルパス(※gzip圧縮OK)} - COLUMNS: - uuid: varchar(20) - document_id: int - pv_timestamp: bigint - platform: smallint - geo_location: varchar(20) - traffic_source: smallint - FORMAT: text - DELIMITER: ',' - ESCAPE: 'off' - NULL_AS: '' - HEADER: true OUTPUT: - TABLE: {public.page_views or public.page_views_columnar} - MODE: insert PRELOAD: - TRUNCATE: true
今回はサンプルデータにあわせて、引用符なし、エスケープ文字なし、NULLを示す文字なしで、先頭1行がヘッダーの CSV 用に定義しました。
テストのため毎回上書きできるように TRUNCATE オプションも有効にしています。
VERSION を適切に指定しないと、gpload してすぐにエラーが出てしまうので、利用している gpload のバージョンを調べて指定します(今回は1.0.0.1)。
格納先のテーブルごとに OUTPUT の TABLE を変えますので、heap-storage テーブルと、append-optimized テーブル(カラム指向)の2つのコントロールファイルを用意します。
また、サンプルデータは実際の運用では圧縮して使うことが想定されるため、gzip 圧縮してロードを試しました。
gpfdist と gpload は gzip 圧縮したファイルもうまく並列処理してくれます。
gpload のコントロールファイルについては、Pivotal ドキュメント「Control File Format」を参考にしてください。
サーバー構成とスペック
サーバー構成とスペックについては、前回の記事「CentOS 7 に Greenplum DB クラスタを構築してみた」の動作環境にあるとおり、GMO アプリクラウドの以下のスペックが8台です。
マスターが 2 台、セグメントサーバーが 6 台です。
- 仮想CPU : 4
- メモリ容量 : 16GB
- ディスク容量 : 320GB
DBパラメータの設定(shared_buffers)
準備が整ったのでいざロードというところですが、ここで大事なポイントがあります。
ロード性能やクエリ性能に大きく影響するDBパラメータの設定です。
GreenplumDB は PostgreSQL ベースなので、shared_buffers の設定が特に大事になります。
フルチューンするためにはたくさんの設定項目があると思いますが、今回は shared_buffers の設定を変えました。
PostgreSQL のお決まりとして、shared_buffers の設定は Linux の kernel.shmmax と関連します。
また、今回の GreenplumDB 環境はセグメントサーバー 6 台に対して分散 5 の設定をしているため、セグメントサーバー 1 台につき postgre プロセス(インスタンス)が 10 個も起動します。
※primary 用が 5 個、mirror 用が 5 個
shared_buffers の設定は、インスタンス一つずつの設定になるため、サーバースペック、サーバー台数と分散数は十分考えないといけません。
今回構築した環境では、インスタンス 1 個に割り当てる shared_buffers はあまり大きくできないので 256MB にしました。
- shared_buffers : 125MB -> 256MB
インスタンス一つずつ postgresql.conf の記述を変えるのは大変なので、gpconfig を使うと便利です。
gpconfig -c shared_buffers -v "256MB" -m "128MB"
※ -m オプションはマスターだけ別の設定値に変えてくれます。マスターはデフォルト値とほぼ同じ 128MB にしました。
設定変更後、gpstop と gpstart コマンドを利用して全台再起動しました。
ロード時間
巨大なサンプルデータの page_views.csv を gpload でロードした結果です。
[table id=23 /]
heap-storage テーブルと、append-optimized テーブル(カラム指向)ではロード時間が約 10 倍も違う結果となりました。
参考値として shared_buffers 変更前の heap-storage テーブル のときと比べると 1.67 倍くらい違いました。
Hadoop / Hive にデータをロード
今度は Hadoop / Hive にデータをロードします。Hadoop クラスタには Hortonworks の HDP2.4 系を利用しました。
HDP ということもあり、Hive はデフォルトの Tez エンジンを使っています(Hive on Tez)。
TEXTFILE (CSV) で読み込んで ORC テーブルに格納
実際の運用を考えると、Hadoop(HDFS)にテキストファイル形式でアップロードするだけではなく、最終的にカラム指向のテーブルに格納することが多いので、Hive の ORC テーブルにデータを入れるまでをロードの作業としました。
テキストファイル形式の Hive テーブル作成文サンプル
CREATE TABLE page_views_text ( uuid string, document_id int, pv_timestamp bigint, platform tinyint, geo_location string, traffic_source tinyint ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE TBLPROPERTIES ('skip.header.line.count'='1') ;
ORC 形式の Hive テーブル作成文サンプル
CREATE TABLE page_views_orc ( uuid string, document_id int, pv_timestamp bigint, platform tinyint, geo_location string, traffic_source tinyint ) STORED AS ORC ;
最初に、テキストファイル形式 の Hive テーブルへロードします。
Hive の LOAD DATA 文を使いましたが、gzip 圧縮されたサンプルデータをそのまま HDFS にアップロードし、Hive テーブルのメタデータを被せていることとほぼ同じです。
LOAD DATA LOCAL INPATH '{作業ホストにあるロード対象データファイルのフルパス(※gzip圧縮OK)}' OVERWRITE INTO TABLE page_views_text ;
次に、ORC 形式のテーブルに INSERT します。
INSERT OVERWRITE TABLE page_views_orc SELECT * FROM page_views_text ;
Hadoop クラスタ構成とスペック
Hadoop クラスタ構成は、Hadoop 各種サービスのマスタープロセス用に(マスターノード) 3 台、各種サービスのデータ格納や分散処理を行う スレーブノードに 5 台を割り当てた構成です。
スレーブノードのサーバースペックは、GMO アプリクラウドの以下のスペックです。
スレーブノードのスペック
- 仮想CPU : 10
- メモリ容量 : 60GB
- ディスク容量 : 800GB
YARN リソースは、default キュー一つでリソースの 100% 利用してテストしました。
参考
yarn.scheduler.capacity.root.capacity=100 yarn.scheduler.capacity.root.queues=default yarn.scheduler.capacity.root.default.maximum-capacity=100 yarn.scheduler.capacity.root.default.user-limit-factor=1 yarn.scheduler.capacity.root.default.capacity=100
ロードと ORC テーブル格納までの時間
巨大なサンプルデータ の page_views.csv に対して、LOAD DATA と ORC テーブルへ INSERT する時間は以下となりました。
[table id=24 /]
巨大な gzip 圧縮されたデータファイルをロードして、クエリを投げられる状態にするまでの時間は、だいぶ GreenplumDB に分がある結果となりました。
ただ、Hive は gzip 圧縮された一つのデータファイルに対して一つの mapper しか起動せず、分散処理できません。そのために INSERT がとても遅くなっています。
Hive の実運用では、gzip 圧縮する場合はもっと細かく分割して取り込んだり、分散処理可能な bzip2 圧縮などを検討します。Hive で効率的なロードを行えば 10 倍以上速くなることは十分考えられますが、手間という点では複雑だったり汎用的でなくなるので、やはり GreenplumDB の gpfdist & gpload の方式の方が容易で速い結果となりました。
GreenplumDB と Hive on Tez でのクエリ時間比較
ここまでロードを試してきましたが、次にクエリ性能を比較してみました。
分散処理性能の真価を問うため、テーブル結合と GROUP BY を使ったクエリで比較しています。
実運用でよくあるように複数のカラムへの条件を指定しました。
日付条件は、特に深い意味はないですが、2016年10月の page_views.csv データに絞るようにしています。
※その他のクエリ条件の値の詳細については、Kaggle の Outbrain 社のクリック予測のためのサンプルデータページを見てください。
もう一つのサンプルデータ documents_meta.csv について GreenplumDB は 追記専用でカラム指向テーブルに、Hive は ORC テーブルに格納しました(詳細は割愛)。
GreenplumDB で実行したクエリ
select dm.publisher_id, count(*) from page_views_columnar pv inner join documents_meta_columnar dm on pv.document_id = dm.document_id where pv.platform = 2 and pv.geo_location like 'US>NY%' and pv.traffic_source = 3 and pv.pv_timestamp >= extract(epoch from ('2016-10-01 00:00:00' - to_timestamp(1465876800))) and pv.pv_timestamp < extract(epoch from ('2016-11-01 00:00:00' - to_timestamp(1465876800))) and dm.publish_time >= '2016-06-01 00:00:00' and dm.publish_time <= '2016-07-01 00:00:00' group by dm.publisher_id order by dm.publisher_id ;
Hive on Tez で実行したクエリ
select dm.publisher_id, count(*) from page_views_orc pv inner join documents_meta_orc dm on pv.document_id = dm.document_id where pv.platform = 2 and pv.geo_location like 'US>NY%' and pv.traffic_source = 3 and pv.pv_timestamp >= unix_timestamp('2016-10-01 00:00:00') - 1465876800 and pv.pv_timestamp < unix_timestamp('2016-11-01 00:00:00') - 1465876800 and dm.publish_time >= '2016-06-01 00:00:00' and dm.publish_time <= '2016-07-01 00:00:00' group by dm.publisher_id order by dm.publisher_id ;
Hive パラメータの設定
GreenplumDB のパラメータは前述したとおりですが、Hive もクエリ実行にあたって幾つかチューニングしました。
Hive クエリは以下の set 文でパラメータを変更して実行しました。
set hive.vectorized.execution.enabled = true; --default値 set hive.vectorized.execution.reduce.enabled = true; set hive.exec.parallel = true; set hive.exec.parallel.thread.number = 16; set tez.grouping.min-size = 268435456; set tez.grouping.max-size = 268435456;
クエリ時間結果
クエリ時間の比較結果は以下となりました。
[table id=25 /]
今回の環境下では、約 2.4 倍の差が出ています。
結果のまとめ
ロードとクエリ時間の結果をまとめたものがこちらです。
[table id=26 /]
結論
gzip 圧縮された一つの巨大なデータファイルの扱いに Hadoop / Hive が分が悪いとは言え、今回試した内容での比較結果から、思ったよりも GreenplumDB が良い結果となりました。
大規模データに対する SQL ライクな集計がメインの用途であれば、GreenplumDB はサーバーリソースの対費用効果からいっても Hadoop クラスタより良いケースがありそうです。
逆に Hadoop クラスタは、Spark (特に MLlib) や HBase、Kafka、Solr など SQL 以外の強力なサービスとの統合的な利用・連携に強みがあり、YARN でリソース全体をうまくスケジューリングしてくれるので、用途に応じた使い分けがポイントになりそうです。
次は HAWQ
GreenplumDB を Hadoop に移植した HAWQ というものがあります。
HAWQ は元々 Piovotal 独自の Hadoop クラスタを使っていましたが、Hortonworks の HDP で動くようになり使う敷居が下がりました。
GreenplumDB の結果を受けて、GreenplumDB の良さはのままに Spark などとスムーズに連携できるか次の機会に試してみたいと思います。
最後に
次世代システム研究室では、アプリケーション開発や設計を行うアーキテクトを募集しています。アプリケーション開発者の方、次世代システム研究室にご興味を持って頂ける方がいらっしゃいましたら、ぜひ 募集職種一覧 からご応募をお願いします。
皆さんのご応募をお待ちしています。
グループ研究開発本部の最新情報をTwitterで配信中です。ぜひフォローください。
Follow @GMO_RD