2017.01.11

GreenplumDB と Hive on tez でデータのロードとクエリを試してみました

こんにちは。次世代システム研究室のデータベース・Hadoop (MySQL/MariaDB/PerconaServer, PostgreSQL, Hive, HBase, etc..) 担当のM.K.です。

今回は以前に書いた「CentOS 7 に Greenplum DB クラスタを構築してみた」の続きで、大規模データのロードとクエリの実行を試してみました。

GreenplumDB だけでなく Hadoop クラスタとも比較してみました。

GreenplumDB にデータをロード

テスト用のデータについて

兎にも角にもまずテストデータの準備ですが、色々探した結果、今回 Kaggle にあげられていた Outbrain 社のクリック予測のための巨大なサンプルデータを使うことにしました。

サンプルデータのうち、以下の2つを利用しました。
  • page_views.csv
  • documents_meta.csv

gpfdist と gpload

では GreenplumDB へのデータロードから行います。
GreenplumDB のロードは gpfdist と gpload という機能を組み合わせて行います。

gpfdist は、ロードを並列処理させるためのバックエンドプロセスを常駐させる機能。

gpload は、gpfdist のバックエンドプロセスと、GreenplumDB の外部テーブルの仕組みを応用し、データを実際にロードする機能(参考: Pivotal ドキュメント「ETL Hosts for Data Loading」)。
外部テーブルの作成を手動で行う必要はありません。gpload が自動的に作成してくれます。

まず、gpfdist コマンドを全セグメントサーバーで実施します。

gpadmin ユーザーで gpssh コマンドを使うととても便利なので利用します。
以下はコマンドのサンプルです。
su - gpadmin
gpssh -f {セグメントサーバーのホスト一覧ファイル} -e 'gpfdist -d {gpfdist用ローカルディレクトリのフルパス} -p 8081 -l {gpfdistログのフルパス} &'
gpfdist 用のローカルディレクトリは予め作成しておきます。
※gpfdist のバックエンドプロセス用に 8081 ポートを当てていますが、別のポートでも構いません。

ロード技術の仕組みについて、少し古い記事ですが、ITpro Activeのこちらの記事「Vol2 世界最高速のデータロード性能を実現するGreenplumのScatter/Gather Streaming技術」が参考になると思います。

通常のテーブルとカラムナのテーブル

続いて、格納先のテーブルを作成します。

GreenplumDB には、通常の heap-storage テーブルと、append-optimized (追記専用)テーブルの2種類あり、追記専用テーブルのときに行指向かカラム指向かを選択できます。

今回は heap-storage テーブルと、append-optimized テーブルを両方使って gpload を試しました。
カラム定義はサンプルデータの内容から定義しています。
append-optimized テーブルはカラム指向にしました。

heap-storage テーブル作成文サンプル
CREATE TABLE page_views (
 uuid           varchar(20),
 document_id    int,
 pv_timestamp   bigint,
 platform       smallint,
 geo_location   varchar(20),
 traffic_source smallint
)
DISTRIBUTED BY (uuid)
;
append-optimized テーブル(カラム指向) サンプル
CREATE TABLE page_views_columnar (
 uuid           varchar(20),
 document_id    int,
 pv_timestamp   bigint,
 platform       smallint,
 geo_location   varchar(20),
 traffic_source smallint
)
WITH (
 APPENDONLY=TRUE,
 ORIENTATION=COLUMN
)
DISTRIBUTED BY (uuid)
;
テーブル定義の詳細は、Pivotal ドキュメント「CREATE TABLE」を参考にしてください。

GreenplumDB のテーブルは DISTRIBUTED BY 句で分散のキーになるカラムの指定が求められます。
上記のサンプルは page_views.csv 用ですが、その中でユニークなキーの uuid を指定しました。

gpload 用のコントロールファイル (yaml)

テーブルを作成したら、今度は gpload 用のコントロールファイルを作成します。
このコントロールファイルは gpload の詳細な内容を記述するものです。yaml 形式で定義します。

ロード用yamlのサンプル(page_views.csv)
VERSION: 1.0.0.1
DATABASE: gpdb
USER: gpadmin
HOST: {マスターのホスト}
PORT: {マスターホストのポート}
GPLOAD:
   INPUT:
    - SOURCE:
         LOCAL_HOSTNAME:
           - {作業ホスト}
         PORT: 8081
         FILE: 
           - {作業ホストにあるロード対象データファイルのフルパス(※gzip圧縮OK)}
    - COLUMNS:
           - uuid: varchar(20)
           - document_id: int
           - pv_timestamp: bigint
           - platform: smallint
           - geo_location: varchar(20)
           - traffic_source: smallint
    - FORMAT: text
    - DELIMITER: ','
    - ESCAPE: 'off'
    - NULL_AS: ''
    - HEADER: true
   OUTPUT:
    - TABLE: {public.page_views or public.page_views_columnar}
    - MODE: insert
   PRELOAD:
    - TRUNCATE: true
