ForkJoin简介:
- forkjoin框架是java7提供的用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇集每一个小任务结果得到大任务结果的框架。
核心思想:分治
- fork:分解任务
- join:收集数据
代码示例
package com.fixData.util.use;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
//forkJoin框架
public class ForkJoinTest {
// 需要处理的数据
// 根据业务需要,这里的String可以是Map、Json等等的任意实体
private List<String> dataList;
// 线程数,一般我用5个
private int threadNum=5;
public static void main(String[] args) {
new ForkJoinTest().dowork();
}
private void dowork() {
while (true) {
// 通过查询 获得 需要处理的数据
dataList = new ArrayList<String>();
if (dataList == null || dataList.size() < 0) {
System.out.println("dataList长度为0!!!");
break;
}
// 通过forkjoin框架处理
ForkJoinPool pool = new ForkJoinPool(100);
Task task = new Task(dataList);// 传入需要处理的数据
pool.submit(task);// 提交处理
task.join();// 搜集数据
//读取mysql、mongo数据,处理成功之后一般修改mysql数据状态
//一般在mysql表中加status字段,0未处理,1处理成功,2处理失败等等
//查询的时候直接查状态为0的数据
}
}
// 处理数据
private void deal(List<String> list) {
for (String str : list) {
}
}
// 直接用,除了类名什么都不用改
class Task extends RecursiveTask<String> {
// 根据线程将数据划分
private int size = dataList.size() / threadNum == 0 ? 1 : dataList.size() / threadNum;
private List<String> data;
public Task(List<String> data) {
this.data = data;
}
@Override
protected String compute() {
if (this.data.size() <= this.size) {
System.out.println("********* size:" + data.size());
deal(data);
} else {
// 细分成小任务
List<Task> tasks = new ArrayList<ForkJoinTest.Task>();
for (int index = 0; index * size < data.size(); index++) {
Task task;
if ((index + 1) * size > data.size()) {
task = new Task(data.subList(index * size, data.size()));
} else {
task = new Task(data.subList(index * size, (index + 1) * size));
}
task.fork();
tasks.add(task);
}
for (Task task : tasks) {
task.join();
}
}
return null;
}
}
}
本人工作用途:处理数据:
- 我一般用于读取mysql/mongo中的数据,通过多线程插入es中
- 或者读取es数据,通过多线程插入mysql,或者导出到csv等等