Septeni Engineer's Blog

セプテーニエンジニアが綴る技術ブログ

scala.concurrent.Future の実装を追ってみる

あけましておめでとうございます。id:i000i0 です。

去年の9月に入社して以来、Scalaを使って広告の運用ツールを開発しています。

Scalaで開発していると強力な標準ライブラリのお世話になりますが、その内部実装がどうなっているかについて気になることがあります。

そこで今回は、非同期処理に使う scala.concurrent.Future の実装についてコードを追ってみたいと思います。

ちなみにScalaのバージョンは現在の最新版である2.11.7です。

Futureオブジェクトの生成

https://github.com/scala/scala/blob/v2.11.7/src%2Flibrary%2Fscala%2Fconcurrent%2Fimpl%2FFuture.scala#L29

Futureオブジェクトの生成から処理を追っていくとここにたどり着きます。 ここでは以下の様な処理を行っています。

  1. 「非同期に実行したい処理」を渡した PromiseCompletingRunnable オブジェクトを生成
  2. executor上 で 1. で生成したオブジェクトを実行
  3. 1.で生成したオブジェクトの promise.future を返す(これが scala.concurrent.Future型)

まずは 2. の executor について見てみます。

ExecutionContext

executor の型は ExecutionContext です。

https://github.com/scala/scala/blob/v2.11.7/src%2Flibrary%2Fscala%2Fconcurrent%2FExecutionContext.scala#L61

traitになっており、複数の実装から利用するものを選べるようになっています。

例として、 java.util.concurrent.ExecutorService から生成する場合の ExecutionContext.fromExecutorService を見てみます。

https://github.com/scala/scala/blob/v2.11.7/src%2Flibrary%2Fscala%2Fconcurrent%2Fimpl%2FExecutionContextImpl.scala#L133

execute メソッドは単に executor.execute を呼び出しています。

要するに渡された ExecutorService 上で非同期に実行しているだけですね。

この他にもExecutionContextの実装は幾つかありますが、そこを追うのは本筋とは外れるので省略します。

ちなみにFutureオブジェクト生成時に ExecutionContext が見つからなかった場合には以下の様なお馴染みの(?)エラーメッセージが出力されますが..

scala> import scala.concurrent.Future
import scala.concurrent.Future

scala> Future()
<console>:12: error: Cannot find an implicit ExecutionContext. You might pass
an (implicit ec: ExecutionContext) parameter to your method
or import scala.concurrent.ExecutionContext.Implicits.global.
       Future()
             ^

この scala.concurrent.ExecutionContext.Implicits.global は以下のようになっています。

https://github.com/scala/scala/blob/v2.11.7/src%2Flibrary%2Fscala%2Fconcurrent%2FExecutionContext.scala#L130

lazy val なので、一度呼び出して初期化すると、以降の呼び出しでは同じインスタンスが返されます。

よってこのcontextをimportした処理では同じcontextが使い回されるということになります。

スレッドプールの設定等は各アプリケーションの開発者が明示的に制御したくなるはずであり、プロダクションコードではこのglobalなcontextを使うようなケースはあまりないと思います。

参考: アプリケーションに合ったExecutionContextを使う

PromiseCompletingRunnable

ExecutionContext については何となく理解できたような気分になったので、Futureオブジェクトの生成処理内で生成していた PromiseCompletingRunnable を見てみます。

https://github.com/scala/scala/blob/v2.11.7/src%2Flibrary%2Fscala%2Fconcurrent%2Fimpl%2FFuture.scala#L19

ExecutionContext上で実行するためにRunnableを継承しています。 内部で DefaultPromise を生成していますが、こちらについては後述します。

runメソッドDefaultPromise#complete に、非同期に実行したい処理を渡しています。

非同期に実行したい処理が例外を投げなければ Success 、NonFatalな例外を投げたら Failure 、Fatalな例外ならそれをスロー、という感じです。

