并发编程 | 线程池从入门到成神!

一、为什么要用多线程

使用多线程,可以把一些大任务分解成多个小任务来执行,多个小任务之间互不影响,同时进行,这样,充分利用了cpu资源。

二、java中简单的实现多线程方式 继承Thread类,实现run方法
class MyTread extends Thread{
  public void run() {
    System.out.println(Thread.currentThread().getName());
  } 
  
}
实现Runable接口,实现run方法
class MyRunnable implements Runnable{
  public void run() {
    System.out.println(Thread.currentThread().getName());
  }
}
测试
class ThreadTest {
 
  public static void main(String[] args) {
 
    MyTread thread = new Mythread();
    thread.start(); //开启一个线程
 
    MyRunnable myRunnable = new MyRunnable();
    Thread runnable = new Thread(myRunnable);
    runnable.start();   //开启一个线程
 
  }
}
推荐观看:传送门 三、java线程的状态

并发编程 | 线程池从入门到成神!

 

  • 「创建」:当new了一个线程,并没有调用start之前,线程处于创建状态;
  • 「就绪」:当调用了start之后,线程处于就绪状态,这时,线程调度程序还没有设置执行当前线程;
  • 「运行」:线程调度程序执行到线程时,当前线程从就绪状态转成运行状态,开始执行run方法里边的代码;
  • 「阻塞」:线程再运行的时候,被暂停执行(通常等待某项资源就绪后在执行,sleep、wait可以导致线程阻塞),这是该线程处于阻塞状态;
  • 「死亡」:当一个线程执行完run方法里边的代码或调用了stop方法后,该线程结束运行
线程b站合集推荐观看:传送门 四、为什么要引入线程池

当我们需要的并发执行线程数量很多时,且每个线程执行很短的时间就结束了,这样,我们频繁的创建、销毁线程就大大「降低了工作效率」(创建和销毁线程需要时间、资源)。

java中的线程池可以达到这样的效果:一个线程执行完任务之后,继续去执行下一个任务,不被销毁,这样线程「利用率提高」了。

五、java中线程池(ThreadPoolExecutor)

并发编程 | 线程池从入门到成神!

 

说起java中的线程池,就想到java.util.concurrent.ThreadPoolExecutor。ThreadPoolExecutor类是java线程池中的核心类。

ThreadPoolExecutor提供了四个构造函数:

并发编程 | 线程池从入门到成神!

 

//五个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)

//六个参数的构造函数-1
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)

//六个参数的构造函数-2
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)

//七个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

序号名称类型含义1corePoolSizeint核心线程池大小2maximumPoolSizeint最大线程池大小3keepAliveTimelong线程最大空闲时间4unitTimeUnit时间单位5workQueueBlockingQueue线程等待队列6threadFactoryThreadFactory线程创建工厂7handlerRejectedExecutionHandler拒绝策略

下面来解释下各个参数:

1、int corePoolSize:该线程池中核心线程数最大值

「核心线程」:线程池创建之后不会立即去创建线程,而是等待线程的到来。当前执行的线程数大于corePoolSize,线程会加入到缓冲队列中.

2、int maximumPoolSize:该线程池中线程总数最大值

线程总数 = 核心线程数 + 非核心线程数

3、long keepAliveTime:该线程池中非核心线程闲置超时时长

一个非核心线程,如果不干活(闲置状态)的时长超过这个参数所设定的时长,就会被销毁掉,如果设置allowCoreThreadTimeOut = true,则会作用于核心线程。

简单理解就是:空闲的线程多久时间后被销毁。默认情况下,该值在线程数大于corePoolSize时,对超出corePoolSize值的这些线程起作用。

4、TimeUnit unit:keepAliveTime的单位

TimeUnit是一个枚举类型,其包括:

NANOSECONDS :1微毫秒 = 1微秒 / 1000
MICROSECONDS :1微秒 = 1毫秒 / 1000
MILLISECONDS :1毫秒 = 1秒 /1000
SECONDS : 秒
MINUTES : 分
HOURS : 小时
DAYS : 天
5、BlockingQueue workQueue:该线程池中的任务队列

