异步提交处理场景code

1.异步数据吗提交

package com.xll.pattern.design.asynsubmit;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 *  异步提交处理数据
 * </p>
 *
 * @author liangliang.xu
 * @since 2022/2/19 12:57 AM
 */
public class AsyncDataCommitChannel<T> {
    /**
     * 队列容量
     */
    private static final Integer defaultCapacity = 500;
    /**
     * 默认超时时间
     */
    private static final Integer defaultTimeout = 1000;

    /**
     * 配置容量
     */
    private Integer capacity;
    /**
     * 通道名称
     */
    private final String channelName;

    /**
     * 队列锁
     */
    private final Object queueLock = new Object();

    /**
     * 存储队列
     */
    private LinkedBlockingDeque<T> queue;

    private Semaphore queueStored;

    public AsyncDataCommitChannel(Integer capacity, String channelName){
        this.capacity = capacity;
        this.channelName = channelName;
    }

    @PostConstruct
    private void init(){
        if(this.capacity == null || this.capacity == 0){
            capacity = defaultCapacity;
        }
        queue = new LinkedBlockingDeque<>(capacity);
        queueStored = new Semaphore(0);
    }

    public boolean put(T obj) throws Exception{
        if (!queue.offer(obj)) {
            throw new Exception("Put queue 容量为 capacity " + queue.size() + " 满了 通道名称"+channelName+", 考虑更频繁地提交,增加容量或增加线程数");
        }
        queueStored.release();
        return false;
    }

    /**
     * 获取队列数据
     * @param size 获取数量
     * @return
     * @throws InterruptedException
     */
    public List<T> take(int size) throws InterruptedException{
        if(!queueStored.tryAcquire(size, defaultTimeout, TimeUnit.MILLISECONDS)){
            return null;
        }
        List<T> ret = new ArrayList<>();
        int j = size;
        synchronized (queueLock) {
            while(j>0){
                ret.add(queue.poll());
                j--;
            }
        }
        return ret;
    }

    /**
     * 获取记录剩余消费量
     * @return
     */
    public Integer getAvailableNo(){
        return this.queueStored.availablePermits();
    }

    public String getChannelName() {
        return channelName;
    }
}

2.异步数据处理

package com.xll.pattern.design.asynsubmit;

import com.xll.pattern.design.asynsubmit.model.Record;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * <p>
 *  默认提交通道消费
 * </p>
 *
 * @author liangliang.xu
 * @since 2022/2/19 12:58 AM
 */
@Slf4j
public class DefaultCommitChannelConsumer {
    /**
     * 默认批处理数
     */
    private static final Integer DEFAULT_BATCH_CAPACITY = 10;
    /**
     * 获取队列数据
     */
    private final AsyncDataCommitChannel<Record> channel;
    /**
     * 批量处理数
     */
    private final Integer batchCapacity;
    /**
     * 超时时间
     */
    private final Integer timeout;
    /**
     * 任务暂停标识
     */
    private final AtomicBoolean isStopFlag = new AtomicBoolean(false);

    private final ExecutorService executorService;

    public DefaultCommitChannelConsumer(AsyncDataCommitChannel<Record> channel, Integer batchCapacity, Integer timeout, Integer consumerThreadPoolSize) {
        this.channel = channel;
        this.batchCapacity = (batchCapacity == null || batchCapacity == 0) ? DEFAULT_BATCH_CAPACITY : batchCapacity;
        this.timeout = timeout;
        log.info("new DefaultCommitChannelConsumer, batchCapacity={},timeout={},consumerThreadPoolSize={}", this.batchCapacity, this.timeout, consumerThreadPoolSize);
        executorService =  Executors.newFixedThreadPool(consumerThreadPoolSize);
        executorService.execute(this::consumeSpaceTimeOut);
        executorService.execute(this::consumeBatch);
        executorService.execute(this::autoStopTask);
    }

    /**
     * 手动添加消费任务
     * @param size
     */
    public void addConsumerTask(int size){
        for (int i = 0; i < size; i++) {
            executorService.execute(this::consumeBatch);
        }
    }

    /**
     * 手动停止任务 - 对外暴露方法
     */
    public void notAutoStopTask(boolean isStopFlag){
        this.isStopFlag.set(isStopFlag);
    }

