JUC并发:线程池详解,从原理到实际使用

线程池

池化技术:事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我

线程池的好处:线程复用,可以控制最大并发数,管理线程

1、降低资源的消耗;

2、提高响应速度;

3、方便管理;

三大方法、七大参数、四种拒绝策略

三大方法:

1、ExecutorService threadPool = Executors.newSingleThreadExecutor();// 创建单个线程
2、ExecutorService threadPool = Executors.newFixedThreadPool(5); // 创建一个固定的线程池的大小
3、ExecutorService threadPool = Executors.newCachedThreadPool(); // 可伸缩的,遇强则强,遇弱则弱

源码分析:

//三大方法
//请求队列LinkedBlockingQueue长度为默认为 Integer.MAX_VALUE
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
                         new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,
                         new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {//21亿
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,
                         new SynchronousQueue<Runnable>());
}
// 本质ThreadPoolExecutor()的方法调用
// 七大参数
public ThreadPoolExecutor(int corePoolSize,// 1. 核心线程池大小
                   int maximumPoolSize,// 2. 最大核心线程池大小
                   long keepAliveTime,// 3. 超时了没有人调用就会释放
                   TimeUnit unit,// 4. 超时单位
                   BlockingQueue<Runnable> workQueue,// 5. 阻塞队列,候客区
                   ThreadFactory threadFactory,// 6. 线程工厂:创建线程的,一般不用动
                   RejectedExecutionHandler handler// 7. 拒绝策略
                   ) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
        null :
    AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
//四种拒绝策略
new ThreadPoolExecutor.AbortPolicy() // 银行满了,还有人进来,不处理这个人的,抛出异常
new ThreadPoolExecutor.CallerRunsPolicy() // 哪来的去哪里!
new ThreadPoolExecutor.DiscardPolicy() //队列满了,丢掉任务,不会抛出异常!
new ThreadPoolExecutor.DiscardOldestPolicy() //队列满了,尝试去和最早的竞争,也不会抛出异常!

阿里巴巴开发规约:

线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors 返回的线程池对象的弊端如下:

1) FixedThreadPool 和 SingleThreadPool:

允许的请求队列LinkedBlockingQueue长度为默认为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

2) CachedThreadPool:

允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

手动创建线程池

package com.zgq.pool;

import java.util.concurrent.*;

public class Demo02 {
    public static void main(String[] args) {
        // 自定义线程池!工作 ThreadPoolExecutor
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy()); //队列满了,尝试去和最早的竞争,也不会抛出异常!
        try {
            // 最大承载:Deque + max
            // 超过最大承载,就会出发拒绝策略,引发异常 RejectedExecutionException
            for (int i = 1; i <= 9; i++) {
                // 使用了线程池之后,使用线程池来创建线程
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 线程池用完,程序结束,关闭线程池
            threadPool.shutdown();
        }

    }
}

池的最大的大小如何去设置!调优

1、CPU密集型:

尽量使用较小的线程池,一般为CPU核心数+1,因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,会造成CPU过度切换;

2、IO密集型:

可以使用稍大的线程池,一般为2*CPU核心数。IO密集型任务对CPU使用率并不高,因此可以让CPU在等待IO的时候有其他线程去处理别的任务,充分利用CPU时间。

3、混合型任务:

可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。只要分完之后两个任务的执行时间相差不大,那么就会比串行执行起来高效。因为如果划分之后两个任务执行时间存在有数据级的差距,那么拆分没有意义。因为先执行完的任务就要等后执行完的任务,最终时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。

// 最大线程到底该如何定义
// 1、CPU 密集型,几核,就是几,可以保持CPu的效率最高!一般为cpu核数+1
// 2、IO 密集型 > 判断你程序中十分耗IO的线程,
// 程序 15个大型任务 io十分占用资源!
// 获取CPU的核数
System.out.println(Runtime.getRuntime().availableProcessors());

实际项目中的使用

