2020.07.08

ビッグデータの新しい基盤としてHadoop上でSpark+Delta Lakeを検証してみた

こんにちは。次世代システム研究室のデータベース と Hadoop を担当している M.K. です。

前回データベースのテーマでPostgreSQL&Patroniを検証しましたが、今回はまたHadoop系のテーマでDelta Lakeを検証しました。

 

Delta LakeはDatabricks社が開発したApache Spark用のストレージレイヤーの技術です。去年オープンソース化になってまだそれほど日が経っていません。Databricks社はビッグデータを処理する技術としてとても良く使われているApache Sparkの生みの親が創業した会社ですね。  

目次

  1. 何故Delta Lakeか
  2. Delta Lakeを使うSparkとHadoopの環境準備
    1. 利用技術
    2. サーバースペック
    3. 最初Ambariを試してみた
    4. 前準備
  3. HadoopおよびSparkの環境構築
    1. Hadoop用ユーザー&グループ作成
    2. Hadoopのインストール
    3. Zookeeperのインストール&構築
    4. HDFSの構築&HA
    5. YARNの構築&HA
    6. Sparkのインストール&構築
  4. Delta Lakeを検証
    1. PySparkでSQL操作
    2. Rollbackについて
    3. spark-submitでDelta Lakeを操作
  5. まとめ
    1. Delta Lakeの感想
    2. 今後の課題

 

1. 何故Delta Lakeか

ビッグデータを溜めて解析する基盤として、ずっとオープンソースのHadoopとその関連技術を使ってきました。

 

今まではHadoop分野の大手ベンダー2社であるCloudera社やHortonwork社が出しているHadoopのオープンソースの統合パッケージ(前者がCDH、後者がHDP)を使うことが多かったのですが、両社が統合し新生Cloudera社となってパッケージをデプロイ&運用管理するツールがサブスクリプションを契約しないと事実上使うことができなくなりました。

 

そのような事情があり気軽に新技術が検証できなくなってしまったのと、以前の技術ブログでも何回かに渡って最新のHDPを検証してきましたが、Hadoop関連技術が発展・拡張するにつれて仕組みや設定がますます複雑化し運用が大変になってきたため、選択肢の一つとしてもう少しシンプルなソリューションも調べておこうと考えたのがきっかけです。

 

以前の技術ブログ:

 

自分の周りではビッグデータを解析する上で最も必要不可欠なのはSparkなので、Sparkだけでビッグデータ解析基盤を作れないかと思っていたところに、ちょうどDelta Lakeを見つけました。

 

元々SparkはHadoopと親和性が高く、冗長性の高い分散ファイルシステムのHDFSとその上でRDBのようなテーブル管理ができるHiveと連携できることが強みですが、Hiveが3系になって今までとはだいぶ違う製品になり、Sparkとの連携が今までとは同じようにはいかなくなってきたこととSparkだけでもある程度テーブルのような管理ができるので、Hiveを使わずにSparkだけでできるやり方を探していたらまさにそのためのDelta Lakeがありました。

 

前置きがちょっと長くなりましたが、Delta LakeはSpark用のテーブル管理の技術です。ストレージレイヤーの技術ですが独自のファイルシステムや分散処理の仕組みを持っているわけではありません。ファイルシステムはローカルファイルだったりHDFSを利用します。

Sparkのファイルフォーマットと言えばParquetが有名ですが、Delta Lakeも実はParquetファイルの集まりです。

 

HDFSを使わないのが最もシンプルに基盤構築できますが、やっぱり本番運用を見据えるとHDFSにデータを書き溜めることはとても強力なので、今回の検証ではHDFS上にDelta Lakeの環境を構築してSparkで操作してみようと思います。

 

※Hadoopの環境構築に関する章が長いので、時間のない方は第4章からどうぞ。

 

2. Delta Lakeを使うSparkとHadoopの環境準備

1. 利用技術

今回利用するミドルウェアは、以下のものでCentOS7.8で試しました。

  • Java 8 (java-1.8.0-openjdk) ※yumでインストール
  • Hadoop 3.2.1
  • Zookeeper 3.4.14
  • Spark 3.0.0
  • Python 2.7.5
  • Delta Lake 0.7 (delta-core_2.12)

 

2. サーバースペック

検証に当たりお馴染みの GMO アプリクラウドのサーバーを使いました。スペックは以下です。

  • マスターノード(主にNameNode, ResouceManager, Zookeeper)用のサーバー ×3
    • OS : CentOS 7.8
    • 仮想 CPU : 2
    • メモリ容量 : 8GB
    • ディスク容量 : 160GB
  • スレーブノード(主にDataNode)用のサーバー ×3
    • OS : CentOS 7.8
    • 仮想 CPU : 4
    • メモリ容量 : 16GB
    • ディスク容量 : 320GB

参考までに、GMOアプリクラウドと同じ技術を使ったより汎用的な Z.com クラウドもあるので興味のある方はぜひ。  

 

3. 最初Ambariを試してみた

上述したようにCDHやHDPを使わないソリューションも探すと言いましたが、Delta lakeの検証自体はHadoop環境であればできるので、実は当初は以前のブログで構築したHDP3.0.1を使おうと考えていました。

 

が、Delta LakeはSpark2.4.2以上のバージョンでないとダメで、HDP3.0.1のSparkは2.3.1だったため断念しました。

 

次にHadoop環境を自分で構築&運用するのは大変なので、ある程度それを手助けしてくれることを期待して最新のAmbariを落としてきて試してみました。AmbariはHortonwork社が出していたHDP向けのHadoop環境構築&運用ツールでオープンソースとして公開されているものです。GUIで操作することができます。

 

最新のAmbariの2.7.5をGitレポジトリから落としてきて、mavenでビルドしてエラーを解決しながらも完了し、前準備やAmbariセットアップも済んで、さあHadoop環境を作るぞというところで大きな問題が。

 

Ambari Serverを起動して最初にGUIの画面にログインすると、Hadoop環境構築Wizardの画面が用意されます。そこまでは順調でこのWizardを進めたんですが、手順1のSelect Versionのところで、HDPを構成するHadoopの各種コンポーネントを選ぶのですが、そのコンポーネントがあるレポジトリの参照先がCloudera社のサブスクリプションを購入した人だけがアクセスできるサイトに向けられていてここで完全に進めなくなりました。。

 

自分でビルドした各種コンポーネントを選んだり、このWizardを回避できれば良かったんですが、そのままではどうもできないみたいで、今回は自分でHadoop環境を構築することにしました。

 

4. 前準備

Ambariを試したときも同じですが、Hadoop環境を構築するのに事前にやっておくことをHadoopクラスタを組む全サーバーで実施しています。

  • Hadoopクラスタを組むサーバーのホスト名(FQDN)が書かれた/etc/hostsファイルを準備
    • ちなみに、今回のホスト名・構成は以下の通り。
      xx.xx.xx.101 k-deltalake-m1.testdelta.com k-deltalake-m1  # マスターノード1
      xx.xx.xx.102 k-deltalake-m2.testdelta.com k-deltalake-m2  # マスターノード2
      xx.xx.xx.103 k-deltalake-m3.testdelta.com k-deltalake-m3  # マスターノード3
      xx.xx.xx.111 k-deltalake-s1.testdelta.com k-deltalake-s1    # スレーブノード1
      xx.xx.xx.112 k-deltalake-s2.testdelta.com k-deltalake-s2    # スレーブノード2
      xx.xx.xx.113 k-deltalake-s3.testdelta.com k-deltalake-s3    # スレーブノード3
  • ntpによるサーバー時刻同期
  • 環境構築で使うOSパッケージ(curl、wget、rsync)のyum install
  • transparent_hugepagesの無効化
  • /etc/security/limits.confのリソース制限の変更
  • Javaのインストール(java-1.8.0-openjdk, java-1.8.0-openjdk-devel) ⇒今回はOracleでなくOpenJDKを利用

このあたりは前に書いたこちらのブログに詳細を書いていますので参照してみてください。

 

3. HadoopおよびSparkの環境構築

前準備が終わったら自分でHadoop環境を構築していきます。

1. Hadoop用ユーザー&グループ作成

一つのユーザーを使いまわすわけではなく、本番運用を見据えてそれぞれユーザーを作ることにしました。その代わりグループはhadoopというグループに所属させました。hdfsユーザーだけはHadoopの管理ユーザーにする予定なので、hdfsグループも作っています。

 

