2020.07.08
ビッグデータの新しい基盤としてHadoop上でSpark+Delta Lakeを検証してみた
こんにちは。次世代システム研究室のデータベース と Hadoop を担当している M.K. です。
前回データベースのテーマでPostgreSQL&Patroniを検証しましたが、今回はまたHadoop系のテーマでDelta Lakeを検証しました。
Delta LakeはDatabricks社が開発したApache Spark用のストレージレイヤーの技術です。去年オープンソース化になってまだそれほど日が経っていません。Databricks社はビッグデータを処理する技術としてとても良く使われているApache Sparkの生みの親が創業した会社ですね。
目次
1. 何故Delta Lakeか
ビッグデータを溜めて解析する基盤として、ずっとオープンソースのHadoopとその関連技術を使ってきました。
今まではHadoop分野の大手ベンダー2社であるCloudera社やHortonwork社が出しているHadoopのオープンソースの統合パッケージ(前者がCDH、後者がHDP)を使うことが多かったのですが、両社が統合し新生Cloudera社となってパッケージをデプロイ&運用管理するツールがサブスクリプションを契約しないと事実上使うことができなくなりました。
そのような事情があり気軽に新技術が検証できなくなってしまったのと、以前の技術ブログでも何回かに渡って最新のHDPを検証してきましたが、Hadoop関連技術が発展・拡張するにつれて仕組みや設定がますます複雑化し運用が大変になってきたため、選択肢の一つとしてもう少しシンプルなソリューションも調べておこうと考えたのがきっかけです。
以前の技術ブログ:
- Hive3とLLAPとDruidを試す ~HDP3.0.1.0~(前編:構築編)
- Hive3のトランザクションを試してみる
- Hive3のトランザクションを有効にしたテーブルにSpark2を連携してみる~Hive Warehouse Connector検証
自分の周りではビッグデータを解析する上で最も必要不可欠なのはSparkなので、Sparkだけでビッグデータ解析基盤を作れないかと思っていたところに、ちょうどDelta Lakeを見つけました。
元々SparkはHadoopと親和性が高く、冗長性の高い分散ファイルシステムのHDFSとその上でRDBのようなテーブル管理ができるHiveと連携できることが強みですが、Hiveが3系になって今までとはだいぶ違う製品になり、Sparkとの連携が今までとは同じようにはいかなくなってきたこととSparkだけでもある程度テーブルのような管理ができるので、Hiveを使わずにSparkだけでできるやり方を探していたらまさにそのためのDelta Lakeがありました。
前置きがちょっと長くなりましたが、Delta LakeはSpark用のテーブル管理の技術です。ストレージレイヤーの技術ですが独自のファイルシステムや分散処理の仕組みを持っているわけではありません。ファイルシステムはローカルファイルだったりHDFSを利用します。
Sparkのファイルフォーマットと言えばParquetが有名ですが、Delta Lakeも実はParquetファイルの集まりです。
HDFSを使わないのが最もシンプルに基盤構築できますが、やっぱり本番運用を見据えるとHDFSにデータを書き溜めることはとても強力なので、今回の検証ではHDFS上にDelta Lakeの環境を構築してSparkで操作してみようと思います。
※Hadoopの環境構築に関する章が長いので、時間のない方は第4章からどうぞ。
2. Delta Lakeを使うSparkとHadoopの環境準備
1. 利用技術
今回利用するミドルウェアは、以下のものでCentOS7.8で試しました。
- Java 8 (java-1.8.0-openjdk) ※yumでインストール
- Hadoop 3.2.1
- Zookeeper 3.4.14
- Spark 3.0.0
- Python 2.7.5
- Delta Lake 0.7 (delta-core_2.12)
2. サーバースペック
検証に当たりお馴染みの GMO アプリクラウドのサーバーを使いました。スペックは以下です。
- マスターノード(主にNameNode, ResouceManager, Zookeeper)用のサーバー ×3
- OS : CentOS 7.8
- 仮想 CPU : 2
- メモリ容量 : 8GB
- ディスク容量 : 160GB
- スレーブノード(主にDataNode)用のサーバー ×3
- OS : CentOS 7.8
- 仮想 CPU : 4
- メモリ容量 : 16GB
- ディスク容量 : 320GB
参考までに、GMOアプリクラウドと同じ技術を使ったより汎用的な Z.com クラウドもあるので興味のある方はぜひ。
3. 最初Ambariを試してみた
上述したようにCDHやHDPを使わないソリューションも探すと言いましたが、Delta lakeの検証自体はHadoop環境であればできるので、実は当初は以前のブログで構築したHDP3.0.1を使おうと考えていました。
が、Delta LakeはSpark2.4.2以上のバージョンでないとダメで、HDP3.0.1のSparkは2.3.1だったため断念しました。
次にHadoop環境を自分で構築&運用するのは大変なので、ある程度それを手助けしてくれることを期待して最新のAmbariを落としてきて試してみました。AmbariはHortonwork社が出していたHDP向けのHadoop環境構築&運用ツールでオープンソースとして公開されているものです。GUIで操作することができます。
最新のAmbariの2.7.5をGitレポジトリから落としてきて、mavenでビルドしてエラーを解決しながらも完了し、前準備やAmbariセットアップも済んで、さあHadoop環境を作るぞというところで大きな問題が。
Ambari Serverを起動して最初にGUIの画面にログインすると、Hadoop環境構築Wizardの画面が用意されます。そこまでは順調でこのWizardを進めたんですが、手順1のSelect Versionのところで、HDPを構成するHadoopの各種コンポーネントを選ぶのですが、そのコンポーネントがあるレポジトリの参照先がCloudera社のサブスクリプションを購入した人だけがアクセスできるサイトに向けられていてここで完全に進めなくなりました。。
自分でビルドした各種コンポーネントを選んだり、このWizardを回避できれば良かったんですが、そのままではどうもできないみたいで、今回は自分でHadoop環境を構築することにしました。
4. 前準備
Ambariを試したときも同じですが、Hadoop環境を構築するのに事前にやっておくことをHadoopクラスタを組む全サーバーで実施しています。
- Hadoopクラスタを組むサーバーのホスト名(FQDN)が書かれた/etc/hostsファイルを準備
- ちなみに、今回のホスト名・構成は以下の通り。
xx.xx.xx.101 k-deltalake-m1.testdelta.com k-deltalake-m1 # マスターノード1
xx.xx.xx.102 k-deltalake-m2.testdelta.com k-deltalake-m2 # マスターノード2
xx.xx.xx.103 k-deltalake-m3.testdelta.com k-deltalake-m3 # マスターノード3
xx.xx.xx.111 k-deltalake-s1.testdelta.com k-deltalake-s1 # スレーブノード1
xx.xx.xx.112 k-deltalake-s2.testdelta.com k-deltalake-s2 # スレーブノード2
xx.xx.xx.113 k-deltalake-s3.testdelta.com k-deltalake-s3 # スレーブノード3
- ちなみに、今回のホスト名・構成は以下の通り。
- ntpによるサーバー時刻同期
- 環境構築で使うOSパッケージ(curl、wget、rsync)のyum install
- transparent_hugepagesの無効化
- /etc/security/limits.confのリソース制限の変更
- Javaのインストール(java-1.8.0-openjdk, java-1.8.0-openjdk-devel) ⇒今回はOracleでなくOpenJDKを利用
このあたりは前に書いたこちらのブログに詳細を書いていますので参照してみてください。
3. HadoopおよびSparkの環境構築
前準備が終わったら自分でHadoop環境を構築していきます。
1. Hadoop用ユーザー&グループ作成
一つのユーザーを使いまわすわけではなく、本番運用を見据えてそれぞれユーザーを作ることにしました。その代わりグループはhadoopというグループに所属させました。hdfsユーザーだけはHadoopの管理ユーザーにする予定なので、hdfsグループも作っています。
# 全サーバーで実施 groupadd hadoop groupadd hdfs useradd -N -g hadoop zookeeper useradd -g hadoop -G hdfs hdfs useradd -N -g hadoop yarn useradd -N -g hadoop spark
2. Hadoopのインストール
どうせなら最新のHadoop3系を使いたいので、最新安定版のHadoop3.2.1を使います。インストール自体はバイナリをダウンロードして展開するだけで簡単です(大変なのは設定です・・)。必要なディレクトリも予め作成してパーミッションを整えておきます。
# 全サーバーで実施 cd /var/tmp/ wget https://ftp.riken.jp/net/apache/hadoop/common/hadoop-3.2.1/hadoop-3.2.1.tar.gz tar zxvf hadoop-3.2.1.tar.gz chown -R root:root hadoop-3.2.1 mkdir /usr/hadoop mv hadoop-3.2.1 /usr/hadoop/ cd /usr/hadoop ln -s hadoop-3.2.1 current chown -R root:hadoop /usr/hadoop/hadoop-3.2.1 chmod 775 /usr/hadoop/current/etc/hadoop cd /etc ln -s /usr/hadoop/current/etc/hadoop hadoop mkdir -p /var/log/{hadoop,yarn} mkdir -p /var/run/{hadoop,yarn} mkdir -p /var/hadoop/yarn/{local,recovery} cat << _EOF_ > /etc/hadoop/workers k-deltalake-s1.testdelta.com k-deltalake-s2.testdelta.com k-deltalake-s3.testdelta.com _EOF_ # k-deltalake-m1, m2, m3で実施 mkdir -p /var/hadoop/dfs/{jn,nn,snn} # k-deltalake-s1, s2, s3で実施 mkdir -p /var/hadoop/dfs/dn # 全サーバーで実施 chown root:hadoop /var/hadoop chown -R hdfs:hadoop /var/run/hadoop chown -R hdfs:hadoop /var/log/hadoop chown -R hdfs:hadoop /var/hadoop/dfs chown -R root:hadoop /usr/hadoop/hadoop-3.2.1 chmod 775 /usr/hadoop/current/etc/hadoop chown -R yarn:hadoop /var/log/yarn chown -R yarn:hadoop /var/run/yarn chmod 775 /var/log/hadoop chmod 775 /var/log/yarn
3. Zookeeperのインストール&構築
次にZookeeperをインストールします。Zookeeperもバイナリをダウンロードして展開します。Zookeeperはあんまり新しすぎるとHadoopが対応していないかもしれないと思ったので、少し前の3.4系の最新版にしました。
ZookeeperはHadoop関連技術でとても良く使われる分散アプリケーションのためのコーディネーションエンジンで、HadoopのHA構成をするときなどは必須になります。
# k-deltalake-m1, m2, m3で実施 cd /var/tmp wget https://ftp.riken.jp/net/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz tar zxvf zookeeper-3.4.14.tar.gz chown -R root:root zookeeper-3.4.14 mkdir /usr/zookeeper mkdir -p /var/hadoop/zookeeper mkdir -p /var/log/zookeeper mkdir -p /var/run/zookeeper mv zookeeper-3.4.14 /usr/zookeeper/ cd /usr/zookeeper ln -s zookeeper-3.4.14 current cd /etc ln -s /usr/zookeeper/current/conf zookeeper
続いて、Zookeeperの設定を行います。zoo.cfgファイルにZookepperのパラメータ値を書き込みます。
# k-deltalake-m1, m2, m3で実施 cat << _EOF_ > /etc/zookeeper/zoo.cfg clientPort=2181 initLimit=10 autopurge.purgeInterval=24 syncLimit=5 tickTime=3000 dataDir=/var/hadoop/zookeeper autopurge.snapRetainCount=30 server.1=k-deltalake-m1.testdelta.com:2888:3888 server.2=k-deltalake-m2.testdelta.com:2888:3888 server.3=k-deltalake-m3.testdelta.com:2888:3888 _EOF_
それから、Zookeeper用の環境変数をセットするシェルをzookeeper-env.shという名前で作成します。また、Zookeeperのクラスタを構成するノード固有の識別子をmyidファイルに書き込みます。
# k-deltalake-m1, m2, m3で実施 cat << _EOF_ > /usr/zookeeper/current/conf/zookeeper-env.sh export JAVA_HOME=/usr/lib/jvm/java export ZOOKEEPER_HOME=/usr/zookeeper/current export ZOO_LOG_DIR=/var/log/zookeeper export ZOOPIDFILE=/var/run/zookeeper/zookeeper_server.pid export SERVER_JVMFLAGS=-Xmx1024m export JAVA=$JAVA_HOME/bin/java _EOF_ chmod 755 /usr/zookeeper/current/conf/zookeeper-env.sh # k-deltalake-m1で実施 echo 1 > /var/hadoop/zookeeper/myid # k-deltalake-m2で実施 echo 2 > /var/hadoop/zookeeper/myid # k-deltalake-m3で実施 echo 3 > /var/hadoop/zookeeper/myid
以上の設定が終わったらZookepperを起動します。うまく起動できていればどのノードからも接続確認して同じ結果が返ってきます。
# k-deltalake-m1, m2, m3で実施 # zookeeperユーザーで実施 /usr/zookeeper/current/bin/zkServer.sh start # 接続確認 /usr/zookeeper/current/bin/zkCli.sh -server 127.0.0.1:2181 [zk: 127.0.0.1:2181(CONNECTED) 0] ls / ===== [zookeeper] =====
4. HDFSの構築&HA
Zookepperが構築・起動できたら次はHadoopのHDFSを構築します。今回はHDFSのNameNodeをマスターノード1と2に立てる構成にするので、それぞれのノードのhdfsユーザーがノーパスワードでサーバーにssh接続できるようにします。
- k-deltalake-m1からk-deltalake-m2, s1, s2, s3にhdfsユーザーでノーパスワードでssh可能に
- k-deltalake-m2からk-deltalake-m1, s1, s2, s3にhdfsユーザーでノーパスワードでssh可能に
それが終わったらいよいよHDFSの設定を行っていきます。最初はhadoop-env.shをやります。
Hadoop(HDFSとYARN)の設定ファイル一式は$HADOOP_HOME/etc/hadoopに置かれています。今回そのディレクトリを/etc/hadoopへシンボリックリンクを張りました。
hadoop-env.shではHadoopの環境変数をセットします。Hadoop関連技術のJava/JVMのオプションは〇〇-env.shで環境変数にセットするので、本番運用するときはちゃんとやらないといけないところ。今回は色々参考にしながら主に以下の項目を設定してみました。
# k-deltalake-m1で実施 vi /etc/hadoop/hadoop-env.sh # hadoop-env.shの以下の箇所を修正 ------------------------------------------------------------ ### # Generic settings for HADOOP ### export JAVA_HOME=/usr/lib/jvm/java export HADOOP_HOME=/usr/hadoop/current export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop export HADOOP_HEAPSIZE_MAX=1024 export HADOOP_HEAPSIZE_MIN=1024 export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true ${HADOOP_OPTS}" export HADOOP_OS_TYPE=${HADOOP_OS_TYPE:-$(uname -s)} export HADOOP_CLIENT_OPTS="-Xmx${HADOOP_HEAPSIZE_MAX}m $HADOOP_CLIENT_OPTS" export HADOOP_LIBEXEC_DIR=${HADOOP_HOME}/libexec ### # Options for remote shell connectivity ### export HADOOP_SSH_OPTS="-o ConnectTimeout=5 -o SendEnv=HADOOP_CONF_DIR" ### # Options for all daemons ### USER="$(whoami)" export HADOOP_LOG_DIR=/var/log/hadoop export HADOOP_IDENT_STRING=$USER export HADOOP_PID_DIR=/var/run/hadoop export HADOOP_SECURITY_LOGGER=INFO,DRFAS ### # Secure/privileged execution ### export HDFS_DATANODE_SECURE_USER=${HDFS_DATANODE_SECURE_USER:-""} export HADOOP_SECURE_PID_DIR=${HADOOP_SECURE_PID_DIR:-/var/run/hadoop/$HDFS_DATANODE_SECURE_USER} #export HADOOP_SECURE_LOG= export HADOOP_SECURE_LOG_DIR=${HADOOP_SECURE_LOG_DIR:-/var/log/hadoop/$HDFS_DATANODE_SECURE_USER} ### # NameNode specific parameters ### export HDFS_AUDIT_LOGGER=INFO,DRFAAUDIT HDFS_NAMENODE_OPTS_COMMON="-server -XX:ParallelGCThreads=8 -XX:+UseConcMarkSweepGC -XX:ErrorFile=/var/log/hadoop/$USER/nn_err_%p.log -XX:NewSize=256m -XX:MaxNewSize=256m -Xloggc:/var/log/hadoop/$USER/gc.log-`date +'%Y%m%d%H%M'` -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -Xms2048m -Xmx2048m" export HDFS_NAMENODE_OPTS="${HDFS_NAMENODE_OPTS_COMMON} ${HDFS_NAMENODE_OPTS}" ### # SecondaryNameNode specific parameters ### export HDFS_SECONDARYNAMENODE_OPTS="${HDFS_NAMENODE_OPTS_COMMON} ${HDFS_SECONDARYNAMENODE_OPTS}" ### # DataNode specific parameters ### export HDFS_DATANODE_OPTS="-server -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC -XX:ErrorFile=/var/log/hadoop/$USER/dn_err_pid%p.log -XX:NewSize=200m -XX:MaxNewSize=200m -Xloggc:/var/log/hadoop/$USER/gc.log-`date +'%Y%m%d%H%M'` -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xms1024m -Xmx1024m ${HDFS_DATANODE_OPTS} -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly" # export HDFS_DATANODE_SECURE_USER= ### # HDFS Balancer specific parameters ### export HADOOP_BALANCER_OPTS="-server -Xmx1024m ${HADOOP_BALANCER_OPTS}" ### # YARN -- add ### export YARN_RESOURCEMANAGER_OPTS="-Dyarn.server.resourcemanager.appsummary.logger=INFO,RMSUMMARY" ------------------------------------------------------------
次にcore-site.xmlを設定します。core-site.xmlを開いて設定したい項目を</configuration></configuration>の間に追加します。これも色々参考にして以下の設定を行いました。
<property> <name>ha.zookeeper.quorum</name> <value>k-deltalake-m1.testdelta.com:2181,k-deltalake-m2.testdelta.com:2181,k-deltalake-m3.testdelta.com:2181</value> </property> <property> <name>fs.defaultFS</name> <value>hdfs://k-deltalake</value> <final>true</final> </property> <property> <name>ha.failover-controller.active-standby-elector.zk.op.retries</name> <value>120</value> </property> <property> <name>hadoop.http.filter.initializers</name> <value>org.apache.hadoop.security.AuthenticationFilterInitializer,org.apache.hadoop.security.HttpCrossOriginFilterInitializer</value> </property> <property> <name>hadoop.proxyuser.hdfs.groups</name> <value>*</value> </property> <property> <name>hadoop.proxyuser.hdfs.hosts</name> <value>*</value> </property> <property> <name>hadoop.proxyuser.root.groups</name> <value>*</value> </property> <property> <name>hadoop.proxyuser.root.hosts</name> <value>k-deltalake-m1.testdelta.com,k-deltalake-m2.testdelta.com,k-deltalake-m3.testdelta.com</value> </property> <property> <name>hadoop.proxyuser.yarn.hosts</name> <value>k-deltalake-m1.testdelta.com,k-deltalake-m2.testdelta.com,k-deltalake-m3.testdelta.com</value> </property> <property> <name>io.compression.codecs</name> <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.SnappyCodec</value> </property> <property> <name>io.file.buffer.size</name> <value>131072</value> </property> <property> <name>io.serializations</name> <value>org.apache.hadoop.io.serializer.WritableSerialization</value> </property> <property> <name>ipc.client.connect.max.retries</name> <value>50</value> </property> <property> <name>ipc.server.tcpnodelay</name> <value>true</value> </property> <property> <name>net.topology.script.file.name</name> <value>/etc/hadoop/topology.sh</value> </property>
Hadoopはデータブロックをreplica数の分だけデータノードにコピーしますが、そのときに同じラックにあるサーバーを選ばないようにするRack Awarenessのためのスクリプトを今回適当に自作してみます(本番運用するときはもっと運用しやすいスクリプトにするはず)。 core-site.xmlの「net.topology.script.file.name」で設定したスクリプトです。
cat << _EOF_ > /etc/hadoop/topology.sh #!/bin/bash HOST=\`hostname\` case \$HOST in "k-deltalake-s1.testdelta.com") echo "/hadoop-rack01" ;; "k-deltalake-s2.testdelta.com") echo "/hadoop-rack02" ;; "k-deltalake-s3.testdelta.com") echo "/hadoop-rack03" ;; *) echo "/hadoop-default-rack" ;; esac _EOF_ chmod 755 /etc/hadoop/topology.sh
最後にhdfs-site.xmlを設定します。core-site.xmlと同じように</configuration></configuration>の間に追加します。
HDFSもたくさん設定項目があるんですが、色々参考にして今回は主に以下のものを設定しました。本番運用のときはワークロードにあわせてもっと設定項目を追加することになると思います。
大事なポイントはHDFSのHA構成の設定で、Journal Nodeに関する項目や「k-deltalake」を名前に含む項目です。 「k-deltalake」は今回私がcore-site.xmlのfs.defaultFSやhdfs-site.xmlのdfs.internal.nameservicesに設定したHDFSの一種の名前で、任意の名前にできます。
この名前によって、幾つかの項目名も変えて設定しないといけないので注意が必要です。
<property> <name>dfs.cluster.administrators</name> <value>hdfs</value> </property> <property> <name>dfs.blocksize</name> <value>134217728</value> </property> <property> <name>dfs.client.read.shortcircuit</name> <value>true</value> </property> <property> <name>dfs.client.read.shortcircuit.streams.cache.size</name> <value>4096</value> </property> <property> <name>dfs.datanode.data.dir</name> <value>/var/hadoop/dfs/dn</value> <final>true</final> </property> <property> <name>dfs.journalnode.edits.dir</name> <value>/var/hadoop/dfs/jn</value> </property> <property> <name>dfs.domain.socket.path</name> <value>/var/hadoop/dfs/dn_socket</value> </property> <property> <name>dfs.ha.automatic-failover.enabled</name> <value>true</value> </property> <property> <name>dfs.internal.nameservices</name> <value>k-deltalake</value> </property> <property> <name>dfs.client.failover.proxy.provider.k-deltalake</name> <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value> </property> <property> <name>dfs.ha.namenodes.k-deltalake</name> <value>nn1,nn2</value> </property> <property> <name>dfs.namenode.http-address.k-deltalake.nn1</name> <value>k-deltalake-m1.testdelta.com:50070</value> </property> <property> <name>dfs.namenode.http-address.k-deltalake.nn2</name> <value>k-deltalake-m2.testdelta.com:50070</value> </property> <property> <name>dfs.namenode.https-address.k-deltalake.nn1</name> <value>k-deltalake-m1.testdelta.com:50470</value> </property> <property> <name>dfs.namenode.https-address.k-deltalake.nn2</name> <value>k-deltalake-m2.testdelta.com:50470</value> </property> <property> <name>dfs.namenode.rpc-address.k-deltalake.nn1</name> <value>k-deltalake-m1.testdelta.com:8020</value> </property> <property> <name>dfs.namenode.rpc-address.k-deltalake.nn2</name> <value>k-deltalake-m2.testdelta.com:8020</value> </property> <property> <name>dfs.namenode.shared.edits.dir</name> <value>qjournal://k-deltalake-m1.testdelta.com:8485;k-deltalake-m2.testdelta.com:8485;k-deltalake-m3.testdelta.com:8485/k-deltalake</value> </property> <property> <name>dfs.permissions.enabled</name> <value>true</value> </property> <property> <name>dfs.permissions.superusergroup</name> <value>hdfs</value> </property> <property> <name>dfs.replication</name> <value>3</value> </property> <property> <name>dfs.hosts.exclude</name> <value>/etc/hadoop/dfs-exclude.hosts</value> </property> <property> <name>dfs.namenode.checkpoint.dir</name> <value>/var/hadoop/dfs/snn</value> </property> <property> <name>dfs.namenode.handler.count</name> <value>100</value> </property> <property> <name>dfs.namenode.name.dir</name> <value>/var/hadoop/dfs/nn</value> <final>true</final> </property> <property> <name>dfs.hosts.exclude</name> <value>/etc/hadoop/dfs-exclude.hosts</value> </property>
dfs.hosts.excludeの項目で設定しているファイルを予め作ります。今回は使わないので空ファイルです。
# k-deltalake-m1, m2で実施 # hdfsユーザー touch /etc/hadoop/dfs-exclude.hosts
hdfsユーザーの.bash_profileを設定します。hdfs-env.shと内容が被っていて冗長な気もしますがとりあえず以下で設定します。
cat << _EOF_ >> /home/hdfs/.bash_profile export JAVA_HOME=/usr/lib/jvm/java export HADOOP_HOME=/usr/hadoop/current export HADOOP_MAPRED_HOME=\${HADOOP_HOME} export HADOOP_YARN_HOME=\${HADOOP_HOME} export HADOOP_LIBEXEC_DIR=\${HADOOP_HOME}/libexec export PATH=\${HADOOP_HOME}/bin:\${HADOOP_HOME}/sbin:\${PATH} _EOF_
hdfsユーザーの.bash_profileを読み直して、HDFSのためのフォーマット処理を実施します。
# k-deltalake-m1で実施 # hdfsユーザーで実施 hdfs zkfc -formatZK start-dfs.sh hdfs namenode -format stop-dfs.sh && start-dfs.sh
マスターノード2のNameNodeが起動できないはずなので、Standbyとして起動します。
# k-deltalake-m2で実施 # hdfsユーザーで実施 hdfs namenode -bootstrapStandby hadoop-daemon.sh --config /etc/hadoop start namenode
うまくいってなくても何も返してこなかったりするので、HDFSのログやプロセスの確認は必須です。これでHDFSをHA構成で起動できました!
5. YARNの構築&HA
HDFSが構築できたら次はYARNです。今回YARNのResourceManagerをマスターノード2と3に立てる構成にするので、それぞれのノードのyarnユーザーがノーパスワードでサーバーにsshできるようにします。
- k-deltalake-m2からk-deltalake-m3, s1, s2, s3にyarnユーザーでノーパスワードでssh可能に
- k-deltalake-m3からk-deltalake-m2, s1, s2, s3にyarnユーザーでノーパスワードでssh可能に
終わったらyarn-env.shの環境変数を設定します。HDFSのときみたいに色々参考にして以下の項目について設定を行いました。
vi yarn-env.sh # yarn-env.shの以下の箇所を修正 ------------------------------------------------------------ export JAVA_HOME=/usr/lib/jvm/java export HADOOP_YARN_HOME=/usr/hadoop/current export HADOOP_LOG_DIR=/var/log/yarn export HADOOP_SECURE_LOG_DIR=/var/log/yarn export HADOOP_PID_DIR=/var/run/yarn export HADOOP_SECURE_PID_DIR=/var/run/yarn export HADOOP_LIBEXEC_DIR=${HADOOP_YARN_HOME}/libexec export HADOOP_LOGLEVEL=${HADOOP_LOGLEVEL:-INFO} export HADOOP_ROOT_LOGGER=${HADOOP_ROOT_LOGGER:-INFO,console} export HADOOP_DAEMON_ROOT_LOGGER=${HADOOP_DAEMON_ROOT_LOGGER:-${HADOOP_LOGLEVEL},EWMA,RFA} export HADOOP_YARN_USER=${HADOOP_YARN_USER:-yarn} export HADOOP_OPTS="$HADOOP_OPTS -Dyarn.id.str=${HADOOP_YARN_USER} -Dyarn.policy.file=hadoop-policy.xml" ### # Resource Manager specific parameters ### export YARN_RESOURCEMANAGER_HEAPSIZE=1024 export YARN_RESOURCEMANAGER_OPTS="$YARN_RESOURCEMANAGER_OPTS -Dyarn.server.resourcemanager.appsummary.logger=INFO,RMSUMMARY -Drm.audit.logger=INFO,RMAUDIT" ### # Node Manager specific parameters ### export YARN_NODEMANAGER_HEAPSIZE=2048 export YARN_NODEMANAGER_OPTS="$YARN_NODEMANAGER_OPTS -Dnm.audit.logger=INFO,NMAUDIT" export YARN_TIMELINE_HEAPSIZE=4096 ### # Registry DNS specific parameters ### export YARN_REGISTRYDNS_SECURE_USER=yarn export YARN_REGISTRYDNS_SECURE_EXTRA_OPTS="-jvm server" ------------------------------------------------------------
それからyarn-site.xmlを設定します。これがhdfs-site.xmlのときみたいかそれ以上に設定項目があり、本番運用のときは相当調べないといけないと思いますが、今回は主に以下の項目を追加しました。もちろん、</configuration></configuration>の間に追加です。
ポイントはやはりResourceManagerのHA構成に関する設定で、yarn.resourcemanager.ha.rm-idsで指定したid(ここではrm1, rm2)を名前につけた項目たちです。それからyarn.scheduler.(minimum|maximum)-allocation-(mb|vcores)の項目あたりでしょうか。本番運用のときはチューニングで必ず適切な値を設定することになると思います。
今回の検証ではYARNでtimelineサービスを使うのはやめました。
<property> <name>yarn.resourcemanager.zk-address</name> <value>k-deltalake-m1.testdelta.com:2181,k-deltalake-m2.testdelta.com:2181,k-deltalake-m3.testdelta.com:2181</value> </property> <property> <name>hadoop.registry.zk.quorum</name> <value>k-deltalake-m1.testdelta.com:2181,k-deltalake-m2.testdelta.com:2181,k-deltalake-m3.testdelta.com:2181</value> </property> <property> <name>yarn.admin.acl</name> <value>yarn</value> </property> <property> <name>yarn.application.classpath</name> <value>$HADOOP_CONF_DIR,/usr/hadoop/current/share/hadoop/*,/usr/hadoop/current/share/hadoop/client/*,/usr/hadoop/current/share/hadoop/common/*,/usr/hadoop/current/share/hadoop/common/lib/*,/usr/hadoop/current/share/hadoop/hdfs/*,/usr/hadoop/current/share/hadoop/hdfs/lib/*,/usr/hadoop/current/share/hadoop/mapreduce/*,/usr/hadoop/current/share/hadoop/mapreduce/lib/*,/usr/hadoop/current/share/hadoop/tools/lib/*,/usr/hadoop/current/share/hadoop/yarn/*,/usr/hadoop/current/share/hadoop/yarn/lib/*</value> </property> <property> <name>yarn.log.server.url</name> <value>http://k-deltalake-m3.testdelta.com:19888/jobhistory/logs</value> </property> <property> <name>yarn.log.server.web-service.url</name> <value>http://k-deltalake-m3.testdelta.com:8188/ws/v1/applicationhistory</value> </property> <property> <name>yarn.nodemanager.address</name> <value>0.0.0.0:45454</value> </property> <property> <name>yarn.nodemanager.bind-host</name> <value>0.0.0.0</value> </property> <property> <name>yarn.nodemanager.local-dirs</name> <value>/var/hadoop/yarn/local</value> </property> <property> <name>yarn.nodemanager.log-dirs</name> <value>/var/log/yarn</value> </property> <property> <name>yarn.nodemanager.recovery.dir</name> <value>/var/hadoop/yarn/recovery</value> </property> <property> <name>yarn.nodemanager.recovery.enabled</name> <value>true</value> </property> <property> <name>yarn.nodemanager.recovery.supervised</name> <value>true</value> </property> <property> <name>yarn.nodemanager.remote-app-log-dir</name> <value>/app-logs</value> </property> <property> <name>yarn.resourcemanager.address</name> <value>k-deltalake-m2.testdelta.com:8050</value> </property> <property> <name>yarn.resourcemanager.admin.address</name> <value>k-deltalake-m2.testdelta.com:8141</value> </property> <property> <name>yarn.resourcemanager.bind-host</name> <value>0.0.0.0</value> </property> <property> <name>yarn.resourcemanager.cluster-id</name> <value>yarn-cluster</value> </property> <property> <name>yarn.resourcemanager.ha.automatic-failover.zk-base-path</name> <value>/yarn-leader-election</value> </property> <property> <name>yarn.resourcemanager.ha.enabled</name> <value>true</value> </property> <property> <name>yarn.resourcemanager.ha.rm-ids</name> <value>rm1,rm2</value> </property> <property> <name>yarn.resourcemanager.hostname</name> <value>k-deltalake-m2.testdelta.com</value> </property> <property> <name>yarn.resourcemanager.hostname.rm1</name> <value>k-deltalake-m2.testdelta.com</value> </property> <property> <name>yarn.resourcemanager.hostname.rm2</name> <value>k-deltalake-m3.testdelta.com</value> </property> <property> <name>yarn.resourcemanager.nodes.exclude-path</name> <value>/etc/hadoop/yarn-exclude.nodes</value> </property> <property> <name>yarn.resourcemanager.recovery.enabled</name> <value>true</value> </property> <property> <name>yarn.resourcemanager.resource-tracker.address</name> <value>k-deltalake-m2.testdelta.com:8025</value> </property> <property> <name>yarn.resourcemanager.scheduler.address</name> <value>k-deltalake-m2.testdelta.com:8030</value> </property> <property> <name>yarn.resourcemanager.scheduler.class</name> <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value> </property> <property> <name>yarn.resourcemanager.scheduler.monitor.enable</name> <value>true</value> </property> <property> <name>yarn.resourcemanager.store.class</name> <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value> </property> <property> <name>yarn.resourcemanager.webapp.address</name> <value>k-deltalake-m2.testdelta.com:8088</value> </property> <property> <name>yarn.resourcemanager.webapp.address.rm1</name> <value>k-deltalake-m2.testdelta.com:8088</value> </property> <property> <name>yarn.resourcemanager.webapp.address.rm2</name> <value>k-deltalake-m3.testdelta.com:8088</value> </property> <property> <name>yarn.resourcemanager.webapp.https.address</name> <value>k-deltalake-m2.testdelta.com:8090</value> </property> <property> <name>yarn.resourcemanager.webapp.https.address.rm1</name> <value>k-deltalake-m2.testdelta.com:8090</value> </property> <property> <name>yarn.resourcemanager.webapp.https.address.rm2</name> <value>k-deltalake-m3.testdelta.com:8090</value> </property> <property> <name>yarn.resourcemanager.webapp.cross-origin.enabled</name> <value>true</value> </property> <property> <name>yarn.scheduler.minimum-allocation-mb</name> <value>2048</value> </property> <property> <name>yarn.scheduler.maximum-allocation-mb</name> <value>12288</value> </property> <property> <name>yarn.scheduler.minimum-allocation-vcores</name> <value>1</value> </property> <property> <name>yarn.scheduler.maximum-allocation-vcores</name> <value>4</value> </property> <property> <name>yarn.timeline-service.enabled</name> <value>false</value> </property> <property> <name>yarn.webapp.api-service.enable</name> <value>true</value> </property> <property> <name>yarn.webapp.ui2.enable</name> <value>true</value> </property>
yarn.resourcemanager.nodes.exclude-pathの項目で設定しているファイルを予め作ります。今回は使わないので空ファイルです。
# k-deltalake-m2, m3で実施 # yarnユーザー touch /etc/hadoop/yarn-exclude.nodes
YARNキューにはCapacity Schedulerを使うので、その設定ファイルのcapacity-scheduler.xmlに今回以下の項目だけ変更を加えてみました。
<property> <name>yarn.scheduler.capacity.maximum-am-resource-percent</name> <value>0.2</value> <description> Maximum percent of resources in the cluster which can be used to run application masters i.e. controls number of concurrent running applications. </description> </property> <property> <name>yarn.scheduler.capacity.root.capacity</name> <value>100</value> </property> <property> <name>yarn.scheduler.capacity.root.acl_administer_queue</name> <value>*</value> </property> <property> <name>yarn.scheduler.capacity.root.acl_submit_applications</name> <value>*</value> </property> <property> <name>yarn.scheduler.capacity.root.acl_application_max_priority</name> <value>*</value> </property> <property> <name>yarn.scheduler.capacity.root.ordering-policy</name> <value>priority-utilization</value> </property> <property> <name>yarn.scheduler.capacity.root.priority</name> <value>0</value> </property> <property> <name>yarn.scheduler.capacity.root.default.priority</name> <value>0</value> </property>
yarnユーザーの.bash_profileを設定します。yarn-env.shと内容が被っていますがhdfsユーザーのときと同様に設定します。
cat << _EOF_ >> /home/yarn/.bash_profile export JAVA_HOME=/usr/lib/jvm/java export HADOOP_HOME=/usr/hadoop/current export HADOOP_MAPRED_HOME=\${HADOOP_HOME} export HADOOP_YARN_HOME=\${HADOOP_HOME} export HADOOP_LIBEXEC_DIR=\${HADOOP_HOME}/libexec export PATH=\${HADOOP_HOME}/bin:\${HADOOP_HOME}/sbin:\${PATH} _EOF_
yarnユーザーの.bash_profileを読み込みなおしてYARNを起動します。
# k-deltalake-m2で実施 # yarnユーザーで実施 start-yarn.sh # ResourceManagerを確認 yarn rmadmin -getServiceState rm1 yarn rmadmin -getServiceState rm2
YARN用のサンプルジョブ実施してみます。
# k-deltalake-m2で実施 yarn jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.1.jar pi 16 1000
エラー発生がしてしまいました。。hdfsのパーミッションやディレクトリが整ってなかったことが原因だったので、以下を行ってもう一度サンプルジョブを動かします。
# hdfsユーザー hdfs dfs -chown hdfs:hadoop / hdfs dfs -chmod 775 / hdfs dfs -mkdir /tmp hdfs dfs -chmod 777 /tmp hdfs dfs -mkdir /app-logs hdfs dfs -chown yarn:hadoop /app-logs
サンプルジョブが成功したらYARN構築も完了です!
6. Sparkのインストール&構築
HDFSとYARNの構築が終わったら、今度はSparkの環境を構築していきます。
Sparkも最新のSpark3.0.0をインストールすることにしました。hadoopやzookeeperと同様、バイナリをダウンロードし展開します。
# 全サーバーで実施 cd /var/tmp wget https://ftp.riken.jp/net/apache/spark/spark-3.0.0/spark-3.0.0-bin-without-hadoop.tgz tar zxvf spark-3.0.0-bin-without-hadoop.tgz mkdir /usr/spark mv spark-3.0.0-bin-without-hadoop /usr/spark/ chown -R spark:hadoop /usr/spark cd /usr/spark ln -s spark-3.0.0-bin-without-hadoop current cd /etc ln -s /usr/spark/current/conf spark mkdir /var/log/spark mkdir /var/run/spark chown -R spark:hadoop /var/log/spark /var/run/spark
インストールが終わったら、Sparkの環境変数をspark-env.shを作成して設定します。
cd /usr/spark/current/conf cp -p spark-env.sh.template spark-env.sh cat << _EOF_ >> spark-env.sh export SPARK_DIST_CLASSPATH=\$(hadoop classpath) export SPARK_LOG_DIR=/var/log/spark export SPARK_PID_DIR=/var/run/spark export SPARK_DAEMON_MEMORY=2048m export SPARK_IDENT_STRING=\$USER export HADOOP_HOME=/usr/hadoop/current export HADOOP_CONF_DIR=\${HADOOP_HOME}/conf export JAVA_HOME=/usr/lib/jvm/java export HADOOP_COMMON_LIB_NATIVE_DIR=${HADOOP_HOME}/lib/native export HADOOP_OPTS="\${HADOOP_OPTS} -Djava.library.path=\${HADOOP_COMMON_LIB_NATIVE_DIR}" export LD_LIBRARY_PATH="\${LD_LIBRARY_PATH}:\${HADOOP_COMMON_LIB_NATIVE_DIR}" _EOF_
続いて、Sparkの各種パラメータをspark-defaults.confを作成して設定します。
cp -p spark-defaults.conf.template spark-defaults.conf cat << _EOF_ >> spark-defaults.conf spark.driver.extraLibraryPath /usr/hadoop/current/lib/native spark.eventLog.dir hdfs:///spark-logs/ spark.eventLog.enabled true spark.executor.extraJavaOptions -XX:+UseNUMA spark.executor.extraLibraryPath /usr/hadoop/current/lib/native spark.history.fs.cleaner.enabled true spark.history.fs.cleaner.interval 7d spark.history.fs.cleaner.maxAge 90d spark.history.fs.logDirectory hdfs:///spark-logs/ spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider spark.history.ui.port 18080 spark.io.compression.lz4.blockSize 128kb spark.master yarn spark.shuffle.file.buffer 1m spark.shuffle.io.backLog 8192 spark.shuffle.io.serverThreads 128 spark.shuffle.unsafe.file.output.buffer 5m spark.sql.autoBroadcastJoinThreshold 26214400 spark.sql.statistics.fallBackToHdfs true spark.sql.warehouse.dir /apps/spark/warehouse spark.unsafe.sorter.spill.reader.buffer.size 1m spark.yarn.historyServer.address k-deltalake-m3.testdelta.com:18080 spark.yarn.queue default _EOF_
sparkユーザーの.bash_profileを設定します。冗長な気もしつつ設定しておきます。
cat << _EOF_ >> /home/spark/.bash_profile export JAVA_HOME=/usr/lib/jvm/java export SPARK_HOME=/usr/spark/current export HADOOP_HOME=/usr/hadoop/current export PATH=\${SPARK_HOME}/bin:\${SPARK_HOME}/sbin:\${HADOOP_HOME}/bin:\${HADOOP_HOME}/sbin:${PATH} _EOF_
HDFS上に/user/sparkを作るか、または/user配下に書き込める権限を与えておかないとSparkを動かすときにこけてしまうので、権限を付与しておきます。
hdfs dfs -chown hdfs:hadoop /user hdfs dfs -chmod 775 /user
また、HDFS上に設定ファイルで指定した/spark-logsというディレクトリが必要になるので作っておきます。
hdfs dfs -mkdir /spark-logs hdfs dfs -chown spark:hadoop /spark-logs
yarn-site.xmlに以下のSpark用の項目を修正または追記して、YARNを再起動しておきます。
<property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle,spark_shuffle</value> </property> <property> <name>yarn.nodemanager.aux-services.spark_shuffle.class</name> <value>org.apache.spark.network.yarn.YarnShuffleService</value> </property> <property> <name>yarn.nodemanager.aux-services.spark_shuffle.classpath</name> <value>/usr/spark/current/yarn/*</value> </property>
設定が一通り済んだら、マスターノード3でSpark Historyサーバーを起動します。
# k-deltalake-m3で実施 # sparkユーザーで実施 start-history-server.sh
Spark環境ができたので、spark-submitでサンプルプログラムをYARN上で動かしてみます。
# k-deltalake-m3で実施 # sparkユーザーで実施 spark-submit --class org.apache.spark.examples.SparkPi \ --deploy-mode cluster \ --driver-memory 2g \ --executor-memory 2g \ --executor-cores 1 \ ${SPARK_HOME}/examples/jars/spark-examples_2.12-3.0.0.jar \ 10
そして、エラー。やっぱり設定が多いものは初回実行はエラーがつきものですね。。
ERROR ui.SparkUI: Failed to bind SparkUI java.lang.IllegalStateException: class org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter is not a javax.servlet.Filter
このエラーの解決策がわからずすごいハマりましたが、–conf spark.ui.enabled=falseをつけたら取り敢えず回避できたのでそうしました。 根本的な解決策ではないのでこれは今後要調査です。
# k-deltalake-m3で実施 # sparkユーザーで実施 spark-submit --class org.apache.spark.examples.SparkPi \ --conf spark.ui.enabled=false \ --deploy-mode cluster \ --driver-memory 2g \ --executor-memory 2g \ --executor-cores 1 \ ${SPARK_HOME}/examples/jars/spark-examples_2.12-3.0.0.jar \ 10
サンプルプログラムをspark-submitで動かせたら、これでようやくHadoopとSparkの環境が出来上がりです!
YARNのクラスタモードでspark-submitできるところまできました!
4. Delta Lakeを検証
幾多の設定を乗り越えてHadoopおよびSpark環境が構築できたので、ついにDelta Lakeの検証にいきたいと思います。
1. PySparkでSQL操作
最初どうやってDelta Lakeを使うかわからなかったんですが、以下のようにpysparkに接続するときに–packagesや–confを設定するだけで使うことができました。
# k-deltalake-m3で実施 # sparkユーザーで実施 pyspark \ --packages io.delta:delta-core_2.12:0.7.0 \ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \ --conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.HDFSLogStore"
pysparkに接続できたらSparkSessionを作り、Delta Lakeフォーマットのテーブルを書き込みます。
>>> from pyspark import SparkContext from pyspark.sql import Column, DataFrame, SparkSession, SQLContext, functions from pyspark.sql.functions import * from py4j.java_collections import MapConverter from delta.tables import * import shutil import threading >>> spark = SparkSession \ .builder \ .appName("test-deltalake") \ .getOrCreate() >>> print("############# Creating a table ###############") data = spark.range(0, 5) data.write.format("delta").save("/tmp/delta-table")
今度は書き込んだDelta Lakeのテーブルを読み込みます。
>>> print("############ Reading the table ###############") df = spark.read.format("delta").load("/tmp/delta-table") df.show() +---+ | id| +---+ | 2| | 3| | 4| | 0| | 1| +---+
あっさりできてしまっていい感じです。
Delta Lakeはこの読み書きの書き方が基本なんだと思いますが、もっと使いやすいようにRDBのテーブルっぽくSQLで扱えるようにしたいところ。先に.saveで書き込んだ場合は後からcreateOrReplaceTempViewを使えばできます。
df.createOrReplaceTempView("delta_table") sqlDF = spark.sql("SELECT * FROM delta_table WHERE id > 1") +---+ | id| +---+ | 2| | 3| | 4| +---+
SQL使えるのはやっぱり使いやすいですね。ただもっとRDBのテーブルっぽく扱うにはテーブル作成自体もSQLでやれないと今一つと思うので試してみます。Delta Lakeはテーブルのパーティションにも対応していて良い感じです。
ddl = """ CREATE TABLE events ( date DATE, eventId STRING, eventType STRING, data STRING ) USING DELTA """ spark.sql(ddl) ddl = """ CREATE TABLE events_p ( date DATE, eventId STRING, eventType STRING, data STRING ) USING DELTA PARTITIONED BY (date) """ spark.sql(ddl)
テーブルのレコードがないと何も始まらないので、INSERT文で格納します。
dml = """ INSERT INTO events VALUES ('2020-02-21', 'J1-01-01', 'sports', '湘南ベルマーレvs浦和レッズ'), ('2020-02-22', 'J1-01-05', 'sports', 'セレッソ大阪vs大分トリニータ'), ('2020-02-23', 'J1-01-06', 'sports', '清水エスパルスvsFC東京') """ spark.sql(dml) spark.sql("SELECT * FROM events ORDER BY date, eventId").show() +----------+--------+---------+----------------------------+ | date| eventId|eventType| data| +----------+--------+---------+----------------------------+ |2020-02-21|J1-01-01| sports| 湘南ベルマーレvs浦和レッズ| |2020-02-22|J1-01-05| sports|セレッソ大阪vs大分トリニータ| |2020-02-23|J1-01-06| sports| 清水エスパルスvsFC東京| +----------+--------+---------+----------------------------+
INSERT後に実際のデータファイルを見てみると以下のような構成になってました。ちなみにテーブル作成したばかりの時点ではparquetファイルはなく、_delta_logディレクトリが配下にあるだけです。
# hdfsユーザー hdfs dfs -ls /apps/spark/warehouse/events Found 3 items drwxr-xr-x - spark hadoop 0 2020-07-05 11:19 /apps/spark/warehouse/events/_delta_log -rw-r--r-- 3 spark hadoop 1443 2020-07-05 11:19 /apps/spark/warehouse/events/part-00000-59e05afe-4e87-4252-9d7f-d11ea610913a-c000.snappy.parquet -rw-r--r-- 3 spark hadoop 1318 2020-07-05 11:19 /apps/spark/warehouse/events/part-00001-cc028cfa-4c31-4cbf-87f6-7bf96d05e984-c000.snappy.parquet
events_pテーブルについて上記と同じ内容のINSERT文を実施すると、パーティショニングされてデータが格納されるのがわかります。
# hdfsユーザー hdfs dfs -ls /apps/spark/warehouse/events_p Found 4 items drwxr-xr-x - spark hadoop 0 2020-07-05 11:20 /apps/spark/warehouse/events_p/_delta_log drwxr-xr-x - spark hadoop 0 2020-07-05 11:20 /apps/spark/warehouse/events_p/date=2020-02-21 drwxr-xr-x - spark hadoop 0 2020-07-05 11:20 /apps/spark/warehouse/events_p/date=2020-02-22 drwxr-xr-x - spark hadoop 0 2020-07-05 11:20 /apps/spark/warehouse/events_p/date=2020-02-23 hdfs dfs -ls /apps/spark/warehouse/events_p/date* Found 1 items -rw-r--r-- 3 spark hadoop 1244 2020-07-05 11:20 /apps/spark/warehouse/events_p/date=2020-02-21/part-00000-0281547d-7327-43cd-9bd1-5c51722088ca.c000.snappy.parquet Found 1 items -rw-r--r-- 3 spark hadoop 1271 2020-07-05 11:20 /apps/spark/warehouse/events_p/date=2020-02-22/part-00001-98283bac-128c-44fc-8a2e-889ed6589193.c000.snappy.parquet Found 1 items -rw-r--r-- 3 spark hadoop 1181 2020-07-05 11:20 /apps/spark/warehouse/events_p/date=2020-02-23/part-00001-c65f3f0d-fe13-4d28-9bed-fb9ced662e95.c000.snappy.parquet
続けてUPDATE文を試してみます。従来のビッグデータ技術が対応してなかったのがUPDATEで、1レコードだけ修正して直したいようなときでもデータの塊を取り出して修正しまた塊を戻したりしないといけませんでした。
dml = """ UPDATE events_p SET data = concat(data, ':スコア(1-3):FC東京WIN') WHERE eventId = 'J1-01-06' """ spark.sql(dml) dml = """ UPDATE events_p SET data = concat(data, ':スコア(2-3):浦和レッズWIN') WHERE eventId = 'J1-01-01' """ spark.sql(dml) dml = """ UPDATE events_p SET data = concat(data, ':スコア(1-0):セレッソ大阪WIN') WHERE eventId = 'J1-01-05' """ spark.sql(dml)
Hiveも3系になって最初からUPDATE対応になりましたが、Delta LakeもUPDATEできました。さらに便利なのはUPSERT(レコードがあればUPDATE、なければINSERT)に対応していることです。Delta LakeでUPSERTを行うにはMERGE文を使います。
MERGE文の検証として、events_pテーブルにレコードを3件追加した後、MERGE文でeventsテーブルにevents_pテーブルの内容を反映させます。MERGE文が成功すると、eventテーブルはまだ一回もUPDATEしていないので、既存の3レコードはevents_pテーブルの内容にUPDATE、入ってないレコードはevents_pテーブルからINSERTされるはずです。
dml = """ INSERT INTO events_p VALUES ('2020-02-22', 'J1-01-02', 'sports', 'ベガルタ仙台vs名古屋グランパス:スコア(1-1):DRAW'), ('2020-02-22', 'J1-01-03', 'sports', '柏レイソルvsコンサドーレ札幌:スコア(4-2):柏レイソルWIN'), ('2020-02-23', 'J1-01-04', 'sports', '川﨑フロンターレvsサガン鳥栖:スコア(0-0):DRAW') """ spark.sql(dml) spark.sql("SELECT * FROM events_p ORDER BY date, eventId").show(truncate=False) +----------+--------+---------+----------------------------------------------------------+ |date |eventId |eventType|data | +----------+--------+---------+----------------------------------------------------------+ |2020-02-21|J1-01-01|sports |湘南ベルマーレvs浦和レッズ:スコア(2-3):浦和レッズWIN | |2020-02-22|J1-01-02|sports |ベガルタ仙台vs名古屋グランパス:スコア(1-1):DRAW | |2020-02-22|J1-01-03|sports |柏レイソルvsコンサドーレ札幌:スコア(4-2):柏レイソルWIN | |2020-02-22|J1-01-05|sports |セレッソ大阪vs大分トリニータ:スコア(1-0):セレッソ大阪WIN| |2020-02-23|J1-01-04|sports |川﨑フロンターレvsサガン鳥栖:スコア(0-0):DRAW | |2020-02-23|J1-01-06|sports |清水エスパルスvsFC東京:スコア(1-3):FC東京WIN | +----------+--------+---------+----------------------------------------------------------+ dml = """ MERGE INTO events e USING events_p ep ON e.eventId = ep.eventId WHEN MATCHED THEN UPDATE SET e.data = ep.data WHEN NOT MATCHED THEN INSERT * """ spark.sql(dml) spark.sql("SELECT * FROM events ORDER BY date, eventId").show(truncate=False) +----------+--------+---------+----------------------------------------------------------+ |date |eventId |eventType|data | +----------+--------+---------+----------------------------------------------------------+ |2020-02-21|J1-01-01|sports |湘南ベルマーレvs浦和レッズ:スコア(2-3):浦和レッズWIN | |2020-02-22|J1-01-02|sports |ベガルタ仙台vs名古屋グランパス:スコア(1-1):DRAW | |2020-02-22|J1-01-03|sports |柏レイソルvsコンサドーレ札幌:スコア(4-2):柏レイソルWIN | |2020-02-22|J1-01-05|sports |セレッソ大阪vs大分トリニータ:スコア(1-0):セレッソ大阪WIN| |2020-02-23|J1-01-04|sports |川﨑フロンターレvsサガン鳥栖:スコア(0-0):DRAW | |2020-02-23|J1-01-06|sports |清水エスパルスvsFC東京:スコア(1-3):FC東京WIN | +----------+--------+---------+----------------------------------------------------------+
MERGE文を実行したときに以下のエラーが出てしまったんですが、処理は想定通りにできていました。どうもこのバグのようですが、このあたりは自分の設定が悪いのかDelta Lakeがまだ不安定なのか要観察です。
。
ERROR scheduler.AsyncEventQueue: Listener EventLoggingListener threw an exception java.lang.ClassCastException: java.util.Collections$SynchronizedSet cannot be cast to java.util.List
ここまでで触れてなかったですが、一回あたりのSQL処理時間は今回の検証では体感で数秒~数十秒くらいでした。
2. Rollbackについて
Delta Lakeはトランザクション対応とあるので、Rollbackをどうするのか調べてみました。
結論から言うと、RDBのRollbackとは全然違います。Delta Lakeはテーブルが更新されると更新履歴とその値を一定時間保持しているため、タイムスタンプかバージョン番号を覚えておくことで更新前の値を取り出すことができます。これを使って自分でRollbackするというわけです。
正直イケてるとは言い難いですが、ビッグデータの技術はそもそも今までINSERTのみが基本でUPDATEやRollbackの機能がないか制約付きだったことを思えば、悪くはないと思います。DESCRIBE HISTORY文を使うと、そのテーブルの更新履歴をとることができます。
spark.sql("DESCRIBE HISTORY events_p").show(truncate=False) +-------+-----------------------+------+--------+------------+------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------------------------+------------+ |version|timestamp |userId|userName|operation |operationParameters |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics |userMetadata| +-------+-----------------------+------+--------+------------+------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------------------------+------------+ |3 |2020-07-05 14:53:28.639|null |null |UPDATE |[predicate -> (eventId#4213 = J1-01-06)] |null|null |null |2 |null |false |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0]|null | |2 |2020-07-05 11:36:41.243|null |null |UPDATE |[predicate -> (eventId#2056 = J1-01-06)] |null|null |null |1 |null |false |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0]|null | |1 |2020-07-05 11:20:25.17 |null |null |WRITE |[mode -> Append, partitionBy -> []] |null|null |null |0 |null |true |[numFiles -> 3, numOutputBytes -> 3696, numOutputRows -> 3] |null | |0 |2020-07-05 10:12:34.831|null |null |CREATE TABLE|[isManaged -> true, description ->, partitionBy -> ["date"], properties -> {}]|null|null |null |null |null |true |[] |null | +-------+-----------------------+------+--------+------------+------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------------------------+------------+
このversionかtimestampの値をみて、そのときのデータを取り出すことができます。versionが1のときのデータを取り出してみます。
df = spark.read.format("delta").load("/apps/spark/warehouse/events_p@v1") df.show() +----------+--------+---------+----------------------------+ | date| eventId|eventType| data| +----------+--------+---------+----------------------------+ |2020-02-22|J1-01-05| sports|セレッソ大阪vs大分トリニータ| |2020-02-21|J1-01-01| sports| 湘南ベルマーレvs浦和レッズ| |2020-02-23|J1-01-06| sports| 清水エスパルスvsFC東京| +----------+--------+---------+----------------------------+
更新前のデータの取り出しはSQLでもできるはずですが、今回はSQLだと何故か上手く行きませんでした。
上記のようにテーブルの実体があるHDFS上のディレクトリを指定して、ディレクトリ名の後ろに「@v{バージョン番号}」をつけるだけで取り出せます。取り出したv1データのデータフレームに対して以下のような処理すれば特定のレコードに対して自作Rollbackができます。
rollbackRow = df.select("eventId","data").where(col("eventId") == "J1-01-06").collect() rbValue = rollbackRow[0]["data"] dml = "UPDATE events_p SET data = '" + rbValue + "' WHERE eventId = 'J1-01-06' " spark.sql(dml)
3. spark-submitでDelta Lakeを操作
Sparkの真骨頂は分散処理にあると思いますが、つまりそれはspark-submitのクラスタモードでDelta Lakeが操作できるということなので、最後にspark-submitを使ってHadoop環境上の分散処理を検証してみました。Delta Lakeを操作するPythonファイルを作り、それを呼ぶspark-submitのシェルスクリプトを用意します。
pysparkコマンドのときと同様に–packagesや–confで必要な項目を指定します。–master yarn –deploy-mode cluster と指定することでYARNを使ったHadoopクラスタ上の分散処理になります。
# k-deltalake-m3 # sparkユーザー cd ~ cat << _EOF_ > test_deltalake.sh #!/usr/bin/bash spark-submit \\ --master yarn \\ --deploy-mode cluster \\ --num-executors 1 \\ --executor-memory 2G \\ --executor-cores 1 \\ --driver-memory 2G \\ --packages io.delta:delta-core_2.12:0.7.0 \\ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \\ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \\ --conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.HDFSLogStore" \\ --conf "spark.ui.enabled=false" \\ test_deltalake.py 2>&1 |tee test_deltalake.log _EOF_
spark-submitで呼び出すPythonファイルは以下です。events_pテーブルのeventTypeカラムの値を「sports」から「sport/soccer」に一括更新する単純な処理です。
ここでポイントは、createOrReplaceTempViewを使っていることです。本来なら何もしないでもevent_pというテーブル名を使ってSQLの操作をしたいのですが、どうもCREATE TABLEを行ったセッションの中ではそのテーブル名で何でもSQL操作できますが、別のセッションになると認識してくれませんでした。
HiveやRDBなら接続したらメタ情報を取ってきて困ることはないのですが、Delta Lakeはそうではないようです。このあたりはまだ調べ切れていないので、createOrReplaceTempViewをしないでもメタ情報を取ってくるスマートなやり方があるかもしれないです。
cat << _EOF_ > test_deltalake.py from pyspark import SparkContext from pyspark.sql import Column, DataFrame, SparkSession, SQLContext, functions from pyspark.sql.functions import * from py4j.java_collections import MapConverter from delta.tables import * spark = SparkSession \\ .builder \\ .appName("test-deltalake") \\ .getOrCreate() df = spark.read.format("delta").load("/apps/spark/warehouse/events_p") df.createOrReplaceTempView("events_p") q = "SELECT * FROM events_p ORDER BY date, eventId" spark.sql(q).show(truncate=False) dml = "UPDATE events_p SET eventType = concat(eventType, '/soccer')" spark.sql(dml) spark.sql(q).show(truncate=False) _EOF_
準備ができたら、用意したtest_deltalake.shを叩いてspark-submitを実行します。実行したところ、またしてもエラーが。。
UnicodeEncodeError: 'ascii' codec can't encode characters in position 304-310: ordinal not in range(128)
このページを参考に、上記のPythonファイル(test_delta_lake.py)に下記を追記しました。
import sys reload(sys) sys.setdefaultencoding('utf-8')
再度実行したところ、エラーもなく、ついにHadoop環境上でDelta Lakeのテーブルに対してSparkの分散処理を行えるようになりました!!
5. まとめ
1. Delta Lakeの感想
Delta Lakeを使うのはシンプルなので、自分たちでSparkに統一したビッグデータ解析基盤を作る選択肢としては悪くないように思いました。昨今のデータ解析はJupyter Notebookを使ってPythonでやるのが最も多いユースケースと思いますが、Pythonと親和性が高くSQLで操作できるのは強力です。
本番運用を考えるとHadoop環境(HDFS+YARN)を使った方が良いと思いますが、Hadoop環境の構築・運用を自前でやるのはHDFSとYARNだけでもかなりしんどいです。。YARNを使わないでHDFSだけ使うのもまだありかもしれません。
いずれにしても一番困るのはHadoop環境の運用で、設定ファイルを変更したときの対象サーバーへの配布だったり、各プロセスの起動・停止を一元管理したりできないと運用が回らなくなりそうです。どうやってやるかが自前でやるときの大きな課題ですね。
2. 今後の課題
Delta Lakeの検証でまだできていない大事なことは、
- トランザクションの更なる検証(更新時のロックなど)
- 大量のデータを入れた巨大テーブルの検証(処理性能や更新履歴の肥大化など)
- ODBC/JDBCを使ったBIツールとの連携(できるかできないか含めて)
このあたりでしょうか。Spark + Delta Lakeの組み合わせは今後も注目していきたいと思います。
最後に
次世代システム研究室では、データサイエンティスト/機械学習エンジニアを募集しています。ビッグデータの解析業務など次世代システム研究室にご興味を持って頂ける方がいらっしゃいましたら、ぜひ 募集職種一覧 からご応募をお願いします。 皆さんのご応募をお待ちしています。
グループ研究開発本部の最新情報をTwitterで配信中です。ぜひフォローください。
Follow @GMO_RD