2017.01.11

GreenplumDB と Hive on tez でデータのロードとクエリを試してみました


こんにちは。次世代システム研究室のデータベース・Hadoop (MySQL/MariaDB/PerconaServer, PostgreSQL, Hive, HBase, etc..) 担当のM.K.です。

今回は以前に書いた「CentOS 7 に Greenplum DB クラスタを構築してみた」の続きで、大規模データのロードとクエリの実行を試してみました。

GreenplumDB だけでなく Hadoop クラスタとも比較してみました。

GreenplumDB にデータをロード


テスト用のデータについて


兎にも角にもまずテストデータの準備ですが、色々探した結果、今回 Kaggle にあげられていた Outbrain 社のクリック予測のための巨大なサンプルデータを使うことにしました。

サンプルデータのうち、以下の2つを利用しました。
  • page_views.csv
  • documents_meta.csv

gpfdist と gpload


では GreenplumDB へのデータロードから行います。
GreenplumDB のロードは gpfdist と gpload という機能を組み合わせて行います。

gpfdist は、ロードを並列処理させるためのバックエンドプロセスを常駐させる機能。

gpload は、gpfdist のバックエンドプロセスと、GreenplumDB の外部テーブルの仕組みを応用し、データを実際にロードする機能(参考: Pivotal ドキュメント「ETL Hosts for Data Loading」)。
外部テーブルの作成を手動で行う必要はありません。gpload が自動的に作成してくれます。

まず、gpfdist コマンドを全セグメントサーバーで実施します。

gpadmin ユーザーで gpssh コマンドを使うととても便利なので利用します。
以下はコマンドのサンプルです。
su - gpadmin
gpssh -f {セグメントサーバーのホスト一覧ファイル} -e 'gpfdist -d {gpfdist用ローカルディレクトリのフルパス} -p 8081 -l {gpfdistログのフルパス} &'
gpfdist 用のローカルディレクトリは予め作成しておきます。
※gpfdist のバックエンドプロセス用に 8081 ポートを当てていますが、別のポートでも構いません。

ロード技術の仕組みについて、少し古い記事ですが、ITpro Activeのこちらの記事「Vol2 世界最高速のデータロード性能を実現するGreenplumのScatter/Gather Streaming技術」が参考になると思います。

通常のテーブルとカラムナのテーブル


続いて、格納先のテーブルを作成します。

GreenplumDB には、通常の heap-storage テーブルと、append-optimized (追記専用)テーブルの2種類あり、追記専用テーブルのときに行指向かカラム指向かを選択できます。

今回は heap-storage テーブルと、append-optimized テーブルを両方使って gpload を試しました。
カラム定義はサンプルデータの内容から定義しています。
append-optimized テーブルはカラム指向にしました。

heap-storage テーブル作成文サンプル
CREATE TABLE page_views (
 uuid           varchar(20),
 document_id    int,
 pv_timestamp   bigint,
 platform       smallint,
 geo_location   varchar(20),
 traffic_source smallint
)
DISTRIBUTED BY (uuid)
;

append-optimized テーブル(カラム指向) サンプル
CREATE TABLE page_views_columnar (
 uuid           varchar(20),
 document_id    int,
 pv_timestamp   bigint,
 platform       smallint,
 geo_location   varchar(20),
 traffic_source smallint
)
WITH (
 APPENDONLY=TRUE,
 ORIENTATION=COLUMN
)
DISTRIBUTED BY (uuid)
;

テーブル定義の詳細は、Pivotal ドキュメント「CREATE TABLE」を参考にしてください。

GreenplumDB のテーブルは DISTRIBUTED BY 句で分散のキーになるカラムの指定が求められます。
上記のサンプルは page_views.csv 用ですが、その中でユニークなキーの uuid を指定しました。

gpload 用のコントロールファイル (yaml)


テーブルを作成したら、今度は gpload 用のコントロールファイルを作成します。
このコントロールファイルは gpload の詳細な内容を記述するものです。yaml 形式で定義します。

