基于Redis实现多线程任务处理过程中遇到的一些问题

由于工作中涉及大量的后台数据处理任务,于是开发了一套基于redis和kafka的多线程任务处理组件,两者的应用场景差不多:

1、ETL工具(kettle)将需要处理的数据抽取到redis/kafka;

2、后台Job基于redis/kafka的数据拉取任务队列,然后多线程执行;

注:Job的计算频率大概是30分钟一轮,每一轮大约3万条目标数据,每一条目标数据有多重计算任务(5-10个),不同计算任务的耗时在10秒-5分钟不等,不同数据之间的计算任务没有依赖关系和先后约束。

基于上述需求,多线程任务处理组件,需要实现如下几个核心功能:

1、任务及任务的数据源可配置

2、任务执行线程数可以配置

实现的过程摘要如下,仅记录思路:

##任务配置-支持不同任务定义不同数据源、不同任务不同的并发数量
redis:
  enable: true
  max-thread: 4
  queue-consumers:
    - consumer-name: expectCalculateQueueConsumer
      queue-name: sdc-vessel-expect-cal
      enable: true
      batch-size: 24
      max-thread: 24
    - consumer-name: vesselEfficiencyQueueConsumer
      queue-name: sdc-vessel-efficiency-cal
      enable: false
      batch-size: 200
      max-thread: 4

启动类

@Component
@Order(11)
@Slf4j
public class RedisServiceExecutor implements ApplicationRunner, DisposableBean {

    @Autowired
    private SdcRedisProperties sdcRedisProperties;

    /**
     * Initializes and starts every enabled queue consumer once the application has started.
     * Consumers are resolved from the Spring context by the configured consumer-name.
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        if (Boolean.TRUE.equals(sdcRedisProperties.getEnable()) && !CollectionUtils.isEmpty(sdcRedisProperties.getQueueConsumers())) {
            for (SdcRedisQueueConsumerProperties redisConsumers : sdcRedisProperties.getQueueConsumers()) {
                if (Boolean.TRUE.equals(redisConsumers.getEnable())) {
                    startRedisQueueConsumer(redisConsumers, sdcRedisProperties);
                }
            }
        }
    }

    /**
     * Resolves the consumer bean by its configured name, initializes it and starts consuming.
     * A missing bean is logged instead of failing silently (a mis-typed consumer-name in the
     * YAML config would otherwise be invisible at runtime).
     */
    private void startRedisQueueConsumer(SdcRedisQueueConsumerProperties redisConsumers, SdcRedisProperties sdcRedisProperties) {
        QueueConsumer queueConsumer = ApplicationContextUtil.getBean(redisConsumers.getConsumerName(), QueueConsumer.class);
        if (queueConsumer != null) {
            queueConsumer.initConsumer(redisConsumers, sdcRedisProperties);
            queueConsumer.startConsume();
        } else {
            log.warn("No QueueConsumer bean named '{}' found in context, consumer not started", redisConsumers.getConsumerName());
        }
    }

    /**
     * Destroys every enabled queue consumer during context shutdown, mirroring {@link #run}.
     */
    @Override
    public void destroy() throws Exception {
        if (Boolean.TRUE.equals(sdcRedisProperties.getEnable()) && !CollectionUtils.isEmpty(sdcRedisProperties.getQueueConsumers())) {
            for (SdcRedisQueueConsumerProperties redisConsumers : sdcRedisProperties.getQueueConsumers()) {
                if (Boolean.TRUE.equals(redisConsumers.getEnable())) {
                    destroyConsumer(redisConsumers, sdcRedisProperties);
                }
            }
        }
    }

    // Shuts one consumer down; missing beans are logged for symmetry with startup.
    private void destroyConsumer(SdcRedisQueueConsumerProperties redisQueueConsumerProperties, SdcRedisProperties sdcRedisProperties) {
        QueueConsumer queueConsumer = ApplicationContextUtil.getBean(redisQueueConsumerProperties.getConsumerName(), QueueConsumer.class);
        if (queueConsumer != null) {
            queueConsumer.destroy(redisQueueConsumerProperties, sdcRedisProperties);
        } else {
            log.warn("No QueueConsumer bean named '{}' found in context, nothing to destroy", redisQueueConsumerProperties.getConsumerName());
        }
    }
}

