2020.10.07

Hadoop上でSpark+Delta Lakeを検証してみた~続き

Pocket

こんにちは。次世代システム研究室のデータベース と Hadoop を担当している M.K. です。

今回は前回行ったApache Spark3とDelta Lakeの検証の続きを行いました。テーブル定義情報のHiveメタストアとの連携や、データロード(ストリーミング)、簡単なトランザクションなどを検証しました。

目次

  1. テーブル定義情報の取り扱い
    1. Delta Lakeのメタ情報と制限
    2. Hiveメタストアとの連携
      1. Hiveと連携可能なSparkを再インストール
      2. Hiveメタストアで使うRDBの構築
      3. mysql-connectorの準備
      4. hive-site.xmlの作成
      5. Hiveメタストアと連携するSparkセッションの張り方
  2. データロード
    1. CSVデータの準備
    2. ストリーミングRead/Write
  3. 簡単なトランザクション検証
    1. 同じ行を同時にUPDATEした場合
  4. 大量データへのクエリ
    1. パーティションキーでの絞り込み
    2. テーブル結合したクエリ速度
  5. まとめ
    1. Delta Lake+Sparkは使えそうか
    2. 本番環境でデータ解析基盤として使うために

 

1. テーブル定義情報の取り扱い

1. Delta Lakeのメタ情報と制限

Delta Lake自身でもメタ情報を管理していますが、SparkSQLからSQLを使おうとするとそのままではちょっと困ったことがありました。

SparkSQLからCREATE TABLE文を投げてテーブルを作ってもそのSparkセッション内では問題ないんですが、セッションを閉じて開き直したり別のSparkセッションからSparkSQLを使うと実はそのテーブルは見つかりません。

 

以下のようにDelta Lakeのデータがあるパスをロードしてデータフレームに読み込むのは当然問題ないですが、SparkSQLがすぐに扱えないのは困るところ。

 

df = spark.read.format("delta").load("/apps/spark/warehouse/events_p")

 

データフレームに読み込んだ後で、次のように一時ビューを作ればテーブルとして扱えるので一応対応はできますが、Sparkセッションを開くたびに毎回これを行っていては全然使い勝手が良くありません。

 

df.createOrReplaceTempView("events_p")

 

そのため、セッションを跨いでも何もしなくてもすぐにSparkSQLからテーブルが使える方法はないか探してみました。

 

2. Hiveメタストアとの連携

以前のブログで検証したように、Hiveが3系になるとそれまでのHiveとは全く違うものになり、SparkとHiveテーブルの連携にはHive Warehouse Connectorが必須になります。Spark3系でも同じです。SparkはテーブルをHiveと連携して管理することで色々便利だったと思いますが、Hive3の進化に対してSparkが今後どうなるかわからないところがあるのと、Hiveのサービス(HiveServer2やHive Metastoreなど)まで管理することは大変すぎて今回のHadoopクラスタ構成ではしたくなかったので、最初他の手段を探しました。

 

まず考えたのが、組み込みDBのApache Derbyをうまく使えないかです。SparkはデフォルトでこのDerbyを使うんですが、同じSparkセッション内だけでしかデータが共有されません。設定を変えればすぐにできるかなと思って色々調べましたが(例:参考サイト)、永続化するにはHiveサポートを有効にした方が良さそうで、そうするとDerbyを使うよりもっと適切なRDBを使った方が良いので、別の方法を考えることにしました。

 

SparkはHiveメタストアのテーブル定義情報を直接読み込めるはずなので、Hiveのサービスを立ち上げなくても何とかなるのではと思って検証してみたんですが、そしたらやっぱりできました!

どうやったかというと、Hiveと連携するためのJarなどがそろったSparkをインストールし直して、Hiveメタストアで使うRDBを別途構築しそこをSparkから参照できるようにしました。

 

2.1. Hiveと連携可能なSparkを再インストール

