2021.10.02

KubernetesベースのSparkクラスターを検証するーアッドホック分析環境構築編

こんにちは。次世代システム研究室のT.D.Qです。
前回のブログよりConoha VPSで構築したSpark on Kubernetesクラスター既存HDFSクラスターと連携してpySparkバッチ実行環境構築について書きました。今回の記事でこのKubernetesクラスター上にDelta Lakeを対応するPySparkのアッドホック分析環境構築及び検証内容について紹介したいと思います。

アッドホック分析の実行環境構築

アドホック分析とは、データ調査、集計、分析やビックデータ処理の際に用いられる分析手法であり、定期的に行われるデータ分析や、項目も内容も決まっているデータ分析と違って、その都度単発的に行われるというのが特徴です。

今回やりたいこと

今回構築するアッドホック分析環境で実現したいことは以下の4項目です。
  1. 構築したKubernetesクラスター上にpySparkアッドホック分析環境を構築したい
  2. NotebookからSpark Sessionを起動時にリソースを調整できるようにしたい
  3. 構築したHDFSクラスター・Delta Lakeから抽出・永続化できるようにしたい
  4. ビッグデータを対応するKoalas DataFrameを軽く検証したい

アッドホック分析環境Dockerfileの実装

今回アッドホック分析のpySpark実行環境は必要なライブラリを用意する必要があり、Kubernetesクラスターで稼働するため、専用のDockerfileを実装する必要があります。
HDFSクラスターとの連携、Deltalakeとの連携を対応するため、今回は以下のイメージでDocker Imageを構築していきたいと思います。

今回実装するDockerfileは基本的に前回のDocker Imageの上にJupyterlab及び関連するプラグインをインストールすることになりますので、これから変更箇所のみ説明します。

Jupyterlabのインストール

今回、Kubernetesクラスター上にアッドホック分析のGUI兼IDEはデータ分析者に愛されているJupyter Labで構築したいと思います。
既存Dockerfileに以下のコマンドを追加することでjupyter Labをインストールすることができます。
# Install jupyterlab & extentions
RUN pip3 install \
    notebook==6.2.0 \
    ipynb==0.5.1 \ 
    jupyterlab==3.1.12
次にデータ解析時に作ったNotebookの保存場所(今回は/home/spark/notebookにしました)を設定します。また、Docker Containerを起動時にJupyterlabも起動するためDockerfileの後尾に以下のコマンドを追加しましょう。
USER ${spark_uid}

VOLUME /home/spark/notebook

CMD jupyter lab --port=8888 --ip=0.0.0.0 --no-browser --allow-root --NotebookApp.token='' --notebook-dir=/home/spark/notebook 

WORKDIR /opt/spark/work-dir
ここでJupyer Labのインストールは基本的に完了しました。

Spark Monitorプラグインの導入

Sparkジョブを実行時に実行状況を確認するため、Spark UIがよく使われていますが、Kubernetes上にSpark History UIでジョブの実行状況の確認に少し複雑です。
自分がSpark Historyを確認方法について調査した時にJupyterlabならSpark Monitorという無料プラグインを見つけました。このプラグインを使うことで以下のイメージのようにJupyter lab上に直接Spark UI画面を見ることができるし、コマンド実行するときに実行状況がわかりやすく可視化してくれるのでとても便利です。 Spark monitor plugin Spark MonitorをインストールするためDockerfileに以下のコマンドを追加しました。
RUN pip3 install jupyterlab-sparkmonitor
JupyterlabのExtensionを使うため、Nodejsをインストールする必要があるので、以下のコマンドをDockerfileに追加しました。
# Install Node.js
RUN curl -sL https://deb.nodesource.com/setup_14.x | bash
RUN apt-get install --yes nodejs
RUN node -v
RUN npm -v
RUN npm i -g nodemon
RUN nodemon -v
次に、Spark monitorプラグインの手順書の通りにipythonの設定を行います。
# run with root permission
USER 0
RUN ln -s /usr/local/lib/python3.8/site-packages/sparkmonitor/listener_2.12.jar /opt/spark/jars/listener_2.12.jar \
    && chown -R spark:hadoop /opt/spark/jars \
    && mkdir /home/spark/notebook && chown -R spark:hadoop /home/spark/notebook

