2020.01.10

Hive3のトランザクションを有効にしたテーブルにSpark2を連携してみる~Hive Warehouse Connector検証

こんにちは。次世代システム研究室のデータベース と Hadoop を担当している M.K. です。 前回のブログでHive3の新しくなったACIDトランザクションを試しましたが、今回はそのトランザクションを有効にしたテーブルと、Spark2の連携について検証してみました。実際の運用では多くの場合、HiveとSpark両方を使うことが想定されるためです。

目次

  1. 環境
    1. 環境とデータ
    2. Hadoopクラスタのパラメータ設定
  2. 検証準備
    1. Hive3とSpark2の連携はどうやるのか?
    2. 検証概要
    3. Hive Warehouse Connectorを使う準備
  3. Hive Warehouse Connectorの検証
    1. 先ずPySparkシェルで試す
    2. spark-submitでクラスタモードを試す
    3. チューニング実験
  4. SparkSessionだけの検証
    1. 先ずPySparkシェルで試す
    2. spark-submitでクラスタモードを試す
  5. まとめ
    1. 検証結果
    2. 課題

1. 環境

1. 環境とデータ

前回のブログと全く同じ環境(GMOアプリクラウド&HDP3.0.1)で引き続き検証しました。使用するデータも同じKaggleのアイオワ州のお酒販売のデータです。参考までに、このデータの元のCSVのサイズは約3.2GBで1259万件ほどのレコードがあり、それをHive3のトランザクション管理テーブルに格納(ORCフォーマットに変換)したものを検証に使用しています。

2. Hadoopクラスタのパラメータ設定

肝心のYARNとHiveのパラメータ及びYARNキューの設定は前回のブログと同じ設定です。Sparkについてのパラメータ設定については次の第2章で後述します。

 

2. 検証準備

1. Hive3とSpark2の連携はどうやるのか?

Spark2系になる前までは、SparkContextを使いその上でSQLContextかHiveContextを利用して、Hiveのテーブルに読み書きするのがメインでした。それがSpark2系になると、新たにSparkSessionができてこちらの利用が推奨されるようになりました。

ただ、Hive3のトランザクションが有効になっているテーブルに対しては、そんなに単純な話ではなさそうな感じもします。

実はこうしたHive3のテーブルとの連携用にHive Warehouse Connectorというのが開発されて利用できるようになっていました。そこで今回の検証では、Hive Warehouse Connectorを利用した連携と、SparkSessionだけの連携の二つを行ってみました。

2. 検証概要

お酒販売データのHiveテーブルをSparkのDataframeとして読み込み、日付を1年後の日付となるように加工し別のテーブルにSparkから書き込むということを試しました。これをSparkのクラスタモードで分散処理で行い、Hiveテーブルへの読み書きをHive Warehouse Connectorを使って行うか、Hive Warehouse Connectorを使わないでSpark Sessionだけで行うか、それぞれ試しました。

3. Hive Warehouse Connectorを使う準備

HDP3.0.1環境ではHive Warehouse ConnectorのJARファイルとPythonファイル(PySpark用)が最初から用意されています。検証では、データサイエンティストが実際に使うことを想定して、SparkをPythonで扱うPySparkを用いてSparkをクラスタモードで動かしました(spark-submit)。

/usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-187.jar
/usr/hdp/current/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-187.zip

Spark2の設定

Hive Warehouse Connectorを使うには、追加でSpark2のパラメータ設定も必要になります。このあたりはこちらの記事にまとめられているので、一度読んでみてください。

実際に追加したパラメータ設定内容は以下です。ほとんどは他のパラメータの値のコピーになります。

  • Spark2 > CONFIGS > Custom spark2-defaults
    • spark.datasource.hive.warehouse.load.staging.dir
       → /tmp
    • spark.datasource.hive.warehouse.metastoreUri
       → thrift://hdp3-m1.testhdp.com:9083
    • spark.hadoop.hive.llap.daemon.service.hosts
       → @llap0
    • spark.hadoop.hive.zookeeper.quorum
       → hdp3-m1.testhdp.com:2181,hdp3-m2.testhdp.com:2181,hdp3-m3.testhdp.com:2181
    • spark.sql.hive.hiveserver2.jdbc.url
       → jdbc:hive2://hdp3-m1.testhdp.com:2181,hdp3-m2.testhdp.com:2181,hdp3-m3.testhdp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-interactive