先に自前のHadoopクラスタ (HDFS + YARN) 環境を立ててからSparkをインストールしたため、spark-3.0.0-bin-without-hadoop.tgzのバイナリを使いましたが、Hive連携で必要な幾つかのJarなどは含まれていなくてHive連携ができなかったので、Hadoop連携を前提にしてビルドされたspark-3.0.0-bin-hadoop3.2.tgzのバイナリを使ってインストールし直すことにしました。

 

Sparkの再インストールのやり方は前回のブログの3-6に書いたみたいに、バイナリを展開してシンボリックリンクを張り直すのがメインです。今回のHadoopクラスタ環境の全ノードで再インストールしました。

 

2.2. Hiveメタストアで使うRDBの構築

RDBは以前の検証で構築したまま残っていたMySQL系のPerconaXtraDBCluster (PXC) があったのでそれを利用しました。まず任意のデータベースとユーザーを作成します。

 

mysql>
CREATE DATABASE deltalake_meta;
CREATE USER deltalake@'xx.xx.xx.%' IDENTIFIED BY 'xxxxx';
GRANT ALL ON deltalake_meta.* TO deltalake@'xx.xx.xx.%';
FLUSH PRIVILEGES;

 

次にこのデータベースにHiveメタストア用のテーブル作成と初期データ投入を行います。通常はHive Metastoreサービスを構築するときに自動で作ってくれますが、今回は手動で行います。

これらのテーブル作成と初期データ投入のためのSQLがHiveには用意されているので、それを利用します。

 

spark-3.0.0-bin-hadoop3.2.tgzのバイナリを展開して中を見ると、Hive2.3.7を使ってSparkをビルドしていたことがわかったので、Hive2.3.7のSQLを持ってきます。

GitHubに上がっているHive2.3.7のソースをダウンロードすると、hive-rel-release-2.3.7/metastore/scripts/upgrade/mysqlというディレクトリがありそこに対象のSQLがありました。hive-schema-2.3.0.mysql.sqlとhive-txn-schema-2.3.0.mysql.sqlです(後者のSQLは前者のSQLの中から呼ばれています)。

 

ここで注意点があって、MySQL系のPXCはGalera Clusterの技術を使っていますが、作成するテーブルには必ずPrimary Keyが必須になります。以前の検証でも書きましたが、Hiveメタストアが作る一部のテーブルは残念なことにPKが張っていないため、そのようなテーブルには同じように自分でPKを張りました。

修正はすべてhive-txn-schema-2.3.0.mysql.sqlの方で以下の箇所です。

 

