码迷,mamicode.com
首页 > 其他好文 > 详细

并发模型(二)——Master-Worker模式

时间:2017-11-02 01:01:38      阅读:185      评论:0      收藏:0      [点我收藏+]

标签:http   workqueue   main函数   汇总   public   hash   ble   exec   获取   

   Master-Worker模式是常用的并行模式之一,它的核心思想是,系统有两个进程协作工作:Master进程,负责接收和分配任务;Worker进程,负责处理子任务。当Worker进程将子任务处理完成后,结果返回给Master进程,由Master进程做归纳汇总,最后得到最终的结果。

一、什么是Master-Worker模式:

该模式的结构图:

技术分享

结构图:

技术分享

Worker:用于实际处理一个任务;

Master:任务的分配和最终结果的合成;

Main:启动程序,调度开启Master。

 

二、代码实现:

    下面的是一个简易的Master-Worker框架实现。

(1)Master部分:

package MasterWorker;  
  
import java.util.HashMap;  
import java.util.Map;  
import java.util.Queue;  
import java.util.concurrent.ConcurrentHashMap;  
import java.util.concurrent.ConcurrentLinkedQueue;  
  
public class Master {  
  
    //任务队列  
    protected Queue<Object> workQueue= new ConcurrentLinkedQueue<Object>();  
    //Worker进程队列  
    protected Map<String ,Thread> threadMap= new HashMap<String ,Thread>();  
    //子任务处理结果集  
    protected Map<String ,Object> resultMap= new ConcurrentHashMap<String, Object>();  
    //是否所有的子任务都结束了  
    public boolean isComplete(){  
        for(Map.Entry<String , Thread> entry:threadMap.entrySet()){  
            if(entry.getValue().getState()!=Thread.State.TERMINATED){  
                return false;  
            }  
                  
        }  
        return true ;  
    }  
      
    //Master的构造,需要一个Worker进程逻辑,和需要Worker进程数量  
    public Master(Worker worker,int countWorker){  
          
        worker.setWorkQueue(workQueue);  
        worker.setResultMap(resultMap);  
        for(int i=0;i<countWorker;i++){  
            threadMap.put(Integer.toString(i),  new Thread(worker, Integer.toString(i)));  
        }  
          
    }  
      
    //提交一个任务  
    public void submit(Object job){  
        workQueue.add(job);  
    }  
      
      
    //返回子任务结果集  
    public Map<String ,Object> getResultMap(){  
        return resultMap;  
    }  
      
      
    //开始运行所有的Worker进程,进行处理  
    public  void execute(){  
         for(Map.Entry<String , Thread> entry:threadMap.entrySet()){  
             entry.getValue().start();  
               
         }  
    }  
      
      
}  

Worker进程实现:

package MasterWorker;  
  
import java.util.Map;  
import java.util.Queue;  
  
public class Worker  implements Runnable{  
  
    //任务队列,用于取得子任务  
    protected Queue<Object> workQueue;  
    //子任务处理结果集  
    protected Map<String ,Object> resultMap;  
    public void setWorkQueue(Queue<Object> workQueue){  
        this.workQueue= workQueue;  
    }  
      
    public void setResultMap(Map<String ,Object> resultMap){  
        this.resultMap=resultMap;  
    }  
    //子任务处理的逻辑,在子类中实现具体逻辑  
    public Object handle(Object input){  
        return input;  
    }  
      
      
    @Override  
    public void run() {  
          
        while(true){  
            //获取子任务  
            Object input= workQueue.poll();  
            if(input==null){  
                break;  
            }  
            //处理子任务  
            Object re = handle(input);  
            resultMap.put(Integer.toString(input.hashCode()), re);  
        }  
    }  
  
}  

运用这个小框架计算1——100的立方和,PlusWorker的实现:

package MasterWorker;  
  
public class PlusWorker extends Worker {  
  
    @Override  
    public Object handle(Object input) {  
          
        Integer i =(Integer)input;  
        return i*i*i;  
    }  
  
      
}  

进行计算的Main函数:

package MasterWorker;  
  
import java.util.Map;  
import java.util.Set;  
  
public class Main {  
  
      
    /** 
     * @param args 
     */  
    public static void main(String[] args) {  
        //固定使用5个Worker,并指定Worker  
        Master m = new Master(new PlusWorker(), 5);  
        //提交100个子任务  
        for(int i=0;i<100;i++){  
            m.submit(i);  
        }  
        //开始计算  
        m.execute();  
        int re= 0;  
        //保存最终结算结果  
        Map<String ,Object> resultMap =m.getResultMap();  
          
        //不需要等待所有Worker都执行完成,即可开始计算最终结果  
        while(resultMap.size()>0 || !m.isComplete()){  
            Set<String> keys = resultMap.keySet();  
            String key =null;  
            for(String k:keys){  
                key=k;  
                break;  
            }  
            Integer i =null;  
            if(key!=null){  
                i=(Integer)resultMap.get(key);  
            }  
            if(i!=null){  
                //最终结果  
                re+=i;  
            }  
            if(key!=null){  
                //移除已经被计算过的项  
                resultMap.remove(key);  
            }  
              
        }  
          
  
    }  
  
}  

三、总结:

    Master-Worker模式是一种将串行任务并行化的方案,被分解的子任务在系统中可以被并行处理,同时,如果有需要,Master进程不需要等待所有子任务都完成计算,就可以根据已有的部分结果集计算最终结果集。

 

并发模型(二)——Master-Worker模式

标签:http   workqueue   main函数   汇总   public   hash   ble   exec   获取   

原文地址:http://www.cnblogs.com/duan2/p/7769133.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!