Scala(第五节)actor并发编程、文件操作和网络请求、隐式转换和隐式参数、Akka并发编程、Akka模拟简易Spark通信

目录

actor并发编程

什么是Scala Actor

概念

Scala中的Actor能够实现并行编程的强大功能,它是基于事件模型的并发机制,Scala是运用消息的发送、接收来实现高并发的。
Actor可以看作是一个个独立的实体,他们之间是毫无关联的。但是,他们可以通过消息来通信。一个Actor收到其他Actor的信息后,它可以根据需要作出各种相应。消息的类型可以是任意的,消息的内容也可以是任意的。

java并发编程与Scala Actor编程的区别

Scala(第五节)actor并发编程、文件操作和网络请求、隐式转换和隐式参数、Akka并发编程、Akka模拟简易Spark通信
对于Java,我们都知道它的多线程实现需要对共享资源(变量、对象等)使用synchronized 关键字进行代码块同步、对象锁互斥等等。而且,常常一大块的try…catch语句块中加上wait方法、notify方法、notifyAll方法是让人很头疼的。原因就在于Java中多数使用的是可变状态的对象资源,对这些资源进行共享来实现多线程编程的话,控制好资源竞争与防止对象状态被意外修改是非常重要的,而对象状态的不变性也是较难以保证的。
与Java的基于共享数据和锁的线程模型不同,Scala的actor包则提供了另外一种不共享任何数据、依赖消息传递的模型,从而进行并发编程。

Actor的执行顺序

  1. 首先调用start()方法启动Actor
  2. 调用start()方法后其act()方法会被执行
  3. 向Actor发送消息
  4. act方法执行完成之后,程序会调用exit方法

发送消息的方式

符号 含义
! 发送异步消息,没有返回值。
!? 发送同步消息,等待返回值。
!! 发送异步消息,返回值是 Future[Any]。

Actor实战

第一个例子

怎么实现actor并发编程:
1、定义一个class或者是object继承Actor特质,注意导包import scala.actors.Actor
2、重写对应的act方法
3、调用Actor的start方法执行Actor
4、当act方法执行完成,整个程序运行结束

import scala.actors.Actor

class Actor1  extends Actor{
  override def act(): Unit = {
    for(i <- 1 to 10){
      println("actor1====="+i)
    }
  }
}
object  Actor2 extends Actor{
  override def act(): Unit = {
    for(j <- 1 to 10){
      println("actor2====="+j)
    }
  }
}
object  Actor1{
def main(args: Array[String]): Unit = {
  val actor = new Actor1
  actor.act()
  Actor2.act()
}

}

第二个例子

怎么实现actor发送、接受消息:
1、定义一个class或者是object继承Actor特质,注意导包import scala.actors.Actor
2、重写对应的act方法
3、调用Actor的start方法执行Actor
4、通过不同发送消息的方式对actor发送消息
5、act方法中通过receive方法接受消息并进行相应的处理
6、act方法执行完成之后,程序退出

import scala.actors.Actor
class MyActor2  extends  Actor{
  override def act(): Unit = {
    receive{
      case  "start" => println("starting......")
    //  case _ => println("我没有匹配到任何消息")
    }
  }
}
object MyActor2{
  def main(args: Array[String]): Unit = {
    val actor = new MyActor2
    actor.start()
    actor ! "start"
  }
}

第三个例子

怎么实现actor可以不断地接受消息:
在act方法中可以使用while(true)的方式,不断的接受消息。

class MyActor3 extends  Actor{
  override def act(): Unit = {
    while (true){
      receive{
        case  "start" => println("starting")
        case "stop" =>println("stopping")
      }
    }
  }
}
object MyActor3{
  def main(args: Array[String]): Unit = {
    val actor = new MyActor3
    actor.start()
    actor ! "start"
    actor ! "stop"
  }
}

说明:在act()方法中加入了while (true) 循环,就可以不停的接收消息
注意:发送start消息和stop的消息是异步的,但是Actor接收到消息执行的过程是同步的按顺序执行

第四个例子

使用react方法代替receive方法去接受消息
好处:react方式会复用线程,避免频繁的线程创建、销毁和切换。比receive更高效
注意: react 如果要反复执行消息处理,react外层要用loop,不能用while

class MyActor4 extends Actor{
  override def act(): Unit = {
    loop{
      react{
        case "start" => println("starting")
        case "stop" => println("stopping")
      }
    }
  }
}