QueueConsumer定义

public interface QueueConsumer<T> {
    /** Number of records to pull from the Redis list per pipelined pop. */
    Integer getBatchSize();
    /** Maximum number of worker threads processing records for this consumer. */
    Integer getMaxThread();
    /** Template used to access the backing Redis queue. NOTE(review): raw type — consider parameterizing. */
    RedisTemplate getRedisTemplate();
    /** Binds this consumer to its per-consumer and global Redis configuration; call before startConsume. */
    void initConsumer(SdcRedisQueueConsumerProperties serviceProperties, SdcRedisProperties sdcRedisProperties);
    /** Starts pulling records from the queue and dispatching them to workers. */
    void startConsume();
    /** Stops this consumer and releases its resources during shutdown. */
    void destroy(SdcRedisQueueConsumerProperties redisQueueConsumerProperties, SdcRedisProperties sdcRedisProperties);
}

 

QueueConsumer的抽象实现

public abstract class AbstractQueueConsumer<T> implements QueueConsumer<T> {

    private static final Log logger = LogFactory.getLog(AbstractQueueConsumer.class);

    protected SdcRedisProperties sdcRedisProperties;
    protected SdcRedisQueueConsumerProperties sdcRedisQueueConsumerProperties;
    // Capacity of the worker task queue; the puller pauses while it is full so records
    // are not dropped by the pool's DiscardPolicy under normal operation.
    public static final Integer QUEUE_CAPACITY = 128;
    protected ThreadPoolExecutor consumerTaskExecutor;
    // Cleared by destroy(); lets the puller thread exit instead of looping forever and
    // popping records into an already shut-down pool (which would silently lose them).
    private volatile boolean running = true;

    @Override
    public void initConsumer(SdcRedisQueueConsumerProperties serviceProperties, SdcRedisProperties sdcRedisProperties) {
        this.sdcRedisProperties = sdcRedisProperties;
        this.sdcRedisQueueConsumerProperties = serviceProperties;
        // Eagerly create the worker pool so startConsume can submit immediately.
        getThreadPool();
    }

    /**
     * Starts a dedicated puller thread that drains the Redis list in batches and hands
     * records to the worker pool. Backs off 30s when the queue is empty and 3s when the
     * worker queue is full; exits once destroy() has been called.
     */
    @Override
    public void startConsume() {
        Thread puller = new Thread(new Runnable() {
            @Override
            public void run() {
                ThreadPoolExecutor threadPoolTaskExecutor = getThreadPool();
                while (running) {
                    try {
                        if (!workerQueueFull(threadPoolTaskExecutor)) {
                            List<T> lstData = batchPopByPipeline(sdcRedisQueueConsumerProperties.getQueueName());
                            if (!CollectionUtils.isEmpty(lstData)) {
                                lstData = cleanBeforeConsume(lstData);
                                consumeRecords(lstData, threadPoolTaskExecutor);
                            } else {
                                logger.info("no record pulled from queue " + sdcRedisQueueConsumerProperties.getQueueName() + ", sleep 30 seconds");
                                threadSleep(30000);
                            }
                        } else {
                            threadSleep(3000);
                            logger.info(sdcRedisQueueConsumerProperties.getConsumerName() + " Wait data pulling on full executor pool for 3 seconds");
                        }
                    } catch (Exception e) {
                        logger.error(ExceptionUtils.getStackTrace(e));
                    } finally {
                        // Remove cancelled tasks so the queue-size check stays accurate.
                        getThreadPool().purge();
                    }
                }
            }
        });
        puller.setName(sdcRedisQueueConsumerProperties.getConsumerName() + "-puller");
        // Daemon thread: must not keep the JVM alive after the application shuts down.
        puller.setDaemon(true);
        puller.start();
    }