今回はサンプルデータにあわせて、引用符なし、エスケープ文字なし、NULLを示す文字なしで、先頭1行がヘッダーの CSV 用に定義しました。
テストのため毎回上書きできるように TRUNCATE オプションも有効にしています。

VERSION を適切に指定しないと、gpload してすぐにエラーが出てしまうので、利用している gpload のバージョンを調べて指定します(今回は1.0.0.1)。

格納先のテーブルごとに OUTPUT の TABLE を変えますので、heap-storage テーブルと、append-optimized テーブル(カラム指向)の2つのコントロールファイルを用意します。

また、サンプルデータは実際の運用では圧縮して使うことが想定されるため、gzip 圧縮してロードを試しました。
gpfdist と gpload は gzip 圧縮したファイルもうまく並列処理してくれます。

gpload のコントロールファイルについては、Pivotal ドキュメント「Control File Format」を参考にしてください。

サーバー構成とスペック

サーバー構成とスペックについては、前回の記事「CentOS 7 に Greenplum DB クラスタを構築してみた」の動作環境にあるとおり、GMO アプリクラウドの以下のスペックが8台です。
マスターが 2 台、セグメントサーバーが 6 台です。
  • 仮想CPU : 4
  • メモリ容量 : 16GB
  • ディスク容量 : 320GB

DBパラメータの設定(shared_buffers)

準備が整ったのでいざロードというところですが、ここで大事なポイントがあります。

ロード性能やクエリ性能に大きく影響するDBパラメータの設定です。

GreenplumDB は PostgreSQL ベースなので、shared_buffers の設定が特に大事になります。
フルチューンするためにはたくさんの設定項目があると思いますが、今回は shared_buffers の設定を変えました。

PostgreSQL のお決まりとして、shared_buffers の設定は Linux の kernel.shmmax と関連します。
また、今回の GreenplumDB 環境はセグメントサーバー 6 台に対して分散 5 の設定をしているため、セグメントサーバー 1 台につき postgre プロセス(インスタンス)が 10 個も起動します。
※primary 用が 5 個、mirror 用が 5 個

shared_buffers の設定は、インスタンス一つずつの設定になるため、サーバースペック、サーバー台数と分散数は十分考えないといけません。

今回構築した環境では、インスタンス 1 個に割り当てる shared_buffers はあまり大きくできないので 256MB にしました。
  • shared_buffers : 125MB -> 256MB
インスタンス一つずつ postgresql.conf の記述を変えるのは大変なので、gpconfig を使うと便利です。
gpconfig -c shared_buffers -v "256MB" -m "128MB"
※ -m オプションはマスターだけ別の設定値に変えてくれます。マスターはデフォルト値とほぼ同じ 128MB にしました。

設定変更後、gpstop と gpstart コマンドを利用して全台再起動しました。

ロード時間

巨大なサンプルデータの page_views.csv を gpload でロードした結果です。
観点対応
SSRFの欠陥は、webアプリケーションがリモートリソースを検証していないユーザー提供のURLから取得するような場合に起こる。
これは、アプリケーションがファイアウォール、VPN、その他ネットワークのアクセスコントロールリスト(ACL)で保護されていても、攻撃者が予期せぬ宛先に改変されたリクエストを送信させることを可能にする。
モダンなウェブアプリケーションはエンドユーザに便利な機能を提供しており、URLからの取得は一般的なシナリオとなっている。結果として、SSRFの発生件数は増加している。またクラウドサービスや複雑なアーキテクチャのためにSSRFの危険度も上昇している。
ネットワークレイヤーから

・SSRFのインパクトを軽減するため、リモートリソースアクセス機能をネットワーク毎にセグメント化する。

・重要なイントラネットのトラフィック以外の全てをブロックするため、ファイアウォールポリシーやネットワークのアクセスコントロールルールを“デフォルト拒否”に強制する。

ヒント:
・アプリケーションに基づいたファイアウォールルールのオーナーシップとライフサイクルを確立する。
・ファイアウォールで、受理またはブロックした全てのネットワークフローをログにとる。(参照:A09 セキュリティロギングとモニタリングの不備)
アプリケーションレイヤーから

・全てのクライアントからの入力値を無害化し検証する

・ホワイトリストでURLスキーマ、ポート、宛先を扱う

・クライアントにrawなレスポンスを送らない

