由于工作中涉及大量的后台数据处理任务,于是开发了一套基于redis和kafka的多线程任务处理组件,两者的应用场景差不多:
1、ETL工具(kettle)将需要处理的数据抽取到redis/kafka;
2、后台Job基于redis/kafka的数据拉取任务队列,然后多线程执行;
注:Job的计算频率大概是30分钟一轮,每一轮大约3万条目标数据,每一条目标数据有多重计算任务(5-10个),不同计算任务的耗时在10秒-5分钟不等,不同数据之间的计算任务没有依赖关系和先后约束。
基于上述需求,多线程任务处理组件,需要实现如下几个核心功能:
1、任务及任务的数据源可配置
2、任务执行线程数可以配置
实现的过程摘要如下,仅记录思路:
##任务配置-支持不同任务定义不同数据源、不同任务不同的并发数量
启动类
@Component
@Order(11)
@Slf4j
public class RedisServiceExecutor implements ApplicationRunner, DisposableBean {

    @Autowired
    private SdcRedisProperties sdcRedisProperties;

    /**
     * Starts every enabled redis queue consumer once the application is ready.
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        forEachEnabledConsumer(consumer -> startRedisQueueConsumer(consumer, sdcRedisProperties));
    }

    /**
     * Stops every enabled redis queue consumer on context shutdown.
     */
    @Override
    public void destroy() throws Exception {
        forEachEnabledConsumer(consumer -> destroyConsumer(consumer, sdcRedisProperties));
    }

    /**
     * Shared guard + iteration used by both startup and shutdown: no-op unless the
     * component is globally enabled and at least one consumer is configured, then
     * applies the action to each individually-enabled consumer.
     */
    private void forEachEnabledConsumer(java.util.function.Consumer<SdcRedisQueueConsumerProperties> action) {
        if (!Boolean.TRUE.equals(sdcRedisProperties.getEnable())
                || CollectionUtils.isEmpty(sdcRedisProperties.getQueueConsumers())) {
            return;
        }
        for (SdcRedisQueueConsumerProperties redisConsumers : sdcRedisProperties.getQueueConsumers()) {
            if (Boolean.TRUE.equals(redisConsumers.getEnable())) {
                action.accept(redisConsumers);
            }
        }
    }

    /**
     * Looks up the consumer bean by its configured name and starts it.
     * Silently skips when no bean with that name exists.
     */
    private void startRedisQueueConsumer(SdcRedisQueueConsumerProperties redisConsumers, SdcRedisProperties sdcRedisProperties) {
        QueueConsumer queueConsumer = ApplicationContextUtil.getBean(redisConsumers.getConsumerName(), QueueConsumer.class);
        if (queueConsumer != null) {
            queueConsumer.initConsumer(redisConsumers, sdcRedisProperties);
            queueConsumer.startConsume();
        }
    }

    /**
     * Looks up the consumer bean by its configured name and shuts it down.
     */
    private void destroyConsumer(SdcRedisQueueConsumerProperties redisQueueConsumerProperties, SdcRedisProperties sdcRedisProperties) {
        QueueConsumer queueConsumer = ApplicationContextUtil.getBean(redisQueueConsumerProperties.getConsumerName(), QueueConsumer.class);
        if (queueConsumer != null) {
            queueConsumer.destroy(redisQueueConsumerProperties, sdcRedisProperties);
        }
    }
}
QueueConsumer定义
public interface QueueConsumer<T> {
// Contract for a redis-list-backed queue consumer: pulls batches of records
// of type T and processes them on a configurable worker pool.
/** Maximum number of records to pull from the queue per fetch. */
Integer getBatchSize();
/** Maximum worker threads for this consumer; null or <=1 means single-threaded. */
Integer getMaxThread();
/** Template used to reach the redis list. NOTE(review): raw type — consider parameterizing. */
RedisTemplate getRedisTemplate();
/** Wires in per-consumer and global configuration; must be called before startConsume(). */
void initConsumer(SdcRedisQueueConsumerProperties serviceProperties, SdcRedisProperties sdcRedisProperties);
/** Begins pulling and processing records. */
void startConsume();
/** Stops processing and releases resources. */
void destroy(SdcRedisQueueConsumerProperties redisQueueConsumerProperties, SdcRedisProperties sdcRedisProperties);
}
QueueConsumer的抽象实现
public abstract class AbstractQueueConsumer<T> implements QueueConsumer<T> {

    private static final Log logger = LogFactory.getLog(AbstractQueueConsumer.class);

    protected SdcRedisProperties sdcRedisProperties;
    protected SdcRedisQueueConsumerProperties sdcRedisQueueConsumerProperties;

