1.异步数据吗提交
package com.xll.pattern.design.asynsubmit; import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * <p> * 异步提交处理数据 * </p> * * @author liangliang.xu * @since 2022/2/19 12:57 AM */ public class AsyncDataCommitChannel<T> { /** * 队列容量 */ private static final Integer defaultCapacity = 500; /** * 默认超时时间 */ private static final Integer defaultTimeout = 1000; /** * 配置容量 */ private Integer capacity; /** * 通道名称 */ private final String channelName; /** * 队列锁 */ private final Object queueLock = new Object(); /** * 存储队列 */ private LinkedBlockingDeque<T> queue; private Semaphore queueStored; public AsyncDataCommitChannel(Integer capacity, String channelName){ this.capacity = capacity; this.channelName = channelName; } @PostConstruct private void init(){ if(this.capacity == null || this.capacity == 0){ capacity = defaultCapacity; } queue = new LinkedBlockingDeque<>(capacity); queueStored = new Semaphore(0); } public boolean put(T obj) throws Exception{ if (!queue.offer(obj)) { throw new Exception("Put queue 容量为 capacity " + queue.size() + " 满了 通道名称"+channelName+", 考虑更频繁地提交,增加容量或增加线程数"); } queueStored.release(); return false; } /** * 获取队列数据 * @param size 获取数量 * @return * @throws InterruptedException */ public List<T> take(int size) throws InterruptedException{ if(!queueStored.tryAcquire(size, defaultTimeout, TimeUnit.MILLISECONDS)){ return null; } List<T> ret = new ArrayList<>(); int j = size; synchronized (queueLock) { while(j>0){ ret.add(queue.poll()); j--; } } return ret; } /** * 获取记录剩余消费量 * @return */ public Integer getAvailableNo(){ return this.queueStored.availablePermits(); } public String getChannelName() { return channelName; } }
2.异步数据处理
package com.xll.pattern.design.asynsubmit; import com.xll.pattern.design.asynsubmit.model.Record; import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; /** * <p> * 默认提交通道消费 * </p> * * @author liangliang.xu * @since 2022/2/19 12:58 AM */ @Slf4j public class DefaultCommitChannelConsumer { /** * 默认批处理数 */ private static final Integer DEFAULT_BATCH_CAPACITY = 10; /** * 获取队列数据 */ private final AsyncDataCommitChannel<Record> channel; /** * 批量处理数 */ private final Integer batchCapacity; /** * 超时时间 */ private final Integer timeout; /** * 任务暂停标识 */ private final AtomicBoolean isStopFlag = new AtomicBoolean(false); private final ExecutorService executorService; public DefaultCommitChannelConsumer(AsyncDataCommitChannel<Record> channel, Integer batchCapacity, Integer timeout, Integer consumerThreadPoolSize) { this.channel = channel; this.batchCapacity = (batchCapacity == null || batchCapacity == 0) ? DEFAULT_BATCH_CAPACITY : batchCapacity; this.timeout = timeout; log.info("new DefaultCommitChannelConsumer, batchCapacity={},timeout={},consumerThreadPoolSize={}", this.batchCapacity, this.timeout, consumerThreadPoolSize); executorService = Executors.newFixedThreadPool(consumerThreadPoolSize); executorService.execute(this::consumeSpaceTimeOut); executorService.execute(this::consumeBatch); executorService.execute(this::autoStopTask); } /** * 手动添加消费任务 * @param size */ public void addConsumerTask(int size){ for (int i = 0; i < size; i++) { executorService.execute(this::consumeBatch); } } /** * 手动停止任务 - 对外暴露方法 */ public void notAutoStopTask(boolean isStopFlag){ this.isStopFlag.set(isStopFlag); } /** * 自动停止任务 - 内置 */ private void autoStopTask(){ while (true){ if(channel.getAvailableNo() == 0){ log.info("线程名称【{}】自动停止任务 流程名称:{}", Thread.currentThread().getName(),channel.getChannelName()); this.isStopFlag.set(true); try { //暂停 Thread.sleep(5000); log.info("线程名称【{}】自动开启任务 流程名称:{}", Thread.currentThread().getName(),channel.getChannelName()); this.isStopFlag.set(false); executorService.execute(this::consumeSpaceTimeOut); executorService.execute(this::consumeBatch); } catch (InterruptedException e) { e.printStackTrace(); } } } } /** * 批量获取数据 - 作用:防止队列数据过多,导致单条处理不过来,则使用批量进行处理获取队列数据 * @param */ private void consumeBatch() { while (true) { if(isStopFlag.get()){ break; } try { List<Record> ret = channel.take(batchCapacity); if(ret == null){ log.info("线程名称[{}] 批量获取数据结果为空 流程名称:{}", Thread.currentThread().getName(),channel.getChannelName()); continue; } log.info("线程名称[{}] 当前提交记录剩消费量:[{}] 流程名称:{}" ,Thread.currentThread().getName(),channel.getAvailableNo(),channel.getChannelName()); log.info("线程名称[{}] {} 开始批量插入提交记录消息",Thread.currentThread().getName(),channel.getChannelName()); //异步处理逻辑 Thread.sleep(1000); log.info("线程名称[{}] {} 结束批量插入提交记录消息:",Thread.currentThread().getName(),channel.getChannelName()); } catch (Exception t) { log.error("DataCommit Consume message error", t); Thread.currentThread().interrupt(); } } log.info("线程名称[{}] 批量获取数据 - consumeBatch stop 流程名称:{}", Thread.currentThread().getName(),channel.getChannelName()); } /** * 间隔timeout时间获取一条数据 */ private void consumeSpaceTimeOut() { while (true) { if(isStopFlag.get()){ break; } try { List<Record> rets = channel.take(1); if (rets == null || rets.size() == 0) { continue; } log.info("线程名称【{}】 {} 开始间隔timeout时间获取插入提交记录消息:",Thread.currentThread().getName(), channel.getChannelName()); Record record = rets.get(0); log.info("获取数据:{}", record); //异步处理逻辑 Thread.sleep(1000); log.info("线程名称【{}】 {} 结束间隔timeout时间获取插入提交记录消息:",Thread.currentThread().getName(),channel.getChannelName()); } catch (InterruptedException e) { log.error("DataCommit consumer message error", e); Thread.currentThread().interrupt(); } } log.info("线程名称【{}】间隔timeout时间获取一条数据 - consumeSpaceTimeOut stop 流程名称:{}", Thread.currentThread().getName(),channel.getChannelName()); } }
3.Bean多处理器配置
package com.xll.pattern.design.asynsubmit; import com.xll.pattern.design.asynsubmit.model.Record; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * <p> * 配置多处理器bean, 之后可以做抽象类处理 * </p> * * @author liangliang.xu * @since 2022/2/19 12:58 AM */ @Configuration public class BeanConfig { @Bean("async.asyncDataCommitChannelOrder") public AsyncDataCommitChannel<Record> asyncDataCommitChannelOrder(){ return new AsyncDataCommitChannel<>(1000, "订单异步处理数据通道"); } @Bean("async.asyncDataCommitChannelWms") public AsyncDataCommitChannel<Record> asyncDataCommitChannelWms(){ return new AsyncDataCommitChannel<>(1000, "物流异步处理数据通道"); } @Bean("async.consumer.orderCommitChannelConsumer") public DefaultCommitChannelConsumer orderCommitChannelConsumer(@Qualifier("async.asyncDataCommitChannelOrder")AsyncDataCommitChannel<Record> asyncDataCommitChannelOrder){ return new DefaultCommitChannelConsumer(asyncDataCommitChannelOrder,2, 200, 10); } @Bean("async.consumer.wmsCommitChannelConsumer") public DefaultCommitChannelConsumer wmsCommitChannelConsumer(@Qualifier("async.asyncDataCommitChannelWms")AsyncDataCommitChannel<Record> asyncDataCommitChannelOrder){ return new DefaultCommitChannelConsumer(asyncDataCommitChannelOrder,2, 200, 10); } }
4.Main测试
package com.xll.pattern.design.asynsubmit; import com.xll.pattern.design.asynsubmit.model.Record; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; /** * <p> * test * </p> * * @author liangliang.xu * @since 2022/2/19 12:58 AM */ public class Main { public static void main(String[] args) throws Exception { ApplicationContext context = new AnnotationConfigApplicationContext(BeanConfig.class); AsyncDataCommitChannel asyncDataCommitChannelOrder = (AsyncDataCommitChannel)context.getBean("async.asyncDataCommitChannelOrder"); for (int i = 0; i < 50; i++) { Record record = new Record(); record.setContext("订单"+i); record.setMessageId(String.valueOf(i)); asyncDataCommitChannelOrder.put(record); } Thread.sleep(5000); for (int i = 50; i < 100; i++) { Record record = new Record(); record.setContext("订单"+i); record.setMessageId(String.valueOf(i)); asyncDataCommitChannelOrder.put(record); } System.out.println("获取记录剩余消费量:"+asyncDataCommitChannelOrder.getAvailableNo()); } }