标签:
/*具体工作对象*/
static abstract class Worker<T, R> implements Runnable {
private static final UtilsLog lg = UtilsLog.getLogger(Worker.class);
protected Queue<T> workQueue;//持有Master的任务队列
protected Map<String, R> resultMap;//用于存储结果集,key为任务对应的唯一标识符
public void setWorkQueue(Queue<T> workQueue) {
this.workQueue = workQueue;
}
public void setResultMap(Map<String, R> resultMap) {
this.resultMap = resultMap;
}
public abstract R handler(T entity);
@Override
public void run() {
while (true) {
T childWork = workQueue.poll();
if (childWork == null) {
lg.e("已经没有任务在队列中等待执行");
break;
}
//处理子任务
R result = handler(childWork);
resultMap.put(Integer.toString(childWork.hashCode()), result);
}
}
}
public static class Master<T, R> {
private static final UtilsLog lg = UtilsLog.getLogger(Master.class);
protected Queue<T> workQueue;//用于存储任务集
protected Map<String, Thread> threadMap;//存储执行任务的线程集
protected Map<String, R> resultMap;//存储相关结果
@TargetApi(Build.VERSION_CODES.LOLLIPOP)
public Master(Worker<T, R> work, int threadCount) {
workQueue = new ConcurrentLinkedDeque<T>();
threadMap = new HashMap<>();
resultMap = new HashMap<>();
work.setWorkQueue(workQueue);
work.setResultMap(resultMap);
for (int i = 0; i < threadCount; i++) {
threadMap.put(Integer.toString(i), new Thread(work, "thread tag with " + Integer.toString(i)));
}
}
//是否所有的子任务都结束了
public boolean isComplete() {
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
if (entry.getValue().getState() != Thread.State.TERMINATED) {
return false;
}
}
return true;
}
public Map<String, R> getResultMap() {
return resultMap;
}
public Master addJob(T job) {
workQueue.add(job);
return this;
}
public void execute() {
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
entry.getValue().start();
}
}
}
Master<Integer, Integer> master = new Master<>(new Worker<Integer, Integer>() {
@Override
public Integer handler(Integer entity) {
int max = 50, min = 0;
UtilsThread.sleepIgnoreInteruptedException(new Random().nextInt(max) % (max - min + 1) + min);//随机模拟耗时操作
lg.e("执行handler程序 with value:" + entity);
return entity * entity;
}
}, 3);
int jobCount = 10;//任务数
for (int i = 0; i < jobCount; i++) {
master.addJob(i);
}
master.execute();
Map<String, Integer> resultMap = master.getResultMap();
while (true) {
int resultMapSize = resultMap.size();
// lg.e("并行执行结果集中已有数据量:" + resultMapSize);//此处resultMap持有Master中结果集的引用,因此在线程不断执行的过程中不断刷新结果姐,会连带导致这里值的改变
if (master.isComplete()) {
break;
}
}

标签:
原文地址:http://www.cnblogs.com/linux007/p/5790490.html