上記パラメータを追加したら、Ambariが表示する再起動な必要なコンポーネントを再起動して、パラメータを反映します。

 

HDFS上のディレクトリのパーミッション変更

今回の検証はsparkユーザーで行いました。結論から言うと、hiveユーザー以外ではHive3から通常のテーブルの格納先となった/warehouse/tablespace/managed/hive配下を見ることができなくて、パーミッションエラーで失敗します。自分ももちろんやらかしました。。パーミッションエラーはこういう検証では必ず通る通過儀礼みたいなものですね。

本来はきちんとユーザーとパーミッション管理を設計して行うべきものですが、今回はひとまずchmodして対応しました。sparkユーザーはhiveユーザーと同じhadoopグループに所属しているので、770のパーミッションを付与します。

dfs -chmod 770 /warehouse/tablespace/managed/hive;
dfs -ls /warehouse/tablespace/managed;
+----------------------------------------------------+
|                     DFS Output                     |
+----------------------------------------------------+
| Found 1 items                                      |
| drwxrwx---+  - hive hadoop          0 2018-12-29 21:57 /warehouse/tablespace/managed/hive |
+----------------------------------------------------+

これで準備が整いました。

 

3. Hive Warehouse Connectorの検証

1. 先ずPySparkシェルで試す

この手のものは、最初からクラスタモードで分散処理を行ったりするとハマるので(はい、ハマりました)、まずPySparkシェルで試してみます。PySparkシェルはインタラクティブにPySparkのプログラムを試すことができ、ローカルモードでやってみました。

PySparkシェルをコマンドラインから実施するときに、Hive Warehouse Connectorのファイルを指定します。

pyspark \
 --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-187.jar \
 --py-files /usr/hdp/current/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-187.zip

PySparkに接続したら、pyspark_llap.sql.sessionをimportして、Hive Warehouse Connectorが使えるようにします。以下のようにしてHiveとセッションを張ります。

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.1.3.0.1.0-187
      /_/

Using Python version 2.7.5 (default, Oct 30 2018 23:45:53)
SparkSession available as 'spark'.
>>> from pyspark_llap.sql.session import HiveWarehouseSession
>>> hive = HiveWarehouseSession.session(spark).build()

Hiveとのセッションが確立されたら、お酒販売データのテーブルがあるデータベースに接続して試しに件数を取得してみます。

>>> hive.setDatabase("hivedb")
>>> df = hive.executeQuery("select count(*) from iowa_liquor_sales")
>>> df.show()
19/12/18 18:35:49 WARN TaskSetManager: Stage 0 contains a task of very large size (424 KB). The maximum recommended task size is 100 KB.
+--------+
|     _c0|
+--------+
|12590429|
+--------+

ちゃんとHiveテーブルの件数がPySparkで取れました!あとはこのやり方をクラスタモードで分散処理できるようにすれば良いということになります。

2. spark-submitでクラスタモードを試す

クラスタモードで分散処理を行うには、spark-submitコマンドを使用します。PySparkを使うときもspark-submitコマンドを使用して、–py-filesオプションでPython(PySpark)ファイルを指定するだけです。

検証したいことはHiveテーブルの読み込みだけでなく書き込みもなので、書き込み用のテーブルも作っておきます。

