java实现数据批量处理,达到一定数量或者达到一定时间去处理

1、批量处理通用代码

public class BatchQueue {

private final int batchSize;
private final Consumer<List<T>> consumer;
private final int timeoutInMs;

private AtomicBoolean isLooping = new AtomicBoolean(false);
private BlockingQueue<T> queue = new LinkedBlockingQueue<>();
private ExecutorService executorService = Executors.newCachedThreadPool();

private AtomicLong start = new AtomicLong(System.currentTimeMillis());

public BatchQueue(int batchSize, int timeoutInMs, Consumer<List<T>> consumer) {
    this.batchSize = batchSize;
    this.timeoutInMs = timeoutInMs;
    this.consumer = consumer;
}

public BatchQueue(int batchSize, Consumer<List<T>> consumer) {
    this(batchSize, 500, consumer);
}

public boolean add(T t) {
    boolean result = queue.add(t);
    if(!isLooping.get() && result) {
        isLooping.set(true);
        startLoop();
    }
    return result;
}

public void completeAll() {
    while (!queue.isEmpty()) {
        drainToConsume();
    }
}

private void startLoop() {
    executorService.execute(new ExeThread());
}

private void drainToConsume() {
    List<T> drained = new ArrayList<>();
    int num = queue.drainTo(drained, batchSize);
    if(num > 0) {
        consumer.accept(drained);
        start.set(System.currentTimeMillis());
    }
}

private class ExeThread implements Runnable {
    @Override
    public void run() {
        start = new AtomicLong(System.currentTimeMillis());
        while(true) {
            long last = System.currentTimeMillis() - start.get() ;
            if (queue.size() >= batchSize || (!queue.isEmpty() && last > timeoutInMs)) {
                drainToConsume();
            } else if(queue.isEmpty()) {
                isLooping.set(false);
                break;
            }
        }
    }
}

2、测试类,我设置的批量长度为3,超时时间为5秒,只要满足一个条件就会执行处理方法

public class Test {
static BatchQueue batchQueue = new BatchQueue<>(3, 5000, x -> exe(x));

public static void main(String[] args) {
    while (true) {
        String line = new Scanner(System.in).nextLine();
        if (line.equals("done")) {
            batchQueue.completeAll();
            break;
        }
        batchQueue.add(line);
    }
}

private static void exe (List<String> o) {
    System.out.println("处理数据:" + o);
}

}1、批量处理通用代码

public class BatchQueue {

private final int batchSize;
private final Consumer<List<T>> consumer;
private final int timeoutInMs;

private AtomicBoolean isLooping = new AtomicBoolean(false);
private BlockingQueue<T> queue = new LinkedBlockingQueue<>();
private ExecutorService executorService = Executors.newCachedThreadPool();

private AtomicLong start = new AtomicLong(System.currentTimeMillis());

public BatchQueue(int batchSize, int timeoutInMs, Consumer<List<T>> consumer) {
    this.batchSize = batchSize;
    this.timeoutInMs = timeoutInMs;
    this.consumer = consumer;
}

public BatchQueue(int batchSize, Consumer<List<T>> consumer) {
    this(batchSize, 500, consumer);
}

public boolean add(T t) {
    boolean result = queue.add(t);
    if(!isLooping.get() && result) {
        isLooping.set(true);
        startLoop();
    }
    return result;
}

public void completeAll() {
    while (!queue.isEmpty()) {
        drainToConsume();
    }
}

private void startLoop() {
    executorService.execute(new ExeThread());
}

private void drainToConsume() {
    List<T> drained = new ArrayList<>();
    int num = queue.drainTo(drained, batchSize);
    if(num > 0) {
        consumer.accept(drained);
        start.set(System.currentTimeMillis());
    }
}

private class ExeThread implements Runnable {
    @Override
    public void run() {
        start = new AtomicLong(System.currentTimeMillis());
        while(true) {
            long last = System.currentTimeMillis() - start.get() ;
            if (queue.size() >= batchSize || (!queue.isEmpty() && last > timeoutInMs)) {
                drainToConsume();
            } else if(queue.isEmpty()) {
                isLooping.set(false);
                break;
            }
        }
    }
}

2、测试类,我设置的批量长度为3,超时时间为5秒,只要满足一个条件就会执行处理方法

public class Test {
static BatchQueue batchQueue = new BatchQueue<>(3, 5000, x -> exe(x));

public static void main(String[] args) {
    while (true) {
        String line = new Scanner(System.in).nextLine();
        if (line.equals("done")) {
            batchQueue.completeAll();
            break;
        }
        batchQueue.add(line);
    }
}

private static void exe (List<String> o) {
    System.out.println("处理数据:" + o);
}

}

上一篇:kafka运维命令大全


下一篇:Python简单使用kafka