一个事务必须具有以下四个特点,即所谓的ACID特性:
- 原子性:所有的操作要么全部成功,要么全部失败。
- 一致性:在事务完成后,系统保持一致性状态。
- 隔离性:在一个事务成功或失败前,产生的数据对于系统中的其他事务是不见的。
- 持久性:事务操作的结果要持久化保存。
Akka使用(Software Transactional Memory)软件事务内存来实现事务。这是一种多线程之间数据共享的同步机制。对于并行计算编程而言,只要将线程中需要 访问共享内存的关键逻辑 部分划分出来封装到一个事务中即可。
传统的保护共享数据的方法就是加同步锁。在java或Scala中以synchronized同步代码块的形式来实现:
var seats = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
val reservedSeat = seats.synchronized {// seats的synchronized构建临界区
val head = seats.head// 取出第一个元素
seats = seats.tail// 把除取出的第一个元素外的元素重新赋值给synchronized
head
}
所有的线程在synchronized块上面是依次执行的,保证了在同一时刻只有一个线程访问共享变量,确保了共享变量的一致性。但是如果有线程只是想去读取共享变量,而不是要去修改时,遇到synchronized块也是要等待的,这样会降低了系统的整体性能,这种锁,叫做“悲观锁”,这种锁它会假设在任何时候都可能会有线程去修改共享变量。
相应的就会有**“乐观锁”**,乐观锁认为在访问和修改共享变量时,都不会产生任何问题。因此在执行代码时不会有任何锁。在乐观锁的实现中,当线程离开了临界区时,系统会检测可能的更新冲突,如果检测不到更新冲突,那么就直接提交事务,如果检测到有冲突发生,那么所有的改变都会回滚并尝试重新执行临界区代码。
STM使用的是乐观锁。Akka通过将共享变量包装到STM的引用中,检测共享数据在事务中是否已经发生改变,从而能防止因多线程访问共享变量造成的数据不一致的问题。
要使用STM必须在build.sbt中加入:
libraryDependencies += "com.typesafe.akka" %% "akka-agent" % "2.5.23"
未例代码:
import concurrent.stm._
object HelloScala {
def main(args: Array[String]): Unit = {
// 使用Ref包裹变量
val mySeq = Seq[Int](1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
val seats = Ref(mySeq)
// 在atomic块中使用Ref变量示例
val getSeat = atomic{implicit txt =>
val head = seats().head// 取出第一个值
// 共享变量
seats() = seats().tail// 重新赋值
head
}
println(getSeat)
}
}
上面的seats变量,只能够中atomic块中使用,在atomic块的代码将被视为一个原子命令被执行。在编译时,atomic块需要一个隐式变量如上述的txt来为Ref中手冲突做检测。上述代码跟synchronized做的是同样的事情,但是工作机制完全不同。STM使用的是乐观锁,当atomic块执行完成后,有一个检查将会执行,这个操作就是去检查是否有冲突发生。ACID乐观锁实现了三个,没有实现持久化,因STM都是发生在内存中,内存中的事务永远都不会持久化。面使用synchronized的临界区只有执行一次,使用的是悲观锁。
有时候,我们只想读取共享变量,而不做任何改变。我们就可以使用Ref.View来读取共享变量来提高性能:
println(seats.single.get)// 得到seats视图,调用视图上的get方法来获取值
println(seats.single.get.head)
println(seats.single.get)
读取Agent事务中的数据
Akka中的Agent和Actor都是基于STM来处理事务的。akka的Agent提供了一个独立于位置的异步操作,所有对Agent的操作都是异步的。
import akka.agent.Agent
import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
case class Seat(var a: Int)
object HelloScala {
def main(args: Array[String]): Unit = {
val agent = Agent(50)// 创建Agent
println(agent())// 使用agent()方式读取Agent数据
println(agent.get)// 使用get()方式读取Agent数据
agent send(888)// 使用send修改Agent的值
val future1 :Future[Int] = agent alter(_+100)// 修改Agent数据
println(future1 foreach println)
}
}