2022.01.11

GKE及びGCSを用いてSpark on K8s, Delta Lakeデータ解析基盤を構築してみた

こんにちは。次世代システム研究室のT.D.Qです。
ビッグデータ分析は、データドリブン経営を実現するうえで欠かせない要素であり、市場ニーズの変化が著しい近年では特に需要を増しています。Google Cloudにはビッグデータ向けのDataProcサービスがありまして、GCP上にHadoopやSparkを使った大規模データを取り扱うための環境構築をより簡単に行うために作られました。ですが、場合によってはこのサービスの料金が大量発生してしまうので、Dataproc及びDataproc Metastoreを使わずにGCP上にデータ解析基盤を作れるか今回の記事で検証したいと思います。

今回やりたいこと

  1. GKE上にApache Sparkクラスターを構築したい
  2. GCS上にDelta Lakeを構築できるようにしたい
  3. Delta lakeのHiveメタストアを構築して使いたい
  4. 構築したSparkクラスターにてDelta Lakeから抽出・永続化できるようにしたい

GKE上にK8sクラスター作成

GKEの概要

Google Kubernetes Engine (GKE)は、 Google Cloud (GCP)上で提供されている Kubernetes のマネージドサービスです。マスターノードは GKE が管理を行うため、ユーザー側は管理の必要がありません。
GKEはコンソールを利用してクラスターを容易に構築でき、負荷に応じたノードの自動スケーリングも可能となっています。また、 Google Cloud (GCP)のサービスアカウントや IAM を連携して権限を制御するなど、セキュリティを強化するための機能も多く実装されています。
次の図は、GKE 内のゾーンクラスタのアーキテクチャの概要を示しています。
GKE 内のゾーンクラスタのアーキテクチャ
GKEについて詳しく知りらい方はこちらのページをご参照ください。

GKE上にK8sクラスター作成

GKE上にK8sクラスターを作成するため、Googleは複数の方法をサポートしています。google-cloud-sdkを利用するTerraformGcloudコンソールGKEのWEB画面で作成できます。主なステップは以下の4ステップですね。
  1. Google Cloudで使うGoogleアカウントのセットアップ
  2. Kubernetes Engine APIを有効化する
  3. Gcloudコマンドのインストールとkubectlのインストール
  4. Google Container Engine上にKubernetesのクラスタを構築
K8sクラスター作成手順はインターネットで公開しているブログは結構あるので今回の記事は詳細な説明を割愛します。

自前のMetaStoreを構築

データレイクでオープンソース テクノロジーを使用している場合、信頼できるメタストアとしてビッグデータの処理用に Hive メタストアをすでに利用したことがあるかもしれません。Hive メタストアは、オープンソースのデータシステムがデータ構造を共有するために使用するメカニズムとして標準化されています。次の図は、Hive メタストアを中心にすでに構築されているエコシステムの一部を示しています。

Hive メタストア

特にSparkはテーブルをHiveと連携してデータ基盤のメタデータを管理することで色々便利ですので今回はMySQLでHive Metastoreを構築したいと思います。

Secret及びConfigMapを作成

以下の感じでMySQLのSecret情報及び設定情報を定義しておきます。
ファイル名はmetastore_configmap_secret.ymlとします。
---
apiVersion: v1
kind: Secret
metadata:
  name: mysql-secret
data:
  MYSQL_ROOT_PASSWORD: 0YXb3JlX3Jxxxxx
  MYSQL_DATABASE: bW0YXN0b3JlX2Rhxxxxx
  MYSQL_USER: 0YN0b3JlX3VzZIxxxxxx
  MYSQL_PASSWORD: bYXN0b3Jl3xxxxx
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: mysql-config-file
data:
  custom.cnf: |
    [mysqld]
    default_authentication_plugin=mysql_native_password
    character-set-server=utf8mb4

    [client]
    default-character-set=utf8mb4
※Secretの値にはbase64エンコードされた値を入れておく必要があります。
echo -n "<TEXT>" | openssl enc -e -base64
MySQLの設定に必要なConfigMapとSecretをapplyしておきます。
kubectl apply -f metastore_configmap_secret.yml

MetaStore用のServiceを作成

ファイル名はmetastore_service.ymlとしましょう。
---
apiVersion: v1
kind: Service
metadata:
  name: mysql
spec:
  ports:
  - port: 3306
  selector:
    app: mysql
  clusterIP: None
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: mysql
spec:
  selector:
    matchLabels:
      app: mysql
  replicas: 1
  serviceName: mysql
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
      - image: mysql:8.0.15
        name: mysql
        ports:
        - containerPort: 3306
          name: mysql
        envFrom:
        - configMapRef:
            name: mysql-config-file
        - secretRef:
            name: mysql-secret
        volumeMounts:
        - name: mysql-persistent-storage
          mountPath: /var/lib/mysql
          subPath: mysql
        - name: mysql-config-volume
          mountPath: /etc/mysql/conf.d/custom.cnf
          subPath: custom.cnf
      volumes:
      - name: mysql-config-volume
        configMap:
          name: mysql-config-file
  volumeClaimTemplates:
  - metadata:
      name: mysql-persistent-storage
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 1Gi

kubectl apply -f metastore_service.yml

Metastoreの動作確認

