Apache Spark をたのしく使う話 (ローカル の Docker で試そう)

前職で Apache Spark で鬼でかい 10 年分の記事データの形態素解析をゴリゴリするっていうことをしたことがあった. 辛い思いで実装したものだったが,それをインターネットに残してなかったのはもったいないな,という思いになった. なので,記憶のあるうちに残しておこうと思った.

しかしながら,Spark の環境自体は自分ではない人に作ってもらってて, それに乗っかっただけだったので,インフラノウハウはなくて, Spark を支える技術,もよく知らないという状態にある.

現実的には AWS におまかせするということで,Amazon EMR とか AWS Glue でやるのがいいだろうけど, それもよくわからないということで,まず ローカルの Docker でやってみようと思った.

僕の理解では Spark は複数台でクラスタを作って worker に仕事をさせるのがキモなので, ローカルだとクラスタを作らずに standalone モードで動きそうなので, 本気の巨大データを打ち込んでもパフォーマンスは出ずに恩恵にはあずかれないはず.

しかしながら,お試しならローカルでやると便利.

使う

Jupyter + PySpark な環境が動く Docker イメージが用意されているので,お試しにはこれが楽そう.

hub.docker.com

余談として,Spark 自体は Scala だけど, Python で使えるやつがあってそれが PySpark だという話がある. Scala <-> Python の変換のコストが結構でかいうんぬんみたいな話題もある. (IPC でがんばってるという仕組みになっていたはず)

前職では PySpark を使っていた. なぜかというと用意してくれた人が Python を使っていたからただそれだけ.

さて,使ってみる:

docker run -it -p 8888:8888 jupyter/pyspark-notebook

これを実行すると Terminal に 8888 番にトークンがついた URL が流れてくるので, おもむろにアクセスすると Jupyter のページが出てきて Notebook でコーディングできる簡単環境のできあがり.

概要

さて,Spark 自体だが僕の理解はかなりニワカで聞きかじりで,次のような感じ:

  1. なんらかのデータソースを持ってきて
  2. そのデータ列を適当な単位で小分けにして
  3. 各単位で map したりして操作して
  4. その結果を再度 reduce して集めてきて
  5. 出力するという仕組み.

データソースは CSV だったり MySQL だったりするかもしれないが, 操作をするときは DataFrame というものに抽象化されている. (DataSet とか RDD とかいう単位もある)

クラスタモードだとデータそのもの自体は Hadoophdfs の分散型のストレージ上に小分けされて, それを各クラスタでつつくようになっている.

試す

動くかどうかのテストコードは以下で,サンプルからとってきた:

from pyspark.sql import SparkSession

spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()

# do something to prove it works
spark.sql('SELECT "Test" as c1').show()

SparkSession というやつはよくわからないけど Spark 自体のインスタンスみたいなものという認識.

データを扱う

こういうデータを対象にしてみる:

id name gender age
1 サトシ male 10
2 シゲル male 10
3 カスミ female 12

入力と定義

Python で素朴にデータを定義するとこうなる:

from typing import List, Tuple

Trainer = Tuple[int, str, str, int]
trainers: List[Trainer] = [
    (1, 'サトシ', 'male',   10),
    (2, 'シゲル', 'male',   10),
    (3, 'カスミ', 'female', 12),
]

各行の型は Pythontyping でいう Tuple[int, str, str, int] だ.

で,Spark でもスキーマの定義がある:

from pyspark.sql.types import StructField, StructType, StringType, IntegerType

trainers_schema = StructType([
    StructField('id',      IntegerType(), True),
    StructField('name',    StringType(),  True),
    StructField('gender',  StringType(),  True),
    StructField('age',     IntegerType(), True),
])

これで列のスキーマを定義できる.

これを Spark の DataFrame に変換する:

from pyspark.sql import DataFrame

trainers_df: DataFrame = spark.createDataFrame(
    spark.sparkContext.parallelize(trainers),
    trainers_schema
)