# 全サーバーで実施
groupadd hadoop
groupadd hdfs
useradd -N -g hadoop zookeeper
useradd -g hadoop -G hdfs hdfs
useradd -N -g hadoop yarn
useradd -N -g hadoop spark

 

2. Hadoopのインストール

どうせなら最新のHadoop3系を使いたいので、最新安定版のHadoop3.2.1を使います。インストール自体はバイナリをダウンロードして展開するだけで簡単です(大変なのは設定です・・)。必要なディレクトリも予め作成してパーミッションを整えておきます。

 

# 全サーバーで実施
cd /var/tmp/
wget https://ftp.riken.jp/net/apache/hadoop/common/hadoop-3.2.1/hadoop-3.2.1.tar.gz
tar zxvf hadoop-3.2.1.tar.gz
chown -R root:root hadoop-3.2.1

mkdir /usr/hadoop
mv hadoop-3.2.1 /usr/hadoop/
cd /usr/hadoop
ln -s hadoop-3.2.1 current
chown -R root:hadoop /usr/hadoop/hadoop-3.2.1
chmod 775 /usr/hadoop/current/etc/hadoop
cd /etc
ln -s /usr/hadoop/current/etc/hadoop hadoop

mkdir -p /var/log/{hadoop,yarn}
mkdir -p /var/run/{hadoop,yarn}
mkdir -p /var/hadoop/yarn/{local,recovery}

cat << _EOF_ > /etc/hadoop/workers
k-deltalake-s1.testdelta.com
k-deltalake-s2.testdelta.com
k-deltalake-s3.testdelta.com
_EOF_

# k-deltalake-m1, m2, m3で実施
mkdir -p /var/hadoop/dfs/{jn,nn,snn}

# k-deltalake-s1, s2, s3で実施
mkdir -p /var/hadoop/dfs/dn

# 全サーバーで実施
chown root:hadoop /var/hadoop
chown -R hdfs:hadoop /var/run/hadoop
chown -R hdfs:hadoop /var/log/hadoop
chown -R hdfs:hadoop /var/hadoop/dfs
chown -R root:hadoop /usr/hadoop/hadoop-3.2.1
chmod 775 /usr/hadoop/current/etc/hadoop
chown -R yarn:hadoop /var/log/yarn
chown -R yarn:hadoop /var/run/yarn
chmod 775 /var/log/hadoop
chmod 775 /var/log/yarn

 

3. Zookeeperのインストール&構築

次にZookeeperをインストールします。Zookeeperもバイナリをダウンロードして展開します。Zookeeperはあんまり新しすぎるとHadoopが対応していないかもしれないと思ったので、少し前の3.4系の最新版にしました。

ZookeeperはHadoop関連技術でとても良く使われる分散アプリケーションのためのコーディネーションエンジンで、HadoopのHA構成をするときなどは必須になります。

 

# k-deltalake-m1, m2, m3で実施
cd /var/tmp
wget https://ftp.riken.jp/net/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
tar zxvf zookeeper-3.4.14.tar.gz
chown -R root:root zookeeper-3.4.14
mkdir /usr/zookeeper
mkdir -p /var/hadoop/zookeeper
mkdir -p /var/log/zookeeper
mkdir -p /var/run/zookeeper
mv zookeeper-3.4.14 /usr/zookeeper/
cd /usr/zookeeper
ln -s zookeeper-3.4.14 current
cd /etc
ln -s /usr/zookeeper/current/conf zookeeper

 

続いて、Zookeeperの設定を行います。zoo.cfgファイルにZookepperのパラメータ値を書き込みます。

 

# k-deltalake-m1, m2, m3で実施
cat << _EOF_ > /etc/zookeeper/zoo.cfg
clientPort=2181
initLimit=10
autopurge.purgeInterval=24
syncLimit=5
tickTime=3000
dataDir=/var/hadoop/zookeeper
autopurge.snapRetainCount=30
server.1=k-deltalake-m1.testdelta.com:2888:3888
server.2=k-deltalake-m2.testdelta.com:2888:3888
server.3=k-deltalake-m3.testdelta.com:2888:3888
_EOF_

 

それから、Zookeeper用の環境変数をセットするシェルをzookeeper-env.shという名前で作成します。また、Zookeeperのクラスタを構成するノード固有の識別子をmyidファイルに書き込みます。

 

# k-deltalake-m1, m2, m3で実施
cat << _EOF_ > /usr/zookeeper/current/conf/zookeeper-env.sh
export JAVA_HOME=/usr/lib/jvm/java
export ZOOKEEPER_HOME=/usr/zookeeper/current
export ZOO_LOG_DIR=/var/log/zookeeper
export ZOOPIDFILE=/var/run/zookeeper/zookeeper_server.pid
export SERVER_JVMFLAGS=-Xmx1024m
export JAVA=$JAVA_HOME/bin/java
_EOF_
chmod 755 /usr/zookeeper/current/conf/zookeeper-env.sh

# k-deltalake-m1で実施
echo 1 > /var/hadoop/zookeeper/myid
# k-deltalake-m2で実施
echo 2 > /var/hadoop/zookeeper/myid
# k-deltalake-m3で実施
echo 3 > /var/hadoop/zookeeper/myid

 

以上の設定が終わったらZookepperを起動します。うまく起動できていればどのノードからも接続確認して同じ結果が返ってきます。

 

# k-deltalake-m1, m2, m3で実施
# zookeeperユーザーで実施
/usr/zookeeper/current/bin/zkServer.sh start

# 接続確認
/usr/zookeeper/current/bin/zkCli.sh -server 127.0.0.1:2181
[zk: 127.0.0.1:2181(CONNECTED) 0] ls /
=====
[zookeeper]
=====

 

4. HDFSの構築&HA

Zookepperが構築・起動できたら次はHadoopのHDFSを構築します。今回はHDFSのNameNodeをマスターノード1と2に立てる構成にするので、それぞれのノードのhdfsユーザーがノーパスワードでサーバーにssh接続できるようにします。

  • k-deltalake-m1からk-deltalake-m2, s1, s2, s3にhdfsユーザーでノーパスワードでssh可能に
  • k-deltalake-m2からk-deltalake-m1, s1, s2, s3にhdfsユーザーでノーパスワードでssh可能に

それが終わったらいよいよHDFSの設定を行っていきます。最初はhadoop-env.shをやります。

Hadoop(HDFSとYARN)の設定ファイル一式は$HADOOP_HOME/etc/hadoopに置かれています。今回そのディレクトリを/etc/hadoopへシンボリックリンクを張りました。

hadoop-env.shではHadoopの環境変数をセットします。Hadoop関連技術のJava/JVMのオプションは〇〇-env.shで環境変数にセットするので、本番運用するときはちゃんとやらないといけないところ。今回は色々参考にしながら主に以下の項目を設定してみました。

 

# k-deltalake-m1で実施

vi /etc/hadoop/hadoop-env.sh
# hadoop-env.shの以下の箇所を修正
------------------------------------------------------------
###
# Generic settings for HADOOP
###

export JAVA_HOME=/usr/lib/jvm/java
export HADOOP_HOME=/usr/hadoop/current
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_HEAPSIZE_MAX=1024
export HADOOP_HEAPSIZE_MIN=1024
export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true ${HADOOP_OPTS}"
export HADOOP_OS_TYPE=${HADOOP_OS_TYPE:-$(uname -s)}
export HADOOP_CLIENT_OPTS="-Xmx${HADOOP_HEAPSIZE_MAX}m $HADOOP_CLIENT_OPTS"
export HADOOP_LIBEXEC_DIR=${HADOOP_HOME}/libexec

###
# Options for remote shell connectivity
###

export HADOOP_SSH_OPTS="-o ConnectTimeout=5 -o SendEnv=HADOOP_CONF_DIR"

###
# Options for all daemons
###

USER="$(whoami)"
export HADOOP_LOG_DIR=/var/log/hadoop
export HADOOP_IDENT_STRING=$USER
export HADOOP_PID_DIR=/var/run/hadoop
export HADOOP_SECURITY_LOGGER=INFO,DRFAS

###
# Secure/privileged execution
###

