Java并发编程之Master-Worker模式

  Masker-Worker的核心思想是有两类进程(Masker进程和Worker进程)协作完成任务。Masker进程负责接收和分配任务,Worker负责处理子任务,当各个Worker子进程完成任务后会将结果返回给Masker,由Masker做归纳和总结。其好处在于能将一个大任务分解成若干个小任务,并行执行,从而提供系统的吞吐量。

                      

Java并发编程之Master-Worker模式

      

Java并发编程之Master-Worker模式

这个模型主要用于主线程可以分为若干子线程的情形,各子线程之间不会相互影响。

 举个例子,这个例子是创建20个Worker去处理100个任务,每个任务是在0-1000的范围内随即一个整数,最后将这些数字相加。分别有Main,Masker,Worker,Task这四个类

1 import java.util.Random;
 2 
 3 public class Main {
 4 
 5     public static void main(String[] args) {
 6         
 7         Master master = new Master(new Worker(), 20);//生成20个work去处理这个任务
 8         
 9         Random r = new Random();
10         for(int i = 1; i <= 100; i++){
11             Task t = new Task();
12             t.setId(i);
13             t.setPrice(r.nextInt(1000));
14             master.submit(t);
15         }
16         master.execute();
17         long start = System.currentTimeMillis();
18         
19         while(true){
20             if(master.isComplete()){
21                 long end = System.currentTimeMillis() - start;
22                 int priceResult = master.getResult();
23                 System.out.println("最终结果:" + priceResult + ", 执行时间:" + end);
24                 break;
25             }
26         }
27         
28     }
29 }

Main

1 import java.util.HashMap;
 2 import java.util.Map;
 3 import java.util.concurrent.ConcurrentHashMap;
 4 import java.util.concurrent.ConcurrentLinkedQueue;
 5 
 6 public class Master {
 7 
 8     //1 有一个盛放任务的容器
 9     private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();
10     
11     //2 需要有一个盛放worker的集合
12     private HashMap<String, Thread> workers = new HashMap<String, Thread>();
13     
14     //3 需要有一个盛放每一个worker执行任务的结果集合
15     private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
16     
17     //4 构造方法
18     public Master(Worker worker , int workerCount){
19         worker.setWorkQueue(this.workQueue);    //因为是消费workQueue里面的数据,所以workQueue放进去
20         worker.setResultMap(this.resultMap);    //消费完之后要把结果集返回给Masker,所以要有resultMap应的引用
21         
22         for(int i = 0; i < workerCount; i ++){
23             this.workers.put(Integer.toString(i), new Thread(worker));//创建一个线程并对它起个名字用i来表示
24         }
25         
26     }
27     
28     //5 需要一个提交任务的方法
29     public void submit(Task task){
30         this.workQueue.add(task);
31     }
32     
33     //6 需要有一个执行的方法,启动所有的worker方法去执行任务
34     public void execute(){
35         for(Map.Entry<String, Thread> me : workers.entrySet()){
36             me.getValue().start();    //循环这已经装好的Works,让它们都起动起来
37         }
38     }
39 
40     //7 判断是否运行结束的方法
41     public boolean isComplete() {
42         for(Map.Entry<String, Thread> me : workers.entrySet()){//循环这写Works线程,判断其状态
43             if(me.getValue().getState() != Thread.State.TERMINATED){
44                 return false;
45             }
46         }    
47         return true;
48     }
49 
50     //8 计算结果方法
51     public int getResult() {
52         int priceResult = 0;
53         for(Map.Entry<String, Object> me : resultMap.entrySet()){
54             priceResult += (Integer)me.getValue();
55         }
56         
57         return priceResult;
58         
59     }
60     
61     
62     

Masker
1 import java.util.concurrent.ConcurrentHashMap;
 2 import java.util.concurrent.ConcurrentLinkedQueue;
 3 
 4 public class Worker implements Runnable {
 5 
 6     private ConcurrentLinkedQueue<Task> workQueue;
 7     private ConcurrentHashMap<String, Object> resultMap;
 8     
 9     public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
10         this.workQueue = workQueue;
11     }
12 
13     public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
14         this.resultMap = resultMap;
15     }
16     
17     @Override
18     public void run() {
19         while(true){
20             Task input = this.workQueue.poll();//不断地从Mask的workQueue将任务取出来
21             if(input == null) break;   //为空的话证明它已经消费完了
22             Object output = handle(input);//否则交给handle方法进行处理
23             this.resultMap.put(Integer.toString(input.getId()), output);//将任务的id,和结果集放到resultMap里
24         }
25     }
26 
27     private Object handle(Task input) {
28         Object output = null;
29         try {
30             //处理任务的耗时。。 比如说进行操作数据库。。。
31             Thread.sleep(500);
32             output = input.getPrice();//得到处理完的结果
33         } catch (InterruptedException e) {
34             e.printStackTrace();
35         }
36         return output;
37     }
38 
39 
40 
41 }

Worker

1 public class Task {
 2 
 3     private int id;
 4     private int price ;
 5     public int getId() {
 6         return id;
 7     }
 8     public void setId(int id) {
 9         this.id = id;
10     }
11     public int getPrice() {
12         return price;
13     }
14     public void setPrice(int price) {
15         this.price = price;
16     } 
17     
18 }

Task

运行结果为:

Java并发编程之Master-Worker模式

执行时间是每个任务休眠0.5s乘以100个任务,除以20个Worker,为2.5s。创建的Worker数并不是越大越好,因为创建Worker也需要花费时间。

上一篇:基于多态的职工管理系统


下一篇:nginx 的配置,避免工作进程过多以及内存过高导致系统内存耗尽