MySQLのMetastoreのPodにSSHしてMetastoreが稼働しているか確認したいと思います。
tranduc_quy@cloudshell:~/metastore (evident-time-3366xx)$ kubectl exec -it mysql-0 -- /bin/bash

root@mysql-0:/# mysql -u metastore_username -p
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 14
Server version: 8.0.15 MySQL Community Server - GPL

Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| metastore_database |
+--------------------+
2 rows in set (0.00 sec)

mysql> use metastore_database;
Database changed
mysql> show tables;
Empty set (0.00 sec)
問題なく動いていますね。

Hive Metastoreのテーブルを作成

テーブルが作成されてないのでHive Metastoreのテーブルを作成しましょう。Spark3.1.2はHive2.3.7を使っているので、Hive2.3.7のMetastore SQLを使った方が良さそうですね。
以下のコマンドでHive Metastoreのテーブルを作成しましょう。
mysql -u metastore_user -p metastore_database < hive-schema-2.3.0.mysql.sql
mysql -u metastore_user -p metastore_database < hive-txn-schema-2.3.0.mysql.sql
mysql> use metastore_database;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables;
+------------------------------+
| Tables_in_metastore_database |
+------------------------------+
| AUX_TABLE                    |
| BUCKETING_COLS               |
| CDS                          |
| COLUMNS_V2                   |
| COMPACTION_QUEUE             |
| COMPLETED_COMPACTIONS        |
| COMPLETED_TXN_COMPONENTS     |
| DATABASE_PARAMS              |
| DBS                          |
| DB_PRIVS                     |
| DELEGATION_TOKENS            |
| FUNCS                        |
| FUNC_RU                      |
| GLOBAL_PRIVS                 |
| HIVE_LOCKS                   |
| IDXS                         |
| INDEX_PARAMS                 |
| KEY_CONSTRAINTS              |
| MASTER_KEYS                  |
| NEXT_COMPACTION_QUEUE_ID     |
| NEXT_LOCK_ID                 |
| NEXT_TXN_ID                  |
| NOTIFICATION_LOG             |
| NOTIFICATION_SEQUENCE        |
| NUCLEUS_TABLES               |
| PARTITIONS                   |
| PARTITION_EVENTS             |
| PARTITION_KEYS               |
| PARTITION_KEY_VALS           |
| PARTITION_PARAMS             |
| PART_COL_PRIVS               |
| PART_COL_STATS               |
| PART_PRIVS                   |
| ROLES                        |
| ROLE_MAP                     |
| SDS                          |
| SD_PARAMS                    |
| SEQUENCE_TABLE               |
| SERDES                       |
| SERDE_PARAMS                 |
| SKEWED_COL_NAMES             |
| SKEWED_COL_VALUE_LOC_MAP     |
| SKEWED_STRING_LIST           |
| SKEWED_STRING_LIST_VALUES    |
| SKEWED_VALUES                |
| SORT_COLS                    |
| TABLE_PARAMS                 |
| TAB_COL_STATS                |
| TBLS                         |
| TBL_COL_PRIVS                |
| TBL_PRIVS                    |
| TXNS                         |
| TXN_COMPONENTS               |
| TYPES                        |
| TYPE_FIELDS                  |
| VERSION                      |
| WRITE_SET                    |
+------------------------------+
57 rows in set (0.00 sec)
ここでHive Metastoreを構築できました。

データ分析基盤のDockerfileを実装

Apache Sparkのインストール

今回はSparkの最新版(3.1.2)を使いたいのでこのバージョンでDocker Imageをビルドします。
もちろん他のSparkバージョンを使いたいときはそのバージョンを指定してDocker Imageをビルドできますが、今回はDelta Lakeのデータレイクを構築していくのでDelta LakeがサポートするSparkバージョンを考慮する必要がある。現時点はSparkの最新バージョンが3.2.0ですが、Delta Lakeの最新バージョンはこのSparkバージョンをまだサポートしていないので、3.1.2にしました。
# Spark dependencies
# Default values can be overridden at build time
# (ARGS are in lower case to distinguish them from ENV)
ARG spark_version=&quot;3.1.2&quot;
ARG hadoop_version=&quot;3.2&quot;
ARG spark_checksum=&quot;2385CB772F21B014CE2ABD6B8F5E815721580D6E8BC42A26D70BBCDDA8D303D886A6F12B36D40F6971B5547B70FAE62B5A96146F0421CB93D4E51491308EF5D5&quot;
ARG openjdk_version=&quot;11&quot;

ENV APACHE_SPARK_VERSION=&quot;${spark_version}&quot; \
    HADOOP_VERSION=&quot;${hadoop_version}&quot;

