Masker-Worker的核心思想是有两类进程(Masker进程和Worker进程)协作完成任务。Masker进程负责接收和分配任务,Worker负责处理子任务,当各个Worker子进程完成任务后会将结果返回给Masker,由Masker做归纳和总结。其好处在于能将一个大任务分解成若干个小任务,并行执行,从而提供系统的吞吐量。
这个模型主要用于主线程可以分为若干子线程的情形,各子线程之间不会相互影响。
举个例子,这个例子是创建20个Worker去处理100个任务,每个任务是在0-1000的范围内随即一个整数,最后将这些数字相加。分别有Main,Masker,Worker,Task这四个类
1 import java.util.Random;
2
3 public class Main {
4
5 public static void main(String[] args) {
6
7 Master master = new Master(new Worker(), 20);//生成20个work去处理这个任务
8
9 Random r = new Random();
10 for(int i = 1; i <= 100; i++){
11 Task t = new Task();
12 t.setId(i);
13 t.setPrice(r.nextInt(1000));
14 master.submit(t);
15 }
16 master.execute();
17 long start = System.currentTimeMillis();
18
19 while(true){
20 if(master.isComplete()){
21 long end = System.currentTimeMillis() - start;
22 int priceResult = master.getResult();
23 System.out.println("最终结果:" + priceResult + ", 执行时间:" + end);
24 break;
25 }
26 }
27
28 }
29 }
Main
1 import java.util.HashMap;
2 import java.util.Map;
3 import java.util.concurrent.ConcurrentHashMap;
4 import java.util.concurrent.ConcurrentLinkedQueue;
5
6 public class Master {
7
8 //1 有一个盛放任务的容器
9 private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();
10
11 //2 需要有一个盛放worker的集合
12 private HashMap<String, Thread> workers = new HashMap<String, Thread>();
13
14 //3 需要有一个盛放每一个worker执行任务的结果集合
15 private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
16
17 //4 构造方法
18 public Master(Worker worker , int workerCount){
19 worker.setWorkQueue(this.workQueue); //因为是消费workQueue里面的数据,所以workQueue放进去
20 worker.setResultMap(this.resultMap); //消费完之后要把结果集返回给Masker,所以要有resultMap应的引用
21
22 for(int i = 0; i < workerCount; i ++){
23 this.workers.put(Integer.toString(i), new Thread(worker));//创建一个线程并对它起个名字用i来表示
24 }
25
26 }
27
28 //5 需要一个提交任务的方法
29 public void submit(Task task){
30 this.workQueue.add(task);
31 }
32
33 //6 需要有一个执行的方法,启动所有的worker方法去执行任务
34 public void execute(){
35 for(Map.Entry<String, Thread> me : workers.entrySet()){
36 me.getValue().start(); //循环这已经装好的Works,让它们都起动起来
37 }
38 }
39
40 //7 判断是否运行结束的方法
41 public boolean isComplete() {
42 for(Map.Entry<String, Thread> me : workers.entrySet()){//循环这写Works线程,判断其状态
43 if(me.getValue().getState() != Thread.State.TERMINATED){
44 return false;
45 }
46 }
47 return true;
48 }
49
50 //8 计算结果方法
51 public int getResult() {
52 int priceResult = 0;
53 for(Map.Entry<String, Object> me : resultMap.entrySet()){
54 priceResult += (Integer)me.getValue();
55 }
56
57 return priceResult;
58
59 }
60
61
62
Masker
1 import java.util.concurrent.ConcurrentHashMap;
2 import java.util.concurrent.ConcurrentLinkedQueue;
3
4 public class Worker implements Runnable {
5
6 private ConcurrentLinkedQueue<Task> workQueue;
7 private ConcurrentHashMap<String, Object> resultMap;
8
9 public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
10 this.workQueue = workQueue;
11 }
12
13 public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
14 this.resultMap = resultMap;
15 }
16
17 @Override
18 public void run() {
19 while(true){
20 Task input = this.workQueue.poll();//不断地从Mask的workQueue将任务取出来
21 if(input == null) break; //为空的话证明它已经消费完了
22 Object output = handle(input);//否则交给handle方法进行处理
23 this.resultMap.put(Integer.toString(input.getId()), output);//将任务的id,和结果集放到resultMap里
24 }
25 }
26
27 private Object handle(Task input) {
28 Object output = null;
29 try {
30 //处理任务的耗时。。 比如说进行操作数据库。。。
31 Thread.sleep(500);
32 output = input.getPrice();//得到处理完的结果
33 } catch (InterruptedException e) {
34 e.printStackTrace();
35 }
36 return output;
37 }
38
39
40
41 }
Worker
1 public class Task {
2
3 private int id;
4 private int price ;
5 public int getId() {
6 return id;
7 }
8 public void setId(int id) {
9 this.id = id;
10 }
11 public int getPrice() {
12 return price;
13 }
14 public void setPrice(int price) {
15 this.price = price;
16 }
17
18 }
Task
运行结果为:
执行时间是每个任务休眠0.5s乘以100个任务,除以20个Worker,为2.5s。创建的Worker数并不是越大越好,因为创建Worker也需要花费时间。