维护着等待执行的Runnable对象,当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。

阻塞队列,用来存储等待执行的任务,决定了线程池的排队策略,有以下取值:

  • 「SynchronousQueue」:同步队列,这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大,newCachedThreadPool采用的便是这种策略。
  • 「LinkedBlockingQueue」:***队列,这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize,newFixedThreadPool采用的便是这种策略。
  • 「ArrayBlockingQueue」:有界队列,可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误
  • 「DelayQueue」:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务
6、ThreadFactory threadFactory

线程工厂,是用来创建线程的,默认new Executors.DefaultThreadFactory(),这是一个接口,你new他的时候需要实现他的Thread newThread(Runnable r)方法。

7、RejectedExecutionHandler handler

线程拒绝策略。当创建的线程超出maximumPoolSize,且缓冲队列已满时,新任务会拒绝,有以下取值:

  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
  • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
  • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

以上内置拒绝策略均实现了 RejectedExecutionHandler 接口,若以上策略仍无法满足实际需要,完全可以自己扩展 RejectedExecutionHandler 接口。以下是具体的实现方式:

//默认策略。使用该策略时,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常
class AbortPolicy implements RejectedExecutionHandler{
 
  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    throw new RejectedExecutionException("Task " + r.toString() +
    " rejected from " +
    executor.toString());
  }
}
//如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
class DiscardPolicy implements RejectedExecutionHandler{
 
  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
 
  }
}
//丢弃最老的,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
class DiscardOldestPolicy implements RejectedExecutionHandler{
 
  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    if (!executor.isShutdown()) {
      //移除队头元素
      executor.getQueue().poll();
    //再尝试入队
      executor.execute(r);
    }
  }
}
//主线程会自己去执行该任务,不会等待线程池中的线程去执行
class CallerRunsPolicy implements RejectedExecutionHandler{
 
  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    if (!executor.isShutdown()) {
      //直接执行run方法
      r.run();
    }
  }
}

ThreadPoolExecutor 继承于AbstractExecutorService,AbstractExecutorService实现了ExecutorService接口,并实现了ExecutorService里边的方法

public abstract class AbstractExecutorService implements ExecutorService {
 
}

下面看下ExecutorService接口的具体实现:

public interface ExecutorService extends Executor {
  void shutdown();
  List<Runnable> shutdownNow();
  boolean isShutdown();
  boolean isTerminated();
  boolean awaitTermination(long timeout, TimeUnit unit)
  throws InterruptedException;
  <T> Future<T> submit(Callable<T> task);
  <T> Future<T> submit(Runnable task, T result);
  Future<?> submit(Runnable task);
  <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
  <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  long timeout, TimeUnit unit)
  throws InterruptedException;
  <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  throws InterruptedException, ExecutionException;
  <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  long timeout, TimeUnit unit)
  throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService继承Executor接口,下面是Executor接口的具体实现

public interface Executor {
  void execute(Runnable command);
}

Executor接口是顶层接口,只声明了一个execute方法,该方法是用来执行传递进来的任务的。回过头来,咱们重新看ThreadPoolExecutor类,该类里边有以下两个重要的方法:

public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  int c = ctl.get();
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
      return;
    c = ctl.get();
  }
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
      reject(command);
    else if (workerCountOf(recheck) == 0)
    addWorker(null, false);
  }else if (!addWorker(command, false))
    reject(command);
}

public <T> Future<T> submit(Callable<T> task) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<T> ftask = newTaskFor(task);
  execute(ftask);
  return ftask;
}

execute()方法是Executor中声明的方法,在ThreadPoolExecutor有了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行

submit()方法是ExecutorService中声明的方法,在AbstractExecutorService中进行了实现,Executor中并没有对其进行重写。从实现中可以看出,「submit方法最终也调用了execute 方法」,但submit方法可以返回执行结果,利用Future来获取任务执行结果。

六、常见四种线程池