# Switch to spark user
USER ${spark_uid}
ENV PATH=/home/spark/.local/bin:$PATH
RUN ipython profile create && \
echo "c.InteractiveShellApp.extensions.append('sparkmonitor.kernelextension')" >>  /home/spark/.ipython/profile_default/ipython_config.py
ここまで一旦Spark monitorプラグインの設定が完了しました。

Delta lakeと連携するための設定

今回のDeltalakeはHive Metastoreを使うので、連携するためのhive-site.xmlファイルをSPARK_HOME直下のconfディレクトリに配置する必要があります。
また、Hive Metastoreに接続するためJDBCを事前準備しました。
pip3 install delta-spark
COPY kubernetes/dockerfiles/spark8s/hadoop-3.2.2/etc/hadoop/hive-site.xml /opt/spark/conf/hive-site.xml
# Install Postgresql JDBC for Hive metastore
WORKDIR /opt/spark/jars
RUN curl https://jdbc.postgresql.org/download/postgresql-42.2.23.jar -o postgresql-42.2.23.jar
RUN chmod +x /opt/spark/jars/postgresql-42.2.23.jar

その他:Koalasのインストール

KoalasがPySparkのDataFrame APIを拡張してpandasと互換性を持たせる新しいオープンソースプロジェクトです。PandasはPython データサイエンスの標準表現ですが、単一のマシンで扱えるスモールデータ用に設計されています。なので、膨大なデータセットを扱うためにSparkを活用するには、PySparkに移行するか、pandasで扱えるようにデータをダウンサンプリングしなければなりません。Koalasを使用することで、データサイエンティストは新しいフレームワークを覚えなくとも、単一のマシンから分散環境へ移行することが可能です。Spark Dataset/DataFrameと呼ばれるPandasのDataFrameに近い概念が存在しますが各種APIが異なるためpandasとSpark Dataset/DataFrame間でオブジェクト変換した際に混乱します。それを解決するアプローチがKoalasになります。
Koalasはpipから直接インストールできますのでDockerfileに以下のコマンドを追加しました。
RUN pip3 install koalas plotly
ENV PYARROW_IGNORE_TIMEZONE 1
Koalasデータフレームのデータ可視化にデフォルトはplotlyを使うのでplotlyをインストールしておきました。
また、Koalasはpyarrowを使っていて、PYARROW_IGNORE_TIMEZONE環境変数を1にしないと怒られるので設定しておきました。

Docker ImageをビルドしてDockerHubにPush

ここまで必要なライブラリのインストールや設定をDockerfileに実装しましたのでKubernetesクラスターに展開するためDocker ImageをビルドしてDocker RepositoryにPushしておきたいと思います。
k8sadmin@quytd-spark-tool:~$ cd /usr/local/spark
k8sadmin@quytd-spark-tool:~$ sudo ./bin/docker-image-tool.sh -n -r quytd -t pyspark-3.1.1-hadoop-3.2.0-jupyterhub -p /usr/local/spark/kubernetes/dockerfiles/jupyterhub/Dockerfile build
k8sadmin@quytd-spark-tool:~$ sudo bin/docker-image-tool.sh -r quytd -t pyspark-3.1.1-hadoop-3.2.0-jupyterhub push

アッドホック分析環境のConfigファイル実装