    /** Fixed capacity of the worker queue; the puller stops fetching while it is full. */
    public static final Integer QUEUE_CAPACITY = 128;

    protected ThreadPoolExecutor consumerTaskExecutor;

    /** Loop guard for the puller thread; cleared by destroy() so the thread can exit. */
    private volatile boolean running = true;

    @Override
    public void initConsumer(SdcRedisQueueConsumerProperties serviceProperties, SdcRedisProperties sdcRedisProperties) {
        this.sdcRedisProperties = sdcRedisProperties;
        this.sdcRedisQueueConsumerProperties = serviceProperties;
        // Eagerly build the pool so startConsume() never races lazy initialization.
        getThreadPool();
    }

    /**
     * Starts a dedicated puller thread that drains the redis list and hands batches
     * to the worker pool. Pulling pauses while the worker queue is full so records
     * are never popped without a slot to run them (avoids losing data pulled off
     * redis right before an abnormal shutdown).
     */
    @Override
    public void startConsume() {
        Thread puller = new Thread(() -> {
            ThreadPoolExecutor threadPoolTaskExecutor = getThreadPool();
            while (running) {
                try {
                    if (!workerQueueFull(threadPoolTaskExecutor)) {
                        List<T> lstData = batchPopByPipeline(sdcRedisQueueConsumerProperties.getQueueName());
                        if (!CollectionUtils.isEmpty(lstData)) {
                            lstData = cleanBeforeConsume(lstData);
                            consumeRecords(lstData, threadPoolTaskExecutor);
                        } else {
                            logger.info("no record pulled from queue " + sdcRedisQueueConsumerProperties.getQueueName() + ", sleep 30 seconds");
                            threadSleep(30000);
                        }
                    } else {
                        threadSleep(3000);
                        logger.info(sdcRedisQueueConsumerProperties.getConsumerName() + " Wait data pulling on full executor pool for 3 seconds");
                    }
                } catch (Exception e) {
                    // Never let the puller die on a transient redis/business error.
                    logger.error(ExceptionUtils.getStackTrace(e));
                } finally {
                    getThreadPool().purge();
                }
            }
        }, sdcRedisQueueConsumerProperties.getConsumerName() + "-puller");
        puller.start();
    }

    /**
     * True when the worker queue has reached capacity; the caller should back off
     * instead of pulling/submitting more work.
     */
    protected boolean workerQueueFull(ThreadPoolExecutor threadPoolTaskExecutor) {
        int queueSize = threadPoolTaskExecutor.getQueue().size();
        logger.info(sdcRedisQueueConsumerProperties.getConsumerName() + " Task Queue Size: " + queueSize + ", Worker Pool Size: " + threadPoolTaskExecutor.getPoolSize());
        return queueSize >= QUEUE_CAPACITY;
    }

    /**
     * Drops nulls and duplicates while keeping first-seen order.
     * Returns null for an empty/null input (callers already guard on emptiness).
     */
    private List<T> cleanBeforeConsume(List<T> lstData) {
        if (CollectionUtils.isEmpty(lstData)) {
            return null;
        }
        // LinkedHashSet dedups in O(n) while preserving arrival order
        // (the original contains()-per-element scan was O(n^2)).
        java.util.Set<T> deduped = new java.util.LinkedHashSet<>(lstData);
        deduped.remove(null);
        return new ArrayList<>(deduped);
    }

    /**
     * Pops up to getBatchSize() records from the redis list in one pipelined
     * round-trip. Returns null when the list is empty or batch size is unset.
     */
    private List<T> batchPopByPipeline(String queueName) {
        Long size = getRedisTemplate().opsForList().size(queueName);
        Integer batch = getBatchSize();
        // Guard both unboxings: size() and batch size may be null.
        final long fetchSize = Math.min(size == null ? 0L : size, batch == null ? 0L : batch.longValue());
        if (fetchSize > 0L) {
            return getRedisTemplate().executePipelined(new RedisCallback<T>() {
                @Override
                public T doInRedis(RedisConnection connection) throws DataAccessException {
                    for (int i = 0; i < fetchSize; i++) {
                        connection.lPop(queueName.getBytes());
                    }
                    return null;
                }
            });
        } else {
            return null;
        }
    }

