Scala入门到精通——第二十六节 Scala并发编程基础

简介: 作者:摇摆少年梦 视频地址:http://www.xuetuwuyou.com/course/12本节主要内容Scala并发编程简介Scala Actor并发编程模型react模型Actor的几种状态Actor深入使用解析1. Scala并发编程简介2003 年,Herb Sutter 在他的文章 “The Free Lunch Is Over

作者:摇摆少年梦
视频地址:http://www.xuetuwuyou.com/course/12

本节主要内容

  1. Scala并发编程简介
  2. Scala Actor并发编程模型
  3. react模型
  4. Actor的几种状态
  5. Actor深入使用解析

1. Scala并发编程简介

2003 年,Herb Sutter 在他的文章 “The Free Lunch Is Over” 中揭露了行业中最不可告人的一个小秘密,他明确论证了处理器在速度上的发展已经走到了尽头,并且将由全新的单芯片上的并行 “内核”(虚拟 CPU)所取代。这一发现对编程社区造成了不小的冲击,因为正确创建线程安全的代码,在理论而非实践中,始终会提高高性能开发人员的身价,而让各公司难以聘用他们。看上去,仅有少数人充分理解了 Java 的线程模型、并发 API 以及 “同步” 的含义,以便能够编写同时提供安全性和吞吐量的代码 —— 并且大多数人已经明白了它的困难所在(来源:http://www.ibm.com/developerworks/cn/java/j-scala02049.html)。

在Java中,要编写一个线程安全的程序并不是一件易事,例如:

class Account {  
    private int balance;  

    synchronized public int getBalance() {  
      return balance;  
    }  

    synchronized public void incrementBalance() {  
      balance++;  
    }  
}  

上面这段java代码虽然方法前面加了synchronized ,但它仍然不是线程安全的,例如,在执行下面两个语句

account.incrementBalance();  
account.getBalance();

时,有可能account.incrementBalance()执行完成后,其它线程可能会获取对象的锁,修改account的balance,从而造成得不到预期结果的问题。解决问题的方法是将两个功能结合起来形成一个方法:

synchronized public int incrementAndGetBalance() {  
  balance++;  
  return balance;  
}  

但这可能并不是我们想要的,每次获取balance都要将balance增加, 这显然与实际不符。除此之外,java中的并发编程可能还会经常遇到死锁问题,而这个问题往往难调试,问题可能会随机性的出现。总体上来看,java的并发编程模型相对较复杂,难以驾驭。

Scala很好地解决了java并发编程的问题,要在scala中进行并发编程,有以下几种途径可以实现:
1 actor消息模型、akka actor并发模型。

2 Thread、Runnable

3 java.util.concurennt

4 第三方开源并发框架如Netty,Mina

在上述四种途径当中,利用 actor消息模型、akka actor并发模型是scala并发编程的首先,本节主要介绍actor消息模型,akka actor并发模型我们将放在后面的章节中介绍。
在scala中,通过不变对象来实现线程安全,涉及到修改对象状态时,则创建一个新的对象来实现,如:

//成员balance状态一旦被赋值,便不能更改
//因而它也是线程安全的
class Person(val age: Integer) {  
  def getAge() = age
}  

object Person{  
  //创建新的对象来实现对象状态修改
  def increment(person: Person): Person{  
    new Person(Person.getAge() + 1)  
  }  
}  

通过不变对象实现并发编程,可以简化编程模型,使并发程序更容易现实和控制。

2.Scala Actor并发编程模型

java中的并发主要是通过线程来实现,各线程采用共享资源的机制来实现程序的并发,这里面临竞争资源的问题,虽然采用锁机制可以避免竞争资源的问题,但会存在死锁问题,要开发一套健壮的并发应用程序具有一定的难度。而scala的并发模型相比于java它更简单,它采用消息传递而非资源共享来实现程序的并发,消息传递正是通过Actor来实现的。下面的代码给出了Actor使用示例

//混入Actor特质,然后实现act方法
//如同java中的Runnable接口一样
//各线程的run方法是并发执行的
//Actor中的act方法也是并发执行的
class ActorDemo extends Actor{
  //实现 act()方法
  def act(){
    while(true){
      //receive从邮箱中获取一条消息
      //然后传递给它的参数
      //该参数是一个偏函数
      receive{
        case "actorDemo" => println("receive....ActorDemo")
      }      
    }
  }
}
object ActorDemo extends App{
  val actor=new ActorDemo
  //启动创建的actor 
  actor.start()
  //主线程发送消息给actor
  actor!"actorDemo"
}

下面给的是recieve方法的部分源代码

def receive[R](f: PartialFunction[Any, R]): R = {
    assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")

    synchronized {
      if (shouldExit) exit() // links
      drainSendBuffer(mailbox)
    }

    var done = false
    while (!done) {
      val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
        senders = replyTo :: senders
        val matches = f.isDefinedAt(m)
        senders = senders.tail
        matches
      })
................

从上述代码中不能看出,receive方法接受的参数是一个偏函数,并且是通过mailbox来实现消息的发送与接收。

在前述的class ActorDemo中,receive方法的参数为

{
        case "actorDemo" => println("receive....ActorDemo")
}      

该代码块在执行时被转换为一个PartialFunction[Any, R]的偏函数,其中R是偏函数的返回类型,对应case 语句=> 右边的部分,在本例子中R是Unit类型,而Any对应的则对应case语句的模式部分。

前面给的是通过extends Actor的方式来创建一个Actor类,其实scala.actors.Actor中提供了一个actor工具方法,可以非常方便地直接创建Actor对象如:

import scala.actors.Actor._

object ActorFromMethod extends App{
  //通过工具方法actor直接创建Actor对象
  val methodActor = actor {
    for (i <- 1 to 5)
      println("That is the question.")
      Thread.sleep(1000)
  }
}

上述代码创建的actor对象无需调用start方法,对象创建完成后会立即执行。

scala中本地线程也可用作Actor,下面的代码演示了如何在REPL命令行中将本地线程当作Actor;

scala> import scala.actors.Actor._
import scala.actors.Actor._

//self引用本地线程,并发送消息
scala> self ! "hello"
//接收消息
scala> self.receive { case x:String => x }
res1: String = hello

上述代码中,如果发送的消息不是String类型的,线程将被阻塞,为避免这个问题,可以采用receiveWithin方法,

scala> self ! 123

scala> self.receiveWithin(1000) { case x => x }
res6: Any = 123

scala> self.receiveWithin(1000) { case x => x }
res7: Any = TIMEOUT

3. react模型

scala中的Actor也是构建在java线程基础之上的,前面在使用Actor时都是通过创建Actor对象,然后再调用act方法来启动actor。我们知道,java中线程的创建、销毁及线程间的切换是比较耗时的,因此实际中尽量避免频繁的线程创建、销毁和销毁。Scala中提供react方法,在方法执行结束后,线程仍然被保留。下面的代码演示了react方法的使用:

package cn.scala.xtwy.concurrency
import scala.actors._

object NameResolver extends Actor {
  import java.net.{ InetAddress, UnknownHostException }
  def act() {
    react {
      //匹配主线程发来的("www.scala-lang.org", NameResolver)
      case (name: String, actor: Actor) =>
        //向actor发送解析后的IP地址信息
        //由于本例中传进来的actor就是NameResolver自身
        //也即自己给自己发送消息,并存入将消息存入邮箱
        actor ! getIp(name)
        //再次调用act方法,试图从邮箱中提取信息
        //如果邮箱中信息为空,则进入等待模式
        act()
      case "EXIT" =>
        println("Name resolver exiting.")
      // quit
      //匹配邮箱中的单个信息,本例中会匹配邮箱中的IP地址信息
      case msg =>
        println("Unhandled message: " + msg)
        act()
    }
  }
  def getIp(name: String): Option[InetAddress] = {
    try {
      Some(InetAddress.getByName(name))
    } catch {
      case _: UnknownHostException => None
    }
  }
}
object Main extends App{
  NameResolver.start()
  //主线程向NameResolver发送消息("www.scala-lang.org", NameResolver)
  NameResolver ! ("www.scala-lang.org", NameResolver)
  NameResolver ! ("wwwwww.scala-lang.org", NameResolver)

}

从上述代码中可以看到,通过在react方法执行结束时加入act方法,方法执行完成后没有被销毁,而是继续试图从邮箱中获取信息,获取不到则等待。

4. Actor的几种状态

Actor有下列几种状态:

  • 初始状态(New),Actor对象被创建,但还没有启动即没有执行start方法时的状态
  • 执行状态(Runnable),正在执行时的状态
  • 挂起状态(Suspended),在react方法中等待时的状态
  • 时间点挂起状态(TimedSuspended),挂起状态的一种特殊形式,reactWithin方法中的等待时的状态
  • 阻塞状态(Blocked),在receive方法中阻塞等待时的状态
  • 时间点阻塞状态(TimedBlocked),在receiveWithin方法中阻塞等待时的状态
  • 结束状态(Terminated),执行完成后被销毁

5. Actor深入使用解析

本小节的例子来源:http://www.ibm.com/developerworks/cn/java/j-scala04109.html

1 receive方法单次执行:


object Actor2
  {
    case class Speak(line : String)
    case class Gesture(bodyPart : String, action : String)
    case class NegotiateNewContract()

    def main(args : Array[String]) =
    {
      val badActor =
        actor
        {
          //这里receive方法只会匹配一次便结束
          receive
          {
            case NegotiateNewContract =>
              System.out.println("I won't do it for less than $1 million!")
            case Speak(line) =>
              System.out.println(line)
            case Gesture(bodyPart, action) =>
              System.out.println("(" + action + "s " + bodyPart + ")")
            case _ =>
              System.out.println("Huh? I'll be in my trailer.")
          }
        }
      //receive方法只处理下面这条语句发送的消息
      badActor ! NegotiateNewContract
      //下面其余的消息不会被处理
      badActor ! Speak("Do ya feel lucky, punk?")
      badActor ! Gesture("face", "grimaces")
      badActor ! Speak("Well, do ya?")
    }
  }

上述代码只会输出:
I won’t do it for less than $1 million!
即后面发送的消息如:
badActor ! Speak(“Do ya feel lucky, punk?”)
badActor ! Gesture(“face”, “grimaces”)
badActor ! Speak(“Well, do ya?”)
不会被处理。这是因为receive方法的单次执行问题。

2 能够处理多个消息的receive方法:

object Actor2
  {
    case class Speak(line : String);
    case class Gesture(bodyPart : String, action : String);
    case class NegotiateNewContract()
    //处理结束消息
    case class ThatsAWrap()

    def main(args : Array[String]) =
    {
      val badActor =
        actor
        {
          var done = false
          //while循环
          while (! done)
          {
            receive
            {
              case NegotiateNewContract =>
                System.out.println("I won't do it for less than $1 million!")
              case Speak(line) =>
                System.out.println(line)
              case Gesture(bodyPart, action) =>
                System.out.println("(" + action + "s " + bodyPart + ")")
              case ThatsAWrap =>
                System.out.println("Great cast party, everybody! See ya!")
                done = true
              case _ =>
                System.out.println("Huh? I'll be in my trailer.")
            }
          }
        }
      //下面所有的消息都能被处理
      badActor ! NegotiateNewContract
      badActor ! Speak("Do ya feel lucky, punk?")
      badActor ! Gesture("face", "grimaces")
      badActor ! Speak("Well, do ya?")
      //消息发送后,receive方法执行完毕
      badActor ! ThatsAWrap
    }
  }

3 Actor后面实现原理仍然是线程的证据

object Actor3
  {
    case class Speak(line : String);
    case class Gesture(bodyPart : String, action : String);
    case class NegotiateNewContract;
    case class ThatsAWrap;

    def main(args : Array[String]) =
    {
      def ct =
        "Thread " + Thread.currentThread().getName() + ": "
      val badActor =
        actor
        {
          var done = false
          while (! done)
          {
            receive
            {
              case NegotiateNewContract =>
                System.out.println(ct + "I won't do it for less than $1 million!")
              case Speak(line) =>
                System.out.println(ct + line)
              case Gesture(bodyPart, action) =>
                System.out.println(ct + "(" + action + "s " + bodyPart + ")")
              case ThatsAWrap =>
                System.out.println(ct + "Great cast party, everybody! See ya!")
                done = true
              case _ =>
                System.out.println(ct + "Huh? I'll be in my trailer.")
            }
          }
        }

      System.out.println(ct + "Negotiating...")
      badActor ! NegotiateNewContract
      System.out.println(ct + "Speaking...")
      badActor ! Speak("Do ya feel lucky, punk?")
      System.out.println(ct + "Gesturing...")
      badActor ! Gesture("face", "grimaces")
      System.out.println(ct + "Speaking again...")
      badActor ! Speak("Well, do ya?")
      System.out.println(ct + "Wrapping up")
      badActor ! ThatsAWrap
    }
  }

执行结果如下:

Thread main: Negotiating...
Thread main: Speaking...
Thread main: Gesturing...
Thread main: Speaking again...
Thread main: Wrapping up
Thread ForkJoinPool-1-worker-13: I won't do it for less than $1 million!
Thread ForkJoinPool-1-worker-13: Do ya feel lucky, punk?
Thread ForkJoinPool-1-worker-13: (grimacess face)
Thread ForkJoinPool-1-worker-13: Well, do ya?
Thread ForkJoinPool-1-worker-13: Great cast party, everybody! See ya!

从上述执行结果可以看到,Actor最终的实现仍然是线程,只不过它提供的编程模型与java中的编程模型不一样而已。

4 利用!?发送同步消息,等待返回值


import scala.actors._,Actor._


object ProdConSample2
  {
    case class Message(msg : String)

    def main(args : Array[String]) : Unit =
    {
      val consumer =
        actor
        {
          var done = false
          while (! done)
          {
            receive
            {
              case msg =>
                System.out.println("Received message! -> " + msg)
                done = (msg == "DONE")
                reply("Already RECEIVED....."+msg)
            }
          }
        }

      System.out.println("Sending....")
      //获取响应值
      val r= consumer !? "Mares eat oats"
      println("replyed message"+r)
      System.out.println("Sending....")
      consumer !? "Does eat oats"
      System.out.println("Sending....")
      consumer !? "Little lambs eat ivy"
      System.out.println("Sending....")
      consumer !? "Kids eat ivy too"
      System.out.println("Sending....")
      consumer !? "DONE"      
    }
  }

代码执行结果:

Sending....
Received message! -> Mares eat oats
replyed messageAlready RECEIVED.....Mares eat oats
Sending....
Received message! -> Does eat oats
Sending....
Received message! -> Little lambs eat ivy
Sending....
Received message! -> Kids eat ivy too
Sending....
Received message! -> DONE

通过上述代码执行结果可以看到,!?因为是同步消息,发送完返回结果后才会接着发送下一条消息。

5 Spawn方法发送消息

object ProdConSampleUsingSpawn
  {
    import concurrent.ops._

    def main(args : Array[String]) : Unit =
    {
      // Spawn Consumer
      val consumer =
        actor
        {
          var done = false
          while (! done)
          {
            receive
            {
              case msg =>
                System.out.println("MESSAGE RECEIVED: " + msg)
                done = (msg == "DONE")
                reply("RECEIVED")
            }
          }
        }

      // Spawn Producer
      spawn  //spawn是一个定义在current.ops中的方法
      {
        val importantInfo : Array[String] = Array(
          "Mares eat oats",
          "Does eat oats",
          "Little lambs eat ivy",
          "A kid will eat ivy too",
          "DONE"
        );

        importantInfo.foreach((msg) => consumer !? msg)
      }
    }
  }

6 !! 发送异步消息,返回值是 Future[Any]

object ProdConSample3
  {
    case class Message(msg : String)

    def main(args : Array[String]) : Unit =
    {
      val consumer =
        actor
        {
          var done = false
          while (! done)
          {
            receive
            {
              case msg =>
                System.out.println("Received message! -> " + msg)
                done = (msg == "DONE")
                reply("Already RECEIVED....."+msg)
            }
          }
        }

      System.out.println("Sending....")
      //发送异步消息,返回
      val replyFuture= consumer !! "Mares eat oats"
      val r=replyFuture()
      println("replyed message*****"+r)
      System.out.println("Sending....")
      consumer !! "Does eat oats"
      System.out.println("Sending....")
      consumer !! "Little lambs eat ivy"
      System.out.println("Sending....")
      consumer !! "Kids eat ivy too"
      System.out.println("Sending....")
      consumer !! "DONE"      
    }
  }

执行结果:

Sending....
Received message! -> Mares eat oats
replyed message*****Already RECEIVED.....Mares eat oats
Sending....
Sending....
Sending....
Received message! -> Does eat oats
Sending....
Received message! -> Little lambs eat ivy
Received message! -> Kids eat ivy too
Received message! -> DONE

通过上述代码的执行结果可以看到,!!的消息发送是异步的,消息发送后无需等待结果返回便执行下一条语句,但如果要获取异步消息的返回值,如:

 val replyFuture= consumer !! "Mares eat oats"
      val r=replyFuture()

则执行到这两条语句的时候,程序先被阻塞,等获得结果之后再发送其它的异步消息。

7 loop方法实现react

object LoopReact extends App{
  val a1 = Actor.actor {
    //注意这里loop是一个方法,不是关键字
    //实现类型while循环的作用
    loop {
      react {
        //为整型时结束操作
        case x: Int=>println("a1 stop: " + x); exit()
        case msg: String => println("a1: " + msg)
      }
    }
  }

  a1!("我是摇摆少年梦")
  a1.!(23)

}

添加公众微信号,可以了解更多最新Spark、Scala相关技术资讯
这里写图片描述

目录
相关文章
|
4月前
|
分布式计算 Java Hadoop
Scala入门必刷的100道练习题(附答案)
Scala入门必刷的100道练习题(附答案)
535 1
|
4月前
|
Java 大数据 Scala
Scala入门【运算符和流程控制】
Scala入门【运算符和流程控制】
|
23天前
|
分布式计算 大数据 Java
Scala 入门指南:从零开始的大数据开发
Scala 入门指南:从零开始的大数据开发
|
2月前
|
Scala 开发者
Scala中的模式匹配与高阶函数:探索强大的编程范式
【7月更文挑战第11天】Scala中的模式匹配和高阶函数是两种极其强大的特性,它们不仅提升了代码的表达力和可读性,还使得开发者能够编写出更加灵活和可重用的解决方案。通过
|
2月前
|
前端开发 Scala
Scala并发编程的react、loop方法详解
在这个例子中,`MyActor`会无限循环接收和处理消息。当收到一个字符串消息时,它会打印出"Received: "加上消息内容。如果收到其他类型的消息,它会打印"Unknown message"。
17 1
|
2月前
|
分布式计算 大数据 Java
大数据开发语言Scala入门
大数据开发语言Scala入门
|
2月前
|
IDE 大数据 Java
「AIGC」大数据开发语言Scala入门
Scala,融合OOP和FP的多范式语言,在JVM上运行,常用于大数据处理,尤其与Apache Spark配合。要开始学习,安装Scala,选择IDE如IntelliJ。基础包括变量、数据类型、控制结构、函数。Scala支持类、对象、不可变数据结构、模式匹配和强大的并发工具。利用官方文档、教程、社区资源进行学习,并通过实践提升技能。
38 0
|
3月前
|
前端开发 Scala
Scala并发编程的react、loop方法详解
在这个例子中,`MyActor`会无限循环接收和处理消息。当收到一个字符串消息时,它会打印出"Received: "加上消息内容。如果收到其他类型的消息,它会打印"Unknown message"。
19 0
|
4月前
|
安全 编译器 Scala
何时需要指定泛型:Scala编程指南
本文是Scala编程指南,介绍了何时需要指定泛型类型参数。泛型提供代码重用和类型安全性,但在编译器无法推断类型、需要提高代码清晰度、调用泛型方法或创建泛型集合时,应明确指定类型参数。通过示例展示了泛型在避免类型错误和增强编译时检查方面的作用,强调了理解泛型使用时机对编写高效Scala代码的重要性。
35 1
何时需要指定泛型:Scala编程指南
|
4月前
|
Java Shell API
Scala入门【变量和数据类型】
Scala入门【变量和数据类型】