アッドホック分析環境のDocker Imageを準備できましたので、以下の内容で「jupyterlab-config.yaml」ファイルを作成します。
内容がやや長いですが主にjupyterというサービスアカウントを生成して、sparkの命名空間に管理権限を設定します。次にクラスターの外からアクセスするためjupyterというServiceを定義します。このサービスはJupyter Labの標準ポート番号が「8888」を定義します。また、headless(固定IPが不要)のServiceがSpark driverとして使うのでSpark executorからアクセスするためポート番号を「2222」、Sparkのblock managerのポート番号を「7777」として明確に定義しました。最後に、データ分析時に作ったNotebookがクラスターが壊れてもちゃんと残るためにStatefulSetとして定義しました。
apiVersion: v1
kind: ServiceAccount
metadata:
  name: jupyter
  labels:
    release: jupyter
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: jupyter
  labels:
    release: jupyter
  namespace: spark
rules:
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - create
  - get
  - delete
  - list
  - watch
- apiGroups:
  - ""
  resources:
  - services
  verbs:
  - get
  - create
- apiGroups:
  - ""
  resources:
  - pods/log
  verbs:
  - get
  - list
- apiGroups:
  - ""
  resources:
  - pods/exec
  verbs:
  - create
  - get
- apiGroups:
  - ""
  resources:
  - configmaps
  verbs:
  - get
  - create
  - list
  - watch
  - delete
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: jupyter
  labels:
    release: jupyter
  namespace: spark
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: jupyter
subjects:
- kind: ServiceAccount
  name: jupyter
  namespace: spark
---
apiVersion: v1
kind: Service
metadata:
  name: jupyter
  labels:
    release: jupyter
spec:
  type: ClusterIP
  selector:
    release: jupyter
  ports:
  - name: http
    port: 8888
    protocol: TCP
  - name: blockmanager
    port: 7777
    protocol: TCP
  - name: driver
    port: 2222
    protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
  name: jupyter-headless
  labels:
    release: jupyter
spec:
  type: ClusterIP
  clusterIP: None
  publishNotReadyAddresses: false
  selector:
    release: jupyter
  ports:
  - name: http
    port: 8888
    protocol: TCP
  - name: blockmanager
    port: 7777
    protocol: TCP
  - name: driver
    port: 2222
    protocol: TCP
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: jupyter
  labels:
    release: jupyter
spec:
  replicas:
  updateStrategy:
    type: RollingUpdate
  serviceName: jupyter-headless
  podManagementPolicy: Parallel
  volumeClaimTemplates:
  - metadata:
      name: notebook-data
      labels:
        release: jupyter
    spec:
      accessModes:
      - ReadWriteOnce
      # volumeMode: Filesystem
      resources:
        requests:
          storage: 1Gi
      storageClassName: local-storage
  selector:
    matchLabels:
      release: jupyter
  template:
    metadata:
      labels:
        release: jupyter
      annotations:
    spec:
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      serviceAccountName: jupyter
      dnsConfig:
        options:
        - name: ndots
          value: "1"
      containers:
      - name: jupyter
        image: "quytd/spark-py:pyspark-3.1.1-hadoop-3.2.0-deltalake-jupyterhub"
        imagePullPolicy: IfNotPresent
        ports:
        - name: http
          containerPort: 8888
          protocol: TCP
        - name: blockmanager
          containerPort: 7777
          protocol: TCP
        - name: driver
          containerPort: 2222
          protocol: TCP
        volumeMounts:
        - name: notebook-data
          mountPath: /home/spark/notebook
        resources:
          limits:
            cpu: 8000m
            memory: 16Gi
          requests:
            cpu: 1000m
            memory: 1Gi

Notebookファイルを保持するため永続ボリュームを作成しておく

Kubernetesに永続ボリュームを作成するためPersistentVolume (PV)とPersistentVolumeClaim (PVC)を定義する必要があります。
まずは以下の定義でクラスターのストレージの一部を使ってNotebook専用のPVを定義しました。特定ノードのLocal Storageを使いたいのでストレージクラスはlocal-storageにしました。
ファイル名が「local_persistent_volume_jupyterhub_pyspark_slave2.yaml」にしました。
apiVersion: v1
kind: PersistentVolume
metadata:
  name: notebook-data-jupyter-0
