FLINTERS Engineer's Blog

FLINTERSのエンジニアによる技術ブログ

AWS SQS + Lambda を利用した Slack 通知アプリケーションを Scala で実装して Serverless Framework でデプロイする

お疲れさまです。中途2年目の堀越です。

表題の通り AWS のサービスを活用した Slack 通知アプリケーションを
開発する機会がありましたのでアウトプットです。

尚、今回はイベントソースについて詳細には触れませんのでご了承ください。
こちらについてはまたの機会に。

アプリケーション概要


  1. SQS に溜めたメッセージを Lambda がポーリング
  2. 読み込んだメッセージを Slack に POST
  3. 通知が成功したメッセージを SQS から削除

f:id:t_horikoshi:20180814203642j:plain

本ブログにおいてはコアの機能となっている、
メッセージの読み込み、通知が成功した場合にメッセージを削除する機能について解説していきます。

開発の準備


Serverless Framework インストール

タイトルにある通り、Lambdaのデプロイ及び動作確認には、
Serverless Framework を利用しました。
serverless.com

インストールは Quick Start#Pre-requisites から

YAMLファイルをちょこっと書くだけで、
アプリケーション構成を定義できるので大変便利でした。

今回はAWS Lambdaの構成管理に利用しましたが、
他にもGCP, Azureなど様々なプラットフォームに対応しているところも魅力的です。

環境変数 AWS_PROFILE

SQS, Lambda, S3, CloudFormation への更新が必要なので、
対象の credential に適切なポリシーを設定してください。

サービスの作成

インストールした Serverless を利用してコマンドラインからサービスを作成していきます。

今回は Lambda を Scala で実装するので、
テンプレートには aws-scala-sbt を指定します。

$ sls create --template aws-scala-sbt

結果は下記の通り。

├── build.sbt
├── project
│   ├── assembly.sbt
│   ├── build.properties
│   └── plugins.sbt
├── serverless.yml
└── src
    └── main
        └── scala
            └── hello
                ├── ApiGatewayResponse.scala
                ├── Handler.scala
                ├── Request.scala
                └── Response.scala

build.sbt 修正

libraryDependencies に必要なライブラリを追加していきます。

libraryDependencies ++= Seq(
  "com.amazonaws"  % "aws-lambda-java-events" % "1.3.0",
  "com.amazonaws"  % "aws-lambda-java-core"   % "1.1.0",

  /* 下記を追加 */ 

  "com.eed3si9n" %% "gigahorse-okhttp" % "0.3.0",
  "io.circe" %% "circe-core" % "0.9.3",
  "io.circe" %% "circe-generic" % "0.9.3",
  "io.circe" %% "circe-parser" % "0.9.3",
  "com.typesafe" % "config" % "1.3.2",
  "com.amazonaws" % "aws-java-sdk-sqs" % "1.11.386",
  "com.amazonaws" % "aws-java-sdk-core" % "1.11.386",
  "com.amazonaws" % "jmespath-java" % "1.11.386"
)

用途としては下記の通りです。

  • aws-java-sdk-sqs
    • SQSからのメッセージ読み込みと削除処理の実装
  • aws-java-sdk-core, jmespath-java
  • gigahorse-okhttp
    • Slack Web Api の Http Client に利用
  • circe-core, circe-generic, circe-parser
    • メッセージ の Decode 処理
  • config
    • application.conf 読み込み

Lambda関数を開発していく


Lambda関数を実装していきます。
まずは、src / main / scala ディレクトリの配下に適当なディレクトリを用意して実装していきます。

今回は sample ディレクトリでいきます。

Amazon SQS メッセージの受信・削除を行うクラス

aws-java-sdk-sqs を利用して メッセージの受信・削除を行うクラスを作成します。

基本的にはキューのURLを指定して、
行いたいオペレーションのメソッドを定義するだけなので実装はシンプルです。

package sample