ロード用yamlのサンプル(page_views.csv)
VERSION: 1.0.0.1
DATABASE: gpdb
USER: gpadmin
HOST: {マスターのホスト}
PORT: {マスターホストのポート}
GPLOAD:
   INPUT:
    - SOURCE:
         LOCAL_HOSTNAME:
           - {作業ホスト}
         PORT: 8081
         FILE: 
           - {作業ホストにあるロード対象データファイルのフルパス(※gzip圧縮OK)}
    - COLUMNS:
           - uuid: varchar(20)
           - document_id: int
           - pv_timestamp: bigint
           - platform: smallint
           - geo_location: varchar(20)
           - traffic_source: smallint
    - FORMAT: text
    - DELIMITER: ','
    - ESCAPE: 'off'
    - NULL_AS: ''
    - HEADER: true
   OUTPUT:
    - TABLE: {public.page_views or public.page_views_columnar}
    - MODE: insert
   PRELOAD:
    - TRUNCATE: true

今回はサンプルデータにあわせて、引用符なし、エスケープ文字なし、NULLを示す文字なしで、先頭1行がヘッダーの CSV 用に定義しました。
テストのため毎回上書きできるように TRUNCATE オプションも有効にしています。

VERSION を適切に指定しないと、gpload してすぐにエラーが出てしまうので、利用している gpload のバージョンを調べて指定します(今回は1.0.0.1)。

格納先のテーブルごとに OUTPUT の TABLE を変えますので、heap-storage テーブルと、append-optimized テーブル(カラム指向)の2つのコントロールファイルを用意します。

また、サンプルデータは実際の運用では圧縮して使うことが想定されるため、gzip 圧縮してロードを試しました。
gpfdist と gpload は gzip 圧縮したファイルもうまく並列処理してくれます。

gpload のコントロールファイルについては、Pivotal ドキュメント「Control File Format」を参考にしてください。

サーバー構成とスペック


サーバー構成とスペックについては、前回の記事「CentOS 7 に Greenplum DB クラスタを構築してみた」の動作環境にあるとおり、GMO アプリクラウドの以下のスペックが8台です。
マスターが 2 台、セグメントサーバーが 6 台です。
  • 仮想CPU : 4
  • メモリ容量 : 16GB
  • ディスク容量 : 320GB

DBパラメータの設定(shared_buffers)


準備が整ったのでいざロードというところですが、ここで大事なポイントがあります。

ロード性能やクエリ性能に大きく影響するDBパラメータの設定です。

GreenplumDB は PostgreSQL ベースなので、shared_buffers の設定が特に大事になります。
フルチューンするためにはたくさんの設定項目があると思いますが、今回は shared_buffers の設定を変えました。

PostgreSQL のお決まりとして、shared_buffers の設定は Linux の kernel.shmmax と関連します。
また、今回の GreenplumDB 環境はセグメントサーバー 6 台に対して分散 5 の設定をしているため、セグメントサーバー 1 台につき postgre プロセス(インスタンス)が 10 個も起動します。
※primary 用が 5 個、mirror 用が 5 個

shared_buffers の設定は、インスタンス一つずつの設定になるため、サーバースペック、サーバー台数と分散数は十分考えないといけません。

今回構築した環境では、インスタンス 1 個に割り当てる shared_buffers はあまり大きくできないので 256MB にしました。
  • shared_buffers : 125MB -> 256MB

インスタンス一つずつ postgresql.conf の記述を変えるのは大変なので、gpconfig を使うと便利です。
gpconfig -c shared_buffers -v "256MB" -m "128MB"
※ -m オプションはマスターだけ別の設定値に変えてくれます。マスターはデフォルト値とほぼ同じ 128MB にしました。

設定変更後、gpstop と gpstart コマンドを利用して全台再起動しました。

ロード時間


巨大なサンプルデータの page_views.csv を gpload でロードした結果です。
処理時間heap-storageappend-optimized
(カラム指向)
(参考)shared_buffers
変更前のheap-storage
1回目11313秒1151秒18370秒
2回目10644秒1191秒
平均10979秒1171秒

heap-storage テーブルと、append-optimized テーブル(カラム指向)ではロード時間が約 10 倍も違う結果となりました。

参考値として shared_buffers 変更前の heap-storage テーブル のときと比べると 1.67 倍くらい違いました。


Hadoop / Hive にデータをロード


