多线程模式之MasterWorker模式

多线程模式之MasterWorker模式

Master-Worker模式的核心思想是,系统由两类进程协作工作:Master进程和Worker进程。Master进程负责接收和分配任务,Worker进程负责处理子任务。当Worker进程将各个子任务处理完成后,将结果返回给Master进程,由Master进程做归纳和汇总。

工作示意图如下图所示:



多线程模式之MasterWorker模式





它的优势在于将一个大的任务分解成一个个的子任务并行执行,提高程序执行效率。

代码示例:



Master进程类:

public class Master {

    protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();//存放子任务

    protected Map<String,Thread> threadMap = new HashMap<String,Thread>();//存放Worker线程

    protected Map<String,Object> resultMap = new ConcurrentHashMap<String, Object>();//存放返回结果集

    public boolean isCompleted(){

        for(Map.Entry<String,Thread> entry:threadMap.entrySet()){

            if(entry.getValue().getState() != Thread.State.TERMINATED){

                return false;

            }

        }

        return true;

    }

    public Master(Worker worker,int countWorker){

        worker.setWorkQueue(workQueue);

        worker.setResultMap(resultMap);

        for(int i=0;i<countWorker;i++){//创建多个Worker线程

            threadMap.put(Integer.toString(i),new Thread(worker,Integer.toString(i)));

        }

    }

    public void submit(Object job){

        workQueue.add(job);//将任务加到子任务队列中

    }

    public Map<String,Object> getResultMap(){

        return resultMap;

    }

    public void execute(){

//循环启动Worker线程

        for(Map.Entry<String,Thread> entry:threadMap.entrySet()){

           entry.getValue().start();

        }

    }



}





Worker进程类:



public class Worker implements  Runnable{

    protected Queue<Object> workQueue;

    protected Map<String,Object> resultMap;

    public void setWorkQueue(Queue<Object> workQueue){

        this.workQueue = workQueue;

    }

    public void setResultMap( Map<String,Object> resultMap){

        this.resultMap = resultMap;

    }

    public Object handle(Object input){

        return input;

    }

    public void run(){

        while(true){

            Object input = workQueue.poll();//从子任务中取任务执行

            if(input == null) break;

            Object re = handle(input);//这是一个模板方法模式

            resultMap.put(Integer.toString(input.hashCode()),re);

        }

    }

}





PlusWorker进程类:

public class PlusWorker extends Worker {

    public Object handle(Object input){

        Integer i = (Integer)input;

        return i*i*i;

    }

}



测试类:

public class Main {

    public static void  main(String[] args){

        Master m = new Master(new PlusWorker(),5);

        for(int i=0;i<100;i++)

            m.submit(i);

        m.execute();

        int re = 0;

        Map<String,Object> resultMap = m.getResultMap();

        while(resultMap.size() >0 || !m.isCompleted()){

            Set<String> keys = resultMap.keySet();

            String key = null;

            for(String k:keys){

                key = k;

                break;

            }

            Integer i = null;

            if(key != null)

                i = (Integer)resultMap.get(key);

            if(i != null)

                re += i;

            if(key != null)

                resultMap.remove(key);



        }

        System.out.println("re:"+re);

    }

}





Main不用等所有的Worker线程都计算完以后再执行,只要有计算完成的,它就一直计算。







《Java性能优化》 -葛一鸣

上一篇:jQuery实现导航监听事件


下一篇:linux下实用命令