如果你不想自己写一个线程池,Java通过Executors提供了四种线程池,这四种线程池都是直接或间接配置ThreadPoolExecutor的参数实现的。

1.可缓存线程池CachedThreadPool()

源码:

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>());
}

根据源码可以看出:

  • 这种线程池内部没有核心线程,线程的数量是有没限制的。
  • 在创建任务时,若有空闲的线程时则复用空闲的线程,若没有则新建线程。
  • 没有工作的线程(闲置状态)在超过了60S还不做事,就会销毁。

创建方法:

ExecutorService mCachedThreadPool = Executors.newCachedThreadPool();

用法:

//开始下载
private void startDownload(final ProgressBar progressBar, final int i) {
  mCachedThreadPool.execute(new Runnable() {
    @Override
    public void run() {
        int p = 0;
                progressBar.setMax(10);//每个下载任务10秒
                while (p < 10) {
                    p++;
                    progressBar.setProgress(p);
                    Bundle bundle = new Bundle();
                    Message message = new Message();
                    bundle.putInt("p", p);
                    //把当前线程的名字用handler让textview显示出来
                    bundle.putString("ThreadName", Thread.currentThread().getName());
                    message.what = i;
                    message.setData(bundle);
                    mHandler.sendMessage(message);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }
2.FixedThreadPool 定长线程池

源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

根据源码可以看出:

该线程池的最大线程数等于核心线程数,所以在默认情况下,该线程池的线程不会因为闲置状态超时而被销毁。

如果当前线程数小于核心线程数,并且也有闲置线程的时候提交了任务,这时也不会去复用之前的闲置线程,会创建新的线程去执行任务。如果当前执行任务数大于了核心线程数,大于的部分就会进入队列等待。等着有闲置的线程来执行这个任务。

创建方法:

//nThreads => 最大线程数即maximumPoolSize
ExecutorService mFixedThreadPool= Executors.newFixedThreadPool(int nThreads);

//threadFactory => 创建线程的方法,用得少
ExecutorService mFixedThreadPool= Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory);

用法:

private void startDownload(final ProgressBar progressBar, final int i) {
        mFixedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
               //....逻辑代码自己控制
            }
        });
    }
3.SingleThreadPool

源码:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

根据源码可以看出:

  • 有且仅有一个工作线程执行任务
  • 所有任务按照指定顺序执行,即遵循队列的入队出队规则

创建方法:

ExecutorService mSingleThreadPool = Executors.newSingleThreadPool();

用法同上。

4.ScheduledThreadPool

源码:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}

根据源码可以看出:DEFAULT_KEEPALIVE_MILLIS就是默认10L,这里就是10秒。这个线程池有点像是吧CachedThreadPool和FixedThreadPool 结合了一下。

  • 不仅设置了核心线程数,最大线程数也是Integer.MAX_VALUE。
  • 这个线程池是上述4个中为唯一个有延迟执行和周期执行任务的线程池。

创建:

//nThreads => 最大线程数即maximumPoolSize
ExecutorService mScheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);

一般的执行任务方法和上面的都大同小异,我们主要看看延时执行任务和周期执行任务的方法。

//表示在3秒之后开始执行我们的任务。
mScheduledThreadPool.schedule(new Runnable() {
            @Override
            public void run() {
            //....
            }
        }, 3, TimeUnit.SECONDS);
//延迟3秒后执行任务,从开始执行任务这个时候开始计时,每7秒执行一次不管执行任务需要多长的时间。 
mScheduledThreadPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
             //....
            }
        },3, 7, TimeUnit.SECONDS);
/**延迟3秒后执行任务,从任务完成时这个时候开始计时,7秒后再执行,
*再等完成后计时7秒再执行也就是说这里的循环执行任务的时间点是
*从上一个任务完成的时候。
*/
mScheduledThreadPool.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
             //....
            }
        },3, 7, TimeUnit.SECONDS);
}

无论是创建何种类型线程池(FixedThreadPool、CachedThreadPool...),均会调用ThreadPoolExecutor构造函数

七、线程池执行流程