    /**
     * Dispatches a batch of records. Single-threaded consumers (maxThread null or
     * <=1) run inline on the puller thread; otherwise the batch is split into at
     * most maxThread even chunks, each submitted as one worker task. Submission
     * blocks while the worker queue is full.
     */
    protected void consumeRecords(List<T> records, ThreadPoolExecutor threadPoolTaskExecutor) {
        if (CollectionUtils.isEmpty(records)) {
            return;
        }
        logger.info("-------------- records pulled from redis queue: " + StringUtils.join(records));
        Integer maxThread = getMaxThread();
        if (maxThread == null || maxThread <= 1) {
            // Single-threaded mode: execute inline, still honoring the per-task timeout.
            try {
                if (getQueueService().getTimeoutOfSingleTaskInSecond() > 0) {
                    TimedExecutor.timedInvoke(getQueueService(), "execute", new Class[]{List.class}, new Object[]{records},
                            getQueueService().getTimeoutOfSingleTaskInSecond(), TimeUnit.SECONDS);
                } else {
                    getQueueService().execute(records);
                }
            } catch (Exception e) {
                logger.warn(String.format("Queue consumer unexpect finished for [%s]: ", StringUtil.join(records, ",")) + ExceptionUtils.getStackTrace(e));
            }
            return;
        }
        int taskNum = records.size();
        int workerNum = maxThread;
        // ceil(taskNum / workerNum): spread records evenly across at most workerNum tasks.
        // Iterating by chunk start also fixes the original bound (taskNum / batchSize + 1),
        // which could submit a trailing empty subList (e.g. 6 records / 4 workers).
        int batchSize = (taskNum + workerNum - 1) / workerNum;
        for (int start = 0; start < taskNum; start += batchSize) {
            int endIndex = Math.min(start + batchSize, taskNum);
            while (workerQueueFull(threadPoolTaskExecutor)) {
                threadSleep(3000);
                logger.warn(sdcRedisQueueConsumerProperties.getConsumerName() + " Wait task submitting on full executor pool for 3 seconds");
            }
            QueueConsumeTask consumeTask = new QueueConsumeTask<T>(records.subList(start, endIndex), getQueueService());
            consumerTaskExecutor.submit(consumeTask);
        }
    }

    private void threadSleep(int ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller's loop can observe it.
            Thread.currentThread().interrupt();
            logger.error(ExceptionUtils.getStackTrace(e));
        }
    }

    @Override
    public void destroy(SdcRedisQueueConsumerProperties redisQueueConsumerProperties, SdcRedisProperties sdcRedisProperties) {
        // Stop the puller loop first, then let queued tasks drain.
        running = false;
        getThreadPool().shutdown();
    }

    /** Supplies the business handler that actually processes popped records. */
    protected abstract RedisQueueService getQueueService();

    /** Lazily builds the worker pool; synchronized so concurrent callers share one pool. */
    public synchronized ThreadPoolExecutor getThreadPool() {
        if (consumerTaskExecutor == null) {
            initTaskExecutor();
        }
        return consumerTaskExecutor;
    }

    public void initTaskExecutor() {
        // core == max so pool size is exactly getMaxThread(); bounded queue backs the
        // workerQueueFull() back-pressure. NOTE(review): DiscardPolicy silently drops
        // tasks if the queue ever overflows despite the guard — consider CallerRunsPolicy.
        consumerTaskExecutor = new ThreadPoolExecutor(getMaxThread(), getMaxThread(), 1, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(QUEUE_CAPACITY), new ThreadPoolExecutor.DiscardPolicy());
        consumerTaskExecutor.prestartAllCoreThreads();
        // Idle core threads may time out (1s keep-alive) and will be recreated on demand.
        consumerTaskExecutor.allowCoreThreadTimeOut(true);
    }

    /** Per-consumer setting wins over the global one; defaults to 1 (single-threaded). */
    public Integer getMaxThread() {
        Integer max = this.sdcRedisQueueConsumerProperties.getMaxThread() != null ? this.sdcRedisQueueConsumerProperties.getMaxThread() : this.sdcRedisProperties.getMaxThread();
        return max != null ? max : 1;
    }

    @Override
    public Integer getBatchSize() {
        return this.sdcRedisQueueConsumerProperties.getBatchSize();
    }
}
抽象实现中几个设计点:
1、调度线程:专门启动一个调度线程拉取redis的任务数据,再交由线程池处理;
2、线程池定义:corePool和maxPool设置一致,通过一个固定大小的queue存储任务,queue满了不拉取,也不submit任务,避免数据拉下来后异常停机丢失;
3、支持批量拉取并基于不同的并发分配
QueueConsumer实现的核心思想是,线程执行需要在可设置时长内返回,不能因为redisQueueService的执行时长阻塞workQueue的轮询。
public class QueueConsumeTask<T> implements Callable<Long> {

    private static final Log logger = LogFactory.getLog(QueueConsumeTask.class);

    // Batch of queue payloads this task is responsible for.
    private final List<T> records;
    // Business handler that actually processes the batch.
    private final RedisQueueService redisQueueService;