import com.amazonaws.services.sqs.model.{Message, ReceiveMessageRequest}
import com.amazonaws.services.sqs.{AmazonSQS, AmazonSQSClientBuilder}
import com.typesafe.config.Config

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}

class SQSClient(config: Config) {

  private val queueUrl = config.getString("aws.sqs.queue.url")
  private val sqsClient: AmazonSQS = AmazonSQSClientBuilder.defaultClient()
  private val receiveRequest: ReceiveMessageRequest = {
    val request = new ReceiveMessageRequest
    request.setMaxNumberOfMessages(10)
    request.setQueueUrl(queueUrl)
    request
  }

  def receive(implicit ec: ExecutionContext): Future[Seq[Message]] =
    Future {
      sqsClient.receiveMessage(receiveRequest).getMessages.asScala
    }

  def delete(msg: Message)(implicit ec: ExecutionContext): Future[Unit] =
    Future{
      sqsClient.deleteMessage(queueUrl, msg.getReceiptHandle)
    }

}

各オペレーションのリクエストに対し細かいオプションを設定することができるので、
今回は受信時のリクエストに最大受信数を設定することにしました。

詳しくは ReceiveMessageRequest (AWS SDK for Java - 1.11.401) 参照。

Slack通知を行うクラス

Slack APIchat.postMessage に対してPOSTリクエストするクラスです。
Httpクライアントには Gigahorse を利用しました。

package sample

import com.amazonaws.services.sqs.model.Message
import com.typesafe.config.Config
import gigahorse.support.okhttp.Gigahorse
import gigahorse.{HeaderNames, MimeTypes, Request}

import scala.concurrent.{ExecutionContext, Future}

class SlackClient(config: Config) {

  private val Url = config.getString("slack.api.post.message.url")
  private val ApiToken = config.getString("slack.api.token")
  private val requestWithBody: String => Request = { body: String =>
    Gigahorse
      .url(Url)
      .post(body)
      .addHeaders(
        HeaderNames.AUTHORIZATION -> s"Bearer $ApiToken",
        HeaderNames.CONTENT_TYPE -> MimeTypes.JSON
      )
  }

  def post(message: Message)(implicit ec: ExecutionContext): Future[Unit] =
    Gigahorse.withHttp(Gigahorse.config) { http =>
      http
        .run(requestWithBody(message.getBody), Gigahorse.asString)
        .flatMap { s =>
          SlackAPIResponse
            .fromJsString(s)
            .fold(e => Future.failed(e), _ => Future.successful(()))
        }
    }
}

今回は割愛してますが、 Gigahorse — Extending Gigahorse で紹介されている
OAuth認証処理をラップしたクラスの実装例など非常に参考になりました。

Slack APIレスポンスの処理

Slack APIのレスポンスは通知が失敗した場合でもステータスコード200で結果を返してくるため、
下記のようなモデルにパースして、失敗していた場合は独自例外を返すようしました。

package sample

import io.circe.Decoder
import io.circe.parser.decode

case class SlackAPIResponse(ok: Boolean, error: Option[String])

object SlackAPIResponse {

  class SlackPostMessageError(
      message: String,
      cause: Option[Throwable] = None
  ) extends Exception(message, cause.orNull) {
    def this(message: String, cause: Throwable) = this(message, Some(cause))
  }

  object SlackPostMessageError {
    def apply(stringOpt: Option[String]): SlackPostMessageError =
      stringOpt
        .map(msg => new SlackPostMessageError(msg))
        .getOrElse(new SlackPostMessageError("There is no errors"))
  }

  implicit val decoder: Decoder[SlackAPIResponse] =
    Decoder.forProduct2("ok", "error")(SlackAPIResponse.apply)

  def fromJsString(
      jsonString: String): Either[SlackPostMessageError, SlackAPIResponse] =
    decode[SlackAPIResponse](jsonString) match {
      case Right(response @ SlackAPIResponse(true, _)) =>
        Right(response)
      case Right(_ @SlackAPIResponse(false, msgOpt)) =>
        Left(SlackPostMessageError(msgOpt))
      case Left(e) =>
        Left(new SlackPostMessageError(e.getMessage, e))
    }
}