object MyActor4{
  def main(args: Array[String]): Unit = {
    val actor = new MyActor4
    actor.start()
    actor ! "start"
    actor ! "stop"

  }
}

第五个例子

结合case class样例类发送消息和接受消息

1、将消息封装在一个样例类中
2、通过匹配不同的样例类去执行不同的操作
3、Actor可以返回消息给发送方。通过sender方法向当前消息发送方返回消息

case class AsyncMessage(id:Int,message:String)
case class SyncMessage(id:Int,message:String)
case class ReplyMessage(id:Int,message:String)
class MyActor5 extends Actor{
  override def act(): Unit = {
    loop{
      react{
        case AsyncMessage(id,message) => {
          println(s"$id,$message")
          sender ! ReplyMessage(2,"异步有返回值的消息处理成功")
        }
        case SyncMessage(id,message) =>{
          println(s"$id,$message")
          sender ! ReplyMessage(id,"我是同步消息的返回值,等到我返回之后才能继续下一步的处理")
        }
      }
    }
  }
}

object  MyActor5{

  def main(args: Array[String]): Unit = {
    val actor: MyActor5 = new MyActor5
    actor.start()
    actor ! AsyncMessage(1,"helloworld")
    val asyncMessage: Future[Any] = actor !! AsyncMessage(2,"actorSend")
    val apply: Any = asyncMessage.apply()
    println(apply)
    println("helloworld22222")
    //同步阻塞消息
    val syncMessage: Any = actor !? SyncMessage(3,"我是同步阻塞消息")
    println(syncMessage)
  }
}

通过actor实现多文件单词计数

需求:
用actor并发编程写一个单机版的WordCount,将多个文件作为输入,计算完成后将多个任务汇总,得到最终的结果。

大致的思想步骤:
1、通过loop +react 方式去不断的接受消息
2、利用case class样例类去匹配对应的操作
3、其中scala中提供了文件读取的接口Source,通过调用其fromFile方法去获取文件内容
4、将每个文件的单词数量进行局部汇总,存放在一个ListBuffer中
5、最后将ListBuffer中的结果进行全局汇总。

import scala.actors.Actor
import scala.collection.mutable.ListBuffer
import scala.io.Source

case class FilePath(fileFullPath:String)
class WordCountActor extends Actor{
  override def act(): Unit = {
    //使用loop + react的方式接受数据
    loop{
      react{
        case FilePath(fileFullPath:String) => {
          // 使用Source来读取文件内容
          val fileContent=Source.fromFile(fileFullPath)
          // 获取文件所有内容
          val text=fileContent.mkString
          //   println(fileContent)
          // 对文件内容进行切分
          val lines = text.split("\r\n")
          // 对每一行进行按照空格进行切分
          //切分之后,将数据进行压平
          val words=lines.flatMap(x => x.split(" "))
          // 将压平后仅有单词的数组的每个元素转为(单词,1)的元组的形式
          val wordsTupleArr=words.map(x => (x,1))
          // 对元组数组进行按单词聚合成map
          val wordMap=wordsTupleArr.groupBy(x => x._1)
          // 统计每个单词的value的长度,即为单词在该文件中出现次数,mapValues是用来映射并修改Map中每个key的值的
          val wordCountRes=wordMap.mapValues(x => x.length)
          sender ! wordCountRes
        }
      }
    }
  }
}



object WordCountActor{
  def main(args: Array[String]): Unit = {
    val filePaths=Array("E:\\wordCount\\1.txt","E:\\wordCount\\2.txt",
    "E:\\wordCount\\3.txt")

    val wordCountActor=new WordCountActor
    // 创建一个列表来接收每个单词在单个文件出现次数的元组
    val resList=new ListBuffer[Tuple2[String,Int]]
    for (filePath <- filePaths){
      wordCountActor.start()
      val res=wordCountActor !! FilePath(filePath)
      // 将actor返回的消息转为原来的Map类型
      val resMap=res.apply().asInstanceOf[Map[String,Int]]
      // 将每个单个文件单词次数统计Map转为数组,并将数组的每个元组通过映射添加到空列表中
      resMap.toArray.map(x => resList.append(x))
    }
    // 按单词聚合元组,形成新的统计Map
    val wordCountMap=resList.groupBy(x => x._1)
    // 使用mapValues方法将每个单词原本的value(元组数组)转为foldLeft统计之后的单词次数
    val totalCount=wordCountMap.mapValues(x => x.foldLeft(0)(_+_._2))
    // 打印结果
    print(totalCount.mkString(","))
  }
}