CREATE TABLE TXN_COMPONENTS (
  DUMMY_ID bigint NOT NULL AUTO_INCREMENT,
  TC_TXNID bigint NOT NULL,
  TC_DATABASE varchar(128) NOT NULL,
  TC_TABLE varchar(128) NOT NULL,
  TC_PARTITION varchar(767),
  TC_OPERATION_TYPE char(1) NOT NULL,
  PRIMARY KEY (DUMMY_ID),
  FOREIGN KEY (TC_TXNID) REFERENCES TXNS (TXN_ID)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

CREATE TABLE COMPLETED_TXN_COMPONENTS (
  DUMMY_ID bigint NOT NULL AUTO_INCREMENT,
  CTC_TXNID bigint NOT NULL,
  CTC_DATABASE varchar(128) NOT NULL,
  CTC_TABLE varchar(256),
  CTC_PARTITION varchar(767),
  PRIMARY KEY (DUMMY_ID)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

CREATE TABLE NEXT_TXN_ID (
  NTXN_NEXT bigint NOT NULL,
  PRIMARY KEY (NTXN_NEXT)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
INSERT INTO NEXT_TXN_ID VALUES(1);

## 何故か同じカラムに2つKEY(インデックス)を貼ろうとしているので、以下の行をコメントアウト
-- CREATE INDEX HL_TXNID_IDX ON HIVE_LOCKS (HL_TXNID);

CREATE TABLE NEXT_LOCK_ID (
  NL_NEXT bigint NOT NULL,
  PRIMARY KEY (NL_NEXT)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
INSERT INTO NEXT_LOCK_ID VALUES(1);

CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
  NCQ_NEXT bigint NOT NULL,
  PRIMARY KEY (NCQ_NEXT)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1);

CREATE TABLE WRITE_SET (
  DUMMY_ID bigint NOT NULL AUTO_INCREMENT,
  WS_DATABASE varchar(128) NOT NULL,
  WS_TABLE varchar(128) NOT NULL,
  WS_PARTITION varchar(767),
  WS_TXNID bigint NOT NULL,
  WS_COMMIT_ID bigint NOT NULL,
  WS_OPERATION_TYPE char(1) NOT NULL,
  PRIMARY KEY (DUMMY_ID)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

 

修正が終わったらhive-schema-2.3.0.mysql.sqlと修正したhive-txn-schema-2.3.0.mysql.sqlを同じ作業ディレクトリに配置して、hive-schema-2.3.0.mysql.sqlを実行します。

これでHiveメタストアのテーブル準備が完了しました。

 

2.3. mysql-connectorの準備

構築したRDB環境にSparkからつなぐには、MySQL系なのでmysql-connectorのjarが必要になります。今回は以前の検証で構築したHadoop環境で利用したmysql-connector-java-8.0.13.jarを使いました。

mysql-connectorのjarを用意したら、全ノードのSPARK_HOME直下のjarディレクトリに配置しました。

 

2.4. hive-site.xmlの作成

SparkがHiveと連携するのにまだ大事なことがあって、実はHive設定ファイルのhive-site.xmlが必要になります。Hiveのサービスを起動するわけではないですが、SparkがHive連携するときにhive-site.xmlを読み込むためです。以下の5項目だけのhive-site.xmlを作って、全ノードのSPARK_HOME直下のconfディレクトリに配置しました。

 

<configuration>
      <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://xx.xx.xx.xx:3306/deltalake_meta</value>
      </property>
	  <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>deltalake</value>
      </property>
	  <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>xxxxx</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
      </property>
      <property>
         <name>hive.metastore.warehouse.dir</name>
         <value>/apps/spark/warehouse</value>
         <description>location of default database for the warehouse</description>
      </property>
</configuration>

 

hive-site.xmlを配置したら、spark-env.shにHIVE_CONF_DIRとしてhive-site.xmlを配置した場所を指定します。

 

echo 'export HIVE_CONF_DIR=/usr/spark/current/conf' >> /usr/spark/current/conf/spark-env.sh

 

これでHiveメタストアとSparkを連携させる準備が整いました。

 

2.5. Hiveメタストアと連携するSparkセッションの張り方

準備が整ったらHiveメタストアとの連携を有効にしてSparkセッションを張ります。そのために先ずSparkのconfigに”spark.sql.catalogImplementation=hive“を設定します。spark-defaults.confに直接書くか、pysparkシェルでの接続やspark submitするときに以下のように設定します。

 

pyspark \
  --packages io.delta:delta-core_2.12:0.7.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
  --conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.HDFSLogStore" \
  --conf "spark.sql.catalogImplementation=hive"

 

次にSparkセッションを張る際に以下のように”.enableHiveSupport()“を指定すれば、これでHiveメタストアと連携できるようになります。

 

from pyspark import SparkContext
from pyspark.sql import Column, DataFrame, SparkSession, SQLContext, functions
from pyspark.sql.functions import *
from pyspark.sql.types import *
from py4j.java_collections import MapConverter
from delta.tables import *
import sys
reload(sys)
sys.setdefaultencoding('utf-8')

spark = SparkSession \
    .builder \
    .appName("test-deltalake") \
    .enableHiveSupport() \
    .getOrCreate()

 

Hiveメタストアの連携はHiveサービスを立ち上げなくてもHiveメタストア用にRDB環境を整えてやれば連携できるので、これは便利ですね。

 

2. データロード

SparkSQLでいつでもテーブル定義情報が読み込めるようになったところで、Delta Lakeのテーブルにデータをロードする検証をやってみました。実際の本番環境ではデータをストリームで順次格納していくことが想定されるので今回は敢えてストリーミングRead/Writeを使ってCSVデータをテーブルにロードすることを試しました。

1. CSVデータの準備

ロードするCSVデータには以前の検証で使ったアイオワ州のお酒販売のデータを使いました。Sparkの基本の日付フォーマットはMySQLと同じく”0000-00-00“なので、予めこのフォーマットに日付カラムを変換したのと、カンマが値として入っていることもありえるためすべての項目をダブルクオートで括っておきました。あわせてカラム名が定義されたヘッダー行も外しておきました。

 

# データのサンプル
"S29198800001","2015-11-20","2191","Keokuk Spirits","1013 MAIN","KEOKUK","52632","1013 MAIN     KEOKUK 52632    (40.39978, -91.387531)","56","Lee","","","255","Wilson Daniels Ltd.","297","Templeton Rye w/Flask","6","750","$18.09","$27.14","6","$162.84","4","1"
"S29195400002","2015-11-21","2205","Ding's Honk And Holler","900 E WASHINGTON","CLARINDA","51632","900 E WASHINGTON     CLARINDA 51632  (40.739238, -95.02756)","73","Page","","","255","Wilson Daniels Ltd.","297","Templeton Rye w/Flask","6","750","$18.09","$27.14","12","$325.68","9","2"
"S29050300001","2015-11-16","3549","Quicker Liquor Store","1414 48TH ST","FORT MADISON","52627","1414 48TH ST   FORT MADISON 52627      (40.624226, -91.373211)","56","Lee","","","130","Disaronno International LLC","249","Disaronno Amaretto Cavalli Mignon 3-50ml Pack","20","150","$6.40","$9.60","2","$19.20","0","0"

 

2. ストリーミングRead/Write

通常はデータを全部データフレームに読み込んでしまえばSparkSQLで予めテーブルを作成しなくてもDelta Lakeフォーマットのテーブルとして保存&Hive連携できますが、ストリーミングWriteの場合はダメでした。なので先ずSparkSQLでテーブルを作成します。

 

ddl = """
CREATE TABLE iowa_liquor_sales (
  invoice_item_number STRING,
  sale_date DATE,
  store_number INTEGER,
  store_name STRING,
  address STRING,
  city STRING,
  zip_code INTEGER,
  store_location STRING,
  county_number INTEGER,
  county STRING,
  category STRING,
  category_name STRING,
  vendor_number INTEGER,
  vendor_name STRING,
  item_number INTEGER,
  item_description STRING,
  pack INTEGER,
  bottle_volume INTEGER,
  state_bottle_cost STRING,
  state_bottle_retail STRING,
  bottles_sold INTEGER,
  sale STRING,
  volume_sold_liters INTEGER,
  volume_sold_gallons INTEGER
)
USING DELTA
PARTITIONED BY (sale_date)
"""
spark.sql(ddl)

 

テーブルを作ってからCSVデータを読み込んでみます。実はCSVデータを読み込む際に、そのデータのスキーマ(フィールドの定義)を指定する必要があって、二度手間な感じがあります。

 

tab_schema = StructType([
  StructField("invoice_item_number", StringType(), False),
  StructField("sale_date", DateType(), False),
  StructField("store_number", IntegerType(), True),
  StructField("store_name", StringType(), True),
  StructField("address", StringType(), True),
  StructField("city", StringType(), True),
  StructField("zip_code", IntegerType(), True),
  StructField("store_location", StringType(), True),
  StructField("county_number", IntegerType(), True),
  StructField("county", StringType(), True),
  StructField("category", StringType(), True),
  StructField("category_name", StringType(), True),
  StructField("vendor_number", IntegerType(), True),
  StructField("vendor_name", StringType(), True),
  StructField("item_number", IntegerType(), True),
  StructField("item_description", StringType(), True),
  StructField("pack", IntegerType(), True),
  StructField("bottle_volume", IntegerType(), True),
  StructField("state_bottle_cost", StringType(), True),
  StructField("state_bottle_retail", StringType(), True),
  StructField("bottles_sold", IntegerType(), True),
  StructField("sale", StringType(), True),
  StructField("volume_sold_liters", IntegerType(), True),
  StructField("volume_sold_gallons", IntegerType(), True),
])

 

StructFieldの3番目の要素のTrue/FalseはデータがNullを許すかどうかで、実質的なPrimary Keyにあたるinvoice_item_numberとパーティションキーのsale_dateはデータが必須なのでFalse(Nullを許さない)としてみました。

スキーマを定義したらストリーミングRead/Writeを行います。ストリーミングRead/WriteはSparkのStructured Streamingが使われています。readStreamに読み込みたいCSVデータを指定して、writeStreamに書き込みたいテーブルのパスを指定します。また、今回用意したCSVファイルに合うようにCSVの読み込みのオプションも指定おきます。

 

spark \
  .readStream \
    .format("csv") \
    .options(header="false", inferSchema="false", quote='"') \
    .schema(tab_schema) \
    .load(load_path) \
  .writeStream \
    .format("delta") \
    .partitionBy("sale_date") \
    .outputMode("append") \
    .option("checkpointLocation", checkpoint_location) \
    .start(tab_location)

 

tab_schemaは上記で指定したCSVのフィールドの定義で、load_pathは読み込むCSVデータのパス、checkpoint_locationはストリーミングWriteのチェックポイントの情報を書き込む先で、tab_locationは書き込むテーブルのパスです。

 

最初、PySparkシェルを実行するノードのローカルに置いたCSVファイルを読み込ませようとしてたんですがうまくいかず、よくよく考えたらSparkをデフォルトでYARNモード(Hadoopクラスタ利用)にしていてPySparkシェルの実行環境が別のノードとなっていたことが原因でした。

 

PySparkシェルの実行環境を都度調べてそのノードのローカルにファイルを配置するのもHadoopクラスタ上でSparkを使っている意味が薄いので、今回はCSVファイルを一度HDFSに置いてからストリーミングRead/Writeさせることにしました。

 

# アップロード先のHDFSディレクトリを作成
hdfs dfs -mkdir -p /tmp/load_data/iowa_liquor_sales/iowa_liquor_sales_change_dateformat
# CSVファイルをHDFSにアップロード
hdfs dfs -copyFromLocal iowa_liquor_sales_change_dateformat.csv /tmp/load_data/iowa_liquor_sales/iowa_liquor_sales_change_dateformat/
# ストリーミングWriteに必要なチェックポイントの情報を書き込む先のHDFSディレクトリを作成
hdfs dfs -mkdir -p /user/spark/_checkpoints/iowa_liquor_sales/csv_iowa_liquor_sales_change_dateformat

 

必要なHDFSのディレクトリ作成をしてCSVファイルをアップロードしたら、load_pathcheckpoint_locationtab_locationを指定して上記のreadStream/writeStreamを実行します。

 

load_path = "/tmp/load_data/iowa_liquor_sales/iowa_liquor_sales_change_dateformat/"
checkpoint_location = "/user/spark/_checkpoints/iowa_liquor_sales/csv_iowa_liquor_sales_change_dateformat"
tab_location = "/apps/spark/warehouse/iowa_liquor_sales"

 

これでCSVデータのストリーミングRead/Writeがうまくいきました!

今回のCSVデータは3GBくらいのサイズでしたが何の問題もなかったです。数GBくらいのサイズのデータファイルなら一括で読み書きできる大きさなので、わざわざストリーミングRead/Writeする必要はないですが、実運用ではHDFSに定期的にデータを上げてそれを順次ストリーミングRead/Writeしたり、Kafkaと連携してSpark Strunctured Streamingを使ってデータを格納するんだと思います。

 

3. 簡単なトランザクション検証

今度は肝心なDelta Lakeのトランザクションを検証してみました。Sparkセッションを二つ開いて同時に同じ行にUPDATEしたらどうなるかを見てみました。

1. 同じ行を同時にUPDATEした場合

先ずPySparkシェルで二つ別々に接続して、それぞれSparkセッションを張ります。Session1のSparkSQLを実行してすぐにSession2のSparkSQLを実行してみます。

 

# Session1
dml = """
UPDATE iowa_liquor_sales
SET store_name = concat(store_name, '★')
WHERE invoice_item_number='S28856000020'
"""
spark.sql(dml)
# Session2
dml = """
UPDATE iowa_liquor_sales
SET store_name = concat(store_name, '☆')
WHERE invoice_item_number='S28856000020'
"""
spark.sql(dml)

 

結果、どうなるかというと、先に投げたUPDATE文が成功して、後に投げたUPDATE文がエラーで怒られました。

 

Traceback (most recent call last):
  File "", line 1, in 
  File "/usr/spark/current/python/pyspark/sql/session.py", line 646, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/spark/current/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/usr/spark/current/python/pyspark/sql/utils.py", line 131, in deco
    return f(*a, **kw)
  File "/usr/spark/current/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o54.sql.
: org.apache.spark.sql.delta.ConcurrentAppendException: Files were added to partition [sale_date=2015-11-04] by a concurrent update. Please try the operation again.
Conflicting commit: {"timestamp":1601107816214,"operation":"UPDATE","operationParameters":{"predicate":(invoice_item_number#1932 = S28856000020)},"readVersion":1,"isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"1","numAddedFiles":"1","numUpdatedRows":"1","numCopiedRows":"1242"}}
Refer to https://docs.delta.io/latest/concurrency-control.html for more details.
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$checkAndRetry$2(OptimisticTransaction.scala:622)
        at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.java:23)
        at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:74)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$checkAndRetry$1(OptimisticTransaction.scala:553)
        at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
        at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
        at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
        at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:80)
        at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
        at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
        at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:80)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.checkAndRetry(OptimisticTransaction.scala:548)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.checkAndRetry$(OptimisticTransaction.scala:541)
        at org.apache.spark.sql.delta.OptimisticTransaction.checkAndRetry(OptimisticTransaction.scala:80)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommit$1(OptimisticTransaction.scala:532)
        at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
        at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:152)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit(OptimisticTransaction.scala:464)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit$(OptimisticTransaction.scala:459)
        at org.apache.spark.sql.delta.OptimisticTransaction.doCommit(OptimisticTransaction.scala:80)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$commit$1(OptimisticTransaction.scala:340)
        at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
        at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
        at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
        at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:80)
        at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
        at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
        at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:80)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit(OptimisticTransaction.scala:295)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit$(OptimisticTransaction.scala:293)
        at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:80)
        at org.apache.spark.sql.delta.commands.UpdateCommand.performUpdate(UpdateCommand.scala:167)
        at org.apache.spark.sql.delta.commands.UpdateCommand.$anonfun$run$2(UpdateCommand.scala:67)
        at org.apache.spark.sql.delta.commands.UpdateCommand.$anonfun$run$2$adapted(UpdateCommand.scala:66)
        at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
        at org.apache.spark.sql.delta.commands.UpdateCommand.$anonfun$run$1(UpdateCommand.scala:66)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
        at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
        at org.apache.spark.sql.delta.commands.UpdateCommand.recordOperation(UpdateCommand.scala:45)
        at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
        at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
        at org.apache.spark.sql.delta.commands.UpdateCommand.recordDeltaOperation(UpdateCommand.scala:45)
        at org.apache.spark.sql.delta.commands.UpdateCommand.run(UpdateCommand.scala:63)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
        at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
        at org.apache.spark.sql.Dataset.(Dataset.scala:229)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

 

