前面文章讲述的Synchronized和ReentrantLock都是Java中的互斥锁,排它锁
,其实在J.U.C中还有一种锁,那就是共享锁
,让我们一起来看看吧.
什么是共享锁?
该锁可以被多个线程共有,不与其他线程互斥,非独占状态。
主要以CountDownLatch、Semaphore、CyclicBrrier
来分析共享锁原理
1.CountDownLatch
计数器,通过wait/notify操作state
1.1.Java中的使用
//初始化计数器 3
static CountDownLatch latch = new CountDownLatch(3);
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我是线程1,我执行完了");
//减少计数器
latch.countDown();
}).start();
new Thread(()->{
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我是线程2,我执行完了");
latch.countDown();
}).start();
new Thread(()->{
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我是线程3,我执行完了");
latch.countDown();
}).start();
//在这里阻塞 等待计数器归0后被唤醒
latch.await();
System.out.println("所有线程都执行完了!");
}
1.2.源码分析
- 构造方法和 latch.await(); 主线程等待其他结束
//static CountDownLatch latch = new CountDownLatch(3);
Sync(int count) {
//通过构造方法 实际上state=3
setState(count);
}
//latch.await();
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//判断共享锁的状态 (getState() == 0) ? 1 : -1;
if (tryAcquireShared(arg) < 0)
//尝试去获取共享锁
doAcquireSharedInterruptibly(arg);
}
//抢占锁并处理interrupt
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//当前线程添加到AQS队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取前置节点
final Node p = node.predecessor();
//前置节点 = head节点的话 当前线程排在第一位 直接尝试获取
if (p == head) {
//尝试获取共享锁 (getState() == 0) ? 1 : -1;
int r = tryAcquireShared(arg);
if (r >= 0) {
//如果next节点是共享并且Node.SIGNAL等待唤醒状态 这里都会去尝试唤醒
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//抢占失败的走这里 判断一些条件 并阻塞线程
if (shouldParkAfterFailedAcquire(p, node) &&
//LockSupport.park(this); 阻塞并检查interrupt
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- latch.countDown(); state == 0 则唤醒等待中的主线程
public final boolean releaseShared(int arg) {//arg = 1
//释放一次锁
if (tryReleaseShared(arg)) {
//唤醒节点中的等待线程 也就是主线程
//也就是AbstractQueuedSynchronizer#unparkSuccessor中的LockSupport.unpark(s.thread);
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
//实际上就是操作state
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
//原始值为计数器的值 每次减一直到0才返回true唤醒主线程
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
2.Semaphore
本质:
- 抢占一个令牌,如果抢到就通行,否则就阻塞
- 限制资源的并发数
实际应用:信号灯、限流、限制资源的访问
2.1 Java中的使用
static class Elevator extends Thread {
int num;
Semaphore semaphore;
//电梯可同时容纳人数
public Elevator(int num, Semaphore semaphore) {
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
//令牌数-1,如果为0就阻塞 可以阻塞多个
semaphore.acquire();
num++;
System.out.println("第" + num + "进入电梯!");
Thread.sleep(1000);
System.out.println("第" + num + "人下了");
} catch (Exception e) {
} finally {
//令牌数+1,唤醒等待的线程 可以唤醒多个
semaphore.release();
}
}
}
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(10);
for (int i = 0; i < 20; i++) {
new Elevator(i,semaphore).start();
}
}
原理和计数器差不多,区别就在于多了release加令牌数
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
//令牌数会加回去
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
3.CyclicBarrier
不是AQS实现,可重复实现的栅栏,需要达到指定数量线程才会被唤醒,比如投票,需要等所有票数收集完才会执行统计
3.1 Java中的使用
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(5,()->{
System.out.println("投票完成,开始统计!");
});
for (int i = 0; i < 5; i++) {
new Toupiao(barrier).start();
}
}
static class Toupiao extends Thread{
private CyclicBarrier cyclicBarrier;
public Toupiao(CyclicBarrier cyclicBarrier){
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()+"投票了");
//与CountDownLatch有异曲同工的之妙
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.2 源码分析
- await()
//构造方法
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
//所有线程执行完以后 会执行这个线程
this.barrierCommand = barrierAction;
}
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//count 每次减1,直到为0就唤醒所有的线程
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
//同步执行构造方法传入的统计线程代码块
command.run();
ranAction = true;
//跟进去就是trip.signalAll(); 唤醒所有的线程
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
//上面不满足的话 会在这里阻塞
if (!timed)
//trip Condition trip = lock.newCondition();
//使用的Condition 就不再重复了 上面signalAll唤醒所有线程
trip.await();
else if (nanos > 0L)//带超时时间的
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
。。。
}
} finally {
lock.unlock();
}
}
以上就是本章的全部内容了。重点是自己根据不同的需求去使用
上一篇:线程通信synchronized中的wait/notify、J.U.C Condition的使用和源码分析
下一篇:Java本地线程变量ThreadLocal的神秘面纱
知之者不如好之者,好之者不如乐之者