    /**
     * True when the worker queue has reached capacity. While full, the puller neither
     * pops from Redis nor submits tasks, so an abnormal shutdown cannot lose records
     * that were popped but never executed.
     */
    protected boolean workerQueueFull(ThreadPoolExecutor threadPoolTaskExecutor) {
        int queueSize = threadPoolTaskExecutor.getQueue().size();
        logger.info(sdcRedisQueueConsumerProperties.getConsumerName() + " Task Queue Size: " + queueSize + ", Worker Pool Size: " + threadPoolTaskExecutor.getPoolSize());
        return queueSize >= QUEUE_CAPACITY;
    }

    // Drops nulls and duplicates from a pulled batch while preserving order.
    private List<T> cleanBeforeConsume(List<T> lstData) {
        List<T> toConsume = null;
        if (!CollectionUtils.isEmpty(lstData)) {
            toConsume = new ArrayList<>();
            for (T datum : lstData) {
                if (datum != null && !toConsume.contains(datum)) {
                    toConsume.add(datum);
                }
            }
        }
        return toConsume;
    }

    /**
     * Pops up to batch-size elements from the head of the Redis list in a single
     * pipelined round trip. Returns null when there is nothing to fetch.
     */
    private List<T> batchPopByPipeline(String queueName) {
        Long size = getRedisTemplate().opsForList().size(queueName);
        Integer batch = getBatchSize();
        // Guard against null unboxing: size is null when the key is absent in some
        // template configurations, and batch-size may be missing from the config.
        long fetchSize = Math.min(size == null ? 0L : size, batch == null ? 0L : batch);
        if (fetchSize > 0L) {
            return getRedisTemplate().executePipelined(new RedisCallback<T>() {
                @Override
                public T doInRedis(RedisConnection connection) throws DataAccessException {
                    for (int i = 0; i < fetchSize; i++) {
                        connection.lPop(queueName.getBytes());
                    }
                    return null;
                }
            });
        } else {
            return null;
        }
    }

    /**
     * Dispatches a pulled batch. With maxThread <= 1 the records run inline on the
     * puller thread (optionally under the service's timeout); otherwise the batch is
     * split into at most maxThread contiguous slices and submitted to the worker pool.
     */
    protected void consumeRecords(List<T> records, ThreadPoolExecutor threadPoolTaskExecutor) {
        if (!CollectionUtils.isEmpty(records)) {
            logger.info("-------------- records pulled from redis queue: " + StringUtils.join(records));
            if (getMaxThread() == null || getMaxThread() <= 1) {
                try {
                    if (getQueueService().getTimeoutOfSingleTaskInSecond() > 0) {
                        TimedExecutor.timedInvoke(getQueueService(), "execute", new Class[]{List.class}, new Object[]{records},
                                getQueueService().getTimeoutOfSingleTaskInSecond(), TimeUnit.SECONDS);
                    } else {
                        getQueueService().execute(records);
                    }
                } catch (Exception e) {
                    logger.warn(String.format("Queue consumer unexpect finished for [%s]: ", StringUtil.join(records, ",")) + ExceptionUtils.getStackTrace(e));
                }
            } else {
                int taskNum = records.size();
                int workerNum = getMaxThread();
                // Ceiling division: slice size so that at most workerNum slices are needed.
                int batchSize = (taskNum + workerNum - 1) / workerNum;
                // Exact slice count — the old "taskNum / batchSize + 1" bound could submit
                // an empty subList as a final task (e.g. 9 records over 4 workers).
                int batchCount = (taskNum + batchSize - 1) / batchSize;
                for (int i = 0; i < batchCount; i++) {
                    int startIndex = i * batchSize;
                    int endIndex = Math.min(startIndex + batchSize, taskNum);
                    while (workerQueueFull(threadPoolTaskExecutor)) {
                        threadSleep(3000);
                        logger.warn(sdcRedisQueueConsumerProperties.getConsumerName() + " Wait submitting on full pool for 3 seconds");
                    }
                    QueueConsumeTask<T> consumeTask = new QueueConsumeTask<>(records.subList(startIndex, endIndex), getQueueService());
                    consumerTaskExecutor.submit(consumeTask);
                }
            }
        }
    }