Deleta Lakeのトランザクションは前勝ちということがわかります。一般的なRDBのトランザクションの感覚からすると、本当は後続のUPDATE文は先のUPDATE文が終わるのを待ってからUPDATEしてほしいところですが(行ロック)、すぐにエラーで返ってきます。データの整合性は保たれますがアプリケーションの実装はちょっと考えどころですね。

 

4. 大量データへのクエリ

最後に大量データのDelta Lakeテーブルへのクエリの処理速度がどんな感じが検証しました。

1. パーティションキーでの絞り込み

大量データにクエリする前に、先ずテーブルの一つのパーティションだけスキャンするように条件を絞ったクエリとそうでないクエリの速度を比較してみました。

 

import timeit

q = """
SELECT invoice_item_number, sale_date FROM iowa_liquor_sales WHERE sale_date='2015-11-04' AND invoice_item_number='S28856000020'
"""
startTime=timeit.default_timer()
spark.sql(q).show()
endTime=timeit.default_timer()
print(endTime - startTime) # 結果:1.8秒ほど(2回平均)

q = """
SELECT invoice_item_number, sale_date FROM iowa_liquor_sales WHERE invoice_item_number='S28856000020'
"""
startTime=timeit.default_timer()
spark.sql(q).show()
endTime=timeit.default_timer()
print(endTime - startTime) # 結果:70秒ほど(2回平均)

 