・HTTPリダイレクションを無効化する

・DNSリバインディング攻撃や“time of check, time of use” (TOCTOU) 競合状態を避けるため、URLの一貫性に注意する

・拒否リストや正規表現を使ってSSRFを緩和しようとしない。攻撃者はペイロードリスト、ツール、そしてスキルを駆使してリストを回避する。
検討すべき追加の施策

・フロントシステム上に別のセキュリティ関連のサービスをデプロイしない(例OpenID)。これらのシステムのローカルトラフィックを制御する。

・専用の管理可能なユーザーグループを持つフロントエンドで非常に高い保護ニーズがある場合、独立したシステムでVPNのようなネットワークの暗号化を使うことを検討する。

heap-storage テーブルと、append-optimized テーブル(カラム指向)ではロード時間が約 10 倍も違う結果となりました。

参考値として shared_buffers 変更前の heap-storage テーブル のときと比べると 1.67 倍くらい違いました。

Hadoop / Hive にデータをロード

今度は Hadoop / Hive にデータをロードします。Hadoop クラスタには Hortonworks の HDP2.4 系を利用しました。
HDP ということもあり、Hive はデフォルトの Tez エンジンを使っています(Hive on Tez)。

TEXTFILE (CSV) で読み込んで ORC テーブルに格納

実際の運用を考えると、Hadoop(HDFS)にテキストファイル形式でアップロードするだけではなく、最終的にカラム指向のテーブルに格納することが多いので、Hive の ORC テーブルにデータを入れるまでをロードの作業としました。

