Futures, introduced into the Scala standard library in version 2.10, quickly became one of the cornerstones of Scala concurrency. Their main selling point is ease of composition. A standard example involves making asynchronous calls to a number of remote services and computing the answer once all of the responses have been collected:
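def price(itemId: Id): Future[Price] = ???
def availability(itemId: Id): Future[Int] = ???
def orderQuote(itemId: Id, quantityRequired: Int): Future[Option[Price]] = {
  // create both futures up front so that the two service calls overlap;
  // calling them inside the generators would run them one after another
  val unitPriceF = price(itemId)
  val quantityAvailableF = availability(itemId)
  for {
    unitPrice <- unitPriceF
    quantityAvailable <- quantityAvailableF
  } yield if (quantityAvailable >= quantityRequired) Some(quantityRequired * unitPrice) else None
}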
Here price and availability are service calls that can potentially take a long time and are meant to be executed in parallel.
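Whether two calls combined in a for comprehension actually overlap depends on where the futures are created. The sketch below is a minimal illustration, not part of the original example: `call`, `sequential`, `concurrent` and the two-thread pool are hypothetical names introduced here, and a latch stands in for the remote service so that the overlap becomes observable.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Assumption: a dedicated two-thread pool, so that two calls can run side by side.
val pool = Executors.newFixedThreadPool(2)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

// Hypothetical stand-in for a remote call. It yields true only if another
// call reaches the latch within one second, i.e. if the two calls overlap.
def call(latch: CountDownLatch): Future[Boolean] = Future {
  latch.countDown()
  latch.await(1, TimeUnit.SECONDS)
}

// Futures created inside the generators: the second call is started only
// after the first one has completed, so the calls never overlap.
def sequential(): Future[(Boolean, Boolean)] = {
  val latch = new CountDownLatch(2)
  for {
    a <- call(latch)
    b <- call(latch)
  } yield (a, b)
}

// Futures created up front: both calls are already running when the
// for comprehension starts combining their results.
def concurrent(): Future[(Boolean, Boolean)] = {
  val latch = new CountDownLatch(2)
  val first = call(latch)
  val second = call(latch)
  for {
    a <- first
    b <- second
  } yield (a, b)
}

val sequentialResult = Await.result(sequential(), 5.seconds)
val concurrentResult = Await.result(concurrent(), 5.seconds)
println(s"sequential: $sequentialResult, concurrent: $concurrentResult")
pool.shutdown()
```

In the sequential variant the first call waits in vain for a partner and times out, whereas in the concurrent variant both calls meet at the latch.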
However, the usefulness of futures is not restricted to the concurrent setting. They also provide a nice pattern for suspending execution of a sequential algorithm until a certain condition is met. There is nothing inherently concurrent about the Future implementation either: the runtime behaviour depends on the instance of ExecutionContext provided to the functions that can spawn new computations.
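As a rough illustration of that dependence, the same computation can be run against two different execution contexts. This is a sketch under stated assumptions: `whereDoIRun` and `synchronous` are hypothetical names introduced here, while `ExecutionContext.fromExecutor` and `ExecutionContext.global` come from the standard library.

```scala
import java.util.concurrent.Executor
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Runs every task immediately on the thread that submits it.
val synchronous: ExecutionContext = ExecutionContext.fromExecutor(new Executor {
  def execute(task: Runnable): Unit = task.run()
})

// The same computation, parameterised only by the execution context.
def whereDoIRun(implicit ec: ExecutionContext): Future[String] =
  Future(Thread.currentThread.getName)

val callerThread = Thread.currentThread.getName
val onSynchronous = Await.result(whereDoIRun(synchronous), 1.second)
val onGlobal = Await.result(whereDoIRun(ExecutionContext.global), 1.second)

println(s"caller: $callerThread, synchronous: $onSynchronous, global: $onGlobal")
```

With the synchronous context the body runs on the calling thread; with the global context it is handed to a pool thread, even though the code that builds the future is identical.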
The example below illustrates how futures and promises facilitate suspension of effects until the required condition (the availability of a price, in this case) is met.
import java.util.concurrent.Executor
import scala.collection.mutable
import scala.concurrent._
import scala.util.{Success, Failure}
// thanks to being implicit this execution context will be used by all Future methods that
// can spawn new Futures, making all spawned computations synchronous
implicit val synchronousExecutionContext: ExecutionContext = ExecutionContext.fromExecutor(new Executor {
  def execute(task: Runnable): Unit = task.run()
})
object PriceService {
  val prices = mutable.HashMap[String, Int]()
  val pendingRequests = mutable.HashMap[String, Set[Promise[Int]]]() withDefaultValue Set()
  def setPrice(symbol: String, price: Int): Unit = {
    prices += (symbol -> price)
    println(s"${Thread.currentThread.getName}: price received: $symbol: $price")
    // remove the fulfilled requests so that a later price update doesn't try
    // to complete the same promises a second time
    pendingRequests.remove(symbol).foreach(_.foreach(_.success(price)))
  }
  def price(symbol: String): Future[Int] =
    if (prices.contains(symbol)) Future.successful(prices(symbol))
    else {
      // no price yet: hand out a future that setPrice will complete later
      val request = Promise[Int]()
      pendingRequests += (symbol -> (pendingRequests(symbol) + request))
      request.future
    }
}
def publishValuation(symbol: String, quantity: Int): Unit =
  PriceService.price(symbol).map(_ * quantity).onComplete {
    case Success(value) => println(s"${Thread.currentThread.getName}: $quantity x $symbol: $value")
    case Failure(e) => println(s"error: $e")
  }
PriceService.setPrice("AAA", 1013)
publishValuation("AAA", 20)
publishValuation("BBB", 30)
PriceService.setPrice("BBB", 221)
This code prints out:
main: price received: AAA: 1013
main: 20 x AAA: 20260
main: price received: BBB: 221
main: 30 x BBB: 6630
demonstrating that an unfulfilled condition doesn't block the program even though only a single thread is used. All of this happens without breaking the abstraction boundaries between PriceService and the valuation publishing code.
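The suspension mechanism can also be observed in isolation with a bare promise. The following is a minimal sketch, assuming the same kind of synchronous execution context as above; `events` and `priceRequest` are hypothetical names used only for this illustration. It records the order in which things happen instead of printing from inside the callbacks.

```scala
import java.util.concurrent.Executor
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Promise}

// Same idea as above: every callback runs on the thread that triggers it.
implicit val synchronousExecutionContext: ExecutionContext =
  ExecutionContext.fromExecutor(new Executor {
    def execute(task: Runnable): Unit = task.run()
  })

val events = mutable.ArrayBuffer[String]()

// The price is not known yet, so the valuation is merely registered.
val priceRequest = Promise[Int]()
priceRequest.future.map(_ * 30).foreach(value => events += s"valuation: $value")
events += "callback registered"

// Completing the promise runs the suspended valuation before success() returns.
priceRequest.success(221)
events += "price set"

println(events.mkString(", "))
// prints: callback registered, valuation: 6630, price set
```

Registering the callback returns immediately, and the suspended valuation runs on the calling thread as part of the `success` call, which is why no second thread is needed.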