线程池
1. 线程状态
-
在Java中的线程存在6中状态
线程状态 具体含义 新建(NEW) 一个尚未启动的线程的状态。也称之为初始状态、开始状态。线程刚被创建,但是并未启动。还没调用start方法。MyThread t = new MyThread()只有线程象,没有线程特征。 就绪
(RUNNABLE)当我们调用线程对象的start方法,那么此时线程对象进入了RUNNABLE状态。那么此时才是真正的在JVM进程中创建了一个线程,线程一经启动并不是立即得到执行,线程的运行与否要听令与CPU的调度,那么我们把这个中间状态称之为可执行状态(RUNNABLE)也就是说它具备执行的资格,但是并没有真正的执行起来而是在等待CPU的度。 阻塞(BLOCKED) 当一个线程试图获取一个对象锁,而该对象锁被其他的线程持有,则该线程进入Blocked状态;当该线程持有锁时,该线程将变成Runnable状态。 等待(WAITING) 一个正在等待的线程的状态。也称之为等待状态。造成线程等待的原因有两种,分别是调用Object.wait()、join()方法。处于等待状态的线程,正在等待其他线程去执行一个特定的操作。例如:因为wait()而等待的线程正在等待另一个线程去调用notify()或notifyAll();一个因为join()而等待的线程正在等待另一个线程结束。 计时等待
(TIMED_WAITING)一个在限定时间内等待的线程的状态。也称之为限时等待状态。造成线程限时等待状态的原因有三种,分别是:Thread.sleep(long),Object.wait(long)、join(long)。 终止
(TERMINATED)一个完全运行完成的线程的状态。也称之为终止状态、结束状态 这些状态被定义在java.lang.Thread.State枚举类中。
-
状态的转换:
2. 线程池
2.1 基本原理
-
概述:
线程池就是存储了很多个线程的一个”池子“
存在的意义:
- 我们频繁的创建线程和销毁线程是对系统资源的一种不合理的应用,每次的创建成本都是较高的。
- 针对这种情况,就采用线程池。线程池在启动的时,会创建大量空闲线程,当我们向线程池提交任务的时,线程池就会启动一个线程来执行该任务。等待任务执行完毕以后,线程并不会死亡,而是再次返回到线程池中称为空闲状态。等待下一次任务的执行。
2.2 线程池—Executors默认线程池
-
概述
JDK对线程池也进行了相关的实现,在真实企业开发中我们也很少去自定义线程池,而是使用JDK中自带的线程池。
-
使用Executors中提供的静态方法来创建线程池
方法名 说明 static ExecutorService newCachedThreadPool() 创建一个默认的线程池(默认最多可以容纳int类型的最大值) static newFixedThreadPool(int nThreads) 创建一个指定最多线程数量的线程池 - Executors — 可以帮助我们创建线程池对象
- ExecutorService — 可以帮助我们控制线程池
- ExecutorService 的方法:
方法名 说明 Future<?> submit(Runnable task) 提交Runnable任务以执行并返回表示该任务的Future。 void shutdown() 启动有序关闭,其中先前提交的任务将被执行,但不会接受任何新任务。 -
小栗子
public class ExecutorsDemo01 { public static void main(String[] args) { //创建池子 ExecutorService executorService = Executors.newCachedThreadPool(); //将任务给线程池 executorService.submit(()->{ System.out.println(Thread.currentThread().getName()+"线程运行了"); }); executorService.submit(()->{ System.out.println(Thread.currentThread().getName()+"线程运行了"); }); //关闭池 executorService.shutdown(); //输出: // pool-1-thread-2线程运行了 // pool-1-thread-1线程运行了 } }
2.3 线程池—ThreadPoolExecutor
-
构造方法
方法名 说明 ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 使用给定的初始参数创建新的ThreadPoolExecutor -
创建线程池对象
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
核心线程数量
,最大线程数量
,空闲线程最大存活时间
,任务队列
,创建线程工厂
,任务的拒绝策略
); -
参数详解
参数 说明 参数一:核心线程数量 不能小于0 参数二:最大线程数 不能小于等于0,
最大数量>=核心线程数量参数三:空闲线程最大存活时间 不能小于0 参数四:时间单位 使用TimeUnit枚举类 参数五:任务队列 不能为null,
可以是BlockingQueue接口下的实现类
让任务在队列中等着,等有线程空闲了,再偶从这个队列中获取任务并执行参数六:创建线程工厂 不能为null,
可以使用Executors类的static ThreadFactory defaultThreadFactory()
方法
按照默认的方式创建线程对象参数七:任务的拒绝策略 不能为null,
使用ThreadPoolExecutor类的AbortPolicy()方法,默认拒绝策略 -
任务拒绝策略
-
什么时候拒绝:
当线程池的线程都在执行,并且任务队列也都满了时候,就触发任务拒绝策略
当提交的任务 > 池子中的线程数量 + 任务队列的长度
-
如何拒绝
RejectedExecutionHandler是JDK提供的一个任务拒绝策略接口,它下面存在4个子类。
子类 说明 ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出RejectedExecutionException异常。
是默认的策略。ThreadPoolExecutor.DiscardPolicy 丢弃任务,但是不抛出异常
这是不推荐的做法。ThreadPoolExecutor.DiscardOldestPolicy 抛弃队列中等待最久的任务 然后把当前任务加入队列中。 ThreadPoolExecutor.CallerRunsPolicy 调用任务的run()方法绕过线程池直接执行。
注:明确线程池最多可执行的任务数 = 队列容量 + 最大线程数
注意:
如果进来的任务数,小于 (核心线程数+队列长度),那么不会创建临时线程;
当进来的任务数,大于 (核心线程数 + 队列长度),就会开始创建临时线程;
-
-
小栗子
public class ThreadDemo { public static void main(String[] args) { ThreadPoolExecutor pool = new ThreadPoolExecutor( 5,//核心线程数 10,//总线程数 60,//空闲时间60之后关闭线程 TimeUnit.SECONDS,//空闲时间60 “秒” 之后关闭线程 new ArrayBlockingQueue<>(10),//阻塞队列,长度10 Executors.defaultThreadFactory(),//默认创建线程方式 new ThreadPoolExecutor.AbortPolicy()//默认拒绝模式 ); for (int i = 0; i < 20; i++) { MyRunable mr = new MyRunable(); pool.submit(mr); } pool.shutdown(); } }
3. 原子性
3.1 volatile
- 堆内存是唯一的,每一个线程都有自己的线程栈。
- 每一个线程在使用堆里面变量的时候,都会先拷贝一份到变量的副本中。
- 在线程中,每一次使用是从变量的副本中获取的。
**volatile关键字:**强制线程每次在使用的时候,都会看一下共享区域最新的值
3.2 synchronized解决
synchronized的执行步骤:
- 线程获得锁
- 清空变量副本
- 拷贝共享变量最新的值到变量副本中
- 执行代码
- 将修改后变量副本中的值赋值给共享数据
- 释放锁
3.3 原子性
-
概述
所谓的原子性是指在一次操作或者多次操作中,要么所有的操作全部都得到了执行并且不会受到任何因素的干扰而中断,要么所有的操作都不执行,多个操作是一个不可以分割的整体。
-
AtomicInteger
概述
java从JDK1.5开始提供了java.util.concurrent.atomic包(简称Atomic包),这个包中的原子操作类提供了一种用法简单,性能高效,线程安全地更新一个变量的方式。
因为变量的类型有很多种,所以在Atomic包里一共提供了13个类,属于4种类型的原子更新方式,分别是原子更新基本类型、原子更新数组、原子更新引用和原子更新属性(字段)。
下面列举了更新基本类型的3个类
AtomicBoolean: 原子更新布尔类型
AtomicInteger: 原子更新整型
AtomicLong: 原子更新长整型
以上3个类提供的方法几乎一模一样,所以仅以AtomicInteger为例进行讲解,AtomicInteger的常用方法如下:
构造方法
方法名 说明 public AtomicInteger() 初始化一个默认值为0的原子型Integer public AtomicInteger(int initialValue) 初始化一个指定值的原子型Integer 成员方法
方法名 说明 int get(): 获取值 int getAndIncrement() 以原子方式将当前值加1,注意,这里返回的是自增前的值。 int incrementAndGet() 以原子方式将当前值加1,注意,这里返回的是自增后的值。 int addAndGet(int data) 以原子方式将输入的数值与实例中的值(AtomicInteger里的value)相加,并返回结果。 int getAndSet(int value) 以原子方式设置为newValue的值,并返回旧值。 -
小栗子
public class AtomDemo { public static void main(String[] args) { AtomicInteger ai = new AtomicInteger(100); // int get(): 获取值 System.out.println(ai.get());//100 // int getAndIncrement(): // 以原子方式将当前值加1,注意,这里返回的是自增前的值。 System.out.println(ai.getAndIncrement());//100 System.out.println(ai.get());//101 // int incrementAndGet(): // 以原子方式将当前值加1,注意,这里返回的是自增后的值。 System.out.println(ai.incrementAndGet());//102 System.out.println(ai.get());//102 // int addAndGet(int data): // 以原子方式将参数与对象中的值相加,并返回结果。 System.out.println(ai.addAndGet(10));//112 System.out.println(ai.get());//112 // int getAndSet(int value): // 以原子方式设置为newValue的值,并返回旧值。 System.out.println(ai.getAndAdd(10));//112 System.out.println(ai.get());//122 } }
-
AtomicInteger-内存解析
原理:自旋锁 + CAS算法
-
CAS算法
有3个操作数(内存值V, 旧的预期值A,要修改的值B)
当旧的预期值A == 内存值 此时修改成功,将V改为B
当旧的预期值A!=内存值 此时修改失败,不做任何操作
并重新获取现在的最新值(这个重新获取的动作就是自旋)
-
-
源码解析
代码实现:
public class AtomDemo { public static void main(String[] args) { MyAtomThread atom = new MyAtomThread(); for (int i = 0; i < 100; i++) { new Thread(atom).start(); } } }
public class MyAtomThread implements Runnable { //private volatile int count = 0; //送冰淇淋的数量 //private Object lock = new Object(); AtomicInteger ac = new AtomicInteger(0); @Override public void run() { for (int i = 0; i < 100; i++) { //1,从共享数据中读取数据到本线程栈中. //2,修改本线程栈中变量副本的值 //3,会把本线程栈中变量副本的值赋值给共享数据. //synchronized (lock) { // count++; // ac++; int count = ac.incrementAndGet(); System.out.println("已经送了" + count + "个冰淇淋"); // } } } }
源码:
//先自增,然后获取自增后的结果 public final int incrementAndGet() { //+ 1 自增后的结果 //this 就表示当前的atomicInteger(值) //1 自增一次 return U.getAndAddInt(this, VALUE, 1) + 1; } public final int getAndAddInt(Object o, long offset, int delta) { //v 旧值 int v; //自旋的过程 do { //不断的获取旧值 v = getIntVolatile(o, offset); //如果这个方法的返回值为false,那么继续自旋 //如果这个方法的返回值为true,那么自旋结束 //o 表示的就是内存值 //v 旧值 //v + delta 修改后的值 } while (!weakCompareAndSetInt(o, offset, v, v + delta)); //作用:比较内存中的值,旧值是否相等,如果相等就把修改后的值写到内存中,返回true。表示修改成功。 // 如果不相等,无法把修改后的值写到内存中,返回false。表示修改失败。 //如果修改失败,那么继续自旋。 return v; }
3.4 悲观锁和乐观锁
synchronized和CAS的区别 :
**相同点:**在多线程情况下,都可以保证共享数据的安全性。
不同点:
synchronized总是从最坏的角度出发,认为每次获取数据的时候,别人都有可能修改。所以在每次操作共享数据之前,都会上锁。(悲观锁)
cas是从乐观的角度出发,假设每次获取数据别人都不会修改,所以不会上锁。只不过在修改共享数据的时候,会检查一下,别人有没有修改过这个数据。
如果别人修改过,那么我再次获取现在最新的值。
如果别人没有修改过,那么我现在直接修改共享数据的值.(乐观锁)
4. 并发工具类
4.1 并发工具类-Hashtable
-
HashMap是线程不安全的,多线程环境下会有数据安全问题
-
Hashtable是线程安全的,但是会将整张表锁起来,效率低下
-
采取悲观锁的形式,在有线程操作的时候,会把整张表锁起来
4.2 并发工具类-ConcurrentHashMap基本使用
-
ConcurrentHashMap也是线程安全的,效率较高
在JDK7和JDK8中,底层原理不一样
-
1.7
-
默认创建一个长度16,加载因子为0.75的大数组,这个数组无法扩容
-
还会创建一个长度为2的小数组,把地址值赋值给0索引处,其它索引位置的元素为null
-
第一次会根据键的哈希值算出大数组中应存入的位置
如果为null,这按照模板创建小数组,创建完毕会二次哈希,计算出在小数组中应存入的位置,直接存入
如果不为null,就会根据记录的地址值找到小数组,二次哈希,计算出在小数组中应存入的位置。如果需要扩容,则将小数组扩容两倍,如果不需要扩容,
-
-
1.8
- 如果使用空参构造创建ConcurrentHashMap对象,则什么事情都不做。 在第一次添加元素的时候创建哈希表
2. 计算当前元素应存入的索引。
3. 如果该索引位置为null,则利用cas算法,将本结点添加到数组中。
4. 如果该索引位置不为null,则利用volatile关键字获得当前位置最新的结点地址,挂在他下面,变成链表。
5. 当链表的长度大于等于8时,自动转换成红黑树6,以链表或者红黑树头结点为锁对象,配合悲观锁保证多线程操作集合时数据的安全性
4.3 并发工具类-CountDownLatch
-
CountDownLatch类的构造
方法 解释 public CountDownLatch(int count) 参数传递线程数,表示等待线程数量,(定义一个计数器) public void await() 让线程等待(计数器为0时,唤醒) public void countDown() 当前线程执行完毕(将计数器-1)
使用场景: 让某一条线程等待其他线程执行完毕之后再执行
-
小栗子
public class MotherThread implements Runnable{ private CountDownLatch countDownLatch; public MotherThread(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("妈妈在收拾碗筷"); } } public class ChileThread1 implements Runnable{ private CountDownLatch countDownLatch; public ChileThread1(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { for (int i = 1; i <= 10; i++) { System.out.println(Thread.currentThread().getName()+"吃了"+i+"个饺子"); } countDownLatch.countDown(); } } public class ChileThread2 implements Runnable{ private CountDownLatch countDownLatch; public ChileThread2(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { for (int i = 1; i <= 20; i++) { System.out.println(Thread.currentThread().getName()+"吃了"+i+"个饺子"); } countDownLatch.countDown(); } } public class ChileThread3 implements Runnable{ private CountDownLatch countDownLatch; public ChileThread3(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { for (int i = 1; i <= 15; i++) { System.out.println(Thread.currentThread().getName()+"吃了"+i+"个饺子"); } countDownLatch.countDown(); } } public class CountDownLatchDemo { public static void main(String[] args) { CountDownLatch countDownLatch = new CountDownLatch(3); new Thread(new MotherThread(countDownLatch)).start(); new Thread(new ChileThread1(countDownLatch),"小明").start(); new Thread(new ChileThread2(countDownLatch),"小红").start(); new Thread(new ChileThread3(countDownLatch),"小刚").start(); } }
-
总结:
- CountDownLatch(int count):参数写等待线程的数量。并定义了一个计数器。
- await():让线程等待,当计数器为0时,会唤醒等待的线程
- countDown(): 线程执行完毕时调用,会将计数器-1。
4.4 并发工具类-Semaphore
-
使用场景
可以控制访问特定资源的线程数量。
-
实现步骤
- 创建Semaphore对象,传递int参数,一次允许几条线程执行
- acquire()方法,该线程获得许可,可以执行
- release()方法,归还许可,让其他线程可以执行
-
小栗子
public class SemaphoreDemo { public static void main(String[] args) { MyRunnable mr = new MyRunnable(); for (int i = 0; i < 100; i++) { new Thread(mr).start(); } } } class MyRunnable implements Runnable{ private Semaphore semaphore = new Semaphore(2); @Override public void run() { try { semaphore.acquire(); System.out.println("获得..许可"); Thread.sleep(200); System.out.println("归还许可"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }