Subsections
线程安全(Thread safety)
锁(lock)
共享对象
对象组合
基础构建模块
任务执行
取消和关闭
线程池的使用
性能与可伸缩性
并发程序的测试
显示锁
原子变量和非阻塞同步机制
一、线程安全(Thread safety)
无论何时,只要多于一个线程访问给定的状态变量。而且其中某个线程会写入该变量,此时必须使用同步来协助线程对该变量的访问。
线程安全是指多个线程在访问一个类时,如果不需要额外的同步,这个类的行为仍然是正确的。
线程安全的实例:
(1)、一个无状态的类是线程安全的。
无状态类是指不包含任何域或也没有引用其它类的域。一次特定计算的瞬间状态,会唯一存在本地变量中。
(2)、原子操作是线程安全的。
自增操作时一个离散操作的简写形式,获取当前值,加一,写回新值。这是一个读-改-写操作,不具备原子性。
(3)、竞争条件是不安全的。
当计算的正确性依赖于运行时相关的时序或者多线程的交替时,会产生竞争条件。最常见的一种是检查再运行(check-then-act)。
eg:
public class Instance() { private Instance in = null; public Instance getInstance() { if(in == null) { in= new Instance(); } return in; } }
这个例子意图是想得到一个单例的对象,但是如果两个线程同时执行到getInstance()这个地方,此时此刻,in是否为null,这依赖于时序。这是无法预测到的。
解决检查再运行和读-改-写操作导致的线程不安全方法,就是必须保证操作的原子性。而java 内置的原子性机制-锁可以解决这些个问题。
二、锁(lock)
(1)、内部锁,java提供了强制原子性的内置锁机制:synchronized 块。
操作共享状态的复合操作必须是原子的,以避免竞争条件,比如读-改-写操作和检查再运行操作。复合操作会在完整的运行期占有锁,以确保其行为为原子的。
三、共享对象
要编写正确的程序,关键问题在于:在访问共享的可变状态时需要进行正确的管理。
(1)、内存可见性
我们不仅希望防止某个线程在访问某个状态,而另外一个线程同时在修改这个状态。同时希望确保一个线程修改了对象状态以后,其它线程能够看到发生的状态变化。
在没有同步的情况下,编译器,处理器对操作的执行顺序进行一些意想不到的调整。在缺少同步多线程程序中,要想对内存操作的执行顺序进行判断,几乎无法得到正确的结论。
(2)、失效数据
在缺少同步的程序中产生错误的结果的一种情况就是,失效数据。
(3)、加锁与可见性
锁可以确保某个线程以一种可以预测的方式来查看另一个线程的执行结果,比如A执行某个同步代码块时,线程B随后进入同一锁保护的代码块。在B执行锁保护的同步代码块时,可以看到线程A之前在同一代码块中所有的操作结果。
为什么在访问某个共享且可变的共享变量时,要求线程在同一个锁上同步,是为了确保某个线程写入该变量的值对于其它线程来说是可见的。否则,如果一个线程在未持有正确锁的情况下读取某个变量,那么读到的可能是一个失效值。
(4)、发布和逸出
(5)、线程封闭
当访问共享的可变数据时,通常需要同步。一种避免使用同步的方式就是不共享数据。如果仅在单线程内访问数据,就不需要同步。这种技术称为线程封闭。它是线程安全最简单方式之一。
eg:JDBC 的Connection对象,servlet请求大部分都是单线程同步方式处理,并且在Connection对象返回之前,连接池不会再将它分配给其它线程。
维持线程封闭更规范的方法是使用ThreadLocal
其设计的初衷是为了解决多线程编程中的资源共享问题。提起这个,大家一般会想到synchronized,synchronized采取的是“以时间换空间”的策略,本质上是对关键资源上锁,让大家排队操作。而ThreadLocal采取的是“以空间换时间”的思路,为每个使用该变量的线程提供独立的变量副本,在本线程内部,它相当于一个“全局变量”,可以保证本线程任何时间操纵的都是同一个对象。
ThreadLocal有四个方法:
//返回此线程局部变量的当前线程副本中的值
public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) return (T)e.value; } return setInitialValue(); }
//如果是第一次调用,需要初始化。
private T setInitialValue() { T value = initialValue(); Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); return value; }//将此线程局部变量的当前线程副本中的值设置为指定值
public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); }//移除此线程局部变量的值。
public void remove() { ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null) m.remove(this); }数据库连接管理类,转载:http://blog.csdn.net/ghsau/article/details/15732053
public class ConnectionManager { /** 线程内共享Connection,ThreadLocal通常是全局的,支持泛型 */ private static ThreadLocal<Connection> threadLocal = new ThreadLocal<Connection>(); public static Connection getCurrConnection() { // 获取当前线程内共享的Connection Connection conn = threadLocal.get(); try { // 判断连接是否可用 if(conn == null || conn.isClosed()) { // 创建新的Connection赋值给conn(略) // 保存Connection threadLocal.set(conn); } } catch (SQLException e) { // 异常处理 } return conn; } /** * 关闭当前数据库连接 */ public static void close() { // 获取当前线程内共享的Connection Connection conn = threadLocal.get(); try { // 判断是否已经关闭 if(conn != null && !conn.isClosed()) { // 关闭资源 conn.close(); // 移除Connection threadLocal.remove(); conn = null; } } catch (SQLException e) { // 异常处理 } } }
(6)、不变性
满足同步需求的另外一种方法是使用不可变对象。不可变对象一定是线程安全的。
不可变对象的定义:对象创建出来后(通过构造方法)无论如何对此对象进行非暴力操作(不用反射),此对象的状态(实例域的值)都不会发生变化,那么此对象就是不可变的,相应类就是不可变类,跟是否用 final 修饰没关系。
在并发程序中使用和共享对象时,可以使用一下策略:
线程封闭。线程封闭的对象只能由一个线程拥有。
只读共享。在没有额外同步情况下,共享的只读对象可以由多个线程并发访问。
线程共享安全。线程安全的对象在其内部实现同步。因此多个线程可以通过对象的公有接口来访问而不需要进一步的同步。
四、对象组合
(1)、如何设计线程安全的类。
设计安全类需要注意一下三要素:
找出构造对象状态的所有变量。
约束状态变量的不变性条件。
建立对象状态的并发访问管理策略。
(2)、实例封闭
如果某对象不是线程安全的,我们可以通过多种技术使其在多线程中能安全的使用。确保该对象只能由单个线程访问。
public class PersonSet{ private final Set<Person> mySet = new HashSet<Person>(); public sychronized void addPersion(Person p) { mySet.add(p) } public sychronized boolean containsPerson(Person p) { return mySet.contains(p); } }
虽然HashSet 并非线程安全的。但是mySet是私有的不会逸出。唯一能访问mySet的代码是addPerson(),和containsPerson()。在执行上他们都要获的PersonSet 上的锁。PersonSet的状态完全又它的内置锁保护。所以
PersonSet是一个线程安全的类。
java 平台的类库有很多实例封闭的例子。比如一些基本的容器并非线程安全的,如ArrayList,HashMap。类库提供的包装方法,Collections.synchronizedList(list)、Collections.synchronizedMap(m)使得非线程安全的类可以在多线程中使用。
(3)、java 监视器模式
把对象的所有可变状态都封装起来,并由对象自己的内部锁来保护。
public class privateLock { private final Object myLock = new Object(); private int weight; void someMethod() { synchronized(myLock) { //访问weight } } }使用私有锁对象比使用对象的内置锁有许多优点。私有锁可以将锁封装起来,客户代码无法得到锁。但客户可以通过公有方法来访问锁。以便参与到同步策略中去。
监视器好比一做建筑,它有一个很特别的房间,房间里有一些数据,而且在同一时间只能被一个线程占据,进入这个建筑叫做"进入监视器",进入建筑中的那个特别的房间叫做"获得监视器",占据房间叫做"持有监视器",离开房间叫做"释放监视器",离开建筑叫做"退出监视器".
如上图所示,一个线程通过1号门进入Entry Set(入口区),如果在入口区没有线程等待,那么这个线程就会获取监视器成为监视器的owner,然后执行监视区域的代码。如果在入口区中有其它线程在 等待,那么新来的线程也会和这些线程一起等待。线程在持有监视器的过程中,有两个选择,一个是正常执行监视器区域的代码,释放监视器,通过5号门退出监视 器;还有可能等待某个条件的出现,于是它会通过3号门到Wait Set(等待区)休息,直到相应的条件满足后再通过4号门进入重新获取监视器再执行。
注意:当一个线程释放监视器时,在入口区和等待区的等待线程都会去竞争监视器,如果入口区的线程赢了,会从2号门进入;如果等待区的线程赢了会从4 号门进入。只有通过3号门才能进入等待区,在等待区中的线程只有通过4号门才能退出等待区,也就是说一个线程只有在持有监视器时才能执行wait操作,处于等待的线程只有再次获得监视器才能退出等待状态。
五、基础构建模块
(1)、同步容器类。包括Vector和Hashtable。同步的封装容器类由Collections.sychronizedXXX工厂方法创建。
eg:synchronizedList,synchronizedMap(m)、synchronizedSet(s)
(2)、同步工具类。
阻塞队列(BlockingQueue(LinkedBlockingQueue,ArrayBlockingQueue,PriorityBlockingQueue,SynchronousQueue))不仅可以保存对象的容器,而且还可以协调生产者和消费者之间的控制流。
信号量(Semaphore):用来控制同时访问某个特定资源的操作数量。通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。Semaphore允许线程获取许可, 未获得许可的线程需要等待.这样防止了在同一时间有太多的线程执行。Semaphore实现的功能就类似厕所有5个坑,假如有10个人要上厕所,那么同时只能有多少个人去上厕所呢?同时只能有5个人能够占用,当5个人中
的任何一个人让开后,其中等待的另外5个人中又有一个人可以占用了。另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的参数选项。单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。
eg:模拟30辆车去泊车,而车位有10个的场景. 当车位满时,出来一辆车,才能有一辆车进入停车. 转载http://mouselearnjava.iteye.com/blog/1921468
package my.concurrent.semaphore; import java.util.concurrent.Semaphore; public class Car implements Runnable { private final Semaphore parkingSlot; private int carNo; /** * @param parkingSlot * @param carName */ public Car(Semaphore parkingSlot, int carNo) { this.parkingSlot = parkingSlot; this.carNo = carNo; } public void run() { try { parkingSlot.acquire(); parking(); sleep(300); parkingSlot.release(); leaving(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void parking() { System.out.println(String.format("%d号车泊车", carNo)); } private void leaving() { System.out.println(String.format("%d号车离开车位", carNo)); } private static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } package my.concurrent.semaphore; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class ParkingCars { private static final int NUMBER_OF_CARS = 30; private static final int NUMBER_OF_PARKING_SLOT = 10; public static void main(String[] args) { /* * 采用FIFO, 设置true */ Semaphore parkingSlot = new Semaphore(NUMBER_OF_PARKING_SLOT, true); ExecutorService service = Executors.newCachedThreadPool(); for (int carNo = 1; carNo <= NUMBER_OF_CARS; carNo++) { service.execute(new Car(parkingSlot, carNo)); } sleep(3000); service.shutdown(); /* * 输出还有几个可以用的资源数 */ System.out.println(parkingSlot.availablePermits() + " 个停车位可以用!"); } private static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
运行结果:
1号车泊车
4号车泊车
9号车泊车
2号车泊车
8号车泊车
10号车泊车
3号车泊车
12号车泊车
14号车泊车
6号车泊车
2号车离开车位
4号车离开车位
6号车离开车位
1号车离开车位
9号车离开车位
3号车离开车位
5号车泊车
8号车离开车位
10号车离开车位
11号车泊车
7号车泊车
12号车离开车位
13号车泊车
14号车离开车位
16号车泊车
17号车泊车
20号车泊车
19号车泊车
18号车泊车
15号车泊车
5号车离开车位
20号车离开车位
18号车离开车位
22号车泊车
11号车离开车位
7号车离开车位
13号车离开车位
15号车离开车位
21号车泊车
26号车泊车
23号车泊车
28号车泊车
25号车泊车
16号车离开车位
27号车泊车
17号车离开车位
30号车泊车
24号车泊车
29号车泊车
19号车离开车位
25号车离开车位
24号车离开车位
22号车离开车位
26号车离开车位
28号车离开车位
30号车离开车位
21号车离开车位
23号车离开车位
27号车离开车位
29号车离开车位
10 个停车位可以用!
4号车泊车
9号车泊车
2号车泊车
8号车泊车
10号车泊车
3号车泊车
12号车泊车
14号车泊车
6号车泊车
2号车离开车位
4号车离开车位
6号车离开车位
1号车离开车位
9号车离开车位
3号车离开车位
5号车泊车
8号车离开车位
10号车离开车位
11号车泊车
7号车泊车
12号车离开车位
13号车泊车
14号车离开车位
16号车泊车
17号车泊车
20号车泊车
19号车泊车
18号车泊车
15号车泊车
5号车离开车位
20号车离开车位
18号车离开车位
22号车泊车
11号车离开车位
7号车离开车位
13号车离开车位
15号车离开车位
21号车泊车
26号车泊车
23号车泊车
28号车泊车
25号车泊车
16号车离开车位
27号车泊车
17号车离开车位
30号车泊车
24号车泊车
29号车泊车
19号车离开车位
25号车离开车位
24号车离开车位
22号车离开车位
26号车离开车位
28号车离开车位
30号车离开车位
21号车离开车位
23号车离开车位
27号车离开车位
29号车离开车位
10 个停车位可以用!
六:任务执行
1、在线程中执行任务
首先需要找出清晰的任务边界,各个任务之间是相互独立的。任务并不依赖其它任务的状态、结果和边界。独立性有助于实现并发。在调度和负载均衡中实现更高的灵活性。在正常负载下,服务器应用程序应该表现良好的吞吐量和快速的响应性。
eg:主线程接受连接和处理请求之间不断交替运行。当服务器正在处理请求的时候,新到来的连接必须等待处理完以后才能在此调用accept()。此种方式,服务器的资源利用率非常低。
class singleThreadWebServer { ServerSocket socket = new ServerSocket(80) ; while (true) { Socket connection = socket.accept(); handleRequest(connection); } }
改进:
class MultiThreadWebServer { ServerSocket socket = new ServerSocket(80) ; while (true) { final Socket connection = socket.accept(); Runnable task = new Runnable() { public void run() { handleRequest(connection); } } new Thread(task).start(); } }
结论:任务处理从主线程分离出来。使住线程能够在完全前面的请求之前可以接受新的请求,从而提高响应性。
任务可以并行处理,从而同时服务多个请求,如果有多个处理器,或者某个任务某种原因被阻塞,程序的吞吐量提高。
2、Executor 框架
先给一张java.util.concurrent 的结构
Executor 基于生产者-消费者模式。提交任务相当是生产者,执行任务相当是消费者
a、执行策略:
任务在什么(What)线程中执行
任务以什么(What)顺序执行(FIFO/LIFO/优先级等)
同时有多少个(How Many)任务并发执行
允许有多少个(How Many)个任务进入执行队列
系统过载时选择放弃哪一个(Which)任务,如何(How)通知应用程序这个动作
任务执行的开始、结束应该做什么(What)处理
b、线程池:
线程池和工作者队列密切相关,工作者线程的任务:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。
Executors类里面提供了一些静态工厂,生成一些常用的线程池。
newSingleThreadExecutor:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
newFixedThreadPool:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
newScheduledThreadPool:创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
newSingleThreadScheduledExecutor:创建一个单线程的线程池。此线程池支持定时以及周期性执行任务的需求。
c、线程池Executor任务拒绝策略
java.util.concurrent.RejectedExecutionHandler描述的任务操作。
第一种方式直接丢弃(DiscardPolicy)
第二种丢弃最旧任务(DiscardOldestPolicy)
第三种直接抛出异常(AbortPolicy)
第四种任务将有调用者线程去执行(CallerRunsPolicy)
d、生命周期
java.util.concurrent.ExecutorService 接口对象来执行任务,该接口对象通过工具类java.util.concurrent.Executors的静态方法来创建。 Executors此包中所定义的 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 类的工厂和实用方法。
ExecutorService扩展了Executor并添加了一些生命周期管理的方法。一个Executor的生命周期有三种状态,运行 ,关闭 ,终止。Executor创建时处于运行状态。当调用ExecutorService.shutdown()后,处于关闭状态,isShutdown()方法返回true。这时,不应该再想Executor中添加任务,所有已添加的任务执行完毕后,Executor处于终止状态,isTerminated()返回true。
shutdown():执行平缓的关闭过程,不再接受新的任务,同时等待已经提交的任务执行完成。
shutdownNow();执行粗暴的关闭过程,尝试取消所有运行中的任务,并且不再启动队列中尚未开始启动的任务。
awaitTermination: 这个方法有两个参数,一个是timeout即超时时间,另一个是unit即时间单位。这个方法会使线程等待timeout时长,当超过timeout时间后,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。一般情况下会和shutdown方法组合使用。
// 创建一个固定大小的线程池
ExecutorService service = Executors. newFixedThreadPool(3); for ( int i = 0; i < 10; i++) { System. out.println( "创建线程" + i); Runnable run = new Runnable() { @Override public void run() { System. out.println( "启动线程"); } }; // 在未来某个时间执行给定的命令 service.execute(run); } // 关闭启动线程 service.shutdown(); // 每隔1秒监测一次ExecutorService的关闭情况. service.awaitTermination(1, TimeUnit. SECONDS); System. out.println( "all thread complete"); System. out.println(service.isTerminated());
3、携带结果的任务callable 和Future
七:取消和关闭
要想使任务和线程安全、快速、可靠的停下来,并不是件容易的事情,java没有提供任何机制来安全终止线程。但他提供了中断(Interruption),一种协作机制,能使线程终止另外一个线程的当前工作。
任务取消
中断
中断策略
响应中断
八、线程池的使用
九、避免活跃性危险
我们使用加锁机制来确保线程安全,但如果过度使用,会导致顺序死锁(Lock-Ordering Deadlock)。我们使用线程池和信号量来限制对资源的限制。但这些被限制的行为可能导致资源死锁(Resource
Deadlock)。
死锁:每个人拥有其他人需要的资源,同时等待其他人已经拥有的资源,并且每个人在获取所有需要的资源之前都不放弃已经拥有的资源。
饥饿:当线程无法访问到它所需要的资源而不能继续执行时,就会发生饥饿。引发饥饿的最常见资源就是CUP的时钟周期。
活锁:liveLock。改问题尽管不会阻塞线程,但也不能继续执行,因为线程将不断重复执行相同的操作,而且总会失败。
十、性能与可伸缩性
可伸缩性:当增加计算资源时,(CPU,内存,存储容量和I/O宽带),程序的吞吐量和处理能力响应的增加。
吞吐量:指一组并发任务中已完成任务所占的比例。
响应性:指请求从发出到完成所需要的时间。
引入线程存在的开销:
(1)、上下文切换。cpu在做线程切换的时候,需要保存当前线程执行的上下文,并且新调度进来的线程执行上下文设置为当前上下文。发生越多的上下文切换,增加了调度开销,并因此降低吞吐量。
(2)、内存同步。synchronized 发生锁竞争的地方带来的开销会影响其它线程的性能。
(3)、阻塞。当在锁上发生竞争时,竞争失败的线程会阻塞,JVM 通过循环,不断的尝试获取锁,直到成功。或者通过操作系统挂起阻塞的线程。如果时间短,采用等待方式,如果时间长才适合采用线程挂起的方式。
串行操作降低可伸缩性,并行切换上下文也会降低性能。在锁发生竞争时,会同时导致上面两种问题,因此,减少锁的竞争能够提高性能和收缩性。在并发程序中,对可伸缩性最主要的威胁就是独占方式的资源锁。
两个因素将影响锁上面发生竞争的可能性:锁的请求频率以及每次持有该锁的时间。如果两者的乘积很小,那么大多数获取锁操作都不会发生竞争。
三种方式可以降低锁的竞争程度:
(1)、降低锁的请求频率。
降低线程请求锁的频率,可以通过锁分解和锁分段等技术来实现。即减小锁的粒度。如果一个锁同时需要保护好几个状态变量,那么可以把这个锁分解成多个锁,并且每个锁只保护一个状态变量,从而提高可伸缩性,并最终降低每个锁的请求频率。但是使用的锁越多,发生死锁的风险也会越高。
(2)、减少锁的持有时间。
缩小锁的范围(快进快出),可以将一些与锁无关的代码移出同步代码块,尤其是开销较大的操作,以及可能被阻塞的操作,比如I/O 操作。
(3)、放弃使用独占锁,并发容器,读-写锁,不可变对象以及原子变量,
十一、并发程序的测试
十二、显示锁
访问共享对象可以使用的机制有synchronized,volatile,ReentrantLock。
有了synchronized 为啥JSR 166 小组花了这么多时间来开发 java.util.concurrent.lock 框架呢?答案很简单-同步是不错,但它并不完美。它有一些功能性的限制 —— 它无法中断一个正在等候获得锁的线程,也无法通过投票得到锁,如果不想等下去,也就没法得到锁。同步还要求锁的释放只能在与获得锁所在的堆栈帧相同的堆栈帧中进行,多数情况下,这没问题(而且与异常处理交互得很好),但是,确实存在一些非块结构的锁定更合适的情况。
Lock 和ReentrantLock
Lock接口中定义了一组抽象的加锁操作。Lock提供了一种无条件的、可轮询的、定时的以及可中断的锁后去操作。ReentrantLock 类实现了 Lock ,它拥有与 synchronized 相同的并发性和内存语义,但是添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。在确实需要一些 synchronized 所没有的特性的时候,比如时间锁等候、可中断锁等候、无块结构锁、多个条件变量或者锁投票。
格式:
Lock lock = new ReentrantLock();
lock.lock();
try {
// update object state
}
finally {
lock.unlock();
}
必须在finally 中来释放Lock。
访问共享对象可以使用的机制有synchronized,volatile,ReentrantLock。
有了synchronized 为啥JSR 166 小组花了这么多时间来开发 java.util.concurrent.lock 框架呢?答案很简单-同步是不错,但它并不完美。它有一些功能性的限制 —— 它无法中断一个正在等候获得锁的线程,也无法通过投票得到锁,如果不想等下去,也就没法得到锁。同步还要求锁的释放只能在与获得锁所在的堆栈帧相同的堆栈帧中进行,多数情况下,这没问题(而且与异常处理交互得很好),但是,确实存在一些非块结构的锁定更合适的情况。
Lock 和ReentrantLock
Lock接口中定义了一组抽象的加锁操作。Lock提供了一种无条件的、可轮询的、定时的以及可中断的锁后去操作。ReentrantLock 类实现了 Lock ,它拥有与 synchronized 相同的并发性和内存语义,但是添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。在确实需要一些 synchronized 所没有的特性的时候,比如时间锁等候、可中断锁等候、无块结构锁、多个条件变量或者锁投票。
格式:
Lock lock = new ReentrantLock();
lock.lock();
try {
// update object state
}
finally {
lock.unlock();
}
必须在finally 中来释放Lock。
定时锁和轮询锁是为了避免死锁的发生。如果不能获取所需要的锁,可以使用定时的或者轮询的锁获取方式,从而使你重新获的控制权。
(1)、轮询锁( tryLock())
(2)、定时锁(tryLock(timeout, NANOSECONDS))。如果操作不能在指定的时间内给出结果,那么就会使程序提前结束
(3)、定时以及可中断的锁(lockInterruptibly)
(4)、读写锁(ReadWriteLock)。可以被多个读者访问或者被一个写者访问。
ReentrantLock 每次只能一个线程访问加锁的数据,从而达到维护数据完整性的目的。通过这种策略可以避免写/写和写/读冲突。但是也同时避免了读/读冲突。但是有些时候读操
作是可以被并发进行的。所以需要读写锁。
eg:实现一个简单的缓存。
十三、原子变量和非阻塞同步机制
(1)、原子变量
大多数现代处理器都包含对多处理的支持。当然这种支持包括多处理器可以共享外部设备和主内存,同时它通常还包括对指令系统的增加来支持多处理的特殊要求。特别是,几乎每个现代处理器都有通过可以检测或阻止其他处理器的并发访问的方式来更新共享变量的指令。
现在的处理器(包括 Intel 和 Sparc 处理器)使用的最通用的方法是实现名为“比较并交换(Compare And Swap)”或 CAS 的原语。
现在的处理器(包括 Intel 和 Sparc 处理器)使用的最通用的方法是实现名为“比较并交换(Compare And Swap)”或 CAS 的原语。
CAS 操作包含三个操作数—— 内存位置(V)、预期原值(A)和新值(B)。通常将 CAS 用于同步的方式是从地址
V 读取值 A,执行多步计算来获得新值 B,然后使用 CAS 将 V 的值从 A 改为 B。如果 V 处的值尚未同时更改,则 CAS 操作成功。
类似于 CAS 的指令允许算法执行读-修改-写操作,而无需害怕其他线程同时修改变量,因为如果其他线程修改变量,那么 CAS 会检测它(并失败),算法可以对该操作重新计算。
eg:模拟CAS来实现一个计数器。
public class CASCount implements Runnable { private SimilatedCAS counter = new SimilatedCAS(); @Override public void run() { for (int i = 0; i < 10000; i++) { System.out.println(this.increment()); } } public int increment() { int oldValue = counter.getValue(); int newValue = oldValue + 1; while (!counter.compareAndSwap(oldValue, newValue)) { //如果CAS失败,就去拿新值继续执行CAS oldValue = counter.getValue(); newValue = oldValue + 1; } return newValue; } public static void main(String[] args) { Runnable run = new CASCount(); new Thread(run).start(); new Thread(run).start(); new Thread(run).start(); new Thread(run).start(); new Thread(run).start(); new Thread(run).start(); new Thread(run).start(); new Thread(run).start(); new Thread(run).start(); new Thread(run).start(); } } class SimilatedCAS { private int value; public int getValue() { return value; } // 这里只能用synchronized了,毕竟无法调用操作系统的CAS public synchronized boolean compareAndSwap(int expectedValue, int newValue) { if (value == expectedValue) { value = newValue; return true; } return false; } }
JDK 5.0引入底层CAS支持,java.util.concurrenent.atomic.AtomicXXX,使用底层的JVM支持为数字和引用类型提供一种高效的CAS操作。
eg:AtomicInteger,AtomicIntegerArray,AtomicLong,AtomicLongArray
原子变量能够支持原子的有条件的,读-改-写操作。
eg:使用原子变量类 实现一个计数器。
public class AtomicCounter implements Runnable{ //AtomicInteger采用了系统的CAS private AtomicInteger value = new AtomicInteger(); @Override public void run() { for (int i = 0; i < 10000; i++) { System.out.println(value.incrementAndGet()); } } public static void main(String[] args) { Runnable run = new AtomicCounter(); new Thread(run).start(); new Thread(run).start(); new Thread(run).start(); new Thread(run).start(); new Thread(run).start(); new Thread(run).start(); new Thread(run).start(); new Thread(run).start(); new Thread(run).start(); new Thread(run).start(); } }
(1)、非阻塞算法提供比synchronized 机制更高的性能和可收缩性。可以使多个线程在竞争相同的数据时候不会发生阻塞。
基于锁的算法中可能会出现各种活跃性的障碍,比如I/O 阻塞,导致其它线程都无法进行下去,如果某种算法中,一个线程的失败或者挂起,不会影响其它线程的失败或者挂起,那么这种算法就是非阻塞算法。
(1)、非阻塞栈(链式存储结构)
push方法创建一个新的节点,改节点的next域指向当前栈顶。让后使用CAS把这个新节点放入栈顶。如果在开始插入的时候位于栈顶的节点没有变化则CAS成功,如果栈顶发生了变化(其它线程操作引起)那么CAS会失败。
eg:ConcurentStack.java
public class ConcurrentStack { class Node { public final int item; public Node next; public Node(int item) { this.item = item; } } private AtomicReference<Node> top = new AtomicReference<ConcurrentStack.Node>(); //入栈 public void push(int item) { Node newNode = new Node(item); Node oldHead; do { oldHead = top.get(); newNode.next = oldHead; } while (!top.compareAndSet(oldHead, newNode)); } //出栈 public int pop() { Node oldHead; Node newHead; do { oldHead = top.get(); if(oldHead == null ) { return -1; } newHead = oldHead.next; //Atomically sets the value to the given updated value if the current value {@code ==} the expected value. } while (!top.compareAndSet(oldHead, newHead)); return oldHead.item; } }