scala当中的文件操作和网络请求

读取文件当中每一行的数据

def main(args: Array[String]): Unit = {
  //注意文件的编码格式,如果编码格式不对,那么读取报错
  val file: BufferedSource = Source.fromFile("F:\\scala与spark课件资料教案\\3、scala第三天\\files\\file.txt","GBK");
  val lines: Iterator[String] = file.getLines()
 for(line <- lines){
   println(line)
 }
  //注意关闭文件
  file.close()
}

读取词法单元和数字

def main(args: Array[String]): Unit = {
  val file: BufferedSource = Source.fromFile("F:\\scala与spark课件资料教案\\3、scala第三天\\files\\file2.txt","GBK");
  val split: Array[String] = file.mkString.split(" ")
  println(split.mkString("\t"))
  file.close()
}

读取网络资源、文件写入、控制台操作

读取网络资源

def main(args: Array[String]): Unit = {
  val source: BufferedSource = Source.fromURL("http://www.baidu.com")
  val string: String = source.mkString

  println(string)
  source.close()
}

文件写入操作

def main(args: Array[String]): Unit = {
  val writer = new PrintWriter("F:\\scala与spark课件资料教案\\3、scala第三天\\files\\printWriter.txt")
  for(i <- 1 to 100){
    writer.println(i)
    writer.flush()
  }
  writer.close()
}

控制台交互操作

def main(args: Array[String]): Unit = {
  //控制台交互--老API
  print("请输入内容:")
  val consoleLine1 = Console.readLine()
  println("刚才输入的内容是:" + consoleLine1)

  //控制台交互--新API
  print("请输入内容(新API):")
  val consoleLine2 = StdIn.readLine()
  println("刚才输入的内容是:" + consoleLine2)
}

scala当中的序列化

@SerialVersionUID(1L)
class Person extends Serializable{
  override def toString = name + "," + age

  val name = "Nick"
  val age = 20

}

object PersonMain extends App{
  override def main(args: Array[String]): Unit = {

    import java.io.{FileOutputStream, FileInputStream, ObjectOutputStream, ObjectInputStream}
    val nick = new Person
    val out = new ObjectOutputStream(new FileOutputStream("Nick.obj"))
    out.writeObject(nick)
    out.close()

    val in = new ObjectInputStream(new FileInputStream("Nick.obj"))
    val saveNick = in.readObject()
    in.close()
    println(saveNick)
  }
}

scala当中的正则表达式

def main(args: Array[String]): Unit = {
  import scala.util.matching.Regex
  val pattern1 = new Regex("(S|s)cala")
  val pattern2 = "(S|s)cala".r
  val str = "Scala is scalable and cool"
  println((pattern2 findAllIn str).mkString(","))

}

隐式转换和隐式参数

隐式转换

Scala提供的隐式转换和隐式参数功能,是非常有特色的功能。是Java等编程语言所没有的功能。它可以允许你手动指定,将某种类型的对象转换成其他类型的对象或者是给一个类增加方法。通过这些功能,可以实现非常强大、特殊的功能。
Scala的隐式转换,其实最核心的就是定义隐式转换方法,即implicit conversion function。定义的隐式转换方法,只要在编写的程序内引入,就会被Scala自动使用。Scala会根据隐式转换方法的签名,在程序中使用到隐式转换方法接收的参数类型定义的对象时,会自动将其传入隐式转换方法,转换为另外一种类型的对象并返回。这就是“隐式转换”。其中所有的隐式值和隐式方法必须放到object中。
然而使用Scala的隐式转换是有一定的限制的,总结如下:

  • implicit关键字只能用来修饰方法、变量(参数)。
  • 隐式转换的方法在当前范围内才有效。如果隐式转换不在当前范围内定义(比如定义在另一个类中或包含在某个对象中),那么必须通过import语句将其导。

隐式参数

所谓的隐式参数,指的是在函数或者方法中,定义一个用implicit修饰的参数,此时Scala会尝试找到一个指定类型的(在当前环境中每种类型的隐式参数最多只能有一个),用implicit修饰的参数,即隐式值,并注入参数。
Scala会在两个范围内查找:

  • 当前作用域内可见的val或var定义的隐式变量;
  • 一种是隐式参数类型的伴生对象内的隐式值;