CREATE TABLE pyspark_iowa_liquor_sales (
  invoice_item_number  varchar(50)
 ,sale_date  date
 ,store_number  integer
 ,store_name  varchar(100)
 ,address  varchar(100)
 ,city  varchar(100)
 ,zip_code  integer
 ,store_location  string
 ,county_number  integer
 ,county  varchar(50)
 ,category  varchar(50)
 ,category_name  varchar(50)
 ,vendor_number  integer
 ,vendor_name  varchar(50)
 ,item_number  integer
 ,item_description  varchar(100)
 ,pack  integer
 ,bottle_volume  integer
 ,state_bottle_cost  varchar(50)
 ,state_bottle_retail  varchar(50)
 ,bottles_sold  integer
 ,sale  varchar(50)
 ,volume_sold_liters  integer
 ,volume_sold_gallons  integer
)
STORED AS ORC
TBLPROPERTIES (
   'transactional'='true',
   'transactional_properties'='default',
   'NO_AUTO_COMPACTION'='false'
);

読み込み元のテーブルと全く同じ定義で、名前だけ変えたものです。Hiveに接続してテーブル作成しておきます。

次に肝心のPySparkプログラムファイルを作ります。PySparkシェルで試した内容と全く同じように書いたら、最初全然うまくいかずハマりました・・。

色々調べた後で、よくよく見てみたら、SparkSessionをHive Warehouse Connectorに”spark”という名前で引き渡しているのに、SparkSessionについて何も書いてないことに気が付きました・・。

PySparkシェルでは最初からSparkSessionが”spark”という名前で引き渡せるようになっているみたいです。

結局、以下のように書けばHiveとのセッションを確立できました。こちらの記事なども参考にしてみてください。

from pyspark.sql import SparkSession
from pyspark_llap.sql.session import HiveWarehouseSession
spark = (
    SparkSession
    .builder
    .master('yarn')
    .enableHiveSupport()
    .getOrCreate()
)
hive = HiveWarehouseSession.session(spark).build()

繋がりさえすれば、あとはデータをDataframeに読み込んで分散処理して、Hiveテーブルに書き込むところを書き足します。

検証で使ったPythonプログラムは以下のようになりました。

PySparkプログラム(test_main1.py)

from pyspark.sql import SparkSession
from pyspark_llap.sql.session import HiveWarehouseSession
from pyspark.sql.functions import *
spark = (
    SparkSession
    .builder
    .master('yarn')
    .enableHiveSupport()
    .getOrCreate()
)
hive = HiveWarehouseSession.session(spark).build()
hive.setDatabase("hivedb")

query = """
SELECT * FROM iowa_liquor_sales
"""
df = hive.executeQuery(query)
df = df.select(
    df.invoice_item_number
    , date_add(df.sale_date, 365).alias("sale_date")
    , df.store_number
    , df.store_name
    , df.address
    , df.city
    , df.zip_code
    , df.store_location
    , df.county_number
    , df.county
    , df.category
    , df.category_name
    , df.vendor_number
    , df.vendor_name
    , df.item_number
    , df.item_description
    , df.pack
    , df.bottle_volume
    , df.state_bottle_cost
    , df.state_bottle_retail
    , df.bottles_sold
    , df.sale
    , df.volume_sold_liters
    , df.volume_sold_gallons
)
df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table", "pyspark_iowa_liquor_sales").save()

これを呼び出すspark-submitのシェルスクリプトは以下です。

spark-submitシェルスクリプト(pyspark1.sh)

#!/usr/bin/sh
SPARK_HOME=/usr/hdp/current/spark2-client \

spark-submit \
--master yarn \
--deploy-mode cluster \
--queue default \
--num-executors 1 \
--executor-memory 4G \
--executor-cores 1 \
--driver-memory 2G \
--jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-187.jar \
--py-files /usr/hdp/current/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-187.zip \
test_main1.py 2>&1 |tee pyspark1.log

このpyspark1.sh(test_main1.py)をsparkユーザーで実行したところ、格納先のpyspark_iowa_liquor_salesテーブルに日付を加工した状態で無事に書き込めてました!

クラスタモードで分散処理と言っても最初に試したのはexecutor数(num-executors)一つだけです。sparkはこのexecutor数を増やすことで同時に分散処理できる能力が上がります。ただ、各executorに割り当てるメモリサイズ(executor-memory)も大事で、処理性能に影響するだけでなく小さいとメモリ不足で処理が失敗したりします。

