Java 多线程的一次整理

一天没有出过家门,实属无聊,没事瞎写写

1. 基本概念

1.1 多进程和多线程的概念

程序是由指令和数据组成,指令要运行,数据要加载,指令被 CPU 加载运行,数据被加载到内存,指令运行时可由 CPU 调度硬盘、网络等设备。一个线程就是一个指令,CPU 调度的最小单位,一个进程就是一系列的指令流,由 CPU 一条一条执行

  • 进程是程序在计算机上的一次执行活动。当你运行一个程序,你就启动了一个进程。一系列指令

  • 线程是进程中的实际运行单位,是独立运行与进程之中的子任务。一条单独的指令

1.2 并行与并发

并发和并行都是同时处理多路请求,目的是最大化 CPU 的利用率。并行是指两个或者多个事件在同一时刻发生,并发是指多个事件在同一事件间隔内发生

  • 并发是指单核 CPU 运行多线程时,时间片进行很快的切换,线程轮流执行 CPU

  • 并行是指多核 CPU 运行多线程时,真正的在同一时刻运行

1.3 计算机存储体系

在很早之前,CPU 的频率与内存的频率在一个层面上,上世纪 90 年代,CPU 的频率大大提升,但内存的频率没有得到提升,导致 CPU 的运行速度比内存读写速度快很多,使 CPU 花费很长的时间等待数据的到来或把数据写入到内存中。为了解决 CPU 运算速度与内存读写速度不匹配的矛盾,就出现了 CPU 缓存,CPU 缓存分为三个级别,分别是 L1、L2、L3,级别越小越接近 CPU,速度也越来越快,容量也越来越小

Java 多线程的一次整理

 

 

 

多核 CPU 的情况下有多个一级缓存,如何保证缓存内部数据一致性,不让系统数据混乱,解决方案就是缓存一致性协议(Modified Exclusive Shared Or Invalid,MESI)或者锁住总线,其中锁住总线,效率非常低下CPU 串行,所以实际使用 MESI。MESI 通过四种状态来进行标记

状态 描述 监听任务 状态转换
M 修改(Modified) 该Cache line有效,数据被修改了,和内存中的数据不一致,数据只存在于本Cache中。 缓存行必须时刻监听所有试图读该缓存行相对就主存的操作,这种操作必须在缓存将该缓存行写回主存并将状态变成S(共享)状态之前被延迟执行。 当被写回主存之后,该缓存行的状态会变成独享(exclusive)状态。
E 独享、互斥(Exclusive) 该Cache line有效,数据和内存中的数据一致,数据只存在于本Cache中。 缓存行也必须监听其它缓存读主存中该缓存行的操作,一旦有这种操作,该缓存行需要变成S(共享)状态。 当CPU修改该缓存行中内容时,该状态可以变成Modified状态
S 共享(Shared) 该Cache line有效,数据和内存中的数据一致,数据存在于很多Cache中。 缓存行也必须监听其它缓存使该缓存行无效或者独享该缓存行的请求,并将该缓存行变成无效(Invalid)。 当有一个CPU修改该缓存行时,其它CPU中该缓存行可以被作废(变成无效状态 Invalid)。
I 无效(Invalid) 该Cache line无效。

对于 M 和 E 状态而言总是精确的,他们在和该缓存行的真正状态是一致的,而 S 状态可能是非一致的

1.4 线程的状态

sleep、yield 和 join 区别:

  • sleep 执行后线程进入阻塞状态,当前线程休眠一段时间

  • yield 执行后线程进入就绪状态,使当前线程和所有等待的线程一起进行竞争 CPU 资源

  • join执行线程进入阻塞状态,t.join 表示阻塞调用此方法的线程,直到线程 t 完成,方可继续执行。底层实际调用 wait 方法