export HDFS_DATANODE_SECURE_USER=${HDFS_DATANODE_SECURE_USER:-""}
export HADOOP_SECURE_PID_DIR=${HADOOP_SECURE_PID_DIR:-/var/run/hadoop/$HDFS_DATANODE_SECURE_USER}
#export HADOOP_SECURE_LOG=
export HADOOP_SECURE_LOG_DIR=${HADOOP_SECURE_LOG_DIR:-/var/log/hadoop/$HDFS_DATANODE_SECURE_USER}


###
# NameNode specific parameters
###

export HDFS_AUDIT_LOGGER=INFO,DRFAAUDIT

HDFS_NAMENODE_OPTS_COMMON="-server -XX:ParallelGCThreads=8 -XX:+UseConcMarkSweepGC -XX:ErrorFile=/var/log/hadoop/$USER/nn_err_%p.log -XX:NewSize=256m -XX:MaxNewSize=256m -Xloggc:/var/log/hadoop/$USER/gc.log-`date +'%Y%m%d%H%M'` -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly -Xms2048m -Xmx2048m"

export HDFS_NAMENODE_OPTS="${HDFS_NAMENODE_OPTS_COMMON} ${HDFS_NAMENODE_OPTS}"

###
# SecondaryNameNode specific parameters
###

export HDFS_SECONDARYNAMENODE_OPTS="${HDFS_NAMENODE_OPTS_COMMON} ${HDFS_SECONDARYNAMENODE_OPTS}"

###
# DataNode specific parameters
###

export HDFS_DATANODE_OPTS="-server -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC -XX:ErrorFile=/var/log/hadoop/$USER/dn_err_pid%p.log -XX:NewSize=200m -XX:MaxNewSize=200m -Xloggc:/var/log/hadoop/$USER/gc.log-`date +'%Y%m%d%H%M'` -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xms1024m -Xmx1024m ${HDFS_DATANODE_OPTS} -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly"
# export HDFS_DATANODE_SECURE_USER=

###
# HDFS Balancer specific parameters
###

export HADOOP_BALANCER_OPTS="-server -Xmx1024m ${HADOOP_BALANCER_OPTS}"

###
# YARN -- add
###
export YARN_RESOURCEMANAGER_OPTS="-Dyarn.server.resourcemanager.appsummary.logger=INFO,RMSUMMARY"
------------------------------------------------------------

 

次にcore-site.xmlを設定します。core-site.xmlを開いて設定したい項目を</configuration></configuration>の間に追加します。これも色々参考にして以下の設定を行いました。

 

    <property>
      <name>ha.zookeeper.quorum</name>
      <value>k-deltalake-m1.testdelta.com:2181,k-deltalake-m2.testdelta.com:2181,k-deltalake-m3.testdelta.com:2181</value>
    </property>

    <property>
      <name>fs.defaultFS</name>
      <value>hdfs://k-deltalake</value>
      <final>true</final>
    </property>

    <property>
      <name>ha.failover-controller.active-standby-elector.zk.op.retries</name>
      <value>120</value>
    </property>

    <property>
      <name>hadoop.http.filter.initializers</name>
      <value>org.apache.hadoop.security.AuthenticationFilterInitializer,org.apache.hadoop.security.HttpCrossOriginFilterInitializer</value>
    </property>

    <property>
      <name>hadoop.proxyuser.hdfs.groups</name>
      <value>*</value>
    </property>

    <property>
      <name>hadoop.proxyuser.hdfs.hosts</name>
      <value>*</value>
    </property>

    <property>
      <name>hadoop.proxyuser.root.groups</name>
      <value>*</value>
    </property>

    <property>
      <name>hadoop.proxyuser.root.hosts</name>
      <value>k-deltalake-m1.testdelta.com,k-deltalake-m2.testdelta.com,k-deltalake-m3.testdelta.com</value>
    </property>

    <property>
      <name>hadoop.proxyuser.yarn.hosts</name>
      <value>k-deltalake-m1.testdelta.com,k-deltalake-m2.testdelta.com,k-deltalake-m3.testdelta.com</value>
    </property>

    <property>
      <name>io.compression.codecs</name>
      <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.SnappyCodec</value>
    </property>

    <property>
      <name>io.file.buffer.size</name>
      <value>131072</value>
    </property>

    <property>
      <name>io.serializations</name>
      <value>org.apache.hadoop.io.serializer.WritableSerialization</value>
    </property>

    <property>
      <name>ipc.client.connect.max.retries</name>
      <value>50</value>
    </property>

    <property>
      <name>ipc.server.tcpnodelay</name>
      <value>true</value>
    </property>

    <property>
      <name>net.topology.script.file.name</name>
      <value>/etc/hadoop/topology.sh</value>
    </property>

 

Hadoopはデータブロックをreplica数の分だけデータノードにコピーしますが、そのときに同じラックにあるサーバーを選ばないようにするRack Awarenessのためのスクリプトを今回適当に自作してみます(本番運用するときはもっと運用しやすいスクリプトにするはず)。 core-site.xmlの「net.topology.script.file.name」で設定したスクリプトです。

 