Hadoopクラスタのシステムリソース(つまりYARNキュー)との兼ね合いで、num-executorsとexecutor-memoryをどう割り当てるかがチューニングポイントなので、続けてこれらのパラメータを変えたり、PySparkプログラムでDataframeに読み込んだ後のパーティション数(repartition)を変えてみたり、Hive Warehouse Connectorの読み書きで色々とチューニング実験してみました。なお、pyspark1.shの実行にかかった時間は2回平均で4分37秒ほどです。

3. チューニング実験

① spark-submitのnum-executorsを4に

pyspark1.shの内容からspark-submitのnum-executorsだけ1から4に変更して実行したところ、実行時間は2回平均で4分3秒ほどになりました。それほどでもないですが速くなりました。

 

② spark-submitのnum-executorsを4にした上で、Dataframeのrepartition(4)の実施とcache()を利用

上記①の内容に加えて、Dataframeに読み込んだ後のパーティション数をexecutor数にあわせて4に、またcache()による一時キャッシュを利用するようにtest_main1.pyのコードを少し書き換えて試してみました。

このためにPySparkを変更した箇所は以下です。

df = hive.executeQuery(query)
df = df.repartition(4) #追加
df = df.select(
    df.invoice_item_number
    , date_add(df.sale_date, 365).alias("sale_date")
・・・(中略)・・・
)
df = df.cache() #追加
df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table", "pyspark_iowa_liquor_sales").save()

これで実行したところ、2回の実行時間の平均で6分27秒ほどでした。cache()を使ってもHive Warehouse Connectorを使った書き込みで特に速くなることもなく、返って処理が増えて遅くなった印象です。

 

③ spark-submitのnum-executorsを4にした上で、Dataframeのrepartition(100)の実施とcache()を利用

上記②のrepartition(4)を(100)に変えてやってみたところ、実行時間が2回平均で5分38秒ほどでした。パーティション数を増やすとSparkが処理するタスクが増えますが、このケースでは上記②より速くなりました。ただ、上記①よりだいぶ遅くなっているので余計な処理に時間がかかってしまっている感じです。

 

④ 読み込み元のテーブルをパーティショニング、spark-submitのnum-executorsを4にした上で、Dataframeのrepartition(71)を実施

読み込み元のHiveテーブルについて、日付カラムを元に年と月のカラムを作り、年と月の2階層のパーティションを持つテーブルに変えてみました。その結果、71個のパーティションを持つHiveテーブルになったので、上記②の内容からまずcache()を外し、読み込み元のテーブルをパーティショニングされたテーブルに変更し、Dataframeのrepartition(71)を実施するように変えて試しました。

会話していると時々混同してしまうときがありますが、Hiveテーブルのパーティショニングと、SparkのDataframeのパーティション数は全く違うものなので勘違いに注意です。

以下のHiveQLでパーティショニングされたHiveテーブルを作成しました。

CREATE TABLE iowa_liquor_sales_partitioned (
  invoice_item_number  varchar(50)
 ,sale_date  date
 ,store_number  integer
 ,store_name  varchar(100)
 ,address  varchar(100)
 ,city  varchar(100)
 ,zip_code  integer
 ,store_location  string
 ,county_number  integer
 ,county  varchar(50)
 ,category  varchar(50)
 ,category_name  varchar(50)
 ,vendor_number  integer
 ,vendor_name  varchar(50)
 ,item_number  integer
 ,item_description  varchar(100)
 ,pack  integer
 ,bottle_volume  integer
 ,state_bottle_cost  varchar(50)
 ,state_bottle_retail  varchar(50)
 ,bottles_sold  integer
 ,sale  varchar(50)
 ,volume_sold_liters  integer
 ,volume_sold_gallons  integer
)
PARTITIONED BY (sale_year  integer,  sale_month  integer)
STORED AS ORC
TBLPROPERTIES (
   'transactional'='true',
   'transactional_properties'='default',
   'NO_AUTO_COMPACTION'='false'
);
INSERT OVERWRITE TABLE iowa_liquor_sales_partitioned
PARTITION (sale_year, sale_month)
SELECT
  invoice_item_number
 ,sale_date
 ,store_number
 ,store_name
 ,address
 ,city
 ,zip_code
 ,store_location
 ,county_number
 ,county
 ,category
 ,category_name
 ,vendor_number
 ,vendor_name
 ,item_number
 ,item_description
 ,pack
 ,bottle_volume
 ,state_bottle_cost
 ,state_bottle_retail
 ,bottles_sold
 ,sale
 ,volume_sold_liters
 ,volume_sold_gallons
 ,YEAR(sale_date) AS sale_year
 ,MONTH(sale_date) AS sale_month
