1 public class Task { 2 3 private int id; 4 private int price ; 5 public int getId() { 6 return id; 7 } 8 public void setId(int id) { 9 this.id = id; 10 } 11 public int getPrice() { 12 return price; 13 } 14 public void setPrice(int price) { 15 this.price = price; 16 } 17 18 } 19 20 21 public class Master { 22 23 //1 有一个盛放任务的容器 24 private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>(); 25 26 //2 需要有一个盛放worker的集合 27 private HashMap<String, Thread> workers = new HashMap<String, Thread>(); 28 29 //3 需要有一个盛放每一个worker执行任务的结果集合 30 private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); 31 32 /** 33 * 4 构造方法 34 * @param worker 执行任务的对象 35 * @param workerCount 子任务的大小 36 */ 37 public Master(Worker worker , int workerCount) { 38 //每一个Worker对象都需要有Master的引用 39 worker.setWorkQueue(this.workQueue);//workQueue用于任务的领取 40 worker.setResultMap(this.resultMap);//resultMap用于任务的提交 41 42 for (int i = 0; i < workerCount; i ++) { 43 //Key表示每一个Worker的名字,value表示线程执行对象 44 this.workers.put("子任务" + Integer.toString(i), new Thread(worker)); 45 } 46 47 } 48 49 //5 需要一个提交任务的方法 50 public void submit(Task task) { 51 this.workQueue.add(task); 52 } 53 54 //6 需要有一个执行的方法,启动所有的worker方法去执行任务 55 public void execute() { 56 for(Map.Entry<String, Thread> me : workers.entrySet()) { 57 me.getValue().start(); 58 } 59 } 60 61 //7 判断是否运行结束的方法 62 public boolean isComplete() { 63 for(Map.Entry<String, Thread> me : workers.entrySet()){ 64 if(me.getValue().getState() != Thread.State.TERMINATED){ 65 return false; 66 } 67 } 68 return true; 69 } 70 71 //8 计算结果方法 72 public int getResult() { 73 int priceResult = 0; 74 for (Map.Entry<String, Object> me : resultMap.entrySet()) { 75 priceResult += (Integer)me.getValue(); 76 } 77 return priceResult; 78 } 79 80 } 81 82 83 public class Worker implements Runnable { 84 85 private ConcurrentLinkedQueue<Task> workQueue; 86 private ConcurrentHashMap<String, Object> resultMap; 87 88 public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) { 89 this.workQueue = workQueue; 90 } 91 92 public void setResultMap(ConcurrentHashMap<String, Object> resultMap) { 93 this.resultMap = resultMap; 94 } 95 96 @Override 97 public void run() { 98 while(true){ 99 Task input = this.workQueue.poll(); 100 if (input == null) break; 101 //真正的去做业务处理 102 Object output = handle(input); 103 this.resultMap.put(Integer.toString(input.getId()), output); 104 } 105 } 106 107 private Object handle(Task input) { 108 Object output = null; 109 try { 110 //处理任务的耗时。。 比如说进行操作数据库。。。 111 Thread.sleep(500); 112 output = input.getPrice(); 113 } catch (InterruptedException e) { 114 e.printStackTrace(); 115 } 116 return output; 117 } 118 119 } 120 121 122 public class Main { 123 124 public static void main(String[] args) { 125 126 Master master = new Master(new Worker(), 20); 127 128 Random r = new Random(); 129 for(int i = 1; i <= 100; i++){ 130 Task t = new Task(); 131 t.setId(i); 132 t.setPrice(r.nextInt(1000)); 133 master.submit(t); 134 } 135 master.execute(); 136 long start = System.currentTimeMillis(); 137 138 while(true){ 139 if(master.isComplete()){ 140 long end = System.currentTimeMillis() - start; 141 int priceResult = master.getResult(); 142 System.out.println("最终结果:" + priceResult + ", 执行时间:" + end); 143 break; 144 } 145 } 146 147 } 148 }