目录
1.什么是JUC
2.进程和线程回顾
3.Lock锁
4.生产者和消费者
5.8锁的现象
6.集合类不安全
7.Callable
8.CountDownLatch、CyclicBarrier、Semaphore
9.读写锁
10.阻塞队列
11.线程池
12.四大函数式接口
13.Stream流式计算
14.分支合并
15.异步回顾
16.JMM
17.volatile
18.深入单例模式
19.深入理解CAS
20.原子引用
21.可重入锁、公平锁、非公平锁。自旋锁、死锁....
1 什么是JUC
JUC涉及三个工具包"java.util.concurrent","java.util.concurrent.atomic","java.util.concurrent.lock"
回顾
- Java程序默认有两个线程,一个main线程,一个GC线程
- java 不能开线程,无法操作硬件,调start0 C语言开线程
- 并发vs并行,单核cpu并发,模拟出多线程,实际上是串行快速交替。多核CPU才是真正并行
- 查看CPU几核的方法
- 任务管理器-> 性能->CPU->查看
- 代码查看
Runtime.getRuntime().availableProcessors();
并发的本质充分利用CPU的资源
- 任务管理器-> 性能->CPU->查看
- 线程的状态--Thread.State枚举类6个值
NEW,RUNNABLE,BLOCKED,WAITING,TIMED_WAITIN,TERMINATED. - wait/sleep区别
- 来自不同类 wait>Object,sleep>Thread
企业中一般不用Thread.sleep,而是java.util.concurrent.TimeUnit,如TimeUnit.DAYS.sleep() - 有没有释放锁
sleep抱着锁睡觉,不会释放锁,wait释放锁,其他的执行等待被notify. - 使用范围不同
wait 不许在同步代码块中使用
sleep 可以在任何地方睡
3. LOCK锁
3.1 传统的synchronized
package juc;
public class SaleTicketDemo01 {
public static void main(String[] args) {
//并发:多个线程操作同一个资源对象,把资源对象丢到线程里
Ticket ticket =new Ticket();
new Thread(()->{ ticket.sale();},"thread 1").start();
new Thread(()->{ticket.sale(); },"thread 2").start();
new Thread(()->{ ticket.sale();},"thread 3").start();
}
}
//面向对象编程,资源类是一个解耦的标准类,具有属性和方法,但是不用实现Runnable/继承Thread
//从资源对象角度看,和线程是解耦的。
class Ticket{
private int count = 50;
//卖票方法上加上synchronized,synchronized本质上是队列,锁
public synchronized void sale(){
while(count>0) {
System.out.println(Thread.currentThread().getName()+" 卖掉第" + count + "张票");
count--;
}
}
}
3.2 LOCK
- 涉及操作
Lock l = ...; l.lock(); try { // access the resource protected by this lock } finally { l.unlock(); } - Lock下有三个实现类,可重入锁,读锁和写锁
- ReentrantLock实现分公平锁,非公平锁
公平锁:十分公平,遵循先来后到
非公平锁:十分不公平,可以插队。java默认采用非公平锁(如果前面的任务耗时3个小时,后面的任务只耗时3s,如果使用公平锁这样排队等待并不合理高效)
class Ticket2 {
private int count = 50;
Lock lock = new ReentrantLock();
public void saleTicket() {
lock.lock();
try {
//access the resouce protected by the lock in try block
while (count > 0) {
System.out.println("sale ticket" + count--);
}
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}
- Lock锁和Synchronized的区别
- Synchronized是java内置的关键字,Lock是个类
- Synchronized 无法判断锁的状态,Lock可以判断是否获取到了锁
- Synchronized 会自动释放锁,Lock需要手动上锁放锁。如果不释放锁就会造成死锁
- Synchronized 线程1获得锁阻塞,线程2只能等待;Lock锁有tryLock方法,锁不了也可以继续,不需要一直等待
- Synchronized 是可重入锁,不可中断的,非公平的,这些特性不可更改
Lock 可重入锁,可以判断锁,可以设置公平/非公平 - Synchronized 适合锁少量的代码同步问题,Lock适合大量的同步代码
4. 生产者和消费者
(面试时候手写的问题:单例模式,8大排序算法,生产者和消费者问题,死锁)
- 线程通信问题,就是生产者消费者问题;
涉及到线程交替执行,等待唤醒,和通知唤醒。 A,B 操作一个变量; num=0;
A num+1;
B num-1; - 只要并发操作资源,就要有锁。过去是用Synchronized来实现的,结合wait notify,后面用JUC里的Lock来实现
4.1 Synchronized例
package juc;
public class A {
public static void main(String[] args) {
//定义资源对象
Data data = new Data();
//可以启动更多的线程
new Thread(()->{ for(int i =0;i<10;i++){
data.increment()
}
},"producer").start();
new Thread(()->{ for(int i =0;i<10;i++){
data.decrement()
}
},"consumer").start();
}
}
//资源类必须干净,只有属性和方法
//方法:写法是等待,业务逻辑, 通知
class Data{
private int number =0;
//+1
public synchronized void increment(){
try {
//先判断需不需要等待,然后等待
if(number!=0){
this.wait();
}
//完成业务逻辑
number++;
//完成后通知
System.out.println(Thread.currentThread().getName()+" "+number);
this.notify();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//-1
public synchronized void decrement(){
if(number ==0){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
number--;
System.out.println(Thread.currentThread().getName()+" "+number);
this.notify();
}
}
注意main里启动两个线程结果是对的,但是启动更多线程如4个输出是错的,这涉及虚假唤醒问题
- 虚假唤醒:
常出现在多核CPU上,当满足if条件后多个线程被同时唤醒,线程A已经改变了对象,不再满足条件,线程B仍旧继续执行。
类似于商品只到货一件,但是客户A B都被通知到了,但是A拿到了,B无法拿到锁被无用的假唤醒了 - 解决方案:
if只执行一次判断,接续执行下面流程
采用while循环判断,只有不满足条件了,才会执行while后的流程。对应wait notify过程,在第一次被唤醒后,再次进行while判断,直到不满足条件,才会执行while下面的业务逻辑
//increment里
while(number!=0) this.wait();
//decrement里
while(number ==0) this.wait();
wait判断需不需要等待一定要用while循环,不能用if,可能会出现虚假等待
4.2 使用JUC下的Lock
- 类比学习,JUC里应该也有类似wait notify功能,就是condition里的await和signal方法
查找API可看见contion的用法
例: - 使用lock锁,先锁,try-catch-finally(释放锁),然后业务逻辑放在try里
- 用同一把锁,同时只能执行一个
- 资源类必须干净,只有属性和方法,不继承线程相关接口、类
- 方法:写法是等待,业务逻辑, 通知
- 使用lock锁,先锁,try-catch-finally(释放锁),然后业务逻辑放在try里
package juc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class B {
public static void main(String[] args) {
//定义资源对象
Data data = new Data();
//四个线程两个加,两个减
new Thread(()->{ for(int i =0;i<10;i++){
data.increment();
}
},"A").start();
new Thread(()->{ for(int i =0;i<10;i++){
data.decrement();
}
},"B").start();
new Thread(()->{ for(int i =0;i<10;i++){
data.increment();
}
},"C").start();
new Thread(()->{ for(int i =0;i<10;i++){
data.decrement();
}
},"D").start();
}
}
// 资源类必须干净,只有属性和方法
// 方法:写法是等待,业务逻辑, 通知
class Data {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
// +1,使用lock锁,先锁,try-catch-finally(释放锁),然后业务逻辑放在try里
public void increment() {
lock.lock();
try {
// 先判断需不需要等待,然后等待
while (number != 0) {
condition.await();
}
// 完成业务逻辑
number++;
// 完成后通知
System.out.println(Thread.currentThread().getName() + " " + number);
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
// -1
public synchronized void decrement() {
lock.lock();
try {
while (number == 0)
condition.await();
number--;
System.out.println(Thread.currentThread().getName() + " " + number);
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
这种只用一个condition的情况和原来用wait notify一样。
任何新技术绝对不仅仅覆盖原有技术,必然有优势和补充.
上例输出结果是随机的状态 ABCD唤醒的任意,但是condition可以让其有序执行A唤醒B->C->D
Contion可以精准的通知线程
例
package juc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class C {
// Execute A ,when finish call B; Execute B,then call C
// When C is finished call A
public static void main(String[] args) {
Data3 data3 = new Data3();
new Thread(()->{
for(int i =0; i<10;i++){
data3.printA();
}
},"A").start();
new Thread(()->{
for(int i =0; i<10;i++){
data3.printB();
}
},"B").start();
new Thread(()->{
for(int i =0; i<10;i++){
data3.printC();
}
},"c").start();
}
}
class Data3 {
private Lock lock = new ReentrantLock();
private Condition conditionA = lock.newCondition();
private Condition conditionB = lock.newCondition();
private Condition conditionC = lock.newCondition();
int number = 1;//1A,2B,3C设置标志位,根据条件判断是否等待
public void printA(){
lock.lock();
try{
while(number!=1){
conditionA.await();
}
System.out.println(Thread.currentThread().getName()+"AAAAA");
number++;
//精准定向唤醒
conditionB.signal();
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
}
public void printB(){
lock.lock();
try{
while(number!=2){
conditionB.await();
}
number++;
System.out.println(Thread.currentThread().getName()+"BBBBBB");
conditionC.signal();
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
}
public void printC(){
lock.lock();
try{
while(number!=3){
conditionC.await();
}
number=1;
System.out.println(Thread.currentThread().getName()+"CCCCCC");
conditionA.signal();
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
}
}
5 8锁现象(关于锁的8个问题)
锁是什么?如何判断锁的是谁
锁只会锁对象,或者class对象
8个问题都是探讨两个线程是先打电话还是先发短信?但是是在不同的实现条件下:
- 第一种锁,标准情况下,两个synchronized修饰的成员方法,只有一个实例对象,不设置休眠
/*
--结果先发短信,后打电话
--原因
synchronized 方法的锁是this,调用的对象本身。此处传递的是同一个调用者phone.
由于发短信的线程先start获取CPU时间片概率大,获得对象锁,所以先发短信,后打电话
*/
public class Test1 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{phone.sendSms();}).start();
new Thread(()->phone.call()).start();
}
}
class Phone{
public synchronized void sendSms(){
System.out.println("sendSms");
}
public synchronized void call(){
System.out.println("call");
}
}
- 第二种锁 两个synchronized修饰的对象方法,同一个对象,其中先调用的方法sendSms延迟4秒,是先打电话还是发短信?
/*
--结果先发短信,后打电话
--原因:仍旧锁的是同一个对象锁,发短信先调用大概率先获取锁,先执行
*/
public class Test1 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{phone.sendSms();}).start();
new Thread(()->phone.call()).start();
}
}
class Phone{
public synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("sendSms");
}
public synchronized void call(){
System.out.println("call");
}
}
- 第三种锁 一个synchronized方法睡眠4秒,一个普通方法,同一个对象
/*
----结果先输出hello,然后发短信
原因hello上没锁,不用竞争发短信的锁,且发短信睡了4秒
*/
public class Test1 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{phone.sendSms();}).start();
new Thread(()->phone.hello()).start();
}
}
class Phone{
public synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("sendSms");
}
public synchronized void call(){
System.out.println("call");
}
public void hello(){
System.out.println("hello");
}
}
- 第四种锁:两个synchronized的不同方法,两个不同对象
/*
----结果先输出call,然后发短信
原因锁对象是两个不同的对象互不影响,且发短信睡了4秒
*/
public class Test1 {
public static void main(String[] args) {
Phone phone1 = new Phone();
Phone phone2 = new Phone();
new Thread(()->{phone1.sendSms();}).start();
new Thread(()->phone2.call).start();
}
}
class Phone{
public synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("sendSms");
}
public synchronized void call(){
System.out.println("call");
}
}
- 第五种锁:两个静态synchronized的不同方法,同一个对象
/*
--结果先输出发短信,后打电话
原因 锁对象.class模板的对象,这个对象有且只有一个。
是同一个锁,程序会从上往下一次执行。
*/
public class Test1 {
public static void main(String[] args) {
Phone phone1 = new Phone();
new Thread(()->{phone1.sendSms();}).start();
new Thread(()->phone1.call).start();
}
}
class Phone{
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("sendSms");
}
public static synchronized void call(){
System.out.println("call");
}
}
- 第六种锁 synchronized修饰的普通方法和静态方法,同一个对象调用
/*
--结果先输出打电话后发短信
原因 锁的对象是不同的,一个是类对象,一个是普通对象互不影响
*/
public class Test1 {
public static void main(String[] args) {
Phone phone1 = new Phone();
new Thread(()->{phone1.sendSms();}).start();
new Thread(()->phone1.call).start();
}
}
class Phone{
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("sendSms");
}
public synchronized void call(){
System.out.println("call");
}
}
- 第七种锁 两个不同的静态方法,两个不同对象
/*
--结果先输出发短信
原因 虽然调用者是两个不同对象,但是静态方法锁的都是同一个类对象,先调用的先执行
*/
public class Test1 {
public static void main(String[] args) {
Phone phone1 = new Phone();
Phone phone2 = new Phone();
new Thread(()->{phone1.sendSms();}).start();
new Thread(()->phone2.call).start();
}
}
class Phone{
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("sendSms");
}
public static synchronized void call(){
System.out.println("call");
}
}
- 第8种锁 synchronized修饰的普通方法和静态方法,不同的对象调用
/*
--结果先输出打电话后发短信
原因 不论调用者是谁,静态同步方法锁的就是类对象
锁的对象是不同的,一个是类对象,一个是普通对象互不影响
*/
public class Test1 {
public static void main(String[] args) {
Phone phone1 = new Phone();
new Thread(()->{phone1.sendSms();}).start();
new Thread(()->phone1.call).start();
}
}
class Phone{
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("sendSms");
}
public synchronized void call(){
System.out.println("call");
}
}
6. 集合类不安全
6.1 ArrayList不安全
解决方案
1.使用Vector读写安全都加synchronized锁
2.使用list=Collections.synchronizedList(arraylist)结合工具类,编程带锁的list
3.使用JUC下的 new CopyOnWriteArrayList(),该方法只锁写方法,读方法不加锁,(加的lock锁,不是synchronized)。写时复制,先将当前容器复制一份,然后在新副本上执行写操作,结束之后再将原容器的引用指向新容器。读写分离
6.2 Set不安全
解决方案
1.使用工具类Collections.synchronizedSet(new HashSet<>())
2. 使用JUC下 new CopyOnWriteArraySet()
扩展: HashSet底层是一个HashMap,其add方法是把元素当做key添加到map里,key是不重复的,set也是不重复的
6.3 Map不安全
HashMap
1.Collections.synchronizedMap(m)
2. new ConcurrentHashMap() ConcurrentHashMap重点具体原理自查
7. Callable
- 接口,不同于Runnable,它可以抛出异常,有返回值,调用call方法
- Callable和Thread没有直接联系,那怎么线程启动Callable呢?
使用适配类FutureTask,它是Runnable的是实现类,且FutureTask的构造方法参数可以传递Callable接口
所以可以调用new Thread(new FutureTask(new Callable()).start来启动
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CallableTest {
public static void main(String[] args) throws InterruptedException, ExecutionException{
//new Thread(new Runnable()).start
//new FutureTask(new Callable)
//FutureTask implements Runnalbe
MyThread thread = new MyThread();
FutureTask futureTask = new FutureTask(thread);
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start();//启动两个线程但是只输出一次调用结果
//从futureTask里获取返回值,get方法等待返回结果可能会产生阻塞
//通常放在最后,或者使用异步通信
System.out.println(futureTask.get());
}
}
class MyThread implements Callable<String>{
public String call(){
System.out.println("call()");
return "1024";
}
}
输出结果
call()
1024
两个线程启动同一个futureTask,只输出一次结果原因,futureTask run的时候会判断FutureTask状态,只有转换为running后才可以调用call。
而第一次调用后状态已经记录过了,不会再运行。(待看源码深入)
8. 常用的辅助类(必会)
8.1 CountDownLatch
闭锁 减法计数器,当有必须要先执行的任务时可以用countDownLatch计数等待完成后,再执行当前任务的其他流程
countDownLatch.countDown()//用在各个线程里,执行数量减1操作
countDownLatch.await();等待计数器归零,然后向下执行
package juc;
import java.util.concurrent.CountDownLatch;
//计数器,countDown方法减一
public class CountDownLatchDemo {
public static void main(String[] args) {
//总数是6,当有必须要执行的任务时候,再使用
CountDownLatch countDownLatch = new CountDownLatch(6);
for(int i =1;i<=6;i++){
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"go out");
countDownLatch.countDown();
},String.valueOf(i)).start();
}
countDownLatch.await(); //等待计数器归零,然后再向下执行
System.out.println("close Door")
}
}
8.2 CyclicBarrier
循环屏障 也是用于线程控制的
CyclicBarrier(int parties, Runnable barrierAction),参数1构造器里除了指定计数数量,参数2指定了当屏障跳闸时执行给定的屏障动作,由最后一个进入屏障的线程执行.
await各线程阻塞,等待所有parties都在这个屏障上调用了await
//加法计数器
public class CyclicBarrierDemo {
public static void main(String[] args) {
//例集7颗龙珠召唤神龙
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("召唤神龙");
})
for(int i =1;i<=7;i++){
final int temp = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+
"收集"+temp+"个龙珠");
try{
cyclicBarrier.await();//等待
} catch(InterruptedException e){
e.printStackTrace();
} catch ( BrokenBarrierException e){
e.printStackTrace();
}
}.start();
}
}
}
CountDownLatch和CyclicBarrier的区别
- countDownLatch协调工作线程执行,是由外面线程协调;cyclicbarrier是由工作线程之间相互协调运行;
- 前者由countdown来执行减1操作,后者await执行加1操作
- 前者不可循环使用,后者当数量达到指定的parites后清零,重新计数,可循环使用
扩展Lambda表达式是否能引用外部变量
Lambda表达式能引用effectively final的外部变量.即外部变量在它所在的作用域范围内,只赋值过一次,变量值不曾改变,就算是effectively final。
对于增强for循环里的变量i,它式effectively final的,因为增强for底层使用的迭代器。一次循环i只赋值一次。
对于显示for的i++的就不是final的了,一次for循环变量值改变。详细参考 https://www.cnblogs.com/skyblue123/p/13819163.html
8.3 Semaphore 信号量
semaphore.aquire();获得许可证,如果满了没有则会阻塞,等待被释放为止
semaphore.release();释放,将当前信号量释放,总许可证+1,然后唤醒等待的线程
作用:多个共享资源互斥的使用;并发限流,控制最大的线程数,保证服务器安全性
例 抢车位 6个车只有3个车位
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for(int i =1; i<=6;i++){
new Thread(()->{
//aquire()得到,release释放
try{
semaphore.aquire();//阻塞式获得
System.out.println(Thread.currentThread().getName()
+"抢到车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()
+"离开车位");
}catch(InterruptedException e){
e.printStackTrace();
}finally{
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
9. 读写锁 ReadWriteLock
关联了一对读写锁,读的时候可以多个阅读同时进行,写只能有一个。(读锁/共享锁,写锁/独占锁)
为什么读也加锁?读锁与写锁互斥,如果你的代码修改数据,只能有一个人在写,且不能同时读取,防止脏读那就上写锁。
ReadWriteLock
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
//写入
for(int i = 1; i<=5; i++){
final int temp = i;
new Thread(()->{
myCache.put(temp+"");
},String.valueOf(i)).start();
}
//读取
for(int i = 1; i<=5; i++){
final int temp = i;
new Thread(()->{
myCache.get(i);
},String.valueOf(i)).start();
}
}
}
class MyCache{
private volatile Map<String,Objce> map = new HashMap<>();
public void put(String key, Object value){
System.out.println(thread.currentThread().getName+"写入"+key);
map.put(key,value);
System.out.println(thread.currentThread().getName+"写入OK");
}
public void get(){
System.out.println(thread.currentThread().getName+"读取"+key);
Object o = map.get(key);
System.out.println(thread.currentThread().getName+"读取OK");
}
}
class MyCacheLock{
private volatile Map<String,Objce> map = new HashMap<>();
//读写锁,更加细粒度的控制
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public void put(String key, Object value){
readWriteLock.writeLock().lock();
try{
System.out.println(thread.currentThread().getName+"写入"+key);
map.put(key,value);
System.out.println(thread.currentThread().getName+"写入OK");
}catch(Exception e){
e.printStackTrace();
} finally{
readWriteLock.writeLock().unlock();
}
}
public void get(){
readWriteLock.readLock().lock();
System.out.println(thread.currentThread().getName+"读取"+key);
Object o = map.get(key);
System.out.println(thread.currentThread().getName+"读取OK");
readWriteLock.readLock().unlock();
}
}
10. 阻塞队列BlockingQueue
10.1 什么试阻塞队列
写入:如果队列满了,就必须阻塞等待
读:如果队列是空的,必须阻塞等待生产
BlockingQueue不是新的东西,和set,list同级,都实现Collections接口
什么情况下我们会使用阻塞队列? 多线程并发处理,线程池!
学会使用队列
添加,移除
10.2 四组API
1.抛出异常
2.不会抛出异常
3.阻塞等待
4.超时等待
方式 | 抛出异常 | 不会抛出异常 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add() | offer() | put() | offer(E e, long timeout, TimeUnit unit) |
移除 | remove() | poll() | take() | poll(long timeout, TimeUnit unit) |
检测队列首元素 | element() | peek() | - | - |
例:
public static void test4() throws InterruptedException{
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
//false doesn't throw exception
System.out.println(blockingQueue.offer("d",2,TimeUnit.SECONDS));
System.out.println("========");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//return null
System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS));
}
10.3 SynchronousQueue同步队列
最多只有一个元素,必须等待取出来后才可以继续添加元素
put,take
public static void test5(){
BlockingQueue<String> blockingQueue = new SynchronousQueue();
new Thread(()->{
try{
System.out.println(Thread.currentThread().getName()+" put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+" put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+" put 3");
blockingQueue.put("3");
} catch (InterruptedException e){
e.printStackTrace();
}
},"T1").start();
new Thread(()->{
try{
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" => "
+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" => "
+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" => "
+blockingQueue.take());
} catch (InterruptedException e){
e.printStackTrace();
}
},"T2").start();
}
11 线程池(重点)
11.1 池化技术
程序的运行,占用系统的资源,使用池化技术优化系统资源的使用,如线程池,JDBC连接池,内存池,对象池
池化技术:事先准备一些资源,用的时候直接取,用完归还池中
11.2 线程池的好处:
1.降低资源的消耗(创建销毁十分消耗资源)
2.提高效率
3.方便管理
线程复用,可以控制最大并发数,管理线程
线程池: 三大方法,7大参数,4中拒绝策略
11.3 Executors工具类下的三大方法
newSingleThreadScheduledExecutor()单个线程的线程池
newFixedThreadPool()固定总数的线程池
newCachedThreadPool() 根据需要创建线程池,如果没有可用的线程,将创建一个新的线程并将其添加到该池中。 未使用六十秒的线程将被终止并从缓存中删除。
public class Demo01 {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();
//Executors.newFixedThreadPool(5);//固定有5个线程大小
//Executors.newCachedThreadPool();//根据cpu的能力和需求综合考虑决定池中的线程数
try{
for(int i =0 ;i<10 ; i++){
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}catch(Exception e){
e.printStackTrace();
}finally{
threadPool.shutdown();
}
}
}
}
11.4 七大参数
源码分析, 三大方法都调用ThreadPoolExecutor
int corePoolSize, //核心线程池大小
int maximumPoolSize,//最大核心线程池大小
long keepAliveTime,//超时了没人用就会释放
TimeUnit unit,//超时时间单位
BlockingQueue
ThreadFactory threadFactory,//线程工厂,用来创建线程的,一般不用动
RejectedExecutionHandler handler) { //拒绝策略
public ThreadPoolExecutor(int corePoolSize, //核心线程池大小
int maximumPoolSize,//最大核心线程池大小
long keepAliveTime,//超时了没人用就会释放
TimeUnit unit,//超时时间单位
BlockingQueue<Runnable> workQueue,//阻塞队列
ThreadFactory threadFactory,//线程工厂,用来创建线程的,一般不用动
RejectedExecutionHandler handler) { //拒绝策略
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
7个参数类比银行办理业务图(回去加上)
例
public static void test1(){
BlockingQueue blockingQueue = new ArrayBlockingQueue(4);
ExecutorService threadPool = new ThreadPoolExecutor(2,5,3,TimeUnit.SECONDS,
new LinkedBlockingDeque(3),Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());//abortPolicy默认抛出异常
//AbortPolicy 是static的,不用new 外部类直接new内部类对象
try{
for(int i =0 ;i<10 ; i++){
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}catch(Exception e){
e.printStackTrace();
}finally{
threadPool.shutdown();
}
}
11.5 四种拒绝策略
查看源码ThreadPoolExecutor构造器,可以看到handler传递的类型接口,找到所有实现类对应4中拒绝策略
new ThreadPoolExecutor.AbortPolicy() 抛出异常
new ThreadPoolExecutor.CallerRunsPolicy() 调用者处理
new ThreadPoolExecutor.DiscardPolicy() 队列满了不会抛出异常,丢掉新来的
new ThreadPoolExecutor.DiscardOldestPolicy()队列满了尝试和最早的老的竞争,也不会抛出异常
11.6 池的最大线程数maximumPoolSize该如何定义(调优)
1.CPU密集型 如12核,最多12条线程同时执行.策略1,几核的就定义几,保证CPU效率最高
Runtime.getRuntime().availableProcessors()运行时实时获取CPU核数
2.IO密集型, 判断你的程序中十分耗IO的线程数
如程序 有15个大型任务,io十分占用资源,可以设置为两倍30个
12. 函数式接口
参考 ![java 8 新特性篇](https://www.cnblogs.com/drying-net/p/14714242.html)
13. Stream流计算
大数据=存储+计算 参考 ![java 8 新特性篇](https://www.cnblogs.com/drying-net/p/14714242.html)
14. ForkJoin
- 什么是ForkJoin在JDK 1.7,并行执行任务!提高效率,大数据量
大数据 Map Reduce(把大任务拆分小任务,小任务的执行结果返回给上级,最后形成最终结果) - ForkJoin 特点:工作窃取
B线程执行完把A线程的一个任务拿过来继续执行。
这里面维护的都是双端队列 - FrokJoin用法
package juc;
public class ForkjoinDemo extends ForkJoinTask<Long{
/*
* sum task
* 普通写法<forkJoin<Stream
* 如何使用forkJoin
* 1.通过forkjoinPool来执行
* 2.计算任务forkjoinPool.execute(ForkJoinTask<?> task)
*/
private long start;
private long end;
private long temp= 1000L;
public static void main(String[] args) {
long begin = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<long> task = new ForkJoinDemo(0L,10_0000_0000L)
//forkJoinPool.execute(task); 异步
ForkJoinTask<long> submit = forkJoinPool.submit(task);
Long sum = submit.sum();
long endtime = System.currentTimeMillis();
System.out.println(begin-endtime);
}
/*
test1()普通for循环算法略
*/
//使用forkjoin分治计算次优
//overide
protected long compute(){
if((end-start)>temp){
Long sum =0L;
for (int i =start; i<end;i++){
sumd+=i;
}
System.out.println(sum);
}else{
long middle = (start+end)/2;
ForkjoinDemo task1 = new ForkjoinDemo(start, middle);
task1.fork();把任务压入线程队列
ForkjoinDemo task2 = new ForkjoinDemo(middle+1, end);
task2.fork();把任务压入线程队列
long l = task1.join()+task2.join();
}
}
//使用stream 并行流计算最优
public static void test3(){
long begin = System.currentTimeMillis();
Long sum = LongStream.rangeClosed(0L,10_0000_0000L).
parallel().reduce(0,Long::sum)
long endtime = System.currentTimeMillis();
System.out.println(begin-endtime);
}
}
15. 异步调用CompletableFuture
想要获取调用结果使用supplyAsync方法,不想要结果使用runAsync
public class FutureDemo{
public static void main(String[] args){
}
public static void test2(){
CompletableFuture<Interger> completableFuture = completableFuture.supplyAsync(
()->{
System.out.println(Thread.currentThread().getName()+
" supplyAsync=>Interger");
int i = 10/0;
return 1024;
}
);
completableFuture.whenComplete((t,u)->{
System.out.println("t=>"+t); //正常时返回正常结果,错误时返回null
System.out.println("u=>"+u); //正常情况为null,当completable里出现异常时,返回错误信息
}).exceptionally((e)->{
System.out.println(e.getMessage());
return 233;
}).get());//只有在报错是exceptionally才会走到输出错误信息
}
public static void test1(){
CompletableFuture<Void> completableFuture = completableFuture.runAsync(
()->{
try{
TimeUnit.SECONDS.sleep(2);
}catch(InterruptedException e){
e.printStackTracve();
}
System.out.println(Thread.currentThread().getName()
+"runAsync=>Void");
}
);
System.out.println("1111");
completableFuture.get();//获取执行结果
}
}
16. JMM 17 volatile
相关讨论请你谈谈Volatile的理解
Volatile是Java虚拟机提供轻量级的同步机制
保证可见性
不保证原子性
禁止指令重排
16.1 保证可见性-- 什么是JMM
java内存模型,概念,约定
1)线程解锁前,必须把共享变量刷回主存
2)线程加锁前,必须读取贮存中的最新值到工作内存
3)加锁和解锁必须是同一把锁
javap可以看到字节码文件,如javap =c VDem02.class
详参 JVM篇
16.2 不保证原子性
volatile不保证原子性,那使用什么来保证?synchronized还是Lock
可选择比较高级的原子类atomic
原来是 private volatile static int num =0; num++操作不保证原子性
现在
private volatile static AtomicInteger num = new AtomicInteger();
public static void add(){
num.getAndIncrement();//AtomicInteger+1方法,底层使用CAS 并发效率高
}
这些类的底层都直接和操作系统挂钩!在内存中修改值!Unsafe类是一个很特殊的存在
16.3 禁止指令重排
什么是指令重排?你写的程序,计算机并不是按照你写的那样取执行的。
源代码->编译器优化的重排->指令并行也可能重排->呢村系统也会重排->执行
例
int x =1; //1
int y = 2; //2
x = x + 5; //3
y = x * x; //4
期望执行顺序1,2,3,4,但是可能是2134, 1324,但是不可能是4123
** 处理器在进行指令重排的时候,会考虑数据之间的依赖性**,同一线程内最终结果不受影响
a,b,x,y四个值默认都是0
线程A | 线程B |
---|---|
x = a | y = b |
b = 1 | a = 2 |
正常x=0,y=0,但是由于存在指令重排,可能先执行b=1, a =2,才执行 x=a, y=b,两个线程的结果受到影响
volatile可以避免指令重排
内存屏障,CPU指令,作用:
1.保证特定的操作的执行顺序,在volatile上面和下面都增加屏障,上面的代码始终在上面,下面的代码始终在下面
2. 可以保证某些变量的内存可见性(利用这些特性,volatile实现了可见性)
18 单例模式
饿汉式,DCL懒汉式(使用volatile
18.1 单例实现方式
18.1.1 饿汉式
package single;
public class Hungry {
private final static Hungry HUNGRY = new Hungry();
//饿汉模式,一上来就已经有new Hungry()对象了,可能会浪费空间data1~4属性占内存
private byte[] data1 = new byte[1024*1024];
private byte[] data2 = new byte[1024*1024];
private byte[] data3 = new byte[1024*1024];
private byte[] data4 = new byte[1024*1024];
public static void main(String[] args) {
}
private Hungry(){
}
public static Hungry getInstance(){
return HUNGRY;
}
}
18.1.2 懒汉模式,用的时候再创建
package single;
public class LazyMan {
public static void main(String[] args) {
for(int i =0;i<10;i++){
new Thread(new Runnable(){
public void run() {
System.out.println(Thread.currentThread().getName()
+LazyMan.getInstance());
}
}, i+"").start();
}
}
private LazyMan(){
System.out.println(Thread.currentThread().getName()+" ok");
}
private static LazyMan lazyMan;
public static LazyMan getInstance(){
if(lazyMan == null){
lazyMan = new LazyMan();
}
return lazyMan;
}
}
这种情况在多线程的时候,不能保证只创建了一个对象
1 ok
0 ok
0single.LazyMan@5a347448
1single.LazyMan@1bb1deea
5single.LazyMan@5a347448
3single.LazyMan@5a347448
4single.LazyMan@5a347448
2single.LazyMan@5a347448
7single.LazyMan@5a347448
6single.LazyMan@5a347448
9single.LazyMan@5a347448
8single.LazyMan@5a347448
** 多线程不安全,用锁改进在getInstance里加锁加判断 **
public class LazyMan{
private static volatile LazyMan lazyMan;
private LazyMan(){
System.out.println(Thread.currentThread().getName()+" OK ");
}
public static LazyMan getInstance(){
if(lazyMan == null){
//创建前加锁
synchronized(LazyMan.class){
if(lazyMan == null){
lazyMan = new LazyMan();//不是一个原子操作
/*
* 1.分配内存空间
* 2.执行构造方法,初始化对象
* 3. 把引用指向这个空间
* 执行顺序可能式123,也可能式132,
* 在并发操作lazyMan不等于null但是并为初始化的时候可能有另个线程B进入方法
* B 会直接return lazyMan,所以除了锁,还要讲单例变量设置为volatile不允许指令重排
*/
}
}
}
return lazyMan;
}
}
18.1.3 静态内部类
外部类构造器私有,静态内部的类属性是外部类的对象,这个对象有且只有一个
public clas Holder{
private Holder(){
}
public static Holder getInstance(){
return InnetClass.HOLDER;
}
public static class InnerClass{
private static final Holder HOLDER = new Holder();
}
}
18.2 反射破坏单例
setAccessible破坏构造器的私有性,创建两个实例不相等
public class LazyMan{
private static volatile LazyMan lazyMan;
private LazyMan(){
System.out.println(Thread.currentThread().getName()+" OK ");
}
public static LazyMan getInstance(){
if(lazyMan == null){
//创建前加锁
synchronized(LazyMan.class){
if(lazyMan == null){
lazyMan = new LazyMan();//不是一个原子操作
}
}
}
return lazyMan;
}
public static void main(String[] args){
LazyMan instance = LazyMan.getInstance();
Constructor<LazyMan> declaredConstructor = LazyMan.class.
getDeclaredConstructor(null);
declaredConstructor.setAccessible(true);
LazyMan instance1 = declaredConstructor.newInstance();
System.out.println(instance);
System.out.println(instance1);
}
}
** 当一个实例new出来,另一个是反射得到,改进在构造器里加判断 **
private LazyMan(){
synchronized(LazyMan.class){
if(LazyMan != null){
throw new RuntimeException("不要试图使用反射破坏异常")
}
}
System.out.println(Thread.currentThread().getName()+" OK ");
}
** 当实例都是由反射得到,设置标志位进一步防止被破坏(别人通常不知道这个标志位名字)**
private static boolean secure = flase;//标志位隐藏字段
private LazyMan(){
synchronized(LazyMan.class){
if(secure == flase){
secure = true;
}else{
throw new RuntimeException("不要试图使用反射破坏异常")
}
}
}
** 但如果其他人知道这个字段,可以破解放开权限,然后创建多个实例 **
public static void main(String[] args){
//LazyMan instance = LazyMan.getInstance();
Field flag = LazyMan.class.getDeclaredField("secure");
flag.setAccessible(true);
Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);
declaredConstructor.setAccessible(true);
LazyMan instance = declaredConstructor.newInstance();
secure.set(instance,flase); //重置标志位
LazyMan instance1 = declaredConstructor.newInstance();
System.out.println(instance);
System.out.println(instance1);
}
18.4 枚举单例
深入getInstance源码,发现enum枚举自带单例模式
经过源码反编译(jad.exe)分析发现枚举没有无参构造,只有有参构造
public enum EnumSingle{
INSTANCE; //实例
public EnumSingle getInstance(){
return INSTANCE
}
}
class Test{
public static void main(String[] args){
EnumSingle instance1 = EnumSingle.INSTANCE;
Constructor<EnumSingle> declaredConstructor =
EnumSingle.class.getDeclatedConstructor(String.class,int.class);
declaredConstructor.setAccessible(true);
EnumSingle instance2 = declaredConstructor.newInstance();
System.out.prinln(instance1);
System.out.println(instance2);
}
}
19 深入理解CAS
19.1 什么是CAS(compareAndSet)
(修内功,操作系统,计算机网络原理)
getAndIncrement里面是自旋锁,一直循环到满足条件才退出。
unsafe里都是native方法,操作内存效率很高
JAVA的CAS调用C++ CAS实现操作内存
CAS:比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作!如果不是就一直循环
缺点:
1.循环会耗时(底层自旋锁)
2.一次性只能保证一个共享变量的原子性(因为是底层CPU操作)
3. ABA问题
19.2 ABA问题(狸猫换太子)
SQL中怎么解决这个问题用乐观锁??待查
20. 原子引用
可以解决ABA问题,带版本号的原子操作 AtomicReference, AtomicStampedReference(带印记、戳的原子引用)
AtomicStampedReference(V initialRef, int initialStamp) 初始值,版本
** Integer使用了对象缓存机制,默认范围是-128~127,推荐使用静态工厂方法valueOf获取对象实例,而不是new,因为ValueOf使用缓存,而new一定会创建新的对象分配新的内存空间**
值设置为2020踩了Integer大坑,实际开发中泛型传入的不会是包装类,而是类似user这种业务对象,不会遇到这种坑
atomicReference.compareAndSet(2020,2023,
atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1)
2020自动装箱,重新在堆里new的对象,调用compareAndSet(使用==比较)一定不相等,所以一直返回flase
例 原理和乐观锁相同
public class CASDemo {
// CAS compareAndSet:比较并交换
public static void main(String[] args){
//2020踩了Integer大坑,实际开发中泛型传入的不会是包装类,而是类似user这种业务对象
//对象的引用是为一的,不会遇到这种坑
//AtomicStampedReference<Integer> atomicReference = new AtomicStampedReference<>(2020,1);
AtomicStampedReference<Integer> atomicReference = new AtomicStampedReference<>(1,1);
new Thread(()->{
int stamp = atomicReference.getStamp();//获得版本号
System.out.println("a1=>"+stamp);
try{
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException e){
e.printStackTrace();
}
//compareAndSet(V expectedReference, V newReference,
//int expectedStamp, int newStamp)
System.out.println(atomicReference.compareAndSet(1,2,
atomicStampedReference.getStamp(),
atomicStampedReference.getStamp()+1));
System.out.println("a2 =>"+atomicStampedReference.getStamp());
System.out.println( atomicReference.compareAndSet(2,1,
atomicStampedReference.getStamp(),
atomicStampedReference.getStamp()+1));
System.out.println("a3 =>"+atomicStampedReference.getStamp());
},"a").start();
new Thread(()->{
int stamp = atomicReference.getStamp();//获得版本号
System.out.println("b1=>"+stamp);
try{
TimeUnit.SECONDS.sleep(2);
}catch (InterruptedException e){
e.printStackTrace();
}
//修改失败,版本号不一样flase
System.out.println(atomicReference.compareAndSet(1,3,
stamp),stamp+1);
System.out.println("b2 =>"+atomicStampedReference.getStamp()));
},"b").start();
}
}
21 各种锁的理解
21.1 公平锁、非公平锁,必须先来后到
公平锁:不能插队的锁
非公平锁:可以插队的锁,3s,3h可能3s的线程插队在前面
默认的是非公平锁,无论是Synchronized还是Lock都要保证效率问题
如ReentrantLock实现是非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
可以调用重载的方法,实现使用公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
21.2 可重入锁
可重入锁(递归锁),线程可以进入任何一个它已经拥有的锁所同步着的代码块。
即一个线程可以多次进入该锁的代码块。
例,线程A的发短信和打电话锁都使用同一个锁,发短信可以调用打电话,都执行完线程B才能获得锁开始执行。
public class Demo01{
public static void main(String[] args){
Phone phone = new Phone();
new Thread(()->{
phone.sms()
},"A").start();
new Thread(()->{
phone.sms()
},"B").start();
}
}
class Phone{
public synchronized void sms(){
System.out.println(Thread.currentThread().getName());
call();
}
public synchronized void call(){
System.out.println(Thread.currentThread().getName());
}
}
例2 如果换成用lock锁,虽然执行结果一样,但实际上每次执行lock.lock()可以想象成一把锁,
实际上sms()方法和call方法对应两把锁(待深入研究,字幕有说锁的时候计数会加1,退出时必须保证计数清零)
public class Demo02{
public static void main(String[] args){
Phone phone = new Phone();
new Thread(()->{
phone.sms()
},"A").start();
new Thread(()->{
phone.sms()
},"B").start();
}
}
class Phone{
Lock lock = new ReentrantLock();
public synchronized void sms(){
lock.lock();
//lock.lock();锁加锁解锁必须配对,如果锁两次,只解开一次
//相当于两把锁,直接开了一把,仍然锁着
try{
System.out.println(Thread.currentThread().getName());
call();
}catch (Exception e){
e.printStackTrace();
} finally{
lock.unlock();
}
}
public synchronized void call(){
lock.lock();
try{
System.out.println(Thread.currentThread().getName());
}catch (Exception e){
e.printStackTrace();
} finally{
lock.unlock();
}
}
}
}
21.3 自旋锁
AutomicInteger.java里的getAndIncrement
/*
* 利用原子类compareAndSet写的自旋锁
*/
public class SpinLockDemo{
//AtomicReference() 使用null初始值创建新的AtomicReference
AtomicReference<Thread> atomicReference = new AtomicReference<>();
//加锁自旋
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+"==> myLock");
while(!atomicReference.compareAndSet(null,thread)){
}
}
//解锁
public void myunLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+"==> myLock");
atomicReference.compareAndSet(thread,null))
}
}
//测试类 T2自旋等待T1释放锁
public class TestSpinLock{
public static void main(String[] args){
SpinlockDemo lock = new SpinlockDemo();
lock.myLock();
lock.myUnLock();
new Thread().start(()->{
lock.myLock();
try{
TimeUnit.SECONDS.sleep(3);
} catch(Exception e){
e.printStackTrace();
} finally{
lock.myUnLock();
}
},"T1");
new Thread().start(()->{
lock.myLock();
try{
TimeUnit.SECONDS.sleep(1);
} catch(Exception e){
e.printStackTrace();
} finally{
lock.myUnLock();
}
},"T2");
}
}
21.4 死锁
死锁就是互相抢对方的东西,都不让出自己的东西
死锁测试,怎么排除死锁
例:死锁
public class DeadLockDemo {
public static void main(String[] args){
String lockA = "lockA";
String lockB = "lockB";
new Thread(new MyThread(lockA,lockB),"T1").start();
new Thread(new MyThread(lockB,lockA),"T2").start();
}
}
class MyThread implements Runnable{
private String lockA;
private String lockB;
public MyThread(String lockA, string lockB){
this.lockA = lockA;
this.lockB = lockB;
public void run(){
synchronized(lockA){
System.out.println(Thread.currentThread().getName()+"lock:"+
lockA+" =>get "+ lockB);
try{
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e){
e.printStackTrace();
}
Synchronizd(lockB){
System.out.println(Thread.currentThread().getName()+"lock:"
+lockB+" =>get "+ lockA);
}
}
}
}
解决问题
1.使用jdk bin目录下的“jps -l"工具,定位进程号(在IDEA的terminal里执行)
2. jstack + 进程号查看堆栈信息,找到死锁信息
面试或者工作中排查问题:
- 日志
- 堆栈信息