java并发API从java7提供了并发多阶段任务的抽象Phaser。如果我们有并发任务且需要分段执行的时候
我们可以考虑Phaser这个类。
Phaser有一个特别的地方,即不需要处理InterruptedException除了(awaitAdvanceInterruptibly(int phaser)方法)
我们模拟一种场景:
我们有三个任务,分别从三个不同的文件夹及其子文件夹查找过去24小时内修改过的扩展名
为.log的文件。这三个任务分别包含以下三个步骤:
1、在指定的文件夹下筛选出扩展名为.log的文件;
2、对第一步的结果进行过滤,去除修改时间超过24小时的文件;
3、将结果打印至控制台;
我们借助Phaser完成个任务的定义。
我们定义了两个类:
FileSearcher
Core
FileSearcher是任务的执行类;
Core是程序的入口
代码如下:
package com.ali.concurrency.phaser; import java.io.File; import java.util.Date; import java.util.List; import java.util.ArrayList; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; public class FileSearcher implements Runnable{ private final String dir; private final String end; private List<File> results = new ArrayList<File>(); private final Phaser controller; public FileSearcher(String dir, String end, Phaser controller){ this.dir = dir; this.end = end; this.controller = controller; } @Override public void run() { <u>controller.arriveAndAwaitAdvance();</u> System.out.println(Thread.currentThread().getName() + "start..."); File root = new File(dir); if(root.isDirectory()){ dirProcess(dir); } if(!checkResult()) return; filterFile(); if(!checkResult()) return; showInfo(); <u>controller.arriveAndDeregister();</u> System.out.println(Thread.currentThread().getName() + " Work comeleted."); } private void showInfo(){ for(File f : results){ System.out.println(Thread.currentThread().getName() + " : " + f.getAbsolutePath()); } <u>controller.arriveAndAwaitAdvance();</u> } private boolean checkResult(){ if(results.size() == 0){ System.out.println(Thread.currentThread().getName() + " Phaser " + controller.getPhase() + " : 0 results"); System.out.println(Thread.currentThread().getName() + " Phaser " + controller.getPhase() + " : End "); <u>controller.arriveAndDeregister();</u> return false; }else{ System.out.println(Thread.currentThread().getName() + " Phaser " + controller.getPhase() + " : " + results.size() + "results"); <u>controller.arriveAndAwaitAdvance();</u> return true; } } private void dirProcess(String dir){ <u>controller.arriveAndAwaitAdvance();</u> File dirFile = new File(dir); File[] dirFileArr = dirFile.listFiles(); for(int i = 0; i < dirFileArr.length; i ++){ if(dirFileArr[i].isFile()){ fileProcess(dirFileArr[i]); }else if(dirFileArr[i].isDirectory()){ dirProcess(dirFileArr[i].getPath()); } } } private void fileProcess(File f){ if(f.getName().endsWith(end)){ results.add(f); } } private void filterFile(){ List<File> newResults = new ArrayList<File>(); Date now = new Date(); for(File f : results){ if(f.lastModified() - now.getTime() < TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS)){ newResults.add(f); } } results = newResults; } }
package com.ali.concurrency.phaser; import java.util.concurrent.Phaser; public class Core { public static void main(String[] args){ Phaser controller = new Phaser(3); FileSearcher system = new FileSearcher("C:\\testfile\\1", "log", controller); FileSearcher apps = new FileSearcher("C:\\testfile\\2", "log", controller); FileSearcher documents = new FileSearcher("C:\\testfile\\3", "log", controller); Thread systemThread = new Thread(system,"System"); Thread appsThread = new Thread(apps,"apps"); Thread documentsThread = new Thread(documents,"documents"); systemThread.start(); appsThread.start(); documentsThread.start(); try{ systemThread.join(); appsThread.join(); documentsThread.join(); }catch(InterruptedException e){ e.printStackTrace(); } System.out.println("Terminated: " + controller.isTerminated()); } }
documentsstart... appsstart... Systemstart... documents Phaser 2 : 0 results documents Phaser 2 : End apps Phaser 2 : 0 results apps Phaser 2 : End System Phaser 2 : 4results System Phaser 3 : 4results System : C:\testfile\1\1_333.log System : C:\testfile\1\1_456.log System : C:\testfile\1\1_666.log System : C:\testfile\1\1_777.log System Work comeleted. Terminated: true
程序先在第一阶段等待,目的是等待所有线程开始;
然后执行dirProcess筛选出.log类型的文件,然后checkResult
接着filterFile过滤掉空结果的任务
然后checkResult
接着showInfo,其中showInfo在打印后再次等待
接着任务从phaser中注销
打印终止信息为true。
一个Phaser有两种状态:
1、活跃态(Active),当存在参与同步的线程的时候,Phaser就是活跃的,并且在每个阶段结束的时候进行同步。
在这种状态中,Phaser的执行如前文所述。
2、终止态(Termination),当所有参与同步的线程都取消注册的时候,Phaser就处于终止态。在这种状态下Phaser没有任何参与者。
更具体的说,当onAdvance()方法返回true的时候,就处于了终止态。通过覆盖这个方法可以改变默认的行为。当Phaser处于
终止态的时候,同步方法arriveAndAwaitAdvance()方法会立即返回,而且不会做任何的同步操作。
其他方法请参考java7API
原文地址:http://blog.csdn.net/u014783753/article/details/44944975