    /**
     * 自动停止任务 - 内置
     */
    private void autoStopTask(){
        while (true){
            if(channel.getAvailableNo() == 0){
                log.info("线程名称【{}】自动停止任务 流程名称:{}", Thread.currentThread().getName(),channel.getChannelName());
                this.isStopFlag.set(true);
                try {
                    //暂停
                    Thread.sleep(5000);
                    log.info("线程名称【{}】自动开启任务 流程名称:{}", Thread.currentThread().getName(),channel.getChannelName());
                    this.isStopFlag.set(false);
                    executorService.execute(this::consumeSpaceTimeOut);
                    executorService.execute(this::consumeBatch);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 批量获取数据 - 作用:防止队列数据过多,导致单条处理不过来,则使用批量进行处理获取队列数据
     * @param
     */
    private void consumeBatch() {
        while (true) {
            if(isStopFlag.get()){
                break;
            }
            try {
                List<Record> ret = channel.take(batchCapacity);
                if(ret == null){
                    log.info("线程名称[{}] 批量获取数据结果为空 流程名称:{}", Thread.currentThread().getName(),channel.getChannelName());
                    continue;
                }
                log.info("线程名称[{}] 当前提交记录剩消费量:[{}] 流程名称:{}" ,Thread.currentThread().getName(),channel.getAvailableNo(),channel.getChannelName());
                log.info("线程名称[{}] {} 开始批量插入提交记录消息",Thread.currentThread().getName(),channel.getChannelName());
                //异步处理逻辑
                Thread.sleep(1000);
                log.info("线程名称[{}] {} 结束批量插入提交记录消息:",Thread.currentThread().getName(),channel.getChannelName());

            } catch (Exception t) {
                log.error("DataCommit Consume message error", t);
                Thread.currentThread().interrupt();
            }
        }
        log.info("线程名称[{}] 批量获取数据 - consumeBatch stop 流程名称:{}", Thread.currentThread().getName(),channel.getChannelName());
    }

    /**
     * 间隔timeout时间获取一条数据
     */
    private void consumeSpaceTimeOut() {
        while (true) {
            if(isStopFlag.get()){
                break;
            }
            try {
                List<Record> rets = channel.take(1);
                if (rets == null || rets.size() == 0) {
                    continue;
                }
                log.info("线程名称【{}】 {} 开始间隔timeout时间获取插入提交记录消息:",Thread.currentThread().getName(), channel.getChannelName());
                Record record = rets.get(0);
                log.info("获取数据:{}", record);
                //异步处理逻辑
                Thread.sleep(1000);
                log.info("线程名称【{}】 {} 结束间隔timeout时间获取插入提交记录消息:",Thread.currentThread().getName(),channel.getChannelName());

            } catch (InterruptedException e) {
                log.error("DataCommit consumer message error", e);
                Thread.currentThread().interrupt();
            }
        }
        log.info("线程名称【{}】间隔timeout时间获取一条数据 - consumeSpaceTimeOut stop 流程名称:{}", Thread.currentThread().getName(),channel.getChannelName());
    }
}

3.Bean多处理器配置

package com.xll.pattern.design.asynsubmit;

import com.xll.pattern.design.asynsubmit.model.Record;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * <p>
 *  配置多处理器bean, 之后可以做抽象类处理
 * </p>
 *
 * @author liangliang.xu
 * @since 2022/2/19 12:58 AM
 */
@Configuration
public class BeanConfig {

    @Bean("async.asyncDataCommitChannelOrder")
    public AsyncDataCommitChannel<Record> asyncDataCommitChannelOrder(){
        return new AsyncDataCommitChannel<>(1000, "订单异步处理数据通道");
    }

    @Bean("async.asyncDataCommitChannelWms")
    public AsyncDataCommitChannel<Record> asyncDataCommitChannelWms(){
        return new AsyncDataCommitChannel<>(1000, "物流异步处理数据通道");
    }

    @Bean("async.consumer.orderCommitChannelConsumer")
    public DefaultCommitChannelConsumer orderCommitChannelConsumer(@Qualifier("async.asyncDataCommitChannelOrder")AsyncDataCommitChannel<Record> asyncDataCommitChannelOrder){
        return new DefaultCommitChannelConsumer(asyncDataCommitChannelOrder,2, 200, 10);
    }

    @Bean("async.consumer.wmsCommitChannelConsumer")
    public DefaultCommitChannelConsumer wmsCommitChannelConsumer(@Qualifier("async.asyncDataCommitChannelWms")AsyncDataCommitChannel<Record> asyncDataCommitChannelOrder){
        return new DefaultCommitChannelConsumer(asyncDataCommitChannelOrder,2, 200, 10);
    }

}

4.Main测试

package com.xll.pattern.design.asynsubmit;

import com.xll.pattern.design.asynsubmit.model.Record;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

/**
 * <p>
 *  test
 * </p>
 *
 * @author liangliang.xu
 * @since 2022/2/19 12:58 AM
 */
public class Main {
    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(BeanConfig.class);
        AsyncDataCommitChannel asyncDataCommitChannelOrder = (AsyncDataCommitChannel)context.getBean("async.asyncDataCommitChannelOrder");

        for (int i = 0; i < 50; i++) {
            Record record = new Record();
            record.setContext("订单"+i);
            record.setMessageId(String.valueOf(i));
            asyncDataCommitChannelOrder.put(record);
        }
        Thread.sleep(5000);
        for (int i = 50; i < 100; i++) {
            Record record = new Record();
            record.setContext("订单"+i);
            record.setMessageId(String.valueOf(i));
            asyncDataCommitChannelOrder.put(record);
        }
        System.out.println("获取记录剩余消费量:"+asyncDataCommitChannelOrder.getAvailableNo());
    }
}

  

 

上一篇:RabbitMQ-发布订阅


下一篇:golang中channel发送和接收元素的本质是什么