Septeni Engineer's Blog

セプテーニ・オリジナルのエンジニアが綴る技術ブログ

Play FrameworkでServer-Sent Events(SSE)を使ってServer pushする方法あれこれ

あけましておめでとうございます。
初めまして、2017年4月に中途入社した張沢と申します。

今回はPlay FrameworkでServer-Sent Events(SSE)を使用してServer pushを行う実装方法について書きます。 WebSocketの情報は検索すると色々見つかりますが、SSEの記事やサンプルコードはあまり見かけないため…。

Play Framework 2.5からstream関連のAPI実装がAkka Streamに変わりました。そのため、Server pushのAPIでもAkka Streamを使用する必要があります。この記事ではSSEでのAkka Streamの使用方法について紹介します。

Server-Sent Eventsとは

Client(ブラウザ)へHTTP server pushを実現するための技術の1つで、以下のような特徴があります。

  • HTTPを利用したストリーム通信
  • 通信はServerからClient(ブラウザ)への一方向
  • UTF-8エンコードの文字列のみ送信可能
    • Base64に変換することで、画像データやUTF-8以外の文字列も送信できます
  • IEやEdgeではサポートされていませんが、polyfillが存在します

Play FrameworkにおけるSSEの実装方法

Play Framework 2.6でSSEを実装する場合、Controllerで下記のように書きます。

/app/controllers/FooController.scala

package controllers

import javax.inject.{Inject, Singleton}

import akka.stream.scaladsl.Source
import play.api.http.ContentTypes
import play.api.libs.EventSource
import play.api.mvc._

@Singleton
class FooController @Inject() (cc: ControllerComponents) extends AbstractController(cc) {

  def sse = Action {
    val source: Source[String, _] = _
    
    Ok.chunked(source via EventSource.flow).as(ContentTypes.EVENT_STREAM)
  }
  
}

/conf/routes

GET     /sse                        controllers.FooController.sse

SSEは文字列しか返せない仕様のため、最終的にStringを返すAkka StreamのSourceを指定する必要があります。自分で定義した型をSourceとして流す場合、source.map()でStringへ変換してください。

source.map(Json.toJson(_).toString)のようにしてJSONの文字列に変換し、ブラウザ側(JavaScript)でJSON.parse()してデータをやりとりする方法もよいと思います。例えば以下のように。

ブラウザ側のJavaScript

const eventSource = new EventSource('/sse');
  
eventSource.onmessage = event => {
  // サーバーから送信されたJSON文字列をparseする
  const json = JSON.parse(event.data);
  
  // do something.
}

なるほど、簡単そうですね?

SSEのサンプルコード

さて、Play FrameworkでSSEを行うサンプルコードを探してみると、Play 2.5で非推奨となったEnumeratorを使用したサンプルSource.tick()を使用した一定時間ごとに時刻を送信するだけのサンプルしか見つからず、困り果てた方もいるのではないでしょうか?

例えば、他のActionで発生したリクエストやイベントなどのデータをSSEで各ブラウザにpushするサンプルが欲しくなるかと思います。

ということで、ごくごく簡単なチャットアプリケーションをPlay Framework 2.6で作りました。メッセージを投稿するとSSEにより同じページを開いている他のブラウザへ投稿したメッセージを送信します。

f:id:m-harisawa:20180105164551g:plain

github.com

以下のコマンドでこのアプリケーションを起動することができます。起動後、ブラウザでlocalhost:9000にアクセスしてください。

$ git clone https://github.com/septeni-original/play-scala-sse-example.git
$ cd play-scala-sse-example
$ sbt run

このチャットアプリケーションには4つの実装サンプルがあり、それぞれ以下のSourceを使用しています。

  • Source.actorRef()
  • Source.actorPublisher()
  • Source.fromGraph()
  • MergeHub & BroadcastHub

それぞれのSourceについて説明していきます。

Source.actorRef()

おそらく一番簡易的に使用できるSourceです。Source[_, ActorRef]が生成され、このactorRefにブラウザへ送信したいメッセージを送ります。

生成されたactorRefにはmapMaterializedValue()またはwatchTermination()でアクセスできます。基本的にはActorの停止を監視すると思いますので、watchTermination()を使うことになります。

下記の例ではSource.actorRef()で生成されたactorRefを保持し、メッセージが投稿されるたびに保持しているactorRefへメッセージを送信して、SSE経由で各ブラウザへ送っています。