Java 多线程的一次整理

 

 

 

  1. 新建状态(New):线程对象被创建后,就进入了新建状态。例如:Thread thread = new Thread()

  2. 就绪状态(Runnable):也被称为"可执行状态"。线程对象呗创建后,其它线程调用了该对象的 start() 方法,从而就启动该线程。例如T.stat(),处于就绪状态的线程,随时可能被CPU调度执行

  3. 运行状态(Running):线程获取 CPU 权限进行执行。需要注意的是,线程只能从就绪状态进入到运行状态

  4. 阻塞状态(Blocked):阻塞状态是线程放弃CPU使用权,暂时停止运行,直到线程进入就绪状态,才有机会转到运行状态。阻塞的情况分三种:

    1. 等待阻塞:通过调用线程的wait() 方法,让线程等待某工作的完成

    2. 同步阻塞:线程在获取 synchronized 同步锁失败,它会进入同步阻塞状态

    3. 其它阻塞:通过调用线程的 sleep() 或 join() 或发出 I/O 请求时,线程会进入到阻塞状态。当 sleep() 状态超时,join() 等待线程终止或者超时、或者 I/O 处理完毕时,线程重新转入到就绪状态

  5. 死亡状态(Dead):线程执行完了或者因异常退出了 run() 方法,该线程结束生命周期

2.多线程的实现方式

2.1继承 Thread 类创建线程

Thead 类本质上是实现了 Runnable 接口的一个实例,代表一个线程的实例

 1 /**
 2  * @description: 多线程实现方法1:集成Thread类
 3  * @author: DZ
 4  **/
 5 @Slf4j
 6 public class MyThread1 extends Thread {
 7     @Override
 8     public void run() {
 9         log.info("MyThread1");
10         log.info("MyThread2");
11     }
12 ​
13     public static void main(String[] args) {
14         MyThread1 t1 = new MyThread1();
15         MyThread1 t2 = new MyThread1();
16         t1.start();
17         t2.start();
18     }
19 }

 

2.2实现 Runnable 接口创建线程

如果自己的类已经 extends 另一个类,就无法直接 extends Thread,此时可以通过实现 Runnable 接口

避免单继承的局限性、适合多个相同的线程去处理同一个资源

 1 /**
 2  * @description: 多线程实现方法2:实现Runnable接口
 3  **/
 4 @Slf4j
 5 public class MyThread2 implements Runnable {
 6 ​
 7     @Override
 8     public void run() {
 9         log.info("MyThread1");
10         log.info("MyThread2");
11     }
12 ​
13     public static void main(String[] args) {
14         MyThread2 m = new MyThread2();
15         //1.调用run方法
16         Thread t1 = new Thread(m);
17         Thread t2 = new Thread(m);
18         t1.start();
19         t2.start();
20     }
21 }

 

2.3实现 Callable 接口,通过 Future Task 包装器来创建 Thread 线程

可以获取线程的返回值

 1 /**
 2  * @description: 多线程实现方法2:实现Callable接口
 3  * @author: DZ
 4  **/
 5 @Slf4j
 6 public class MyThread3 implements Callable {
 7     @Override
 8     public String call() throws Exception {
 9         log.info("MyThread1");
10         log.info("MyThread2");
11         return "MyThread3";
12     }
13 ​
14     public static void main(String[] args) throws ExecutionException, InterruptedException {
15         MyThread3 m = new MyThread3();
16         //存储返回值,其中泛型为返回值的类型
17         FutureTask<String> futureTask = new FutureTask<>(m);
18         new Thread(futureTask).start();
19         System.out.println(futureTask.get());
20     }
21 ​
22 }

 

2.4通过线程池

2.4.1 线程池的主要参数

1 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
2   this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
3      Executors.defaultThreadFactory(), defaultHandler);
4 }
  • corePoolSize

当向线程池提交一个任务时,若线程池已创建的线程数小于corePoolSize,即便此时存在空闲线程,也会通过创建一个新线程来执行该任务,直到已创建的线程数大于或等于corePoolSize时,(除了利用提交新任务来创建和启动线程(按需构造),也可以通过 prestartCoreThread() 或 prestartAllCoreThreads() 方法来提前启动线程池中的基本线程。)

  • maximumPoolSize

线程池所允许的最大线程个数。当队列满了,且已创建的线程数小于maximumPoolSize,则线程池会创建新的线程来执行任务。另外,对于*队列,可忽略该参数

  • keepAliveTime

当线程池中线程数大于核心线程数时,线程的空闲时间如果超过线程存活时间,那么这个线程就会被销毁,直到线程池中的线程数小于等于核心线程数

  • workQueue

用于传输和保存等待执行任务的阻塞队列

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列

  • LinkedBlockingQueue 一个由链表结构组成的有界阻塞队列

  • PriorityBlockingQueue 一个支持优先级排序的*阻塞队列

  • DelayQueue 一个使用优先级队列实现的*阻塞队列

  • SynchronousQueue 一个不存储元素的阻塞队列

  • LinkedTransferQueue 一个由链表结构组成的*阻塞队列

  • LinkedBlockingDeque 一个由链表结构组成的双向阻塞队列

