Septeni Engineer's Blog

セプテーニエンジニアが綴る技術ブログ

Akka Streamでスクレイピング時にサーバーに思いやりを与える

こんにちは、菅野です。

近頃はWeb上で様々なサービスが誕生し、誰もが便利に使っていると思います。また、IFTTTでそれらを繋いだり、イベントをLINEに通知したりするのも簡単です。
それはAPIが公開されているからなのですが、データがほしいけどAPIが公開されていない場合はスクレイピングで自力でデータを取ってくるしかありません。

人力ではなくてプログラムでWebサイトにアクセスするときは、気をつけないとスクレイピング先のサーバーに思わぬ負荷をかけてしまうかも知れません。
早朝に自宅に警察が押し寄せてきて逮捕されてしまうのは避けたいので、サーバをいたわるようにスクレイピングしたいと思います。

Throttleで流量制限

という訳でAkka Streamでさくっと作りたいと思います。
せっかくなのでHTTPクライアントはAkka HTTPを使います。

材料は以下です。依存ライブラリに追加しましょう。

"com.typesafe.akka" %% "akka-http" % "10.0.1"

さて、Akka Streamで流量を絞るといったらthrottleです。下の例だと、1秒おきに"hoge"が出力されます。

Source.repeat("hoge").throttle(1, 1.second, 0, ThrottleMode.shaping).runForeach(println)

応用して、下のようなフローはすぐに思い浮かぶと思います。(必要な箇所だけ抜粋)

  val requestFlow =
    Flow[HttpRequest]
    .map(x => (x, ()))
    .throttle(1, 1.second, 0, ThrottleMode.Shaping)
    .via(http.cachedHostConnectionPoolHttps[Unit](HostName))
    .mapAsync(1){ case (x, _) => Future.fromTry(x).flatMap(Unmarshal(_).to[String]) }

  Source.repeat(HttpRequest(uri = "/")).take(5)
    .via(requestFlow)
    .map(html => HttpRequest(uri = getNextUrlFromHtml(html)))
    .via(requestFlow)
    .map(html => HttpRequest(uri = getNextUrlFromHtml(html)))
    .via(requestFlow)
    .runForeach(html => println(html))

実際に動かしてみるとSourceから来る要素が一つだけの場合にはそれっぽい動きをします。
でも複数流れてくると、上の例だと3つあるHTTPリクエストのフローそれぞれで秒間1リクエストの制限がかかっているので実際には秒間3リクエスト飛んでしまいます。
これだと意図せず家宅捜索フラグが立ってしまうのでなんとかしたいところです。

Actor使う

実は公式ドキュメントもフロー全体で一定のレートに制限するというテーマを扱っています。 http://doc.akka.io/docs/akka/2.4.16/scala/stream/stream-cookbook.html#Globally_limiting_the_rate_of_a_set_of_streamsdoc.akka.io

ただ、ちょっと目がチカチカするのでthrottleを使ってもっと手抜き実装が出来ないか考えてみました。

class ClientActor extends Actor {

  val HostName = "www.example.co.jp"

  implicit val mat = ActorMaterializer()
  implicit val ec = context.system.dispatcher

  val http = Http(context.system)

  val requestFlow = Flow[(HttpRequest, ActorRef)]
    .throttle(1, 1.second, 0, ThrottleMode.Shaping)
    .via(http.cachedHostConnectionPoolHttps[ActorRef](HostName))
    .map { case (triedResponse, replyTo) =>
      Future.fromTry(triedResponse).flatMap(Unmarshal(_).to[String]) onComplete {
        case Success(x) => replyTo ! Status.Success(x)
        case Failure(e) => replyTo ! Status.Failure(e)
      }
    }

  val (requestPublisher, done) = Source.actorRef[(HttpRequest, ActorRef)](100, OverflowStrategy.fail)
    .via(requestFlow)
    .toMat(Sink.ignore)(Keep.both)
    .run()

  done.onComplete(_ => self ! PoisonPill)

  override def receive: Receive = {
    case httpRequest: HttpRequest => requestPublisher ! (httpRequest, sender())
  }

}

とりあえずレート制限付きのHTTPリクエストフローを先に実行させておいて、それをうまくつかってやりくりすればいけそうです。

  val client = system.actorOf(Props[ClientActor])

  val avoidingArrestRequestFlow = Flow[HttpRequest].mapAsync(1)(req => (client ? req).mapTo[String])

  Source.repeat(HttpRequest(uri = "/")).take(5)
    .via(avoidingArrestRequestFlow)
    .map(html => HttpRequest(uri = getNextUrlFromHtml(html)))
    .via(avoidingArrestRequestFlow)
    .map(html => HttpRequest(uri = getNextUrlFromHtml(html)))
    .via(avoidingArrestRequestFlow)
    .runForeach(html => println(html))

動作させると、意図した通りにピッタリ秒間1リクエストで動作していました。これで図書館でも安心
しかしAkka Streamを使っていながらActorを触ることになるのは何かあまりいいものではないのでどうにかしたいです。Backpressure効かないですし。

Streamで頑張る

どうにかしてStreamだけで出来ないか考えてみた結果が下のフロー。

  val avoidingArrestRequestFlow = {
    val sourceQueue = {
      val source = Source.queue[(HttpRequest, Promise[String])](20, OverflowStrategy.backpressure)
        .throttle(1, 1.second, 0, ThrottleMode.Shaping)
      val requestFlow = Flow[(HttpRequest, Promise[String])]
        .via(http.cachedHostConnectionPoolHttps[Promise[String]](HostName))
      val sink = Sink.foreach[(Try[HttpResponse], Promise[String])] {
        case (x, p) => p.completeWith(Future.fromTry(x).flatMap(Unmarshal(_).to[String]))
      }
      source.via(requestFlow).to(sink).run()
    }

    Flow[HttpRequest].mapAsyncUnordered(1) { request =>
      val p = Promise[String]()
      sourceQueue.offer((request, p)).flatMap {
        case QueueOfferResult.Enqueued => p.future
        case failedResult => Future.failed(new Exception(failedResult.toString))
      }
    }
  }

リクエストはキューで受け取るようにして、結果を受け取るためのPromiseも一緒に渡してやることで動くものが出来ました。
動作に問題はなく、UpstreamにBackpressureもかかるのでバッチリ、、、と言いたいところですが、 このフローを同時に実行する数がSourceQueueのサイズに近づいていくと途中で詰まったり、Backpressureがかかっているキューにさらにofferをするなと怒られたりするようになります。

Backpressureが効かないよりはマシで、キューのサイズが十分であれば問題は起こらないので、
もっといい方法が見つかるまで私はこのやり方でやっています。

まとめ

throttleを使えば、Thread.sleepやActorを使うこと無く
簡単に処理速度を抑えることが出来ます。

良いスクレイピングライフを!