当提交一个新任务,线程池的处理流程如下:

  • 判断线程池中核心线程数是否已达阈值corePoolSize,若否,则创建一个新核心线程执行任务
  • 若核心线程数已达阈值corePoolSize,判断阻塞队列workQueue是否已满,若未满,则将新任务添加进阻塞队列
  • 若满,再判断,线程池中线程数是否达到阈值maximumPoolSize,若否,则新建一个非核心线程执行任务。若达到阈值,则执行线程池拒绝策略。

流程图:

并发编程 | 线程池从入门到成神!

 

结构图:

并发编程 | 线程池从入门到成神!

 

结尾 为什么阿里不允许用Executors创建线程池? 1、从资源角度分析
  • FixedThreadPool SingleThreadPool允许请求队列长度为Integer.MAX_VALUE可能会堆积大量请求从而导致OOM
  • CachedThreadPool ScheduledThreadPool允许创建线程数量为Integer.MAX_VALUE可能会创建大量线程从而导致OOM
2、排查问题角度

如果使用Executors创建线程池,大家应该最常使用如下语句

public void testThread() throws Exception {
        ExecutorService fixedExecutor = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            fixedExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("公众号Java架构历程");
                }
            });
        }
    }

上述语句在功能层面是没有问题的,但是在生产环境中有可能遇到CPU飙高,线程数持续增加,内存溢出等问题,我们时常需要通过线程快照进行观察。我们通过jstack命令观察上述代码线程快照

"pool-1-thread-2" #525prio=5os_prio=0tid=0x00006f6561039100nid=0xdaa
    waiting oncondition [0x00006f64e646d000]java.lang.Thread.State:

    WAITING(parking)at sun.misc.Unsafe.park(Native Method)
    parking to
    wait for <0x00000006e6f3e230>(
    a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:165)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1126)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:616)
    at java.lang.Thread.run(Thread.java:645)

我们发现从线程快照看不出任何业务信息,只有类似pool-1-thread-2这种编号信息,不利于排查问题,我们需要给线程命名。

线程池常见面试题: 面试问题1:Java的线程池说一下,各个参数的作用,如何进行的?

答:略,看完本篇文章肯定知道了。

面试问题2:按线程池内部机制,当提交新任务时,有哪些异常要考虑。

答:

  1. 在任务代码try/catch捕获异常,
  2. 通过Future对象的get方法接收抛出的异常,再处理
  3. 为工作者线程设置UncaughtExceptionHandler,在uncaughtException方法中处理异常
ExecutorService threadPool = Executors.newFixedThreadPool(1, r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler(
                    (t1, e) -> {
                        System.out.println(t1.getName() + "线程抛出的异常"+e);
                    });
            return t;
           });
        threadPool.execute(()->{
            Object object = null;
            System.out.print("result## " + object.toString());
        });
  1. 重写ThreadPoolExecutor的afterExecute方法,处理传递的异常引用
这是jdk文档的一个demo:

class ExtendedExecutor extends ThreadPoolExecutor {
    // 这可是jdk文档里面给的例子。。
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Object result = ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // ignore/reset
            }
        }
        if (t != null)
            System.out.println(t);
    }
}}
面试问题3:线程池都有哪几种工作队列?

答:略

面试问题4:使用***队列的线程池会导致内存飙升吗?

答:会的,newFixedThreadPool使用了***的阻塞队列LinkedBlockingQueue,如果线程获取一个任务后,任务的执行时间比较长(比如,上面demo设置了10秒),会导致队列的任务越积越多,导致机器内存使用不断飙升, 最终导致OOM。

面试问题5:说说几种常见的线程池及使用场景? 1.newCachedThreadPool

「使用场景」:适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

「线程池特点」

  • 核心线程数为0
  • 最大线程数为Integer.MAX_VALUE
  • 阻塞队列是SynchronousQueue
  • 非核心线程空闲存活时间为60秒

当提交任务的速度大于处理任务的速度时,每次提交一个任务,就必然会创建一个线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。

工作机制:

