前職で Apache Spark で鬼でかい 10 年分の記事データの形態素解析をゴリゴリするっていうことをしたことがあった. 辛い思いで実装したものだったが,それをインターネットに残してなかったのはもったいないな,という思いになった. なので,記憶のあるうちに残しておこうと思った.
しかしながら,Spark の環境自体は自分ではない人に作ってもらってて, それに乗っかっただけだったので,インフラノウハウはなくて, Spark を支える技術,もよく知らないという状態にある.
現実的には AWS におまかせするということで,Amazon EMR とか AWS Glue でやるのがいいだろうけど, それもよくわからないということで,まず ローカルの Docker でやってみようと思った.
僕の理解では Spark は複数台でクラスタを作って worker に仕事をさせるのがキモなので, ローカルだとクラスタを作らずに standalone モードで動きそうなので, 本気の巨大データを打ち込んでもパフォーマンスは出ずに恩恵にはあずかれないはず.
しかしながら,お試しならローカルでやると便利.
使う
Jupyter + PySpark な環境が動く Docker イメージが用意されているので,お試しにはこれが楽そう.
余談として,Spark 自体は Scala だけど, Python で使えるやつがあってそれが PySpark だという話がある. Scala <-> Python の変換のコストが結構でかいうんぬんみたいな話題もある. (IPC でがんばってるという仕組みになっていたはず)
前職では PySpark を使っていた. なぜかというと用意してくれた人が Python を使っていたからただそれだけ.
さて,使ってみる:
docker run -it -p 8888:8888 jupyter/pyspark-notebook
これを実行すると Terminal に 8888 番にトークンがついた URL が流れてくるので, おもむろにアクセスすると Jupyter のページが出てきて Notebook でコーディングできる簡単環境のできあがり.
概要
さて,Spark 自体だが僕の理解はかなりニワカで聞きかじりで,次のような感じ:
- なんらかのデータソースを持ってきて
- そのデータ列を適当な単位で小分けにして
- 各単位で
map
したりして操作して - その結果を再度
reduce
して集めてきて - 出力するという仕組み.
データソースは CSV だったり MySQL だったりするかもしれないが,
操作をするときは DataFrame
というものに抽象化されている.
(DataSet とか RDD とかいう単位もある)
クラスタモードだとデータそのもの自体は Hadoop の hdfs の分散型のストレージ上に小分けされて, それを各クラスタでつつくようになっている.
試す
動くかどうかのテストコードは以下で,サンプルからとってきた:
from pyspark.sql import SparkSession spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate() # do something to prove it works spark.sql('SELECT "Test" as c1').show()
SparkSession
というやつはよくわからないけど Spark 自体のインスタンスみたいなものという認識.
データを扱う
こういうデータを対象にしてみる:
id |
name |
gender |
age |
---|---|---|---|
1 | サトシ | male | 10 |
2 | シゲル | male | 10 |
3 | カスミ | female | 12 |
入力と定義
Python で素朴にデータを定義するとこうなる:
from typing import List, Tuple Trainer = Tuple[int, str, str, int] trainers: List[Trainer] = [ (1, 'サトシ', 'male', 10), (2, 'シゲル', 'male', 10), (3, 'カスミ', 'female', 12), ]
各行の型は Python の typing
でいう Tuple[int, str, str, int]
だ.
で,Spark でもスキーマの定義がある:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType trainers_schema = StructType([ StructField('id', IntegerType(), True), StructField('name', StringType(), True), StructField('gender', StringType(), True), StructField('age', IntegerType(), True), ])
これで列のスキーマを定義できる.
これを Spark の DataFrame
に変換する:
from pyspark.sql import DataFrame trainers_df: DataFrame = spark.createDataFrame( spark.sparkContext.parallelize(trainers), trainers_schema )
これで trainers_df
という DataFrame
ができた.
実際にはデータソースとして CSV とか MySQL とかそういうものから読み込める. (場合により後述する JDBC とか,Hadoop の設定が必要)
これをただダンプするには
trainers_df.show()
すればいい.
すると出力はこうなる:
+---+------+------+---+ | id| name|gender|age| +---+------+------+---+ | 1|サトシ| male| 10| | 2|シゲル| male| 10| | 3|カスミ|female| 12| +---+------+------+---+
素朴〜〜〜〜
集計と出力
値をもらうには .collect()
すればいい:
result = trainers_df.collect()
print(result)
CSV に書き出すときはこういう雰囲気:
trainers_df.coalesce(1).write.mode('overwrite').csv("path/to/output.csv")
.coalesce(1)
はパーティションごとの分割されているデータを,
1つのパーティションに coalesce するというもの.
こうしないと,分割されたまま CSV 出力される.
Hadoop の hdfs
コマンドをつかって,分割されたものを1つにまとめて取得するという手段もある.
入力同様,他にも S3, MySQL とか Elasticsearch とかいろいろ出力先がある雰囲気.
基本的に遅延評価になっていて,.collect()
みたいな操作をしてはじめて評価されるようになっている.
基本
これだけではただ表示しただけでまったく意味がないので適当な操作をしてみる:
trainers_df.createOrReplaceTempView('trainers'); male_trainers_df = spark.sql(''' SELECT * FROM trainers WHERE gender = 'male' ''') male_trainers_df.show()
これはこうなる:
id | name | gender | age |
---|---|---|---|
1 | サトシ | male | 10 |
2 | シゲル | male | 10 |
DataFrame.createOrReplaceTempView(name)
は DataFrame
を,
一時的な SQL の View として登録することができる.
これで spark.sql(query)
で登録した View を対象に SQL の操作した結果の DF を得ることができる.
こうすれば,全く臆することなく慣れ親しんだ SQL を使って Spark を使うことができて, 心理的障壁も学習コストも低いというマジックになっている.
View に登録しなくても,DataFrame
のままコードで記述するという方法もある:
male_trainers_df = trainers_df.filter(trainers_df['gender'] == 'male')
こっちのほうが使いやすいケースもある.
応用
SQL を使うことができるのだから,基本的な操作では別に問題ないが, たいてい Spark を使いたいケースというのはなにかユーザー定義の操作をしたい状況になっている.
たとえば自分のケースでは記事本文を形態素解析して分かち書きするということで, これは SQL だけでは実現しがたい.
ただ,Python 上であれば比較的簡単な作業で,MeCab がある. MeCab のライブラリを使って形態素解析してやれば何も考えなくても分解されてやってくる.
そういう操作を Spark 上で DataFrame
に対して行うにはどうすればいいかというと,
UDF (User-Defined Function) を定義するといい.
(※ DataFrame
ではなく RDD というものに対しては直接 lambda
を適用できるという技がある.が,これはパフォーマンスが悪い)
UDF を定義するには次のような定義を行う:
from pyspark.sql.functions import udf @udf(StringType()) def name_with_suffix(name: str, gender: str) -> str: return name + {'male': 'くん', 'female': 'さん'}.get(gender, '氏') spark.udf.register('name_with_suffix', name_with_suffix)
UDF となる関数に @udf(ReturnType)
デコレーターを適用することで,
その関数は UDF として定義できるようになる.
それを Spark SQL で使うには spark.udf.register(udf_name, udf)
して登録すればいい.
この例としてあげたものは gender
に応じて,
name
に gender
に応じた suffix をつけるというもの.
ちなみにデコレーターを使わなくても,udf_fn = udf(fn)
すれば既存の関数を適用できる.
この UDF を適用してみる:
dearest_trainers = spark.sql(''' SELECT name_with_suffix(name, gender) FROM trainers ''') dearest_trainers.show()
結果はこうなる:
name_with_suffix(name, gender) |
---|
サトシくん |
シゲルくん |
カスミさん |
こんなもの SQL でも CASE
を駆使して書けるというご意見があるが,そのとおり.
やりたいことによっては便利に使える.
UDF
さて,前述した形態素解析して分かち書きするというものだが, これはイメージとしてこのような関数になる (実際には MeCab をカッコよく使う):
import re # 半角/全角スペースや約物で文字列を分割する @udf(ArrayType(StringType())) def wakachi(text: str) -> List[str]: return [ word for word in re.split('[ !…]+', text) if len(word) > 0 ]
これを適用するのも同じくそのまま使えばいい:
Trainer = Tuple[int, str, str, int, str] trainers: List[Trainer] = [ (1, 'サトシ', 'male', 10, 'ポケモン ゲット だぜ'), (2, 'シゲル', 'male', 10, 'このおれさまが せかいで いちばん! つよいって ことなんだよ!'), (3, 'カスミ', 'female', 12, 'わたしの ポリシーはね… みず タイプ ポケモンで せめて せめて …せめまくる ことよ!'), ] trainers_schema = StructType([ StructField('id', IntegerType(), True), StructField('name', StringType(), True), StructField('gender', StringType(), True), StructField('age', IntegerType(), True), ]) trainers_df = spark.createDataFrame( spark.sparkContext.parallelize(trainers), trainers_schema ) trainers_df.createOrReplaceTempView('trainers'); wakachi_trainers_df = spark.sql(''' SELECT id, name, wakachi(comment) FROM trainers ''') wakachi_trainers_df.show()
ここでポイントになるのは,str
を受け取って List[str]
として展開するということだ.
これを実行してみるとこうなる:
id | name | wakachi(comment) |
---|---|---|
1 | サトシ | [ポケモン, ゲット, だぜ] |
2 | シゲル | [このおれさまが, せかいで, い... |
3 | カスミ | [わたしの, ポリシーはね, みず... |
展開されたセルはリストになっている.ので列のなかに更に列がある入れ子状態になっている.
これをそれぞれの str
を列として展開したい場合どうすればいいだろうか?
展開する関数を更に適用すればいい:
from pyspark.sql.functions import explode wakachi_trainers_df = spark.sql(''' SELECT id, name, explode(wakachi(comment)) FROM trainers ''') wakachi_trainers_df.show()
explode
という関数があるので,
これを適用すれば入れ子になった要素がそれぞれの列として展開される:
id | name | col |
---|---|---|
1 | サトシ | ポケモン |
1 | サトシ | ゲット |
1 | サトシ | だぜ |
2 | シゲル | このおれさまが |
2 | シゲル | せかいで |
2 | シゲル | いちばん |
2 | シゲル | つよいって |
2 | シゲル | ことなんだよ |
3 | カスミ | わたしの |
3 | カスミ | ポリシーはね |
3 | カスミ | みず |
3 | カスミ | タイプ |
3 | カスミ | ポケモンで |
3 | カスミ | せめて |
3 | カスミ | せめて |
3 | カスミ | せめまくる |
3 | カスミ | ことよ |
ジョイン
さらなるポイントとして DataFrame
どうしの JOIN
ができる.
普通の MySQL とかの JOIN
と変わらずに結合につかうカラムを指定して,
それをもとに DataFrame
を結合する.
Pkmn = Tuple[int, int, str, int] pkmns: List[Pkmn] = [ (1, 1, 'ピカチュウ', 99), (2, 1, 'リザードン', 99), (3, 2, 'イーブイ', 50), (4, 3, 'トサキント', 20), (5, 3, 'ヒトデマン', 30), (6, 3, 'スターミー', 40), ] pkmns_schema = StructType([ StructField('id', IntegerType(), True), StructField('trainer_id', IntegerType(), True), StructField('name', StringType(), True), StructField('level', IntegerType(), True), ]) pkmns_df = spark.createDataFrame( spark.sparkContext.parallelize(pkmns), pkmns_schema ) pkmns_df.createOrReplaceTempView('pkmns'); trainer_and_pkmns_df = spark.sql(''' SELECT * FROM trainers INNER JOIN pkmns ON trainers.id = pkmns.trainer_id ''') trainer_and_pkmns_df.show()
id | name | gender | age | comment | id | trainer_id | name | level |
---|---|---|---|---|---|---|---|---|
1 | サトシ | male | 10 | ポケモン ゲット だぜ | 1 | 1 | ピカチュウ | 99 |
1 | サトシ | male | 10 | ポケモン ゲット だぜ | 2 | 1 | リザードン | 99 |
3 | カスミ | female | 12 | わたしの ポリシーはね… みず タ... | 4 | 3 | トサキント | 20 |
3 | カスミ | female | 12 | わたしの ポリシーはね… みず タ... | 5 | 3 | ヒトデマン | 30 |
3 | カスミ | female | 12 | わたしの ポリシーはね… みず タ... | 6 | 3 | スターミー | 40 |
2 | シゲル | male | 10 | このおれさまが せかいで いちばん... | 3 | 2 | イーブイ | 50 |
ちなみに INNER JOIN
, OUTER JOIN
の他に種類がいっぱいある:
これで集合操作ができるので便利という感じ.
各 JOIN
の概念はこのベン図がわかりやすい:
ポイントとしてやはり JOIN
はコストがかかっていて遅い.
クラスタを組んでたとしたら,各所に分散したデータから見つけて JOIN
して戻してとかそういう操作が行われている.
ので,後述するパフォーマンスチューニングが必要になってくる.
パフォーマンス
現実のケースとして,膨大なデータセットと格闘するのはそうとう辛いものがある. というのも,4時間とかかかるものだったら,最後の方で落ちたらまたやり直しかとなって, 二回ミスると一日の業務時間を捧げたことになってしまって残業が確定する.
また,自分のケースではオンプレミス上でクラスタが作られていて, リソースがふんだんにある環境というわけでもなかった. なので,途中で OOM が発生したりとか,そういうパフォーマンスの問題にとらわれていた.
途中からはそういうパフォーマンスを改善するために,JOIN
の効率を上げるようにデータを削減したり,
パーティションの区切りかたを変えたり,
パーティションをなるべくクラスタ上に小さく断片化させないような工夫が必要となった.
Broadcast Join というもので,あえて全クラスタにデータセットを重複して配置することで,
JOIN
時にデータセットの検索のコストを下げるとかそういうものがある:
重要なテクニックとして,各チェックポイント的なところで適宜 DataFrame を .cache()
しておくことでパフォーマンスが劇的に改善されるというものもある.
MySQL
さて,MySQL のデータベースから読み込んでうんぬんしたいというのがある.前自分がやったのもこのケース. このケースでは MySQL を扱うために JDBC の MySQL コネクタを用意する必要がある.
こちらのかたのエントリと,その Docker イメージが参考になる:
cloudfish.hatenablog.com hub.docker.com
用途
今まで書いた内容でいうと,素朴に SQL を適当な言語でゴリゴリすればいいのでは,というご意見があるが,そのとおり.
Spark が威力を発揮するのは:
- データがとにかくでかい
- 適用したい処理が互いに依存しない
- 各操作に副作用がなくて内部の操作で完結する (外の API への操作とかがない)
というもの.
なので,メモリの限界にぶち当たったり, 節約できても処理全体にバッチ流して二週間かかりますみたいな巨大データだと, 素朴なものでも自前で分割して複数プロセスにわけて実行とかすればできるかもしれないけど, Spark にできることなら任せると楽,かもしれない.
最後に
結果を MySQL にそのままブチ込んだら binlog
で埋め尽くされて大変なことになったという思い出がある.
が,いろいろ模索してデータ量を間引いたり分割したりして結果大丈夫な感じになった.
MySQL とはコネクション張りまくって大変になったりと,いろいろ相性が悪い.
当時やらない選択肢はなかった結果こうなった.