spec:
  capacity:
    storage: 1Gi
  accessModes:
  - ReadWriteOnce
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  local:
    path: /mnt/data/jupyterhub_pyspark
  nodeAffinity:
    required:
      nodeSelectorTerms:
      - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
          - quytd-spark-s2
次にPersistentVolumeClaim (PVC)を以下のように定義しました。
ファイル名が「local_persistent_volume_claim_jupyterhub_pyspark_s2.yaml」にしました。少し長いですが。
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: notebook-data-jupyter-0
spec:
  accessModes:
  - ReadWriteOnce
  storageClassName: local-storage
  resources:
    requests:
      storage: 512Mi
LensのConsoleを使って上記のファイルの内容を適用しましょう。
jupyterhub-pyspark % kubectl apply -f volume-mount/local_persistent_volume_jupyterhub_pyspark_slave2.yaml 
persistentvolume/notebook-data-jupyter-0 created
upyterhub-pyspark % kubectl apply -f volume-mount/local_persistent_volume_claim_jupyterhub_pyspark_s2.yaml 
persistentvolumeclaim/notebook-data-jupyter-0 created

PySparkのアッドホック分析環境を立ち上がる

jupyterhub-pyspark % kubectl create ns spark
namespace/spark created
jupyterhub-pyspark % kubectl apply -n spark -f jupyterhub-config.yaml
serviceaccount/jupyter created
role.rbac.authorization.k8s.io/jupyter created
rolebinding.rbac.authorization.k8s.io/jupyter created
service/jupyter created
service/jupyter-headless created
statefulset.apps/jupyter created
Lensで起動したServiceが問題なく稼働していること確認しました。

クラスターの外から起動したJupyterlabにアクセスするため、まずport-forwardをしましょう。
jupyterhub-pyspark % kubectl port-forward -n spark service/jupyter 8888:8888
Forwarding from 127.0.0.1:8888 -> 8888
Forwarding from [::1]:8888 -> 8888
jupyterのServiceIPを確認できましたが、今回はインターネットから直接アクセスできないように設定しましたので、Local PCからそのIPに接続するためPort-forwardを設定することでアクセスできまるようになりました。

HDFS及びDelta Lakeと連携の検証

起動したJupyterlabにアクセスできましたので早速pySparkでHDFSやDeltalakeに連携できるか検証を行いましょう。

SparkConfigでアッドホック分析Notebookのリソースを設定する

NotebookからSparkのリソースを定義することでSpark Sessionを起動する時にKubernetesクラスターにリソースを要求できます。
今回の検証はデータ量がやや多いので以下の内容でSparkConfigを定義しました。
・ pySpark及び既存HDFSクラスター、Deltalakeと連携するので、構築したDocker Imageを使う
・ Spark executorのインスタンス数が8個で、1個にメモリ2GB、CPU 1 coreを設定
・ Sparkの実行状況に応じてリソースが動的に調整することを可能にする
・ Deltalakeを対応する
・ Spark monitorを有効にする
・ Koalasを使うので必要な設定を行う

Spark Driverの設定

まず構築したDocker Imageを使うため、configに以下の内容で指定しました。
config = {
    "spark.kubernetes.namespace": "spark",
    "spark.kubernetes.container.image": "quytd/spark-py:pyspark-3.1.1-hadoop-3.2.0-jupyterhub"
}
Spark Driverの設定になりますが、Spark executorのPodからアクセスするため、IPとポートを明確に指定する必要があります。
driver.hostがjupyterのServiceを起動したときのIPアドレスまたはホスト名を指定します。上記のconfigに以下の内容を追加しましょう。
    "spark.driver.blockManager.port": "7777",
    "spark.driver.port": "2222",
    "spark.driver.host": "10.233.xx.xx",
    "spark.driver.bindAddress": "0.0.0.0",
    "spark.ui.port": "4040",
    "spark.network.timeout": "240"

Spark Executorのインスタンス数とリソース設定

