文章目录
- 定义返回Future的方法
- 阻塞方式获取Future的值
- 非阻塞方式获取Future的值
- Future链
- flatmap VS map
- Future.sequence() VS Future.traverse()
- Future.foldLeft VS Future reduceLeft
- Future firstCompletedOf
- Future zip VS zipWith
- Future andThen
- 自定义threadpool
- recover() recoverWith() and fallbackTo()
- promise
在scala中可以方便的实现异步操作,这里是通过Future来实现的,和java中的Future很相似,但是功能更加强大。
定义返回Future的方法
下面我们看下如何定义一个返回Future的方法:
println("Step 1: Define a method which returns a Future") import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global def donutStock(donut: String): Future[Int] = Future { // assume some long running database operation println("checking donut stock") 10 }
注意这里需要引入scala.concurrent.ExecutionContext.Implicits.global, 它会提供一个默认的线程池来异步执行Future。
阻塞方式获取Future的值
println("\nStep 2: Call method which returns a Future") import scala.concurrent.Await import scala.concurrent.duration._ val vanillaDonutStock = Await.result(donutStock("vanilla donut"), 5 seconds) println(s"Stock of vanilla donut = $vanillaDonutStock")
donutStock() 是异步执行的,我们可以使用Await.result() 来阻塞主线程来等待donutStock()的执行结果。
下面是其输出:
Step 2: Call method which returns a Future checking donut stock Stock of vanilla donut = 10
非阻塞方式获取Future的值
我们可以使用Future.onComplete() 回调来实现非阻塞的通知:
println("\nStep 2: Non blocking future result") import scala.util.{Failure, Success} donutStock("vanilla donut").onComplete { case Success(stock) => println(s"Stock for vanilla donut = $stock") case Failure(e) => println(s"Failed to find vanilla donut stock, exception = $e") } Thread.sleep(3000)
Future.onComplete() 有两种可能情况,Success 或者 Failure,需要引入: import scala.util.{Failure, Success}。
Future链
有时候我们需要在获得一个Future之后再继续对其进行操作,有点类似于java中的管道,下面看一个例子:
println("\nStep 2: Define another method which returns a Future") def buyDonuts(quantity: Int): Future[Boolean] = Future { println(s"buying $quantity donuts") true }
上面我们又定义了一个方法,用来接收donutStock()的返回值,然后再返回一个Future[Boolean] 。
我们看下使用flatmap该怎么链接他们:
println("\nStep 3: Chaining Futures using flatMap") val buyingDonuts: Future[Boolean] = donutStock("plain donut").flatMap(qty => buyDonuts(qty)) import scala.concurrent.Await import scala.concurrent.duration._ val isSuccess = Await.result(buyingDonuts, 5 seconds) println(s"Buying vanilla donut was successful = $isSuccess")
同样的,我们还可以使用for语句来进行链接:
println("\nStep 3: Chaining Futures using for comprehension") for { stock <- donutStock("vanilla donut") isSuccess <- buyDonuts(stock) } yield println(s"Buying vanilla donut was successful = $isSuccess") Thread.sleep(3000)
flatmap VS map
map就是对集合中的元素进行重映射,而flatmap则会将返回的值拆散然后重新组合。 下面举个直观的例子:
val buyingDonuts: Future[Boolean] = donutStock("plain donut").flatMap(qty => buyDonuts(qty))
flatMap返回的值是Future[Boolean]。
val buyingDonuts: Future[Future[Boolean]] = donutStock("plain donut").Map(qty => buyDonuts(qty))
map返回的值是Future[Future[Boolean]]。
Future.sequence() VS Future.traverse()
如果我们有很多个Future,然后想让他们并行执行,则可以使用 Future.sequence() 。
println(s"\nStep 2: Create a List of future operations") val futureOperations = List( donutStock("vanilla donut"), donutStock("plain donut"), donutStock("chocolate donut") ) println(s"\nStep 5: Call Future.sequence to run the future operations in parallel") val futureSequenceResults = Future.sequence(futureOperations) futureSequenceResults.onComplete { case Success(results) => println(s"Results $results") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}") }
Future.traverse() 和Future.sequence() 类似, 唯一不同的是,Future.traverse()可以对要执行的Future进行操作,如下所示:
println(s"\nStep 3: Call Future.traverse to convert all Option of Int into Int") val futureTraverseResult = Future.traverse(futureOperations){ futureSomeQty => futureSomeQty.map(someQty => someQty.getOrElse(0)) } futureTraverseResult.onComplete { case Success(results) => println(s"Results $results") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}") }
Future.foldLeft VS Future reduceLeft
foldLeft 和 reduceLeft 都是用来从左到右做集合操作的,区别在于foldLeft可以提供默认值。看下下面的例子:
println(s"\nStep 3: Call Future.foldLeft to fold over futures results from left to right") val futureFoldLeft = Future.foldLeft(futureOperations)(0){ case (acc, someQty) => acc + someQty.getOrElse(0) } futureFoldLeft.onComplete { case Success(results) => println(s"Results $results") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}") }
输出结果:
Step 3: Call Future.foldLeft to fold over futures results from left to right Results 20
println(s"\nStep 3: Call Future.reduceLeft to fold over futures results from left to right") val futureFoldLeft = Future.reduceLeft(futureOperations){ case (acc, someQty) => acc.map(qty => qty + someQty.getOrElse(0)) } futureFoldLeft.onComplete { case Success(results) => println(s"Results $results") case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}") }
输出结果:
Step 3: Call Future.reduceLeft to fold over futures results from left to right Results Some(20)