今度は Hadoop / Hive にデータをロードします。Hadoop クラスタには Hortonworks の HDP2.4 系を利用しました。
HDP ということもあり、Hive はデフォルトの Tez エンジンを使っています(Hive on Tez)。

TEXTFILE (CSV) で読み込んで ORC テーブルに格納


実際の運用を考えると、Hadoop(HDFS)にテキストファイル形式でアップロードするだけではなく、最終的にカラム指向のテーブルに格納することが多いので、Hive の ORC テーブルにデータを入れるまでをロードの作業としました。

テキストファイル形式の Hive テーブル作成文サンプル
CREATE TABLE page_views_text (
 uuid           string,
 document_id    int,
 pv_timestamp   bigint,
 platform       tinyint,
 geo_location   string,
 traffic_source tinyint
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
STORED AS TEXTFILE
TBLPROPERTIES ('skip.header.line.count'='1')
;

ORC 形式の Hive テーブル作成文サンプル
CREATE TABLE page_views_orc (
 uuid           string,
 document_id    int,
 pv_timestamp   bigint,
 platform       tinyint,
 geo_location   string,
 traffic_source tinyint
)
STORED AS ORC
;

最初に、テキストファイル形式 の Hive テーブルへロードします。
Hive の LOAD DATA 文を使いましたが、gzip 圧縮されたサンプルデータをそのまま HDFS にアップロードし、Hive テーブルのメタデータを被せていることとほぼ同じです。
LOAD DATA LOCAL INPATH '{作業ホストにあるロード対象データファイルのフルパス(※gzip圧縮OK)}'
 OVERWRITE INTO TABLE page_views_text
;

次に、ORC 形式のテーブルに INSERT します。
INSERT OVERWRITE TABLE page_views_orc SELECT * FROM page_views_text
;


Hadoop クラスタ構成とスペック


Hadoop クラスタ構成は、Hadoop 各種サービスのマスタープロセス用に(マスターノード) 3 台、各種サービスのデータ格納や分散処理を行う スレーブノードに 5 台を割り当てた構成です。

スレーブノードのサーバースペックは、GMO アプリクラウドの以下のスペックです。

スレーブノードのスペック
  • 仮想CPU : 10
  • メモリ容量 : 60GB
  • ディスク容量 : 800GB

YARN リソースは、default キュー一つでリソースの 100% 利用してテストしました。
参考
yarn.scheduler.capacity.root.capacity=100
yarn.scheduler.capacity.root.queues=default
yarn.scheduler.capacity.root.default.maximum-capacity=100
yarn.scheduler.capacity.root.default.user-limit-factor=1
yarn.scheduler.capacity.root.default.capacity=100

ロードと ORC テーブル格納までの時間


巨大なサンプルデータ の page_views.csv に対して、LOAD DATA と ORC テーブルへ INSERT する時間は以下となりました。
処理時間LOAD DATAINSERT合計
1回目1865秒10726秒12591秒
2回目1868秒10936秒12804秒
平均1867秒10831秒12698秒

巨大な gzip 圧縮されたデータファイルをロードして、クエリを投げられる状態にするまでの時間は、だいぶ GreenplumDB に分がある結果となりました。

ただ、Hive は gzip 圧縮された一つのデータファイルに対して一つの mapper しか起動せず、分散処理できません。そのために INSERT がとても遅くなっています。

Hive の実運用では、gzip 圧縮する場合はもっと細かく分割して取り込んだり、分散処理可能な bzip2 圧縮などを検討します。Hive で効率的なロードを行えば 10 倍以上速くなることは十分考えられますが、手間という点では複雑だったり汎用的でなくなるので、やはり GreenplumDB の gpfdist & gpload の方式の方が容易で速い結果となりました。

GreenplumDB と Hive on Tez でのクエリ時間比較


ここまでロードを試してきましたが、次にクエリ性能を比較してみました。

分散処理性能の真価を問うため、テーブル結合と GROUP BY を使ったクエリで比較しています。
実運用でよくあるように複数のカラムへの条件を指定しました。
日付条件は、特に深い意味はないですが、2016年10月の page_views.csv データに絞るようにしています。
※その他のクエリ条件の値の詳細については、Kaggle の Outbrain 社のクリック予測のためのサンプルデータページを見てください。

もう一つのサンプルデータ documents_meta.csv について GreenplumDB は 追記専用でカラム指向テーブルに、Hive は ORC テーブルに格納しました(詳細は割愛)。

GreenplumDB で実行したクエリ


select dm.publisher_id, count(*)
from page_views_columnar pv
inner join documents_meta_columnar dm on pv.document_id = dm.document_id
where
 pv.platform = 2 and
 pv.geo_location like 'US>NY%' and
 pv.traffic_source = 3 and
 pv.pv_timestamp >= extract(epoch from ('2016-10-01 00:00:00' - to_timestamp(1465876800))) and
 pv.pv_timestamp < extract(epoch from ('2016-11-01 00:00:00' - to_timestamp(1465876800))) and
 dm.publish_time >= '2016-06-01 00:00:00' and
 dm.publish_time <= '2016-07-01 00:00:00'
group by dm.publisher_id
order by dm.publisher_id
; 

Hive on Tez で実行したクエリ


select dm.publisher_id, count(*)
from page_views_orc pv
inner join documents_meta_orc dm on pv.document_id = dm.document_id
where
 pv.platform = 2 and
 pv.geo_location like 'US>NY%' and
 pv.traffic_source = 3 and
 pv.pv_timestamp >= unix_timestamp('2016-10-01 00:00:00') - 1465876800 and
 pv.pv_timestamp < unix_timestamp('2016-11-01 00:00:00') - 1465876800 and
 dm.publish_time >= '2016-06-01 00:00:00' and
 dm.publish_time <= '2016-07-01 00:00:00'
group by dm.publisher_id
order by dm.publisher_id
;

Hive パラメータの設定


GreenplumDB のパラメータは前述したとおりですが、Hive もクエリ実行にあたって幾つかチューニングしました。

Hive クエリは以下の set 文でパラメータを変更して実行しました。
set hive.vectorized.execution.enabled = true; --default値
set hive.vectorized.execution.reduce.enabled = true;
set hive.exec.parallel = true;
set hive.exec.parallel.thread.number = 16;
set tez.grouping.min-size = 268435456;
set tez.grouping.max-size = 268435456;

クエリ時間結果


クエリ時間の比較結果は以下となりました。
処理時間GreenplumDB
(カラム指向)
Hive on Tez
1回目97秒254秒
2回目95秒212秒
平均96秒233秒

今回の環境下では、約 2.4 倍の差が出ています。

結果のまとめ


ロードとクエリ時間の結果をまとめたものがこちらです。
処理時間GreenplumDBHadoop / Hive備考
ロード1171秒12698秒Hadoop / Hive
(LOAD DATA 1867秒)
(INSERT 10831秒)
クエリ96秒233秒


結論


gzip 圧縮された一つの巨大なデータファイルの扱いに Hadoop / Hive が分が悪いとは言え、今回試した内容での比較結果から、思ったよりも GreenplumDB が良い結果となりました。

大規模データに対する SQL ライクな集計がメインの用途であれば、GreenplumDB はサーバーリソースの対費用効果からいっても Hadoop クラスタより良いケースがありそうです。

逆に Hadoop クラスタは、Spark (特に MLlib) や HBase、Kafka、Solr など SQL 以外の強力なサービスとの統合的な利用・連携に強みがあり、YARN でリソース全体をうまくスケジューリングしてくれるので、用途に応じた使い分けがポイントになりそうです。

次は HAWQ


GreenplumDB を Hadoop に移植した HAWQ というものがあります。

HAWQ は元々 Piovotal 独自の Hadoop クラスタを使っていましたが、Hortonworks の HDP で動くようになり使う敷居が下がりました。

GreenplumDB の結果を受けて、GreenplumDB の良さはのままに Spark などとスムーズに連携できるか次の機会に試してみたいと思います。

最後に


次世代システム研究室では、アプリケーション開発や設計を行うアーキテクトを募集しています。アプリケーション開発者の方、次世代システム研究室にご興味を持って頂ける方がいらっしゃいましたら、ぜひ 募集職種一覧 からご応募をお願いします。

皆さんのご応募をお待ちしています。