并发编程 | 线程池从入门到成神!

 

  • 提交任务
  • 因为没有核心线程,所以任务直接加到SynchronousQueue队列。
  • 判断是否有空闲线程,如果有,就去取出任务执行。
  • 如果没有空闲线程,就新建一个线程执行。
  • 执行完任务的线程,还可以存活60秒,如果在这期间,接到任务,可以继续活下去;否则,被销毁。

实例代码:

ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName()+"正在执行");
            });
        }
2.newSingleThreadExecutor

「使用场景」:用于并发执行大量短期的小任务。

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

线程池特点:

  • 核心线程数为1
  • 最大线程数也为1
  • 阻塞队列是LinkedBlockingQueue
  • keepAliveTime为0

工作机制:

并发编程 | 线程池从入门到成神!

 

  • 提交任务
  • 线程池是否有一条线程在,如果没有,新建线程执行任务
  • 如果有,将任务加到阻塞队列
  • 当前的唯一线程,从队列取任务,执行完一个,再继续取,一个人(一条线程)夜以继日地干活。

实例代码:

  ExecutorService executor = Executors.newSingleThreadExecutor();
                for (int i = 0; i < 5; i++) {
                    executor.execute(() -> {
                        System.out.println(Thread.currentThread().getName()+"正在执行");
                    });
        }
3.newScheduledThreadPool

「使用场景」:周期性执行任务的场景,需要限制线程数量的场景

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

线程池特点:

  • 最大线程数为Integer.MAX_VALUE
  • 阻塞队列是DelayedWorkQueue
  • keepAliveTime为0
  • scheduleAtFixedRate() :按某种速率周期执行
  • scheduleWithFixedDelay():在某个延迟后执行

工作机制:

  • 添加一个任务
  • 线程池中的线程从 DelayQueue 中取任务
  • 线程从 DelayQueue 中获取 time 大于等于当前时间的task
  • 执行完后修改这个 task 的 time 为下次被执行的时间
  • 这个 task 放回DelayQueue队列中

实例代码:

 /**
    创建一个给定初始延迟的间隔性的任务,之后的下次执行时间是上一次任务从执行到结束所需要的时间+* 给定的间隔时间
    */
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        scheduledExecutorService.scheduleWithFixedDelay(()->{
            System.out.println("current Time" + System.currentTimeMillis());
            System.out.println(Thread.currentThread().getName()+"正在执行");
        }, 1, 3, TimeUnit.SECONDS);
    /**
    创建一个给定初始延迟的间隔性的任务,之后的每次任务执行时间为 初始延迟 + N * delay(间隔) 
    */
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
            scheduledExecutorService.scheduleAtFixedRate(()->{
            System.out.println("current Time" + System.currentTimeMillis());
            System.out.println(Thread.currentThread().getName()+"正在执行");
        }, 1, 3, TimeUnit.SECONDS);;
4.newFixedThreadPool

使用场景:适用于串行执行任务的场景,一个任务一个任务地执行。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

线程池特点:

  • 核心线程数和最大线程数大小一样
  • 没有所谓的非空闲时间,即keepAliveTime为0
  • 阻塞队列为***队列LinkedBlockingQueue
  • 提交任务
  • 如果线程数少于核心线程,创建核心线程执行任务
  • 如果线程数等于核心线程,把任务添加到LinkedBlockingQueue阻塞队列
  • 如果线程执行完任务,去阻塞队列取任务,继续执行。

实例代码:

  ExecutorService executor = Executors.newFixedThreadPool(10);
                    for (int i = 0; i < Integer.MAX_VALUE; i++) {
                        executor.execute(()->{
                            try {
                                Thread.sleep(10000);
                            } catch (InterruptedException e) {
                                //do nothing
                            }
            });

今天就分享到这里,感觉有帮助的关注支持一下哈关注我公众号里面有更多干货技术文章与架构学习资料

并发编程 | 线程池从入门到成神!

 

上一篇:Java线程池ThreadPoolExecutor使用和分析(一)


下一篇:Java并发编程-ThreadPool线程池