つまり、非同期で実行したかった処理を実際に呼び出しているのはここです。

ただし ExecutionContext 上で実行するので、非同期に実行されることになります。

DefaultPromise

PromiseCompletingRunnable 内で生成していた DefaultPromise について見てみます。

https://github.com/scala/scala/blob/v2.11.7/src%2Flibrary%2Fscala%2Fconcurrent%2Fimpl%2FPromise.scala#L153

直前に大量のコメントが書いてありますが、コメントによると DefaultPromise は3つの状態のいずれかになるとのことです。

  1. 未完了(完了待ちコールバックのリスト。 List[CallbackRunnable] で表される)
  2. 完了(非同期処理の結果。Try[T] で表される)
  3. 他のPromiseとリンク(DefaultPromise[T] で表される)

この状態を保持するオブジェクトの型は AnyRef(Object) というなかなかワイルドな実装になっています。

コンストラクタ処理の先頭でupdateStateなるメソッドNilに設定していますが、これは状態を未完了(空のコールバックリスト) に設定しているということです。

なお、DefaultPromise は、scala.concurrent.impl.Promise を継承しています。

更にこいつは scala.concurrent.Promise と、 scala.concurrent.Future を継承しています。

最初に見たFutureオブジェクトの生成処理内で、 promise.future を返していたと思いますが、実はこれは DefaultPromiseインスタンスそのものです。

https://github.com/scala/scala/blob/v2.11.7/src%2Flibrary%2Fscala%2Fconcurrent%2Fimpl%2FPromise.scala#L21

Futureのメソッドの実装

ここまで読むと、何となくFutureの実装が読めそうな気分になってきたので、Futureに実装されているメソッドがどのような実装になっているかを見てみます。

map

https://github.com/scala/scala/blob/v2.11.7/src%2Flibrary%2Fscala%2Fconcurrent%2FFuture.scala#L233

onComplete を呼んでいます。

https://github.com/scala/scala/blob/v2.11.7/src%2Flibrary%2Fscala%2Fconcurrent%2Fimpl%2FPromise.scala#L267

更に、 dispatchOrAddCallback を呼んでいます。

https://github.com/scala/scala/blob/v2.11.7/src%2Flibrary%2Fscala%2Fconcurrent%2Fimpl%2FPromise.scala#L278

DefaultPromise の状態によって条件分岐しています。 状態は先述の通り3種類あります。

Futureの処理が完了している(状態が Try[_]) ならmapで渡した処理を実行し、まだFutureの処理が終わっていないならコールバックのリストにmapで渡した処理を追加、という感じです。

このような実装になっているため、既に処理が完了したFutureに対しても、その処理結果を利用する別の処理を、後から渡すことができます。

flatMap

先に書いたように DefaultPromise の状態は3つありました。

そのうちの一つに、他の Promise とリンクしているというのがあったと思いますが、この状態には flatMap メソッドを使った時になり得ます。

https://github.com/scala/scala/blob/v2.11.7/src%2Flibrary%2Fscala%2Fconcurrent%2FFuture.scala#L246

このような実装になっている理由についてはソースコード上にコメントで残されています。 曰く、

  • flatMapが再帰的に呼ばれた際に、単純にonCompleteとPromiseのチェーンを繋げていくと、最後のflatMapに渡したFutureの処理が完了した後でないとGCで回収されない
  • このスペースリークは内部的に行われ、ユーザーからは直接見えないため発生しやすい

この挙動を防ぐために、 DefaultPromise ではPromiseのチェーンを作る代わりに、チェーンを辿って大元のPromiseに直接リンクするという実装になっています。

まとめ

細かい部分まで追うとあまりにも長くなってしまうのでかなり省略しましたが、ScalaのFutureの実装について追ってみました。

内部実装を読んでみると、細かいところで効率化のための工夫がなされたりしていて、標準ライブラリの有り難みが増したような気がします。

これからも気になる機能については時間を見つけて内部実装を読んでみたいと思います。