FROM iowa_liquor_sales
;

実行した結果、2回の実行時間の平均は3分37秒ほどになりました。やっぱり読み込み元のテーブルをパーティショニングするのは大事というのが改めてわかる結果ですね。

 

⑤ パーティショニングしたテーブルを読み込んで、spark-submitのnum-executorsを8、executor-memoryを2Gに変更、Dataframeのrepartition(71)を実施

上記④の内容から、spark-submitのnum-executorを4から8に増やし、逆にexecutor-memoryを4GBから2GBに減らして実行してみました。結果、2回の実行時間の平均は2分47秒ほどでだいぶ速くなりました。

ただ、2Gに減らしたものの、Resource Manager WebUIを見るとどうも2Gのメモリに収まりきらず結局4G使っていて、今回のYARNのキュー設定ではdefaultキューのリソースが足りず、本来なら同時に8個立ち上がってほしいコンテナが5個しか立ち上がってないように見えました。

 

⑥ パーティショニングしたテーブルを読み込んで、spark-submitのnum-executorsを8、executor-memoryを1Gに変更、Dataframeのrepartition(71)を実施

今度は上記⑤の内容から、spark-submitのexecutor-memoryを2GBから1GBに減らしてやってみたところ、実行時間は2回平均で2分5秒ほどになりさらに速くなりました。

 

Hive Warahouse Connectorというよりはspark-submitとrepartitionのチューニングをやった感じですが、Sparkクラスタモードの実践的な内容の分散処理でHive Warehouse Connectorを使っても問題なくHive3のトランザクションを有効にしたテーブルに読み書きできることはわかりました。

 

4. SparkSessionだけの検証

次にHive Warehouse Connectorを使わないでSpark2とHive3が連携できるか検証してみました。

1. 先ずPySparkシェルで試す

とにかく先ずはPySparkシェルでということで色々やってみたのですが、全然接続できなくて最もハマりました・・。調べた挙句、以下のようにHIVE_CONF_DIRの変数とspark.driver.extraClassPathをセットすることで、Hiveへのセッションを確立でき、指定のデータベースに接続することができました(Stack Overflowのこのページを参考)。

export HIVE_CONF_DIR=/usr/hdp/current/hive-client/conf
pyspark \
 --conf "spark.driver.extraClassPath=/usr/hdp/current/hive-client/conf" \
 --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-187.jar \
 --py-files /usr/hdp/current/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-187.zip
>>> from pyspark.sql import SparkSession
from pyspark_llap.sql.session import HiveWarehouseSession
from pyspark.sql.functions import *
spark = (
    SparkSession
    .builder
    .master('yarn')
    .config('hive.metastore.uris', 'thrift://hdp3-m1.testhdp.com:9083')
    .enableHiveSupport()
    .getOrCreate()
)
>>> spark.catalog.listDatabases()
[Database(name=u'default', description=u'Default Hive database', locationUri=u'hdfs://hdp3-m1.testhdp.com:8020/warehouse/tablespace/managed/hive'), Database(name=u'hivedb', description=u'', locationUri=u'hdfs://hdp3-m1.testhdp.com:8020/warehouse/tablespace/managed/hive/hivedb.db'), Database(name=u'information_schema', description=u'', locationUri=u'hdfs://hdp3-m1.testhdp.com:8020/warehouse/tablespace/managed/hive/information_schema.db'), Database(name=u'sys', description=u'', locationUri=u'hdfs://hdp3-m1.testhdp.com:8020/warehouse/tablespace/managed/hive/sys.db')]
>>> spark.catalog.setCurrentDatabase("hivedb")