circe が非常に便利です。 circe.github.io

Handlerの実装

AWS Lambda から呼び出される関数を定義します。
先に解説した SQSClient, SlackClient を駆動してコアとなる機能を実装していきます。

package sample

import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}
import com.typesafe.config.ConfigFactory
import sample.SampleHandler.{SampleRequest, SampleResponse}

import scala.beans.BeanProperty
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.control.NonFatal

object SampleHandler {

  class SampleRequest(@BeanProperty var value: String) {
    def this() = this("")
  }
  case class SampleResponse(
      @BeanProperty value: String = "slack message post done.")
}

class SampleHandler extends RequestHandler[SampleRequest, SampleResponse] {

  import scala.concurrent.ExecutionContext.Implicits.global

  private val config = ConfigFactory.load()
  private val sqsClient = new SQSClient(config)
  private val slackClient = new SlackClient(config)

  def handleRequest(input: SampleRequest, context: Context): SampleResponse = {
    val eventualUnit = for {
      messages <- sqsClient.receive
      eventualMessages = messages.map(m => slackClient.post(m).map(_ => m))
      eventualUnits = eventualMessages.map(_.flatMap(sqsClient.delete))
      result = eventualUnits.map(_.recover(printStackTraceInCaseOfFailure()))
      _ <- Future.sequence(result)
    } yield SampleResponse()

    Await.result(eventualUnit, 300.second)
  }

  private def printStackTraceInCaseOfFailure(): PartialFunction[Throwable, Unit] = {
    case NonFatal(e) => e.printStackTrace()
  }
}

Await.result の第二引数( atMost ) に設定しているのは Lambda の最大実行時間です。

Slack通知が失敗したメッセージの削除はスキップしてひとまずスタックトレースを吐くことにしました。
キューに残ったメッセージの後始末は最大受信数を設定してデッドレターキューに投げてしまうのが良さそうですね。

docs.aws.amazon.com

application.conf

src / main / resourcesapplication.conf を作成します。
キューのURLとSlackのAPIトークンは環境変数から読み込むことにました。

aws.sqs.queue.url=${QUEUE_URL}
slack.api.token=${SLACK_API_TOKEN}
slack.api.post.message.url="https://slack.com/api/chat.postMessage"

serverless.yml

Lambda関数やSQSの構成を定義していきます。

service: sample-slack-post

frameworkVersion: "=1.30.0"

provider:
  name: aws
  runtime: java8
  stage: ${opt:stage, 'dev'}
  region: ap-northeast-1
  iamRoleStatements:
    -  Effect: "Allow"
       Action:
       - "sqs:*"
       Resource:
         Fn::ImportValue: ${self:service}:${self:provider.stage}:SampleQueueArn

package:
  artifact: target/scala-2.12/hello.jar

functions:
  sample:
    handler: sample.SampleHandler
    name: sample-slack-post-function-${self:provider.stage}
    timeout: 300
    memorySize: 256
    environment:
      QUEUE_URL:
        Ref: sampleQueue
      SLACK_API_TOKEN: "slack api token required"

resources:
  Resources:
    sampleQueue:
      Type: AWS::SQS::Queue
      Properties:
        QueueName: sample-slack-post-${self:provider.stage}
        ReceiveMessageWaitTimeSeconds: 5
        VisibilityTimeout: 10
  Outputs:
    SampleQueueArn:
      Description: The ARN for the Sample Queue
      Value:
        "Fn::GetAtt": [ sampleQueue, Arn ]
      Export:
        Name: ${self:service}:${self:provider.stage}:SampleQueueArn

SQSに関して、実際の業務では CloudFormationでは構築しましたが、
せっかくなので resources で構築してみました。

Scalaのコードが参照するキューのURLとSlackのAPIトークンは、
environment から環境変数に登録しています。