invoice_item_numberが’S28856000020’の1行を検索するだけのクエリです。

事前に調べてこの行が’2015-11-04’のパーティション(パーティションキーはsale_date)にあることがわかっているので、その条件の有無でクエリ速度を比較しました。計測にはtimeitを使っています。

 

iowa_liquor_salesテーブルは1260万件ほどのレコードがありプレーンテキストで3.3GBくらいのサイズ(HDFS上のDelta Lakeフォーマットで790MBほど)で、sale_dateをキーにしたパーティション総数は1379個です。’2015-11-04’のパーティションには1万2千行ほどが入っていました。

 

比較の結果、当たり前といえば当たり前ですがパーティションで絞った方が圧倒的に速かったです。Delta Lakeは一般的なRDBのPrimary Key制約(+インデックス)があるわけではないので、やっぱりパーティショニングを活用するのがファーストステップのようですね。

ちなみに、Delta Lakeはキャッシュテーブルというものもあるようで、

 

q="""
CACHE TABLE iowa_liquor_sales
"""
spark.sql(q).show(truncate=False)

 

これを実行するとRDDとしてデータがメモリに乗り、その後接続を切るまではそのテーブルへのクエリは速くなりました。上記のパーティションキーによる絞り込みがない方のクエリを投げると、キャッシュが効いて3秒(2回平均)くらいで返ってきました。毎回CACHE TABLE文を発行するのは使い勝手が悪いですが、それほど大きくないテーブルに繰り返しクエリを投げる処理があれば使うのも有効かもしれません。

 

