2021.07.07

spark mllib on kubernetesで機械学習

こんにちは。データ解析グループ・ETLチームのY.S.です。今回はspark on kubernetesで機械学習してみます。

1.概要

機械学習のチュートリアル的問題であるtitanicを、sparkの機械学習ライブラリであるmllib + kubernetesでやってみます。
titanicはタイタニック号の乗客データを学習させ、その乗客が生存したか死亡したかを予測する分類問題です。kaggleのチュートリアルで取り上げられている問題で、耳にしたことや取り組んだことがある方も多いのではないでしょうか。
今回は実行環境としてspark on kubernetesを使用し、pysparkのコードをkubernetesクラスタにsubmitし複数podを立ち上げて計算します。spark on kubernetesや環境についての詳細は、こちらの記事をご参照ください。

2.mllib

pysparkの機械学習ライブラリです。mllibでは、前処理やモデルの学習といったデータの流れをパイプラインとして定義します。このパイプラインコンセプトはscikit-learn projectにインスパイアされているらしいです。
パイプラインのコンポーネントは、transformersとestimatorsの2種類に分類できます。パイプラインはTokenizerやHashingTFのようにデータ変換を行うtransformerと、LogisticRegressionのように機械学習アルゴリズムであるestimatorで構成されています。
画像
パイプラインに対してmodel = pipeline.fit(train_df)を呼ぶことで、train_dfで学習済みのestimatorを含むmodelインスタンスを得ることができます。modelの中ではLogisticRegressionはLogisticRegressionModelというtransformerとなっていて、predictions = model.transform(test_df)を呼ぶことでtest_dfに対する予測であるpredictionを得ることができます。
画像

3.準備

Spark on kubernetessで機械学習する際のポイントとして、まずどのようにpodにデータを共有させるかということがあります。

今回はdocker imageにcsvファイルを含めてしまうことにしました。より実運用チックにやるならPersistent Volume Claimで実行ホストのローカルディレクトをマウントしたり、HDFSを使用したりするのがいいと思います。(最初は前者を試してみましたが、ファイルがディレクトリとしてマウントされてしまい(?)、上手くいきませんでした。後者に関してHDFSの環境がまだできていないので断念しました。)

Dockerfileと同じ階層にdataディレクトリを作成し、kaggleのページからダウンロードしてきたtrain/testデータのcsvファイルを配置します。
DockerfileではCOPYコマンドでdataディレクトリをimageにコピーする記述をします。このimageをspark-submit時に指定すれば、複数起動したpod間でデータを共有できます。

ARG base_img

FROM $base_img
WORKDIR /

# Reset to root to run installation tasks
USER 0

