1、什么是 JUC?
java.util 工具包
2、进程与线程
进程与线程
进程(Process):一段程序的执行过程,是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。狭义定义:进程是正在运行的程序实例。
线程(Thread):是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一个进程中可以 并发多个线程。
进程和线程的关系:每个进程都有相应的线程,在执行程序时,实际上是执行相应的一系列线程。进程是资源分配的最小单位,线程是程序执行的最小单位。
- 根本区别:进程是资源分配的最小单位,线程是程序执行的最小单位。计算机在执行程序时,会为程序创建相应的进程,进行资源分配时,是以进程为单位进行相应的分配。每个进程都有相应的线程,在执行程序时,实际上是在执行一系列线程。
- 地址空间:进程有自己独立的地址空间,每启动一个进程,系统都会为其分配地址空间,建立数据表来维护代码段、堆栈段和数据段;线程没有独立的地址空间,同一进程的线程共享本进程的地址空间。
- 资源拥有:进程之间的资源是独立的;同一进程内的线程共享本进程的资源。
- 执行过程:每个独立的进程有一个程序运行的入口、顺序执行序列和程序入口。但线程不能独立运行,必须依存在应用程序中,有应用程序提供多线程执行控制。
- 系统开销:进程系统开销大,线程执行开销小。
Java 线程类:Thread 、Runnable、Callcble
Java 真的可以开启线程吗? 开不了,调用了本地方法,底层C++实现。
从 Thread.start()
方法点进去源码:
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
// 本地方法,底层C++,Java无法直接操作硬件。
private native void start0();
并发与并行
1、并发:单核CPU
2、并行:多核CPU
/**
* @desc
* @auth llp
* @date 2022年01月06日 9:13
*/
public class Test {
public static void main(String[] args) {
// 获取CPU核数
System.out.println(Runtime.getRuntime().availableProcessors());
}
}
线程的几个状态
// Thread.State 查看源码
public enum State {
// 新生
NEW,
// 运行
RUNNABLE,
// 阻塞
BLOCKED,
// 等待
WAITING,
// 超时等待
TIMED_WAITING,
// 终止、死亡
TERMINATED;
}
wait/sleep 区别
1、来自不同的类
wait ==> Object
sleep ==> Thread
2、关于锁的释放
wait:会释放锁
sleep:不会释放锁,抱着锁睡觉
3、使用的范围不同
wait:必须在同步代码块中
sleep:可以在任意地方睡
3、Lock锁
传统的 synchronized
线程就是一个单独的资源类,没有任何附属的操作!
- 属性、方法
/**
* @desc 基本的卖票例子 线程就是一个单独的资源类,没有任何附属的操作! 属性和方法
* @auth llp
* @date 2022年01月07日 14:26
*/
public class SaleTicketsDemo01 {
public static void main(String[] args) {
// 多线程操作
final Tickets tickets = new Tickets();
// Runnable -> @FunctionalInterface 函数式接口
new Thread( ()->{ for (int i = 0; i < 60; i++) tickets.sale(); }, "线程A").start();
new Thread( ()->{ for (int i = 0; i < 60; i++) tickets.sale(); }, "线程B").start();
new Thread( ()->{ for (int i = 0; i < 60; i++) tickets.sale(); }, "线程C").start();
}
}
// 资源类
class Tickets{
// 属性、方法
private int number = 50;
// 卖票的方式
// public void sale(){
public synchronized void sale(){
if (number > 0){
System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "张票,剩余:" + number + "张票");
}
}
}
Lock 接口
JDK 8 中文帮助文档中的例子:
Lock的实现类:
可重入锁构造函数底层源码:
公平锁:加锁时考虑排队问题,按照申请锁的顺序,依照先进先出(FIFO),先申请的线程先取得锁,其他线程进入队列等待锁的释放,当锁释放后,在队头的线程被唤醒。
非公平锁(默认):加锁时不考虑排队等待问题,直接尝试获取锁。如果此时恰好锁处于unlock
,则不管有没有其他线程在等待,直接拿到锁;否则就转化成公平锁的模式,进入等待队列。
/**
* @desc Lock锁
* @auth llp
* @date 2022年01月07日 15:17
*/
public class SaleTicketsDemo02 {
public static void main(String[] args) {
Tickets02 tickets02 = new Tickets02();
// Runnable -> @FunctionalInterface 函数式接口
new Thread( ()->{ for (int i = 0; i < 60; i++) tickets02.sale(); }, "线程A").start();
new Thread( ()->{ for (int i = 0; i < 60; i++) tickets02.sale(); }, "线程B").start();
new Thread( ()->{ for (int i = 0; i < 60; i++) tickets02.sale(); }, "线程C").start();
}
}
// Lock
class Tickets02{
// 属性、方法
private int number = 50;
Lock lock = new ReentrantLock();
// 卖票的方式
public void sale(){
// 加锁
lock.lock();
try {
// 业务代码
if (number > 0){
System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "张票,剩余:" + number + "张票");
}
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock(); // 解锁
}
}
}
synchronized 和 Lock的区别
1、synchronized 是Java内置的关键字, Lock 是一个Java类
2、synchronized 无法获取锁的的状态,Lock 可以判断是否获取到锁
3、synchronized 会自动释放锁,Lock 必须手动释放锁,否则会死锁!
4、synchronized 如果线程1获取得锁且阻塞了,线程2会一直傻傻的等待,Lock 就不一定会等待下去 lock.tryLock();
。
5、synchronized 可重入锁,不可以中断的、非公平,Lock 可重入锁,可以判断锁,非公平(可以自己设置)
6、synchronized 适合少量的同步代码问题,Lock 适合大量的同步代码问题。
4、生产者和消费者问题
生产者和消费者问题 synchronized 版 (使用while,防止虚假唤醒)
/**
* @desc
* @auth llp
* @date 2022年01月18日 16:22
*/
public class Test01 {
public static void main(String[] args) {
Data data = new Data();
// Runnable -> @FunctionalInterface 函数式接口
new Thread( ()->{
try {
for (int i = 0; i < 60; i++) data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程A").start();
new Thread( ()->{
try {
for (int i = 0; i < 60; i++) data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程B").start();
}
}
// 判断等待 业务 通知
class Data{ // 数字 资源类
private int number = 0;
// +1
public synchronized void increment() throws InterruptedException {
while (number != 0){
// 判断等待
this.wait();
}
// 业务
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知 (通知其他线程,我+1完毕)
this.notifyAll();
}
// -1
public synchronized void decrement() throws InterruptedException {
while (number == 0){
// 判断等待
this.wait();
}
// 业务
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知 (通知其他线程,我-1完毕)
this.notifyAll();
}
}
生产者和消费者问题 Lock 版
通过Lock找到Condition类
Condition
将 Object
监视器方法(wait
、notify
和 notifyAll
)分解成截然不同的对象,以便通过将这些对象与任意 [Lock
] 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock
替代了 synchronized
方法和语句的使用,Condition
替代了 Object 监视器方法的使用。
条件(也称为条件队列 或条件变量)为线程提供了一个含义,以便在某个状态条件现在可能为 true 的另一个线程通知它之前,一直挂起该线程(即让其“等待”)。因为访问此共享状态信息发生在不同的线程中,所以它必须受保护,因此要将某种形式的锁与该条件相关联。等待提供一个条件的主要属性是:以原子方式 释放相关的锁,并挂起当前线程,就像 Object.wait
做的那样。
Condition
实例实质上被绑定到一个锁上。要为特定 Lock
实例获得 Condition
实例,请使用其 newCondition()
方法。
代码实现:
/**
* @desc
* @auth llp
* @date 2022年01月24日 14:20
*/
public class Test02 {
public static void main(String[] args) {
Data02 data = new Data02();
// Runnable -> @FunctionalInterface 函数式接口
new Thread( ()->{
for (int i = 0; i < 60; i++) data.increment();
}, "线程A").start();
new Thread( ()->{
for (int i = 0; i < 60; i++) data.decrement();
}, "线程B").start();
new Thread( ()->{
for (int i = 0; i < 60; i++) data.increment();
}, "线程C").start();
new Thread( ()->{
for (int i = 0; i < 60; i++) data.decrement();
}, "线程D").start();
}
}
// 判断等待 业务 通知
class Data02{ // 数字 资源类
private int number = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
// +1
public void increment(){
lock.lock();
try {
// 业务代码
while (number != 0){
// 判断等待
condition.await();
}
// 业务
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知 (通知其他线程,我+1完毕)
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
// -1
public void decrement(){
lock.lock();
try {
// 业务代码
while (number == 0){
// 判断等待
condition.await();
}
// 业务
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知 (通知其他线程,我-1完毕)
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
Condition 精准的通知和唤醒线程
代码测试:
/**
* @desc Condition 精准通知 A执行完调用B B执行完执行C C执行完执行A
* @auth llp
* @date 2022年01月24日 14:47
*/
public class Test03 {
public static void main(String[] args) {
Data03 data03 = new Data03();
new Thread( ()->{
for (int i = 0; i < 10; i++) data03.printA();
}, "线程A").start();
new Thread( ()->{
for (int i = 0; i < 10; i++) data03.printB();
}, "线程B").start();
new Thread( ()->{
for (int i = 0; i < 10; i++) data03.printC();
}, "线程C").start();
}
}
/**
* @desc 资源类
* @auth llp
* @date 2022/1/24 14:48
*/
class Data03{
private final Lock lock = new ReentrantLock();
private final Condition conditionA = lock.newCondition();
private final Condition conditionB = lock.newCondition();
private final Condition conditionC = lock.newCondition();
private int number = 1; // 1A 2B 3C
public void printA(){
lock.lock();
try {
// 业务 判断等待 执行 通知
while (number != 1){
// 等待
conditionA.await();
}
System.out.println(Thread.currentThread().getName() + "=> 执行打印");
// 唤醒,唤醒指定的人
number = 2;
conditionB.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void printB(){
lock.lock();
try {
// 业务 判断等待 执行 通知
while (number != 2){
// 等待
conditionB.await();
}
System.out.println(Thread.currentThread().getName() + "=> 执行打印");
// 唤醒,唤醒指定的人
number = 3;
conditionC.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void printC(){
lock.lock();
try {
// 业务 判断等待 执行 通知
while (number != 3){
// 等待
conditionC.await();
}
System.out.println(Thread.currentThread().getName() + "=> 执行打印");
// 唤醒,唤醒指定的人
number = 1;
conditionA.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
5、8锁现象
关于锁的8个问题
1、标准情况下,两个线程是先打印发短信还是打电话? – 1/发短信 2/打电话
2、sendSms睡4秒,两个线程是先打印发短信还是打电话? – 1/发短信 2/打电话
/**
* @desc
* @auth llp
* @date 2022年01月24日 15:19
* 1、标准情况下,两个线程是先打印发短信还是打电话? -- 1/发短信 2/打电话
* 2、sendSms睡4秒,两个线程是先打印发短信还是打电话? -- 1/发短信 2/打电话
*/
public class Test01 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(phone::sendSms, "线程A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(phone::call, "线程B").start();
}
}
class Phone{
public synchronized void sendSms(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
}
synchronized 锁的对象是方法的调用者(*对象锁) 两个方法用的是同一把锁,谁先拿到谁执行
3、增加了一个普通方法后,先执行发短信还是hello – 1/Hello 2/发短信
4、两个对象,两个同步方法,两个线程是先打印发短信还是打电话? – 1/打电话 2/发短信
/**
* @desc
* @auth llp
* @date 2022年01月24日 15:19
* 3、增加了一个普通方法后,先执行发短信还是hello -- 普通方法
* 4、两个对象,两个同步方法,两个线程是先打印发短信还是打电话? -- 1/打电话 2/发短信
*/
public class Test02 {
public static void main(String[] args) throws InterruptedException {
// 两个对象
Phone02 phone1 = new Phone02();
Phone02 phone2 = new Phone02();
new Thread(phone1::sendSms, "线程A").start();
TimeUnit.SECONDS.sleep(1);
// new Thread(phone1::hello, "线程B").start();
new Thread(phone2::call, "线程B").start();
}
}
class Phone02{
public synchronized void sendSms(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
public void hello(){
System.out.println("Hello");
}
}
5、增加两个静态同步方法,只有一个对象,先打印发短信还是打电话?
6、增加两个静态同步方法,两个对象,先打印发短信还是打电话?
/**
* @desc
* @auth llp
* @date 2022年01月24日 15:57
* 5、增加两个静态同步方法,只有一个对象,先打印发短信还是打电话?
* 6、增加两个静态同步方法,两个对象,先打印发短信还是打电话?
*/
public class Test03 {
public static void main(String[] args) throws InterruptedException {
// 两个对象
Phone03 phone1 = new Phone03();
Phone03 phone2 = new Phone03();
new Thread(()->{
phone1.sendSms();
}, "线程A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
phone2.call();
}, "线程B").start();
}
}
class Phone03{
public static synchronized void sendSms(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public static synchronized void call(){
System.out.println("打电话");
}
public void hello(){
System.out.println("Hello");
}
}
static :当类一加载就有了,锁的是 Class ,用的是同一个锁。(*类锁)
7、一个普通同步方法和一个静态同步方法,一个对象,先打印发短信还是打电话? –
8、一个普通同步方法和一个静态同步方法,两个个对象,先打印发短信还是打电话? –
/**
* @desc
* @auth llp
* @date 2022年01月24日 16:06
* 7、一个普通同步方法和一个静态同步方法,一个对象,先打印发短信还是打电话? --
* 8、一个普通同步方法和一个静态同步方法,两个个对象,先打印发短信还是打电话? --
*/
public class Test04 {
public static void main(String[] args) throws InterruptedException {
// 两个对象
Phone04 phone1 = new Phone04();
Phone04 phone2 = new Phone04();
new Thread(()->{
phone1.sendSms();
}, "线程A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
phone2.call();
}, "线程B").start();
}
}
class Phone04{
public static synchronized void sendSms(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
public void hello(){
System.out.println("Hello");
}
}
总结:
1、普通方法不受锁的影响!
2、同步方法锁的是对象的调用者!(对象锁)
3、静态同步方法的锁对象是Class,Class模板是唯一的,不受创建对象数量的影响!(类锁)
对象锁和类锁时不同的锁,所以多线程同时执行这2个不同的锁的方法时,是异步的。
6、集合类不安全
list 不安全
/**
* @desc java.util.ConcurrentModificationException 并发修改异常
* @auth llp
* @date 2022年01月25日 10:48
*/
public class ListTest {
public static void main(String[] args) {
// ArrayList 并发下不安全
// List<String> list = new ArrayList<>();
/**
* 解决方案:
* 1、List<String> list = new Vector<>();
* 2、List<String> list = Collections.synchronizedList(new ArrayList<>());
* 3、List<String> list = new CopyOnWriteArrayList<>();
*/
// List<String> list = new Vector<>();
// List<String> list = Collections.synchronizedList(new ArrayList<>());
List<String> list = new CopyOnWriteArrayList<>(); // CopyOnWrite 写入时复制 在写入的时候避免覆盖,造成数据问题
for (int i = 0; i < 10; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0, 5));
System.out.println(list);
}, String.valueOf(i)).start();
}
}
}
set 不安全
/**
* @desc
* @auth llp
* @date 2022年01月25日 11:08
*/
public class SetTest {
public static void main(String[] args) {
// Set<String> set = new HashSet<>();
/**
* 解决方法:
* 1、Set<String> set = Collections.synchronizedSet(new HashSet<>());
* 2、Set<String> set = new CopyOnWriteArraySet<>();
*/
// Set<String> set = Collections.synchronizedSet(new HashSet<>());
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 0; i < 30; i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0, 5));
System.out.println(set);
}, String.valueOf(i)).start();
}
}
}
map 不安全
/**
* @desc
* @auth llp
* @date 2022年01月25日 11:17
*/
public class MapTest {
public static void main(String[] args) {
// Map<String, String> map = new HashMap<>();
/**
* 解决办法:
* 1、Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
* 2、Map<String, String> map = new ConcurrentHashMap<>();
*/
// Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
Map<String, String> map = new ConcurrentHashMap<>();
for (int i = 0; i < 30; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5));
System.out.println(map);
}, String.valueOf(i)).start();
}
}
}
7、Callable
1、可以有返回值
2、可以跑出异常
3、方法不同,run()
==> call()
Callable是一个函数式接口,call()
的返回值是泛型参数的类型。
代码测试
/**
* @desc
* @auth llp
* @date 2022年01月26日 9:54
*/
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// new Thread(new Runnable()).start();
// new Thread(new FutureTask(Callable<V> callable)).start();
MyThread myThread = new MyThread();
// 适配类
FutureTask<String> futureTask = new FutureTask<>(myThread);
new Thread(futureTask, "线程A").start();
new Thread(futureTask, "线程B").start(); // 结果会被缓存,提高效率
// 返回值
String s = futureTask.get(); // get() 可能会产生阻塞,把他放到最后!或者使用异步通信来处理。
System.out.println(s);
}
}
class MyThread implements Callable<String>{
@Override
public String call() throws Exception {
System.out.println("call()");
return "test";
}
}
注意:
1、结果会被缓存
2、获取返回结果可能会产生阻塞。
8、常用辅助类
8.1 CountDownLatch
/**
* @desc 计算类
* @auth llp
* @date 2022年01月26日 10:36
*/
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
// 初始化总数, 必须要执行任务的时候,再使用
CountDownLatch latch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "走出了教室!");
latch.countDown(); // 数量 -1
}, String.valueOf(i)).start();
}
// 等待计数器归零,然后再向下执行
latch.await();
System.out.println("关门");
}
}
latch.countDown()
// 数量-1
latch.await()
// 等待计数器归零,然后向下执行
每次有线程调用 countDown()
数量-1,假如计数器为0,await()
就会被唤醒。
8.2 CyclicBarrier
/**
* @desc
* @auth llp
* @date 2022年01月26日 10:43
*/
public class CyclicBarrierTest {
public static void main(String[] args) {
// 集齐七颗龙珠召唤神龙
// 召唤神龙的线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> System.out.println("召唤神龙成功!"));
for (int i = 1; i <= 7; i++) {
final int temp = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "收集" + temp + "个龙珠");
try {
cyclicBarrier.await(); // 等待
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}
这个辅助类是通过 await()
来计数的, await()
之后本次线程会被阻塞,直到召唤神龙后才执行 await()
之后的代码。
加法计数器。
8.3 Semaphore
/**
* @desc
* @auth llp
* @date 2022年01月26日 10:54
*/
public class SemaphoreTest {
public static void main(String[] args) {
// 线程数量 : 停车位!限流!
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(()->{
// acquire() 得到
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "得到车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release(); // release() 释放
}
}, String.valueOf(i)).start();
}
}
}
acquire()
从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
release()
释放一个许可,将其返回给信号量。
作用:多个共享资源互斥使用!并发限流,控制最大线程数!
9、读写锁
ReadWriteLock
/**
* @desc
* @auth llp
* @date 2022年01月26日 11:08
*/
public class ReadWriteLockTest {
public static void main(String[] args) {
//MyCache myCache = new MyCache();
MyCacheLock myCacheLock = new MyCacheLock();
// 写入
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(()->{
//myCache.put(temp+"", temp);
myCacheLock.put(temp+"", temp);
}, String.valueOf(i)).start();
}
// 读取
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(()->{
//myCache.get(temp+"");
myCacheLock.get(temp+"");
}, String.valueOf(i)).start();
}
}
}
/**
* @desc 自定义缓存
* @auth llp
* @date 2022/1/26 11:08
*/
class MyCache{
private volatile Map<String, Object> map = new HashMap<>();
// 存, 写
public void put(String key, int value){
System.out.println(Thread.currentThread().getName() + "写入" + key);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写入成功!");
}
// 取, 读
public void get(String key){
System.out.println(Thread.currentThread().getName() + "读取" + key);
map.get(key);
System.out.println(Thread.currentThread().getName() + "读取成功!");
}
}
/**
* @desc 自定义缓存 加锁
* @auth llp
* @date 2022/1/26 11:08
*/
class MyCacheLock{
private volatile Map<String, Object> map = new HashMap<>();
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// 存, 写入的时候,只希望只有一个线程写
public void put(String key, int value){
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "写入" + key);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写入成功!");
}catch (Exception e){
e.printStackTrace();
}finally {
readWriteLock.writeLock().unlock();
}
}
// 取, 读,所有人都可以读
public void get(String key){
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "读取" + key);
map.get(key);
System.out.println(Thread.currentThread().getName() + "读取成功!");
}catch (Exception e){
e.printStackTrace();
}finally {
readWriteLock.readLock().unlock();
}
}
}
10、阻塞队列
写入:如果队列满了,就必须阻塞等待
读取:如果队列为空,就必须阻塞等待
什么时候会使用到阻塞队列:多线程并发处理、线程池
1、四组API
方式 | 抛出异常 | 有返回值 | 阻塞 等待 | 超时 等待 |
---|---|---|---|---|
插入 | add(e) |
offer(e) |
put(e) |
offer(e, time, unit) |
移除 | remove() |
poll() |
take() |
poll(time, unit) |
检查 | element() |
peek() |
不可用 | 不可用 |
抛出异常:
/**
* @desc 抛出异常
* @auth llp
* @date 2022/1/26 14:56
*/
public static void test1(){
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
// IllegalStateException: Queue full : 抛出异常!
// System.out.println(blockingQueue.add("d"));
System.out.println(blockingQueue.element()); // 返回队首元素
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
// java.util.NoSuchElementException: 抛出异常
System.out.println(blockingQueue.remove());
}
有返回值:
/**
* @desc 有返回值
* @auth llp
* @date 2022/1/26 15:05
*/
public static void test2(){
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
// 不抛出异常,返回 false
// System.out.println(blockingQueue.offer("d"));
System.out.println(blockingQueue.peek()); // 返回队首元素
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
// 不抛出异常,返回 null
System.out.println(blockingQueue.poll());
}
阻塞等待:
/**
* @desc 阻塞等待
* @auth llp
* @date 2022/1/26 15:08
*/
public static void test3() throws InterruptedException {
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
// blockingQueue.put("d"); // 一直阻塞
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take()); // 一直阻塞
}
超时等待:
/**
* @desc 超时等待
* @auth llp
* @date 2022/1/26 15:08
*/
public static void test4() throws InterruptedException {
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.offer("a");
blockingQueue.offer("b");
blockingQueue.offer("c");
blockingQueue.offer("d", 2, TimeUnit.SECONDS); // 等待2秒则退出
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
}
2、同步队列 SynchronousQueue
没有容量,进去一个元素必须等待取出来之后,才能再往里放一个元素!
/**
* @desc 同步队列 和其他的 BlockingQueue 不一样, SynchronousQueue不存储元素
* @auth llp
* @date 2022年01月26日 15:18
*/
public class SynchronousQueueTest {
public static void main(String[] args) {
BlockingQueue<String> synchronousQueue = new SynchronousQueue<>(); // 同步队列
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName() + " put 1");
synchronousQueue.put("1");
System.out.println(Thread.currentThread().getName() + " put 2");
synchronousQueue.put("2");
System.out.println(Thread.currentThread().getName() + " put 3");
synchronousQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " take "+ synchronousQueue.take());
System.out.println(Thread.currentThread().getName() + " take "+ synchronousQueue.take());
System.out.println(Thread.currentThread().getName() + " take "+ synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T2").start();
}
}
11、线程池(重点)
线程池:三大方法、7大参数、4种拒绝策略
池化技术
通过复用线程来提升性能
1、三大方法
Executors.newSingleThreadExecutor(); // 单个线程
Executors.newFixedThreadPool(5); // 创建一个固定的线程池的大小
Executors.newCachedThreadPool(); // 可伸缩的
/**
* @desc Executors 工具类、3大方法
* @auth llp
* @date 2022年01月26日 15:44
*/
public class Demo01 {
public static void main(String[] args) {
// ExecutorService threadPool = Executors.newSingleThreadExecutor();// 单个线程
// ExecutorService threadPool = Executors.newFixedThreadPool(5);// 创建一个固定的线程池的大小
ExecutorService threadPool = Executors.newCachedThreadPool();// 可伸缩的,遇强则强,遇弱则弱
try {
// 使用了线程池后,要使用线程池创建线程
for (int i = 0; i < 10; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName() + " ok");
});
}
// 线程池使用完后要关闭线程池
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
2、大参数
源码分析:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
本质:创建 ThreadPoolExecutor()
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize
: 核心线程池大小
maximumPoolSize
: 最大线程池大小
keepAliveTime
: 超时等待,没有人使用就会释放
unit
: 超时单位
workQueue
: 阻塞队列
threadFactory
: 线程工厂,创建线程的,一般不动
handler
: 拒绝策略
创建一个线程
/**
* @desc
* @auth llp
* @date 2022年01月26日 16:05
* new ThreadPoolExecutor.AbortPolicy() // 拒绝策略,如果队列满了,就不处理,抛出异常
* new ThreadPoolExecutor.CallerRunsPolicy() // 哪来的去哪里!
* new ThreadPoolExecutor.DiscardPolicy() // 队列满了,丢掉任务,不会抛出异常
*
*/
public class Demo02 {
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy() // 队列满了,尝试去和最早的竞争,如果竞争成功则执行,否则丢弃。也不会抛弃异常
);
try {
// 最大承载: Deque + max 抛出异常== RejectedExecutionException
for (int i = 1; i <= 10; i++) {
poolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName() + " ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
poolExecutor.shutdown();
}
}
}
3、四种拒绝策略
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略,如果队列满了,就不处理,抛出异常
new ThreadPoolExecutor.CallerRunsPolicy() // 哪来的回哪去处理!
new ThreadPoolExecutor.DiscardOldestPolicy() // 队列满了,丢掉任务,不会抛出异常
new ThreadPoolExecutor.DiscardOldestPolicy() // 队列满了,尝试去和最早的竞争,如果竞争成功则执行,否则丢弃。也不会抛弃异常
4、线程池最大线程大小到底该如何定义
1、CPU 密集型 几核的CPU,最大线程就是几
// 获取CPU的核数
System.out.println(Runtime.getRuntime().availableProcessors());
2、IO 密集型 判断你的程序中耗 IO 的线程
(了解CPU密集型、IO密集型)
12、四大函数式接口
必须掌握 :lambda表达式、链式编程、函数式接口、Stream流式计算。
函数式接口:只有一个方法的接口。只要是函数式表达式,就能使用lambda表达式简化
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
// forEach(消费者类的函数式接口)
1、Function
函数式接口
/**
* @desc
* @auth llp
* @date 2022年01月26日 16:43
*/
public class Demo01 {
public static void main(String[] args) {
// 工具类,输出输入的值
// Function function = new Function<String, String>() {
// @Override
// public String apply(String o) {
// return o;
// }
// };
Function function = (Function<String, String>) o -> o;
// Function<String, String> function = (str)->{return str;};
System.out.println(function.apply("test"));
}
}
2、Predicate
断定型接口:只有一个输入参数,返回值只能是 布尔值!
/**
* @desc
* @auth llp
* @date 2022年01月26日 16:54
*/
public class Demo02 {
public static void main(String[] args) {
// 判断字符串是否为空
// Predicate predicate = new Predicate<String>() {
// @Override
// public boolean test(String str) {
// return str.isEmpty();
// }
// };
Predicate<String> predicate = String::isEmpty;
System.out.println(predicate.test("s"));
System.out.println(predicate.test(""));
}
}
3、Consumer
消费型接口
/**
* @desc
* @auth llp
* @date 2022年01月26日 17:01
*/
public class Demo03 {
public static void main(String[] args) {
// 打印字符串
// Consumer consumer = new Consumer<String>() {
// @Override
// public void accept(String str) {
// System.out.println(str);
// }
// };
Consumer<String> consumer = System.out::println;
consumer.accept("test");
}
}
4、Supplier
供给型接口
/**
* @desc
* @auth llp
* @date 2022年01月26日 17:06
*/
public class Demo04 {
public static void main(String[] args) {
// Supplier<String> supplier = new Supplier<String>() {
// @Override
// public String get() {
// System.out.println("get()");
// return "1024";
// }
// };
Supplier<String> supplier = () -> { return "1024"; };
System.out.println(supplier.get());
}
}
13、Stream流式计算
什么是Stream流计算?
在Java8中,添加了一个新的接口Stream,可以通过 Lambda 表达式对集合进行各种非常便利、高效的聚合操作(Aggregate Operation),或者大批量数据操作 (Bulk Data Operation)。
Stream流式计算:将要处理的元素看做一种流,流在管道中传输,并且可以在管道的节点上处理,包括过滤筛选、去重、排序、聚合等,并把结果发送到下一计算节点。
/**
* @desc stream流计算
* @auth llp
* @date 2022年01月27日 14:16
* 题目要求,一分钟内完成此题,只能用一行代码实现
* 现有5个用户,筛选
* 1、ID 必须是偶数
* 2、年龄必须大于23岁
* 3、用户名转为大写字母
* 4、用户名倒着排序
* 5、只输出一个用户
*/
public class Test {
public static void main(String[] args) {
User u1 = new User(1, "a", 21);
User u2 = new User(2, "b", 22);
User u3 = new User(3, "c", 23);
User u4 = new User(4, "d", 24);
User u5 = new User(6, "e", 25);
// 集合就是存储的
List<User> userList = Arrays.asList(u1, u2, u3, u4, u5);
// 计算交给流
// lambda表达式、链式编程、函数式接口、Stream流式计算。
userList.stream()
.filter(u -> u.getId() % 2 == 0)
.filter(u -> u.getAge() > 23)
.map(u -> u.getName().toUpperCase())
.sorted(Comparator.reverseOrder())
.limit(1)
.forEach(System.out::println);
}
}
14、ForkJoin
什么是ForkJoin?
Java 1.7 引入了一种新的并发框架—— Fork/Join Framework,主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数。
ForkJoin 框架的本质是一个用于并行执行任务的框架, 能够把一个大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务的计算结果。在Java中,ForkJoin框架与ThreadPool共存,并不是要替换ThreadPool
其实,在 Java 8中引入的并行流计算,内部就是采用的ForkJoinPool来实现的。
ForkJoin 在JDK1.7,并行执行任务!提高效率,大数据量的时候使用!
ForkJoin 特点:工作窃取
B线程执行完之后窃取A线程的部分任务执行,这个里面维护的是双端队列。
如何使用ForkJoin
1、通过 ForkJoinPool
来执行
2、计算任务
使用 ForkJoinPool.execute(ForkJoinTask<?> task) 没有返回值
public void execute(ForkJoinTask<?> task)
安排(异步)给定的任务的执行。
-
参数
task
-任务 -
异常
NullPointerException
-如果任务是空的RejectedExecutionException
如果任务不能按计划执行
使用 ForkJoinPool.submit(ForkJoinTask<?> task) 有返回值
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
提交forkjointask执行。
-
参数类型
T
-任务类型结果 -
参数
task
-任务提交 -
结果
任务
-
异常
NullPointerException
-如果任务是空的RejectedExecutionException
如果任务不能按计划执行
3、计算类 要继承ForkJoinTask
-
一个典型的例子,这是一个任务计算斐波那契数列:
class Fibonacci extends RecursiveTask<Integer> { final int n; Fibonacci(int n) { this.n = n; } Integer compute() { if (n <= 1) return n; Fibonacci f1 = new Fibonacci(n - 1); f1.fork(); Fibonacci f2 = new Fibonacci(n - 2); return f2.compute() + f1.join(); } }
测试代码:
/**
* @desc 求和计算任务
* @auth llp
* @date 2022年01月27日 15:26
*/
public class ForkJoinDemo extends RecursiveTask<Long> {
private long start;
private long end;
// 临界值
private long temp = 10000L;
public ForkJoinDemo(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if ((end - start) < temp){
long sum = 0L;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
}else {
// ForkJoin 递归
long mid = (end + start) / 2; // 中间值
ForkJoinDemo forkJoinTask1 = new ForkJoinDemo(start, mid);
forkJoinTask1.fork(); // 拆分任务,把任务任务压入线程队列
ForkJoinDemo forkJoinTask2 = new ForkJoinDemo(mid+1, end);
forkJoinTask2.fork(); // 拆分任务,把任务任务压入线程队列
return forkJoinTask1.join() + forkJoinTask2.join();
}
}
}
/**
* @desc 3000 6000(ForkJoin) 9000(Stream)
* @auth llp
* @date 2022年01月27日 15:59
*/
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// test1(); // 333
// test2(); // 198
test3(); // 157
}
// 普通程序员
public static void test1(){
long start = System.currentTimeMillis();
long sum = 0L;
for (int i = 1; i <= 10_0000_0000; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum=" + sum + " 时间: " + (end-start));
}
// 会使用 ForkJoin
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinDemo(1L, 10_0000_0000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(task); // 提交任务
long sum = submit.get(); // 阻塞等待
long end = System.currentTimeMillis();
System.out.println("sum=" + sum + " 时间: " + (end-start));
}
// Stream 并行流
public static void test3(){
long start = System.currentTimeMillis();
long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum=" + sum + " 时间: " + (end-start));
}
}
15、异步回调
Future<T>
实现类 ==> java.util.concurrent Class CompletableFuture<T>
代码测试:
/**
* @desc 异步调用
* @auth llp
* @date 2022年01月27日 16:29
*/
public class Demo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 没有返回值的异步回调
// CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
// try {
// TimeUnit.SECONDS.sleep(2);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println(Thread.currentThread().getName() + "runAsync=>Void");
// });
// System.out.println("11111111");
// completableFuture.get(); // 获取阻塞执行结果
// 有返回值的异步回调
// ajax 成功和失败有回调
// 返回的是错误信息
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName() + "supplyAsync==>Integer");
// int i = 10/0;
return 1024;
});
Integer res = completableFuture.whenComplete((T, U) -> {
System.out.println("T=> " + T); // 返回正常结果
System.out.println("U=> " + U); // 返回错误信息: java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
}).exceptionally((e) -> {
e.getMessage();
return 233; // 可以回去到错误的返回结果
}).get();
System.out.println("返回结果:" + res);
}
}
16、JMM
Java内存模型是什么? Java Memory Model ( JMM )
因为在不同的硬件生产商和不同的操作系统下,内存的访问逻辑有一定的差异,Java内存模型,就是为了屏蔽系统和硬件的差异,让一套代码在不同平台下能到达相同的访问结果。
内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的
- lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
- unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
- read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
- load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
- use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
- assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
- store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
- write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中
JMM 对这八种指令的使用,制定了如下规则:
- 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write ( 成对使用 )
- 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存 (工作内存变量改变及时通知主存)
- 不允许一个线程将没有assign的数据从工作内存同步回主内存 ()
- 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是对变量实施use、store操作之前,必须经过assign和load操作
- 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
- 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
- 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
- 对一个变量进行unlock操作之前,必须把此变量同步回主内存
关于JMM的一些同步的约定∶
1、线程解锁前,必须把共享变量立刻刷回主存。
2、线程加锁前,必须读取主存中的最新值到工作内存中!
3、加锁和解锁是同一把锁.
JMM 模型特征
原子性:例如上面八项操作,在操作系统里面是不可分割的单元。被synchronized关键字或其他锁包裹起来的操作也可以认为是原子的。
可见性:每个工作线程都有自己的工作内存,所以当某个线程修改完某个变量之后,在其他的线程中,未必能观察到该变量已经被修改。volatile关键字要求被修改之后的变量要求立即更新到主内存,每次使用前从主内存处进行读取。因此volatile可以保证可见性。synchronized和final也能实现可见性。
有序性:java的有序性跟线程相关。如果在线程内部观察,会发现当前线程的一切操作都是有序的。如果在线程的外部来观察的话,会发现线程的所有操作都是无序的。
代码测试:
/**
* @desc
* @auth llp
* @date 2022年01月27日 17:55
*/
public class JMMTest {
private static int num = 0;
public static void main(String[] args) {
new Thread(()->{
while (num == 0){ // 线程A对主内存的变化不知道
}
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
num = 1;
System.out.println(num);
}
}
问题: 主线程修改了num,但是线程A不能及时可见
问题:程序不知道内存中的值已经修改过了。如何保证可见性?==> Volatile。
17、Volatile
volatile在Java语言中是一个关键字,用于修饰变量。
被volatile修饰的变量后,表示这个变量在不同线程中是共享,编译器与运行时都会注意到这个变量是共享的,因此不会对该变量进行重排序。
volatile 是java 虚拟机提供的轻量级同步机制
1、保证可见性 (变在不同的线程中是共享的)
/**
* @desc
* @auth llp
* @date 2022年01月27日 17:55
*/
public class JMMTest {
// 不加 volatile 程序就会死循环
private volatile static int num = 0;
public static void main(String[] args) {
new Thread(()->{ // 线程A对主内存的变化不知道
while (num == 0){
}
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
num = 1;
System.out.println(num);
}
}
2、不保证原子性
线程A在执行任务的时候,不能被打扰,也不能被分割。要么同时成功,要么同时失败。
/**
* @desc 不保证原子性
* @auth llp
* @date 2022年01月28日 9:24
*/
public class VolatileDemo {
// volatile 不保证原子性
// private volatile static int num = 0;
private static int num = 0;
public static void add(){
num++;
}
public static void main(String[] args) {
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}, String.valueOf(i)).start();
}
while (Thread.activeCount() > 2){ // main GC
Thread.yield();
}
System.out.println(Thread.currentThread().getName() + " " + num);
}
}
num++
根本就不是一个原子操作:
如果不加 lock 和 synchronized ,怎么保证原子性?
使用原子类来解决原子性问题:
/**
* @desc 不保证原子性
* @auth llp
* @date 2022年01月28日 9:24
*/
public class VolatileDemo {
// volatile 不保证原子性
// private volatile static int num = 0;
private volatile static AtomicInteger num = new AtomicInteger();
public static void add(){
// num++; // 不是一个原子性操作
num.getAndIncrement(); // getAndIncrement() +1 方法
}
public static void main(String[] args) {
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}, String.valueOf(i)).start();
}
while (Thread.activeCount() > 2){ // main GC
Thread.yield();
}
System.out.println(Thread.currentThread().getName() + " " + num);
}
}
这些类的底层都是使用 native
方法,和操作系统直接挂钩,在内存中直接修改, UnSafe
类是一个很特殊的存在。
3、禁止指令重排(加内存屏障)
18、单例模式
饿汉式,很饿一进来就创建。
/**
* @desc 饿汉式单例模式 可能会浪费空间
* @auth llp
* @date 2022年01月28日 9:52
*/
public class Hungry {
// 构造器私有
private Hungry(){}
private final static Hungry HUNGRY = new Hungry();
public static Hungry getInstance(){
return HUNGRY;
}
}
懒汉式,很懒使用的时候才创建。
/**
* @desc 懒汉式单例模式
* @auth llp
* @date 2022年01月28日 9:55
*/
public class LazyMan {
private LazyMan(){
System.out.println(Thread.currentThread().getName() + " ok");
}
private static LazyMan lazyMan;
// 单线程下没问题,多线程下有问题
public static LazyMan getInstance(){
if (lazyMan == null){
lazyMan = new LazyMan();
}
return lazyMan;
}
// 对线程测试
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(()->{
LazyMan.getInstance();
}, String.valueOf(i)).start();
}
}
}
双重检测锁模式的 懒汉式单例 DCL模式:
/**
* @desc 懒汉式单例模式
* @auth llp
* @date 2022年01月28日 9:55
*/
public class LazyMan {
private LazyMan(){
System.out.println(Thread.currentThread().getName() + " ok");
}
// 因此这里必须加 volatile 避免指令重排
private volatile static LazyMan lazyMan;
// 双重检测锁模式的 懒汉式单例 DCL模式
public static LazyMan getInstance(){
if (lazyMan == null){
synchronized (LazyMan.class){ // 锁类
if (lazyMan == null){
lazyMan = new LazyMan(); // 不是一个原子性操作,有可能发生指令重排
// 1、 分配内存空间
// 2、 执行构造方法,初始化对象
// 3、 把这个对象指向这个空间
}
}
}
return lazyMan; // 有可能没有完成构造
}
}
静态内部类:
/**
* @desc 静态内部类
* @auth llp
* @date 2022年01月28日 10:07
*/
public class Holder {
public Holder() {
}
public static Holder getInstance(){
return InnerClass.HOLDER;
}
public static class InnerClass{
private static final Holder HOLDER = new Holder();
}
}
破坏懒汉式单例 DCL模式单例模式:
/**
* @desc 懒汉式单例模式
* @auth llp
* @date 2022年01月28日 9:55
*/
public class LazyMan {
private static boolean mainbao = false;
private LazyMan(){
synchronized (LazyMan.class){
if (mainbao == false){
mainbao = true;
}else {
throw new RuntimeException("不要使用反射破坏异常");
}
// if (lazyMan != null){
// throw new RuntimeException("不要使用反射破坏异常");
// }
}
}
// 因此这里必须加 volatile 避免指令重排
private volatile static LazyMan lazyMan;
// 双重检测锁模式的 懒汉式单例 DCL模式
public static LazyMan getInstance(){
if (lazyMan == null){
synchronized (LazyMan.class){
if (lazyMan == null){
lazyMan = new LazyMan(); // 不是一个原子性操作,有可能发生指令重排
// 1、 分配内存空间
// 2、 执行构造方法,初始化对象
// 3、 把这个对象指向这个空间
}
}
}
return lazyMan; // 有可能没有完成构造
}
// 通过反射破坏
public static void main(String[] args) throws Exception {
// LazyMan instance = LazyMan.getInstance(); // 构造器加锁判断抛异常
Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);
declaredConstructor.setAccessible(true);
LazyMan instance1 = declaredConstructor.newInstance();
LazyMan instance2 = declaredConstructor.newInstance(); // 都使用反射机制创建,构造器加锁判断也没有用 ==> 使用标志变量
// System.out.println(instance);
System.out.println(instance1);
System.out.println(instance2);
}
}
如何防止反射破坏单例,点击查看 declaredConstructor.newInstance()
源码:
使用枚举类单例模式:
public enum EnumSingle {
INSTANCE;
public EnumSingle getInstance(){
return INSTANCE;
}
}
class Test{
public static void main(String[] args) throws Exception { // Cannot reflectively create enum objects
EnumSingle enumSingle1 = EnumSingle.INSTANCE;
EnumSingle enumSingle2 = EnumSingle.INSTANCE;
Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(String.class, int.class);// 使用反编译工具查看得知,是个有参构造
declaredConstructor.setAccessible(true);
EnumSingle enumSingle3 = declaredConstructor.newInstance();
System.out.println(enumSingle1);
System.out.println(enumSingle2);
System.out.println(enumSingle3);
}
}
19、CAS (compare and swap)
什么是CAS?
CAS是compare and swap的缩写,即我们所说的比较交换。CAS基于乐观锁,悲观锁是将资源锁住,等一个之前获得锁的线程释放锁之后,下一个线程才可以访问。而乐观锁通过某种方式不加锁来处理资源,比如通过给记录加version来获取数据,性能较悲观锁有很大的提高。
乐观锁一般来说有以下2种方式:
- 使用数据版本(Version) 记录机制实现,这是乐观锁最常用的一种实现方式。何谓数据版本?即为数据增加一个版本标识,一般是通过为数据库表增加一个数字类型的 “version” 字段来实现。当读取数据时,将version字段的值一同读出,数据每更新一次,对此version值加一。当我们提交更新的时候,判断数据库表对应记录的当前版本信息与第一次取出来的version值进行比对,如果数据库表当前版本号与第一次取出来的version值相等,则予以更新,否则认为是过期数据。
- 使用时间戳(timestamp)。乐观锁定的第二种实现方式和第一种差不多,同样是在需要乐观锁控制的table中增加一个字段,名称无所谓,字段类型使用时间戳(timestamp), 和上面的version类似,也是在更新提交的时候检查当前数据库中数据的时间戳和自己更新前取到的时间戳进行对比,如果一致则OK,否则就是版本冲突。
如果期望值达到了,那么就更新,否则不更新,CAS是CPU并发原语。
CAS: 比较当前工作内存中的值,如果这个值是期望的,则执行操作!如果不是就一直循环(底层是自旋锁)。
Unsafe 类
自旋锁:
缺点:存在ABA问题
线程B执行了两次操作,但是线程A不知道此过程,以为A还是原来的A。
/**
* @desc
* @auth llp
* @date 2022年01月28日 10:39
*/
public class CASDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2022);
// public final boolean compareAndSet(int expect, int update)
// 期望, 更新
// ====================== 捣乱的线程 ===========================
System.out.println(atomicInteger.compareAndSet(2022, 2023));
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(2023, 2022));
System.out.println(atomicInteger.get());
// ===================== 期望的线程 ===========================
System.out.println(atomicInteger.compareAndSet(2022, 2025));
System.out.println(atomicInteger.get());
}
}
20、原子引用
Integer 对象缓存机制,默认的范围是-128~127,推荐使用静态工厂方法valueOf获取对象实例,而不是new,因为valueOf使用哦缓存机制,为 new 一定会创建新的对象分配新的内存空间。
解决ABA问题:引入原子引用。和乐观锁的原理相同,加版本号。
/**
* @desc
* @auth llp
* @date 2022年01月28日 10:39
*/
public class CASDemo {
public static void main(String[] args) {
AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<>(20, 1);
new Thread(()->{
int stamp = stampedReference.getStamp(); // 获得版本号
System.out.println("A1==>" + stamp);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(stampedReference.compareAndSet(20, 22,
stampedReference.getStamp(), stampedReference.getStamp() + 1));
System.out.println("A2==>" + stampedReference.getStamp());
System.out.println(stampedReference.compareAndSet(22, 20,
stampedReference.getStamp(), stampedReference.getStamp() + 1));
System.out.println("A2==>" + stampedReference.getStamp());
}, "A").start();
new Thread(()->{
int stamp = stampedReference.getStamp(); // 获得版本号
System.out.println("B1==>" + stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(stampedReference.compareAndSet(20, 66, stamp, stamp + 1));
System.out.println("B2==>" + stampedReference.getStamp());
}, "B").start();
}
}
21、各种锁
1、公平锁、非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
2、可重入锁
可重入锁(递归锁)
/**
* @desc
* @auth llp
* @date 2022年01月28日 14:42
*/
public class Demo01 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(phone::send, "A").start();
new Thread(phone::send, "B").start();
}
}
class Phone{
public synchronized void send(){
System.out.println(Thread.currentThread().getName() + "send");
call(); // 这里有锁
}
public synchronized void call(){
System.out.println(Thread.currentThread().getName() + "call");
}
}
/**
* @desc
* @auth llp
* @date 2022年01月28日 14:45
*/
public class Demo02 {
public static void main(String[] args) {
Phone2 phone = new Phone2();
new Thread(phone::send, "A").start();
new Thread(phone::send, "B").start();
}
}
class Phone2{
Lock lock = new ReentrantLock();
public void send(){
lock.lock(); // 细节问题:lock-send -> lock-call -> unlock-call -> unlock-send
try {
System.out.println(Thread.currentThread().getName() + "send");
call(); // 这里有锁
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void call(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "call");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
3、自旋锁
自定义锁测试:
/**
* @desc 自旋锁
* @auth llp
* @date 2022年01月28日 14:51
*/
public class SpinlockDemo {
// int 默认值 0
// Thread 默认值 null
AtomicReference<Thread> atomicReference = new AtomicReference<>();
// 加锁
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(thread.getName() + "==> myLock");
// 自旋锁
while (!atomicReference.compareAndSet(null, thread)){
}
}
// 解锁
public void myUnlock(){
Thread thread = Thread.currentThread();
System.out.println(thread.getName() + "==> myUnlock");
atomicReference.compareAndSet(thread, null);
}
}
class Test{
public static void main(String[] args) throws InterruptedException {
// ReentrantLock reentrantLock = new ReentrantLock();
// reentrantLock.lock();
// reentrantLock.unlock();
SpinlockDemo spinlockDemo = new SpinlockDemo();
new Thread(()->{
spinlockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
e.printStackTrace();
} finally {
spinlockDemo.myUnlock();
}
}, "T1").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
spinlockDemo.myLock(); // T2进去,锁被T1获得,只能自旋等待T1释放锁
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
} finally {
spinlockDemo.myUnlock();
}
}, "T2").start();
}
}
4、死锁
(镜子口红例子)
死锁是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。
(多个线程互相抱着对方需要的资源,然后形成僵持)
死锁的发生必须同时具备以下四个条件:
- 互斥条件: 一个资源每次只能被一个进程使用。
- 请求与保持条件: 一个进程因请求资源而阻塞时,对已获得的资源保持不放。
- 不剥夺条件: 进程已获得的资源,在末使用完之前,不能强行剥夺。
- 循环等待条件: 若干进程之间形成一种头尾相接的循环等待资源关系。
如何预防死锁?
- 加锁顺序(线程按顺序办事)
- 加锁时限 (线程请求所加上权限,超时就放弃,同时释放自己占有的锁)
- 死锁检测
解决问题
1、使用 jps -l
定位进程号。
2、使用 jstack -进程号
找到死锁问题。