2. テーブル結合したクエリ速度

パーティションキーを条件にするかしないかのクエリ速度の違いをわかったところで、今度は大量データへのクエリを検証するために大きなテーブルを準備します。

 

あくまでテストデータなので、iowa_liquor_salesテーブルのデータを何回もiowa_liquor_sales_largeというテーブルにINSERT INTOして準備しました。実質的なPrimary Keyのinvoice_item_numberは重複しないように値を加工してINSERTしています。

最終的にiowa_liquor_sales_largeには2億7700万件くらいのレコード、HDFS上のDelta Lakeフォーマットで15GB、プレーンテキストなら恐らく70GBくらいのサイズになりました。

 

内容的には特に意味のない検証のためのテーブル結合クエリを行うpyshonファイルを用意して、

 

from pyspark import SparkContext
from pyspark.sql import Column, DataFrame, SparkSession, SQLContext, functions
from pyspark.sql.functions import *
from py4j.java_collections import MapConverter
from delta.tables import *
import sys
reload(sys)
sys.setdefaultencoding('utf-8')

spark = SparkSession \
    .builder \
    .master("yarn") \
    .appName("query_large_data") \
    .enableHiveSupport() \
    .getOrCreate()

q="""
SELECT l.category_name, count(*) FROM iowa_liquor_sales_large l
INNER JOIN (
  SELECT category_name FROM iowa_liquor_sales
  WHERE (sale_date BETWEEN '2016-12-20' AND '2017-01-05') AND vendor_name = 'Jim Beam Brands'
  ) t ON l.category_name = t.category_name
WHERE (l.sale_date BETWEEN '2017-10-01' AND '2017-10-31')
GROUP BY l.category_name
"""
spark.sql(q).show(2000, truncate=False)

 