作用:阻塞队列可以保证任务队列中没有任务时阻塞获取任务的线程,使得线程进入wait状态,释放cpu资源。当队列中有任务时才唤醒对应线程从队列中取出消息进行执行。使得在线程不至于一直占用cpu资源。

  • threadFactory

用于创建新线程。threadFactory创建的线程也是采用new Thread()方式,threadFactory创建的线程名都具有统一的风格:pool-m-thread-n(m为线程池的编号,n为线程池内的线程编号)

  • handler

当线程池和队列都满了,再加入线程会执行此策略

AbortPolicy: 直接抛出异常,阻止线程正常运行

1 public static class AbortPolicy implements RejectedExecutionHandler {
2     public AbortPolicy() {}
3     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
4       throw new RejectedExecutionException("Task " + r.toString() +" rejected from " + e.toString());
5     }
6   }

 

CallerRunsPolicy: 直接在方法的调用线程中执行,除非线程池已关闭

1 public static class CallerRunsPolicy implements RejectedExecutionHandler {
2     public CallerRunsPolicy() {}
3     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
4       if (!e.isShutdown()) {
5         r.run();
6       }
7     }
8   }

 

DiscardPolica: 丢弃当前的线程任务而不做任何处理。如果系统允许在资源不足的情况下弃部分任务,则这将是保障系统安全、稳定的一种很好的方案

1 public static class DiscardPolicy implements RejectedExecutionHandler {
2     public DiscardPolicy() {}
3     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
4     }
5   }

 

DiscardOlderPolicy: 移除线程队列中最早(老)的一个线程任务,并尝试提交当前任务

1 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
2     public DiscardOldestPolicy() { }
3     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
4       if (!e.isShutdown()) {
5         e.getQueue().poll();// 最早(老)的任务出队列
6         e.execute(r);
7       }
8     }
9   }

 

2.4.2如何设置线程池

  • CPU密集型

尽量使用较小的线程池,一般为CPU核心数+1。 因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,会造成CPU过度切换。

  • IO密集型任务

可以使用稍大的线程池,一般为2*CPU核心数。 IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候有其他线程去处理别的任务,充分利用CPU时间

  • 混合型任务

线程数 = CPU核心数 * (1+平均等待时间 / 平均工作时间)

2.4.3 代码示例

 1 import lombok.extern.slf4j.Slf4j;
 2 import org.junit.Test;
 3 ​
 4 import java.util.concurrent.*;
 5 ​
 6 /**
 7  * @description: 通过线程池实现多线程
 8  * @author: DZ
 9  **/
10 @Slf4j
11 public class MyThread4 {
12     //通常使用方式,定义前5个参数即可,其余默认
13     private ThreadPoolExecutor threadPoolExecutor0 = new ThreadPoolExecutor(5, 10, 60,
14             TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10));
15 ​
16     //所有参数均自定义(增加工厂ThreadFactory和拒绝方式Handle)
17     private ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(5, 10, 60,
18             TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10), new ThreadFactory() {
19         @Override
20         public Thread newThread(Runnable r) {
21             Thread thread = new Thread(r);
22             log.info("我是线程{}", thread.getName());
23             return thread;
24         }
25     }, new RejectedExecutionHandler() {
26         @Override
27         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
28             log.info("线程被去掉{}", new Thread(r).getName());
29         }
30     });
31 ​
32     //所有参数均自定义,拒绝方式使用默认new ThreadPoolExecutor.AbortPolicy(),new ThreadPoolExecutor.DiscardOldestPolicy(),new ThreadPoolExecutor.CallerRunsPolicy(),new ThreadPoolExecutor.DiscardPolicy()
33     private ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(5, 10, 60,
34             TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10), new ThreadFactory() {
35         @Override
36         public Thread newThread(Runnable r) {
37             Thread thread = new Thread(r);
38             log.info("我是线程{}", thread.getName());
39             return thread;
40         }
41     }, new ThreadPoolExecutor.DiscardPolicy());
42 ​
43     @Test
44     public void testRunnable() {
45         threadPoolExecutor0.execute(new Runnable() {
46             @Override
47             public void run() {
48                 log.info("MyThread1");
49                 log.info("MyThread2");
50             }
51         });
52     }
53 ​
54     @Test
55     public void testCallable() throws ExecutionException, InterruptedException {
56         Future<String> submit = threadPoolExecutor1.submit(new Callable<String>() {
57             @Override
58             public String call() throws Exception {
59                 log.info("MyThread1");
60                 log.info("MyThread2");
61                 return "MyThread4";
62             }
63         });
64         System.out.println(submit.get());
65     }
66 }

 

