Future firstCompletedOf
firstCompletedOf在处理多个Future请求时,会返回第一个处理完成的future结果。
println(s"\nStep 3: Call Future.firstCompletedOf to get the results of the first future that completes") val futureFirstCompletedResult = Future.firstCompletedOf(futureOperations) futureFirstCompletedResult.onComplete { case Success(results) => println(s"Results $results") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}") }
Future zip VS zipWith
zip用来将两个future结果组合成一个tuple. zipWith则可以自定义Function来处理future返回的结果。
println(s"\nStep 3: Zip the values of the first future with the second future") val donutStockAndPriceOperation = donutStock("vanilla donut") zip donutPrice() donutStockAndPriceOperation.onComplete { case Success(results) => println(s"Results $results") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}") }
输出值:
Step 3: Zip the values of the first future with the second future checking donut stock Results (Some(10),3.25)
使用zipwith的例子:
println(s"\nStep 4: Call Future.zipWith and pass-through function qtyAndPriceF") val donutAndPriceOperation = donutStock("vanilla donut").zipWith(donutPrice())(qtyAndPriceF) donutAndPriceOperation.onComplete { case Success(result) => println(s"Result $result") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}") }
输出结果:
Step 4: Call Future.zipWith and pass-through function qtyAndPriceF checking donut stock Result (10,3.25)
Future andThen
andThen后面可以跟一个自定义的PartialFunction,来处理Future返回的结果, 如下所示:
println(s"\nStep 2: Call Future.andThen with a PartialFunction") val donutStockOperation = donutStock("vanilla donut") donutStockOperation.andThen { case stockQty => println(s"Donut stock qty = $stockQty")}
输出结果:
Step 2: Call Future.andThen with a PartialFunction checking donut stock Donut stock qty = Success(10)
自定义threadpool
上面的例子中, 我们都是使用了scala的全局ExecutionContext:
scala.concurrent.ExecutionContext.Implicits.global.
同样的,我们也可以自定义你自己的ExecutionContext。下面是一个使用
java.util.concurrent.Executors的例子:
println("Step 1: Define an ExecutionContext") val executor = Executors.newSingleThreadExecutor() implicit val ec = scala.concurrent.ExecutionContext.fromExecutor(executor) println("\nStep 2: Define a method which returns a Future") import scala.concurrent.Future def donutStock(donut: String): Future[Int] = Future { // assume some long running database operation println("checking donut stock") 10 } println("\nStep 3: Call method which returns a Future") val donutStockOperation = donutStock("vanilla donut") donutStockOperation.onComplete { case Success(donutStock) => println(s"Results $donutStock") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}") } Thread.sleep(3000) executor.shutdownNow()
recover() recoverWith() and fallbackTo()
这三个方法主要用来处理异常的,recover是用来从你已知的异常中恢复,如下所示:
println("\nStep 3: Call Future.recover to recover from a known exception") donutStock("unknown donut") .recover { case e: IllegalStateException if e.getMessage == "Out of stock" => 0 } .onComplete { case Success(donutStock) => println(s"Results $donutStock") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}") }
recoverWith()和recover()类似,不同的是他的返回值是一个Future。
println("\nStep 3: Call Future.recoverWith to recover from a known exception") donutStock("unknown donut") .recoverWith { case e: IllegalStateException if e.getMessage == "Out of stock" => Future.successful(0) } .onComplete { case Success(donutStock) => println(s"Results $donutStock") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}") }
fallbackTo()是在发生异常时,去调用指定的方法:
println("\nStep 3: Call Future.fallbackTo") val donutStockOperation = donutStock("plain donut") .fallbackTo(similarDonutStock("vanilla donut")) .onComplete { case Success(donutStock) => println(s"Results $donutStock") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}") }
promise
熟悉ES6的同学可能知道,promise是JS在ES6中引入的新特性,其主要目的是将回调转变成链式调动。
当然scala的promise和ES6的promise还是不一样的,我们看下scala中promise是怎么用的:
println("Step 1: Define a method which returns a Future") import scala.concurrent.ExecutionContext.Implicits.global def donutStock(donut: String): Int = { if(donut == "vanilla donut") 10 else throw new IllegalStateException("Out of stock") } println(s"\nStep 2: Define a Promise of type Int") val donutStockPromise = Promise[Int]() println("\nStep 3: Define a future from Promise") val donutStockFuture = donutStockPromise.future donutStockFuture.onComplete { case Success(stock) => println(s"Stock for vanilla donut = $stock") case Failure(e) => println(s"Failed to find vanilla donut stock, exception = $e") } println("\nStep 4: Use Promise.success or Promise.failure to control execution of your future") val donut = "vanilla donut" if(donut == "vanilla donut") { donutStockPromise.success(donutStock(donut)) } else { donutStockPromise.failure(Try(donutStock(donut)).failed.get) } println("\nStep 5: Completing Promise using Promise.complete() method") val donutStockPromise2 = Promise[Int]() val donutStockFuture2 = donutStockPromise2.future donutStockFuture2.onComplete { case Success(stock) => println(s"Stock for vanilla donut = $stock") case Failure(e) => println(s"Failed to find vanilla donut stock, exception = $e") } donutStockPromise2.complete(Try(donutStock("unknown donut")))
上面例子中我们使用了 Promise.success, Promise.failure, Promise.complete() 来控制程序的运行。