SparkSessionなので当たり前ですがHive Warehouse Connectorとは書き方が色々違います。途中データベースのリストを取得してみましたが、期待通りに取れています。データベースに接続できたところで続いてクエリを投げてみます。

>>> spark.catalog.setCurrentDatabase("hivedb")
>>> query = """
SELECT * FROM iowa_liquor_sales_partitioned
"""
>>> df = spark.sql(query)
Traceback (most recent call last):
  File "", line 1, in 
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 716, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table iowa_liquor_sales_partitioned. org.apache.hadoop.security.AccessControlException: Permission denied: user=spark, access=EXECUTE, inode="/warehouse/tablespace/managed/hive/hivedb.db":hive:hadoop:drwxrwx--- 
・・・・・

sparkユーザーでPySparkシェルを呼び出してやったのですが、パーミッションエラーが出てしまいました。/warehouse/tablespace/managed/hiveのパーミッションを変えていてもSparkSessionをこのようなやり方で使うと怒られるようです。ここでは安直ですがsparkユーザーではなくhiveユーザーで行うことにしました。当然の如く、 hiveユーザーで上記を実施したらエラーがなくなりました。

そして次はSparkのクラスタモードを試します。

2. spark-submitでクラスタモードを試す

PySparkシェルで試した内容を元にPySparkプログラムとspark-submitシェルスクリプトを書いたのですが、HIVE_CONF_DIRの変数のセットがクラスタモードの分散処理で上手くいかずに手こずりました。以下のようにspark-submitにspark.yarn.appMasterEnv.HIVE_CONF_DIRを設定することで解決しました。

spark-submitシェルスクリプト(test_main99.py)

#!/usr/bin/sh
SPARK_HOME=/usr/hdp/current/spark2-client
HIVE_CONF_DIR=/usr/hdp/current/hive-client/conf

spark-submit \
--conf "spark.yarn.appMasterEnv.HIVE_CONF_DIR=/usr/hdp/current/hive-client/conf" \
--conf "spark.driver.extraClassPath=/usr/hdp/current/hive-client/conf" \
--master yarn \
--deploy-mode cluster \
--queue default \
--num-executors 8 \
--executor-memory 1G \
--executor-cores 1 \
--driver-memory 2G \
--jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-187.jar \
--py-files /usr/hdp/current/hive_warehouse_connector/pyspark_hwc-1.0.0.3.0.1.0-187.zip \
test_main99.py 2>&1 |tee pyspark14.log

SparkSessionを使ったデータの書き込みは、新たにHiveの外部テーブルを作って試してみることにしました。

CREATE EXTERNAL TABLE pyspark_iowa_liquor_sales_ex_no_trn (
  invoice_item_number  varchar(50)
 ,sale_date  date
 ,store_number  integer
 ,store_name  varchar(100)
 ,address  varchar(100)
 ,city  varchar(100)
 ,zip_code  integer
 ,store_location  string
 ,county_number  integer
 ,county  varchar(50)
 ,category  varchar(50)
 ,category_name  varchar(50)
 ,vendor_number  integer
 ,vendor_name  varchar(50)
 ,item_number  integer
 ,item_description  varchar(100)
 ,pack  integer
 ,bottle_volume  integer
 ,state_bottle_cost  varchar(50)
 ,state_bottle_retail  varchar(50)
 ,bottles_sold  integer
 ,sale  varchar(50)
 ,volume_sold_liters  integer
 ,volume_sold_gallons  integer
)
STORED AS ORC
TBLPROPERTIES (
   'transactional'='false'
);

この外部テーブルにSparkSessionからデータを書き込むようにしたPySparkプログラムが以下です。

PySparkプログラム(pyspark99.sh)