隐式转换方法作用域与导入

(1)Scala默认会使用两种隐式转换,一种是源类型或者目标类型的伴生对象内的隐式转换方法;一种是当前程序作用域内的可以用唯一标识符表示的隐式转换方法。
(2)如果隐式转换方法不在上述两种情况下的话,那么就必须手动使用import语法引入某个包下的隐式转换方法,比如import test._。通常建议,仅仅在需要进行隐式转换的地方,用import导入隐式转换方法,这样可以缩小隐式转换方法的作用域,避免不需要的隐式转换。

隐式转换的时机

(1)当对象调用类中不存在的方法或成员时,编译器会自动将对象进行隐式转换
(2)当方法中的参数的类型与目标类型不一致时

隐式转换和隐式参数案例

隐式转换案例一

将Double类型的数据自动转换成Int类型

object Chapter14 {
  implicit def ConvertDoubleToInt(first:Double):Int= first.toInt
}

object Convert{
  //导入隐式转换的方法
  import Chapter14._
  def main(args: Array[String]): Unit = {
    val first:Int = 3.5
  }
}

将猫类的方法给狗类,并且不需要狗类继承猫类

class Cat(){
  def catchMouse(name:String): Unit ={
    println(name+"抓老鼠")
  }
}

class Dog(){
  def lookDoor(name:String): Unit ={
    println(name+"看门")
  }
}

隐式转换案例二

让File类具备RichFile类中的read方法

// 注意,File类需要导java.io.File包
class MyFileReader(file:File){
  def readFile()=Source.fromFile(file,"GBK").mkString
}

object RichFile{
  implicit def convertFile2MyFileReader(f:File)=new MyFileReader(f)
  def main(args: Array[String]): Unit = {
    val file=new File("E:\\files\\file.txt")
    println(file.readFile())
  }
}

隐式转换案例三

奥特曼变身

class Man{
  var name:String=_
  def this(name:String){
    this()
    this.name=name
  }
}

class AutoMan(name:String){
  def beat(): Unit ={
    println(name+"奥特曼打怪兽")
  }
}

object AutoManOperator{
  implicit def man2AutoMan(m:Man)=new AutoMan(m.name)

  def main(args: Array[String]): Unit = {
    val dijia=new Man("迪迦")
    dijia.beat()
  }
}

隐式转换案例四

一个类隐式转换成具有相同方法的多个类

class A(c:C) {
  def readBook(): Unit ={
    println("A说:好书好书...")
  }
}
class B(c:C){
  def readBook(): Unit ={
    println("B说:看不懂...")
  }
  def writeBook(): Unit ={
    println("B说:不会写...")
  }
}
class C
object AB{
  //创建一个类的2个类的隐式转换
  implicit def C2A(c:C)=new A(c)
  implicit def C2B(c:C)=new B(c)
}
object B{
  def main(args: Array[String]) {
    //导包
    //1. import AB._ 会将AB类下的所有隐式转换导进来
    //2. import AB._C2A 只导入C类到A类的的隐式转换方法
    //3. import AB._C2B 只导入C类到B类的的隐式转换方法
    import AB._
    val c=new C
    //由于A类与B类中都有readBook(),只能导入其中一个,否则调用共同方法时代码报错
    //c.readBook()
    //C类可以执行B类中的writeBook()
    c.writeBook()
  }
}

隐式参数案例五

员工领取薪水

object Company{
  //在object中定义隐式值    注意:同一类型的隐式值只允许出现一次,否则会报错
  implicit  val aaa="zhangsan"
  implicit  val bbb=10000.00
}
class Boss {
  //注意参数匹配的类型   它需要的是String类型的隐式值
  def callName()(implicit name:String):String={
    name+" is coming !"
  }
  //定义一个用implicit修饰的参数
  //注意参数匹配的类型    它需要的是Double类型的隐式值
  def getMoney()(implicit money:Double):String={
    " 当月薪水:"+money
  }
}
object Boss extends App{
  //使用import导入定义好的隐式值,注意:必须先加载否则会报错
  import Company._
  val boss =new Boss
  println(boss.callName()+boss.getMoney())

}

Akka并发编程

Akka简介