class ActorRefController @Inject() (system: ActorSystem,
                                    cc: ControllerComponents,
                                    addToken: CSRFAddToken)
                                   (implicit executionContext: ExecutionContext)
  extends AbstractController(cc) {

  // `Source.actorRef()` の actorRef を管理するためのActor
  private[this] val manager = system.actorOf(ActorRefManager.props)

  def index = addToken(Action { implicit request =>
    Ok(views.html.actorRef(CSRF.getToken.get))
  })

  def receiveMessage = Action(parse.json[Message]) { request =>
    // ブラウザから送信されてきたメッセージを `manager` 経由でこのページを開いてるすべてのブラウザへ送信する
    manager ! SendMessage(request.body.toString)
    Ok
  }

  def sse = Action {
    val source  =
      Source
        .actorRef[String](32, OverflowStrategy.dropHead)
        .watchTermination() { case (actorRef, terminate) =>
          // actorRefをmanagerに登録し、actorが停止した際には登録を解除する
          manager ! Register(actorRef)
          terminate.onComplete(_ => manager ! UnRegister(actorRef))
          actorRef
        }

    Ok.chunked(source via EventSource.flow).as(ContentTypes.EVENT_STREAM)
  }

}

class ActorRefManager extends Actor {

  private[this] val actors = mutable.Set.empty[ActorRef]

  def receive = {
    case Register(actorRef)   => actors += actorRef
    case UnRegister(actorRef) => actors -= actorRef
    case SendMessage(message) => actors.foreach(_ ! message)
  }

}

object ActorRefManager {
  def props: Props = Props[ActorRefManager]

  case class SendMessage(message: String)

  case class Register(actorRef: ActorRef)
  case class UnRegister(actorRef: ActorRef)
}

バッファーが溢れた際の動作をOverflowStrategyで指定できますが、このactorRefはAkkaが内部で生成したActorPublisherのため、背圧制御を行う(指定する)ことはできません

Source.actorPublisher()

背圧制御を行うためにはActorPublisherを自分で定義します。

Implementing Reactive Streams Publisher or Subscriber

class Publisher extends ActorPublisher[String] {
  import akka.stream.actor.ActorPublisherMessage._

  val MaxBufferSize = 256
  private[this] var buf = Vector.empty[String]

  def receive = {
    case SendMessage(message) if buf.size == MaxBufferSize =>
      sender() ! MessageDenied(self, message)
    case SendMessage(message) =>
      if (buf.isEmpty && totalDemand > 0)
        onNext(message)
      else {
        buf :+= message
        deliverBuf()
      }
    case Request(_) =>
      // Subscriberからデータを要求されたらバッファーに溜めてたデータを送信する
      deliverBuf()
    case Cancel =>
      // `Cancel`メッセージを受け取った場合はActorを停止する
      context.stop(self)
  }

  @annotation.tailrec
  final def deliverBuf(): Unit =
    if (totalDemand > 0) {
      /*
       * totalDemand is a Long and could be larger than
       * what buf.splitAt can accept
       */
      if (totalDemand <= Int.MaxValue) {
        val (use, keep) = buf.splitAt(totalDemand.toInt)
        buf = keep
        use foreach onNext
      } else {
        val (use, keep) = buf.splitAt(Int.MaxValue)
        buf = keep
        use foreach onNext
        deliverBuf()
      }
    }
}

object Publisher {
  def props: Props = Props[Publisher]
}

ActorPublisherではtotalDemandでSubscriberが要求している(受け取れる)データの個数が取得できますので、その数を超えないようにonNext()でデータを送ります。(totalDemandがLongを返すことに注意してください)

今回のケースでは、データの送信タイミングは以下の2通りです。

  • SubscriberからRequestメッセージを受け取った(データを要求された)場合
  • チャットのメッセージが送信されてきた時に、Subscriberがまだデータを受け取れる場合

それ以外の場合は自身が持っているバッファーにデータを溜めておきます。

Source.actorPublisher()で定義したActorPublisherからSourceを生成できます。

