Lock应用之 线程协作

内部锁(对象监视器)使用Object类的wait(), notify(), notifyAll()方法来进行线程之间的协作通信;Lock锁引入Condition来支持线程之间通信协作,Condition提供await(), signal(), signalAll()方法实现与内部锁同样的等待与唤醒功能,但与内部锁不同的是一个Lock可以绑定多个Condition,以满足不同条件下唤醒不同线程的功能。


最典型的线程协作例子就是生产者与消费者。如果使用内部锁控制线程通信,所有线程,不管生产者还是消费者,都被同一个对象监视,当新生产的对象放入消费队列后,生产者会唤醒所有的线程,包括其它生产者,这时队列可能已经满了,所以被唤醒的生产者只好继续回头进入等待队列,一直轮询直到唤醒一个消费者才会进行消费,然后继续。因为Lock支持绑定多个Condition,所以如果使用Lock实现,可以为当队列有对象可消费创建一个Condition,当队列有空位创建一个Condition,这样,生产者和消费者可以相互明确告知,而不是像内部锁广播的方式而缺乏明确的目标对象。


简而言之,内部锁的notify()与notifyAll()会唤醒任何一个线程,不管线程的等待条件是否已经满足;Condition则提供了针对不同类型线程定制唤醒条件的实现,减少无谓的唤醒。


另外,经过测试,在多核情况下,一个线程还可能被一些其它的条件唤醒,所以测试结果并不一定会满意,有待细究。


内部锁与Lock方式实现的生产者与消费者代码案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestLockCondition {
    private static final int MAX_SIZE = 3;
    private static final int PRODUCER_NUM = 5;
    private static final int CONSUMER_NUM = 1;
    private Queue<Object> queue = new LinkedList<Object>();
    private volatile boolean isWorkingTime = true;
    private volatile boolean isUsingLock = true;
    private Lock lock = new ReentrantLock();
    private Condition full = lock.newCondition();  
    private Condition empty = lock.newCondition();
       
    private void produceWithLock() throws InterruptedException{
        lock.lock();
        try{
            while(queue.size() >= MAX_SIZE){
                System.out.println(Thread.currentThread().getName() + " go wait.");
                full.await();
                System.out.println(Thread.currentThread().getName() + " was awaken.");
            }
                       
            queue.offer(new Object());
            System.out.println(Thread.currentThread().getName() + " produce one object.");
            empty.signalAll();
            //empty.signal();
        }
        finally{
            lock.unlock();
        }              
    }
       
    private void consumeWithLock() throws InterruptedException{
        lock.lock();
        try{
            while(queue.isEmpty()){
                System.out.println(Thread.currentThread().getName() + " go wait.");
                empty.await();
                System.out.println(Thread.currentThread().getName() + " was awaken.");
            }
                       
            queue.poll();
            System.out.println(Thread.currentThread().getName() + " consume one object.");
            full.signalAll();
            //full.signal();
        }
        finally{
            lock.unlock();
        }              
    }
       
    private synchronized void produce() throws InterruptedException{
        while(queue.size() >= MAX_SIZE){
            System.out.println(Thread.currentThread().getName() + " go wait.");
            wait();
            System.out.println(Thread.currentThread().getName() + " was awaken.");
        }
                   
        queue.offer(new Object());
        System.out.println(Thread.currentThread().getName() + " produce one object.");
        //notifyAll();
        notify();
    }
       
    private synchronized void consume() throws InterruptedException{
        while(queue.isEmpty()){
            System.out.println(Thread.currentThread().getName() + " go wait.");
            wait();
            System.out.println(Thread.currentThread().getName() + " was awaken.");
        }
                   
        queue.poll();
        System.out.println(Thread.currentThread().getName() + " consume one object.");
        //notifyAll();
        notify();
    }
       
    public static void main(String[] args) throws InterruptedException {
        TestLockCondition testLockCondition = new TestLockCondition();
           
        Thread[] producers = new Thread[PRODUCER_NUM];
           
        for(int i=0; i<PRODUCER_NUM; i++){
            producers[i] = new Thread(testLockCondition.new Producer());
            producers[i].start();
        }      
           
        Thread[] consumers = new Thread[CONSUMER_NUM];
        for(int i=0; i<CONSUMER_NUM; i++){
            consumers[i] = new Thread(testLockCondition.new Consumer());
            consumers[i].start();
        }
           
        TimeUnit.SECONDS.sleep(10);
           
        for(int i=0; i<PRODUCER_NUM; i++){
            System.out.println(producers[i].getName() + " : " + producers[i].getState());
            producers[i].interrupt();
        }
           
        for(int i=0; i<CONSUMER_NUM; i++){
            System.out.println(consumers[i].getName() + " : " + consumers[i].getState());
            consumers[i].interrupt();
        }
                   
        testLockCondition.isWorkingTime = false;
    }
       
    private class Consumer implements Runnable{
        @Override
        public void run() {
            try {
                while(isWorkingTime){
                    if(isUsingLock){
                        //System.out.println(Thread.currentThread().getName() + " is using Lock.");
                        consumeWithLock();
                    }
                    else{
                        consume();
                    }                  
                }              
            catch (InterruptedException e) {
                e.printStackTrace();
            }          
        }
           
    }
    private class Producer implements Runnable{
        @Override
        public void run() {
            try {
                while(isWorkingTime){
                    if(isUsingLock){
                        //System.out.println(Thread.currentThread().getName() + " is using Lock.");
                        produceWithLock();
                    }
                    else{
                        produce();
                    }
                }              
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
           
    }
}






     本文转自sarchitect 51CTO博客,原文链接http://blog.51cto.com/stevex/1300223,如需转载请自行联系原作者


上一篇:携手美国糖尿病协会,IBM将用大数据改变糖尿病的未来


下一篇:IsBackground对线程的重要作用