読者です 読者をやめる 読者になる 読者になる

Septeni Engineer's Blog

セプテーニエンジニアが綴る技術ブログ

素早くデータマイニングしたくなったらSparkを始めよう

Scala Spark AWS

こんにちは。菅野です。

Scalaを使って集計バッチなどを書くと、ふつうは以下のようにコレクションのメソッドを駆使してデータをこねくり回しますよね?

  val 何かのデータ: Seq[String] = ???
  何かのデータ
    .groupBy(identity)
    .mapValues(_.size)
    .toSeq
    .sortBy(_._2)
    .foreach(println)

Scalaのコレクションは強力で使いやすいので、とりあえずこんな感じで日々のデータを処理すると思います。
しかし実行時間はデータ量に比例するように長くなり、そのうちOutOfMemoryErrorと叫びながらプロセスが爆散するようなります。

でも、もっと速く、もっと大量のデータを処理したいという要求が出た場合にはどうするのでしょうか?
ものすごい廃スペックマシンを用意すれば力技で解決できそうではあります。
それはそれでロマン溢れるのですが、イマドキのエンジニアなら分散処理でスケールアウトさせる方向に持っていきたいですよね?そのほうが無駄なく柔軟にリソース管理ができますし。

前置きが長くなりましたが、集計処理は簡単にスケールアウトできるんです!! そう、Sparkならね。

Apache Sparkとは

spark.apache.org

Sparkは分散処理を行うためのフレームワークです。 今年の7月に2.0.0がリリースされました。これから使うなら断然2系ですね。

この記事ではSparkそのものは特に解説しません。 とりあえず使うに当たってはRDD(Resilient Distributed Dataset)というものを覚えれば使えます。

RDDとは

RDDは簡単に言うとデータのまとまりで、そのデータに対してどのような変形操作を行うかを記述して処理を進めていきます。 SparkにはクラスとしてRDDクラスが存在します。このクラスにはmap flatMap filter groupBy等のメソッドがあって使い方はScalaのコレクションとほとんど変わりません。

じゃあ、RDDはSeqみたいなものかと言うとそうではなく、あくまで概念的なデータのまとまりです。 実データはパーティションという単位で分割されてどのノード上にあるとかは意識せずに扱い、計算そのものも実際に必要になるまで処理が遅延されるので各計算ステップごとにデータが作られるわけでもありません。

でもとりあえず使う分には凄いSeqとでも思っておくとすんなり使えます。 それにSpark上でのRDDの分散処理は勝手にやってくれます!

試しに使ってみる!

ある日突然、Twitterの日本語ツイート50万件分のテキストファイルに含まれる単語のランキングを出したくなったとしましょう。

Scalaのコレクションなら以下のようになると思います。形態素解析にkuromojiを使っています。

  val tokenizer = new Tokenizer()
  Source.fromFile("""/path/to/tweet_20161016.txt""").getLines()
    .map(tokenizer.tokenize)
    .flatMap(_.asScala)
    .filter(x => x.getPartOfSpeechLevel1 == "名詞" || x.getPartOfSpeechLevel1 == "動詞")
    .map(_.getSurface)
    .toSeq
    .groupBy(identity)
    .mapValues(_.size)
    .filter(_._2 > 10) // 多いから減らす…。
    .toSeq
    .sortBy(_._2)
    .foreach(println)

実行結果は以下です。意味ねー。実行時間は65秒かかりました。

~略~
(.,198214)
(/,202843)
(し,209766)
(:,221383)
(RT,231497)
(@,325492)

コレをSparkで処理するように書き換えます。

まずはsbtの依存性にSparkを追加。
"org.apache.spark" %% "spark-core" % "2.0.1"

そしてソースコードを下のように書き換えます。

  val tokenizer = new Tokenizer()
  val conf = new SparkConf().setAppName("Count").setMaster("local[*]")
  val sc = new SparkContext(conf)

  sc.textFile("""/path/to/tweet_20161016.txt""")
    .map(tokenizer.tokenize)
    .flatMap(_.asScala)
    .filter(x => x.getPartOfSpeechLevel1 == "名詞" || x.getPartOfSpeechLevel1 == "動詞")
    .map(_.getSurface)
    .groupBy(identity)
    .mapValues(_.size)
    .filter(_._2 > 10) // 多いから減らす…。
    .sortBy(_._2)
    .foreach(println)