from pyspark.sql import SparkSession
from pyspark_llap.sql.session import HiveWarehouseSession
from pyspark.sql.functions import *
spark = (
    SparkSession
    .builder
    .master('yarn')
    .config('spark.driver.extraClassPath', '/usr/hdp/current/hive-client/conf')
    .config('hive.metastore.uris', 'thrift://hdp3-m1.testhdp.com:9083')
    .config('spark.sql.warehouse.dir', '/warehouse/tablespace/external/hive')
    .enableHiveSupport()
    .getOrCreate()
)
spark.catalog.setCurrentDatabase("hivedb")

query = """
SELECT * FROM iowa_liquor_sales_partitioned
"""
df = spark.sql(query)
df = df.repartition(71)
df = df.select(
    df.invoice_item_number
    , date_add(df.sale_date, 365).alias("sale_date")
    , df.store_number
    , df.store_name
    , df.address
    , df.city
    , df.zip_code
    , df.store_location
    , df.county_number
    , df.county
    , df.category
    , df.category_name
    , df.vendor_number
    , df.vendor_name
    , df.item_number
    , df.item_description
    , df.pack
    , df.bottle_volume
    , df.state_bottle_cost
    , df.state_bottle_retail
    , df.bottles_sold
    , df.sale
    , df.volume_sold_liters
    , df.volume_sold_gallons
)
df.write.format("orc").mode("overwrite").saveAsTable("pyspark_iowa_liquor_sales_ex_no_trn")

pyspark99.sh(test_main99.py)を実行したところ、エラーなく処理が完了しました!

と思ったんですが、、明らかに処理時間が速く確認してみたところ、データがうまく保存できていないばかりか、勝手にテーブル定義が作り替わって(テーブルプロパティが書き替わって)しまうという事象が発生しました。むしろ何故エラーが出てくれないのかという感じです。作り替わってしまったテーブル定義は以下です。恐ろしいのは外部テーブルで作ったのに、Hiveのトランザクション管理テーブルになっているところです。

+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE `pyspark_iowa_liquor_sales_ex_no_trn`( |
|   `invoice_item_number` varchar(50),               |
|   `sale_date` date,                                |
|   `store_number` int,                              |
|   `store_name` varchar(100),                       |
|   `address` varchar(100),                          |
|   `city` varchar(100),                             |
|   `zip_code` int,                                  |
|   `store_location` string,                         |
|   `county_number` int,                             |
|   `county` varchar(50),                            |
|   `category` varchar(50),                          |
|   `category_name` varchar(50),                     |
|   `vendor_number` int,                             |
|   `vendor_name` varchar(50),                       |
|   `item_number` int,                               |
|   `item_description` varchar(100),                 |
|   `pack` int,                                      |
|   `bottle_volume` int,                             |
|   `state_bottle_cost` varchar(50),                 |
|   `state_bottle_retail` varchar(50),               |
|   `bottles_sold` int,                              |
|   `sale` varchar(50),                              |
|   `volume_sold_liters` int,                        |
|   `volume_sold_gallons` int)                       |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.ql.io.orc.OrcSerde'      |
| WITH SERDEPROPERTIES (                             |
|   'path'='hdfs://hdp3-m1.testhdp.com:8020/warehouse/tablespace/managed/hive/hivedb.db/pyspark_iowa_liquor_sales_ex_no_trn')  |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'  |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' |
| LOCATION                                           |
|   'hdfs://hdp3-m1.testhdp.com:8020/warehouse/tablespace/managed/hive/hivedb.db/pyspark_iowa_liquor_sales_ex_no_trn' |
| TBLPROPERTIES (                                    |
|   'bucketing_version'='2',                         |
|   'spark.sql.create.version'='2.3.1.3.0.1.0-187',  |
|   'spark.sql.sources.provider'='orc',              |
|   'spark.sql.sources.schema.numParts'='1',         |
|   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"invoice_item_number","type":"string","nullable":true,"metadata":{"HIVE_TYPE_STRING":"varchar(50)"}},{"name":"sale_date","type":"date","nullable":true,"metadata":{}},{"name":"store_number","type":"integer","nullable":true,"metadata":{}},{"name":"store_name","type":"string","nullable":true,"metadata":{"HIVE_TYPE_STRING":"varchar(100)"}},{"name":"address","type":"string","nullable":true,"metadata":{"HIVE_TYPE_STRING":"varchar(100)"}},{"name":"city","type":"string","nullable":true,"metadata":{"HIVE_TYPE_STRING":"varchar(100)"}},{"name":"zip_code","type":"integer","nullable":true,"metadata":{}},{"name":"store_location","type":"string","nullable":true,"metadata":{}},{"name":"county_number","type":"integer","nullable":true,"metadata":{}},{"name":"county","type":"string","nullable":true,"metadata":{"HIVE_TYPE_STRING":"varchar(50)"}},{"name":"category","type":"string","nullable":true,"metadata":{"HIVE_TYPE_STRING":"varchar(50)"}},{"name":"category_name","type":"string","nullable":true,"metadata":{"HIVE_TYPE_STRING":"varchar(50)"}},{"name":"vendor_number","type":"integer","nullable":true,"metadata":{}},{"name":"vendor_name","type":"string","nullable":true,"metadata":{"HIVE_TYPE_STRING":"varchar(50)"}},{"name":"item_number","type":"integer","nullable":true,"metadata":{}},{"name":"item_description","type":"string","nullable":true,"metadata":{"HIVE_TYPE_STRING":"varchar(100)"}},{"name":"pack","type":"integer","nullable":true,"metadata":{}},{"name":"bottle_volume","type":"integer","nullable":true,"metadata":{}},{"name":"state_bottle_cost","type":"string","nullable":true,"metadata":{"HIVE_TYPE_STRING":"varchar(50)"}},{"name":"state_bottle_retail","type":"string","nullable":true,"metadata":{"HIVE_TYPE_STRING":"varchar(50)"}},{"name":"bottles_sold","type":"integer","nullable":true,"metadata":{}},{"name":"sale","type":"string","nullable":true,"metadata":{"HIVE_TYPE_STRING":"varchar(50)"}},{"name":"volume_sold_liters","type":"integer","nullable":true,"metadata":{}},{"name":"volume_sold_gallons","type":"integer","nullable":true,"metadata":{}}]}',  |
|   'transactional'='true',                          |
|   'transient_lastDdlTime'='1578482063')            |
+----------------------------------------------------+