このPythonファイルを以下のパラメータでspark-submitするやり方で検証してみました。

 

#!/usr/bin/bash
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 4 \
  --executor-memory 6G \
  --executor-cores 1 \
  --driver-memory 6G \
  --packages io.delta:delta-core_2.12:0.7.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
  --conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.HDFSLogStore" \
  --conf "spark.sql.catalogImplementation=hive" \
  --conf "spark.ui.enabled=false" \
  query_large_data.py \
  2>&1 |tee query_large_data.log

 

結果、2回試しましたが5分くらいのクエリ処理速度でした。

 

+------------------------------+----------+
|category_name                 |count(1)  |
+------------------------------+----------+
|Straight Rye Whiskies         |1344420   |
|American Cordials & Liqueurs  |18564     |
|Single Barrel Bourbon Whiskies|257544    |
|Spiced Rum                    |8220198   |
|American Schnapps             |321057513 |
|White Rum                     |6118560   |
|Mixto Tequila                 |13402305  |
|Imported Vodka                |34650     |
|American Cordials & Liqueur   |100758    |
|Scotch Whiskies               |224406    |
|American Brandies             |10003896  |
|Imported Dry Gins             |389151    |
|Temporary & Specialty Packages|857766    |
|Canadian Whiskies             |497578368 |
|Irish Whiskies                |2909151   |
|Single Malt Scotch            |6679806   |
|100% Agave Tequila            |31736922  |
|Aged Dark Rum                 |883386    |
|Holiday VAP                   |620823    |
|Cocktails / RTD               |629160    |
|Flavored Rum                  |38652012  |
|Special Order Items           |212541    |
|Blended Whiskies              |224657454 |
|Triple Sec                    |1631700   |
|Imported Brandies             |15856596  |
|American Vodkas               |618555    |
|Coffee Liqueurs               |4217556   |
|American Dry Gins             |11088336  |
|Bottled in Bond Bourbon       |59220     |
|Imported Flavored Vodka       |28078050  |
|Imported Cordials & Liqueurs  |7220346   |
|Straight Bourbon Whiskies     |1035891045|
+------------------------------+----------+

 

