1、批量处理通用代码
public class BatchQueue {
private final int batchSize;
private final Consumer<List<T>> consumer;
private final int timeoutInMs;
private AtomicBoolean isLooping = new AtomicBoolean(false);
private BlockingQueue<T> queue = new LinkedBlockingQueue<>();
private ExecutorService executorService = Executors.newCachedThreadPool();
private AtomicLong start = new AtomicLong(System.currentTimeMillis());
public BatchQueue(int batchSize, int timeoutInMs, Consumer<List<T>> consumer) {
this.batchSize = batchSize;
this.timeoutInMs = timeoutInMs;
this.consumer = consumer;
}
public BatchQueue(int batchSize, Consumer<List<T>> consumer) {
this(batchSize, 500, consumer);
}
public boolean add(T t) {
boolean result = queue.add(t);
if(!isLooping.get() && result) {
isLooping.set(true);
startLoop();
}
return result;
}
public void completeAll() {
while (!queue.isEmpty()) {
drainToConsume();
}
}
private void startLoop() {
executorService.execute(new ExeThread());
}
private void drainToConsume() {
List<T> drained = new ArrayList<>();
int num = queue.drainTo(drained, batchSize);
if(num > 0) {
consumer.accept(drained);
start.set(System.currentTimeMillis());
}
}
private class ExeThread implements Runnable {
@Override
public void run() {
start = new AtomicLong(System.currentTimeMillis());
while(true) {
long last = System.currentTimeMillis() - start.get() ;
if (queue.size() >= batchSize || (!queue.isEmpty() && last > timeoutInMs)) {
drainToConsume();
} else if(queue.isEmpty()) {
isLooping.set(false);
break;
}
}
}
}
2、测试类,我设置的批量长度为3,超时时间为5秒,只要满足一个条件就会执行处理方法
public class Test {
static BatchQueue batchQueue = new BatchQueue<>(3, 5000, x -> exe(x));
public static void main(String[] args) {
while (true) {
String line = new Scanner(System.in).nextLine();
if (line.equals("done")) {
batchQueue.completeAll();
break;
}
batchQueue.add(line);
}
}
private static void exe (List<String> o) {
System.out.println("处理数据:" + o);
}
}1、批量处理通用代码
public class BatchQueue {
private final int batchSize;
private final Consumer<List<T>> consumer;
private final int timeoutInMs;
private AtomicBoolean isLooping = new AtomicBoolean(false);
private BlockingQueue<T> queue = new LinkedBlockingQueue<>();
private ExecutorService executorService = Executors.newCachedThreadPool();
private AtomicLong start = new AtomicLong(System.currentTimeMillis());
public BatchQueue(int batchSize, int timeoutInMs, Consumer<List<T>> consumer) {
this.batchSize = batchSize;
this.timeoutInMs = timeoutInMs;
this.consumer = consumer;
}
public BatchQueue(int batchSize, Consumer<List<T>> consumer) {
this(batchSize, 500, consumer);
}
public boolean add(T t) {
boolean result = queue.add(t);
if(!isLooping.get() && result) {
isLooping.set(true);
startLoop();
}
return result;
}
public void completeAll() {
while (!queue.isEmpty()) {
drainToConsume();
}
}
private void startLoop() {
executorService.execute(new ExeThread());
}
private void drainToConsume() {
List<T> drained = new ArrayList<>();
int num = queue.drainTo(drained, batchSize);
if(num > 0) {
consumer.accept(drained);
start.set(System.currentTimeMillis());
}
}
private class ExeThread implements Runnable {
@Override
public void run() {
start = new AtomicLong(System.currentTimeMillis());
while(true) {
long last = System.currentTimeMillis() - start.get() ;
if (queue.size() >= batchSize || (!queue.isEmpty() && last > timeoutInMs)) {
drainToConsume();
} else if(queue.isEmpty()) {
isLooping.set(false);
break;
}
}
}
}
2、测试类,我设置的批量长度为3,超时时间为5秒,只要满足一个条件就会执行处理方法
public class Test {
static BatchQueue batchQueue = new BatchQueue<>(3, 5000, x -> exe(x));
public static void main(String[] args) {
while (true) {
String line = new Scanner(System.in).nextLine();
if (line.equals("done")) {
batchQueue.completeAll();
break;
}
batchQueue.add(line);
}
}
private static void exe (List<String> o) {
System.out.println("处理数据:" + o);
}
}