new SparkContext(conf)まではおまじないです。 setMaster("local[*]")の部分でSparkのマスターノードを指定しています。基本的にマスターノードは環境から与えられるのでプログラムから指定する必要はないのですが、今回は手軽にローカルマシン上でスタンドアロンでSparkを動作させるために指定しています。
[*]は実行スレッド数の指定で、*でCPUのコア数が自動で設定されます。ちなみにサンプルは物理4コアのマシンで実行しています。

SparkContexttextFileメソッドでテキストファイルからRDDを作成できます。それ以降はScalaのコレクションの方とほとんど変わりません。というか、.toSeqの必要が無くなった分よりスッキリした感じ?!

簡単に書き換えられたので実行します。そしてわりとすぐに実行結果出ました!速~い、って結果違うし!!

~略~
(Nq,17)
(クロスボーンガンダム,17)
(DjaV,17)
(End,17)
(happiness,17)
(garasu,17)
(ステレオ,17)

何故なのか?答えを言ってしまうと、.foreach(println)の部分も並行処理されちゃっているからです。 分散処理された結果を一つの集計結果にしたい場合には.collect()メソッドを使います。そうすると各パーティションで処理された結果を一つにまとめることが出来ます。

    ~略~
    .sortBy(_._2)
    .collect()
    .foreach(println)

出力前にcollectするよう修正してから再度実行。 結果は最初のサンプルと同じ結果になりました。21秒。速い!

最初のScalaコレクションでの実行とSparkでの実行とでは使用するスレッド数が違うというハンデがあるので、.parを使ってパラレルコレクションを使うバージョンでも試してみました。 しかし、それでも33秒かかったのでSparkの方がより効率的に実行できているようでした。CPU使用率もSparkでの実行のほうが低かったです。

クラスター環境で実行する

で、当初の目的はいくらでもスケールする環境で集計処理を実行することでした。
Sparkはクラスタを組んで実行環境を作ることができ、分散処理も勝手にやってくれます。 でも自前でSparkクラスタを構築する?いえ、そんなことするよりもっといい物があります。 Amazon EMRです。

aws.amazon.com

EMRは手軽にSparkクラスタの環境を提供してくれます。
2016年8月にemr-5.0.0がリリースされ、Spark2.0.0に対応しました。
Sparkの環境はAWSが用意してくれるので、私がやることはさっきのサンプルを実行するだけです。やったね!

クラスター上で実行する用のjar作成

クラスター上で実行するにはspark-submitというコマンドに渡せる形のjarにする必要があります。
いったん先程のソースコードを修正します。

import com.atilika.kuromoji.ipadic.Tokenizer
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConverters._

object CountSparkCluster extends App {

  val conf = new SparkConf().setAppName("Count")
  val sc = new SparkContext(conf)

  sc.textFile("""s3://sample-zakknak/tweet_230.txt.gz""")
    .mapPartitions { x =>
      val tokenizer = new Tokenizer()
      x.map(tokenizer.tokenize)
    }
    .flatMap(_.asScala)
    .filter(x => x.getPartOfSpeechLevel1 == "名詞" || x.getPartOfSpeechLevel1 == "動詞")
    .map(_.getSurface)
    .groupBy(identity)
    .mapValues(_.size)
    .filter(_._2 > 100) // 出力が多いのでもっと減らす…。
    .sortBy(_._2)
    .saveAsTextFile("""s3://sample-zakknak/output""")

}

まず、.setMaster("local[*]")は余計になるので取り除きました。
そして読み込みと結果の保存先をS3にしました。EMRで動かす場合はS3を簡単に使えます。

そしてmapPartitionsを使って各ノード上でそれぞれTokenizerのインスタンスを作るように修正しました。
詳しく解説しませんが、こうしないと複数ノード上で実行することが出来ません。

ちなみに、せっかくクラスター上で動かすので230万件のツイートを用意しました。
データの転送もそれなりにコストになるのでgzip形式にしています。

ソースのコンパイル結果をjarにします。kuromojiも使っているのでに一緒に固めておきましょう。
"sbt-assembly"を使えば簡単にできます。
が、そのままやるとsparkのライブラリも含まれてしまいます。
spark上で実行するjarにsparkのライブラリを含める必要はないので
"org.apache.spark" %% "spark-core" % "2.0.1" % "provided"
のようにしておきます。

作成したらS3に上げておきます。 出力先のディレクトリは作る必要ありません。 f:id:zakknak:20161101021456p:plain

いざSparkクラスター作成!

とりあえずEMRの画面を開くと、
f:id:zakknak:20161101004602p:plain
早く作れと催促されます。