RUN mkdir ${SPARK_HOME}/python
RUN apt-get update && \
    apt install -y python3 python3-pip && \
    pip3 install --upgrade pip setuptools && \
    pip3 install numpy && \
    # Removed the .cache to save space
    rm -r /root/.cache && rm -rf /var/cache/apt/*

COPY python/pyspark ${SPARK_HOME}/python/pyspark
COPY python/lib ${SPARK_HOME}/python/lib
COPY ./data/ /tmp/sparkml

WORKDIR /opt/spark/work-dir
ENTRYPOINT [ "/opt/entrypoint.sh" ]

# Specify the User that the actual main process will run as
ARG spark_uid=185
USER ${spark_uid}

4.titanicをmllibで解く

4-1.コード

pyspark.mllibでパイプラインを構築していきます。

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import *
from pyspark import SparkFiles
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

spark = SparkSession \
        .builder \
        .appName("titanic") \
        .getOrCreate()

# Load training datasets.
schema = StructType([
  StructField("PassengerId", IntegerType(), True),
  StructField("Survived", IntegerType(), True),
  StructField("Pclass", IntegerType(), True),
  StructField("Name", StringType(), True),
  StructField("Sex", StringType(), True),
  StructField("Age", IntegerType(), True),
  StructField("SibSp", IntegerType(), True),
  StructField("Parch", IntegerType(), True),
  StructField("Ticket", StringType(), True),
  StructField("Fare", DoubleType(), True),
  StructField("Cabin", StringType(), True),
  StructField("Embarked", StringType(), True)
])

train_df = spark.read.format("csv").options(header="true").load("/tmp/sparkml/titanic/train.csv", schema=schema)
train_df = train_df.withColumn("label", train_df.Survived)

# Configure an ML pipeline.
assembler = VectorAssembler(
    inputCols=["Pclass", "Age"],
    outputCol="features")
assembler.setHandleInvalid("skip")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[assembler, lr])

# Fit the pipeline to training datasets.
model = pipeline.fit(train_df)

# Prepare test datasets, which are unlabeled.

test_df = 
spark.read.options(header="true", inferSchema="true").load("/tmp/sparkml/titanic/test.csv", format="csv")

# Make predictions on test data and print columns of interest.
prediction = model.transform(test_df)

selected = prediction.select("features", "probability", "prediction")
for row in selected.collect():
    features, prob, prediction = row
    print("(%s) --> prob=%s, prediction=%f" % (features, str(prob), prediction))

spark.stop()

まずいつものようにspark sessionを作ります(10行目)。ちなみに最後にちゃんとspark.stop()をしないとpodがいつまでも停止しません。
31行目でtrain.csvをロードします。今回はカラム名とtypeがが分かっているので、schemaをStructTypeとして定義しておいてloadの際に指定します。ちなみに47行目でtest.csvをロードする時のようにinfer_schema=Trueにすると型を察してくれますが、そのために一回データの走査が入ります。なので予めSchema指定できるならそうした方が速いです。

パイプライン上のMLモデルは、前段から流れてきたデータフレームのfeaturesカラムを特徴、labelカラムをground truthとして学習します。今回はSurvivedカラムがground truthになるので、このカラムをwithColumnメソッドを使ってlabelカラムとしてコピーしておきます。

35行目からパイプラインを定義します。今回は特にデータ変換は行わず、VectorAssemblerで数種類の数値型のカラムをまとめてfeaturesカラムとします。また、nullが入っている列があるとパイプライン実行時にエラーになるので、.setHandleInvalid(“skip”)オプションを付けてそういう列をdropします。
今回はestimatorとしてLogisticRegressionを使用します。学習回数や正則化のパラメータ等を指定します。
パイプラインに含めるコンポーネントのインスタンスを作成し終わったら、pipeline = Pipeline(stages=[assembler, lr])でパイプラインを作成します。

作成したパイプラインに対してmodel = pipeline.fit(train_df)でtrain_dfで学習したpipelineModelを得ます。pipelineModelに対してpredict = model.transrate(test_df)で予測結果を得ることができます。

得られたpredictのデータフレームには、確信度のprobabilityカラムと予測結果のpredictionカラムが含まれています。selectでそれらを抽出して表示したりできます。

4-2.実行

Titanic.pyをhttpサーバーでホストして、下記のコマンドでkubernetesクラスターにspark-submitします。

sudo /usr/local/spark/bin/spark-submit --master k8s://https://192.168.1.14:6443 --deploy-mode cluster --conf spark.executor.instances=3 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.container.image=quytd/spark-py:v3.1.2 --name titanic  http://192.168.1.14:30001/Titanic.py

実行ログはkubectl logsで確認できます。中には予測結果や確信度が出力されています。titanicのテストデータには正解ラベルが無いのでモデルの精度は分かりませんが、ラベルがあるのであればpyspark.ml.evaluationを使ってmetricsを計算することもできます。

([3.0,34.5]) --> prob=[0.8305983586067774,0.16940164139322256], prediction=0.000000
([3.0,47.0]) --> prob=[0.891814918443835,0.10818508155616502], prediction=0.000000
([2.0,62.0]) --> prob=[0.8165997450615972,0.18340025493840284], prediction=0.000000
([3.0,27.0]) --> prob=[0.782134552401779,0.21786544759822102], prediction=0.000000
([3.0,22.0]) --> prob=[0.7446601667233493,0.25533983327665066], prediction=0.000000
([3.0,14.0]) --> prob=[0.6765203274802152,0.3234796725197848], prediction=0.000000
([3.0,30.0]) --> prob=[0.8026340133255375,0.19736598667446248], prediction=0.000000
([2.0,26.0]) --> prob=[0.4992998234646864,0.5007001765353136], prediction=1.000000
([3.0,18.0]) --> prob=[0.711787103152333,0.28821289684766704], prediction=0.000000
([3.0,21.0]) --> prob=[0.7366772340900755,0.26332276590992454], prediction=0.000000
([1.0,46.0]) --> prob=[0.39869124812376505,0.601308751876235], prediction=1.000000
([1.0,23.0]) --> prob=[0.20312485912644834,0.7968751408735517], prediction=1.000000
([2.0,63.0]) --> prob=[0.8227427367101282,0.1772572632898718], prediction=0.000000
([1.0,47.0]) --> prob=[0.40869617764833116,0.5913038223516689], prediction=1.000000
([2.0,24.0]) --> prob=[0.47853138050480953,0.5214686194951905], prediction=1.000000
([2.0,35.0]) --> prob=[0.5917655775165058,0.4082344224834942], prediction=0.000000
([3.0,21.0]) --> prob=[0.7366772340900755,0.26332276590992454], prediction=0.000000
([3.0,27.0]) --> prob=[0.782134552401779,0.21786544759822102], prediction=0.000000
([3.0,45.0]) --> prob=[0.8835296936231415,0.11647030637685851], prediction=0.000000
([1.0,55.0]) --> prob=[0.4907881615871509,0.5092118384128491], prediction=1.000000

最後に

次世代システム研究室では,データサイエンティスト/機械学習エンジニアを募集しています。ビッグデータの解析業務など次世代システム研究室にご興味を持って頂ける方がいらっしゃいましたら,ぜひ募集職種一覧からご応募をお願いします。皆さんのご応募をお待ちしています。

  • Twitter
  • Facebook
  • はてなブックマークに追加

グループ研究開発本部の最新情報をTwitterで配信中です。ぜひフォローください。

 
  • AI研究開発室
  • 大阪研究開発グループ

関連記事