これで trainers_df という DataFrame ができた.

実際にはデータソースとして CSV とか MySQL とかそういうものから読み込める. (場合により後述する JDBC とか,Hadoop の設定が必要)

これをただダンプするには

trainers_df.show()

すればいい.

すると出力はこうなる:

+---+------+------+---+
| id|  name|gender|age|
+---+------+------+---+
|  1|サトシ|  male| 10|
|  2|シゲル|  male| 10|
|  3|カスミ|female| 12|
+---+------+------+---+

f:id:mangano-ito:20200319085353p:plain

素朴〜〜〜〜

集計と出力

値をもらうには .collect() すればいい:

result = trainers_df.collect()
print(result)

CSV に書き出すときはこういう雰囲気:

trainers_df.coalesce(1).write.mode('overwrite').csv("path/to/output.csv")

.coalesce(1)パーティションごとの分割されているデータを, 1つのパーティションに coalesce するというもの. こうしないと,分割されたまま CSV 出力される.

Hadoophdfs コマンドをつかって,分割されたものを1つにまとめて取得するという手段もある.

入力同様,他にも S3, MySQL とか Elasticsearch とかいろいろ出力先がある雰囲気.

基本的に遅延評価になっていて,.collect() みたいな操作をしてはじめて評価されるようになっている.

基本

これだけではただ表示しただけでまったく意味がないので適当な操作をしてみる:

trainers_df.createOrReplaceTempView('trainers');

male_trainers_df = spark.sql('''
    SELECT *
    FROM   trainers
    WHERE  gender = 'male'
''')
male_trainers_df.show()

これはこうなる:

id name gender age
1 サトシ male 10
2 シゲル male 10

DataFrame.createOrReplaceTempView(name)DataFrame を, 一時的な SQL の View として登録することができる. これで spark.sql(query) で登録した View を対象に SQL の操作した結果の DF を得ることができる.

こうすれば,全く臆することなく慣れ親しんだ SQL を使って Spark を使うことができて, 心理的障壁も学習コストも低いというマジックになっている.

View に登録しなくても,DataFrame のままコードで記述するという方法もある:

male_trainers_df = trainers_df.filter(trainers_df['gender'] == 'male')

こっちのほうが使いやすいケースもある.

応用

SQL を使うことができるのだから,基本的な操作では別に問題ないが, たいてい Spark を使いたいケースというのはなにかユーザー定義の操作をしたい状況になっている.

たとえば自分のケースでは記事本文を形態素解析して分かち書きするということで, これは SQL だけでは実現しがたい.

ただ,Python 上であれば比較的簡単な作業で,MeCab がある. MeCab のライブラリを使って形態素解析してやれば何も考えなくても分解されてやってくる.

そういう操作を Spark 上で DataFrame に対して行うにはどうすればいいかというと, UDF (User-Defined Function) を定義するといい.

(※ DataFrame ではなく RDD というものに対しては直接 lambda を適用できるという技がある.が,これはパフォーマンスが悪い)

UDF を定義するには次のような定義を行う:

from pyspark.sql.functions import udf

@udf(StringType())
def name_with_suffix(name: str, gender: str) -> str:
    return name + {'male': 'くん', 'female': 'さん'}.get(gender, '氏')

spark.udf.register('name_with_suffix', name_with_suffix)

UDF となる関数に @udf(ReturnType) デコレーターを適用することで, その関数は UDF として定義できるようになる. それを Spark SQL で使うには spark.udf.register(udf_name, udf) して登録すればいい.

この例としてあげたものは gender に応じて, namegender に応じた suffix をつけるというもの.

ちなみにデコレーターを使わなくても,udf_fn = udf(fn) すれば既存の関数を適用できる.

この UDF を適用してみる:

dearest_trainers = spark.sql('''
    SELECT name_with_suffix(name, gender)
    FROM   trainers
''')
dearest_trainers.show()