今回のデータ処理のニーズでSpark executorのインスタンス数が6個で、1個にメモリ2GB、CPU 1 coreを調整したいので、configに以下の内容を追加しました。
    "spark.executor.instances": "6",
    "spark.executor.memory": "2g",
    "spark.executor.cores": "1",

Dynamic Resource Allocation設定

Sparkの実行状況に応じてリソースが6個以上に動的に増加することを可能にするため、SparkのDynamic Allocation機能を利用します。
今回はexecutorの最大数が12個まで制限したいので以下の設定を追加しました。
"spark.dynamicAllocation.executorIdleTimeout":"60s",
"spark.dynamicAllocation.executorAllocationRatio": "0.2",
"spark.dynamicAllocation.schedulerBacklogTimeout":"1s",
"spark.dynamicAllocation.shuffleTracking.timeout": "10s" ,
"spark.dynamicAllocation.shuffleTracking.enabled": "true",
"spark.dynamicAllocation.minExecutors": "6",
"spark.dynamicAllocation.maxExecutors": "12",

Delta Lakeと連携するための設定

Hive Catalogを対応するDeltalakeを使うため、以下の設定内容をconfigに追加しました。
"spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
"spark.delta.logStore.class":"org.apache.spark.sql.delta.storage.HDFSLogStore",
"spark.sql.catalogImplementation": "hive"

その他の設定

次の1行〜2行目はSpark Monitorプラグインの設定になります。最後の3行目はKoalasがArrowを使えるようになるための設定です。
"spark.driver.extraClassPath":"/usr/local/lib/python3.9/dist-packages/sparkmonitor/listener.jar",
"spark.extraListeners": "sparkmonitor.listener.JupyterSparkMonitorListener",
"spark.sql.execution.arrow.pyspark.enabled": True

SparkSessionを起動

最終的にSparkConfigの内容は以下のようになります。このConfig内容でSparkSessionを生成しましょう。
config = {
    "spark.kubernetes.namespace": "spark",
    "spark.kubernetes.container.image": "quytd/spark-py:pyspark-3.1.1-hadoop-3.2.0-jupyterhub",
    "spark.executor.instances": "6",
    "spark.executor.memory": "2g",
    "spark.executor.cores": "1",
    "spark.driver.blockManager.port": "7777",
    "spark.driver.port": "2222",
    "spark.driver.host": "10.233.xx.xx",
    "spark.driver.bindAddress": "0.0.0.0",
    "spark.ui.port": "4040",
    "spark.network.timeout": "240",
    "spark.dynamicAllocation.executorIdleTimeout":"60s",
    "spark.dynamicAllocation.executorAllocationRatio": "0.2",
    "spark.dynamicAllocation.schedulerBacklogTimeout":"1s",
    "spark.dynamicAllocation.shuffleTracking.timeout": "10s" ,
    "spark.dynamicAllocation.shuffleTracking.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "6",
    "spark.dynamicAllocation.maxExecutors": "12",
    "spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.delta.logStore.class":"org.apache.spark.sql.delta.storage.HDFSLogStore",
    "spark.sql.catalogImplementation": "hive",
        "spark.driver.extraClassPath":"/usr/local/lib/python3.9/dist-packages/sparkmonitor/listener.jar",
    "spark.extraListeners": "sparkmonitor.listener.JupyterSparkMonitorListener",
    "spark.sql.execution.arrow.pyspark.enabled": True
}
上記のConfigをSparkConfigに適用しましょう。SparkConfにKubernetesクラスターのIP:portを指定する必要があります。
def get_spark_session(app_name: str, conf: SparkConf):
    conf.setMaster("k8s://https://192.168.1.14:6443")
    for key, value in config.items():
        conf.set(key, value)    
    builder = SparkSession.builder.appName(app_name).config(conf=conf)
    return configure_spark_with_delta_pip(builder).getOrCreate()

spark_config = SparkConf()

spark = get_spark_session("my_deltalake_pyspark_app", spark_config)
上記のコマンドを実行するとSparkSessionのインスタンスが生成されます。


