Akka Study - 白墨的博客

Akka 用于构建高并发、分布式且具有容错机制的事件驱动型的应用,本文是 Scala Cookbook 一书中 Akka 部分内容的总结。

Akka Guide

Actor 模型与线程比较,是一种高层次的抽象。说 “高层次的抽象”这句话,就意味着这个东西简单易用,你不需要考虑太多底层的其他东西。所以如果理解了 Actor 模型的原理,就可以专注解决问题,而不需要把注意力放在像线程、锁和共享数据这样低层次的问题上。

优势
轻量级的事件驱动模型:对相应的消息事件做出异步回应,并且消息是不可变的,因为需要在不同线程之间共享。
容错性:Akka 可以用于构造 “自我修复型” 系统。
位置透明性:Akka actors 可以跨过多个 JVM 或者 服务器,其被设计为通过纯粹的消息在分布式系统中进行沟通。

在 Actor 模型中,消息是唯一的沟通方式,你要做任何事情都需要发送消息,不能直接更改 Actor 内部的状态或者使用其内部的资源。

Actor 模型简介
1、Actor 是 actor 系统中的最小单位,就类似对象是 OOP 系统中的最小单位一样。
2、也正如一个对象,一个 actor 的状态和行为也是被封装起来的。你不能直接去访问或执行一个 actor 的方法或者是成员对象(field),而只能是向这个 actor 发送一个要求访问其内部状态的消息,正如你不可能有读心术去了解别人心里是怎么想的,而只能问对方你的感受如何。
3、每一个 actor 都有一个邮箱用来接收消息,而它们的日常工作就是处理这些消息。
4、你与每个 actor 交互的方式就是向其发送一条不可变的消息,这些消息会进入到这个 actor 的邮箱中。
5、actor 也有自己的层级关系,类似一个公司的组织结构。一个 actor 会有父 actor 或者子 actor 或者兄弟 actor,正如一个高管会有董事长管他,会有其他高管与之一起合作,也会有自己的下属。这些组织结构就是为了实现一个目的:“代理委派”,让所有 actor 合作起来使系统正常运转。
6、最后一点就是 actor 模型的异常处理,当 actor 处理消息时,某些地方可能会出错,可能会抛出一些异常,那么此时怎么处理呢?通常做法是当前 actor 会停止其本身和其子 actor 的所有操作,然后向其父 actor 发送一条通知错误信息的消息,然后父节点根据不同情况做出处理,一般处理方式有以下几种:

  1. 恢复子 actor,所有状态保持不变
  2. 重启子 actor,所有状态清空,从零开始
  3. 终止子 actor (当一个 actor 终止的时候,其邮箱內的消息会进入 dead letter 邮箱)。
  4. 把错误抛给上一级 actor 处理

Actor 之间如何沟通?

一个基于 actor 的系统当然会包含很多 actor,那么它们之间是怎么沟通的呢,让通过一个 ping-pong 的例子来了解一下。

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

object extends App {
val system = ActorSystem("PingPongSystem")
val pong = system.actorOf(Props[Pong], name = "pong")
val ping = system.actorOf(Props(new Ping(pong)), name = "ping")
ping ! StartMessage
}

case object PingMessage
case object PongMessage
case object StartMessage
case object StopMessage

class Ping(pong: ActorRef) extends Actor {
var count = 0
def incrementAndPrint{ count += 1; println("ping") }
def receive = {
case StartMessage =>
incrementAndPrint
pong ! PingMessage
case PongMessage =>
incrementAndPrint
if (count > 99) {
sender ! StopMessage
println("ping stopped")
context.stop(self)
} else {
sender ! PingMessage
}
case _ => println("Ping got something unexpected.")
}
}

class Pong extends Actor {
def receive = {
case PingMessage =>
println(" pong")
sender ! PongMessage
case StopMessage =>
println("pong stopped")
context.stop(self)
case _ => println("Pong got something unexpected.")
}
}

这个例子做了什么呢?首先创建一个名为 “PingPongSystem” 的 ActorSystem,然后用这个 ActorSystem,然后用这个 分别创建 ping 和 pong actor,然后这两个 actor 之间互相发送消息,输出如下:

ping
pong
ping
pong
...
ping stopped
pong stopped

