[toc]
## Scala并发编程模型Akka
### 1. Akka简介
#### Akka是什么?
> 1. Akka是JAVA虚拟机JVM平台上构建高并发、分布式和容错应用的工具包和运行时,你可以理解成Akka是编写并发程序的框架。
> 2. Akka用Scala语言写成,同时提供了Scala和JAVA的开发接口。
> 3. Akka主要解决的问题是:可以轻松的写出高效稳定的并发程序,程序员不再过多的考虑线程、锁和资源竞争等细节。
> 4. ![image-20210406221430939](assets/image-20210406221430939.png)
#### Actor 模型解决什么问题?
> 1. 处理并发问题关键是要保证共享数据的一致性和正确性,因为程序是多线程时,多个线程对同一个数据进行修改,若不加同步条件,势必会造成数据污染。但是当我们对关键代码加入同步条件synchronized 后,实际上大并发就会阻塞在这段代码,对程序效率有很大影响。
> 2. 若是用单线程处理,不会有数据一致性的问题,但是系统的性能又不能保证。
> 3. Actor 模型的出现解决了这个问题,简化并发编程,提升程序性能。 你可以这里理解:`Actor 模型是一种处理并发问题的解决方案,很牛!`
### 2. Akka中Actor模型
#### Actor模型及其说明
> ![image-20210406221510708](assets/image-20210406221510708.png)
>
> 1. Akka 处理并发的方法基于 Actor 模型。(示意图)
> 2. 在基于 Actor 的系统里,所有的事物都是 Actor,就好像在面向对象设计里面所有的事物都是对象一样。
> 3. Actor 模型是作为一个并发模型设计和架构的。Actor与Actor 之间只能通过消息通信,如图的信封。
> 4. Actor 与 Actor 之间只能用消息进行通信,当一个 Actor 给另外一个 Actor发消息,消息是有顺序的(消息队列),只需要将消息投寄的相应的邮箱即可。
> 5. 怎么处理消息是由接收消息的Actor决定的,发送消息Actor可以等待回复,也可以异步处理【ajax】
> 6. ActorSystem 的职责是负责创建并管理其创建的Actor, ActorSystem 是单例的(可以ActorSystem是一个工厂,专门创建Actor),一个 JVM 进程中有一个即可,而 Acotr 是可以有多个的。
> 7. Actor模型是对并发模型进行了更高的抽象。
> 8. Actor模型是`异步、非阻塞、高性能`的事件驱动编程模型。[异步、非阻塞, 最经典的案例:就是ajax异步请求处理 ]
> 9. Actor模型是轻量级事件处理(1GB 内存可容纳百万级别个 Actor),因此处理大并发性能高。
#### Actor模型工作机制说明
![image-20210406222141148](assets/image-20210406222141148.png)
> `Actor模型工作机制说明(对照工作机制示意图理解)`
>
> 1. ActorySystem创建Actor
> 2. `ActorRef:可以理解成是Actor的代理或者引用。`消息是通过ActorRef来发送,而不能通过Actor 发送消息,通过哪个ActorRef 发消息,就表示把该消息发给哪个Actor
> 3. 消息发送到`Dispatcher Message (消息分发器)`,它得到消息后,会将消息进行分发到对应的MailBox。(注: Dispatcher Message 可以理解成是一个线程池, MailBox 可以理解成是消息队列,可以缓冲多个消息,遵守FIFO)
> 4. Actor 可以通过 receive方法来获取消息,然后进行处理。
>
> `Actor间传递消息机制(对照工作机制示意图理解)`
>
> 1. 每一个消息就是一个Message对象。Message 继承了Runable, 因为Message就是线程类。
> 2. 从Actor模型工作机制看上去很麻烦,但是程序员编程时只需要编写Actor就可以了,其它的交给Actor模型完成即可。
> 3. `A Actor要给B Actor 发送消息,那么A Actor 要先拿到(也称为持有) B Actor 的 代理对象ActorRef 才能发送消息。`
### 3. Actor模型快速入门
> 1. 编写一个SayHelloActor
> 2. SayHelloActor 可以给自己发送消息
> 3. 要求使用Maven的方式来构建项目,这样可以很好的解决项目开发包的依赖关系。
#### SayHelloActor
~~~~scala
package com.atguigu.akka01.actor
import akka.actor.{Actor}
/**
* @Date 2021/4/6 14:13
* @Version 10.21
* @Author DuanChaojie
* 继承Actor后,SayHelloActor就是一个Actor
* 重写核心方法receive
*/
class SayHelloActor extends Actor {
/**
* 1、receive方法,会被该SayHelloActor的MailBox调用
* 2、当该SayHelloActor的MailBox接收到消息,就会调用receive方法
* 3、type Receive = scala.PartialFunction[scala.Any, scala.Unit],即Receive表示偏函数接收的参数类型是Any,返回类型是Unit
* 4、isDefinedAt(x: Any) 如果返回true ,就会去调用 apply 构建对象实例,如果是false,过滤
*
* @return Receive
*/
override def receive: Receive = {
case "Hello" => println("SayHelloActor:Hello tom")
case "Ok" => println("SayHelloActor:Ok jack")
case "exit" => {
println("SayHelloActor:退出系统...")
// 停止actoref
/**
*/
/** 1、context.stop(xxx)阻止被指定的Actor,这是一个异步操作,即涉及一个消息发送。
* 2、如果此方法应用于来自参与者内部的"self"引用,
* 则该参与者保证不会在此调用后处理任何其他消息;
* 请注意,当前消息的处理将继续,此方法不会立即终止此参与者。
* 3、The 'self' field holds the ActorRef for this actor
* 4、self可用于向自身发送消息,格式为:self ! message
*/
context.stop(self)
/**
* Terminates this actor system
*/
context.system.terminate()
}
case _ => println("SayHelloActor:匹配失败!")
}
}
~~~~
#### SayHelloActorDemo
~~~~scala
package com.atguigu.akka01.main
import akka.actor.{ActorRef, ActorSystem, Props}
import com.atguigu.akka01.actor.SayHelloActor
import scala.io.StdIn
import scala.util.control.Breaks.{break, breakable}
/**
* @Date 2021/4/6 15:42
* @Version 10.21
* @Author DuanChaojie
*/
object SayHelloActorDemo {
// 1、创建一个ActorSystem,专门用于创建Actor
private val actorFactory = ActorSystem("actorFactory")
/**
* 创建SayHelloActor的同时,返回SayHelloActor的sayHelloActorRef
* 1、Props[SayHelloActor] 通过反射创建一个SayHelloActor实例
* 2、给创建的Actor(SayHelloActor)取名为sayHelloActor
* 3、sayHelloActorRef: ActorRef 就是 Props[SayHelloActor] 的ActorRef
* 4、创建的SayHelloActor实例被ActorSystem接管
*/
private val sayHelloActorRef: ActorRef = actorFactory.actorOf(Props[SayHelloActor], "sayHelloActor")
def main(args: Array[String]): Unit = {
// 向sayHelloActorRef发送消息
breakable {
while (true) {
println(Console.GREEN + "请输入你想发的消息(提示输入完毕按回车):")
val command = StdIn.readLine()
sayHelloActorRef ! command
Thread.sleep(1000)
if (command == "exit") {
break()
}
}
}
}
}
~~~~
#### 效果如图:
![QQ截图20210406150212](assets/QQ截图20210406150212.png)
#### 小结和说明:
> 1. 当程序执行 `private val sayHelloActorRef: ActorRef = actorFactory.actorOf(Props[SayHelloActor], "sayHelloActor")`,会完成如下任务 [这是非常重要的方法]
> 2. actorFactory 是 ActorSystem("actorFactory")这样创建的。
> 3. 这里的 Props[SayHelloActor]会使用反射机制,创建一个SayHelloActor对象,如果是`actorFactory.actorOf(Props(new SayHelloActor), "sayHelloActor") 形式`,就是使用new 的方式创建一个SayHelloActor对象, 注意Props() 是小括号。
> 4. 会创建一个SayHelloActor对象的代理对象 sayHelloActorRef, 使用sayHelloActorRef才能发送消息
> 5. 会在底层创建 Dispather Message ,是一个线程池,用于分发消息, 消息是发送到对应的Actor的 MailBox
> 6. 会在底层创建SayHelloActor的MailBox 对象,该对象是一个队列,可接收Dispatcher Message 发送的消息
> 7. MailBox 实现了Runnable 接口,是一个线程,一直运行并调用Actor的receive 方法,因此当Dispather 发送消息到MailBox时,Actor 在receive 方法就可以得到信息
> 8. `sayHelloActorRef! "hello"`, 表示把hello消息发送到sayHelloActorRef的mailbox (通过Dispatcher Message 转发)
### 4. Actor模型应用实例-Actor间通讯
> 1. 编写2个 Actor , 分别是 DdActor和 MmActor
> 2. DdActor和MmActor之间可以相互发送消息
> 3. 加强对Actor传递消息机制的理解
#### DdActor
```scala
package com.atguigu.akka02.actor
import akka.actor.{Actor, ActorRef}
/**
* @Date 2021/4/6 15:38
* @Version 10.21
* @Author DuanChaojie
*/
class DdActor(mmActorRef: ActorRef) extends Actor {
val myMmActorRef = mmActorRef
override def receive: Receive = {
case "Go!" => {
println("3s后开启世界大战!")
Thread.sleep(3000)
myMmActorRef ! "DD"
}
case "MM" => {
Thread.sleep(1000)
println("MM:一起喵喵喵喵喵~")
myMmActorRef ! "DD"
}
}
}
```
#### MmActor
~~~~scala
package com.atguigu.akka02.actor
import akka.actor.Actor
/**
* @Date 2021/4/6 15:38
* @Version 10.21
* @Author DuanChaojie
*/
class MmActor extends Actor {
override def receive: Receive = {
case "DD" => {
Thread.sleep(1000)
println("DD:我们一起学猫叫~")
sender() ! "MM"
}
}
}
~~~~
#### Main
```scala
package com.atguigu.akka02.main
import akka.actor.{ActorRef, ActorSystem, Props}
import com.atguigu.akka02.actor.{DdActor, MmActor}
import scala.io.StdIn
/**
* @Date 2021/4/6 15:40
* @Version 10.21
* @Author DuanChaojie
*/
object Main extends App {
private val actorFactory = ActorSystem("actorFactory")
private val mmActorRef: ActorRef = actorFactory.actorOf(Props[MmActor], "mmActor")
private val ddActorRef: ActorRef = actorFactory.actorOf(Props(new DdActor(mmActorRef)), "ddActor")
println(Console.GREEN + "请输入Start...")
val command = StdIn.readLine()
if (command.toLowerCase == "start") {
ddActorRef ! "Go!"
}
}
```
#### 效果图:
![image-20210406223923598](assets/image-20210406223923598.png)
#### 小结:
> 1. 两个Actor通讯机制和Actor 自身发消息机制基本一样,只是要注意如下
> 2. 如果DdActor 在需要给MmActor 发消息,则需要持有MmActor 的 ActorRef,可以通过创建时,传入MmActor的 代理对象(ActorRef)
> 3. 当MmActor 在receive 方法中接收到消息,需要回复时,可以通过sender() 获取到发送Actor的 代理对象。
### 5. Akka网络编程
> Akka支持面向大并发后端服务程序,网络通信这块是服务端程序重要的一部分。
>
> 网络编程有两种:
>
> 1. `TCP socket编程`,是网络编程的主流。之所以叫Tcp socket编程,是因为底层是基于Tcp/ip协议的. 比如: QQ聊天 [示意图]
> 2. `B/S结构的Http编程`,我们使用浏览器去访问服务器时,使用的就是Http协议,而http底层依旧是用tcp socket实现的。 比如: 京东商城 【属于 web 开发范畴 】
#### 网络编程基础知识
##### TCP/IP模型
> 1. ![image-20210406225122074](assets/image-20210406225122074.png)
>
>
>
> 2. 深入理解:qq间相互通讯的案例
> 3. ![image-20210406225227857](assets/image-20210406225227857.png)
> 4. tracert的使用案例:
> 5. ![image-20210406225335990](assets/image-20210406225335990.png)
##### IP地址
> 每个internet上的主机和路由器都有一个ip地址,它包括网络号和主机号,ip地址有ipv4(32位)或者ipv6(128位). 可以通过ipconfig 来查看
##### 端口port
> 1.
> 我们这里所指的端口不是指物理意义上的端口,而是特指TCP/IP协议中的端口,是逻辑意义上的端口。
> 2. 如果把IP地址比作一间房子,端口就是出入这间房子的门。真正的房子只有几个门,但是一个IP地址的端口可以有`65535`(即:`256×256-1`)个之多!端口是通过端口号来标记的。(端口号 0:Reserved)
> 3. ==端口port的分类==
> - 0号是保留端口
> - 1-1024是固定端口,又叫有名端口,即被某些程序固定使用,一般程序员不使用
> - 22: SSH远程登录协议
> - 23: telnet使用
> - 21: ftp使用
> - 25: smtp服务使用
> - 80: iis使用
> - 7: echo服务
> - 1025-65535是动态端口,这些端口,程序员可以使用。
> 4. 端口port使用注意
> 1. 在计算机(尤其是做服务器)要尽可能的少开端口
> 2. 一个端口只能被一个程序监听( )
> 3. 如果使用 netstat –an 可以查看本机有哪些端口在监听
> 4. 可以使用 netstat –anb 来查看监听端口的pid,在结合任务管理器关闭不安全的端口。
#### Akka网络编程-小黄鸡客服案例
> 需求分析:
>
> 1. 服务端进行监听(8888)
> 2. 客户端可以通过键盘输入,发送咨询问题给小黄鸡客服(服务端)
> 3. 小黄鸡(服务端) 回答客户的问题
##### 服务端--ServerMain
```scala
package com.atguigu.akka03.server
import akka.actor.{ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
/**
* @Date 2021/4/6 16:18
* @Version 10.21
* @Author DuanChaojie
*/
object ServerMain extends App{
val serverHost = "127.0.0.1"
val serverPort = 8888
/**
* 对于此字符串中的每一行:
* 从行中删除由空格或控制字符('|')组成的前导前缀。
* 创建config对象,指定协议类型,监听的ip和端口
*/
val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$serverHost
|akka.remote.netty.tcp.port=$serverPort
""".stripMargin)
private val serverActorFactory = ActorSystem("serverActorFactory",config)
private val yellowChickenServerRef: ActorRef = serverActorFactory.actorOf(Props[YellowChickenServer],"yellowChickenServer")
// ServerMain启动
yellowChickenServerRef ! "start".toLowerCase
}
```
##### 服务端--YellowChickenServer
```scala
package com.atguigu.akka03.server
import akka.actor.Actor
import com.atguigu.akka03.common.{ClientMessage, ServerMessage}
/**
* @Date 2021/4/6 16:17
* @Version 10.21
* @Author DuanChaojie
*/
class YellowChickenServer extends Actor {
override def receive: Receive = {
case "start" => println(Console.BLUE + "YellowChickenServer已经启动....")
case ClientMessage(msg) => {
// TODO match模糊匹配
msg match {
case "java" => sender() ! ServerMessage("Java 是由 Sun Microsystems 公司于 1995 年 5 月推出的高级程序设计语言。")
case "javascript" => sender() ! ServerMessage("JavaScript在1995年由Netscape公司的Brendan Eich,在网景导航者浏览器上首次设计实现而成。因为Netscape与Sun合作,Netscape管理层希望它外观看起来像Java,因此取名为JavaScript。但实际上它的语法风格与Self及Scheme较为接近。")
case "大数据" => sender() ! ServerMessage("大数据(big data),IT行业术语,是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。")
case "作者" => sender() ! ServerMessage("https://blog.csdn.net/weixin_45267102/article/details/111472987")
case _ => println("Nothing~")
}
}
}
}
```
##### 客户端--ClientMain
```scala
package com.atguigu.akka03.client
import akka.actor.{ActorRef, ActorSystem, Props}
import com.atguigu.akka01.main.SayHelloActorDemo.sayHelloActorRef
import com.typesafe.config.ConfigFactory
import scala.io.StdIn
import scala.util.control.Breaks.{break, breakable}
/**
* @Date 2021/4/6 16:18
* @Version 10.21
* @Author DuanChaojie
*/
object ClientMain extends App {
val (clientHost, clientPort, serverHost, serverPort) = ("127.0.0.1", 6666, "127.0.0.1", 8888)
/**
* 对于此字符串中的每一行:
* 从行中删除由空格或控制字符('|')组成的前导前缀。
* 创建config对象,指定协议类型,监听的ip和端口
*/
val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$clientHost
|akka.remote.netty.tcp.port=$clientPort
""".stripMargin)
val clientActorFactory = ActorSystem("clientActorFactory", config)
val yellowChickenClientRef: ActorRef = clientActorFactory.actorOf(Props(new YellowChickenClient(serverHost, serverPort)), "yellowChickenClient")
// ClientMain启动
yellowChickenClientRef ! "start".toLowerCase
// 向yellowChickenClientRef发送消息
breakable {
while (true) {
Thread.sleep(1000)
println(Console.GREEN + "请输入你咨询的问题(提示输入完毕按回车):")
val command = StdIn.readLine()
yellowChickenClientRef ! command
if (command == "exit") {
break()
}
}
}
}
```
##### 客户端--YellowChickenClient
```scala
package com.atguigu.akka03.client
import akka.actor.{Actor, ActorSelection}
import com.atguigu.akka03.common.{ClientMessage, ServerMessage}
/**
* @Date 2021/4/6 16:18
* @Version 10.21
* @Author DuanChaojie
*/
class YellowChickenClient(serverHost: String, serverPort: Int) extends Actor {
var yellowChickenServerRef: ActorSelection = _
/**
* 1、在Actor中有一个方法PreStart方法,他会在actor运行前执行
* 2、在akka的开发中,通常将初始化的工作,放在preStart方法
*/
override def preStart(): Unit = {
/** 注意:
* serverActorFactory 是server端 ActorSystem("serverActorFactory",config)
* user/后面是 serverActorFactory.actorOf(Props[YellowChickenServer],"yellowChickenServer")
*/
yellowChickenServerRef = context.actorSelection(s"akka.tcp://serverActorFactory@${serverHost}:${serverPort}/user/yellowChickenServer")
println(yellowChickenServerRef)
}
override def receive: Receive = {
case "start" => println(Console.BLUE + "YellowChickenClient已经启动....")
// 将咨询的问题发送到Server端
case msg: String => {
yellowChickenServerRef ! ClientMessage(msg.toLowerCase)
}
case ServerMessage(msg) => {
println(s"YellowChickenServer: $msg")
}
}
}
```
##### MessageProtocol
```scala
/**
* @Date 2021/4/6 16:19
* @Version 10.21
* @Author DuanChaojie
*/
class MessageProtocol {
}
/**
* 使用样例类来构建协议
* 客户端发给服务器协议(序列化的对象)
* @param mes
*/
case class ClientMessage(mes: String)
/**
* 服务端发给客户端的协议(样例类对象)
* @param mes
*/
case class ServerMessage(mes: String)
```
##### 项目结构图:
![image-20210406230448869](assets/image-20210406230448869.png)
##### 效果图:
![image-20210406230528305](assets/image-20210406230528305.png)
![image-20210406230612163](assets/image-20210406230612163.png)
### 6. Spark Master Worker 进程通讯项目
#### 项目意义:
> 1. 深入理解Spark的Master和Worker的通讯机制
> 2. 为了方便同学们看Spark的底层源码,命名的方式和源码几乎保持一致(如: 通讯消息类命名就是一样的)
> 3. ==加深对主从服务心跳检测机制(HeartBeat)的理解==,方便以后spark源码二次开发。
#### 项目需求分析:
![image-20210406230859899](assets/image-20210406230859899.png)
> 1. worker注册到Master, Master完成注册,并回复worker注册成功
> 2. worker定时发送心跳,并在Master接收到
> 3. Master接收到worker心跳后,要更新该worker的最近一次发送心跳的时间
> 4. 给Master启动定时任务,定时检测注册的worker有哪些没有更新心跳,并将其从workers(HashMap)中删除
> 5. master worker 进行分布式部署(Linux系统)
#### 功能实现:
##### SparkMaster
```scala
package com.atguigu.spark.master
import akka.actor.{Actor, ActorSystem, Props}
import com.atguigu.spark.common.{HeartBeat, RegisterWorkerInfo, RegisteredWorkerInfo, RemoveTimeOutWorker, StartTimeOutWorker, WorkerInfo}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.collection.mutable
/**
* @Date 2021/4/6 18:35
* @Version 10.21
* @Author DuanChaojie
*/
class SparkMaster extends Actor {
/**
* SparkMaster维护一个SparkWorker的mutable.map(id,WorkerInfo(id, cpu, ram))
*/
val workers = mutable.Map[String, WorkerInfo]()
override def receive: Receive = {
case "start" => {
println(Console.BLUE + "SparkMaster启动了...")
self ! StartTimeOutWorker
}
case RegisterWorkerInfo(id, cpu, ram) => {
// SparkMaster处理注册信息
if (!workers.contains(id)) {
val workerInfo = new WorkerInfo(id, cpu, ram)
// 将wokerInfo加入到workers中
workers += (id -> workerInfo)
// 告诉SparkWorker注册成功
sender() ! RegisteredWorkerInfo
}
println("workers = " + workers)
}
case HeartBeat(id) => {
val workerInfo = workers(id)
workerInfo.lastHeartBeat = System.currentTimeMillis()
println("SparkMaster更新了id = " + id + "的心跳时间为:" + workerInfo.lastHeartBeat)
}
case StartTimeOutWorker => {
import context.dispatcher
context.system.scheduler.schedule(0 millis, 9000 millis, self, RemoveTimeOutWorker)
}
case RemoveTimeOutWorker => {
//拿到所有的workerInfo
val workerInfos = workers.values
val deadWorkerInfos = workerInfos.filter(workerInfo => (System.currentTimeMillis() - workerInfo.lastHeartBeat) > 6000)
deadWorkerInfos.foreach(workerInfo => workers.remove(workerInfo.id))
println(s"当前有${workers.size}个sparkWorker存活~")
}
}
}
object SparkMaster {
/**
* @param args (0) serverHost args(1) serverPort
*/
def main(args: Array[String]): Unit = {
val masterHost = args(0)
val masterPort = args(1).toInt
/** 创建config对象,指定协议类型,监听的ip和端口 */
val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$masterHost
|akka.remote.netty.tcp.port=$masterPort
""".stripMargin)
val sparkMasterSystem = ActorSystem("sparkMasterSystem", config)
val sparkMaster = sparkMasterSystem.actorOf(Props[SparkMaster], "sparkMaster")
sparkMaster ! "start"
}
}
```
##### SparkWorker
```scala
package com.atguigu.spark.worker
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.atguigu.spark.common.{HeartBeat, RegisterWorkerInfo, RegisteredWorkerInfo, SendHeartBeat}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
/**
* @Date 2021/4/6 18:36
* @Version 10.21
* @Author DuanChaojie
*/
class SparkWorker(masterHost: String, masterPort: Int) extends Actor {
var sparkMaster: ActorSelection = _
override def preStart(): Unit = {
sparkMaster = context.actorSelection(s"akka.tcp://sparkMasterSystem@${masterHost}:${masterPort}/user/sparkMaster")
}
// 使用UUID生成sparkWorker的id
private val id: String = java.util.UUID.randomUUID().toString
override def receive: Receive = {
case "start" => {
println(Console.BLUE + "SparkMaster启动了...")
println("正在向SparkMaster注册自己...")
//RegisterWorkerInfo(id: String, cpu: Int, ram: Int)
sparkMaster ! RegisterWorkerInfo(id, 4, 256 * 1024)
}
case RegisteredWorkerInfo => {
println("已经向SparkMaster注册成功!")
import context.dispatcher
context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat)
}
case SendHeartBeat => {
println("给SparkMaster发送心跳~")
sparkMaster ! HeartBeat(id)
}
}
}
object SparkWorker {
/**
* @param args
* workerHost args(0)
* workerPort args(1)
* masterHost args(2)
* masterPort args(3)
*/
def main(args: Array[String]): Unit = {
val (workerHost, workerPort, masterHost, masterPort) = (args(0),args(1).toInt,args(2),args(3).toInt)
/** 创建config对象,指定协议类型,监听的ip和端口 */
val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$workerHost
|akka.remote.netty.tcp.port=$workerPort
""".stripMargin)
val sparkWorkerSystem = ActorSystem("sparkWorkerSystem", config)
val sparkWorker = sparkWorkerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort)), "sparkWorker")
sparkWorker ! "start"
}
}
```
##### MessageProtocol
~~~~scala
package com.atguigu.spark.common
/**
* @Date 2021/4/6 18:37
* @Version 10.21
* @Author DuanChaojie
* MessageProtocol.scala
*/
class MessageProtocol {
}
/**
* worker注册信息
* @param id
* @param cpu
* @param ram
*/
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)
/**
* 这个是WorkerInfo, 这个信息将来是保存到master的 hm(该hashmap是用于管理worker)
* 将来这个WorkerInfo会扩展(比如增加worker上一次的心跳时间)
* @param id
* @param cpu
* @param ram
*/
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {
var lastHeartBeat : Long = System.currentTimeMillis()
}
/**
* 当worker注册成功,服务器返回一个RegisteredWorkerInfo 对象
*/
case object RegisteredWorkerInfo
/**
* worker每隔一定时间由定时器发给自己的一个消息
*/
case object SendHeartBeat
/**
* worker每隔一定时间由定时器触发,而向master发现的协议消息
* @param id
*/
case class HeartBeat(id: String)
/**
* master给自己发送一个触发检查超时worker的信息
*/
case object StartTimeOutWorker
/**
* master给自己发消息,检测worker,对于心跳超时的
*/
case object RemoveTimeOutWorker
~~~~
##### 项目结构图:
![image-20210406231338055](assets/image-20210406231338055.png)
##### 项目效果图:
![image-20210406231424443](assets/image-20210406231424443.png)
![image-20210406231527269](assets/image-20210406231527269.png)
![image-20210406231540071](assets/image-20210406231540071.png)、
##### 进行分布式部署(Linux系统)
> 打包
![image-20210406231833163](assets/image-20210406231833163.png)
> 更名后上传到服务器(linux)上:
~~~SHELL
#依次执行以下命令
java -jar SparkMaster.jar 127.0.0.1 7777
java -jar SparkWorker.jar 127.0.0.1 6666 127.0.0.1 7777
java -jar SparkWorker.jar 127.0.0.1 5555 127.0.0.1 7777
java -jar SparkWorker.jar 127.0.0.1 4444 127.0.0.1 7777
~~~
> 效果图如下:
![image-20210406224411623](assets/image-20210406224411623.png)
![image-20210406224437635](assets/image-20210406224437635.png)
![image-20210406224501292](assets/image-20210406224501292.png)
![image-20210406224530020](assets/image-20210406224530020.png)
## ☆