HDFSに準備したForexデータの確認

今回の検証データはHistData.comから各クロス円通貨ペアの直近3年間のCSVデータをダウンロードしてHDFSに事前に格納しました。Jupyterlabからこのデータを閲覧できるか確認したいと思います。
!hdfs dfs -ls /tmp/spark8s/HISTDATA/

ご覧の通り、HDFS上にフォルダー一覧を見ることができました。データサイズは38.7GB前後です。無事にHDFSクラスターと連携できることを確認できました。

Delta lakeに新しいスキーマを作成

次にDelta lakeとの連携について検証したいと思います。Delta lakeに既にあるスキーマ一覧をリストするコマンドを実施します。

スキーマ一覧を見ることができましたので、Delta lakeとの連携ができるようになっていますね。
次に、以下の関数で検証用のスキーマを作成してデータの永続化を検証したいと思います。
    database_name = "gold_forex"
    spark.sql(f"DROP DATABASE IF EXISTS {database_name}")
    spark.catalog.listDatabases()
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
    spark.catalog.listDatabases()
    spark.sql(f"USE {database_name}")

ForexデータをDelta Lakeに取り込む

まず、以下の関数でSpark Dataframeに用意した銘柄別のCSVファイルを一括ロードして、その後に適切なクラム名変更やデータ型変更などを実施しました。
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType,StructField, StringType, FloatType, IntegerType
 
def get_df_from_csv_paths(commodity):
        file_path="/tmp/spark8s/HISTDATA/{0}/*.csv".format(commodity)
         
        read_schema = StructType([ \
            StructField("tick_timestamp",StringType(),True), \
            StructField("bid",FloatType(),True), \
            StructField("ask",FloatType(),True), \
            StructField("volume", StringType(), True)
        ])
         
        df = spark.read.format("csv"). \
            option("header", "false"). \
            option('delimiter', ','). \
            option('mode', 'DROPMALFORMED'). \
            schema(read_schema). \
            load(file_path)
         
        # volumeカラムは使わないのでforex_pairに変更。
        new_df = df.withColumn("volume", lit(commodity)).withColumnRenamed("volume", "forex_pair")
        # 検索時間を短縮する目的でレート日付でテーブルのPartitionを作るため、quote_dateカラムを作成する。
        new_df = new_df.withColumn("business_date", new_df.tick_timestamp[0:7])
        new_df = new_df.withColumn("business_date", new_df.business_date.cast(IntegerType()))
        # Midプライスを計算する
        new_df = new_df.withColumn("mid", (new_df.bid + new_df.ask)/2)
        new_df = new_df.withColumn("mid", new_df.mid.cast(FloatType()))
        

        return new_df
次は、今回構築したアッドホック環境はデータサイズの大きいDataFrameを対応できるか確認するため、以下の関数で今回用意した通貨ペア全部のデータをSpark Dataframeのunionメソッドで巨大なデータフレームにしておきます。
def load_all_rates_data():
    forex_pairs=["audjpy","cadjpy","chfjpy","eurjpy","gbpjpy","nzdjpy","sgdjpy","usdjpy"]
    all_quote_df = None
    for forex_pair in forex_pairs:
        quote_df=get_df_from_csv_paths(forex_pair)
        if all_quote_df is None:
            all_quote_df = quote_df
        else:
            all_quote_df = all_quote_df.union(quote_df)
    return all_quote_df
以下は連携したDelta lakeにデータを永続化できるか確認するための関数です。
def save_rates_data(df, table_name, table_location, partitions, checkpoint_location):
    df.write \
        .format("delta") \
        .partitionBy(partitions) \
        .option("mergeSchema", "true") \
        .option("overwriteSchema", "true")  \
        .option("checkpointLocation", checkpoint_location) \
        .mode('overwrite') \
        .saveAsTable(table_name)
