目录
actor并发编程
什么是Scala Actor
概念
Scala中的Actor能够实现并行编程的强大功能,它是基于事件模型的并发机制,Scala是运用消息的发送、接收来实现高并发的。
Actor可以看作是一个个独立的实体,他们之间是毫无关联的。但是,他们可以通过消息来通信。一个Actor收到其他Actor的信息后,它可以根据需要作出各种相应。消息的类型可以是任意的,消息的内容也可以是任意的。
java并发编程与Scala Actor编程的区别
对于Java,我们都知道它的多线程实现需要对共享资源(变量、对象等)使用synchronized 关键字进行代码块同步、对象锁互斥等等。而且,常常一大块的try…catch语句块中加上wait方法、notify方法、notifyAll方法是让人很头疼的。原因就在于Java中多数使用的是可变状态的对象资源,对这些资源进行共享来实现多线程编程的话,控制好资源竞争与防止对象状态被意外修改是非常重要的,而对象状态的不变性也是较难以保证的。
与Java的基于共享数据和锁的线程模型不同,Scala的actor包则提供了另外一种不共享任何数据、依赖消息传递的模型,从而进行并发编程。
Actor的执行顺序
- 首先调用start()方法启动Actor
- 调用start()方法后其act()方法会被执行
- 向Actor发送消息
- act方法执行完成之后,程序会调用exit方法
发送消息的方式
符号 | 含义 |
---|---|
! | 发送异步消息,没有返回值。 |
!? | 发送同步消息,等待返回值。 |
!! | 发送异步消息,返回值是 Future[Any]。 |
Actor实战
第一个例子
怎么实现actor并发编程:
1、定义一个class或者是object继承Actor特质,注意导包import scala.actors.Actor
2、重写对应的act方法
3、调用Actor的start方法执行Actor
4、当act方法执行完成,整个程序运行结束
import scala.actors.Actor
class Actor1 extends Actor{
override def act(): Unit = {
for(i <- 1 to 10){
println("actor1====="+i)
}
}
}
object Actor2 extends Actor{
override def act(): Unit = {
for(j <- 1 to 10){
println("actor2====="+j)
}
}
}
object Actor1{
def main(args: Array[String]): Unit = {
val actor = new Actor1
actor.act()
Actor2.act()
}
}
第二个例子
怎么实现actor发送、接受消息:
1、定义一个class或者是object继承Actor特质,注意导包import scala.actors.Actor
2、重写对应的act方法
3、调用Actor的start方法执行Actor
4、通过不同发送消息的方式对actor发送消息
5、act方法中通过receive方法接受消息并进行相应的处理
6、act方法执行完成之后,程序退出
import scala.actors.Actor
class MyActor2 extends Actor{
override def act(): Unit = {
receive{
case "start" => println("starting......")
// case _ => println("我没有匹配到任何消息")
}
}
}
object MyActor2{
def main(args: Array[String]): Unit = {
val actor = new MyActor2
actor.start()
actor ! "start"
}
}
第三个例子
怎么实现actor可以不断地接受消息:
在act方法中可以使用while(true)的方式,不断的接受消息。
class MyActor3 extends Actor{
override def act(): Unit = {
while (true){
receive{
case "start" => println("starting")
case "stop" =>println("stopping")
}
}
}
}
object MyActor3{
def main(args: Array[String]): Unit = {
val actor = new MyActor3
actor.start()
actor ! "start"
actor ! "stop"
}
}
说明:在act()方法中加入了while (true) 循环,就可以不停的接收消息
注意:发送start消息和stop的消息是异步的,但是Actor接收到消息执行的过程是同步的按顺序执行
第四个例子
使用react方法代替receive方法去接受消息
好处:react方式会复用线程,避免频繁的线程创建、销毁和切换。比receive更高效
注意: react 如果要反复执行消息处理,react外层要用loop,不能用while
class MyActor4 extends Actor{
override def act(): Unit = {
loop{
react{
case "start" => println("starting")
case "stop" => println("stopping")
}
}
}
}
object MyActor4{
def main(args: Array[String]): Unit = {
val actor = new MyActor4
actor.start()
actor ! "start"
actor ! "stop"
}
}
第五个例子
结合case class样例类发送消息和接受消息
1、将消息封装在一个样例类中
2、通过匹配不同的样例类去执行不同的操作
3、Actor可以返回消息给发送方。通过sender方法向当前消息发送方返回消息
case class AsyncMessage(id:Int,message:String)
case class SyncMessage(id:Int,message:String)
case class ReplyMessage(id:Int,message:String)
class MyActor5 extends Actor{
override def act(): Unit = {
loop{
react{
case AsyncMessage(id,message) => {
println(s"$id,$message")
sender ! ReplyMessage(2,"异步有返回值的消息处理成功")
}
case SyncMessage(id,message) =>{
println(s"$id,$message")
sender ! ReplyMessage(id,"我是同步消息的返回值,等到我返回之后才能继续下一步的处理")
}
}
}
}
}
object MyActor5{
def main(args: Array[String]): Unit = {
val actor: MyActor5 = new MyActor5
actor.start()
actor ! AsyncMessage(1,"helloworld")
val asyncMessage: Future[Any] = actor !! AsyncMessage(2,"actorSend")
val apply: Any = asyncMessage.apply()
println(apply)
println("helloworld22222")
//同步阻塞消息
val syncMessage: Any = actor !? SyncMessage(3,"我是同步阻塞消息")
println(syncMessage)
}
}
通过actor实现多文件单词计数
需求:
用actor并发编程写一个单机版的WordCount,将多个文件作为输入,计算完成后将多个任务汇总,得到最终的结果。
大致的思想步骤:
1、通过loop +react 方式去不断的接受消息
2、利用case class样例类去匹配对应的操作
3、其中scala中提供了文件读取的接口Source,通过调用其fromFile方法去获取文件内容
4、将每个文件的单词数量进行局部汇总,存放在一个ListBuffer中
5、最后将ListBuffer中的结果进行全局汇总。
import scala.actors.Actor
import scala.collection.mutable.ListBuffer
import scala.io.Source
case class FilePath(fileFullPath:String)
class WordCountActor extends Actor{
override def act(): Unit = {
//使用loop + react的方式接受数据
loop{
react{
case FilePath(fileFullPath:String) => {
// 使用Source来读取文件内容
val fileContent=Source.fromFile(fileFullPath)
// 获取文件所有内容
val text=fileContent.mkString
// println(fileContent)
// 对文件内容进行切分
val lines = text.split("\r\n")
// 对每一行进行按照空格进行切分
//切分之后,将数据进行压平
val words=lines.flatMap(x => x.split(" "))
// 将压平后仅有单词的数组的每个元素转为(单词,1)的元组的形式
val wordsTupleArr=words.map(x => (x,1))
// 对元组数组进行按单词聚合成map
val wordMap=wordsTupleArr.groupBy(x => x._1)
// 统计每个单词的value的长度,即为单词在该文件中出现次数,mapValues是用来映射并修改Map中每个key的值的
val wordCountRes=wordMap.mapValues(x => x.length)
sender ! wordCountRes
}
}
}
}
}
object WordCountActor{
def main(args: Array[String]): Unit = {
val filePaths=Array("E:\\wordCount\\1.txt","E:\\wordCount\\2.txt",
"E:\\wordCount\\3.txt")
val wordCountActor=new WordCountActor
// 创建一个列表来接收每个单词在单个文件出现次数的元组
val resList=new ListBuffer[Tuple2[String,Int]]
for (filePath <- filePaths){
wordCountActor.start()
val res=wordCountActor !! FilePath(filePath)
// 将actor返回的消息转为原来的Map类型
val resMap=res.apply().asInstanceOf[Map[String,Int]]
// 将每个单个文件单词次数统计Map转为数组,并将数组的每个元组通过映射添加到空列表中
resMap.toArray.map(x => resList.append(x))
}
// 按单词聚合元组,形成新的统计Map
val wordCountMap=resList.groupBy(x => x._1)
// 使用mapValues方法将每个单词原本的value(元组数组)转为foldLeft统计之后的单词次数
val totalCount=wordCountMap.mapValues(x => x.foldLeft(0)(_+_._2))
// 打印结果
print(totalCount.mkString(","))
}
}
scala当中的文件操作和网络请求
读取文件当中每一行的数据
def main(args: Array[String]): Unit = {
//注意文件的编码格式,如果编码格式不对,那么读取报错
val file: BufferedSource = Source.fromFile("F:\\scala与spark课件资料教案\\3、scala第三天\\files\\file.txt","GBK");
val lines: Iterator[String] = file.getLines()
for(line <- lines){
println(line)
}
//注意关闭文件
file.close()
}
读取词法单元和数字
def main(args: Array[String]): Unit = {
val file: BufferedSource = Source.fromFile("F:\\scala与spark课件资料教案\\3、scala第三天\\files\\file2.txt","GBK");
val split: Array[String] = file.mkString.split(" ")
println(split.mkString("\t"))
file.close()
}
读取网络资源、文件写入、控制台操作
读取网络资源
def main(args: Array[String]): Unit = {
val source: BufferedSource = Source.fromURL("http://www.baidu.com")
val string: String = source.mkString
println(string)
source.close()
}
文件写入操作
def main(args: Array[String]): Unit = {
val writer = new PrintWriter("F:\\scala与spark课件资料教案\\3、scala第三天\\files\\printWriter.txt")
for(i <- 1 to 100){
writer.println(i)
writer.flush()
}
writer.close()
}
控制台交互操作
def main(args: Array[String]): Unit = {
//控制台交互--老API
print("请输入内容:")
val consoleLine1 = Console.readLine()
println("刚才输入的内容是:" + consoleLine1)
//控制台交互--新API
print("请输入内容(新API):")
val consoleLine2 = StdIn.readLine()
println("刚才输入的内容是:" + consoleLine2)
}
scala当中的序列化
@SerialVersionUID(1L)
class Person extends Serializable{
override def toString = name + "," + age
val name = "Nick"
val age = 20
}
object PersonMain extends App{
override def main(args: Array[String]): Unit = {
import java.io.{FileOutputStream, FileInputStream, ObjectOutputStream, ObjectInputStream}
val nick = new Person
val out = new ObjectOutputStream(new FileOutputStream("Nick.obj"))
out.writeObject(nick)
out.close()
val in = new ObjectInputStream(new FileInputStream("Nick.obj"))
val saveNick = in.readObject()
in.close()
println(saveNick)
}
}
scala当中的正则表达式
def main(args: Array[String]): Unit = {
import scala.util.matching.Regex
val pattern1 = new Regex("(S|s)cala")
val pattern2 = "(S|s)cala".r
val str = "Scala is scalable and cool"
println((pattern2 findAllIn str).mkString(","))
}
隐式转换和隐式参数
隐式转换
Scala提供的隐式转换和隐式参数功能,是非常有特色的功能。是Java等编程语言所没有的功能。它可以允许你手动指定,将某种类型的对象转换成其他类型的对象或者是给一个类增加方法。通过这些功能,可以实现非常强大、特殊的功能。
Scala的隐式转换,其实最核心的就是定义隐式转换方法,即implicit conversion function。定义的隐式转换方法,只要在编写的程序内引入,就会被Scala自动使用。Scala会根据隐式转换方法的签名,在程序中使用到隐式转换方法接收的参数类型定义的对象时,会自动将其传入隐式转换方法,转换为另外一种类型的对象并返回。这就是“隐式转换”。其中所有的隐式值和隐式方法必须放到object中。
然而使用Scala的隐式转换是有一定的限制的,总结如下:
- implicit关键字只能用来修饰方法、变量(参数)。
- 隐式转换的方法在当前范围内才有效。如果隐式转换不在当前范围内定义(比如定义在另一个类中或包含在某个对象中),那么必须通过import语句将其导。
隐式参数
所谓的隐式参数,指的是在函数或者方法中,定义一个用implicit修饰的参数,此时Scala会尝试找到一个指定类型的(在当前环境中每种类型的隐式参数最多只能有一个),用implicit修饰的参数,即隐式值,并注入参数。
Scala会在两个范围内查找:
- 当前作用域内可见的val或var定义的隐式变量;
- 一种是隐式参数类型的伴生对象内的隐式值;
隐式转换方法作用域与导入
(1)Scala默认会使用两种隐式转换,一种是源类型或者目标类型的伴生对象内的隐式转换方法;一种是当前程序作用域内的可以用唯一标识符表示的隐式转换方法。
(2)如果隐式转换方法不在上述两种情况下的话,那么就必须手动使用import语法引入某个包下的隐式转换方法,比如import test._。通常建议,仅仅在需要进行隐式转换的地方,用import导入隐式转换方法,这样可以缩小隐式转换方法的作用域,避免不需要的隐式转换。
隐式转换的时机
(1)当对象调用类中不存在的方法或成员时,编译器会自动将对象进行隐式转换
(2)当方法中的参数的类型与目标类型不一致时
隐式转换和隐式参数案例
隐式转换案例一
将Double类型的数据自动转换成Int类型
object Chapter14 {
implicit def ConvertDoubleToInt(first:Double):Int= first.toInt
}
object Convert{
//导入隐式转换的方法
import Chapter14._
def main(args: Array[String]): Unit = {
val first:Int = 3.5
}
}
将猫类的方法给狗类,并且不需要狗类继承猫类
class Cat(){
def catchMouse(name:String): Unit ={
println(name+"抓老鼠")
}
}
class Dog(){
def lookDoor(name:String): Unit ={
println(name+"看门")
}
}
隐式转换案例二
让File类具备RichFile类中的read方法
// 注意,File类需要导java.io.File包
class MyFileReader(file:File){
def readFile()=Source.fromFile(file,"GBK").mkString
}
object RichFile{
implicit def convertFile2MyFileReader(f:File)=new MyFileReader(f)
def main(args: Array[String]): Unit = {
val file=new File("E:\\files\\file.txt")
println(file.readFile())
}
}
隐式转换案例三
奥特曼变身
class Man{
var name:String=_
def this(name:String){
this()
this.name=name
}
}
class AutoMan(name:String){
def beat(): Unit ={
println(name+"奥特曼打怪兽")
}
}
object AutoManOperator{
implicit def man2AutoMan(m:Man)=new AutoMan(m.name)
def main(args: Array[String]): Unit = {
val dijia=new Man("迪迦")
dijia.beat()
}
}
隐式转换案例四
一个类隐式转换成具有相同方法的多个类
class A(c:C) {
def readBook(): Unit ={
println("A说:好书好书...")
}
}
class B(c:C){
def readBook(): Unit ={
println("B说:看不懂...")
}
def writeBook(): Unit ={
println("B说:不会写...")
}
}
class C
object AB{
//创建一个类的2个类的隐式转换
implicit def C2A(c:C)=new A(c)
implicit def C2B(c:C)=new B(c)
}
object B{
def main(args: Array[String]) {
//导包
//1. import AB._ 会将AB类下的所有隐式转换导进来
//2. import AB._C2A 只导入C类到A类的的隐式转换方法
//3. import AB._C2B 只导入C类到B类的的隐式转换方法
import AB._
val c=new C
//由于A类与B类中都有readBook(),只能导入其中一个,否则调用共同方法时代码报错
//c.readBook()
//C类可以执行B类中的writeBook()
c.writeBook()
}
}
隐式参数案例五
员工领取薪水
object Company{
//在object中定义隐式值 注意:同一类型的隐式值只允许出现一次,否则会报错
implicit val aaa="zhangsan"
implicit val bbb=10000.00
}
class Boss {
//注意参数匹配的类型 它需要的是String类型的隐式值
def callName()(implicit name:String):String={
name+" is coming !"
}
//定义一个用implicit修饰的参数
//注意参数匹配的类型 它需要的是Double类型的隐式值
def getMoney()(implicit money:Double):String={
" 当月薪水:"+money
}
}
object Boss extends App{
//使用import导入定义好的隐式值,注意:必须先加载否则会报错
import Company._
val boss =new Boss
println(boss.callName()+boss.getMoney())
}
Akka并发编程
Akka简介
Akka基于Actor模型,提供了一个用于构建可扩展的(Scalable)、弹性的(Resilient)、快速响应的(Responsive)应用程序的平台。
Actor模型:在计算机科学领域,Actor模型是一个并行计算(Concurrent Computation)模型,它把actor作为并行计算的基本元素来对待:为响应一个接收到的消息,一个actor能够自己做出一些决策,如创建更多的actor,或发送更多的消息,或者确定如何去响应接收到的下一个消息。
Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信,每个Actor都有自己的收件箱(Mailbox)。通过Actor能够简化锁及线程管理,可以非常容易地开发出正确地并发程序和并行系统,Actor具有如下特性:
- 提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下的编程开发
- 提供了异步非阻塞的、高性能的事件驱动编程模型
- 超级轻量级事件处理(每GB堆内存几百万Actor)
Akka并发编程案例
利用Akka的actor编程模型,实现2个进程间的通信。
架构图
重要类介绍
-
ActorSystem:在Akka中,ActorSystem是一个重量级的结构,他需要分配多个线程,所以在实际应用中,ActorSystem通常是一个单例对象,我们可以使用这个ActorSystem创建很多Actor。
注意:
(1)、ActorSystem是一个进程中的老大,它负责创建和监督actor
(2)、ActorSystem是一个单例对象
(3)、actor负责通信 -
Actor
在Akka中,Actor负责通信,在Actor中有一些重要的生命周期方法。
(1)preStart()方法:该方法在Actor对象构造方法执行后执行,整个Actor生命周期中仅执行一次。
(2)receive()方法:该方法在Actor的preStart方法执行完成后执行,用于接收消息,会被反复执行。
具体代码
第一步:创建maven工程
pom如下:
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<scala.compat.version>2.11</scala.compat.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.3.14</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>2.3.14</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<!-- 限制jdk的编译版本插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
第二步:master进程代码开发
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
class Master extends Actor{
println("构造器方法")
override def preStart(): Unit = {
println("这是个初始化的方法,构造完成后就被调用")
}
//receive方法会在prestart方法执行后被调用,表示不断的接受消息
override def receive: Receive = {
case "connect" => {
println("connected")
//master发送注册成功信息给worker
sender ! "success"
}
}
}
object Master{
def main(args: Array[String]): Unit = {
val host="192.168.2.6"
val port=8888
//准备连接信息
val config=ConfigFactory.parseString(
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
)
// 1、创建ActorSystem,它是整个进程中老大,它负责创建和监督actor,它是单例对象
val actorSystem=ActorSystem("masterActorSystem",config)
// 2、通过ActorSystem来创建master actor
val master=actorSystem.actorOf(Props(new Master),"master")
}
}
第三步:worker进程代码开发
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
class Worker extends Actor{
println("构造器方法")
//prestart方法会在构造代码块之后被调用,并且只会被调用一次
override def preStart(): Unit = {
//获取master actor的引用
//ActorContext全局变量,可以通过在已经存在的actor中,寻找目标actor
//调用对应actorSelection方法,
// 方法需要一个path路径:1、通信协议、2、master的IP地址、3、master的端口 4、创建master actor老大 5、actor层级
val actorSelection:ActorSelection=context.actorSelection(
"akka.tcp://masterActorSystem@192.168.2.6:8888/user/master")
actorSelection ! "connect"
}
//receive方法会在prestart方法执行后被调用,不断的接受消息
override def receive: Receive = {
//case "connect" => println("connecting...")
case "success" => println("successful")
}
}
object Worker{
val host="192.168.2.6"
// worker的端口号需要与master不同
val port=9999
def main(args: Array[String]): Unit = {
val config=ConfigFactory.parseString(
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
)
val actorSystem=ActorSystem("workerActorSystem",config)
val worker=actorSystem.actorOf(Props(new Worker),"worker")
//worker ! "connect"
}
}
Akka模拟简易Spark通信
首先创建maven工程,pom与“Akka并发编程案例”中的相同。
架构图
具体代码
1、Master类
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.collection.mutable
class SparkMaster extends Actor {
//定义一个map集合,用于存放worker信息
var receiveMap=new mutable.HashMap[String,WorkerInfo]()
override def preStart(): Unit = {
println("test")
}
override def receive: Receive = {
//master接受worker的注册信息
case RegisterMessage(workerId,memory,cores) => {
//保存信息到map集合中
receiveMap.put(workerId,
new WorkerInfo(workerId,memory,cores))
//master反馈注册成功给worker
sender ! RegisteredMessage("注册成功")
}
//master接受worker的心跳信息
case SendHeartBeat(workerId) => {
println("接收到心跳"+workerId)
//判断worker是否已经注册,master只接受已经注册过的worker的心跳信息
if (receiveMap.contains(workerId)){
val workInfo=receiveMap(workerId)
// 修改worker的workInfo的最近心跳时间
workInfo.lastHeartBeatTime=System.currentTimeMillis()
receiveMap += workerId -> workInfo
}
}
}
}
object SparkMaster{
val host="192.168.2.6"
val port=8888
def main(args: Array[String]): Unit = {
val config=ConfigFactory.parseString(
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
)
//创建ActorSystem,它是整个进程中老大,它负责创建和监督actor,它是单例对象
val actorSystem=ActorSystem("SparkMasterActorSystem",config)
//通过ActorSystem来创建master actor
val masterActor=actorSystem.actorOf(Props(new SparkMaster),"SparkMaster")
}
}
2、Worker类
import java.util.UUID
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
class SparkWorker extends Actor{
val workId=UUID.randomUUID().toString
val memory=128
val cores=16
// 创建worker与master通信的actorSelection通道,因为后面多处会用到该通道,所以放到类属性中
val actorSelection:ActorSelection=context
.actorSelection("akka.tcp://SparkMasterActorSystem@192.168.2.6:8888/user/SparkMaster")
override def preStart(): Unit = {
//发送注册信息给master
actorSelection ! RegisterMessage(workId,memory,cores)
}
override def receive: Receive = {
//接收master返回的注册成功的消息
case RegisteredMessage(message) => {
println(message)
// 导入隐式转换
import context.dispatcher
//向master定期的发送心跳
//worker先自己给自己发送心跳
context.system.scheduler.schedule(0 millis,3000 millis,self,HeartBeat)
}
//worker接受自己的心跳,并向master发送心跳
case HeartBeat => actorSelection ! SendHeartBeat(workId)
}
}
object SparkWorker{
val host="192.168.2.6"
val port=9999
def main(args: Array[String]): Unit = {
val config=ConfigFactory.parseString(
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
)
val actorSystem=ActorSystem("SparkWorkerActorSystem",config)
val workerActor=actorSystem.actorOf(Props(new SparkWorker),"SparkWorker")
}
}
3、WorkerInfo类
//封装worker信息
class WorkerInfo(val workerId:String,val memory:Int,val cores:Int) {
//定义一个变量用于存放worker上一次心跳时间
var lastHeartBeatTime:Long=_
override def toString: String = {
s"workerId:$workerId , memory:$memory , cores:$cores"
}
}
4、样例类集合
trait RemoteMessage extends Serializable{
}
//worker向master发送注册信息,由于不在同一进程中,需要实现序列化
case class RegisterMessage(val workerId:String,val memory:Int,val cores:Int) extends RemoteMessage
//master反馈注册成功信息给worker,由于不在同一进程中,也需要实现序列化
case class RegisteredMessage(message:String) extends RemoteMessage
//worker向worker发送心跳 由于在同一进程中,不需要实现序列化
case object HeartBeat
//worker向master发送心跳,由于不在同一进程中,需要实现序列化
case class SendHeartBeat(val workerId:String) extends RemoteMessage
//master自己向自己发送消息,由于在同一进程中,不需要实现序列化
case object CheckOutTime