cat << _EOF_ > /etc/hadoop/topology.sh
#!/bin/bash
HOST=\`hostname\`
case \$HOST in
  "k-deltalake-s1.testdelta.com")
    echo "/hadoop-rack01"
    ;;
  "k-deltalake-s2.testdelta.com")
    echo "/hadoop-rack02"
    ;;
  "k-deltalake-s3.testdelta.com")
    echo "/hadoop-rack03"
    ;;
  *)
    echo "/hadoop-default-rack"
    ;;
esac
_EOF_
chmod 755 /etc/hadoop/topology.sh

 

最後にhdfs-site.xmlを設定します。core-site.xmlと同じように</configuration></configuration>の間に追加します。

HDFSもたくさん設定項目があるんですが、色々参考にして今回は主に以下のものを設定しました。本番運用のときはワークロードにあわせてもっと設定項目を追加することになると思います。

大事なポイントはHDFSのHA構成の設定で、Journal Nodeに関する項目や「k-deltalake」を名前に含む項目です。 「k-deltalake」は今回私がcore-site.xmlのfs.defaultFSやhdfs-site.xmlのdfs.internal.nameservicesに設定したHDFSの一種の名前で、任意の名前にできます。

この名前によって、幾つかの項目名も変えて設定しないといけないので注意が必要です。

 

    &lt;property&gt;
      &lt;name&gt;dfs.cluster.administrators&lt;/name&gt;
      &lt;value&gt;hdfs&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.blocksize&lt;/name&gt;
      &lt;value&gt;134217728&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.client.read.shortcircuit&lt;/name&gt;
      &lt;value&gt;true&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.client.read.shortcircuit.streams.cache.size&lt;/name&gt;
      &lt;value&gt;4096&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.datanode.data.dir&lt;/name&gt;
      &lt;value&gt;/var/hadoop/dfs/dn&lt;/value&gt;
      &lt;final&gt;true&lt;/final&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.journalnode.edits.dir&lt;/name&gt;
      &lt;value&gt;/var/hadoop/dfs/jn&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.domain.socket.path&lt;/name&gt;
      &lt;value&gt;/var/hadoop/dfs/dn_socket&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.ha.automatic-failover.enabled&lt;/name&gt;
      &lt;value&gt;true&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.internal.nameservices&lt;/name&gt;
      &lt;value&gt;k-deltalake&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.client.failover.proxy.provider.k-deltalake&lt;/name&gt;
      &lt;value&gt;org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.ha.namenodes.k-deltalake&lt;/name&gt;
      &lt;value&gt;nn1,nn2&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.namenode.http-address.k-deltalake.nn1&lt;/name&gt;
      &lt;value&gt;k-deltalake-m1.testdelta.com:50070&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.namenode.http-address.k-deltalake.nn2&lt;/name&gt;
      &lt;value&gt;k-deltalake-m2.testdelta.com:50070&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.namenode.https-address.k-deltalake.nn1&lt;/name&gt;
      &lt;value&gt;k-deltalake-m1.testdelta.com:50470&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.namenode.https-address.k-deltalake.nn2&lt;/name&gt;
      &lt;value&gt;k-deltalake-m2.testdelta.com:50470&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.namenode.rpc-address.k-deltalake.nn1&lt;/name&gt;
      &lt;value&gt;k-deltalake-m1.testdelta.com:8020&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.namenode.rpc-address.k-deltalake.nn2&lt;/name&gt;
      &lt;value&gt;k-deltalake-m2.testdelta.com:8020&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.namenode.shared.edits.dir&lt;/name&gt;
      &lt;value&gt;qjournal://k-deltalake-m1.testdelta.com:8485;k-deltalake-m2.testdelta.com:8485;k-deltalake-m3.testdelta.com:8485/k-deltalake&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.permissions.enabled&lt;/name&gt;
      &lt;value&gt;true&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.permissions.superusergroup&lt;/name&gt;
      &lt;value&gt;hdfs&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.replication&lt;/name&gt;
      &lt;value&gt;3&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.hosts.exclude&lt;/name&gt;
      &lt;value&gt;/etc/hadoop/dfs-exclude.hosts&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.namenode.checkpoint.dir&lt;/name&gt;
      &lt;value&gt;/var/hadoop/dfs/snn&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.namenode.handler.count&lt;/name&gt;
      &lt;value&gt;100&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.namenode.name.dir&lt;/name&gt;
      &lt;value&gt;/var/hadoop/dfs/nn&lt;/value&gt;
      &lt;final&gt;true&lt;/final&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;dfs.hosts.exclude&lt;/name&gt;
      &lt;value&gt;/etc/hadoop/dfs-exclude.hosts&lt;/value&gt;
    &lt;/property&gt;

 

dfs.hosts.excludeの項目で設定しているファイルを予め作ります。今回は使わないので空ファイルです。

 

# k-deltalake-m1, m2で実施
# hdfsユーザー
touch /etc/hadoop/dfs-exclude.hosts

 

hdfsユーザーの.bash_profileを設定します。hdfs-env.shと内容が被っていて冗長な気もしますがとりあえず以下で設定します。

 

cat << _EOF_ >> /home/hdfs/.bash_profile
export JAVA_HOME=/usr/lib/jvm/java
export HADOOP_HOME=/usr/hadoop/current
export HADOOP_MAPRED_HOME=\${HADOOP_HOME}
export HADOOP_YARN_HOME=\${HADOOP_HOME}
export HADOOP_LIBEXEC_DIR=\${HADOOP_HOME}/libexec
export PATH=\${HADOOP_HOME}/bin:\${HADOOP_HOME}/sbin:\${PATH}
_EOF_

 

hdfsユーザーの.bash_profileを読み直して、HDFSのためのフォーマット処理を実施します。

 

# k-deltalake-m1で実施
# hdfsユーザーで実施
hdfs zkfc -formatZK

start-dfs.sh

hdfs namenode -format

stop-dfs.sh && start-dfs.sh

 

マスターノード2のNameNodeが起動できないはずなので、Standbyとして起動します。

 

# k-deltalake-m2で実施
# hdfsユーザーで実施
hdfs namenode -bootstrapStandby
hadoop-daemon.sh --config /etc/hadoop start namenode

 

うまくいってなくても何も返してこなかったりするので、HDFSのログやプロセスの確認は必須です。これでHDFSをHA構成で起動できました!

 

5. YARNの構築&HA

HDFSが構築できたら次はYARNです。今回YARNのResourceManagerをマスターノード2と3に立てる構成にするので、それぞれのノードのyarnユーザーがノーパスワードでサーバーにsshできるようにします。

  • k-deltalake-m2からk-deltalake-m3, s1, s2, s3にyarnユーザーでノーパスワードでssh可能に
  • k-deltalake-m3からk-deltalake-m2, s1, s2, s3にyarnユーザーでノーパスワードでssh可能に

終わったらyarn-env.shの環境変数を設定します。HDFSのときみたいに色々参考にして以下の項目について設定を行いました。

 

vi yarn-env.sh
# yarn-env.shの以下の箇所を修正
------------------------------------------------------------
export JAVA_HOME=/usr/lib/jvm/java
export HADOOP_YARN_HOME=/usr/hadoop/current
export HADOOP_LOG_DIR=/var/log/yarn
export HADOOP_SECURE_LOG_DIR=/var/log/yarn
export HADOOP_PID_DIR=/var/run/yarn
export HADOOP_SECURE_PID_DIR=/var/run/yarn
export HADOOP_LIBEXEC_DIR=${HADOOP_YARN_HOME}/libexec
export HADOOP_LOGLEVEL=${HADOOP_LOGLEVEL:-INFO}
export HADOOP_ROOT_LOGGER=${HADOOP_ROOT_LOGGER:-INFO,console}
export HADOOP_DAEMON_ROOT_LOGGER=${HADOOP_DAEMON_ROOT_LOGGER:-${HADOOP_LOGLEVEL},EWMA,RFA}
export HADOOP_YARN_USER=${HADOOP_YARN_USER:-yarn}
export HADOOP_OPTS="$HADOOP_OPTS -Dyarn.id.str=${HADOOP_YARN_USER} -Dyarn.policy.file=hadoop-policy.xml"

###
# Resource Manager specific parameters
###

export YARN_RESOURCEMANAGER_HEAPSIZE=1024
export YARN_RESOURCEMANAGER_OPTS="$YARN_RESOURCEMANAGER_OPTS -Dyarn.server.resourcemanager.appsummary.logger=INFO,RMSUMMARY -Drm.audit.logger=INFO,RMAUDIT"

###
# Node Manager specific parameters
###

export YARN_NODEMANAGER_HEAPSIZE=2048
export YARN_NODEMANAGER_OPTS="$YARN_NODEMANAGER_OPTS -Dnm.audit.logger=INFO,NMAUDIT"
export YARN_TIMELINE_HEAPSIZE=4096

###
# Registry DNS specific parameters
###

export YARN_REGISTRYDNS_SECURE_USER=yarn
export YARN_REGISTRYDNS_SECURE_EXTRA_OPTS="-jvm server"
------------------------------------------------------------

 

それからyarn-site.xmlを設定します。これがhdfs-site.xmlのときみたいかそれ以上に設定項目があり、本番運用のときは相当調べないといけないと思いますが、今回は主に以下の項目を追加しました。もちろん、</configuration></configuration>の間に追加です。

ポイントはやはりResourceManagerのHA構成に関する設定で、yarn.resourcemanager.ha.rm-idsで指定したid(ここではrm1, rm2)を名前につけた項目たちです。それからyarn.scheduler.(minimum|maximum)-allocation-(mb|vcores)の項目あたりでしょうか。本番運用のときはチューニングで必ず適切な値を設定することになると思います。

今回の検証ではYARNでtimelineサービスを使うのはやめました。

 

    <property>
      <name>yarn.resourcemanager.zk-address</name>
      <value>k-deltalake-m1.testdelta.com:2181,k-deltalake-m2.testdelta.com:2181,k-deltalake-m3.testdelta.com:2181</value>
    </property>

    &lt;property&gt;
      &lt;name&gt;hadoop.registry.zk.quorum&lt;/name&gt;
      &lt;value&gt;k-deltalake-m1.testdelta.com:2181,k-deltalake-m2.testdelta.com:2181,k-deltalake-m3.testdelta.com:2181&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.admin.acl&lt;/name&gt;
      &lt;value&gt;yarn&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.application.classpath&lt;/name&gt;
      &lt;value&gt;$HADOOP_CONF_DIR,/usr/hadoop/current/share/hadoop/*,/usr/hadoop/current/share/hadoop/client/*,/usr/hadoop/current/share/hadoop/common/*,/usr/hadoop/current/share/hadoop/common/lib/*,/usr/hadoop/current/share/hadoop/hdfs/*,/usr/hadoop/current/share/hadoop/hdfs/lib/*,/usr/hadoop/current/share/hadoop/mapreduce/*,/usr/hadoop/current/share/hadoop/mapreduce/lib/*,/usr/hadoop/current/share/hadoop/tools/lib/*,/usr/hadoop/current/share/hadoop/yarn/*,/usr/hadoop/current/share/hadoop/yarn/lib/*&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.log.server.url&lt;/name&gt;
      &lt;value&gt;http://k-deltalake-m3.testdelta.com:19888/jobhistory/logs&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.log.server.web-service.url&lt;/name&gt;
      &lt;value&gt;http://k-deltalake-m3.testdelta.com:8188/ws/v1/applicationhistory&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.nodemanager.address&lt;/name&gt;
      &lt;value&gt;0.0.0.0:45454&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.nodemanager.bind-host&lt;/name&gt;
      &lt;value&gt;0.0.0.0&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.nodemanager.local-dirs&lt;/name&gt;
      &lt;value&gt;/var/hadoop/yarn/local&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.nodemanager.log-dirs&lt;/name&gt;
      &lt;value&gt;/var/log/yarn&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.nodemanager.recovery.dir&lt;/name&gt;
      &lt;value&gt;/var/hadoop/yarn/recovery&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.nodemanager.recovery.enabled&lt;/name&gt;
      &lt;value&gt;true&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.nodemanager.recovery.supervised&lt;/name&gt;
      &lt;value&gt;true&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.nodemanager.remote-app-log-dir&lt;/name&gt;
      &lt;value&gt;/app-logs&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.resourcemanager.address&lt;/name&gt;
      &lt;value&gt;k-deltalake-m2.testdelta.com:8050&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.resourcemanager.admin.address&lt;/name&gt;
      &lt;value&gt;k-deltalake-m2.testdelta.com:8141&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.resourcemanager.bind-host&lt;/name&gt;
      &lt;value&gt;0.0.0.0&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.resourcemanager.cluster-id&lt;/name&gt;
      &lt;value&gt;yarn-cluster&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.resourcemanager.ha.automatic-failover.zk-base-path&lt;/name&gt;
      &lt;value&gt;/yarn-leader-election&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.resourcemanager.ha.enabled&lt;/name&gt;
      &lt;value&gt;true&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.resourcemanager.ha.rm-ids&lt;/name&gt;
      &lt;value&gt;rm1,rm2&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.resourcemanager.hostname&lt;/name&gt;
      &lt;value&gt;k-deltalake-m2.testdelta.com&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.resourcemanager.hostname.rm1&lt;/name&gt;
      &lt;value&gt;k-deltalake-m2.testdelta.com&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.resourcemanager.hostname.rm2&lt;/name&gt;
      &lt;value&gt;k-deltalake-m3.testdelta.com&lt;/value&gt;
    &lt;/property&gt;

    &lt;property&gt;
      &lt;name&gt;yarn.resourcemanager.nodes.exclude-path&lt;/name&gt;
      &lt;value&gt;/etc/hadoop/yarn-exclude.nodes&lt;/value&gt;
    &lt;/property&gt;

    <property>
      <name>yarn.resourcemanager.recovery.enabled</name>
      <value>true</value>
    </property>

    <property>
      <name>yarn.resourcemanager.resource-tracker.address</name>
      <value>k-deltalake-m2.testdelta.com:8025</value>
    </property>

    <property>
      <name>yarn.resourcemanager.scheduler.address</name>
      <value>k-deltalake-m2.testdelta.com:8030</value>
    </property>

    <property>
      <name>yarn.resourcemanager.scheduler.class</name>
      <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
    </property>

    <property>
      <name>yarn.resourcemanager.scheduler.monitor.enable</name>
      <value>true</value>
    </property>

    <property>
      <name>yarn.resourcemanager.store.class</name>
      <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
    </property>

    <property>
      <name>yarn.resourcemanager.webapp.address</name>
      <value>k-deltalake-m2.testdelta.com:8088</value>
    </property>

    <property>
      <name>yarn.resourcemanager.webapp.address.rm1</name>
      <value>k-deltalake-m2.testdelta.com:8088</value>
    </property>

    <property>
      <name>yarn.resourcemanager.webapp.address.rm2</name>
      <value>k-deltalake-m3.testdelta.com:8088</value>
    </property>

    <property>
      <name>yarn.resourcemanager.webapp.https.address</name>
      <value>k-deltalake-m2.testdelta.com:8090</value>
    </property>

    <property>
      <name>yarn.resourcemanager.webapp.https.address.rm1</name>
      <value>k-deltalake-m2.testdelta.com:8090</value>
    </property>

    <property>
      <name>yarn.resourcemanager.webapp.https.address.rm2</name>
      <value>k-deltalake-m3.testdelta.com:8090</value>
    </property>

    <property>
      <name>yarn.resourcemanager.webapp.cross-origin.enabled</name>
      <value>true</value>
    </property>

    <property>
      <name>yarn.scheduler.minimum-allocation-mb</name>
      <value>2048</value>
    </property>

    <property>
      <name>yarn.scheduler.maximum-allocation-mb</name>
      <value>12288</value>
    </property>

    <property>
      <name>yarn.scheduler.minimum-allocation-vcores</name>
      <value>1</value>
    </property>

    <property>
      <name>yarn.scheduler.maximum-allocation-vcores</name>
      <value>4</value>
    </property>

    <property>
      <name>yarn.timeline-service.enabled</name>
      <value>false</value>
    </property>

    <property>
      <name>yarn.webapp.api-service.enable</name>
      <value>true</value>
    </property>

    <property>
      <name>yarn.webapp.ui2.enable</name>
      <value>true</value>
    </property>

 

yarn.resourcemanager.nodes.exclude-pathの項目で設定しているファイルを予め作ります。今回は使わないので空ファイルです。

 

# k-deltalake-m2, m3で実施
# yarnユーザー
touch /etc/hadoop/yarn-exclude.nodes

 

YARNキューにはCapacity Schedulerを使うので、その設定ファイルのcapacity-scheduler.xmlに今回以下の項目だけ変更を加えてみました。

 

  <property>
    <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
    <value>0.2</value>
    <description>
      Maximum percent of resources in the cluster which can be used to run
      application masters i.e. controls number of concurrent running
      applications.
    </description>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.capacity</name>
    <value>100</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.acl_administer_queue</name>
    <value>*</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.acl_submit_applications</name>
    <value>*</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.acl_application_max_priority</name>
    <value>*</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.ordering-policy</name>
    <value>priority-utilization</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.priority</name>
    <value>0</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.default.priority</name>
    <value>0</value>
  </property>

 

yarnユーザーの.bash_profileを設定します。yarn-env.shと内容が被っていますがhdfsユーザーのときと同様に設定します。

 

cat << _EOF_ >> /home/yarn/.bash_profile
export JAVA_HOME=/usr/lib/jvm/java
export HADOOP_HOME=/usr/hadoop/current
export HADOOP_MAPRED_HOME=\${HADOOP_HOME}
export HADOOP_YARN_HOME=\${HADOOP_HOME}
export HADOOP_LIBEXEC_DIR=\${HADOOP_HOME}/libexec
export PATH=\${HADOOP_HOME}/bin:\${HADOOP_HOME}/sbin:\${PATH}
_EOF_

 

yarnユーザーの.bash_profileを読み込みなおしてYARNを起動します。

 

# k-deltalake-m2で実施
# yarnユーザーで実施
start-yarn.sh

# ResourceManagerを確認
yarn rmadmin -getServiceState rm1
yarn rmadmin -getServiceState rm2

 

YARN用のサンプルジョブ実施してみます。

 

# k-deltalake-m2で実施
yarn jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.1.jar pi 16 1000

 

エラー発生がしてしまいました。。hdfsのパーミッションやディレクトリが整ってなかったことが原因だったので、以下を行ってもう一度サンプルジョブを動かします。

 

# hdfsユーザー
hdfs dfs -chown hdfs:hadoop /
hdfs dfs -chmod 775 /
hdfs dfs -mkdir /tmp
hdfs dfs -chmod 777 /tmp
hdfs dfs -mkdir /app-logs
hdfs dfs -chown yarn:hadoop /app-logs

 

サンプルジョブが成功したらYARN構築も完了です!

 

6. Sparkのインストール&構築

HDFSとYARNの構築が終わったら、今度はSparkの環境を構築していきます。

Sparkも最新のSpark3.0.0をインストールすることにしました。hadoopやzookeeperと同様、バイナリをダウンロードし展開します。

 

# 全サーバーで実施
cd /var/tmp
wget https://ftp.riken.jp/net/apache/spark/spark-3.0.0/spark-3.0.0-bin-without-hadoop.tgz
tar zxvf spark-3.0.0-bin-without-hadoop.tgz
mkdir /usr/spark
mv spark-3.0.0-bin-without-hadoop /usr/spark/
chown -R spark:hadoop /usr/spark
cd /usr/spark
ln -s spark-3.0.0-bin-without-hadoop current
cd /etc
ln -s /usr/spark/current/conf spark
mkdir /var/log/spark
mkdir /var/run/spark
chown -R spark:hadoop /var/log/spark /var/run/spark

 

インストールが終わったら、Sparkの環境変数をspark-env.shを作成して設定します。

 

cd /usr/spark/current/conf
cp -p spark-env.sh.template spark-env.sh
cat << _EOF_ >> spark-env.sh
export SPARK_DIST_CLASSPATH=\$(hadoop classpath)
export SPARK_LOG_DIR=/var/log/spark
export SPARK_PID_DIR=/var/run/spark
export SPARK_DAEMON_MEMORY=2048m
export SPARK_IDENT_STRING=\$USER
export HADOOP_HOME=/usr/hadoop/current
export HADOOP_CONF_DIR=\${HADOOP_HOME}/conf
export JAVA_HOME=/usr/lib/jvm/java
export HADOOP_COMMON_LIB_NATIVE_DIR=${HADOOP_HOME}/lib/native
export HADOOP_OPTS="\${HADOOP_OPTS} -Djava.library.path=\${HADOOP_COMMON_LIB_NATIVE_DIR}"
export LD_LIBRARY_PATH="\${LD_LIBRARY_PATH}:\${HADOOP_COMMON_LIB_NATIVE_DIR}"
_EOF_

 

続いて、Sparkの各種パラメータをspark-defaults.confを作成して設定します。

 

cp -p spark-defaults.conf.template spark-defaults.conf
cat << _EOF_ >> spark-defaults.conf
spark.driver.extraLibraryPath /usr/hadoop/current/lib/native
spark.eventLog.dir hdfs:///spark-logs/
spark.eventLog.enabled true
spark.executor.extraJavaOptions -XX:+UseNUMA
spark.executor.extraLibraryPath /usr/hadoop/current/lib/native
spark.history.fs.cleaner.enabled true
spark.history.fs.cleaner.interval 7d
spark.history.fs.cleaner.maxAge 90d
spark.history.fs.logDirectory hdfs:///spark-logs/
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.ui.port 18080
spark.io.compression.lz4.blockSize 128kb
spark.master yarn
spark.shuffle.file.buffer 1m
spark.shuffle.io.backLog 8192
spark.shuffle.io.serverThreads 128
spark.shuffle.unsafe.file.output.buffer 5m
spark.sql.autoBroadcastJoinThreshold 26214400
spark.sql.statistics.fallBackToHdfs true
spark.sql.warehouse.dir /apps/spark/warehouse
spark.unsafe.sorter.spill.reader.buffer.size 1m
spark.yarn.historyServer.address k-deltalake-m3.testdelta.com:18080
spark.yarn.queue default
_EOF_

 

sparkユーザーの.bash_profileを設定します。冗長な気もしつつ設定しておきます。

 

cat << _EOF_ >> /home/spark/.bash_profile
export JAVA_HOME=/usr/lib/jvm/java
export SPARK_HOME=/usr/spark/current
export HADOOP_HOME=/usr/hadoop/current
export PATH=\${SPARK_HOME}/bin:\${SPARK_HOME}/sbin:\${HADOOP_HOME}/bin:\${HADOOP_HOME}/sbin:${PATH}
_EOF_

 

HDFS上に/user/sparkを作るか、または/user配下に書き込める権限を与えておかないとSparkを動かすときにこけてしまうので、権限を付与しておきます。

 

hdfs dfs -chown hdfs:hadoop /user
hdfs dfs -chmod 775 /user

 

また、HDFS上に設定ファイルで指定した/spark-logsというディレクトリが必要になるので作っておきます。

 

hdfs dfs -mkdir /spark-logs
hdfs dfs -chown spark:hadoop /spark-logs

 

yarn-site.xmlに以下のSpark用の項目を修正または追記して、YARNを再起動しておきます。

 

    <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle,spark_shuffle</value>
    </property>

    <property>
      <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
      <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    </property>

    <property>
      <name>yarn.nodemanager.aux-services.spark_shuffle.classpath</name>
      <value>/usr/spark/current/yarn/*</value>
    </property> 

 

設定が一通り済んだら、マスターノード3でSpark Historyサーバーを起動します。

 

# k-deltalake-m3で実施
# sparkユーザーで実施
start-history-server.sh

 

Spark環境ができたので、spark-submitでサンプルプログラムをYARN上で動かしてみます。

 

# k-deltalake-m3で実施
# sparkユーザーで実施
spark-submit --class org.apache.spark.examples.SparkPi \
  --deploy-mode cluster \
  --driver-memory 2g \
  --executor-memory 2g \
  --executor-cores 1 \
  ${SPARK_HOME}/examples/jars/spark-examples_2.12-3.0.0.jar \
  10

 

そして、エラー。やっぱり設定が多いものは初回実行はエラーがつきものですね。。

 

ERROR ui.SparkUI: Failed to bind SparkUI java.lang.IllegalStateException: class org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter is not a javax.servlet.Filter

 

このエラーの解決策がわからずすごいハマりましたが、–conf spark.ui.enabled=falseをつけたら取り敢えず回避できたのでそうしました。 根本的な解決策ではないのでこれは今後要調査です。

 

# k-deltalake-m3で実施
# sparkユーザーで実施
spark-submit --class org.apache.spark.examples.SparkPi \
  --conf spark.ui.enabled=false \
  --deploy-mode cluster \
  --driver-memory 2g \
  --executor-memory 2g \
  --executor-cores 1 \
  ${SPARK_HOME}/examples/jars/spark-examples_2.12-3.0.0.jar \
  10

 

サンプルプログラムをspark-submitで動かせたら、これでようやくHadoopとSparkの環境が出来上がりです!

YARNのクラスタモードでspark-submitできるところまできました!

 

4. Delta Lakeを検証

幾多の設定を乗り越えてHadoopおよびSpark環境が構築できたので、ついにDelta Lakeの検証にいきたいと思います。

1. PySparkでSQL操作

最初どうやってDelta Lakeを使うかわからなかったんですが、以下のようにpysparkに接続するときに–packagesや–confを設定するだけで使うことができました。

 

# k-deltalake-m3で実施
# sparkユーザーで実施
pyspark \
  --packages io.delta:delta-core_2.12:0.7.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
  --conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.HDFSLogStore"

 

pysparkに接続できたらSparkSessionを作り、Delta Lakeフォーマットのテーブルを書き込みます。

 

>>> from pyspark import SparkContext
from pyspark.sql import Column, DataFrame, SparkSession, SQLContext, functions
from pyspark.sql.functions import *
from py4j.java_collections import MapConverter
from delta.tables import *
import shutil
import threading
>>> spark = SparkSession \
    .builder \
    .appName("test-deltalake") \
    .getOrCreate()
>>> print("############# Creating a table ###############")
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

 

今度は書き込んだDelta Lakeのテーブルを読み込みます。

 

>>> print("############ Reading the table ###############")
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
+---+
| id|
+---+
|  2|
|  3|
|  4|
|  0|
|  1|
+---+

 

あっさりできてしまっていい感じです。

Delta Lakeはこの読み書きの書き方が基本なんだと思いますが、もっと使いやすいようにRDBのテーブルっぽくSQLで扱えるようにしたいところ。先に.saveで書き込んだ場合は後からcreateOrReplaceTempViewを使えばできます。

 

df.createOrReplaceTempView("delta_table")
sqlDF = spark.sql("SELECT * FROM delta_table WHERE id > 1")
+---+
| id|
+---+
|  2|
|  3|
|  4|
+---+

 

SQL使えるのはやっぱり使いやすいですね。ただもっとRDBのテーブルっぽく扱うにはテーブル作成自体もSQLでやれないと今一つと思うので試してみます。Delta Lakeはテーブルのパーティションにも対応していて良い感じです。

 

ddl = """
CREATE TABLE events (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING
)
USING DELTA
"""
spark.sql(ddl)

ddl = """
CREATE TABLE events_p (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING
)
USING DELTA
PARTITIONED BY (date)
"""
spark.sql(ddl)

 

テーブルのレコードがないと何も始まらないので、INSERT文で格納します。

 

dml = """
INSERT INTO events
VALUES
('2020-02-21', 'J1-01-01', 'sports', '湘南ベルマーレvs浦和レッズ'),
('2020-02-22', 'J1-01-05', 'sports', 'セレッソ大阪vs大分トリニータ'),
('2020-02-23', 'J1-01-06', 'sports', '清水エスパルスvsFC東京')
"""
spark.sql(dml)
spark.sql("SELECT * FROM events ORDER BY date, eventId").show()
+----------+--------+---------+----------------------------+
|      date| eventId|eventType|                        data|
+----------+--------+---------+----------------------------+
|2020-02-21|J1-01-01|   sports|  湘南ベルマーレvs浦和レッズ|
|2020-02-22|J1-01-05|   sports|セレッソ大阪vs大分トリニータ|
|2020-02-23|J1-01-06|   sports|      清水エスパルスvsFC東京|
+----------+--------+---------+----------------------------+

 

INSERT後に実際のデータファイルを見てみると以下のような構成になってました。ちなみにテーブル作成したばかりの時点ではparquetファイルはなく、_delta_logディレクトリが配下にあるだけです。

 

# hdfsユーザー
hdfs dfs -ls /apps/spark/warehouse/events
Found 3 items
drwxr-xr-x   - spark hadoop          0 2020-07-05 11:19 /apps/spark/warehouse/events/_delta_log
-rw-r--r--   3 spark hadoop       1443 2020-07-05 11:19 /apps/spark/warehouse/events/part-00000-59e05afe-4e87-4252-9d7f-d11ea610913a-c000.snappy.parquet
-rw-r--r--   3 spark hadoop       1318 2020-07-05 11:19 /apps/spark/warehouse/events/part-00001-cc028cfa-4c31-4cbf-87f6-7bf96d05e984-c000.snappy.parquet

 

events_pテーブルについて上記と同じ内容のINSERT文を実施すると、パーティショニングされてデータが格納されるのがわかります。

 

# hdfsユーザー
hdfs dfs -ls /apps/spark/warehouse/events_p
Found 4 items
drwxr-xr-x   - spark hadoop          0 2020-07-05 11:20 /apps/spark/warehouse/events_p/_delta_log
drwxr-xr-x   - spark hadoop          0 2020-07-05 11:20 /apps/spark/warehouse/events_p/date=2020-02-21
drwxr-xr-x   - spark hadoop          0 2020-07-05 11:20 /apps/spark/warehouse/events_p/date=2020-02-22
drwxr-xr-x   - spark hadoop          0 2020-07-05 11:20 /apps/spark/warehouse/events_p/date=2020-02-23

hdfs dfs -ls /apps/spark/warehouse/events_p/date*
Found 1 items
-rw-r--r--   3 spark hadoop       1244 2020-07-05 11:20 /apps/spark/warehouse/events_p/date=2020-02-21/part-00000-0281547d-7327-43cd-9bd1-5c51722088ca.c000.snappy.parquet
Found 1 items
-rw-r--r--   3 spark hadoop       1271 2020-07-05 11:20 /apps/spark/warehouse/events_p/date=2020-02-22/part-00001-98283bac-128c-44fc-8a2e-889ed6589193.c000.snappy.parquet
Found 1 items
-rw-r--r--   3 spark hadoop       1181 2020-07-05 11:20 /apps/spark/warehouse/events_p/date=2020-02-23/part-00001-c65f3f0d-fe13-4d28-9bed-fb9ced662e95.c000.snappy.parquet

 

続けてUPDATE文を試してみます。従来のビッグデータ技術が対応してなかったのがUPDATEで、1レコードだけ修正して直したいようなときでもデータの塊を取り出して修正しまた塊を戻したりしないといけませんでした。

 

dml = """
UPDATE events_p
SET data = concat(data, ':スコア(1-3):FC東京WIN')
WHERE eventId = 'J1-01-06'
"""
spark.sql(dml)

dml = """
UPDATE events_p
SET data = concat(data, ':スコア(2-3):浦和レッズWIN')
WHERE eventId = 'J1-01-01'
"""
spark.sql(dml)

dml = """
UPDATE events_p
SET data = concat(data, ':スコア(1-0):セレッソ大阪WIN')
WHERE eventId = 'J1-01-05'
"""
spark.sql(dml)

 

Hiveも3系になって最初からUPDATE対応になりましたが、Delta LakeもUPDATEできました。さらに便利なのはUPSERT(レコードがあればUPDATE、なければINSERT)に対応していることです。Delta LakeでUPSERTを行うにはMERGE文を使います。

MERGE文の検証として、events_pテーブルにレコードを3件追加した後、MERGE文でeventsテーブルにevents_pテーブルの内容を反映させます。MERGE文が成功すると、eventテーブルはまだ一回もUPDATEしていないので、既存の3レコードはevents_pテーブルの内容にUPDATE、入ってないレコードはevents_pテーブルからINSERTされるはずです。

 

dml = """
INSERT INTO events_p
VALUES
('2020-02-22', 'J1-01-02', 'sports', 'ベガルタ仙台vs名古屋グランパス:スコア(1-1):DRAW'),
('2020-02-22', 'J1-01-03', 'sports', '柏レイソルvsコンサドーレ札幌:スコア(4-2):柏レイソルWIN'),
('2020-02-23', 'J1-01-04', 'sports', '川﨑フロンターレvsサガン鳥栖:スコア(0-0):DRAW')
"""
spark.sql(dml)
spark.sql("SELECT * FROM events_p ORDER BY date, eventId").show(truncate=False)
+----------+--------+---------+----------------------------------------------------------+
|date      |eventId |eventType|data                                                      |
+----------+--------+---------+----------------------------------------------------------+
|2020-02-21|J1-01-01|sports   |湘南ベルマーレvs浦和レッズ:スコア(2-3):浦和レッズWIN    |
|2020-02-22|J1-01-02|sports   |ベガルタ仙台vs名古屋グランパス:スコア(1-1):DRAW         |
|2020-02-22|J1-01-03|sports   |柏レイソルvsコンサドーレ札幌:スコア(4-2):柏レイソルWIN  |
|2020-02-22|J1-01-05|sports   |セレッソ大阪vs大分トリニータ:スコア(1-0):セレッソ大阪WIN|
|2020-02-23|J1-01-04|sports   |川﨑フロンターレvsサガン鳥栖:スコア(0-0):DRAW           |
|2020-02-23|J1-01-06|sports   |清水エスパルスvsFC東京:スコア(1-3):FC東京WIN            |
+----------+--------+---------+----------------------------------------------------------+

dml = """
MERGE INTO events e
USING events_p ep
ON e.eventId = ep.eventId
WHEN MATCHED THEN
  UPDATE SET e.data = ep.data
WHEN NOT MATCHED
  THEN INSERT *
"""
spark.sql(dml)
spark.sql("SELECT * FROM events ORDER BY date, eventId").show(truncate=False)
+----------+--------+---------+----------------------------------------------------------+
|date      |eventId |eventType|data                                                      |
+----------+--------+---------+----------------------------------------------------------+
|2020-02-21|J1-01-01|sports   |湘南ベルマーレvs浦和レッズ:スコア(2-3):浦和レッズWIN    |
|2020-02-22|J1-01-02|sports   |ベガルタ仙台vs名古屋グランパス:スコア(1-1):DRAW         |
|2020-02-22|J1-01-03|sports   |柏レイソルvsコンサドーレ札幌:スコア(4-2):柏レイソルWIN  |
|2020-02-22|J1-01-05|sports   |セレッソ大阪vs大分トリニータ:スコア(1-0):セレッソ大阪WIN|
|2020-02-23|J1-01-04|sports   |川﨑フロンターレvsサガン鳥栖:スコア(0-0):DRAW           |
|2020-02-23|J1-01-06|sports   |清水エスパルスvsFC東京:スコア(1-3):FC東京WIN            |
+----------+--------+---------+----------------------------------------------------------+

 

MERGE文を実行したときに以下のエラーが出てしまったんですが、処理は想定通りにできていました。どうもこのバグのようですが、このあたりは自分の設定が悪いのかDelta Lakeがまだ不安定なのか要観察です。

ERROR scheduler.AsyncEventQueue: Listener EventLoggingListener threw an exception
java.lang.ClassCastException: java.util.Collections$SynchronizedSet cannot be cast to java.util.List

 

ここまでで触れてなかったですが、一回あたりのSQL処理時間は今回の検証では体感で数秒~数十秒くらいでした。

 

2. Rollbackについて

Delta Lakeはトランザクション対応とあるので、Rollbackをどうするのか調べてみました。

結論から言うと、RDBのRollbackとは全然違います。Delta Lakeはテーブルが更新されると更新履歴とその値を一定時間保持しているため、タイムスタンプかバージョン番号を覚えておくことで更新前の値を取り出すことができます。これを使って自分でRollbackするというわけです。

正直イケてるとは言い難いですが、ビッグデータの技術はそもそも今までINSERTのみが基本でUPDATEやRollbackの機能がないか制約付きだったことを思えば、悪くはないと思います。DESCRIBE HISTORY文を使うと、そのテーブルの更新履歴をとることができます。

 

spark.sql("DESCRIBE HISTORY events_p").show(truncate=False)
+-------+-----------------------+------+--------+------------+------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------------------------+------------+
|version|timestamp              |userId|userName|operation   |operationParameters                                                           |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                                   |userMetadata|
+-------+-----------------------+------+--------+------------+------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------------------------+------------+
|3      |2020-07-05 14:53:28.639|null  |null    |UPDATE      |[predicate -> (eventId#4213 = J1-01-06)]                                      |null|null    |null     |2          |null          |false        |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0]|null        |
|2      |2020-07-05 11:36:41.243|null  |null    |UPDATE      |[predicate -> (eventId#2056 = J1-01-06)]                                      |null|null    |null     |1          |null          |false        |[numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 1, numCopiedRows -> 0]|null        |
|1      |2020-07-05 11:20:25.17 |null  |null    |WRITE       |[mode -> Append, partitionBy -> []]                                           |null|null    |null     |0          |null          |true         |[numFiles -> 3, numOutputBytes -> 3696, numOutputRows -> 3]                        |null        |
|0      |2020-07-05 10:12:34.831|null  |null    |CREATE TABLE|[isManaged -> true, description ->, partitionBy -> ["date"], properties -> {}]|null|null    |null     |null       |null          |true         |[]                                                                                 |null        |
+-------+-----------------------+------+--------+------------+------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------------------------+------------+

 

このversionかtimestampの値をみて、そのときのデータを取り出すことができます。versionが1のときのデータを取り出してみます。

 

df = spark.read.format("delta").load("/apps/spark/warehouse/events_p@v1")
df.show()
+----------+--------+---------+----------------------------+
|      date| eventId|eventType|                        data|
+----------+--------+---------+----------------------------+
|2020-02-22|J1-01-05|   sports|セレッソ大阪vs大分トリニータ|
|2020-02-21|J1-01-01|   sports|  湘南ベルマーレvs浦和レッズ|
|2020-02-23|J1-01-06|   sports|      清水エスパルスvsFC東京|
+----------+--------+---------+----------------------------+

 

更新前のデータの取り出しはSQLでもできるはずですが、今回はSQLだと何故か上手く行きませんでした。

上記のようにテーブルの実体があるHDFS上のディレクトリを指定して、ディレクトリ名の後ろに「@v{バージョン番号}」をつけるだけで取り出せます。取り出したv1データのデータフレームに対して以下のような処理すれば特定のレコードに対して自作Rollbackができます。

 

rollbackRow = df.select("eventId","data").where(col("eventId") == "J1-01-06").collect()
rbValue = rollbackRow[0]["data"]
dml = "UPDATE events_p SET data = '" + rbValue + "' WHERE eventId = 'J1-01-06' "
spark.sql(dml)

 

3. spark-submitでDelta Lakeを操作

Sparkの真骨頂は分散処理にあると思いますが、つまりそれはspark-submitのクラスタモードでDelta Lakeが操作できるということなので、最後にspark-submitを使ってHadoop環境上の分散処理を検証してみました。Delta Lakeを操作するPythonファイルを作り、それを呼ぶspark-submitのシェルスクリプトを用意します。

pysparkコマンドのときと同様に–packagesや–confで必要な項目を指定します。–master yarn –deploy-mode cluster と指定することでYARNを使ったHadoopクラスタ上の分散処理になります。

 

# k-deltalake-m3
# sparkユーザー
cd ~
cat << _EOF_ > test_deltalake.sh
#!/usr/bin/bash
spark-submit \\
  --master yarn \\
  --deploy-mode cluster \\
  --num-executors 1 \\
  --executor-memory 2G \\
  --executor-cores 1 \\
  --driver-memory 2G \\
  --packages io.delta:delta-core_2.12:0.7.0 \\
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \\
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \\
  --conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.HDFSLogStore" \\
  --conf "spark.ui.enabled=false" \\
test_deltalake.py 2>&1 |tee test_deltalake.log
_EOF_

 

spark-submitで呼び出すPythonファイルは以下です。events_pテーブルのeventTypeカラムの値を「sports」から「sport/soccer」に一括更新する単純な処理です。

ここでポイントは、createOrReplaceTempViewを使っていることです。本来なら何もしないでもevent_pというテーブル名を使ってSQLの操作をしたいのですが、どうもCREATE TABLEを行ったセッションの中ではそのテーブル名で何でもSQL操作できますが、別のセッションになると認識してくれませんでした。

HiveやRDBなら接続したらメタ情報を取ってきて困ることはないのですが、Delta Lakeはそうではないようです。このあたりはまだ調べ切れていないので、createOrReplaceTempViewをしないでもメタ情報を取ってくるスマートなやり方があるかもしれないです。

 

cat << _EOF_ > test_deltalake.py
from pyspark import SparkContext
from pyspark.sql import Column, DataFrame, SparkSession, SQLContext, functions
from pyspark.sql.functions import *
from py4j.java_collections import MapConverter
from delta.tables import *

spark = SparkSession \\
    .builder \\
    .appName("test-deltalake") \\
    .getOrCreate()

df = spark.read.format("delta").load("/apps/spark/warehouse/events_p")
df.createOrReplaceTempView("events_p")

q = "SELECT * FROM events_p ORDER BY date, eventId"
spark.sql(q).show(truncate=False)

dml = "UPDATE events_p SET eventType = concat(eventType, '/soccer')"
spark.sql(dml)

spark.sql(q).show(truncate=False)
_EOF_

 

準備ができたら、用意したtest_deltalake.shを叩いてspark-submitを実行します。実行したところ、またしてもエラーが。。

 

UnicodeEncodeError: 'ascii' codec can't encode characters in position 304-310: ordinal not in range(128)

 

このページを参考に、上記のPythonファイル(test_delta_lake.py)に下記を追記しました。

 

import sys
reload(sys)
sys.setdefaultencoding('utf-8')

 

再度実行したところ、エラーもなく、ついにHadoop環境上でDelta Lakeのテーブルに対してSparkの分散処理を行えるようになりました!!

 

5. まとめ

1. Delta Lakeの感想

Delta Lakeを使うのはシンプルなので、自分たちでSparkに統一したビッグデータ解析基盤を作る選択肢としては悪くないように思いました。昨今のデータ解析はJupyter Notebookを使ってPythonでやるのが最も多いユースケースと思いますが、Pythonと親和性が高くSQLで操作できるのは強力です。

本番運用を考えるとHadoop環境(HDFS+YARN)を使った方が良いと思いますが、Hadoop環境の構築・運用を自前でやるのはHDFSとYARNだけでもかなりしんどいです。。YARNを使わないでHDFSだけ使うのもまだありかもしれません。

いずれにしても一番困るのはHadoop環境の運用で、設定ファイルを変更したときの対象サーバーへの配布だったり、各プロセスの起動・停止を一元管理したりできないと運用が回らなくなりそうです。どうやってやるかが自前でやるときの大きな課題ですね。

 

2. 今後の課題

Delta Lakeの検証でまだできていない大事なことは、

  • トランザクションの更なる検証(更新時のロックなど)
  • 大量のデータを入れた巨大テーブルの検証(処理性能や更新履歴の肥大化など)
  • ODBC/JDBCを使ったBIツールとの連携(できるかできないか含めて)

このあたりでしょうか。Spark + Delta Lakeの組み合わせは今後も注目していきたいと思います。

 

最後に

次世代システム研究室では、データサイエンティスト/機械学習エンジニアを募集しています。ビッグデータの解析業務など次世代システム研究室にご興味を持って頂ける方がいらっしゃいましたら、ぜひ 募集職種一覧 からご応募をお願いします。   皆さんのご応募をお待ちしています。





  • Twitter
  • Facebook
  • はてなブックマークに追加

グループ研究開発本部の最新情報をTwitterで配信中です。ぜひフォローください。

 
  • AI研究開発室
  • 大阪研究開発グループ

関連記事