結果はこうなる:

name_with_suffix(name, gender)
サトシくん
シゲルくん
カスミさん

こんなもの SQL でも CASE を駆使して書けるというご意見があるが,そのとおり.

やりたいことによっては便利に使える.

UDF

さて,前述した形態素解析して分かち書きするというものだが, これはイメージとしてこのような関数になる (実際には MeCab をカッコよく使う):

import re

# 半角/全角スペースや約物で文字列を分割する
@udf(ArrayType(StringType()))
def wakachi(text: str) -> List[str]:
    return [
        word
        for word
        in re.split('[  !…]+', text)
        if len(word) > 0
    ]

これを適用するのも同じくそのまま使えばいい:

Trainer = Tuple[int, str, str, int, str]
trainers: List[Trainer] = [
    (1, 'サトシ', 'male',   10, 'ポケモン ゲット だぜ'),
    (2, 'シゲル', 'male',   10, 'このおれさまが せかいで いちばん! つよいって ことなんだよ!'),
    (3, 'カスミ', 'female', 12, 'わたしの ポリシーはね… みず タイプ ポケモンで せめて せめて …せめまくる ことよ!'),
]

trainers_schema = StructType([
    StructField('id',      IntegerType(), True),
    StructField('name',    StringType(),  True),
    StructField('gender',  StringType(),  True),
    StructField('age',     IntegerType(), True),
])

trainers_df = spark.createDataFrame(
    spark.sparkContext.parallelize(trainers),
    trainers_schema
)
trainers_df.createOrReplaceTempView('trainers');

wakachi_trainers_df = spark.sql('''
    SELECT id, name, wakachi(comment)
    FROM   trainers
''')
wakachi_trainers_df.show()

ここでポイントになるのは,str を受け取って List[str] として展開するということだ. これを実行してみるとこうなる:

