juc并发编程
juc简介
java.util .concurrent,Java并发包
笔试题目
- 手写单例模式
- 手写冒泡排序
- 生产者消费者变种题目
进程/线程回顾
进程/线程是什么?
**进程:**进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元。(简单来说,进程就是后台运行的一个程序)
线程: 通常在一个进程中可以包含若干个线程,当然一个进程中至少有一个线程,不然没有存在的意义。线程可以利用进程所拥有的资源,在引入线程的操作系统中,通常都是把进程作为分配资源的基本单位,而把线程作为独立运行和独立调度的基本单位,由于线程比进程更小,基本上不拥有系统资源,故对它的调度所付出的开销就会小得多,能更高效的提高系统多个程序间并发执行的程度。
进程/线程例子?
使用QQ,查看进程一定有一个QQ.exe的进程,我可以用qq和A文字聊天,和B视频聊天,给C传文件,给D发一段语言,QQ支持录入信息的搜索。
大四的时候写论文,用word写论文,同时用QQ音乐放音乐,同时用QQ聊天,多个进程。
word如没有保存,停电关机,再通电后打开word可以恢复之前未保存的文档,word也会检查你的拼写,两个线程:容灾备份,语法检查
什么是并发?什么是并行?
并发:同一时刻多个线程在访问同一个资源,多个线程对一个点
例子:小米9今天上午10点,限量抢购
春运抢票
电商秒杀…
**并行: ** 多项工作一起执行,之后再汇总
例子:泡方便面,电水壶烧水,一边撕调料倒入桶中
thread.start
thread.start表示线程进入就绪状态(不是立刻进行调用),是在调用run方法是在cup和操作系统对这个线程进行调度的时候。
Thread的状态
waite()和sleep()的比较,wait放开去睡,放开了手里的锁。
sleep()捏紧了手去睡,醒了还有手里的锁。
/**
* Thread state for a thread which has not yet started.
*/
NEW,
/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/
RUNNABLE,
/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
BLOCKED,
WAITING和TIMED_WAITING的区别
WAITING这个是一直等,死死的等,俗称不见不散
等待线程的线程状态。
由于调用了一个
以下方法:
/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called <tt>Object.wait()</tt>
* on an object is waiting for another thread to call
* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
* that object. A thread that has called <tt>Thread.join()</tt>
* is waiting for a specified thread to terminate.
*/
WAITING,
TIMED_WAITING是定时等待,俗称过时不候
具有指定等待时间的等待线程的线程状态。
由于调用其中一个线程,线程处于定时等待状态
具有指定正等待时间的以下方法:
/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
TIMED_WAITING,
经典卖票问题
一般多线程问题可以简化为:线程 操作(资源类暴露的方法) 资源类
版本1使用匿名内部类
资源类:这里也可以使用同步方法或者同步代码块(但是粒度太大)
这里使用的Lock的实现类ReentrantLock()可重入锁,相比synchronized同步锁来说更加灵活,
/**
* 资源类:票
*/
class Ticket1 {
// 设定初始值为30张
private int number = 30;
// ReentrantLock可重入锁
private Lock lock = new ReentrantLock();
/**
* 操作,这里使用synchronized就相当于把这个saleTicket全部都锁住了,粒度太大
*/
public void saleTicket() {
// 上锁
lock.lock();
try {
// 先判断number是否大于0
if (number > 0) {
System.out.println("ThreadName = " + Thread.currentThread().getName() + "\t卖出第:" + (number--) + "\t还剩下" + number);
} else {
System.out.println("票已经卖完");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
lock.unlock();
}
}
}
抢票类
/**
* Created with IntelliJ IDEA.
* @Author: pzx
* @Date: 2022/01/03 10:10
* @Version:1.0
*
* 题目:三个售票员 卖出 30张票
*多线程编程的企业套路+模板
*
* 1.在高内聚低耦合的前提下:线程 操作(资源类对外暴露的调用方法) 资源类
*
*抢票类
*
*/
public class SaleTicket {
public static void main(String[] args) { // main程序一切入口
// 创建两个线程t1,t2
// Thread t1 = new Thread();
// Thread t2 = new Thread();
//Thread(Runnable target, String name)
// // 启动线程
// t1.start();
// t2.start();
// 下面三个线程(A、B、C)操作同一个资源类
Ticket1 ticket = new Ticket1();
// 匿名内部类
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i <= 40; i++) {
ticket.saleTicket();
}
}
}, "A");
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i <= 40; i++) {
ticket.saleTicket();
}
}
}, "B").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i <= 40; i++) {
ticket.saleTicket();
}
}
}, "C").start();
}
}
版本二使用lambda表达式
Lambda Express 使用的场景:当一个接口中只有一个抽象方法,其他的默认方法有几个无所谓。
Lambda Express表达式的小口诀: 拷贝小括号(小括号是接口中的方法),写死右箭头,落地大括号。
@FunctionalInterface注解声明式函数式接口,其实可以不用加,在底层默认给加上了
Foo接口
/**
* 有且只有一个抽象方法的接口称为“函数式接口”
*/
@FunctionalInterface
interface Foo {
// public void sayHello();
int add(int a, int b);
default int div(int a, int b) {
return a / b;
}
/**
* 静态方法实现
* @param a
* @param b
* @return
*/
static int mv(int a, int b) {
return a * b;
}
}
使用Foo接口中的Lambda表达式
public class LambdaExpressDemo {
public static void main(String[] args) {
/* Foo foo = new Foo() {
@Override
public void sayHello() {
System.out.println("*******************hello java 2022");
}
};*/
/*
Foo foo = () -> {
System.out.println("*******************hello java lambda");
};
foo.sayHello();
*/
// 可以把方法的参数类型省略
Foo foo = (a, b) -> {
System.out.println("a+b = " + a + b);
return a + b;
};
foo.add(1234, 45);
System.out.println(foo.div(10, 2));
System.out.println(Foo.mv(5, 3));
}
}
线程交互问题:笔试题目之交替打印0和1
这个就是生产者和消费者模型。
笔试题目:
题目:现在两个线程,可以操作初始值为零的一个变量,
实现一个线程对该变量加,一个线程对该变量减i,
实现交替,来10轮,变量初始值为零。
两个线程交替打印1.0版
这里有一个坑,但是两个线程的时候不会出现,当线程多的时候就会出现。这里的判断条件不能用if,只能用while。
/**
* Created with IntelliJ IDEA.
* @Author: pzx
* @Date: 2022/01/03 18:29
* @Version:1.0
* 笔试题目:
* 题目:现在两个线程,可以操作初始值为零的一个变量,
* 实现一个线程对该变量加,一个线程对该变量减i,
* 实现交替,来10轮,变量初始值为零。
*
*
* // 1.高内聚低耦合的前提下,线程 操作 资源类
* 2. 判断/干活/通知
*
* 生产者消费者复习
*
*
*
*/
public class ThreadWaitNotifyDemo {
public static void main(String[] args) {
AirCondition airCondition = new AirCondition();
/**
* 生产蛋糕,做加法
*/
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
airCondition.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
/**
* 消费蛋糕,做减法
*/
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
airCondition.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
}
}
/**
* 资源类
* 1.0版
*/
class AirCondition {
private int number = 0;
/**
* 对number进行加1
* 这里有三步,
* 1、判断number的值是否为0
* 2、做加法
* 3、通知消费者来吃蛋糕
*
* 生产者线程
*
*/
public synchronized void increment() throws InterruptedException {
// 1.判断
if (number != 0) {
this.wait();
}
// 2.干活
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 3.通知(唤醒等待的线程)
this.notifyAll();
}
/**
* 对number进行减1
* synchronized加上同步锁
* 就相当于,消费者去买蛋糕,如果没有蛋糕,就先得等待
* 消费者线程
*
*
*/
public synchronized void decrement() throws InterruptedException {
// 1.判断,等于0,没有蛋糕,就得先等
if (number == 0) {
this.wait();
}
// 2.干活
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 3.通知(唤醒等待的线程)
this.notifyAll();
}
}
两个线程交替打印2.0版(变成四个线程)
这里升级为四个线程进行操作,一个加一个减,一个加一个减
/* if (number != 0) {
// A可能没有执行下面的wait的时候,线程调度的时候就停止了,
// 但是他苏醒过来了之后,没有进行判断,因为number可能已经加成1了(下面的消费者也一样)
// 一直等待进入阻塞状态,当有其他线程notify()或者notifyAll()就被唤醒了
this.wait();
}*/
public class ThreadWaitNotifyDemo2 {
public static void main(String[] args) {
AirCondition1 ac1 = new AirCondition1();
// 消费者线程A
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(200);
ac1.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
// 消费者线程B
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(400);
ac1.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
// 消费者线程C
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(500);
ac1.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
// 消费者线程D
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
ac1.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
/**
* 资源类
* 2.0版
*/
class AirCondition1 {
private int number = 0;
/**
* 对number进行加1
* 这里有三步,
* 1、判断number的值是否为0
* 2、做加法
* 3、通知消费者来吃蛋糕
*
* 生产者线程
*
*/
public synchronized void increment() throws InterruptedException {
/**
* sleep不会释放线程的所有权(锁)而wait会,所以下面配合synchronized使用
*
* 这个式调用无参的wait
* 导致当前线程等待,直到另一个线程调用该对象的notify()方法或notifyAll()方法。
* 换句话说,这个方法的行为就好像简单地执行呼叫wait(0) 。
*
*
* wait()
* 当前的线程必须拥有该对象的显示器。 线程释放此监视器的所有权(锁),并等待直到发生以下两种情况之一:
*
* 另一个线程通知等待该对象的监视器的线程通过调用notify方法或notifyAll方法来唤醒。
* 由timeout毫秒加nanos纳秒参数指定的超时时间已过。
*
*
*
* 像在一个参数版本中,中断和虚假唤醒是可能的,并且该方法应该始终在循环中使用:
*
* synchronized (obj) {
* while (<condition does not hold>)
* obj.wait(timeout, nanos);
* ... // Perform action appropriate to condition
* }
*
*
*/
// 1.判断
// 注意这里不能用if进行判断,如果使用if的话会出现虚假唤醒,因为if只判断一次(这个是最主要原因)
/* if (number != 0) {
// A可能没有执行下面的wait的时候,线程调度的时候就停止了,
// 但是他苏醒过来了之后,没有进行判断,因为number可能已经加成1了(下面的消费者也一样)
// 一直等待进入阻塞状态,当有其他线程notify()或者notifyAll()就被唤醒了
this.wait();
}*/
// 1.判断
while (number != 0) {
// 一直等待进入阻塞状态,当有其他线程notify()或者notifyAll()就被唤醒了
this.wait();
}
// 2.干活
number++;
System.out.println(Thread.currentThread().getName() + "生产了数据:" + number);
// 3.通知(唤醒等待的线程)
this.notifyAll();
}
/**
* 对number进行减1
* synchronized加上同步锁
* 就相当于,消费者去买蛋糕,如果没有蛋糕,就先得等待
* 消费者线程
*
*
*/
public synchronized void decrement() throws InterruptedException {
// 1.判断,等于0,没有蛋糕,就得先等
while (number == 0) {
this.wait();
}
// 2.干活
number--;
System.out.println(Thread.currentThread().getName() + "消费了数据:" + number);
// 3.通知(唤醒等待的线程)
this.notifyAll();
}
}
改编题目
题目内容
多线程之间按顺序调用,实现A->B->C三个线程启动,要求如下:
AA打印5次,BB打10次,CC打印15次接着 (这里又线程之间的通信,这里是精确通知)
AA打印5次,BB打印10次,CC打印15次
由于这里是三个线程进行打印而且还有顺序,通过使用标志位加Condition接口中的signal();实现精准通知。当A线程进入执行完打印操作之后,将标志位进行改变,就精准通知B线程。同理B完成之后通知C,C然后再通知A实现A->B->C(其实都是number标志位起了作用)
资源类的代码如下
- 第一种使用Lock锁
/**
* 资源类
* 作业:将三个方法合并成一个方法
*/
class ShareResource {
// 1:A 2:B 3:C 标志位1对于A,2对应B,3对应C
private int number = 1; // 设定标志位的初始值为1
// lock就相当于锁,而下面的condition就相当于钥匙,一把锁配三把钥匙
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
/**
* 打印五次
*/
public void print5() {
lock.lock();
try {
// 1.判断
while (number != 1) {
// 进行等待
condition1.await();
}
// 2.干活
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + ":\t" + i);
}
// 3.通知
// 将标志位进行更改
number = 2;
// 这里精准唤醒2线程
// 唤醒condition2锁了的线程(唤醒2线程)
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* 打印十次
*/
public void print10() {
lock.lock();
try {
// 1.判断
while (number != 2) {
// 进行等待
condition2.await();
}
// 2.干活
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + ":\t" + i);
}
// 3.通知
// 将标志位进行更改
number = 3;
// 唤醒等待的线程 (唤醒3线程)
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* 打印十五次
*/
public void print15() {
lock.lock();
try {
// 1.判断
while (number != 3) {
// 进行等待
condition3.await();
}
// 2.干活
for (int i = 1; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + ":\t" + i);
}
// 3.通知
// 将标志位进行更改
number = 1;
// 唤醒等待的线程(唤醒1线程)
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
- 第二种使用同步方法
/**
* Created with IntelliJ IDEA.
* @Author: pzx
* @Date: 2022/01/07 9:15
* @Version:1.0
* 也可以用synchronized同步方法和wait和notifyAll,其实就是number标志位起了作用
*
*/
class AirCondition3{
// 1代表A , 2代表B 3代表C
private int number = 1;
public synchronized void print5s() {
// 判断
while (number != 1) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + "\t\t" + i);
}
number = 2;
notifyAll();
}
public synchronized void print10s() {
// 判断
while (number != 2) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + "\t\t" + i);
}
number = 3;
notifyAll();
}
public synchronized void print15s() {
// 判断
while (number != 3) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (int i = 1; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + "\t\t" + i);
}
number = 1;
notifyAll();
}
}
public class ThreadWaitNotifyDemo1 {
public static void main(String[] args) {
AirCondition3 airCondition2 = new AirCondition3();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
airCondition2.print5s();
}
}, "线程A").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
airCondition2.print10s();
}
}, "线程B").start();
new Thread(() -> {
for (int i = 1; i <= 15; i++) {
airCondition2.print15s();
}
}, "线程C").start();
}
}
使用lock替换synchronized
将synchronized同步方法换成Lock锁,Lock更加灵活,**synchronized使用的是Object类中wait和notify/notifyAll。**而在Lock中Lock
实现提供比使用synchronized
方法和语句可以获得的更广泛的锁定操作。 它们允许更灵活的结构化,可能具有完全不同的属性,并且可以支持多个相关联的对象Condition
。
Condition
因素出Object
监视器方法( wait
, notify
和notifyAll
)成不同的对象,以得到具有多个等待集的每个对象,通过将它们与使用任意的组合的效果Lock
实现。 Lock
替换synchronized
方法和语句的使用, Condition
取代了对象监视器方法的使用。 Condition
使用的是await()
和signal()
/signalAll()
对应Object类的wait和notify/notifyAll。
资源类和操作类的代码。
/**
* Created with IntelliJ IDEA.
* @Author: pzx
* @Date: 2022/01/04 16:49
* @Version:1.0
*
* 将锁换成Lock
* 1.高内聚低耦合的前提下,线程 操作 资源类
* 2. 判断/干活/通知
* 3. 多线程交互的过程中,必须要防止多线程的虚假唤醒,也即(判断使用while,不能用if)
* (在方法的判断中不许用if,只能用while)
* 4. 注意标志位的修改和定位
*
*/
public class ThreadWaitNotifyDemo2 {
public static void main(String[] args) {
AirCondition2 airCondition2 = new AirCondition2();
new Thread(()->{
for (int i = 0; i <10 ; i++) {
try {
airCondition2.increase();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"线程A").start();
new Thread(()->{
for (int i = 0; i <10 ; i++) {
try {
airCondition2.decrease();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"线程B").start();
new Thread(()->{
for (int i = 0; i <10 ; i++) {
try {
airCondition2.increase();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"线程C").start();
new Thread(()->{
for (int i = 0; i <10 ; i++) {
try {
airCondition2.decrease();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"线程D").start();
}
}
class AirCondition2 {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private int number = 0;
// 生产者对number的值进行加1
public void increase() throws InterruptedException {
// 对资源进行上锁
lock.lock();
try {
// 1.判断
while (number != 0) {
// this.wait();
condition.await();
}
// 2.干活
++number;
System.out.println(Thread.currentThread().getName() + "生产了" +"\t"+ number);
// 通知
// this.notify();
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 对资源进行释放锁
lock.unlock();
}
}
// 生产者对number的值进行减1
public void decrease() throws InterruptedException {
// 对资源进行上锁
lock.lock();
try {
// 1.判断
while (number == 0) {
// this.wait();
condition.await();
}
// 2.干活
number--;
System.out.println(Thread.currentThread().getName() + "消费了" +"\t"+ number);
// 通知
// this.notify();
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 对资源进行释放锁
lock.unlock();
}
}
}
多线程8锁
题目:多线程8锁
其实下面就是看synchronized锁的是什么对象
普通同步方法锁的就是this这个当前对象,而不是锁住这个单独的synchronized方法,也就是说同一时间段,只能有一个线程进入当前类,访问一个synchronized方法
静态同步方法锁的就是类.Class这个对象也就是类对象,同时去复习一下static关键字
普通方法没有加synchronized就不会上锁
对于同步方法块,锁是Synchonized括号里配置的对象
A 一个对象里面如果有多个synchronized方法,某一个时刻内,只要一个线程去调用其中的一个synchronized方法了,
其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些synchronized方法
锁的是当前对象this,被锁定后,其它的线程都不能进入到当前对象的其它的synchronized方法
加个普通方法后发现和同步锁无关
换成两个对象后,不是同一把锁了,情况立刻变化。
synchronized实现同步的基础:Java中的每一个对象都可以作为锁。
具体表现为以下3种形式。
对于普通同步方法,锁是当前实例对象。
对于静态同步方法,锁是当前类的Class对象。
对于同步方法块,锁是Synchonized括号里配置的对象
当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。
也就是说如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁,
可是别的实例对象的非静态同步方法因为跟该实例对象的非静态同步方法用的是不同的锁,
所以毋须等待该实例对象已获取锁的非静态同步方法释放锁就可以获取他们自己的锁。
所有的静态同步方法用的也是同一把锁——类对象本身,
这两把锁是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有竞态条件的。
但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁,
而不管是同一个实例对象的静态同步方法之间,
还是不同的实例对象的静态同步方法之间,只要它们同一个类的实例对象!
- 1、标准访问,先打印短信还是邮件
邮件
- 2、 停4秒在短信方法内,先打印短信还是邮件
邮件
- 3 、普通的hello方法,是先打短信还是hello
hello
- 4 、现在有两部手机,先打印短信还是邮件
短信
- 5 、两个静态同步方法,1部手机,先打印短信还是邮件
邮件
- 6 、两个静态同步方法,2部手机,先打印短信还是邮件
邮件
- 7 、1个静态同步方法,1个普通同步方法,1部手机,先打印短信还是邮件
短信
- 8 、1个静态同步方法,1个普通同步方法,2部手机,先打印短信还是邮件
短信
public class Lock8 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
Phone phone2 = new Phone();
new Thread(() -> {
phone.sendEmail();
}, "线程A").start();
// 由于这里线程睡了0.2秒钟,线程A就已经完成启动和调度了
Thread.sleep(200);
new Thread(() -> {
// phone.sendMsg();
phone2.sendMsg();
// phone.hello();
}, "线程B").start();
}
}
class Phone {
/**
* 发送邮件的方法
*/
public static synchronized void sendEmail() {
try {
// 线程睡四秒钟
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "---------------sendEmail");
}
/**
* 发送短信方法
*/
public static synchronized void sendMsg() {
System.out.println(Thread.currentThread().getName() + "---------------sendMsg");
}
/**
* 普通方法,没有加锁,就不用争抢锁资源
*/
public void hello() {
System.out.println(Thread.currentThread().getName() + "-------------------------hello");
}
}
交替打印100以内的奇数和偶数
这里有一个大坑,你要先将另一把锁唤醒之后,然后将自己进入等待状态。不然的话,会出现死锁,连哥哥线程都在等待对方唤醒。
死锁
**是指多个进程在运行过程中因争夺资源而造成的一种僵局,**当进程处于这种僵持状态时,若无外力作用,它们都将无法再向前推进。 因此我们举个例子来描述,如果此时有一个线程T1,按照先锁R1再获得锁R2的的顺序获得锁,而在此同时又有另外一个线程T2,按照先锁T2再锁T1的顺序获得锁。
第一种使用Lock锁
public class Test2 {
public static void main(String[] args) {
ShareSource1 ss = new ShareSource1();
new Thread(()->{ss.print();},"线程A").start();
new Thread(()->{ss.print();},"线程B").start();
}
}
class ShareSource1 {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private int number = 1;
public void print() {
// 判断
lock.lock();
try {
while (number <= 100) {
System.out.println(Thread.currentThread().getName() + "\t\t" + number);
number++;
// 线程死锁描述的是这样一种情况:多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞,因此程序不可能正常终止.
// 不能先wait,因为假如先wait之后,B再进来也是wait然后他俩就相互等待,就会死锁
// this.wait();
// this.notify();
// 为了避免死锁,得先将进行唤醒,然后再去等待
condition.signal();
condition.await();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
第二种使用synchronized静态代码块
public class Test3 {
public static void main(String[] args) {
ShareSource2 ss2 = new ShareSource2();
new Thread(() -> {
try {
ss2.print();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程A").start();
new Thread(() -> {
try {
ss2.print();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程B").start();
}
}
class ShareSource2 {
private int number = 1;
public void print() throws InterruptedException {
// 判断
synchronized (this) {
while (number <= 100) {
System.out.println(Thread.currentThread().getName() + "\t\t" + number);
number++;
// 线程死锁描述的是这样一种情况:多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞,因此程序不可能正常终止.
// 不能先wait,因为假如先wait之后,B再进来也是wait然后他俩就相互等待,就会死锁
// this.wait();
// this.notify();
// 为了避免死锁,得先将进行唤醒,然后再去等待
this.notify();
this.wait();
}
}
}
}
集合中的一些比较
请举例说明集合类是不安全的
ArrayList线程不安全
ArrayList、HashMap、HashSet都是线程不安全的
List<String> list = new ArrayList<>(); // Collections.synchronizedList(new ArrayList<String>());
// 出现异常java.util.ConcurrentModificationException
// 多线程条件下,既要读又要写
for (int i = 1; i <= 30; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(Thread.currentThread().getName() + list);
}, String.valueOf(i)).start();
}
-
故障现象
java.util.ConcurrentModificationException并发修改异常
-
导致原因
多线程条件下,既要读又要写 -
解决方案
3.1 用Vector,它的底层在add方法加了同步锁
3.2 Collections.synchronizedList(new ArrayList());
3.3 new CopyOnWriteArrayList<>(); -
将代码块抽取成方法的快捷键Ctrl+Alt+m
ArrayList的add方法
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}
在多线程条件下不使用ArrayList
public static void main(String[] args) {
/* // 将数组转List
List<String> list = Arrays.asList("a", "b", "c");
// 方法的引用
list.forEach(System.out ::println);*/
// 生成当前的时间戳
// System.currentTimeMillis();
// 第一种方法:(线程不安全) new ArrayList<>();
// 第二种方法(不推荐) new Vector<>();
// 第三种:使用Collections.synchronizedList将线程不安全的的List转换成线程安全的
// 第四种:使用 new CopyOnWriteArrayList<>();
// HashMap线程不安全使用ConcurrentHashMap
List<String> list = new ArrayList<>(); // Collections.synchronizedList(new ArrayList<String>());
// 出现异常java.util.ConcurrentModificationException
// 多线程条件下,既要读又要写
for (int i = 1; i <= 30; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(Thread.currentThread().getName() + list);
}, String.valueOf(i)).start();
}
}
java.util.concurrent
多线程条件下,一般使用这个报下面的类,后面的三个都是java.util.concurrent并发包下面的。
- ArrayList ------> CopyOnWriteArrayList
- HashSet ------> CopyOnWriteArraySet
- HashMap ------> ConcurrentHashMap
回忆集合的基础知识
HashSet的底层数据结构是HashMap,存的是map的key,它的value是一个写死的Object的常量
HashSet的add方法是调用的HashMap的put方法
HashMap的底层1.8是Node数组+链表+红黑树,1.7是哈希表
jdk1.8
构造求一个空的HashMap,默认的初始容量是(16),默认的负载因子是(0.75)
HashMap和HashSet都是无序的
无序性:无序性不等于随机性。指的是存储的数据在底层数组中并非按照数组索引的顺序进行添加的,而使根据数值的哈希值决定的。
ArrayList扩容是扩容为原来的1.5倍
HashMap扩容是扩大到原来的两倍,在使用的时候可以将他的初始容量设大,这样就可以避免它反复扩容了。
CopyOnWriteArrayList
- java.util.concurrent.CopyOnWriteArrayList
写时复制技术原理
**CopyOnWrite容器即写时复制的容器。**往一个容器添加元素的时候,不直接往当前容器Object[]添加,
而是先将当前容器Object[]进行Copy,复制出一个新的容器Object[] newElements,然后向新的容器Object[] newElements里添加元素。
添加元素后,再将原容器的引用指向新的容器setArray(newElements)。
这样做的好处是可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。
所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
CopyOnWriteArrayList的原理:简单的来说当A线程去写的时候,先复制一份1.1版的,然后自己用这个1.1版的去写,如果其他的线程要读的时候就去读1.0版本的,当A写完之后再发布1.1版的。(其实也就是读写分离思想)
CopyOnWriteArrayList是线程安全的,下面是它的add方法的源码,它会先将传入的集合进行复制一份,然后将新的
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
/**
* Gets the array. Non-private so as to also be accessible
* from CopyOnWriteArraySet class.
*/
final Object[] getArray() {
return array;
}
/**
* Sets the array.
*/
final void setArray(Object[] a) {
array = a;
}
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 这里就是复制一份新的,然后用Object数组进行接收
Object[] elements = getArray();
// 获得原来的集合的长度
int len = elements.length;
// 对elements数组进行扩容长度加一得到新的newElements
Object[] newElements = Arrays.copyOf(elements, len + 1);
// 将要添加的e,添加到数组的最后
newElements[len] = e;
// 再将newElements重新设置回CopyOnWriteArrayList对象中,就是相当于1.1版本的替换1.0版本的
setArray(newElements);
// 添加完成
return true;
} finally {
// 释放锁
lock.unlock();
}
}
HashMap的put方法,put里面其实装的是一个个的Node结点
/**
* Associates the specified value with the specified key in this map.
* If the map previously contained a mapping for the key, the old
* value is replaced.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with <tt>key</tt>, or
* <tt>null</tt> if there was no mapping for <tt>key</tt>.
* (A <tt>null</tt> return can also indicate that the map
* previously associated <tt>null</tt> with <tt>key</tt>.)
*/
public V put(K key, V value) {
return putVal(hash(key), key, value, false, true);
}
/**
* Implements Map.put and related methods.
*
* @param hash hash for key
* @param key the key
* @param value the value to put
* @param onlyIfAbsent if true, don't change existing value
* @param evict if false, the table is in creation mode.
* @return previous value, or null if none
*/
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
// HashMap的put装的是一个个的Node结点
Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}
Callable接口
复习获得多线程的方式
面试题:获得多线程的方法几种?
- 继承Thread类
/**
* Created with IntelliJ IDEA.
* @Author: pzx
* @Date: 2022/01/06 19:39
* @Version:1.0
* 实现多线程的方式二继承Thread类,重写run方法
*
*
*/
public class ThreadDemo1 {
public static void main(String[] args) {
MyThread1 t1 = new MyThread1();
new Thread(t1,"线程1").start();
new Thread(t1,"线程2").start();
/*
t1.setName("线程A");
t1.start();
MyThread1 t2 = new MyThread1();
t2.setName("线程B");
t2.start();*/
}
}
class MyThread1 extends Thread {
private static int ticket = 100;
@Override
public void run() {
synchronized (this) {
while (true) {
if (ticket > 0) {
System.out.println(Thread.currentThread().getName() + "\t\t" + ticket);
ticket--;
} else {
break;
}
}
}
}
}
- 实现Runnable接口
/**
* Created with IntelliJ IDEA.
* @Author: pzx
* @Date: 2022/01/06 19:35
* @Version:1.0
* 实现多线程的方式一
* 实现Runnable接口
*/
class MyThread implements Runnable {
private int number = 100;
@Override
public void run() {
synchronized (this) {
while (true) {
if (number>0){
System.out.println(Thread.currentThread().getName() +"\t\t"+ number);
number--;
}else{
break;
}
}
}
}
}
public class ThreadDemo {
public static void main(String[] args) {
MyThread myThread = new MyThread();
new Thread(myThread,"线程A").start();
new Thread(myThread,"线程B").start();
}
}
Callable的简单使用
- 实现Callable接口
实现多线程的方式三之实现Callable接口,之前两种太简单了就不写了
get方法一般请放在最后一行,因为如果实现的Callable类中有阻塞的话,它会等它执行完才会执行下面的
/**
* Created with IntelliJ IDEA.
* @Author: pzx
* @Date: 2022/01/06 20:11
* @Version:1.0
*
* 选中那个接口,然后使用Ctrl+Alt+U查看这个类或者接口的类图
* get方法一般请放在最后一行,因为如果实现的Callable类中有阻塞的话
* 它会等它执行完才会执行下面的
*在·
*/
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
/**
* 多线程实现方法三实现Callable接口
*/
class MyThread2 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName()+"\t\t hello world");
TimeUnit.SECONDS.sleep(4);
return 1024;
}
}
public class CallableDemo {
public static void main(String[] args) throws Exception {
// Interface RunnableFuture<V> 是Runnable的子接口 。FutureTask实现了RunnableFuture接口、
// FutureTask的一个构造方法 FutureTask(Callable<V> callable)
// Thread(Runnable target, String name) , 在Thread的构造器中只要传入Runnable或者Runnable的子接口或者实现类即可
// 这里主要是多态的的思想
FutureTask<Integer> futureTask = new FutureTask<>(new MyThread2());
/**
* 打印结果
* hello world
* 1024
* main计算完成、
* 这里只会打印一次hello world,因为它有缓存
*/
new Thread(futureTask,"线程a").start();
new Thread(futureTask,"线程b").start();
System.out.println(futureTask.get());
System.out.println(Thread.currentThread().getName() + "计算完成");
}
}
面试题:callable接口与runnable接口的区别?
答:(1)是否有返回值
(2)是否抛异常
(3)落地方法不一样,一个是run,一个是call
FutureTask
未来的任务,用它就干一件事,异步调用
main方法就像一个冰糖葫芦,一个个方法由main串起来。
但解决不了一个问题:正常调用挂起堵塞问题
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bYT2g7y5-1643028768964)(C:\Users\losser\AppData\Roaming\Typora\typora-user-images\image-20220108213632391.png)]
例子:
(1)老师上着课,口渴了,去买水不合适,讲课线程继续,我可以单起个线程找班长帮忙买水,
水买回来了放桌上,我需要的时候再去get。
(2)4个同学,A算1+20,B算21+30,C算31*到40,D算41+50,是不是C的计算量有点大啊,
FutureTask单起个线程给C计算,我先汇总ABD,最后等C计算完了再汇总C,拿到最终结果
(3)高考:会做的先做,不会的放在后面做
原理
在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,
当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。
一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。
仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,
就不能再重新开始或取消计算。get方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,
然后会返回结果或者抛出异常。
只计算一次
get方法放到最后
FutureTask类图 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4tKbdRHL-1643028768964)(C:\Users\losser\AppData\Roaming\Typora\typora-user-images\image-20220107131432435.png)]
JUC强大的辅助类讲解
CountDownLatch 减少计数
原理
- CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。
- 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),
- 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
代码
生活中的实例就是晚自习,班长关门,也就是说要等所有的同学出去之后,班长才去关门,减少计数
/**
* Created with IntelliJ IDEA.
* @Author: pzx
* @Date: 2022/01/07 15:19
* @Version:1.0
* CountDownLatch
* 这个类似生活中的例子:晚自习,班长关门,也就是说要等所有的同学出去之后,班长才去关门
* 减少计数
*/
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 构造一个以给定计数 CountDownLatch CountDownLatch。 java.util.concurrent
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "离开教室");
//countDown() 减少锁存器的计数,如果计数达到零,释放所有等待的线程。
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
// 导致当前线程等到锁存器计数到零,除非线程是 interrupted,
// 其实也就是说当里面的count减少到0就不用等待了,唤醒线程
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "\t班长关闭教室");
}
private static void closeDoor() {
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "离开教室");
}, String.valueOf(i)).start();
}
System.out.println(Thread.currentThread().getName() + "\t班长关闭教室");
}
}
CyclicBarrier 循环栅栏
原理
CyclicBarrier 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞, 直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。线程进入屏障通过CyclicBarrier的await()方法。
这个可以举一个简单的例子就是,集齐七颗龙珠召唤神龙,还有就是开会,必须要等到人到齐了才能开会。
代码
/**
* Created with IntelliJ IDEA.
* @Author: pzx
* @Date: 2022/01/07 15:55
* @Version:1.0
* java.util.concurrent CyclicBarrier
* 其实这个有点像生活中的例子:开会,还有七龙珠,集齐七颗龙珠召唤神龙
* 开会必须等待所有人都到齐了才行
*
*
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cb = new CyclicBarrier(7, () -> {
System.out.println("召唤神龙");
});
/**
* 等待所有parties已经在这个障碍上调用了await 。
* 如果当前线程不是最后一个线程,那么它被禁用以进行线程调度,并且处于休眠状态,直到发生下列事情之一:
* 最后一个线程到达; 要么
* 一些其他线程当前线程为interrupts ; 要么
* 一些其他线程interrupts其他等待线程之一; 要么
* 一些其他线程在等待屏障时超时; 要么
* 其他一些线程在这个屏障上调用reset() 。
*/
for (int i = 1; i <= 7; i++) {
int temp = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "收集到的\t第" + temp + "颗龙珠");
try {
//等待所有parties已经在这个障碍上调用了await 。
//如果当前线程不是最后一个线程,那么它被禁用以进行线程调度,并且处于休眠状态,直到发生下列事情之一:最后一个线程到了
// 也就是等到最后一个线程到了,然后就开始执行start,start之后,线程的调度顺序还得看操作系统和cup的调度
cb.await();
// 执行完成之后,就调用CyclicBarrier(int parties, Runnable barrierAction) 里面的barrierAction线程
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, "线程\t" + i).start();
}
}
}
Semaphore 信号灯
原理
在信号量上我们定义两种操作:
acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时。
release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
这个可以用生活中的抢车位来类比
代码
/**
* Created with IntelliJ IDEA.
* @Author: pzx
* @Date: 2022/01/07 16:40
* @Version:1.0
* Semaphore这个主要是应用于抢车位
* 这个主要用于多线程的并发控制和资源的互斥
*
*
*/
public class SemaphoreDemo {
public static void main(String[] args) {
// 模拟资源类,有三个空车位
// 当permits设置为1就相当于synchronized了
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 7; i++) {
new Thread(() -> {
try {
//从此信号量获取许可证,阻止直到可用,否则线程为interrupted 。
//获得许可证,如果有可用并立即返回,则将可用许可证数量减少一个。
// 这里相当于抢占到车位
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "\t抢到了车位");
// 线程睡4秒钟,模拟停车停了4秒钟
TimeUnit.SECONDS.sleep(4);
System.out.println(Thread.currentThread().getName() + "\t离开了车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//发放许可证,将可用许可证的数量增加一个。 如果任何线程尝试获取许可证,
// 那么选择一个被授予刚被释放的许可证。 (重新)线程调度用于线程调度。
// 这里相当于释放掉车位
semaphore.release();
}
}, "线程" + i).start();
}
}
}
ReentrantReadWriteLock读写锁
类似案例
红蜘蛛、缓存、数据库的更新和查询操作。
代码
/**
* Created with IntelliJ IDEA.
* @Author: pzx
* @Date: 2022/01/07 22:00
* @Version:1.0
* 多个线程同时读一个资源类没有任何问题,所以为了满足并发量,读取共享资源应该可以同时进行
* 但是
* 如果有一个线程想去写共享资源来,就不应该再有其他线程可以对该资源进行读或者写
* 小总结:
* 读-读能共存
* 读-写不能共存
* 写-写不能共存
* 使用读写锁,保证了数据的一致性操作 读锁是共享锁
* 其实就是写加写锁,读就加读锁 写锁是排他锁
*
*
*/
class MyCache {
private volatile Map<String, Object> map = new HashMap<>();
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
/**
* 添加操作
* @param key
* @param value
*/
public void put(String key, Object value) {
try {
// 加写锁
readWriteLock.writeLock().lock();
// 写入开始
System.out.println(Thread.currentThread().getName() + "\t------写入开始");
map.put(key, value);
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "\t------写入完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
readWriteLock.writeLock().unlock();
}
}
/**
* 读取操作
* @param key
*/
public void get(String key) {
try {
// 加写锁
readWriteLock.readLock().lock();
// 读取开始
System.out.println(Thread.currentThread().getName() + "\t~~~~~~~~~~~~~~~~~~~~~~读取开始");
map.get(key);
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "\t~~~~~~~~~~~~~~~~~~~~~~" +
".读取完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
readWriteLock.readLock().unlock();
}
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(() -> {
myCache.put(temp + "\t", temp + "\t");
}, String.valueOf(i)).start();
}
for (int i = 1; i <=5 ; i++) {
final int temp = i;
new Thread(() -> {
myCache.get(temp + "\t");
}, String.valueOf(i)).start();
}
}
}
阻塞队列,空的时候不能消费,满的时候不能增加
BlockingQueueDemo 阻塞队列
栈与队列
栈:先进后出,后进先出
队列:先进先出
阻塞队列的简单说明
阻塞:必须要阻塞/不得不阻塞
阻塞队列是一个队列,在数据结构中起的作用如下图:
当队列是空的,从队列中获取元素的操作将会被阻塞
当队列是满的,从队列中添加元素的操作将会被阻塞
试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增
阻塞队列的用处
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起
为什么需要BlockingQueue
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了
在concurrent包发布以前,在多线程环境下,**我们每个程序员都必须去自己控制这些细节,尤其还要兼顾 **效率和线程安全,而这会给我们的程序带来不小的复杂度。
阻塞队列的类图
下面三个最常用
- SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列。
BlockingQueue核心方法
抛出异常 | 当阻塞队列满时,再往队列里add插入元素会抛IllegalStateException:Queue full当阻塞队列空时,再往队列里remove移除元素会抛NoSuchElementException |
---|---|
特殊值 | 插入方法,成功ture失败false 移除方法,成功返回出队列的元素,队列里没有就返回null |
一直阻塞 | 当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产者线程直到put数据or响应中断退出 当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用 |
超时退出 | 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过限时后生产者线程会退出 |
阻塞队列代码
/**
* Created with IntelliJ IDEA.
* @Author: pzx
* @Date: 2022/01/09 8:56
* @Version:1.0
*
* 队列:先进先出,FIFO,就相当于食堂排队
*
*/
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
/**
* 抛出异常
* add:添加方法
* remove:移除方法
* element:检查方法, element是返回队首的元素,如果队列没有元素就返回null
*/
// add方法是添加元素
/* System.out.println(queue.add("a"));
System.out.println(queue.add("a"));
System.out.println(queue.add("c"));
// 队列满了
// queue.add("d"); // Exception in thread "main" java.lang.IllegalStateException: Queue full
System.out.println("========================================================================================");
// element是返回队首的元素
System.out.println(queue.element());
// remove是移除元素
System.out.println(queue.remove());
System.out.println(queue.remove());
System.out.println(queue.remove());
// 队列为空了
// System.out.println(queue.remove()); // Exception in thread "main" java.util.NoSuchElementException
System.out.println("========================================================================================");
*/
/**
* 特殊值
* offer:插入成功返回true
* poll:移除操作(移除队首的操作),成功返回出队列的元素,队列里面没有就返回null
* peek:检索但不删除由此queue表示的队列的头部(换句话说,该deque的第一个元素),
* 如果此deque为空,则返回 null 。
*/
/*
System.out.println(queue.offer("aa"));
System.out.println(queue.offer("bb"));
System.out.println(queue.offer("cc"));
// System.out.println(queue.offer("cc")); // false
System.out.println(queue.peek());
System.out.println(queue.peek());
System.out.println(queue.peek());
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());
// System.out.println(queue.poll()); // null
System.out.println("========================================================================================");
*/
/* queue.put("aa");
queue.put("aa");
queue.put("aa");
// queue.put("aa"); //这里本来只能放三个元素,但是想要添加四个元素,所以,最后的元素就会被阻塞,就相当于一直在等待消费去消费
queue.take();
queue.take();
queue.take();
// queue.take(); // 这队列里面本来只有三个元素,这里要获取第四个,所以就得一直等着,等生产者生产第四个
*//*
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
*//*
*/
System.out.println(queue.offer("aa"));
System.out.println(queue.offer("aa"));
System.out.println(queue.offer("aa"));
// 这个是等三秒,三秒要是还没有蛋糕就走了,过时不候
System.out.println(queue.offer("aa", 3L, TimeUnit.SECONDS));
}
}
方法参数的值传递机制
-
方法,必须由其所在类或对象调用才有意义。若方法含有参数:
- 形参:方法声明时的参数
- 实参:方法调用时实际传给形参的参数值
Java的实参值如何传入方法呢? Java里方法的参数传递方式只有一种:值传递。 即将实际参数值的副本 (复制品)传入方法内,而参数本身不受影响。
形参是基本数据类型:将实参基本数据类型变量的**“数据值”**传递给形参
形参是引用数据类型:将实参引用数据类型变量的**“地址值”**传递给形参
/**
* Created with IntelliJ IDEA.
* @Author: pzx
* @Date: 2022/01/09 10:10
* @Version:1.0
*/
public class TestTransferValue {
public void changeValue1(int age) {
age = 30;
}
public void changeValue2(Person person) {
person.setName("xxxx");
}
public void changeValue3(String str) {
str = "~~~~";
}
public static void main(String[] args) {
TestTransferValue transferValue = new TestTransferValue();
// 在一个方法里面,基本类型只传复印件,原件不动
int age =20;
transferValue.changeValue1(age);
System.out.println("age = " + age);
// 这个自定义类型Person穿的是引用类型
Person person = new Person("zsf");
transferValue.changeValue2(person);
System.out.println("personName ------------->" + person.getName());
String str= "abc";
transferValue.changeValue3(str);
System.out.println("str = " + str);
}
}
Person类
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Person {
private String name;
private int age;
public Person(String name) {
this.name = name;
}
}
ThreadPool线程池
为什么用线程池
例子:
10年前单核CPU电脑,假的多线程,像马戏团小丑玩多个球,CPU需要来回切换。
现在是多核电脑,多个线程各自跑在独立的CPU上,不用切换效率高。
线程池的优势:
线程池做的工作只要是控制运行的线程数量,**处理过程中将任务放入队列,**然后在线程创建后启动这些任务,**如果线程数量超过了最大数量,超出数量的线程排队等候,**等其他线程执行完毕,再从队列中取出任务来执行。
**它的主要特点为:线程复用;控制最大并发数;管理线程。 **
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
第二:提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
线程池如何使用
架构说明
Java中的线程池是通过Executor框架实现的,该框架中用到了Executor
,Executors
,ExecutorService
,ThreadPoolExecutor
这几个类
编码实现
/**
* Created with IntelliJ IDEA.
* @Author: pzx
* @Date: 2022/01/09 14:10
* @Version:1.0
*
* 线程池
*
*/
public class MyThreadPoolDemo {
public static void main(String[] args) {
// =================第一种=================
// Executors是线程池的工具类
// newFixedThreadPool创建 一个线程池,该线程池重用固定数量的从共享*队列中运行的线程。
// 创建线程池,设置办理窗口(初始线程)为5
// 一池5线程,类似一个银行有5个受理窗口
// ExecutorService threadPool = Executors.newFixedThreadPool(5);
// =================第二种=================
// 线程池只有一个工作线程,类似于,银行只有一个办理窗口
// ExecutorService threadPool = Executors.newSingleThreadExecutor();
// 一池N线程,一个池子里面有多个线程,类似于银行有N个受理窗口,这和每个线程执行的事件有关
// 创建一个根据需要创建新线程的线程池,但在可用时将重新使用以前构造的线程。
// =================第三种=================
ExecutorService threadPool = Executors.newCachedThreadPool();
try {
for (int i = 1; i <= 10; i++) {
threadPool.submit(() -> {
System.out.println(Thread.currentThread().getName() + "\t办理完成");
});
// 暂停1秒钟,模拟办理时候的耗时
// TimeUnit.SECONDS.sleep(1);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭线程池
threadPool.shutdown();
}
}
}
底层原理
其实他底层都是创建一个ThreadPoolExecutor,然后传入不同类型的参数,调用不同的构造器
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory and rejected execution handler.
* It may be more convenient to use one of the {@link Executors} factory
* methods instead of this general purpose constructor.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CjmJinAS-1643028768967)(C:\Users\losser\AppData\Roaming\Typora\typora-user-images\image-20220109181909648.png)]
线程池几个重要参数
-
corePoolSize
:线程池中的常驻核心线程数 -
maximumPoolSize
:线程池中能够容纳同时
执行的最大线程数,此值必须大于等于1 -
keepAliveTime
:多余的空闲线程的存活时间
当前池中线程数量超过corePoolSize时,当空闲时间
达到keepAliveTime
时,多余线程会被销毁直到
只剩下corePoolSize
个线程为止 -
unit:
keepAliveTime
的单位 -
workQueue
:任务队列,被提交但尚未被执行的任务 -
threadFactory
:表示生成线程池中工作线程的线程工厂,
用于创建线程,一般默认的即可 -
handler
:拒绝策略,表示当队列满了,并且工作线程大于
等于线程池的最大线程数(maximumPoolSize
)时如何来拒绝
请求执行的runnable的策略
线程池底层工作原理
步骤是下面的1到4
下图的corePoolSize
为2,maximumPoolSize
为5,当一开始有两个客户(1,2线程)来办理业务的时候,两个今日窗口正好能够处理。当后面3,4,5又来时,BlockingQueue
也正好满了(为5);然后大堂经理就叫其他的员工来值班扩大到maximumPoolSize
(也就是扩容到最大的线程数5)。当后面又进来三个线程(6,7,8)的时候就进入候客区(BlockingQueue
),候客区也满了。但此时如果进来了另一个客户(9),就会告知本银行今天的业务繁忙,请到其他网点去办理业务(这里是按照拒绝策略处理)。如果设置了keepAliveTime
的话,新增的扩容的员工,在处理完所有的业务之后,过keepAliveTime
这么久时间,就会恢复到原来的corePoolSize
.
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-U6BfUT4k-1643028768969)(C:\Users\losser\AppData\Roaming\Typora\typora-user-images\image-20220109195333905.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FZGqEKKy-1643028768969)(C:\Users\losser\AppData\Roaming\Typora\typora-user-images\image-20220109201025539.png)]
以下重要:
-
在创建了线程池后,开始等待请求。
-
当调用**execute()**方法添加一个请求任务时,线程池会做出如下判断:
2.2如果正在运行的线程数量小于**corePoolSize**,那么马上创建线程运行这个任务; 2.2如果正在运行的线程数量大于或等于**corePoolSize**,那么将这个任务放入队列; 2.3如果这个时候队列满了且正在运行的线程数量还小于**maximumPoolSize**,那么还是要创建非核心线程立刻运行这个任务; 2.4如果队列满了且正在运行的线程数量大于或等于**maximumPoolSize**,那么线程池会启动饱和拒绝策略来执行。
3当一个线程完成任务时,它会从队列中取下一个任务来执行。
4当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。
所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。
线程池用哪个?生产中如设置合理参数
线程池的拒绝策略
是什么
等待队列已经排满了,再也塞不下新任务了
同时,线程池中的max线程也达到了,无法继续为新任务服务。
这个是时候我们就需要拒绝策略机制合理的处理这个问题。
JDK内置的拒绝策略
- AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
- CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不
会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。 - DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中
尝试再次提交当前任务。 - DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。
如果允许任务丢失,这是最好的一种策略。
以上内置拒绝策略均实现了RejectedExecutionHandle接口
在工作中单一的/固定数的/可变的三种创建线程池的方法哪个用的多?超级大坑
- 答案是一个都不用,我们工作中只能使用自定义的
- Executors中JDK已经给你提供了,为什么不用?
Java8之流式计算复习
函数式接口
java.util.function
java内置核心四大函数式接口
伪代码:
//R apply(T t);函数型接口,一个参数,一个返回值
Function<String,Integer> function = t ->{return t.length();};
System.out.println(function.apply("abcd"));
//boolean test(T t);断定型接口,一个参数,返回boolean
Predicate<String> predicate = t->{return t.startsWith("a");};
System.out.println(predicate.test("a"));
// void accept(T t);消费型接口,一个参数,没有返回值
Consumer<String> consumer = t->{
System.out.println(t);
};
consumer.accept("javaXXXX");
//T get(); 供给型接口,无参数,有返回值
Supplier<String> supplier =()->{return UUID.randomUUID().toString();};
System.out.println(supplier.get());
Stream流
流是什么
流(Stream) 到底是什么呢?
是数据渠道,用于操作数据源(集合、数组等)所生成的元素序列。
“集合讲的是数据,流讲的是计算!”
特点
- Stream 自己不会存储元素
- Stream 不会改变源对象。相反,他们会返回一个持有结果的新Stream。
- Stream 操作是延迟执行的。这意味着他们会等到需要结果的时候才执行。
阶段
- 创建一个Stream:一个数据源(数组、集合)
- 中间操作:一个中间操作,处理数据源数据
- 终止操作:一个终止操作,执行中间操作链,产生结果
过程:
源头=>中间流水线=>结果
代码示例:
/**
* Created with IntelliJ IDEA.
* @Author: pzx
* @Date: 2022/01/10 13:15
* @Version:1.0
*
* (lambda表达式的几个步骤 :
*1 、 拷贝小括号 ( 如果只有一个参数的话, 可以省略参数类型和括号)
* 2、写死右箭头
* 3/落地大括号
*/
@AllArgsConstructor
@NoArgsConstructor
@Data
// 支持链式调用
@Accessors(chain = true)
class User {
// 链式编程 + 流式计算
private Integer id;
private String userName;
private Integer age;
}
/**
* 题目:请按照给出数据,找出同时满足以下条件的用户,也即以下条件全部满足
* 偶数ID且年龄大于24且用户名转为大写且用户名字母倒排序
* 只输出一个用户名字
*/
public class StreamDemo {
public static void main(String[] args) {
User u1 = new User(11, "a", 23);
User u2 = new User(12, "b", 24);
User u3 = new User(13, "c", 22);
User u4 = new User(14, "d", 28);
User u5 = new User(16, "e", 26);
// 将数组转换成list集合
List<User> list = Arrays.asList(u1, u2, u3, u4, u5);
List<User> users = new ArrayList<>();
list.stream().filter(user -> {
// 过滤掉为id为偶数的
return user.getId() % 2 == 0;
}).filter(user -> {
// 过滤年龄大于24的
return user.getAge() > 24;
// map是映射
// sorted是stream的排序,默认是升序
}).map(user -> user.getUserName().toUpperCase()).sorted((o1, o2) -> {
// 降序排列
return -o1.compareTo(o2);
// limit和MySQL中的类似,限制个数.forEach进行遍历,然后加
}).limit(1).forEach(System.out::println);
list.stream().sorted((o1, o2) -> {
return -1;
});
/* for (User user : list) {
if (user.getId() % 2 == 0 && user.getAge() > 24) {
user.setUserName(user.getUserName().toUpperCase());
users.add(user);
}
}*/
}
}
分支合并框架
原理
Fork:把一个复杂任务进行分拆,大事化小
Join:把分拆任务的结果进行合并
相关类
和Exector相关,以及线程池有关
ForkJoinPool
分支合并池 类比=> 线程池
ForkJoinTask
ForkJoinTask 类比=> FutureTask
RecursiveTask
递归任务:继承后可以实现递归(自己调自己)调用的任务
伪代码
class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) { this.n = n; }
Integer compute() {
if (n <= 1)
return n;
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}
ForkJoin的使用代码
/**
* Created with IntelliJ IDEA.
* @Author: pzx
* @Date: 2022/01/10 22:05
* @Version:1.0
* ForkJoin框架
* 用到了分治算法
* 基础回顾:
* 抽象类被继承
* 接口被实现
*/
class MyTask extends RecursiveTask<Integer> {
private static final Integer ADJUST_VALUE = 10;
public int begin;
public int end;
public int result;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
if (end - begin <= ADJUST_VALUE) {
// 当数值差小于等于10的时候就直接进行累加
for (int i = begin; i <= end; i++) {
result += i;
}
} else {
int middle = (begin + end) / 2;
// 这里是递归
// 第一个任务从begin加到中间
MyTask task01 = new MyTask(begin, middle);
// 第二个任务从中间加到最末尾
MyTask task02 = new MyTask(middle + 1, end);
// 回来调用compute
task01.fork();
task02.fork();
// 当 is done返回计算结果。
result = task01.join() + task02.join();
}
return result;
}
}
public class ForkJoinDemo {
public static void main(String[] args) {
// 创建forkjoin池
ForkJoinPool forkJoinPool = null;
try {
MyTask myTask = new MyTask(0, 100);
forkJoinPool = new ForkJoinPool();
// 计算任务
ForkJoinTask<Integer> task = forkJoinPool.submit(myTask);
// 获取计算的结果
Integer integer = task.get();
System.out.println("integer = " + integer);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
}
}
异步回调CompletableFuture
CompletableFuture
这一段,听得挺迷的,有点类似于ajax的异步请求
/**
* Created with IntelliJ IDEA.
* @Author: pzx
* @Date: 2022/01/10 22:47
* @Version:1.0
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 没有返回值的
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t没有返回值");
});
completableFuture.get();
// 异步回调,有返回值supplyAsync供给型函数式接口
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t completableFuture2");
// return 10 / 0;
return 1024;
});
// whenComplete方法返回一个新的CompletableFuture,当CompletableFuture完成时完成,
// whenComplete(
// BiConsumer<? super T, ? super Throwable> action)
Integer result = completableFuture2.whenComplete((a, b) -> {
// 不出异常就走这一步
System.out.println("a = " + a);
// 这里为null就表示没有异常
System.out.println("b = " + b);
//结果是异常触发此CompletableFuture的完成特殊功能的给定功能;
// 否则,如果此CompletableFuture正常完成,则返回的CompletableFuture也会以相同的值正常完成。
}).exceptionally(f -> {
// CompletableFuture<T> exceptionally(
// Function<Throwable, ? extends T> fn)
// 这个是当出现异常的时候就会走这一步
System.out.println(f.getMessage());
return 444;
}).get();
System.out.println(result);
}
}