每一个基于 actor 模型的应用一般都会有一个 ActorSystem,然后用这个 作为最*的 actor,其可以管理所有的子 actor,并可以创造其他 actor。对于普通的 actor,例如 ping 和 pong,它们本身必须可以接受消息并处理,所以每一个 actor 都需要你重写 receive 方法,里面是具体的针对各种消息的处理逻辑。

那么如何发送消息呢?上面的例子是这样做的:

pong ! PingMessage

意思就是向 pong 这个 actor 发送 PingMessage

上面的例子还用到了 sender

sender ! PingMessage

当一个 actor A 接收到另一个 actor B 的消息时,它同时会接收到这个 actor B 的隐式引用叫 sender,你可以使用这个引用再往这个 actor B 回发一条消息。

Actor 的生命周期

除了构造器以外,下面的方法包含了 Actor 的整个生命周期:

receive
preStart
postStop
preRestart
postRestart

还是通过一个例子来了解一下:

import akka.actor.{Actor, ActorSystem, Props}

object LifecycleDemo extends App {
val system = ActorSystem("LifecycleDemo")
val kenny = system.actorOf(Props[Kenny], name = "Kenny")

println("sending kenny a simple String message")
kenny ! "hello"
Thread.sleep(1000)

println("make kenny restart")
kenny ! ForceRestart
Thread.sleep(1000)

println("stopping kenny")
system.stop(kenny)

println("shutting down system")
system.terminate
}

case object ForceRestart

class Kenny extends Actor {
println("entered the Kenny constructor")
override def preStart() { println("Kenny: preStart") }
override def postStop() { println("Kenny: postStop") }

override def preRestart(reason: Throwable, message: Option[Any]) {
println("Kenny: preRestart")
println(s" MESSAGE: ${message.getOrElse("")}")
println(s" REASON: ${reason.getMessage}")
super.preRestart(reason, message)
}

override def postRestart(reason: Throwable) {
println("Kenny: postRestart")
println(s" REASON: ${reason.getMessage}")
super.postRestart(reason)
}

override def receive: Receive = {
case ForceRestart => throw new Exception("Boom!")
case _ => println("Kenny received a message")
}
}

输出如下:

entered the Kenny constructor
sending kenny a simple String message
Kenny: preStart
Kenny received a message
make kenny restart
Kenny: preRestart
MESSAGE: ForceRestart
REASON: Boom!
Kenny: postStop
entered the Kenny constructor
Kenny: postRestart
REASON: Boom!
Kenny: preStart
[ERROR] [05/21/2018 23:21:39.109] [LifecycleDemo-akka.actor.default-dispatcher-4] [akka:
java.lang.Exception: Boom!
at ink.baixin.akka.example.Kenny$$anonfun$receive$1.applyOrElse(LifecycleDemo.scala:45)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at ink.baixin.akka.example.Kenny.aroundReceive(LifecycleDemo.scala:26)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at akka.actor.ActorCell.invoke(ActorCell.scala:559)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

stopping kenny
shutting down system
Kenny: postStop

让我们来分析一下整个过程:
首先创建一个 actor,进入其构造器创建一个 actor 实例;然后启动这个 actor,启动之前会先调用该实例的 preStart 方法;之后我们向这个 actor 发送了一条 String 类型的消息,actor 调用其 receive 方法进行处理;然后我们向这个 actor 发送了 ForceRestart 的消息用于重启,该消息会抛出一个异常,当 actor 检测到异常时,就会启动自动重启机制,调用实例自身的 preRestart 方法,此时会终止该 actor 实例及其所以子 actor 然后调用 postStop 方法;这个时候之前的 actor 实例的生命周期就彻底结束了,会创建一个新的 actor 实例,创建之后调用这个新实例的 postRestart 方法,然后再调用其 preStart 方法,之后新的 actor 就启动了,最后我们通过 ActorSystem 停止了这个 actor 实例,调用该实例的 postStop 方法,这就是上面 actor 经历的完整生命周期。

可以看到,一个 actor restart 前后实际上是两个不同的实例。

Start and Stop an Actor

启动 actor

使用 ActorSystem.actorOf 启动
可以使用 ActorSystem 创建其他普通 actor

val system = ActorSystem("HelloSystem")
// the actor is created and started here
大专栏
上一篇:流畅的python,Fluent Python 第十二章笔记 (继承)


下一篇:解决Intellij IDEA中Mybatis Mapper自动注入警告