3 常见的线程池

3.1 FixedThreadPool

适用于任务数量已知,且相对耗时的任务

1 public static ExecutorService newFixedThreadPool(int nThreads) { 
2     return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); 
3 }

3.2 SingleThreadExecutor

这种线程池非常适合所有任务都需要按被提交的顺序来执行的场景,是个单线程的串行。

1     public static ExecutorService newSingleThreadExecutor() {
2         return new FinalizableDelegatedExecutorService
3             (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
4     }
5 ​
6     public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
7         return new FinalizableDelegatedExecutorService
8             (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));
9     }

3.3 CachedThreadPool

核心线程池为0,存活时间为60s,适合小而快的任务

1   public static ExecutorService newCachedThreadPool() {
2         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
3     }
4 ​
5     public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
6         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);
7     }

3.4 ScheduledThreadPool

支持定时或者周期执行的任务

 1  public ScheduledThreadPoolExecutor(int corePoolSize) {
 2         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
 3     }
 4     public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
 5         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory);
 6     }
 7     public ScheduledThreadPoolExecutor(int corePoolSize,RejectedExecutionHandler handler) {
 8         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), handler);
 9     }
10     public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
11         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory, handler);
12     }

eg

 1 public static void main(String[] args) {
 2         ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
 3         // 1. 延迟一定时间执行一次
 4         service.schedule(() ->{
 5             System.out.println("schedule ==> 延迟一定时间执行一次");
 6         },2, TimeUnit.SECONDS);
 7         // 2. 按照固定频率周期执行
 8         service.scheduleAtFixedRate(() ->{
 9             System.out.println("scheduleAtFixedRate ==> 按照固定频率周期执行");
10         },2,3,TimeUnit.SECONDS);
11         //3. 按照固定频率周期执行
12         service.scheduleWithFixedDelay(() -> {
13             System.out.println("scheduleWithFixedDelay ==> 按照固定频率周期执行");
14         },2,5,TimeUnit.SECONDS);
15     }
  • 首先我们看第一个方法 schedule , 它有三个参数,第一个参数是线程任务,第二个delay 表示任务执行延迟时长,第三个unit 表示延迟时间的单位,如上面代码所示就是延迟两秒后执行任务

1 public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
  • 第二个方法是 scheduleAtFixedRate 如下, 它有四个参数,command 参数表示执行的线程任务 ,initialDelay 参数表示第一次执行的延迟时间,period 参数表示第一次执行之后按照多久一次的频率来执行,最后一个参数是时间单位。如上面案例代码所示,表示两秒后执行第一次,之后按每隔三秒执行一次

1 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
  • 第三个方法是 scheduleWithFixedDelay 如下,它与上面方法是非常类似的,也是周期性定时执行, 参数含义和上面方法一致。这个方法和 scheduleAtFixedRate 的区别主要在于时间的起点计时不同。scheduleAtFixedRate 是以任务开始的时间为时间起点来计时,时间到就执行第二次任务,与任务执行所花费的时间无关;而 scheduleWithFixedDelay 是以任务执行结束的时间点作为计时的开始。如下所示

1 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);

3.5 SingleThreadEcheduledExecutor

它实际和 ScheduledThreadPool。线程池非常相似,它只是 ScheduledThreadPool的一个特例,内部只有一个线程,它只是将 ScheduledThreadPool 的核心线程数设置为了 1。如源码所示:

1  public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
2         return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
3     }

3.6 ForkJoinPool

这是一个在 JDK7引入的新新线程池,它的主要特点是可以充分利用多核CPU,可以把一个任务拆分为多个子任务,这些子任务放在不同的处理器上并行执行,当这些子任务执行结束后再把这些结果合并起来,这是一种分治思想。

3.7 newWorkStealingPool

WorkStealingPool背后是使用ForkJoinPool实现的(JDK8)

上一篇:浅谈ThreadPoolExecutor线程池底层源码


下一篇:Scrapy学习系列(一):网页元素查询CSS Selector和XPath Selector