テキストファイル形式の Hive テーブル作成文サンプル
CREATE TABLE page_views_text (
 uuid           string,
 document_id    int,
 pv_timestamp   bigint,
 platform       tinyint,
 geo_location   string,
 traffic_source tinyint
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
STORED AS TEXTFILE
TBLPROPERTIES ('skip.header.line.count'='1')
;
ORC 形式の Hive テーブル作成文サンプル
CREATE TABLE page_views_orc (
 uuid           string,
 document_id    int,
 pv_timestamp   bigint,
 platform       tinyint,
 geo_location   string,
 traffic_source tinyint
)
STORED AS ORC
;
最初に、テキストファイル形式 の Hive テーブルへロードします。
Hive の LOAD DATA 文を使いましたが、gzip 圧縮されたサンプルデータをそのまま HDFS にアップロードし、Hive テーブルのメタデータを被せていることとほぼ同じです。
LOAD DATA LOCAL INPATH '{作業ホストにあるロード対象データファイルのフルパス(※gzip圧縮OK)}'
 OVERWRITE INTO TABLE page_views_text
;
次に、ORC 形式のテーブルに INSERT します。
INSERT OVERWRITE TABLE page_views_orc SELECT * FROM page_views_text
;

Hadoop クラスタ構成とスペック

Hadoop クラスタ構成は、Hadoop 各種サービスのマスタープロセス用に(マスターノード) 3 台、各種サービスのデータ格納や分散処理を行う スレーブノードに 5 台を割り当てた構成です。

スレーブノードのサーバースペックは、GMO アプリクラウドの以下のスペックです。

スレーブノードのスペック
  • 仮想CPU : 10
  • メモリ容量 : 60GB
  • ディスク容量 : 800GB
YARN リソースは、default キュー一つでリソースの 100% 利用してテストしました。
参考
yarn.scheduler.capacity.root.capacity=100
yarn.scheduler.capacity.root.queues=default
yarn.scheduler.capacity.root.default.maximum-capacity=100
yarn.scheduler.capacity.root.default.user-limit-factor=1
yarn.scheduler.capacity.root.default.capacity=100

ロードと ORC テーブル格納までの時間

巨大なサンプルデータ の page_views.csv に対して、LOAD DATA と ORC テーブルへ INSERT する時間は以下となりました。
142.02s
146.34s
145.75s
142.44s
158.40s

巨大な gzip 圧縮されたデータファイルをロードして、クエリを投げられる状態にするまでの時間は、だいぶ GreenplumDB に分がある結果となりました。

ただ、Hive は gzip 圧縮された一つのデータファイルに対して一つの mapper しか起動せず、分散処理できません。そのために INSERT がとても遅くなっています。

Hive の実運用では、gzip 圧縮する場合はもっと細かく分割して取り込んだり、分散処理可能な bzip2 圧縮などを検討します。Hive で効率的なロードを行えば 10 倍以上速くなることは十分考えられますが、手間という点では複雑だったり汎用的でなくなるので、やはり GreenplumDB の gpfdist & gpload の方式の方が容易で速い結果となりました。

GreenplumDB と Hive on Tez でのクエリ時間比較

ここまでロードを試してきましたが、次にクエリ性能を比較してみました。

分散処理性能の真価を問うため、テーブル結合と GROUP BY を使ったクエリで比較しています。
実運用でよくあるように複数のカラムへの条件を指定しました。
日付条件は、特に深い意味はないですが、2016年10月の page_views.csv データに絞るようにしています。
※その他のクエリ条件の値の詳細については、Kaggle の Outbrain 社のクリック予測のためのサンプルデータページを見てください。

もう一つのサンプルデータ documents_meta.csv について GreenplumDB は 追記専用でカラム指向テーブルに、Hive は ORC テーブルに格納しました(詳細は割愛)。

GreenplumDB で実行したクエリ

select dm.publisher_id, count(*)
from page_views_columnar pv
inner join documents_meta_columnar dm on pv.document_id = dm.document_id
where
 pv.platform = 2 and
 pv.geo_location like 'US>NY%' and
 pv.traffic_source = 3 and
 pv.pv_timestamp >= extract(epoch from ('2016-10-01 00:00:00' - to_timestamp(1465876800))) and
 pv.pv_timestamp < extract(epoch from ('2016-11-01 00:00:00' - to_timestamp(1465876800))) and
 dm.publish_time >= '2016-06-01 00:00:00' and
 dm.publish_time <= '2016-07-01 00:00:00'
group by dm.publisher_id
order by dm.publisher_id
; 

Hive on Tez で実行したクエリ

select dm.publisher_id, count(*)
from page_views_orc pv
inner join documents_meta_orc dm on pv.document_id = dm.document_id
where
 pv.platform = 2 and
 pv.geo_location like 'US>NY%' and
 pv.traffic_source = 3 and
 pv.pv_timestamp >= unix_timestamp('2016-10-01 00:00:00') - 1465876800 and
 pv.pv_timestamp < unix_timestamp('2016-11-01 00:00:00') - 1465876800 and
 dm.publish_time >= '2016-06-01 00:00:00' and
 dm.publish_time <= '2016-07-01 00:00:00'
group by dm.publisher_id
order by dm.publisher_id
;

Hive パラメータの設定

GreenplumDB のパラメータは前述したとおりですが、Hive もクエリ実行にあたって幾つかチューニングしました。

Hive クエリは以下の set 文でパラメータを変更して実行しました。
set hive.vectorized.execution.enabled = true; --default値
set hive.vectorized.execution.reduce.enabled = true;
set hive.exec.parallel = true;
set hive.exec.parallel.thread.number = 16;
set tez.grouping.min-size = 268435456;
set tez.grouping.max-size = 268435456;

クエリ時間結果

クエリ時間の比較結果は以下となりました。
169.53s
212.32s
178.09s
200.05s
153.45s

今回の環境下では、約 2.4 倍の差が出ています。

結果のまとめ

ロードとクエリ時間の結果をまとめたものがこちらです。
96.98s
134.56s
110.16s
96.61s
128.70s

結論

gzip 圧縮された一つの巨大なデータファイルの扱いに Hadoop / Hive が分が悪いとは言え、今回試した内容での比較結果から、思ったよりも GreenplumDB が良い結果となりました。

大規模データに対する SQL ライクな集計がメインの用途であれば、GreenplumDB はサーバーリソースの対費用効果からいっても Hadoop クラスタより良いケースがありそうです。

逆に Hadoop クラスタは、Spark (特に MLlib) や HBase、Kafka、Solr など SQL 以外の強力なサービスとの統合的な利用・連携に強みがあり、YARN でリソース全体をうまくスケジューリングしてくれるので、用途に応じた使い分けがポイントになりそうです。

次は HAWQ

GreenplumDB を Hadoop に移植した HAWQ というものがあります。

HAWQ は元々 Piovotal 独自の Hadoop クラスタを使っていましたが、Hortonworks の HDP で動くようになり使う敷居が下がりました。

GreenplumDB の結果を受けて、GreenplumDB の良さはのままに Spark などとスムーズに連携できるか次の機会に試してみたいと思います。

最後に

次世代システム研究室では、アプリケーション開発や設計を行うアーキテクトを募集しています。アプリケーション開発者の方、次世代システム研究室にご興味を持って頂ける方がいらっしゃいましたら、ぜひ 募集職種一覧 からご応募をお願いします。

皆さんのご応募をお待ちしています。

  • Twitter
  • Facebook
  • はてなブックマークに追加

グループ研究開発本部の最新情報をTwitterで配信中です。ぜひフォローください。

 
  • AI研究開発室
  • 大阪研究開発グループ

関連記事