需求
容器容量达到10则批处理,如果超过5s容量没有10,也进行批处理。
实现
/**
* @Desc:
* @Author: heling
* @Date: 2021/5/6 14:49
*/
@Slf4j
public class BatchHandleList extends Vector<String> {
private static volatile BatchHandleList list;
//集合容量达到10则批处理
private static int batchSize = 10;
//达到5s即使容器容量小于10也批处理
private static int timeout = 5;
//上一次处理时间戳
private static volatile long lastHandTsp;
final ScheduledExecutorService executor;
private BatchHandleList() {
super();
this.executor = Executors.newScheduledThreadPool(1, (r) -> {
Thread t = new Thread(r);
t.setName("定期检查容器线程");
t.setDaemon(true);
return t;
});
this.executor.scheduleWithFixedDelay(() -> {
try {
if (System.currentTimeMillis() - lastHandTsp >= timeout * 1000L) {
log.info("========达到时间阈值,开始批处理========");
int size = size();
for (int i = 0; i < size; i++) {
String v = super.remove(0);
log.info("处理{}", v);
}
lastHandTsp = System.currentTimeMillis();
}
} catch (Exception e) {
}
}, 100L, timeout * 1000L, TimeUnit.MILLISECONDS);
lastHandTsp = System.currentTimeMillis();
}
public static BatchHandleList getInstance() {
if (null == list) {
synchronized (BatchHandleList.class) {
if (null == list) {
list = new BatchHandleList();
}
}
}
return list;
}
@Override
public synchronized boolean add(String str) {
boolean success = super.add(str);
if (size() >= batchSize) {
log.info("========达到批次数量阈值,开始批处理========");
for (int i = 0; i < batchSize; i++) {
String v = super.remove(0);
log.info("处理{}", v);
}
lastHandTsp = System.currentTimeMillis();
}
return success;
}
}
测试
public static void main(String[] args) throws InterruptedException {
BatchHandleList list = BatchHandleList.getInstance();
for (int i = 1; i <= 15; i++) {
list.add(i + "");
}
TimeUnit.SECONDS.sleep(10L);
for (int i = 1; i < 5; i++) {
TimeUnit.SECONDS.sleep(2L);
list.add(i + "");
}
TimeUnit.MINUTES.sleep(10L);
}
结果输出:
15:47:52.080 [main] INFO com.cxs.crm.ext.BatchHandleList - ========达到批次数量阈值,开始批处理========
15:47:52.082 [main] INFO com.cxs.crm.ext.BatchHandleList - 处理1
15:47:52.084 [main] INFO com.cxs.crm.ext.BatchHandleList - 处理2
15:47:52.084 [main] INFO com.cxs.crm.ext.BatchHandleList - 处理3
15:47:52.084 [main] INFO com.cxs.crm.ext.BatchHandleList - 处理4
15:47:52.084 [main] INFO com.cxs.crm.ext.BatchHandleList - 处理5
15:47:52.084 [main] INFO com.cxs.crm.ext.BatchHandleList - 处理6
15:47:52.084 [main] INFO com.cxs.crm.ext.BatchHandleList - 处理7
15:47:52.084 [main] INFO com.cxs.crm.ext.BatchHandleList - 处理8
15:47:52.084 [main] INFO com.cxs.crm.ext.BatchHandleList - 处理9
15:47:52.084 [main] INFO com.cxs.crm.ext.BatchHandleList - 处理10
15:47:57.181 [定期检查容器线程] INFO com.cxs.crm.ext.BatchHandleList - ========达到时间阈值,开始批处理========
15:47:57.181 [定期检查容器线程] INFO com.cxs.crm.ext.BatchHandleList - 处理11
15:47:57.181 [定期检查容器线程] INFO com.cxs.crm.ext.BatchHandleList - 处理12
15:47:57.181 [定期检查容器线程] INFO com.cxs.crm.ext.BatchHandleList - 处理13
15:47:57.181 [定期检查容器线程] INFO com.cxs.crm.ext.BatchHandleList - 处理14
15:47:57.181 [定期检查容器线程] INFO com.cxs.crm.ext.BatchHandleList - 处理15
15:48:02.181 [定期检查容器线程] INFO com.cxs.crm.ext.BatchHandleList - ========达到时间阈值,开始批处理========
15:48:07.183 [定期检查容器线程] INFO com.cxs.crm.ext.BatchHandleList - ========达到时间阈值,开始批处理========
15:48:07.183 [定期检查容器线程] INFO com.cxs.crm.ext.BatchHandleList - 处理1
15:48:07.183 [定期检查容器线程] INFO com.cxs.crm.ext.BatchHandleList - 处理2
15:48:12.184 [定期检查容器线程] INFO com.cxs.crm.ext.BatchHandleList - ========达到时间阈值,开始批处理========
15:48:12.184 [定期检查容器线程] INFO com.cxs.crm.ext.BatchHandleList - 处理3
15:48:12.184 [定期检查容器线程] INFO com.cxs.crm.ext.BatchHandleList - 处理4