以谷粒商城的购物车系统为例,因为购物车需要用到CompletableFuture进行异步编排的优化,所以需要引入线程池来执行异步任务;

1、写配置文件的属性配置,因为线程池的参数尽量在配置文件中配置,这样后面易于修改与维护

package com.atguigu.gulimall.cart.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@ConfigurationProperties(prefix = "gulimall.thread")
@Component
@Data
public class ThreadPoolConfigProperties {

	private Integer coreSize;

	private Integer maxSize;

	private Integer keepAliveTime;
}

在配置文件中设置

gulimall:
  thread:
    core-size: 20
    max-size: 200
    keep-alive-time: 10

2、写线程池的配置类,记住一定要使用ThreadPoolExecutor来进行创建,原因上面详细说了

package com.atguigu.gulimall.cart.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// 开启这个属性配置
//@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {

	@Bean
	public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties threadPoolConfigProperties){

		return new ThreadPoolExecutor(threadPoolConfigProperties.getCoreSize(),
				threadPoolConfigProperties.getMaxSize(),
				threadPoolConfigProperties.getKeepAliveTime() ,
				TimeUnit.SECONDS,
				new LinkedBlockingDeque<>(10000),
				Executors.defaultThreadFactory(),
				new ThreadPoolExecutor.AbortPolicy());
	}
}

3、业务代码中使用,直接注入即可,然后就可以直接在CompletableFuture中使用了;至于添加到购物车的代码这里就不详细说了,以后应该会专门写谷粒商城的博客;

@Slf4j
@Service
public class CartServiceImpl implements CartService {

    @Autowired
    private ThreadPoolExecutor executor;
    @Override // CartServiceImpl
    public CartItem addToCart(Long skuId, Integer num) throws ExecutionException, InterruptedException {
        // 获取当前用户的map
        BoundHashOperations<String, Object, Object> cartOps = getCartOps();
        // 查看该用户购物车里是否有指定的skuId
        String res = (String) cartOps.get(skuId.toString());

        // 查看用户购物车里是否已经有了该sku项
        if (StringUtils.isEmpty(res)) {
            CartItem cartItem = new CartItem();
            // 异步编排
            CompletableFuture<Void> getSkuInfo = CompletableFuture.runAsync(() -> {
                // 1. 远程查询当前要添加的商品的信息
                R skuInfo = productFeignService.SkuInfo(skuId);
                SkuInfoVo sku = skuInfo.getData("skuInfo", new TypeReference<SkuInfoVo>() {
                });
                // 2. 填充购物项
                cartItem.setCount(num);
                cartItem.setCheck(true);
                cartItem.setImage(sku.getSkuDefaultImg());
                cartItem.setPrice(sku.getPrice());
                cartItem.setTitle(sku.getSkuTitle());
                cartItem.setSkuId(skuId);
            }, executor);

            // 3. 远程查询sku销售属性,销售属性是个list
            CompletableFuture<Void> getSkuSaleAttrValues = CompletableFuture.runAsync(() -> {
                List<String> values = productFeignService.getSkuSaleAttrValues(skuId);
                cartItem.setSkuAttr(values);
            }, executor);
            // 等待执行完成
            CompletableFuture.allOf(getSkuInfo, getSkuSaleAttrValues).get();

            // sku放到用户购物车redis中
            cartOps.put(skuId.toString(), JSON.toJSONString(cartItem));
            return cartItem;
        } else {//购物车里已经有该sku了,数量+1即可
            CartItem cartItem = JSON.parseObject(res, CartItem.class);
            // 不太可能并发,无需加锁
            cartItem.setCount(cartItem.getCount() + num);
            cartOps.put(skuId.toString(), JSON.toJSONString(cartItem));
            return cartItem;
        }
    }
}

希望大家看我的文章能够有所帮助,有些细微的提升。

上一篇:nginx 反向代理 ElasticSearch es


下一篇:C# 写日志帮助类