AQS原理及其ReentrantLock源码分析

1、加锁的本质

问大家一个问题,在并发编程中,加锁的本质是什么呢?

使得多个线程串行化的访问临界资源。即只能有一个线程去操作临界资源,其他的线程需要进入到阻塞队列中排队等候获取临界资源。

1.1、等待唤醒机制

  • 基于Object的,Object.wait()、Object.notify(),基于monitor机制实现会释放锁资源;注意,wait,和notify方法 只能在同步代码块中使用,否则会出现异常:java.lang.IllegalMonitorStateException.
  • 基于Thread的,LockSupport.park()、LockSupport.unparkl();

我们通过下面的代码来观察wait和notify方法:

public class Test3 {
    public void testWaitMethod(Object lock) {
        try {
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + " begin wait()");
                lock.wait();   // 会释放锁资源,意味着其他线程可以争取该锁
                System.out.println(Thread.currentThread().getName() + " end wait");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void testNotifyMethod(Object lock) {
        try {
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + " begin notify()");
                Thread.sleep(5000);
                lock.notify();
                System.out.println(Thread.currentThread().getName() + " end notify");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Object lock = new Object();
        Test3 test = new Test3();

        Thread threadA = new Thread(() -> test.testWaitMethod(lock), "Thread A");
        Thread threadB = new Thread(() -> test.testNotifyMethod(lock), "Thread B");

        threadA.start();
        threadB.start();
    }
}

下面是运行结果:

Thread A begin wait()
Thread B begin notify()
Thread B end notify
Thread A end wait

Process finished with exit code 0

我们可以看到,当对象调用wait方法的时候,锁被释放,所以B线程可以进入到同步块中。唤醒之后等待B线程执行完毕,A线程才会继续执行。

这里一定要注意wait和notify的先后顺序,否则有可能导致无法唤醒!

接下来我们在看看基于LcckSupport的park和unpark方法。

public class TestParkUnparkMethod {
    public void testWaitMethod(Object lock) {
        try {
            System.out.println(Thread.currentThread().getName() + " begin wait()");
            // counter = 0 , 进行阻塞,为许可
            LockSupport.park();
            System.out.println(Thread.currentThread().getName() + " end wait");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void testNotifyMethod(Object lock) {
        try {
            System.out.println(Thread.currentThread().getName() + " begin notify()");
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() + " end notify");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Object lock = new Object();
        TestParkUnparkMethod test = new TestParkUnparkMethod();

        Thread threadA = new Thread(() -> test.testWaitMethod(lock), "Thread A");
        Thread threadB = new Thread(() -> test.testNotifyMethod(lock), "Thread B");

        threadA.start();
        threadB.start();

        try {
            threadB.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 发放许可, counter=1 之后被park的线程才会继续执行!否则被park的线程会一直处于阻塞状态
        LockSupport.unpark(threadA);
    }
}

运行结果:

Thread A begin wait()
Thread B begin notify()
Thread B end notify
Thread A end wait

Process finished with exit code 0

这里,当我们LockSupport.park()一个线程之后,必须使用LockSupport.unpark(thread)方法来发放许可证,这样被park的线程才能继续向下执行,否则会一致处于阻塞状态!park的有点在于不用担心顺序,我们只要在任何地方调用LockSupport.unpark(thread)来发放许可,那么阻塞线程就会被唤醒继续执行了。

了解java.util.concurrent.locks.Condition的同学应该会知道,Condition中存在await等方法,也可以实现线程的park. 其实这个方法底层也是调用的LockSupport.park()方法来实现的。

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            long savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);  // 这里调用了park方法来实现线程阻塞
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

可以说,Java中显示的阻塞线程的方法几乎都是调用LockSupport.park方法来实现的。

wait、park他们的底层都是调用操作系统的库函数来实现的, pthread_mutex_lock.

1.2、中断机制

在Java中,如何优雅的中断一个线程?

  • Thread.stop(),不建议使用这个方式,这是stop方法的注释:“Forces the thread to stop executing.”,会强制停止线程的执行,有可能你的线程刚刚获取到了锁还没有释放锁就被强制停止,那么其他线程就会无法再继续获取这个锁资源了!会造成死锁。或者是在进行一个任务,还没来及进行持久化操作就强制停掉线程执行,可能会造成数据一致性问题。
  • thread.interrupt()方法,

我们在下面的代码中测试interrupt方法:

public class TestStopThread {
    static int i = 0;

    public static void main(String[] args) {
        System.out.println("begin");
        Thread t1 = new Thread(new Runnable() {
            @Override
            public synchronized void run() {
                while (true) {
                    i++;
                    System.out.println(i);

                    // 判断是否有中断标志位
                    if (Thread.interrupted()) {
                        System.out.println("============");
                    }
                }
            }
        });

        t1.start();
        // 调用线程的中断方法
        t1.interrupt();
    }
}

运行结果:
887
841888
841889
841890
841891
. . . 

我们从执行结果中发现,调用interrupt方法之后,线程并没有中弄断执行!而是一直继续执行。这是为什么呢?

我们调用 t1.interrupt()发送一个中断信号的时候,线程并不会主动停下来,而是需要我们自己使用逻辑去控制中断。我们可以在循环体中调用break方法来中断循环和中弄线程。

Thread t1 = new Thread(new Runnable() {
            @Override
            public synchronized void run() {
                while (true) {
                    i++;
                    System.out.println(i);

                    // 判断是否有中断标志位
                    if (Thread.interrupted()) {
                        System.out.println("============");
                        break;  // 手动的中断
                    }
                }
            }
        });
        
 运行结果:
begin
1
============        

AQS原理及其ReentrantLock源码分析

注意,interrupt()方法只会对设置一个标志位,不会真的进行中单操作。

public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();

        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                b.interrupt(this);
                return;
            }
        }
        interrupt0();
    }
上一篇:java并发编程-park/unpark


下一篇:Linux Oracle服务启动&停止脚本与开机自启动