    public QueueConsumeTask(List<T> records, RedisQueueService redisQueueService) {
        this.records = records;
        this.redisQueueService = redisQueueService;
    }

    /**
     * Processes the batch, honoring the service's per-task timeout when one is
     * configured. Failures are logged and swallowed so the worker thread survives;
     * 0 is returned in that case.
     */
    @Override
    public Long call() {
        long timeoutSeconds = redisQueueService.getTimeoutOfSingleTaskInSecond();
        try {
            if (timeoutSeconds <= 0) {
                // No timeout configured: run the handler directly.
                return redisQueueService.execute(records);
            }
            // Bound the handler's run time via the reflective timed invoker.
            return (Long) TimedExecutor.timedInvoke(redisQueueService, "execute",
                    new Class[]{List.class}, new Object[]{records},
                    timeoutSeconds, TimeUnit.SECONDS);
        } catch (Exception e) {
            logger.warn(String.format("Queue consumer unexpect finished for [%s]: ", StringUtil.join(records, ",")) + ExceptionUtils.getStackTrace(e));
            return 0L;
        }
    }
}
基于时长可控的原则,定义了TimedExecutor
public class TimedExecutor {

    private static final Log logger = LogFactory.getLog(TimedExecutor.class);

    /**
     * Reflectively invokes {@code methodName} on {@code target}, bounding the call
     * to the given timeout.
     *
     * @return the invoked method's return value
     * @throws TimeoutException if the call does not finish in time (the worker is
     *         interrupted via cancel(true), but a handler that ignores interrupts
     *         may keep running in the background)
     * @throws RuntimeException wrapping any other failure, with the business
     *         exception (not the reflection wrapper) as the cause
     */
    public static Object timedInvoke(final Object target,
            final String methodName, final Class<?>[] parameterTypes, final Object[] params,
            long timeout, TimeUnit timeUnit) throws TimeoutException {
        // One throwaway worker per call; shut down in finally so threads never leak.
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        FutureTask<?> futureTask = new FutureTask<>(new Callable<Object>() {
            public Object call() throws Exception {
                Method method = findMethod(target.getClass(), methodName, parameterTypes);
                try {
                    return method.invoke(target, params);
                } catch (java.lang.reflect.InvocationTargetException e) {
                    // Unwrap so callers see the real business exception, not the
                    // reflection wrapper.
                    Throwable cause = e.getCause();
                    throw cause instanceof Exception ? (Exception) cause : e;
                }
            }
        });
        try {
            executorService.execute(futureTask);
            return futureTask.get(timeout, timeUnit);
        } catch (TimeoutException e) {
            logger.warn("timedInvoke timeout, try to cancel future");
            futureTask.cancel(true);
            throw e;
        } catch (Exception e) {
            logger.warn("timedInvoke: " + ExceptionUtils.getStackTrace(e));
            throw new RuntimeException(e);
        } finally {
            executorService.shutdownNow();
        }
    }

    /**
     * Resolves the target method: getMethod() first so inherited public methods are
     * found (getDeclaredMethod alone misses them), falling back to non-public
     * methods declared on the class itself.
     */
    private static Method findMethod(Class<?> type, String name, Class<?>[] parameterTypes) throws NoSuchMethodException {
        try {
            return type.getMethod(name, parameterTypes);
        } catch (NoSuchMethodException e) {
            return type.getDeclaredMethod(name, parameterTypes);
        }
    }
}
RedisQueueService则是具体的业务逻辑
public interface RedisQueueService<T> {
// Business-side contract: given a batch of records popped from the queue,
// perform the actual processing.
/** Processes the batch; by convention returns the number of handled records. */
Long execute(List<T> records);
/** Per-batch timeout in seconds; 0 (the default) means no timeout is enforced. */
default long getTimeoutOfSingleTaskInSecond() {
return 0L;
}
}
实现实例:
@Service
@Slf4j
public class VesselExpectQueueService implements RedisQueueService<Integer> {

    @Autowired
    DtVesselEstimateArrivalCalcService dtVesselEstimateArrivalCalcService;

    /**
     * Runs the expected-arrival recalculation for each vessel in the batch
     * (records are MMSI identifiers). Returns the number of records handled,
     * or 0 when the batch is null.
     */
    @Override
    public Long execute(List<Integer> records) {
        if (records == null) {
            return 0L;
        }
        for (Integer mmsi : records) {
            dtVesselEstimateArrivalCalcService.calcExpectArrAutoByJob(mmsi);
        }
        return (long) records.size();
    }

    /**
     * Each batch must finish within 60 seconds; enforced by the consumer
     * via TimedExecutor.
     */
    @Override
    public long getTimeoutOfSingleTaskInSecond() {
        return 60;
    }
}