class PubSubController @Inject() (system: ActorSystem,
                                  cc: ControllerComponents,
                                  addToken: CSRFAddToken)
                                 (implicit executionContext: ExecutionContext)
  extends AbstractController(cc) {

  // ActorPublisherを管理するためのActor
  private[this] val manager = system.actorOf(PublishersManager.props, PublishersManager.name)

  def index = addToken(Action { implicit request =>
    Ok(views.html.pubSub(CSRF.getToken.get))
  })

  def receiveMessage = Action(parse.json[Message]) { request =>
    // ブラウザから送信されてきたメッセージを `manager` 経由でこのページを開いてるすべてのブラウザへ送信する
    manager ! SendMessage(request.body.toString)
    Ok
  }

  def sse = Action {
    val source =
      Source
        .actorPublisher[String](Publisher.props)
        .watchTermination() { case (publisher, terminate) =>
          // ActorPublisherをmanagerに登録し、actorが停止した際には登録を解除する
          manager ! Register(publisher)
          terminate.onComplete(_ => manager ! UnRegister(publisher))
          publisher
        }

    Ok.chunked(source via EventSource.flow).as(ContentTypes.EVENT_STREAM)
  }

}

class PublishersManager extends Actor with ActorLogging {

  private[this] val publishers = mutable.Set.empty[ActorRef]

  def receive = {
    case Register(publisher)   => publishers += publisher
    case UnRegister(publisher) => publishers -= publisher
    case event: SendMessage    => publishers.foreach(_ ! event)
    case denied: MessageDenied => log.error(denied.toString)
  }

}

object PublishersManager {
  val name: String = PublishersManager.getClass.getSimpleName
  val props: Props = Props[PublishersManager]

  case class SendMessage(message: String)

  case class Register(publisher: ActorRef)
  case class UnRegister(publisher: ActorRef)
  case class MessageDenied(publisher: ActorRef, message: String)
}

ActorPublisher内で背圧制御を行っているところ以外は、ほぼSource.actorRef()のサンプルコードと同じ処理になっています。

ちなみにActorPublisherはAkka-Stream 2.5.xでdeprecatedになっています

Source.fromGraph()(GraphStage)

deprecatedになったActorPublisherの代わりとして、GraphStageを使えばよさそうなことが書かれていますので、実際に使ってみます。

Custom stream processing • Akka Documentation

It is possible to acquire an ActorRef that can be addressed from the outside of the stage, similarly how AsyncCallback allows injecting asynchronous events into a stage logic. This reference can be obtained by calling getStageActorRef(receive) passing in a function that takes a Pair of the sender ActorRef and the received message.

  • they are not location transparent, they cannot be accessed via remoting.
  • they cannot be returned as materialized values.
  • they cannot be accessed from the constructor of the GraphStageLogic, but they can be accessed from the preStart() method.

Stage外からイベントを受け取るためのactorRefが用意されているようです。getStageActorRef(receive)でアクセスできますが、preStart()内でアクセスする必要があったり、位置透過性がなくリモートからアクセスできないなど注意すべき点があるようです。

class MessageStage(manager: ActorRef) extends GraphStage[SourceShape[String]] {
  // Define the (sole) output port of this stage
  val out: Outlet[String] = Outlet("MessageStage")
  // Define the shape of this stage, which is SourceShape with the port we defined above
  val shape: SourceShape[String] = SourceShape(out)

  private[this] val messages = mutable.Queue.empty[String]
  private[this] var downstreamWaiting = false

  // This is where the actual (possibly stateful) logic will live
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    implicit def self: ActorRef = stageActor.ref

    override def preStart(): Unit = {
      // `preStart()` 内で ActorRef を取得し、managerに登録する
      val thisStageActor = getStageActor(receive).ref
      manager ! Register(thisStageActor)
    }

    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        if (messages.isEmpty) {
          // 送信するメッセージが来るまで`push()`しない
          downstreamWaiting = true
        } else {
          push(out, messages.dequeue())
        }
      }
    })

    private def receive(receive: (ActorRef, Any)): Unit = receive match {
      case (_, SendMessage(message)) =>
        messages.enqueue(message)
        // 下流の`onPull()`に対してまだメッセージを送信していなければ送信する
        if (downstreamWaiting) {
          downstreamWaiting = false
          push(out, messages.dequeue())
        }
    }
  }
}

portとshape

GraphStageのボイラープレートのようなもので、portとshapeを定義する必要があります。

portは、入力側のin: Inletや出力側のout: Outletを定義するのですが、今回はSourceなので出力側のoutのみ定義します。Flowのように入力と出力がある場合はinout、Sinkのように入力しかない場合はinのみ定義します。port定義時に名前をつける必要がありますが、基本的には自身のclass名を指定します。Flowのようにinoutの2つのportがある場合は、それぞれClassName.in, ClassName.outのように分けて名前をつけます。今回はoutのみなのでOutlet("MessageStage")としています。