id name wakachi(comment)
1 サトシ [ポケモン, ゲット, だぜ]
2 シゲル [このおれさまが, せかいで, い...
3 カスミ [わたしの, ポリシーはね, みず...

展開されたセルはリストになっている.ので列のなかに更に列がある入れ子状態になっている.

これをそれぞれの str を列として展開したい場合どうすればいいだろうか? 展開する関数を更に適用すればいい:

spark.apache.org

from pyspark.sql.functions import explode

wakachi_trainers_df = spark.sql('''
    SELECT id, name, explode(wakachi(comment))
    FROM   trainers
''')
wakachi_trainers_df.show()

explode という関数があるので, これを適用すれば入れ子になった要素がそれぞれの列として展開される:

id name col
1 サトシ ポケモン
1 サトシ ゲット
1 サトシ だぜ
2 シゲル このおれさまが
2 シゲル せかいで
2 シゲル いちばん
2 シゲル つよいって
2 シゲル ことなんだよ
3 カスミ わたしの
3 カスミ ポリシーはね
3 カスミ みず
3 カスミ タイプ
3 カスミ ポケモン
3 カスミ せめて
3 カスミ せめて
3 カスミ せめまくる
3 カスミ ことよ

ジョイン

さらなるポイントとして DataFrameうしの JOIN ができる. 普通の MySQL とかの JOIN と変わらずに結合につかうカラムを指定して, それをもとに DataFrame を結合する.

Pkmn = Tuple[int, int, str, int]
pkmns: List[Pkmn] = [
    (1, 1, 'ピカチュウ', 99),
    (2, 1, 'リザードン', 99),
    (3, 2, 'イーブイ',   50),
    (4, 3, 'トサキント', 20),
    (5, 3, 'ヒトデマン', 30),
    (6, 3, 'スターミー', 40),
]
pkmns_schema = StructType([
    StructField('id',         IntegerType(), True),
    StructField('trainer_id', IntegerType(), True),
    StructField('name',       StringType(),  True),
    StructField('level',      IntegerType(), True),
])
pkmns_df = spark.createDataFrame(
    spark.sparkContext.parallelize(pkmns),
    pkmns_schema
)
pkmns_df.createOrReplaceTempView('pkmns');

trainer_and_pkmns_df = spark.sql('''
    SELECT     *
    FROM       trainers
    INNER JOIN pkmns
          ON   trainers.id = pkmns.trainer_id
''')
trainer_and_pkmns_df.show()
id name gender age comment id trainer_id name level
1 サトシ male 10 ポケモン ゲット だぜ 1 1 ピカチュウ 99
1 サトシ male 10 ポケモン ゲット だぜ 2 1 リザードン 99
3 カスミ female 12 わたしの ポリシーはね… みず タ... 4 3 トサキント 20
3 カスミ female 12 わたしの ポリシーはね… みず タ... 5 3 ヒトデマン 30
3 カスミ female 12 わたしの ポリシーはね… みず タ... 6 3 スターミー 40
2 シゲル male 10 このおれさまが せかいで いちばん... 3 2 イーブイ 50

ちなみに INNER JOIN, OUTER JOIN の他に種類がいっぱいある:

qiita.com

これで集合操作ができるので便利という感じ. 各 JOIN の概念はこのベン図がわかりやすい:

medium.com

ポイントとしてやはり JOIN はコストがかかっていて遅い. クラスタを組んでたとしたら,各所に分散したデータから見つけて JOIN して戻してとかそういう操作が行われている.

ので,後述するパフォーマンスチューニングが必要になってくる.

パフォーマンス

現実のケースとして,膨大なデータセットと格闘するのはそうとう辛いものがある. というのも,4時間とかかかるものだったら,最後の方で落ちたらまたやり直しかとなって, 二回ミスると一日の業務時間を捧げたことになってしまって残業が確定する.

また,自分のケースではオンプレミス上でクラスタが作られていて, リソースがふんだんにある環境というわけでもなかった. なので,途中で OOM が発生したりとか,そういうパフォーマンスの問題にとらわれていた.

途中からはそういうパフォーマンスを改善するために,JOIN の効率を上げるようにデータを削減したり, パーティションの区切りかたを変えたり, パーティションをなるべくクラスタ上に小さく断片化させないような工夫が必要となった.

Broadcast Join というもので,あえて全クラスタにデータセットを重複して配置することで, JOIN 時にデータセットの検索のコストを下げるとかそういうものがある:

spark.apache.org

重要なテクニックとして,各チェックポイント的なところで適宜 DataFrame を .cache() しておくことでパフォーマンスが劇的に改善されるというものもある.

MySQL

さて,MySQL のデータベースから読み込んでうんぬんしたいというのがある.前自分がやったのもこのケース. このケースでは MySQL を扱うために JDBCMySQL コネクタを用意する必要がある.

こちらのかたのエントリと,その Docker イメージが参考になる:

cloudfish.hatenablog.com hub.docker.com

用途

今まで書いた内容でいうと,素朴に SQL を適当な言語でゴリゴリすればいいのでは,というご意見があるが,そのとおり.

Spark が威力を発揮するのは:

  • データがとにかくでかい
  • 適用したい処理が互いに依存しない
  • 各操作に副作用がなくて内部の操作で完結する (外の API への操作とかがない)

というもの.

なので,メモリの限界にぶち当たったり, 節約できても処理全体にバッチ流して二週間かかりますみたいな巨大データだと, 素朴なものでも自前で分割して複数プロセスにわけて実行とかすればできるかもしれないけど, Spark にできることなら任せると楽,かもしれない.

最後に

結果を MySQL にそのままブチ込んだら binlog で埋め尽くされて大変なことになったという思い出がある. が,いろいろ模索してデータ量を間引いたり分割したりして結果大丈夫な感じになった. MySQL とはコネクション張りまくって大変になったりと,いろいろ相性が悪い.

当時やらない選択肢はなかった結果こうなった.