上記でデータ操作関数を用意で来ましたので、早速HDFSとDelta lakeと連携できるか検証を行いましょう!
table_name = "commodity_rates"
table_location = f"/apps/spark/warehouse/{database_name}.db/{table_name}"
partitions = ["business_date", "forex_pair"]
checkpoint_location = table_location + "/_checkpoints/streaming_ckp"

table_schema = StructType([
    StructField("business_date", IntegerType(), False),
    StructField("forex_pair", StringType(), False), 
    StructField("tick_timestamp", StringType(), False),
    StructField("bid", FloatType(), False),
    StructField("ask", FloatType(), False),
    StructField("mid", FloatType(), False)
])

forex_data_df = load_all_rates_data()
save_rates_data(df=forex_data_df, table_name=table_name, table_location=table_location, partitions=partitions, checkpoint_location=checkpoint_location)
用意したCSVデータをSpark Dataframeにロードできました!

約8秒でFXのCSVデータが325,540,745件をロードできました。まあ、SparkはLazy load仕組みを使っているので実際のデータを操作しないと時間がかからないはずですね。
無事にサイズの大きなDataframeを準備できたのでこのDataframeを使ってDelta Lakeに永続化しましょう!ちなみに、テーブル名が「commodity_rates」にしました。

やや件数が多かったのでデータ永続化の実行時間が4分47秒かかりました。チューニングすると実行時間を短縮できると思いますが別の機会で。

KoalasでDelta Lakeからデータを抽出して確認する

最後に、インストールしたKoalas Dataframeを軽く確認しましょう。Notebookに以下のようにkoalasをimportして使います。関連するArrowも設定しておきました。
import databricks.koalas as ks
ks.set_option("compute.default_index_type", "distributed")  # Use default index prevent overhead.
import warnings
warnings.filterwarnings("ignore")  # Ignore warnings coming from Arrow optimization
直接DeltalakeからKoalas Dataframeにデータをロードできないので、以下のようにSpark Dataframeを経由してデータをロードしました。
kdf_usdjpy_rates = spark.sql(f"SELECT * FROM {table_name} WHERE forex_pair='usdjpy'").to_koalas()
Koalas Dataframe内にtick_timestampカラムをindexカラムとして設定しましょう。
kdf_usdjpy_rates.set_index('tick_timestamp', inplace=True, drop=True)
Dataframeの最初の5件を確認しましょう。Spark Dataframeより綺麗に表示されています。

次に、Pandasと同じ感じでKoalas DataframeからUSDJPYのレートデータを可視化してましょう。

綺麗にPlotlyのグラフが出力してくれました。良さそうですね。記事が長くなりましたのでKoalasの検証はここで以上です。

まとめ

前回HDFS・Delta lakeクラスターと連携してpySparkバッチ実行環境構築を続いて、今回はKubernetesクラスター上にDelta Lakeを対応するPySparkのアッドホック分析環境構築及び検証内容について紹介しました。このアッドホック分析環境でPySparkを用いるデータ解析ライブラリを揃っているのでJupyter Labでデータ収集・データ分析・可視化及び分析結果の永続化も可能になりましたので便利ですね。また、分析データのサイズによってクラスターのリソースを理由に調整できるのと、分析終わったらリソースを解放することで無駄なくクラスターのリソースを効率的に利用することができると思います。最後にビッグデータを対応するKoalas Dataframeを使うことでPandasに慣れたデータサイエンスティストも早くpySparkでデータ分析に集中できると思います。

最後に

次世代システム研究室では、ビッグデータ解析プラットホームの設計・開発を行うアーキテクトとデータサイエンティストを募集しています。次世代システム研究室にご興味を持って頂ける方がいらっしゃいましたら、ぜひ 募集職種一覧 からご応募をお願いします。

皆さんのご応募をお待ちしています。

  • Twitter
  • Facebook
  • はてなブックマークに追加

グループ研究開発本部の最新情報をTwitterで配信中です。ぜひフォローください。

 
  • AI研究開発室
  • 大阪研究開発グループ

関連記事