RUN apt-get update --yes &amp;&amp; \
    apt-get install --yes --no-install-recommends curl \
    &quot;openjdk-${openjdk_version}-jre-headless&quot; \
    ca-certificates-java &amp;&amp; \
    apt-get clean &amp;&amp; rm -rf /var/lib/apt/lists/*

# Spark installation
WORKDIR /tmp
RUN wget -q "https://archive.apache.org/dist/spark/spark-${APACHE_SPARK_VERSION}/spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" && \
    echo "${spark_checksum} *spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" | sha512sum -c - && \
    tar xzf "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" -C /usr/local --owner root --group root --no-same-owner && \
    rm "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz"

WORKDIR /usr/local

# Configure Spark
ENV SPARK_HOME=/usr/local/spark
ENV SPARK_OPTS="--driver-java-options=-Xms1024M --driver-java-options=-Xmx4096M --driver-java-options=-Dlog4j.logLevel=info" \
    PATH="${PATH}:${SPARK_HOME}/bin"

RUN ln -s "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}" spark && \
    # Add a link in the before_notebook hook in order to source automatically PYTHONPATH
    mkdir -p /usr/local/bin/before-notebook.d && \
    ln -s "${SPARK_HOME}/sbin/spark-config.sh" /usr/local/bin/before-notebook.d/spark-config.sh

# Fix Spark installation for Java 11 and Apache Arrow library
# see: https://github.com/apache/spark/pull/27356, https://spark.apache.org/docs/latest/#downloading
RUN cp -p "${SPARK_HOME}/conf/spark-defaults.conf.template" "${SPARK_HOME}/conf/spark-defaults.conf" && \
    echo 'spark.driver.extraJavaOptions -Dio.netty.tryReflectionSetAccessible=true' >> "${SPARK_HOME}/conf/spark-defaults.conf" && \
    echo 'spark.executor.extraJavaOptions -Dio.netty.tryReflectionSetAccessible=true' >> "${SPARK_HOME}/conf/spark-defaults.conf"  
 

Delta lakeの構築

Delta lakeのライブラリを用意する必要がありますので以下のコマンドでDocker Imageにインストールしておきます。
# Download libraries for Delta lake and Hive metastore
WORKDIR /opt/spark/jars
RUN wget https://repo1.maven.org/maven2/com/google/guava/guava/30.1-jre/guava-30.1-jre.jar && \
    wget https://repo1.maven.org/maven2/com/google/guava/failureaccess/1.0.1/failureaccess-1.0.1.jar && \ 
    wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz && \
    tar xzvf mysql-connector-java-8.0.27.tar.gz && mv mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar ./mysql-connector-java-8.0.27.jar && \
    rm -rf mysql-connector-java-8.0.27.tar.gz && rm -rf mysql-connector-java-8.0.27

RUN chmod +x /opt/spark/jars/mysql-connector-java-8.0.27.jar && \
    chmod +x /opt/spark/jars/failureaccess-1.0.1.jar && \
    chmod +x /opt/spark/jars/guava-30.1-jre.jar

# Install delta lake spark libraries
RUN pip install delta-spark pandas pyarrow

注意点としては、PySparkが使うguavaライブラリとGCS connectorが使うguavaのバージョンが違うため、SparkからGCSにうまく接続できません。
この問題を解決するため、以下の感じでGuavaのバージョンを統一します。
# Fix Guava version issue
RUN mv /opt/conda/lib/python3.9/site-packages/pyspark/jars/guava-14.0.1.jar /opt/conda/lib/python3.9/site-packages/pyspark/jars/guava-14.0.1.jar.bak && \
    rm $SPARK_HOME/jars/guava-14.0.1.jar && \
    cp /opt/spark/jars/guava-30.1-jre.jar /opt/conda/lib/python3.9/site-packages/pyspark/jars/ && \
    cp /opt/spark/jars/*.jar $SPARK_HOME/jars/

Metastoreに接続するための準備

構築したMySQL Serviceのアドレスを指定し、接続するためのアカウント情報を設定する必要があります。また、GCSにデータ永続化するため、hive.metastore.warehouse.dirがGCSに作成するBucket名を指定します。
ファイル名はhive-site.xmlなので以下の内容で実装します。
<configuration>
      <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://xx.xx.xx.xx:3306/metastore_database</value>
      </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>metastore_username</value>
      </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>xxxxxxx</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
      </property>
      <property>
         <name>hive.metastore.warehouse.dir</name>
         <value>gs://forex_data_xxxx/delta_lake/warehouse</value>
         <description>location of default database for the warehouse</description>
      </property>
</configuration>
このhive-site.xmlファイルをSPARK_HOME直下のconfにコピーしましょう。
COPY ./hive-site.xml /usr/local/spark/conf/hive-site.xml
ENV HIVE_CONF_DIR=/usr/local/spark/conf
MetastoreはMySQLで構築したのでMySQLのJDBCを用意する必要がありますので、
上のステップにmysql-connector-java-8.0.27.jarをImageにダウンロードしておきました。

Dockerfileの最終ソースコード

# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
ARG OWNER=jupyter
ARG BASE_CONTAINER=$OWNER/base-notebook
FROM $BASE_CONTAINER


# Fix DL4006
SHELL ["/bin/bash", "-o", "pipefail", "-c"]

USER root

# Spark dependencies
# Default values can be overridden at build time
# (ARGS are in lower case to distinguish them from ENV)
ARG spark_version="3.1.2"
ARG hadoop_version="3.2"
ARG spark_checksum="2385CB772F21B014CE2ABD6B8F5E815721580D6E8BC42A26D70BBCDDA8D303D886A6F12B36D40F6971B5547B70FAE62B5A96146F0421CB93D4E51491308EF5D5"
ARG openjdk_version="11"

ENV APACHE_SPARK_VERSION="${spark_version}" \
    HADOOP_VERSION="${hadoop_version}"

ENV JUPYTER_ENABLE_LAB=yes

RUN apt-get update --yes && \
    apt-get install --yes --no-install-recommends curl \
    "openjdk-${openjdk_version}-jre-headless" \
    ca-certificates-java && \
    apt-get clean && rm -rf /var/lib/apt/lists/*

# Spark installation
WORKDIR /tmp
RUN wget -q "https://archive.apache.org/dist/spark/spark-${APACHE_SPARK_VERSION}/spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" && \
    echo "${spark_checksum} *spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" | sha512sum -c - && \
    tar xzf "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" -C /usr/local --owner root --group root --no-same-owner && \
    rm "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz"

WORKDIR /usr/local

# Configure Spark
ENV SPARK_HOME=/usr/local/spark
ENV SPARK_OPTS="--driver-java-options=-Xms1024M --driver-java-options=-Xmx4096M --driver-java-options=-Dlog4j.logLevel=info" \
    PATH="${PATH}:${SPARK_HOME}/bin"

RUN ln -s "spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}" spark && \
    # Add a link in the before_notebook hook in order to source automatically PYTHONPATH
    mkdir -p /usr/local/bin/before-notebook.d && \
    ln -s "${SPARK_HOME}/sbin/spark-config.sh" /usr/local/bin/before-notebook.d/spark-config.sh

# Fix Spark installation for Java 11 and Apache Arrow library
# see: https://github.com/apache/spark/pull/27356, https://spark.apache.org/docs/latest/#downloading
RUN cp -p "${SPARK_HOME}/conf/spark-defaults.conf.template" "${SPARK_HOME}/conf/spark-defaults.conf" && \
    echo 'spark.driver.extraJavaOptions -Dio.netty.tryReflectionSetAccessible=true' >> "${SPARK_HOME}/conf/spark-defaults.conf" && \
    echo 'spark.executor.extraJavaOptions -Dio.netty.tryReflectionSetAccessible=true' >> "${SPARK_HOME}/conf/spark-defaults.conf"  

COPY ./hive-site.xml /usr/local/spark/conf/hive-site.xml
ENV HIVE_CONF_DIR=/usr/local/spark/conf

# Download libraries for Delta lake and Hive metastore
WORKDIR /opt/spark/jars
RUN wget https://repo1.maven.org/maven2/com/google/guava/guava/30.1-jre/guava-30.1-jre.jar && \
    wget https://repo1.maven.org/maven2/com/google/guava/failureaccess/1.0.1/failureaccess-1.0.1.jar && \ 
    wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz && \
    tar xzvf mysql-connector-java-8.0.27.tar.gz && mv mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar ./mysql-connector-java-8.0.27.jar && \
    rm -rf mysql-connector-java-8.0.27.tar.gz && rm -rf mysql-connector-java-8.0.27

RUN chmod +x /opt/spark/jars/mysql-connector-java-8.0.27.jar && \
    chmod +x /opt/spark/jars/failureaccess-1.0.1.jar && \
    chmod +x /opt/spark/jars/guava-30.1-jre.jar

# Install delta lake spark libraries
RUN pip install delta-spark pandas pyarrow

# Fix Guava version issue
RUN mv /opt/conda/lib/python3.9/site-packages/pyspark/jars/guava-14.0.1.jar /opt/conda/lib/python3.9/site-packages/pyspark/jars/guava-14.0.1.jar.bak && \
    rm $SPARK_HOME/jars/guava-14.0.1.jar && \
    cp /opt/spark/jars/guava-30.1-jre.jar /opt/conda/lib/python3.9/site-packages/pyspark/jars/ && \
    cp /opt/spark/jars/*.jar $SPARK_HOME/jars/

USER ${NB_UID}

RUN mkdir -p ${HOME}/.ivy2/jars
WORKDIR ${HOME}/.ivy2/jars
RUN wget https://repo1.maven.org/maven2/io/delta/delta-core_2.12/1.1.0/delta-core_2.12-1.1.0.jar && \
    wget https://repo1.maven.org/maven2/org/antlr/antlr4-runtime/4.8/antlr4-runtime-4.8.jar && \
    wget https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar

RUN chmod +x ${HOME}/.ivy2/jars/delta-core_2.12-1.1.0.jar && \
    chmod +x ${HOME}/.ivy2/jars/antlr4-runtime-4.8.jar && \
    chmod +x ${HOME}/.ivy2/jars/jackson-core-asl-1.9.13.jar

WORKDIR "${HOME}"

Docker Imageのビルド及びRepositoryにPush

Docker Imageをビルドする

$ docker build -t gcr.io/evident-time-3366xx/pyspark-jupyterlab:latest .
Successfully built 9f2a3284fc38
Successfully tagged gcr.io/evident-time-3366xx/pyspark-jupyterlab:latest
$ docker images
REPOSITORY                                      TAG       IMAGE ID       CREATED         SIZE
gcr.io/evident-time-3366xx/pyspark-jupyterlab   latest    9f2a3284fc38   7 seconds ago   1.76GB

Docker ImageをGoogle Container RegistryにPush

tranduc_quy@cloudshell:~/metastore (evident-time-3366xx)$ docker push gcr.io/evident-time-3366xx/pyspark-jupyterlab:latest
The push refers to repository [gcr.io/evident-time-3366xx/pyspark-jupyterlab]
latest: digest: sha256:7c4f0d7109e268a547d0690901e345977fc5e456f40e6e0ac5d461872b598f84 size: 5137

GCS上にデータ永続化の準備

Google Cloud Storage(以降GCS)とはGoogle Cloud Platform(GCP)が提供しているオンラインストレージサービスです。
GCSを利用する為には用語の理解をする必要があります。
  • バケット
    1. データを格納するコンテナ
    2. 地理的なロケーションや、ストレージクラスの設定が可能
  • オブジェクト
    1. GCSに保存する個々のデータ
    2. ストレージクラスの設定が可能
https://cloud-ace.jp/column/detail95/

GCS上にBucketの作成

Google Storage Serviceの管理画面でデータレイクの永続化先を作成します。名前はforex_data_20211229とします。
tranduc_quy@cloudshell:~ (evident-time-3366xx)$ gsutil ls
gs://forex_data_20211229/

IAM管理でサービスアカウント作成

Docker container内に実行するPySparkプログラムからGCS上に構築したDelta Lakeに接続するため、サービスアカウントが必要です。サービス アカウントは、ユーザーではなく、Compute Engine 仮想マシン(VM)インスタンスなどのアプリケーションやコンピューティング ワークロードで使用される特別なアカウントです。アプリケーションはサービス アカウントを使用して、承認された API 呼び出しを行います。このサービスアカウントがGCPのIAM管理サービスで作成できますので、GCPが公開している手順を従って作成しました。

K8sクラスターにデータ解析基盤を展開する

データ解析基盤のサービスを作成

基本的には前回のブログに使ったYMLファイル内容と同じですが、GCPの秘密情報の管理はConfigMapSecretを使うように変更しました。
apiVersion: v1
kind: ServiceAccount
metadata:
  name: jupyter
  labels:
    release: jupyter
secrets:
- name: gcp-service-account
- name: gcs-access-key
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: jupyter
  labels:
    release: jupyter
  namespace: spark
rules:
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - create
  - get
  - delete
  - list
  - watch
- apiGroups:
  - ""
  resources:
  - services
  verbs:
  - get
  - create
- apiGroups:
  - ""
  resources:
  - pods/log
  verbs:
  - get
  - list
- apiGroups:
  - ""
  resources:
  - pods/exec
  verbs:
  - create
  - get
- apiGroups:
  - ""
  resources:
  - configmaps
  verbs:
  - get
  - create
  - list
  - watch
  - delete
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: jupyter
  labels:
    release: jupyter
  namespace: spark
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: jupyter
subjects:
- kind: ServiceAccount
  name: jupyter
  namespace: spark
- kind: User
  name: [email protected]
  namespace: spark
- kind: User
  name: [email protected]
  namespace: spark
---
apiVersion: v1
kind: Service
metadata:
  name: jupyter
  labels:
    release: jupyter
spec:
  type: ClusterIP
  selector:
    release: jupyter
  ports:
  - name: http
    port: 8888
    protocol: TCP
  - name: blockmanager
    port: 7777
    protocol: TCP
  - name: driver
    port: 2222
    protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
  name: jupyter-headless
  labels:
    release: jupyter
spec:
  type: ClusterIP
  clusterIP: None
  publishNotReadyAddresses: false
  selector:
    release: jupyter
  ports:
  - name: http
    port: 8888
    protocol: TCP
  - name: blockmanager
    port: 7777
    protocol: TCP
  - name: driver
    port: 2222
    protocol: TCP
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: jupyter
  labels:
    release: jupyter
spec:
  replicas: 1
  updateStrategy:
    type: RollingUpdate
  serviceName: jupyter-headless
  podManagementPolicy: Parallel
  volumeClaimTemplates:
  - metadata:
      name: notebook-data
      labels:
        release: jupyter
    spec:
      accessModes:
       - ReadWriteOnce
      volumeMode: Filesystem
      resources:
        requests:
          storage: 100Mi
  selector:
    matchLabels:
      release: jupyter
  template:
    metadata:
      labels:
        release: jupyter
    spec:
      restartPolicy: Always
      ImagePullPollicy: Always
      terminationGracePeriodSeconds: 30
      serviceAccountName: jupyter
      dnsConfig:
        options:
        - name: ndots
          value: "1"
      containers:
      - name: jupyter
        image: "gcr.io/evident-time-3366xx/pyspark-jupyterlab:latest"
        imagePullPolicy: IfNotPresent
        ports:
        - name: http
          containerPort: 8888
          protocol: TCP
        - name: blockmanager
          containerPort: 7777
          protocol: TCP
        - name: driver
          containerPort: 2222
          protocol: TCP
        volumeMounts:
        - name: notebook-data
          mountPath: /home/spark/notebook
        - name: gcp-service-account
          mountPath: /usr/local/spark/conf/evident-time-3366xx-xxx.json
          subPath: evident-time-3366xx-xxx.json
        env:
        - name: GOOGLE_APPLICATION_CREDENTIALS
          value: /usr/local/spark/conf/evident-time-3366xx-xxx.json
        resources:
          limits:
            cpu: 400m
            memory: 1Gi
          requests:
            cpu: 200m
            memory: 0.5Gi
      volumes:
      - name: gcp-service-account
        configMap:
          name: gcp-service-account
      - name: google-cloud-key
        secret:
          secretName: gcs-access-key

クラスターにServiceを適用する

YMLファイルを準備できましたので、K8sクラスターにSparkクラスターを展開しましょう。
tranduc_quy@cloudshell:~/metastore (evident-time-3366xx)$ kubectl create namespace spark
namespace/spark created
tranduc_quy@cloudshell:~/metastore (evident-time-3366xx)$ kubectl apply -f data-platform-config.yml
serviceaccount/jupyter created
role.rbac.authorization.k8s.io/jupyter created
rolebinding.rbac.authorization.k8s.io/jupyter created
service/jupyter created
service/jupyter-headless created
statefulset.apps/jupyter created
tranduc_quy@cloudshell:~/metastore (evident-time-3366xx)$
想定の通り、各サービスが立ち上がりました。
tranduc_quy@cloudshell:~/metastore (evident-time-3366xx)$ kubectl get services
NAME                  TYPE           CLUSTER-IP    EXTERNAL-IP     PORT(S)                      AGE
jupyter               ClusterIP      10.xx.xx.xx   <none>          8888/TCP,7777/TCP,2222/TCP   3m44s
jupyter-headless      ClusterIP      None          <none>          8888/TCP,7777/TCP,2222/TCP   3m43s
kubernetes            ClusterIP      10.xx.xx.xx     <none>        443/TCP                      45h
mysql                 ClusterIP      None          <none>          3306/TCP                     43h

動作確認

GKE上に稼働するContainerからGCSにアクセスできるか

GKE上に構築したpySpark on K8sクラスターがGCSにアクセスできるか検証したいと思います。
検証するため、GCSに検証用のCSVファイルを準備しておきます。このCSVファイルはHISTDATAサイトが公開している為替レートデータファイルです。
tranduc_quy@cloudshell:~ (evident-time-3366xx)$ gsutil cat gs://forex_data_20211229/DAT_ASCII_USDJPY_T_202109.csv | head
20210901 000000866,110.177000,110.181000,0
20210901 000001581,110.177000,110.181000,0
20210901 000003278,110.178000,110.182000,0
20210901 000003429,110.179000,110.182000,0
20210901 000006023,110.179000,110.184000,0
20210901 000006125,110.180000,110.183000,0
20210901 000008995,110.179000,110.183000,0
20210901 000018162,110.178000,110.182000,0
20210901 000034261,110.178000,110.182000,0
20210901 000035889,110.177000,110.181000,0
tranduc_quy@cloudshell:~ (evident-time-3366xx)$
SparkOperatorを導入してSpark-summitコマンドを実行してPySparkの実行は可能になると思いますが、
今回は簡単に検証するため、まずはContainer内にログインしてGCS接続及びDelta Lake対応するpySparkを起動するため以下のコマンドを実行します。
kubectl exec -it jupyter-0 -- /bin/bash
pyspark \
  --packages com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.4,io.delta:delta-core_2.12:1.0.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
  --conf "spark.hadoop.fs.AbstractFileSystem.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS" \
  --conf "spark.hadoop.fs.gs.project.id=evident-time-3366xx"  \
  --conf "spark.hadoop.google.cloud.auth.service.account.enable=true" \
  --conf "spark.hadoop.google.cloud.auth.service.account.json.keyfile=/usr/local/spark/conf/evident-time-3366xx-xxxx.json" \
  --conf "spark.sql.catalogImplementation=hive" 
無事にPySpark Shellを起動されました。
$ pyspark \
>   --packages com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.4,io.delta:delta-core_2.12:1.0.0 \
>   --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
>   --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
>   --conf "spark.hadoop.fs.AbstractFileSystem.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS" \
>   --conf "spark.hadoop.fs.gs.project.id=evident-time-3366xx"  \
>   --conf "spark.hadoop.google.cloud.auth.service.account.enable=true" \
>   --conf "spark.hadoop.google.cloud.auth.service.account.json.keyfile=/usr/local/spark/conf/evident-time-3366xx-xxxx.json" \
>   --conf "spark.sql.catalogImplementation=hive"

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Python version 3.9.7 (default, Sep 29 2021 19:20:46)
Spark context Web UI available at http://jupyter-0.jupyter-headless.default.svc.cluster.local:4040
Spark context available as 'sc' (master = local[*], app id = local-1641711782546).
SparkSession available as 'spark'.
次に、以下のコマンドでSpark Sessionを生成します。Delta Lake及びGCSに接続することを明言する必要があるので以下の感じでSparkSessionのConfigを設定しておきます。
from pyspark import SparkContext
from pyspark.sql import Column, DataFrame, SparkSession, SQLContext, functions
from pyspark.sql.functions import *
from pyspark.sql.types import *
from py4j.java_collections import MapConverter
from delta.tables import *

spark = SparkSession \
    .builder \
    .appName("test-gcs-deltalake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
    .enableHiveSupport() \
    .getOrCreate()

次に、GCSに格納したCSVファイルをちゃんと読み込めるか確認しましょう。
df = spark.read.format("csv"). \
     schema(read_schema). \
     load("gs://forex_data_20211229/DAT_ASCII_USDJPY_T_202109.csv")

df.show()
データフレームにCSVデータが格納されているのでContainer内のPySparkからGCSにアクセスできることを確認しました!
>>> df = spark.read.format("csv"). \
...      schema(read_schema). \
...      load("gs://forex_data_20211229/DAT_ASCII_USDJPY_T_202109.csv")

>>>
>>> df.show()
+------------------+-------+-------+------+                                     
|    tick_timestamp|    bid|    ask|volume|
+------------------+-------+-------+------+
|20210901 000000866|110.177|110.181|     0|
|20210901 000001581|110.177|110.181|     0|
|20210901 000003278|110.178|110.182|     0|
|20210901 000003429|110.179|110.182|     0|
|20210901 000006023|110.179|110.184|     0|
|20210901 000006125| 110.18|110.183|     0|
|20210901 000008995|110.179|110.183|     0|
|20210901 000018162|110.178|110.182|     0|
|20210901 000034261|110.178|110.182|     0|
|20210901 000035889|110.177|110.181|     0|
|20210901 000045068|110.176|110.178|     0|
|20210901 000045169|110.173|110.177|     0|
|20210901 000045663|110.172|110.178|     0|
|20210901 000054698|110.175|110.177|     0|
|20210901 000054749|110.175|110.179|     0|
|20210901 000102208|110.175|110.177|     0|
|20210901 000102360|110.172|110.177|     0|
|20210901 000102461|110.172|110.176|     0|
|20210901 000102564|110.169|110.176|     0|
|20210901 000104822|110.171|110.175|     0|
+------------------+-------+-------+------+
only showing top 20 rows

SparkがMetastoreでDelta Lakeのメタデータが連携できるか確認する

以下のコマンドでMetastoreにスキーマがあるか確認しましょう。
database_name = "gold_forex"
spark.catalog.listDatabases()
spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
spark.catalog.listDatabases()
実行結果は以下の感じですので、構築したHive Metastoreに接続できていると思います。
>>> database_name = "gold_forex"
>>> spark.catalog.listDatabases()
[Database(name='default', description='Default Hive database', locationUri='file:/apps/delta_lake/warehouse')]
>>> spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
DataFrame[]
>>> spark.catalog.listDatabases()
[Database(name='default', description='Default Hive database', locationUri='file:/apps/delta_lake/warehouse'), 
Database(name='gold_forex', description='', locationUri='gs://forex_data_20211229/delta_lake/warehouse/gold_forex.db')]
>>> spark.sql(f"USE {database_name}")
DataFrame[]
GCSにgold_forex.dbがあるか確認したいと思います。
tranduc_quy@cloudshell:~ (evident-time-3366xx)$ gsutil ls gs://forex_data_20211229/delta_lake/warehouse/
gs://forex_data_20211229/delta_lake/warehouse/
gs://forex_data_20211229/delta_lake/warehouse/gold_forex.db/

GCS上のDelta Lakeにデータを永続化できるか確認する

以下のコマンドで上記に作ったデータフレームから新しいカラムを計算したり、カラム名変更したり、データ型をキャストしたりしてからDelta Lakeに永続化したいと思います。
>>> commodity = "USDJPY"
>>> # volumeカラムは使わないのでforex_pairに変更。
>>> df = df.withColumn("volume", lit(commodity)).withColumnRenamed("volume", "forex_pair")
>>> # 検索時間を短縮する目的でレート日付でテーブルのPartitionを作るため、quote_dateカラムを作成する。
>>> df = df.withColumn("business_date", df.tick_timestamp[0:8])
>>> df = df.withColumn("business_date", df.business_date.cast(IntegerType()))
>>> # Midプライスを計算する
>>> df = df.withColumn("mid", (df.bid + df.ask)/2)
>>> df = df.withColumn("mid", df.mid.cast(FloatType()))
>>>
>>> df.show()
+------------------+-------+-------+----------+-------------+----------+        
|    tick_timestamp|    bid|    ask|forex_pair|business_date|       mid|
+------------------+-------+-------+----------+-------------+----------+
|20210901 000000866|110.177|110.181|    USDJPY|     20210901|   110.179|
|20210901 000001581|110.177|110.181|    USDJPY|     20210901|   110.179|
|20210901 000003278|110.178|110.182|    USDJPY|     20210901|    110.18|
|20210901 000003429|110.179|110.182|    USDJPY|     20210901|  110.1805|
|20210901 000006023|110.179|110.184|    USDJPY|     20210901|  110.1815|
|20210901 000006125| 110.18|110.183|    USDJPY|     20210901|  110.1815|
|20210901 000008995|110.179|110.183|    USDJPY|     20210901|   110.181|
|20210901 000018162|110.178|110.182|    USDJPY|     20210901|    110.18|
|20210901 000034261|110.178|110.182|    USDJPY|     20210901|    110.18|
|20210901 000035889|110.177|110.181|    USDJPY|     20210901|   110.179|
|20210901 000045068|110.176|110.178|    USDJPY|     20210901|   110.177|
|20210901 000045169|110.173|110.177|    USDJPY|     20210901|   110.175|
|20210901 000045663|110.172|110.178|    USDJPY|     20210901|   110.175|
|20210901 000054698|110.175|110.177|    USDJPY|     20210901|   110.176|
|20210901 000054749|110.175|110.179|    USDJPY|     20210901|   110.177|
|20210901 000102208|110.175|110.177|    USDJPY|     20210901|   110.176|
|20210901 000102360|110.172|110.177|    USDJPY|     20210901|  110.1745|
|20210901 000102461|110.172|110.176|    USDJPY|     20210901|110.173996|
|20210901 000102564|110.169|110.176|    USDJPY|     20210901|  110.1725|
|20210901 000104822|110.171|110.175|    USDJPY|     20210901|110.173004|
+------------------+-------+-------+----------+-------------+----------+
only showing top 20 rows

>>> table_name = "commodity_rates"
>>> table_location = f"gs://forex_data_20211229/delta_lake/warehouse/{database_name}.db/{table_name}"
>>> checkpoint_location = table_location + "/_checkpoints/streaming_ckp"
>>> partitions = ["business_date", "forex_pair"]
>>> df.write \
...         .format("delta") \
...         .partitionBy(partitions) \
...         .option("mergeSchema", "true") \
...         .option("overwriteSchema", "true")  \
...         .option("checkpointLocation", checkpoint_location) \
...         .mode('overwrite') \
...         .saveAsTable(table_name)
                                    
無事に永続化できたようですので、gsutilコマンドでGCS上のデータレイクを確認しましょう。
tranduc_quy@cloudshell:~ (evident-time-3366xx)$ gsutil ls gs://forex_data_20211229/delta_lake/warehouse/gold_forex.db/commodity_rates/
gs://forex_data_20211229/delta_lake/warehouse/gold_forex.db/commodity_rates/_delta_log/
gs://forex_data_20211229/delta_lake/warehouse/gold_forex.db/commodity_rates/business_date=20210901/
gs://forex_data_20211229/delta_lake/warehouse/gold_forex.db/commodity_rates/business_date=20210902/
gs://forex_data_20211229/delta_lake/warehouse/gold_forex.db/commodity_rates/business_date=20210903/
gs://forex_data_20211229/delta_lake/warehouse/gold_forex.db/commodity_rates/business_date=20210905/
gs://forex_data_20211229/delta_lake/warehouse/gold_forex.db/commodity_rates/business_date=20210906/
gs://forex_data_20211229/delta_lake/warehouse/gold_forex.db/commodity_rates/business_date=20210907/
gs://forex_data_20211229/delta_lake/warehouse/gold_forex.db/commodity_rates/business_date=20210908/
gs://forex_data_20211229/delta_lake/warehouse/gold_forex.db/commodity_rates/business_date=20210909/
gs://forex_data_20211229/delta_lake/warehouse/gold_forex.db/commodity_rates/business_date=20210910/
tranduc_quy@cloudshell:~ (evident-time-3366xx)$ gsutil ls gs://forex_data_20211229/delta_lake/warehouse/gold_forex.db/commodity_rates/business_date=20210901/
gs://forex_data_20211229/delta_lake/warehouse/gold_forex.db/commodity_rates/business_date=20210901/forex_pair=USDJPY/
tranduc_quy@cloudshell:~ (evident-time-3366xx)$ gsutil ls gs://forex_data_20211229/delta_lake/warehouse/gold_forex.db/commodity_rates/business_date=20210901/forex_pair=USDJPY/
gs://forex_data_20211229/delta_lake/warehouse/gold_forex.db/commodity_rates/business_date=20210901/forex_pair=USDJPY/part-00000-240228c7-2b28-444a-990b-0c69cb06de23.c000.snappy.parquet
問題なくGCS上のDelta Lakeにデータを格納されました。良さそうですね。

まとめ

長くなりましたが、今回はDataProcなどを使わずにGCP上にデータ解析基盤を構築してみました。GKE上にpySpark on K8sクラスターを構築する前に自社のConoha VPS上に同様のK8sクラスターを構築したが、その時はVPSの初期化設定やセキュリティ周りの設定は自分で対応する必要がありました。GCP上にクラスター構築はこの作業が発生しないので大分楽になると感じました。Docker Imageは自分でビルドして使うのでSparkバージョンを変更することが可能ですが、データ基盤のStorageレイヤーがGCSを使いまして、さらにGCS上にデータレイクレイヤーはDelta Lakeを構築したので、Sparkバージョンの選択時にDelta Lakeが対応しているバージョンかどうかを考慮したほうが良さそうですね。検証の最初の段階にGCSにうまく接続できなくて色々調査してみたら各製品のJarバージョンが違うので上手く連携できないということもわかりましたので、やはり自分で環境構築にこれらの問題が発生しやすいかと感じました。
最後、データレイクのMetastoreはMySQLで構築して使いました。使い勝手は良さそうですが、Metastoreのバージョンやテーブル設計に色々な細かい問題があったので調査時間が発生しました。最近、GoogleはDataProcやHive Metastoreといったビックデータ解析向けのサービスを公開しているので、自分で構築しなくても良いですが、コスト削減などで自分で構築して運用するケースもありますね。

最後に

次世代システム研究室では、ビッグデータ解析プラットホームの設計・開発を行うアーキテクトとデータサイエンティストを募集しています。次世代システム研究室にご興味を持って頂ける方がいらっしゃいましたら、ぜひ 募集職種一覧 からご応募をお願いします。

皆さんのご応募をお待ちしています。

  • Twitter
  • Facebook
  • はてなブックマークに追加

グループ研究開発本部の最新情報をTwitterで配信中です。ぜひフォローください。

 
  • AI研究開発室
  • 大阪研究開発グループ

関連記事