データサイズとクエリの内容からしたらまずまずの速度で処理できたんじゃないでしょうか。大量のデータを格納したDelta Lakeテーブルへの参照クエリは問題なくできました!

 

5. まとめ

1. Delta Lake+Sparkは使えそうか

前回と今回の技術検証でHadoopクラスタ上にDelta Lake+Sparkを試しましたが、Delta Lakeを使えばある程度のトランザクション制御&バージョン管理もでき、SparkSQLのテーブル定義もHiveサービスを立ち上げずにHiveメタストア連携での管理ができたので、この構成は結構使えそうな印象です。

 

設定がとても大変ですけどHDFSとYARNを使うのはストレージ管理、リソース管理、そして冗長性においてやっぱり強力なので、Delta Lake+Sparkを使うにはHadoopクラスタ上でやるのが一つの基本線な気がします。

 

2. 本番環境でデータ解析基盤として使うために

本番環境でデータ解析基盤として使うためにはさらにまだ幾つかの課題があります。

  • BIツールとの連携
    • TableauやApache Zeppelin他
  • セキュリティ回り
    • Apache Rangerの利用など
  • INSERTメインで時々UPDATEが投げられるようなワークロードの検証
    • データの整合性とVACUUM

特にBIツールが使えないことにはデータ解析基盤としてはダメなので、次に検証するときはApache ZeppelinとDelta Lake+Sparkあたりをやってみたいと思います。

最後に

次世代システム研究室では、データサイエンティスト/機械学習エンジニアを募集しています。ビッグデータの解析業務など次世代システム研究室にご興味を持って頂ける方がいらっしゃいましたら、ぜひ 募集職種一覧 からご応募をお願いします。   皆さんのご応募をお待ちしています。





Pocket

関連記事