明らかに今回のやり方は接続できたものの良くないと言えます。もっと適切なやり方がもしかしたらある(もしくは今後対応する?)かもしれませんが、現時点ではSparkSessionのみでHive3のトランザクション管理テーブルに読み書きするのは難しそうです。

 

5. まとめ

1. 検証結果

今回行った検証からは、Hive3のトランザクションを有効にしたテーブルをSpark2で読み書きするときはHive Warehouse Connectorを利用すべし、ということになりそうです。Hive Warehouse Connectorを使ってもSparkクラスタモードでの分散処理およびHiveテーブルへの読み書きは問題なくできました。

それにしてもHive3からは今までのHiveとは全く別物のようになり、トランザクションやCompaction、Hive Warehouse Connectorを使ったSpark連携など、新しい部分を理解してシステムを見直さないといけないため、それまでのHadoopクラスタからは簡単には移行できなそうです。もちろん、Hive3は魅力的な改善もありますけど。

2. 課題

残る課題としては、

  • Hive3のトランザクションを一切使わない(全て外部テーブルの)場合にSpark2とどのように連携するか?
  • Cloudera社とHortonworks社が合併し、新しいHadoopクラスタパッケージのCDPではHiveとSparkの連携はどうなっていくか?

このあたりはまたいつか試してみたいと思います。

 

最後に




次世代システム研究室では、データサイエンティスト/機械学習エンジニアを募集しています。ビッグデータの解析業務など次世代システム研究室にご興味を持って頂ける方がいらっしゃいましたら、ぜひ 募集職種一覧 からご応募をお願いします。


 


皆さんのご応募をお待ちしています。

  • Twitter
  • Facebook
  • はてなブックマークに追加

グループ研究開発本部の最新情報をTwitterで配信中です。ぜひフォローください。

 
  • AI研究開発室
  • 大阪研究開発グループ

関連記事