2021.07.05
KubernetesベースのSparkクラスターを検証するー既存Hadoopクランスターと連携編
こんにちは。次世代システム研究室のT.D.Qです。
前回のブログよりConoha VPSでSpark on Kubernetesクラスターの構築について紹介しました。その後このKubernetesクラスター上にHadoopクラスターを構築してSparkの検証を行おうと思いましたが、同部署別のメンバーが別のVPSサーバでHadoop(+Delta Lake)クラスターを構築したので、代わりにこのHadoopクラスターに接続してうまく使えるか検証したので今回の記事で紹介したいと思います。
今回やりたいこと
今回やりたいことは、一言で言うとHDFS ClusterがSpark on Kubernetes ClusterのStorage Layerにすることです。以下の図でイメージしやすいです。
ちなみに、以下の図でHDFSクラスターのアーキテクチャーです。ご存知な方が多いと思いますが念のため記載しておきます。
上記のHDFSクラスターのアーキテクチャー図から見ると、基本的にはSpark on KubernetesクラスターのContainerに接続対象HDFS ClusterのHDFS Clientがあれば実現できると思います。
Hadoopクラスターのネットワークに移動
もともと同じ部署なので各メンバーが一つの検証ようのConoha VPSアカウントで複数のVPSマシンを起動して別々のクラスターを構築しました。ただし、プライベートネットワークセグメントが全然違うので、Spark on KubernetesクラスターとHadoopクラスターの通信はできませんでした。幸い、Kubernetesクラスターの再構築が比較的にしやすいので以下の手順でSpark on KubernetesクラスターをHadoopクラスターのプライベートネットワークセグメントに移動しました。
① 資材をバックアップし、KubeSprayでK8sクラスターを削除
②Kubernetesクラスターのネットワークセグメント移動
・Conoha Control Panelでマシンごとに停止
・「プライベートネットワーク」画面でHadoopクラスターのネットワークセグメントに接続し、Internal IPを確認
・VPSマシンを起動し、新しいInternal IPでhosts、ネットワーク設定を更新
③ Kubernetesクラスターを再構築
全マシンの設定完了後、KubeSprayのInventoryファイルに新しいIPを更新して、クラスターを再構築
④ LensにKubernetesクラスターを接続するための設定
以下の感じでKubernetesクラスターをHadoopからスターに接続することができました。
HDFS Client対応するContainerの構築
Dockerfileのカスタマイズ
Kubernetes上でSparkを使うため、Sparkのソースと同梱するDockerFileとビルドツールで専用のDocker Imageをビルドする必要がありますが、
このDockerFileはHDFS非対応です。Kubernetesクラスター上で実行するSpark Containerが別のHadoopクラスターに接続するため、DockerFileをカスタマイズする必要があります。
今回はpySparkを使うため、SparkのベースイメージのDockerfileを加えて、Python対応のDockerfileの一部を使います。Dockerfile2個の統合後は以下の内容になります。
# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ARG java_image_tag=11-jre-slim FROM openjdk:${java_image_tag} ARG spark_uid=185 # Before building the docker image, first build and make a Spark distribution following # the instructions in http://spark.apache.org/docs/latest/building-spark.html. # If this docker file is being used in the context of building your images from a Spark # distribution, the docker build command should be invoked from the top level directory # of the Spark distribution. E.g.: # docker build -t spark:latest -f kubernetes/dockerfiles/spark/Dockerfile . USER 0 RUN set -ex && \ sed -i 's/http:\/\/deb.\(.*\)/https:\/\/deb.\1/g' /etc/apt/sources.list && \ apt-get update && \ ln -s /lib /lib64 && \ apt install -y bash tini libc6 libpam-modules krb5-user libnss3 procps && \ mkdir -p /opt/spark && \ mkdir -p /opt/spark/examples && \ mkdir -p /opt/spark/work-dir && \ touch /opt/spark/RELEASE && \ rm /bin/sh && \ ln -sv /bin/bash /bin/sh && \ echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \ chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \ rm -rf /var/cache/apt/* COPY jars /opt/spark/jars COPY bin /opt/spark/bin COPY sbin /opt/spark/sbin COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/ COPY kubernetes/dockerfiles/spark/decom.sh /opt/ COPY examples /opt/spark/examples COPY kubernetes/tests /opt/spark/tests COPY data /opt/spark/data ENV SPARK_HOME /opt/spark WORKDIR /opt/spark/work-dir RUN chmod g+w /opt/spark/work-dir RUN chmod a+x /opt/decom.sh ##### ここからはPythonの実行環境を構築する ######### RUN mkdir ${SPARK_HOME}/python RUN apt-get update && \ apt install -y python3 python3-pip && \ pip3 install --upgrade pip setuptools && \ # Removed the .cache to save space rm -r /root/.cache && rm -rf /var/cache/apt/* COPY python/pyspark ${SPARK_HOME}/python/pyspark COPY python/lib ${SPARK_HOME}/python/lib WORKDIR /opt/spark/work-dir ENTRYPOINT [ "/opt/entrypoint.sh" ] # Specify the User that the actual main process will run as USER ${spark_uid}
Hadoop Clientの設定
Hadoopクラスターに接続するため、稼働するContainerがそのクラスターのユーザー及び設定情報を持つ必要があります。Hadoopクラスターの構築手順と同様user、group及び必要なディレクトリを以下の通りに設定しました。
# Add users for Spark and Hadoop cluster RUN groupadd hadoop && \ groupadd hdfs && \ useradd -N -g hadoop zookeeper && \ useradd -g hadoop -G hdfs hdfs -m && \ useradd -N -g hadoop yarn -m && \ useradd -N -g hadoop -u ${spark_uid} -m spark && \ mkdir /usr/lib/hadoop && \ mkdir -p /var/log/hadoop && \ mkdir -p /var/log/yarn && \ mkdir -p /var/run/hadoop && \ mkdir -p /var/run/yarn && \ mkdir /var/log/spark && \ mkdir /var/run/spark && \ mkdir -p /var/hadoop/yarn/{local,recovery} RUN chown root:hadoop /var/hadoop && \ chown -R hdfs:hadoop /var/run/hadoop && \ chown -R hdfs:hadoop /var/log/hadoop && \ chown -R yarn:hadoop /var/log/yarn && \ chown -R yarn:hadoop /var/run/yarn && \ chmod 775 /var/log/hadoop && \ chmod 775 /var/log/yarn && \ chown -R spark:hadoop /var/log/spark /var/run/spark
Hadoopクラスーの設定情報をImageにコピー
SCPコマンドでHadoopクラスターを構築するとき展開した/usr/lib/hadoop/hadoop-3.2.2をDocker Imageをビルドマシンに移動してImageビルド時にコピーするコマンドを用意する。ビルド時に「/usr/local/spark」からbuildコマンド実行するので、マシンの「/usr/local/spark/kubernetes/dockerfiles/spark8s/hadoop-3.2.2」にSCPしておきました。
Dockerfileに以下のビルドコマンドを追加します。
# Set up Hadoop environment COPY kubernetes/dockerfiles/spark8s/hadoop-3.2.2 /usr/lib/hadoop/hadoop-3.2.2 RUN cd /usr/lib/hadoop && \ ln -s hadoop-3.2.2 current && \ chown -R root:hadoop /usr/lib/hadoop/hadoop-3.2.2 && \ chmod 775 /usr/lib/hadoop/current/etc/hadoop && \ cd /etc && \ ln -s /usr/lib/hadoop/current/etc/hadoop hadoop
次に、sparkユーザーの環境変数を設定しましょう。Dockerfileの後尾に以下のコマンドを追加することで対応できます。
USER ${spark_uid} # Set environment variables for spark user in order to access to HDFS RUN touch /home/spark/.bash_profile ENV HADOOP_HOME /usr/lib/hadoop/current ENV HADOOP_CONF_DIR ${HADOOP_HOME}/etc/hadoop ENV HADOOP_COMMON_LIB_NATIVE_DIR=${HADOOP_HOME}/lib/native ENV LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:${HADOOP_COMMON_LIB_NATIVE_DIR}" ENV PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:/opt/spark/bin:/opt/spark/sbin:${PATH} ENV PYTHON_PATH=/usr/bin/python3
早速、Docker Imageをビルドして確認しましょう。Sparkのソースと同梱するビルドツールを使うのでコマンドは以下のとおりです。
k8sadmin@quytd-spark-tool:~$ cd /usr/local/spark k8sadmin@quytd-spark-tool:/usr/local/spark$ sudo bin/docker-image-tool.sh -r quytd -t v3.2.1 -p kubernetes/dockerfiles/spark8s/Dockerfile build 〜〜一部抜粋〜〜 Successfully built f70981d7b243 Successfully tagged quytd/spark:v3.2.1 Successfully built edf35405b7b2 Successfully tagged quytd/spark-py:v3.2.1
ビルドが成功したのでImageを確認しましょう。ツールから2個のImageファイルを作成しましたが、今回使いたいものがquytd/spark-pyになります。
k8sadmin@quytd-spark-tool:/usr/local/spark$ sudo docker images | grep v3.2.1 quytd/spark-py v3.2.1 edf35405b7b2 3 hours ago 3.19GB quytd/spark v3.2.1 f70981d7b243 3 hours ago 613MB k8sadmin@quytd-spark-tool:/usr/local/spark$
ビルドしたImageが問題ないか確認するため、DockerからContainerを立ち上がって確認したいと思います。
sudo docker run -it quytd/spark-py:v3.2.1 /bin/bash spark@2aed70d6b90c:/$ hdfs dfs -ls / 2021-07-04 11:03:21,326 WARN hdfs.DFSUtilClient: Namenode for k-deltalake remains unresolved for ID nn1. Check your hdfs-site.xml file to ensure namenodes are configured properly. 2021-07-04 11:03:21,560 WARN hdfs.DFSUtilClient: Namenode for k-deltalake remains unresolved for ID nn2. Check your hdfs-site.xml file to ensure namenodes are configured properly. 2021-07-04 11:03:22,078 INFO retry.RetryInvocationHandler: java.net.UnknownHostException: Invalid host name: local host is: (unknown); destination host is: "katano-vps-deltalake-m02.testspark.com":8020; java.net.UnknownHostException; For more details see: http://wiki.apache.org/hadoop/UnknownHost, while invoking ClientNamenodeProtocolTranslatorPB.getFileInfo over katano-vps-deltalake-m02.testspark.com:8020 after 1 failover attempts. Trying to failover after sleeping for 925ms.
うまくHDFSクラスターのNamenodeのホスト名を解消できなかったのでようです。原因を調べてみます。
Hadoopクラスターのホスト問題と回避
上記のエラーログを確認したら、以下の内容がありましたので、Hadoopクラスターの設定ファイル(core-site.xml, hdfs-site.xmlなど)にクラスターのNamenodeやDatanodeがFQDNで記述されていますが、Containerの/etc/hostsには記載してないのでうまくHadoopクラスターに接続できなかったです。
Namenode for k-deltalake remains unresolved for ID nn1.Check your hdfs-site.xml file to ensure namenodes are configured properly. Namenode for k-deltalake remains unresolved for ID nn2.Check your hdfs-site.xml file to ensure namenodes are configured properly. java.net.UnknownHostException: Invalid host name
実は、DockerのImageをビルドするときは/etc/hostsが変更してもビルド後にリセットされて変更内容が残らなく、Docker runでContainerを起動するときに「—add-host FQDN:IP」で/etc/hostsにFQDNとIPを追加できますが、今回はKubernetesを使うのでこの方法は難しいですね。一方でKubernetesもこの方法を非推奨でHostAliasesの対応方法を提案しました。
ただし、Spark on Kubernetesのユーザーケースで考えるとやはり適用も難しいかと思いました。StackOverflowなどこの問題で悩んでいる人が結構いますね。よく考えたら今回はContainerからHadoopクラスターの各ノードに通信できたら良いっていう感じなので、ひとまずHadoopの設定ファイルにFQDNがホストのIPに置き換えることで解決できるのではないかと思いましたので、core-site.xml, hdfs-site.xml, yarn-site.xmlで実施し、Docker Imageを再度ビルドしました。まあ、綺麗なやり方ではないと思いますが、稼働中なContainerからHDFSに接続できるようになりました。
ちなみに、ContainerからHDFSにファイル書き込み、読み込みが問題ないことを確認しました。良さそうですね。
sudo docker run -it quytd/spark-py:v3.2.1 /bin/bash spark@2aed70d6b90c:/$ hdfs dfs -ls /tmp Found 5 items drwxr-xr-x - spark hadoop 0 2021-07-03 06:20 /tmp/delta-table drwxr-xr-x - hdfs hadoop 0 2021-06-06 05:42 /tmp/exp_data drwx-wx-wx - spark hadoop 0 2021-04-01 01:50 /tmp/hive drwxrwxrwx - hdfs hadoop 0 2021-04-03 00:11 /tmp/load_data drwxrwxrwx - hdfs hadoop 0 2021-07-02 22:48 /tmp/spark8s spark@2aed70d6b90c:/$ spark@2aed70d6b90c:/$ echo "1,2,3,4,5,6" > /tmp/test.txt spark@2aed70d6b90c:/$ hdfs dfs -put /tmp/test.txt /tmp/spark8s/ spark@2aed70d6b90c:/$ hdfs dfs -ls /tmp/spark8s/test.txt -rw-r--r-- 3 spark hadoop 12 2021-07-03 09:17 /tmp/spark8s/test.txt spark@2aed70d6b90c:/$ hdfs dfs -cat /tmp/spark8s/test.txt 1,2,3,4,5,6 spark@2aed70d6b90c:/$
Spark Operatorを導入する
Dockerより単一ContainerからHadoopクラスターに接続できましたので、Kubernetesを使ってpySparkアプリケーションで複数のContainerを起動して検証したいと思います。
今回、以下のpySparkスクリプトでHDFSに接続の検証します。
from pyspark.sql import SparkSession sparkSession = SparkSession.builder.appName("SparkSimpleHDFSIO").getOrCreate() # Read data from HDFS df_load = sparkSession.read.csv('/tmp/spark8s/test.txt') df_load.show() sparkSession.stop()
前回の記事と同様の場所に展開して、PythonのHTTPサーバーでファイルを提供できるようにしておく。
cd /var/local/www/spark8s/ python3 -m http.server 30001
前回のブログはSpark-submitを対応するホスト(quytd-spark-toolマシン)からSpark-submitコマンドを実行しましたが、作業のMacbook PCなどKubectlを実行可能だがspark-submitを対応しないマシーンからも実行できるようにしたいので、Spark Operatorを導入しました。
Spark OperatorがKubernetesを開発したGoogle社から提供している機能で、Kubernetes上のApache Sparkアプリケーションのライフサイクルを管理するための優れたKubernetesオペレーターです。詳細はSpark Operatorの公開資料をご参照ください。ちなみに、Spark Operatorをうまく使えば、上記の/etc/hosts問題を解決できる可能性があると思います。
Spark OperatorのArchitectureは以下のイメージです。
今回、Spark Operatorのインストール手順を参考してLensのConsoleで以下のHelmコマンドを使ってインストールしました。
$ helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator $ helm install sparkoperator spark-operator/spark-operator --namespace spark-operator --create-namespace
SparkOperatorはKubernetesの世界に入るのでspark-submitはYAMLファイルで実装されます。検証用のスクリプトは以下の感じで、ファイル名が「spark-operator/pyspark-hdfs-simple-io.yaml」にします。
apiVersion: "sparkoperator.k8s.io/v1beta2" kind: SparkApplication metadata: name: spark-hdfs-io namespace: default labels: app: spark8s version: 3.1.1 spec: type: Python pythonVersion: "3" mode: cluster image: "quytd/spark-py:v3.2.1" imagePullPolicy: Always mainApplicationFile: "http://192.168.1.14:30001/hdfs_simple_io.py" sparkVersion: "3.1.1" restartPolicy: type: Never driver: cores: 1 coreLimit: "1200m" memory: "512m" labels: version: 3.1.1 serviceAccount: spark podSecurityContext: runAsUser: 185 executor: cores: 1 instances: 1 memory: "512m" labels: version: 3.1.1 serviceAccount: spark podSecurityContext: runAsUser: 185
早速LensのConsoleからSpark Operatorを適用コマンドを実行して結果を確認したいと思います。
kubectl apply -f spark-operator/pyspark-hdfs-simple-io.yaml sparkapplication.sparkoperator.k8s.io/spark-hdfs-io created
LensでSparkジョブの実行が完了したことを確認できたのでDriver Podを確認しましょう。
% kubectl describe pod/spark-hdfs-io-driver ~~~一部抜粋〜〜〜 Name: spark-hdfs-io-driver Node: quytd-spark-tool/192.168.1.14 Status: Succeeded IP: 10.233.73.7 Events: Type Reason Age From Message ---- ------ ---- ---- ------- Normal Scheduled 2m12s default-scheduler Successfully assigned default/spark-hdfs-io-driver to quytd-spark-tool Normal Pulling 2m11s kubelet Pulling image "quytd/spark-py:v3.2.1" Normal Pulled 2m9s kubelet Successfully pulled image "quytd/spark-py:v3.2.1" in 2.358226966s Normal Created 2m9s kubelet Created container spark-kubernetes-driver Normal Started 2m9s kubelet Started container spark-kubernetes-driver
「Status:Succeeded」なのでpySparkジョブの実行が正常終了したようですね。Driver PodのLogを確認しましょう。
kubectl logs pod/spark-hdfs-io-driver ~~~一部抜粋〜〜〜 2021-07-03 10:28:48,824 INFO codegen.CodeGenerator: Code generated in 57.497623 ms +---+---+---+---+---+---+ |_c0|_c1|_c2|_c3|_c4|_c5| +---+---+---+---+---+---+ | 1| 2| 3| 4| 5| 6| +---+---+---+---+---+---+
KubernetesからもpySparkのスクリプトの実行結果は想定の通りで良さそうですね!
Delta Lakeの検証
最近、DataBricks社がDelta Lake 1.0を発表したので、この機会でDelta Lakeを対応する環境にしてDelta Tableだけ軽く確認したいと思います。Hadoopクラスターが構築された後にアイオワ州のお酒販売のデータセットを以下のDeltaテーブルフォーマットで作成されましたので、そのまま使いたいと思います。
[spark@katano-vps-deltalake-m01 ~]$ hdfs dfs -ls /apps/spark/warehouse/deltalake_db.db Found 2 items drwxr-xr-x - spark hadoop 0 2021-04-03 09:24 /apps/spark/warehouse/deltalake_db.db/iowa_liquor_sales drwxr-xr-x - spark hadoop 0 2021-04-03 10:43 /apps/spark/warehouse/deltalake_db.db/iowa_liquor_sales_large
環境設定
Delta Lakeを使うため、PySparkシェルにDelta Lakeのパッケージを読み込んで接続することは一般的ですが、今回はバッチモードでSparkOperatorからDelta Lakeのテーブルにクエリできるように設定したいと思います。そのため、Kubernetes上に動いているContainerに以下の2点が揃う必要がある
① Pythonのdelta-sparkに関係するPackageがインストールされる
②Sparkのjarsフォルダーにdeltalakeに関係するjarsファイルが存在する
①を対応するため、Python実行環境構築部分に「pip3 install delta-spark」を追加しましょう。
RUN mkdir ${SPARK_HOME}/python RUN apt-get update && \ apt install -y python3 python3-pip && \ pip3 install --upgrade pip setuptools && \ pip3 install numpy && \ pip3 install delta-spark && \ # Removed the .cache to save space rm -r /root/.cache && rm -rf /var/cache/apt/*
②のJarsファイルを用意するため、Dockerfileに以下のコマンドを追加します。
WORKDIR /opt/spark/jars RUN wget https://repo1.maven.org/maven2/io/delta/delta-core_2.12/1.0.0/delta-core_2.12-1.0.0.jar && \ wget https://repo1.maven.org/maven2/org/antlr/antlr4/4.7/antlr4-4.7.jar && \ wget https://repo1.maven.org/maven2/org/antlr/antlr4-runtime/4.7/antlr4-runtime-4.7.jar && \ wget https://repo1.maven.org/maven2/org/antlr/antlr-runtime/3.5.2/antlr-runtime-3.5.2.jar && \ wget https://repo1.maven.org/maven2/org/antlr/ST4/4.0.8/ST4-4.0.8.jar && \ wget https://repo1.maven.org/maven2/org/abego/treelayout/org.abego.treelayout.core/1.0.3/org.abego.treelayout.core-1.0.3.jar && \ wget https://repo1.maven.org/maven2/org/glassfish/javax.json/1.0.4/javax.json-1.0.4.jar && \ wget https://repo1.maven.org/maven2/com/ibm/icu/icu4j/58.2/icu4j-58.2.jar
DockerFileを保存してSparkのImageビルドツールで再度ビルドし、Kubernetesの各ノードに使えるようにするためRegistryにPushしておきます。
pySparkスクリプトの準備
Delta Lakeを検証するためのpySparkスクリプトは以下の感じで、delta_lake.pyとしてPythonのHTTPサーバーで提供するようにしておきます。
from pyspark.sql import SparkSession from pyspark.sql.functions import col, expr from delta.tables import DeltaTable # Enable SQL commands and Update/Delete/Merge for the current spark session. # we need to set the following configs spark = SparkSession.builder \ .appName("quickstart") \ .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \ .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \ .config("hive.metastore.warehouse.dir", "hdfs://192.168.1.1:8020/apps/spark/warehouse/deltalake_db.db") \ .enableHiveSupport() \ .getOrCreate() # Read the table print("############ Reading the table ###############") df = spark.read.format("delta").load("/apps/spark/warehouse/deltalake_db.db/iowa_liquor_sales") df.show(5) spark.stop()
以下のようにSparkOperatorのYAMLファイルを準備します。
apiVersion: "sparkoperator.k8s.io/v1beta2" kind: SparkApplication metadata: name: spark-delta-lake namespace: default spec: type: Python pythonVersion: "3" mode: cluster image: "quytd/spark-py:v3.2.1" imagePullPolicy: Always mainApplicationFile: "http://192.168.1.14:30001/delta_lake.py" sparkVersion: "3.1.1" restartPolicy: type: Never driver: cores: 1 coreLimit: "1200m" memory: "512m" labels: version: 3.1.1 serviceAccount: spark executor: cores: 1 instances: 2 memory: "512m" labels: version: 3.1.1 serviceAccount: spark
結果確認
Spark OperatorをApplyしてDelta Lakeスクリプトが実行が無事に終了したようです。
% kubectl apply -f spark-operator/pyspark-delta-lake.yaml sparkapplication.sparkoperator.k8s.io/spark-delta-lake created ~ % kubectl describe pod/spark-delta-lake-driver Name: spark-delta-lake-driver Node: quytd-spark-s1/192.168.1.12 Labels: spark-app-selector=spark-8db654d45f3946c187348b9addc2f5fe spark-role=driver version=3.1.1 Status: Succeeded
Delta Tableからデータを取得できるか確認しましょう
% kubectl logs pod/spark-delta-lake-driver ~~一部抜粋~~ +-------------------+----------+------------+----------------+-------------------+------+--------+--------------------+-------------+------+--------+-------------+-------------+--------------------+-----------+--------------------+----+-------------+-----------------+-------------------+------------+------+------------------+-------------------+ |invoice_item_number| sale_date|store_number| store_name| address| city|zip_code| store_location|county_number|county|category|category_name|vendor_number| vendor_name|item_number| item_description|pack|bottle_volume|state_bottle_cost|state_bottle_retail|bottles_sold| sale|volume_sold_liters|volume_sold_gallons| +-------------------+----------+------------+----------------+-------------------+------+--------+--------------------+-------------+------+--------+-------------+-------------+--------------------+-----------+--------------------+----+-------------+-----------------+-------------------+------------+------+------------------+-------------------+ | S32988900055|2016-06-28| 3942|Twin Town Liquor|104 Highway 30 West|Toledo| 52342|104 Highway 30 We...| 86| Tama| 1062310| SPICED RUM| 395| Proximo| 46502| Kraken Black Mini| 4| 750| $10.75| $16.13| 1|$16.13| null| null| | S32988900001|2016-06-28| 3942|Twin Town Liquor|104 Highway 30 West|Toledo| 52342|104 Highway 30 We...| 86| Tama| 1062300| FLAVORED RUM| 260| Diageo Americas| 43314|Captain Morgan Gr...| 12| 750| $8.26| $12.39| 2|$24.78| null| null| | S32988900002|2016-06-28| 3942|Twin Town Liquor|104 Highway 30 West|Toledo| 52342|104 Highway 30 We...| 86| Tama| 1062300| FLAVORED RUM| 260| Diageo Americas| 43352|Captain Morgan Pi...| 12| 750| $8.26| $12.39| 1|$12.39| null| null| | S32988900003|2016-06-28| 3942|Twin Town Liquor|104 Highway 30 West|Toledo| 52342|104 Highway 30 We...| 86| Tama| 1062300| FLAVORED RUM| 65| Jim Beam Brands| 44350| Calico Jack Cherry| 12| 750| $6.30| $9.45| 2|$18.90| null| null| | S32988900004|2016-06-28| 3942|Twin Town Liquor|104 Highway 30 West|Toledo| 52342|104 Highway 30 We...| 86| Tama| 1062300| FLAVORED RUM| 370|Pernod Ricard USA...| 42676|Malibu Passion Fr...| 12| 750| $7.49| $11.24| 2|$22.48| null| null| +-------------------+----------+------------+----------------+-------------------+------+--------+--------------------+-------------+------+--------+-------------+-------------+--------------------+-----------+--------------------+----+-------------+-----------------+-------------------+------------+------+------------------+-------------------+ only showing top 5 rows
HDFSに存在しているDelta Tableからデータをちゃんと取得できるようになっています。よさそうです!
所感
いかがでしょうか?今回はKubernetesクラスターは別のHadoopクラスターと連携し、HDFSクラスターをK8sクラスターのStorage Layerにする内容について紹介しました。既存Hadoopクラスターにすでデータが蓄積しているがSparkクラスターだけもっと柔軟性を向上したい時にこの対応案は選択肢の1つではないでしょうか。自分が連携できた後Spark OperatorでpySparkジョブの実行やDelta Lakeの検証も実施することができましたのでこのクラスター構成でSpark on Kubernetesを引き続き検証して知見を広げていきたいと考えています。Kubernetesで柔軟性向上することを期待できまると思いますが、Kubernetes自体が概念が多くてうまく運用できるまで学習時間がかかってしまい、場合によってはKubernetesのことに時間が集中してしまう恐れがあると思いますので、技術選定のときにちゃんと検討した方が良さそうです。
最後に
次世代システム研究室では、ビッグデータ解析プラットホームの設計・開発を行うアーキテクトとデータサイエンティストを募集しています。次世代システム研究室にご興味を持って頂ける方がいらっしゃいましたら、ぜひ 募集職種一覧 からご応募をお願いします。
皆さんのご応募をお待ちしています。
グループ研究開発本部の最新情報をTwitterで配信中です。ぜひフォローください。
Follow @GMO_RD