作成ボタンをポチるとほとんど何も設定できないクイックオプションが出てくるのですが、
惑わされずに詳細オプションへ行きましょう。何故急かすのか? f:id:zakknak:20161101004720p:plain

ソフトウェア設定ではSpark2.0.1を選択します。 f:id:zakknak:20161101005255p:plain

ステップには「Sparkアプリケーション」としてs3にアップロードしたjarを指定します。
メインクラスも引数で指定しておきましょう。 f:id:zakknak:20161101005604p:plain

ハードウェア構成はお好みで選択してください。
今回は試しにインスタンス10個にしますが、お財布に優しさを与えるためスポットインスタンスを使います。 f:id:zakknak:20161101005902p:plain

全般設定でログの出力先の設定しておきます。 f:id:zakknak:20161101010652p:plain

最後にEC2のキーペアを選択します。
無くてもEMRは使えるのですが、あるとEC2インスタンスに入れるので直接sparkのコマンドが実行出来て便利。 f:id:zakknak:20161101010828p:plain

最後に「クラスターを作成」ボタンを押すと終わりです。

「プロビジョニング」→「ブートストラップ」→「実行中」とステータスが変わっていき、クラスター全体が起動するのに7分くらいかかります。 f:id:zakknak:20161101012807p:plain

ついに起動しました!画面ポチポチするだけですぐクラスター作れるのは便利です。 f:id:zakknak:20161101013710p:plain

作ったアプリが動き始めました。 f:id:zakknak:20161101013755p:plain

4分で終わりました。 f:id:zakknak:20161101013838p:plain

出力先にはこのようなファイルが作成されます。 f:id:zakknak:20161101013907p:plain

中身はきちんと処理結果が入っています。@が一番多いという相変わらず意味のない結果…。
f:id:zakknak:20161101013945p:plain
一見して、日時を表すパーツが多かったです。ちなみに一番多かった絵文字は上から132番目の✨でした。

CLIも使えるよ

画面から動かすのも良いのですが、
マスターノードのインスタンスに入ると直接sparkのコマンドが叩けるので試してみます。

まずマスターノードのインスタンスsshログインします。
接続方法は「マスターパブリック DNS」の右にある「SSH」のリンクから確認できます。
つながらない場合はマスターノードのセキュリティグループの設定でインバウンドのsshのポートを許可するようにします。
f:id:zakknak:20161101020932p:plain

インスタンスに入れたらsparkのコマンドが実行できます。 EMRのステップに表示されているとおり、
spark-submit --deploy-mode cluster --class CountSparkCluster s3://sample-zakknak/spark-assembly-1.0.jar
というコマンドで同様の動作をします。ぶっちゃけ、Web画面よりCLIのほうがレスポンスがいいです。

spark-submit以外にもspark-shellというものがあり、こちらはREPL形式で実行できます。
f:id:zakknak:20161101020954p:plain

突然の死

あ、、、 f:id:zakknak:20161101014301p:plain 遊んでいるうちにインスタンスが止まりました。

スポットインスタンス価格の急激な上昇に私のクラスターは木っ端微塵にされてしまいました。 f:id:zakknak:20161101014342p:plain

でも1時間以内にAWS側からterminateされたのでお試し実行には好都合。
ただしEMRの料金はきっちり請求されました。$0.07でした。 f:id:zakknak:20161101014910p:plain

Databricksでもっと手軽にSparkを試す

databricks.com

もっともっと手軽にSparkを試したい場合にはDatabricksを使いましょう。
こちらはWebのUIで手軽にSparkを試せます。
Databricks Community Editionであれば無料で始められます。

アカウントを作成したら必要なものをアップロードします。
kuromojiのjarとツイートのデータです。

jarはメニューの「Create > Library」でアップロードできます。
f:id:zakknak:20161102083539p:plain f:id:zakknak:20161102083720p:plain

データのテキストファイルはCreate Tableからアップロードできます。
f:id:zakknak:20161102083744p:plain

準備が整ったらクラスターを起動します。Community Editionだと何の選択肢もありません。
f:id:zakknak:20161102083918p:plain f:id:zakknak:20161102083926p:plain

あとはノートブックにコードを書き、クラスターをattachしてRunするだけです。
f:id:zakknak:20161102084159p:plain

Community Editionで作成できるクラスターだとツイート230万件は処理しきれなかったので、50万件で実行してみました。
f:id:zakknak:20161101021540p:plain 42秒くらいで終わってますね。 別なことがしたくなったらコードを書き換えて再実行するだけです。
めちゃくちゃ手軽!

まとめ

Spark敷居低いです✨