在分布式开发中,锁是线程控制的重要途径。Java为此也提供了2种锁机制,synchronized和lock。做为Java爱好者,自然少不了对比一下这2种机制,也能从中学到些分布式开发需要注意的地方。
我们先从最简单的入手,逐步分析这2种的区别。
一、synchronized和lock的用法区别
synchronized:在需要同步的对象中加入此控制,synchronized可以加在方法上,也可以加在特定代码块中,括号中表示需要锁的对象。
lock:需要显示指定起始位置和终止位置。一般使用ReentrantLock类做为锁,多个线程中必须要使用一个ReentrantLock类做为对象才能保证锁的生效。且在加锁和解锁处需要通过lock()和unlock()显示指出。所以一般会在finally块中写unlock()以防死锁。
用法区别比较简单,这里不赘述了,如果不懂的可以看看Java基本语法。
二、synchronized和lock性能区别
synchronized是托管给JVM执行的,而lock是java写的控制锁的代码。在Java1.5中,synchronize是性能低效的。因为这是一个重量级操作,需要调用操作接口,导致有可能加锁消耗的系统时间比加锁以外的操作还多。相比之下使用Java提供的Lock对象,性能更高一些。但是到了Java1.6,发生了变化。synchronize在语义上很清晰,可以进行很多优化,有适应自旋,锁消除,锁粗化,轻量级锁,偏向锁等等。导致在Java1.6上synchronize的性能并不比Lock差。官方也表示,他们也更支持synchronize,在未来的版本中还有优化余地。
说到这里,还是想提一下这2中机制的具体区别。据我所知,
synchronized原始采用的是CPU悲观锁机制,即线程获得的是独占锁。独占锁意味着其他线程只能依靠阻塞来等待线程释放锁。而在CPU转换线程阻塞时会引起线程上下文切换,当有很多线程竞争锁的时候,会引起CPU频繁的上下文切换导致效率很低。
而Lock用的是乐观锁方式。所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。乐观锁实现的机制就是CAS操作(Compare and Swap)。我们可以进一步研究ReentrantLock的源代码,会发现其中比较重要的获得锁的一个方法是compareAndSetState。这里其实就是调用的CPU提供的特殊指令。
现代的CPU提供了指令,可以自动更新共享数据,而且能够检测到其他线程的干扰,而 compareAndSet() 就用这些代替了锁定。这个算法称作非阻塞算法,意思是一个线程的失败或者挂起不应该影响其他线程的失败或挂起的算法。
我也只是了解到这一步,具体到CPU的算法如果感兴趣的读者还可以在查阅下,如果有更好的解释也可以给我留言,我也学习下。
三、synchronized和lock用途区别
synchronized原语和ReentrantLock在一般情况下没有什么区别,但是在非常复杂的同步应用中,请考虑使用ReentrantLock,特别是遇到下面2种需求的时候。
1.某个线程在等待一个锁的控制权的这段时间需要中断
2.需要分开处理一些wait-notify,ReentrantLock里面的Condition应用,能够控制notify哪个线程
3.具有公平锁功能,每个到来的线程都将排队等候
下面细细道来……
先说第一种情况,ReentrantLock的lock机制有2种,忽略中断锁和响应中断锁,这给我们带来了很大的灵活性。比如:如果A、B2个线程去竞争锁,A线程得到了锁,B线程等待,但是A线程这个时候实在有太多事情要处理,就是一直不返回,B线程可能就会等不及了,想中断自己,不再等待这个锁了,转而处理其他事情。这个时候ReentrantLock就提供了2种机制,第一,B线程中断自己(或者别的线程中断它),但是ReentrantLock不去响应,继续让B线程等待,你再怎么中断,我全当耳边风(synchronized原语就是如此);第二,B线程中断自己(或者别的线程中断它),ReentrantLock处理了这个中断,并且不再等待这个锁的到来,完全放弃。(如果你没有了解java的中断机制,请参考下相关资料,再回头看这篇文章,80%的人根本没有真正理解什么是java的中断,呵呵)
这里来做个试验,首先搞一个Buffer类,它有读操作和写操作,为了不读到脏数据,写和读都需要加锁,我们先用synchronized原语来加锁,如下:
1 |
public class Buffer {
|
2 |
3 |
private Object lock;
|
4 |
5 |
public Buffer() {
|
6 |
lock = this ;
|
7 |
}
|
8 |
9 |
public void write() {
|
10 |
synchronized (lock) {
|
11 |
long startTime = System.currentTimeMillis();
|
12 |
System.out.println( "开始往这个buff写入数据…" );
|
13 |
for (;;) // 模拟要处理很长时间
|
14 |
{
|
15 |
if (System.currentTimeMillis()
|
16 |
- startTime > Integer.MAX_VALUE)
|
17 |
break ;
|
18 |
}
|
19 |
System.out.println( "终于写完了" );
|
20 |
}
|
21 |
}
|
22 |
23 |
public void read() {
|
24 |
synchronized (lock) {
|
25 |
System.out.println( "从这个buff读数据" );
|
26 |
}
|
27 |
}
|
28 |
} |
接着,我们来定义2个线程,一个线程去写,一个线程去读。
1 |
public class Writer extends Thread {
|
2 |
3 |
private Buffer buff;
|
4 |
5 |
public Writer(Buffer buff) {
|
6 |
this .buff = buff;
|
7 |
}
|
8 |
9 |
@Override
|
10 |
public void run() {
|
11 |
buff.write();
|
12 |
}
|
13 |
14 |
} |
15 |
16 |
public class Reader extends Thread {
|
17 |
18 |
private Buffer buff;
|
19 |
20 |
public Reader(Buffer buff) {
|
21 |
this .buff = buff;
|
22 |
}
|
23 |
24 |
@Override
|
25 |
public void run() {
|
26 |
27 |
buff.read(); //这里估计会一直阻塞
|
28 |
29 |
System.out.println( "读结束" );
|
30 |
31 |
}
|
32 |
33 |
} |
好了,写一个Main来试验下,我们有意先去“写”,然后让“读”等待,“写”的时间是无穷的,就看“读”能不能放弃了。
1 |
public class Test {
|
2 |
public static void main(String[] args) {
|
3 |
Buffer buff = new Buffer();
|
4 |
5 |
final Writer writer = new Writer(buff);
|
6 |
final Reader reader = new Reader(buff);
|
7 |
8 |
writer.start();
|
9 |
reader.start();
|
10 |
11 |
new Thread( new Runnable() {
|
12 |
13 |
@Override
|
14 |
public void run() {
|
15 |
long start = System.currentTimeMillis();
|
16 |
for (;;) {
|
17 |
//等5秒钟去中断读
|
18 |
if (System.currentTimeMillis()
|
19 |
- start > 5000 ) {
|
20 |
System.out.println( "不等了,尝试中断" );
|
21 |
reader.interrupt();
|
22 |
break ;
|
23 |
}
|
24 |
25 |
}
|
26 |
27 |
}
|
28 |
}).start();
|
29 |
30 |
}
|
31 |
} |
我们期待“读”这个线程能退出等待锁,可是事与愿违,一旦读这个线程发现自己得不到锁,就一直开始等待了,就算它等死,也得不到锁,因为写线程要21亿秒才能完成 T_T ,即使我们中断它,它都不来响应下,看来真的要等死了。这个时候,ReentrantLock给了一种机制让我们来响应中断,让“读”能伸能屈,勇敢放弃对这个锁的等待。我们来改写Buffer这个类,就叫BufferInterruptibly吧,可中断缓存。
1 |
import java.util.concurrent.locks.ReentrantLock;
|
2 |
3 |
public class BufferInterruptibly {
|
4 |
5 |
private ReentrantLock lock = new ReentrantLock();
|
6 |
7 |
public void write() {
|
8 |
lock.lock();
|
9 |
try {
|
10 |
long startTime = System.currentTimeMillis();
|
11 |
System.out.println( "开始往这个buff写入数据…" );
|
12 |
for (;;) // 模拟要处理很长时间
|
13 |
{
|
14 |
if (System.currentTimeMillis()
|
15 |
- startTime > Integer.MAX_VALUE)
|
16 |
break ;
|
17 |
}
|
18 |
System.out.println( "终于写完了" );
|
19 |
} finally {
|
20 |
lock.unlock();
|
21 |
}
|
22 |
}
|
23 |
24 |
public void read() throws InterruptedException {
|
25 |
lock.lockInterruptibly(); // 注意这里,可以响应中断
|
26 |
try {
|
27 |
System.out.println( "从这个buff读数据" );
|
28 |
} finally {
|
29 |
lock.unlock();
|
30 |
}
|
31 |
}
|
32 |
33 |
} |
当然,要对reader和writer做响应的修改
1 |
public class Reader extends Thread {
|
2 |
3 |
private BufferInterruptibly buff;
|
4 |
5 |
public Reader(BufferInterruptibly buff) {
|
6 |
this .buff = buff;
|
7 |
}
|
8 |
9 |
@Override
|
10 |
public void run() {
|
11 |
12 |
try {
|
13 |
buff.read(); //可以收到中断的异常,从而有效退出
|
14 |
} catch (InterruptedException e) {
|
15 |
System.out.println( "我不读了" );
|
16 |
}
|
17 |
18 |
System.out.println( "读结束" );
|
19 |
20 |
}
|
21 |
22 |
} |
23 |
24 |
/** |
25 |
* Writer倒不用怎么改动 |
26 |
*/ |
27 |
public class Writer extends Thread {
|
28 |
29 |
private BufferInterruptibly buff;
|
30 |
31 |
public Writer(BufferInterruptibly buff) {
|
32 |
this .buff = buff;
|
33 |
}
|
34 |
35 |
@Override
|
36 |
public void run() {
|
37 |
buff.write();
|
38 |
}
|
39 |
40 |
} |
41 |
42 |
public class Test {
|
43 |
public static void main(String[] args) {
|
44 |
BufferInterruptibly buff = new BufferInterruptibly();
|
45 |
46 |
final Writer writer = new Writer(buff);
|
47 |
final Reader reader = new Reader(buff);
|
48 |
49 |
writer.start();
|
50 |
reader.start();
|
51 |
52 |
new Thread( new Runnable() {
|
53 |
54 |
@Override
|
55 |
public void run() {
|
56 |
long start = System.currentTimeMillis();
|
57 |
for (;;) {
|
58 |
if (System.currentTimeMillis()
|
59 |
- start > 5000 ) {
|
60 |
System.out.println( "不等了,尝试中断" );
|
61 |
reader.interrupt();
|
62 |
break ;
|
63 |
}
|
64 |
65 |
}
|
66 |
67 |
}
|
68 |
}).start();
|
69 |
70 |
}
|
71 |
} |
这次“读”线程接收到了lock.lockInterruptibly()中断,并且有效处理了这个“异常”。
至于第二种情况,ReentrantLock可以与Condition的配合使用,Condition为ReentrantLock锁的等待和释放提供控制逻辑。
例如,使用ReentrantLock加锁之后,可以通过它自身的Condition.await()方法释放该锁,线程在此等待Condition.signal()方法,然后继续执行下去。await方法需要放在while循环中,因此,在不同线程之间实现并发控制,还需要一个volatile的变量,boolean是原子性的变量。因此,一般的并发控制的操作逻辑如下所示:
1 |
volatile boolean isProcess = false ;
|
2 |
ReentrantLock lock = new ReentrantLock();
|
3 |
Condtion processReady = lock.newCondtion(); |
4 |
thread: run() { |
5 |
lock.lock();
|
6 |
isProcess = true ;
|
7 |
try {
|
8 |
while (!isProcessReady) { //isProcessReady 是另外一个线程的控制变量
|
9 |
processReady.await(); //释放了lock,在此等待signal
|
10 |
} catch (InterruptedException e) {
|
11 |
Thread.currentThread().interrupt();
|
12 |
} finally {
|
13 |
lock.unlock();
|
14 |
isProcess = false ;
|
15 |
}
|
16 |
}
|
17 |
}
|
18 |
} |
这里只是代码使用的一段简化,下面我们看Hadoop的一段摘取的源码:
1 |
private class MapOutputBuffer<K extends Object, V extends Object>
|
2 |
implements MapOutputCollector<K, V>, IndexedSortable {
|
3 |
... |
4 |
boolean spillInProgress;
|
5 |
final ReentrantLock spillLock = new ReentrantLock();
|
6 |
final Condition spillDone = spillLock.newCondition();
|
7 |
final Condition spillReady = spillLock.newCondition();
|
8 |
volatile boolean spillThreadRunning = false ;
|
9 |
final SpillThread spillThread = new SpillThread();
|
10 |
... |
11 |
public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
|
12 |
TaskReporter reporter
|
13 |
) throws IOException, ClassNotFoundException {
|
14 |
...
|
15 |
spillInProgress = false ;
|
16 |
spillThread.setDaemon( true );
|
17 |
spillThread.setName( "SpillThread" );
|
18 |
spillLock.lock();
|
19 |
try {
|
20 |
spillThread.start();
|
21 |
while (!spillThreadRunning) {
|
22 |
spillDone.await();
|
23 |
}
|
24 |
} catch (InterruptedException e) {
|
25 |
throw new IOException( "Spill thread failed to initialize" , e);
|
26 |
} finally {
|
27 |
spillLock.unlock();
|
28 |
}
|
29 |
}
|
30 |
31 |
protected class SpillThread extends Thread {
|
32 |
33 |
@Override
|
34 |
public void run() {
|
35 |
spillLock.lock();
|
36 |
spillThreadRunning = true ;
|
37 |
try {
|
38 |
while ( true ) {
|
39 |
spillDone.signal();
|
40 |
while (!spillInProgress) {
|
41 |
spillReady.await();
|
42 |
}
|
43 |
try {
|
44 |
spillLock.unlock();
|
45 |
sortAndSpill();
|
46 |
} catch (Throwable t) {
|
47 |
sortSpillException = t;
|
48 |
} finally {
|
49 |
spillLock.lock();
|
50 |
if (bufend < bufstart) {
|
51 |
bufvoid = kvbuffer.length;
|
52 |
}
|
53 |
kvstart = kvend;
|
54 |
bufstart = bufend;
|
55 |
spillInProgress = false ;
|
56 |
}
|
57 |
}
|
58 |
} catch (InterruptedException e) {
|
59 |
Thread.currentThread().interrupt();
|
60 |
} finally {
|
61 |
spillLock.unlock();
|
62 |
spillThreadRunning = false ;
|
63 |
}
|
64 |
}
|
65 |
}
|
代码中spillDone 就是 spillLock的一个newCondition()。调用spillDone.await()时可以释放spillLock锁,线程进入阻塞状态,而等待其他线程的 spillDone.signal()操作时,就会唤醒线程,重新持有spillLock锁。
这里可以看出,利用lock可以使我们多线程交互变得方便,而使用synchronized则无法做到这点。
最后呢,ReentrantLock这个类还提供了2种竞争锁的机制:公平锁和非公平锁。这2种机制的意思从字面上也能了解个大概:即对于多线程来说,公平锁会依赖线程进来的顺序,后进来的线程后获得锁。而非公平锁的意思就是后进来的锁也可以和前边等待锁的线程同时竞争锁资源。对于效率来讲,当然是非公平锁效率更高,因为公平锁还要判断是不是线程队列的第一个才会让线程获得锁。