    // Sleeps, restoring the interrupt flag so the puller loop can observe cancellation.
    private void threadSleep(int ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error(ExceptionUtils.getStackTrace(e));
        }
    }

    /**
     * Stops the puller loop, then shuts the worker pool down gracefully so already
     * queued tasks still finish.
     */
    @Override
    public void destroy(SdcRedisQueueConsumerProperties redisQueueConsumerProperties, SdcRedisProperties sdcRedisProperties) {
        running = false;
        getThreadPool().shutdown();
    }

    /** Business service that actually processes popped records. */
    protected abstract RedisQueueService getQueueService();

    // Lazily initializes the worker pool; synchronized so concurrent callers see one pool.
    public synchronized ThreadPoolExecutor getThreadPool() {
        if (consumerTaskExecutor == null) {
            initTaskExecutor();
        }
        return consumerTaskExecutor;
    }

    /**
     * Fixed-size pool (core == max == maxThread) over a bounded queue. The puller's
     * workerQueueFull check is what normally prevents DiscardPolicy from ever firing.
     */
    public void initTaskExecutor() {
        consumerTaskExecutor = new ThreadPoolExecutor(getMaxThread(), getMaxThread(), 1, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(QUEUE_CAPACITY), new ThreadPoolExecutor.DiscardPolicy());
        consumerTaskExecutor.prestartAllCoreThreads();
        consumerTaskExecutor.allowCoreThreadTimeOut(true);
    }

    /**
     * Per-consumer max-thread setting, falling back to the global one, then to 1.
     */
    public Integer getMaxThread() {
        Integer max = this.sdcRedisQueueConsumerProperties.getMaxThread() != null ? this.sdcRedisQueueConsumerProperties.getMaxThread() : this.sdcRedisProperties.getMaxThread();
        return max != null ? max : 1;
    }

    @Override
    public Integer getBatchSize() {
        return this.sdcRedisQueueConsumerProperties.getBatchSize();
    }


}

抽象实现中几个设计点:

1、调度线程:专门启动一个调度线程拉取redis的任务数据,再交由线程池处理;

2、线程池定义:corePool和maxPool设置一致,通过一个固定大小的queue存储任务,queue满了不拉取,也不submit任务,避免数据拉下来后异常停机丢失;

3、支持批量拉取并基于不同的并发分配

 

QueueConsumeTask实现的核心思想是,线程执行需要在可设置时长内返回,不能因为redisQueueService的执行时长阻塞workQueue的轮询。

public class QueueConsumeTask<T> implements Callable<Long> {

    private static final Log logger = LogFactory.getLog(QueueConsumeTask.class);

    // Batch of records this task will hand to the business service.
    private final List<T> records;
    // Business logic that actually processes the records.
    private final RedisQueueService redisQueueService;

    public QueueConsumeTask(List<T> records, RedisQueueService redisQueueService) {
        this.records = records;
        this.redisQueueService = redisQueueService;
    }

    /**
     * Runs the business service over the record batch. When the service declares a
     * positive per-task timeout, the call goes through TimedExecutor so one slow batch
     * cannot occupy a worker thread indefinitely. Failures are logged and swallowed,
     * and the task reports 0 processed records in that case.
     */
    @Override
    public Long call() {
        long timeoutSeconds = redisQueueService.getTimeoutOfSingleTaskInSecond();
        try {
            if (timeoutSeconds > 0) {
                return (Long) TimedExecutor.timedInvoke(redisQueueService, "execute",
                        new Class[]{List.class}, new Object[]{records},
                        timeoutSeconds, TimeUnit.SECONDS);
            }
            return redisQueueService.execute(records);
        } catch (Exception e) {
            logger.warn(String.format("Queue consumer unexpect finished for [%s]: ", StringUtil.join(records, ",")) + ExceptionUtils.getStackTrace(e));
            return 0L;
        }
    }
}