Akka基于Actor模型,提供了一个用于构建可扩展的(Scalable)、弹性的(Resilient)、快速响应的(Responsive)应用程序的平台。
Actor模型:在计算机科学领域,Actor模型是一个并行计算(Concurrent Computation)模型,它把actor作为并行计算的基本元素来对待:为响应一个接收到的消息,一个actor能够自己做出一些决策,如创建更多的actor,或发送更多的消息,或者确定如何去响应接收到的下一个消息。
Scala(第五节)actor并发编程、文件操作和网络请求、隐式转换和隐式参数、Akka并发编程、Akka模拟简易Spark通信
Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信,每个Actor都有自己的收件箱(Mailbox)。通过Actor能够简化锁及线程管理,可以非常容易地开发出正确地并发程序和并行系统,Actor具有如下特性:

  1. 提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下的编程开发
  2. 提供了异步非阻塞的、高性能的事件驱动编程模型
  3. 超级轻量级事件处理(每GB堆内存几百万Actor)

Akka并发编程案例

利用Akka的actor编程模型,实现2个进程间的通信。

架构图

Scala(第五节)actor并发编程、文件操作和网络请求、隐式转换和隐式参数、Akka并发编程、Akka模拟简易Spark通信

重要类介绍

  • ActorSystem:在Akka中,ActorSystem是一个重量级的结构,他需要分配多个线程,所以在实际应用中,ActorSystem通常是一个单例对象,我们可以使用这个ActorSystem创建很多Actor。
    注意:
    (1)、ActorSystem是一个进程中的老大,它负责创建和监督actor
    (2)、ActorSystem是一个单例对象
    (3)、actor负责通信

  • Actor
    在Akka中,Actor负责通信,在Actor中有一些重要的生命周期方法。
    (1)preStart()方法:该方法在Actor对象构造方法执行后执行,整个Actor生命周期中仅执行一次。
    (2)receive()方法:该方法在Actor的preStart方法执行完成后执行,用于接收消息,会被反复执行。

具体代码

第一步:创建maven工程
pom如下:

<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.8</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>

    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-actor_2.11</artifactId>
        <version>2.3.14</version>
    </dependency>

    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-remote_2.11</artifactId>
        <version>2.3.14</version>
    </dependency>

</dependencies>

<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <!-- 限制jdk的编译版本插件 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>


        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass></mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

第二步:master进程代码开发

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

class Master extends Actor{
  println("构造器方法")

  override def preStart(): Unit = {
    println("这是个初始化的方法,构造完成后就被调用")
  }
//receive方法会在prestart方法执行后被调用,表示不断的接受消息
  override def receive: Receive = {
    case "connect" => {
      println("connected")
      //master发送注册成功信息给worker
      sender ! "success"
    }
  }
}


object Master{
  def main(args: Array[String]): Unit = {
    val host="192.168.2.6"
    val port=8888
    //准备连接信息
    val config=ConfigFactory.parseString(
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin
    )
     // 1、创建ActorSystem,它是整个进程中老大,它负责创建和监督actor,它是单例对象
    val actorSystem=ActorSystem("masterActorSystem",config)
    // 2、通过ActorSystem来创建master actor
    val master=actorSystem.actorOf(Props(new Master),"master")
  }
}

第三步:worker进程代码开发

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

class Worker extends Actor{
  println("构造器方法")
//prestart方法会在构造代码块之后被调用,并且只会被调用一次
  override def preStart(): Unit = {
  //获取master actor的引用
    //ActorContext全局变量,可以通过在已经存在的actor中,寻找目标actor
    //调用对应actorSelection方法,
    // 方法需要一个path路径:1、通信协议、2、master的IP地址、3、master的端口 4、创建master actor老大 5、actor层级
    val actorSelection:ActorSelection=context.actorSelection(
      "akka.tcp://masterActorSystem@192.168.2.6:8888/user/master")
    actorSelection ! "connect"
  }
//receive方法会在prestart方法执行后被调用,不断的接受消息
  override def receive: Receive = {
    //case "connect" => println("connecting...")
    case "success" => println("successful")
  }
}


object Worker{
  val host="192.168.2.6"
  // worker的端口号需要与master不同
  val port=9999

  def main(args: Array[String]): Unit = {
    val config=ConfigFactory.parseString(
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin
    )
    val actorSystem=ActorSystem("workerActorSystem",config)
    val worker=actorSystem.actorOf(Props(new Worker),"worker")
    //worker ! "connect"
  }
}

Akka模拟简易Spark通信