実際の開発では、キューのURLなど環境毎に切り替えが必要な情報は、
開発用、ステージング用にファイルを切り出して対応しました。
serverless.com

デプロイ

カレントディレクトリ上で下記のコマンドを実行していきます。

パッケージング
$ sbt assembly
デプロイ
$ sls deploy -v

AWSコンソール上からも serverless.yml に定義した Lambda と SQS がクラウド上に、
作成されていることが確認できます。

Lambda f:id:t_horikoshi:20180820013252p:plain

SQS f:id:t_horikoshi:20180820013436p:plain

デプロイすると Serverless が CloudFormation に Stack を作成します。

失敗することがありますが、
環境変数 AWS_PROFILE に登録に入力したプロファイルの権限が問題なことが多いのでポリシーの見直しが必要でしょう。

また、正しい権限に修正しても一度作成した CloudFormation の Stack をうまく削除することができずに再実行が失敗することがあるので、
そんなときは迷わず下記のコマンドを実行すれば大体のことは解決できそうでした。

$ sls remove

動かしてみる


メッセージの送信

テストデータとして aws-cli からキューにメッセージを送信します。
メッセージの内容は Slack API のリクエストボディに設定する Json です。

今回は channel, text, username を指定したシンプルな Json を送信します。

$ aws sqs send-message --queue-url <キューのURL> --message-body '{"channel":"@t_horikoshi","text":"test","username":"LambdaBot"}'

Lambdaの実行

serverless.yml の functions.name に指定した名前を指定して、
Lambdaを実行します。

$ sls invoke -f sample -l

🎉🎉🎉ヒャッホウ!!!🎉🎉🎉

通知を確認できました

f:id:t_horikoshi:20180820015330p:plain

開発してみて


Serverless Framework

Lambda の設定に関して、
実際の開発では timeoutmemorySize に加えて reservedConcurrency, events.scheduleserverless.yml に定義しました。

serverless.com

serverless.com

これらの構成管理やクラウドへの反映が大変容易に感じました。
また依存するSQSのリソース定義が一元管理できる点も魅力的です。

CloudFormation や terraform の知見がある人にとっては、
キャッチアップが容易な気がします。

便利。

AWS SQS

可視性タイムアウトやデッドレターキューの仕組みなど、
よく考えられているなーっと感心しました。

デッドレターキューに送信した不正メッセージの処理については、
AWS SNSなどで分析できるという話でした。 docs.aws.amazon.com

素敵。

AWS Lambda

動きっぱなしのEC2なんかで動く簡素な日次バッチ処理なんかは、
全部Lambdaに移行してしまえばコストカットにつながるのかなという印象でした。

同時実行起動数やイベントソースのサポートが大変充実しているので、
割と色んなユースケースに柔軟に対応できる気がします。

実はSQSのイベントソースがサポートされているので、
開発でも試してみたかったのですが進捗の都合で今回は断念しました...、無念。

機会があれば触ってみたいと思います。

無念。

辛かったところ

Slack APIchat.postMessage ですが
Channel 毎に秒間1リクエストしか受け付けないという RateLimit がありまして、
このあたりの制御がちょっと厄介でした。

Slackに通知する際、リクエストが1回で済むようにチャンネル単位でメッセージを集計したり、
Throttle制御入れたり、Lambdaの同時実行数を絞ったりなどなど...

解説では割愛してますがいろいろやってます。

このあたりはちょっとしんどかったです。

何でもかんでもSlack通知するっていうのはもうやめようぜ。
そんな気持ちになりました。

終わりに

今回紹介したコードは GitHub に公開しておきました。

github.com

どなたかの参考になればと。

総じてAWSクラウドサービスは便利ですね。
いろいろできるようになりたいので今後も仲良くしていきたいです。

本当はイベントソースやデッドレターキューの
お話などをもう少し詳しく解説に交えたかったのですがそれはまたの機会に。

では、また。