shapeは基底クラスGraphのshapeをoverrideしています。各GraphStageごとに対応しているshapeを設定します。

  • GraphStage[SourceShape[_]]
    SourceShape(out)

  • GraphStage[FlowShape[_, _]]
    FlowShape.of(in, out)

  • GraphStage[SinkShape[_]]
    SinkShape(in)

preStart()postStop()

前述の通り、preStart()でactorRefを取得しています。何かしらの前処理を行う場合はここで行うのが良いでしょう。逆に後処理が必要な場合は、postStop()をoverrideしてその中で行います。

handler

各portに対してhandlerを設定します。outには下流からデータのリクエストがあった際の処理をOutHandlerで記述します。やることは簡単でonPull()内でpush()を実行して下流にデータを送信するだけです。

ここで送信するべきデータがない場合に何をすればいいのか悩むかと思います。結論から言うと、次に送信するデータの用意ができるまでonPull()では何もしなくてOKです

Rate decoupled graph stages

公式ドキュメントのサンプルコード通り、onPull()で送信すべきメッセージがまだない場合、downstreamWaiting = trueにして下流を待たせている状態であることを記憶しておきます。そして次に送信すべきメッセージが来たとき、下流を待たせている場合はpush()でメッセージを送信します。つまり1回のpullに対して1回pushすればよいわけです。また、push()を実行するまで次のpullが来ることはありません。

Source.fromGraph()で定義したGraphStageからSourceを生成できます。

class GraphStageController @Inject() (system: ActorSystem,
                                      cc: ControllerComponents,
                                      addToken: CSRFAddToken)
  extends AbstractController(cc) {

  // GraphStage の actorRef を管理するためのActor
  private[this] val manager = system.actorOf(GraphStageManager.props)

  def index = addToken(Action { implicit request =>
    Ok(views.html.graphStage(CSRF.getToken.get))
  })

  def receiveMessage = Action(parse.json[Message]) { request =>
    // ブラウザから送信されてきたメッセージを `manager` 経由でこのページを開いてるすべてのブラウザへ送信する
    manager ! SendMessage(request.body.toString)
    Ok
  }

  def sse = Action {
    val source = Source.fromGraph(new MessageStage(manager))
    Ok.chunked(source via EventSource.flow).as(ContentTypes.EVENT_STREAM)
  }

}

class GraphStageManager extends Actor {

  private[this] val actors = mutable.Set.empty[ActorRef]

  def receive = {
    case Register(actorRef) =>
      // GraphStageのActorが停止した際に登録を解除するためにActorを監視する
      context.watch(actorRef)
      actors += actorRef
    case Terminated(actorRef) =>
      // GraphStageのActorが停止したら監視を停止して登録を解除する
      context.unwatch(actorRef)
      actors -= actorRef
    case message: SendMessage =>
      actors.foreach(_ ! message)
  }

}

object GraphStageManager {
  def props: Props = Props[GraphStageManager]

  case class SendMessage(message: String)

  case class Register(actorRef: ActorRef)
}

MergeHub & BroadcastHub

動的なin/outにはMergeHubとBroadcastHubが使えます。

Dynamic fan-in and fan-out with MergeHub, BroadcastHub and PartitionHub

Combining dynamic stages to build a simple Publish-Subscribe service

公式ドキュメントのサンプルコードをそのままコピペすると以下のようになります。

class HubController @Inject() (cc: ControllerComponents,
                               addToken: CSRFAddToken)
                              (implicit val mat: Materializer)
  extends AbstractController(cc) {

  private[this] val (sink, source) =
    MergeHub.source[String](perProducerBufferSize = 16)
      .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both)
      .run()

  def index = addToken(Action { implicit request =>
    Ok(views.html.hub(CSRF.getToken.get))
  })

  def receiveMessage = Action(parse.json[Message]) { request =>
    Source.single(request.body.toString).runWith(sink)
    Ok
  }

  def sse = Action {
    Ok.chunked(source via EventSource.flow).as(ContentTypes.EVENT_STREAM)
  }

}

BroadcastHub.sink()には何度でもSourceを流すことができるので、メッセージが投稿されるたびにSource.single()で投稿されたメッセージをブラウザへ送信しています。

MergeHubBroadcastHubも内部的にはGraphStageで実装されていますので、興味があればソースを眺めてみるのも良いでしょう。

おわりに

Akka-Streamの詳細にはあまり立ち入らずSourceの使用方法にフォーカスした記事でしたが、いかがだったでしょうか?

この記事がPlay FrameworkでSSEを実装する際の一助になれば幸いです。

参考リンク