首先创建maven工程,pom与“Akka并发编程案例”中的相同。

架构图

Scala(第五节)actor并发编程、文件操作和网络请求、隐式转换和隐式参数、Akka并发编程、Akka模拟简易Spark通信

具体代码

1、Master类

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.collection.mutable

class SparkMaster extends Actor {
//定义一个map集合,用于存放worker信息
  var receiveMap=new mutable.HashMap[String,WorkerInfo]()

  override def preStart(): Unit = {
    println("test")
  }

  override def receive: Receive = {
  //master接受worker的注册信息
    case RegisterMessage(workerId,memory,cores) => {
    //保存信息到map集合中
    receiveMap.put(workerId,
      new WorkerInfo(workerId,memory,cores))
      //master反馈注册成功给worker
      sender ! RegisteredMessage("注册成功")
    }
     //master接受worker的心跳信息
    case SendHeartBeat(workerId) => {
      println("接收到心跳"+workerId)
      //判断worker是否已经注册,master只接受已经注册过的worker的心跳信息
      if (receiveMap.contains(workerId)){
        val workInfo=receiveMap(workerId)
        // 修改worker的workInfo的最近心跳时间
        workInfo.lastHeartBeatTime=System.currentTimeMillis()
        receiveMap += workerId -> workInfo
      }
    }
  }
}


object SparkMaster{
  val host="192.168.2.6"
  val port=8888

  def main(args: Array[String]): Unit = {
    val config=ConfigFactory.parseString(
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin
    )
	//创建ActorSystem,它是整个进程中老大,它负责创建和监督actor,它是单例对象
    val actorSystem=ActorSystem("SparkMasterActorSystem",config)
    //通过ActorSystem来创建master actor
    val masterActor=actorSystem.actorOf(Props(new SparkMaster),"SparkMaster")
  }
}

2、Worker类

import java.util.UUID
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._


class SparkWorker extends Actor{
  val workId=UUID.randomUUID().toString
  val memory=128
  val cores=16
  // 创建worker与master通信的actorSelection通道,因为后面多处会用到该通道,所以放到类属性中
  val actorSelection:ActorSelection=context
  .actorSelection("akka.tcp://SparkMasterActorSystem@192.168.2.6:8888/user/SparkMaster")
  override def preStart(): Unit = {
  //发送注册信息给master
    actorSelection ! RegisterMessage(workId,memory,cores)
  }

  override def receive: Receive = {
  //接收master返回的注册成功的消息
    case RegisteredMessage(message) => {
      println(message)
      // 导入隐式转换
      import context.dispatcher
      //向master定期的发送心跳
      //worker先自己给自己发送心跳
      context.system.scheduler.schedule(0 millis,3000 millis,self,HeartBeat)
    }
    //worker接受自己的心跳,并向master发送心跳
    case HeartBeat => actorSelection ! SendHeartBeat(workId)
  }
}

object SparkWorker{
  val host="192.168.2.6"
  val port=9999

  def main(args: Array[String]): Unit = {
    val config=ConfigFactory.parseString(
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin
    )

    val actorSystem=ActorSystem("SparkWorkerActorSystem",config)
    val workerActor=actorSystem.actorOf(Props(new SparkWorker),"SparkWorker")
  }
}

3、WorkerInfo类

//封装worker信息
class WorkerInfo(val workerId:String,val memory:Int,val cores:Int) {
        //定义一个变量用于存放worker上一次心跳时间
      var lastHeartBeatTime:Long=_

  override def toString: String = {
    s"workerId:$workerId , memory:$memory , cores:$cores"
  }
}

4、样例类集合

trait RemoteMessage  extends Serializable{
}
//worker向master发送注册信息,由于不在同一进程中,需要实现序列化
case class RegisterMessage(val workerId:String,val memory:Int,val cores:Int) extends RemoteMessage
//master反馈注册成功信息给worker,由于不在同一进程中,也需要实现序列化
case class RegisteredMessage(message:String) extends RemoteMessage
//worker向worker发送心跳 由于在同一进程中,不需要实现序列化
case object HeartBeat
//worker向master发送心跳,由于不在同一进程中,需要实现序列化
case class SendHeartBeat(val workerId:String) extends RemoteMessage
//master自己向自己发送消息,由于在同一进程中,不需要实现序列化
case object CheckOutTime
上一篇:mysql性能优化-group by的优化


下一篇:Java动态代理