基于时长可控的原则,定义了TimedExecutor

public class TimedExecutor {

    private static final Log logger = LogFactory.getLog(TimedExecutor.class);

    /**
     * Reflectively invokes {@code target.methodName(params)} on a fresh single-thread
     * executor and waits at most {@code timeout} for it to finish.
     *
     * @param target         object to invoke the method on
     * @param methodName     name of the method to invoke
     * @param parameterTypes formal parameter types used to resolve the method
     * @param params         actual arguments
     * @param timeout        maximum wait before the invocation is cancelled
     * @param timeUnit       unit of {@code timeout}
     * @return the method's return value, or null if it returns null
     * @throws TimeoutException when the invocation does not finish in time; the worker
     *         is interrupted via Future#cancel(true) before rethrowing
     * @throws RuntimeException wrapping the underlying failure for any other error
     */
    public static Object timedInvoke(final Object target,
                                     final String methodName, final Class<?>[] parameterTypes, final Object[] params,
                                     long timeout, TimeUnit timeUnit) throws TimeoutException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Object> future = executorService.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                return resolveMethod(target.getClass(), methodName, parameterTypes).invoke(target, params);
            }
        });
        try {
            return future.get(timeout, timeUnit);
        } catch (TimeoutException e) {
            logger.warn("timedInvoke timeout, try to cancel future");
            future.cancel(true);
            throw e;
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the caller before wrapping.
            Thread.currentThread().interrupt();
            future.cancel(true);
            logger.warn("timedInvoke: " + ExceptionUtils.getStackTrace(e));
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            logger.warn("timedInvoke: " + ExceptionUtils.getStackTrace(e));
            // Unwrap so callers see the real failure instead of reflection plumbing.
            Throwable cause = e.getCause();
            throw new RuntimeException(cause != null ? cause : e);
        } finally {
            executorService.shutdownNow();
        }
    }

    /**
     * Resolves the method via getMethod first — unlike getDeclaredMethod this also finds
     * public methods inherited from supertypes, e.g. interface implementations behind
     * Spring proxies — and falls back to getDeclaredMethod for non-public methods
     * declared on the class itself.
     */
    private static Method resolveMethod(Class<?> type, String methodName, Class<?>[] parameterTypes) throws NoSuchMethodException {
        try {
            return type.getMethod(methodName, parameterTypes);
        } catch (NoSuchMethodException e) {
            Method declared = type.getDeclaredMethod(methodName, parameterTypes);
            declared.setAccessible(true);
            return declared;
        }
    }

}

 

RedisQueueService则是具体的业务逻辑

public interface RedisQueueService<T> {

    /**
     * Processes one batch of records pulled from the Redis queue.
     *
     * @param records batch to process; implementations should tolerate null/empty input
     * @return the number of records handled
     */
    Long execute(List<T> records);

    /**
     * Timeout in seconds applied to each {@link #execute(List)} call by the consumer.
     * The default of 0 means no timeout: the call runs untimed on the worker thread.
     */
    default long getTimeoutOfSingleTaskInSecond() {
        return 0L;
    }
}

实现实例:

@Service
@Slf4j
public class VesselExpectQueueService implements RedisQueueService<Integer> {

    @Autowired
    DtVesselEstimateArrivalCalcService dtVesselEstimateArrivalCalcService;

    /**
     * Runs the expected-arrival calculation for every MMSI in the batch.
     *
     * @param records MMSI numbers to process; null yields 0
     * @return the number of records handled
     */
    @Override
    public Long execute(List<Integer> records) {
        if (records == null) {
            return 0L;
        }
        for (Integer mmsi : records) {
            dtVesselEstimateArrivalCalcService.calcExpectArrAutoByJob(mmsi);
        }
        return (long) records.size();
    }

    /** Each batch must complete within 60 seconds or the consumer cancels it. */
    @Override
    public long getTimeoutOfSingleTaskInSecond() {
        return 60;
    }
}
上一篇:leetcode 617. 合并二叉树


下一篇:Python Pandas pandas.DataFrame.from_records函数方法的使用