码迷,mamicode.com
首页 > 其他好文 > 详细

并发多阶段任务的执行——Phaser

时间:2015-04-09 09:05:34      阅读:167      评论:0      收藏:0      [点我收藏+]

标签:并发多阶段任务   phaser   java并发   并发   


java并发API从java7提供了并发多阶段任务的抽象Phaser。如果我们有并发任务且需要分段执行的时候

我们可以考虑Phaser这个类。

Phaser有一个特别的地方,即不需要处理InterruptedException除了(awaitAdvanceInterruptibly(int phaser)方法)


我们模拟一种场景:

我们有三个任务,分别从三个不同的文件夹及其子文件夹查找过去24小时内修改过的扩展名

为.log的文件。这三个任务分别包含以下三个步骤:

1、在指定的文件夹下筛选出扩展名为.log的文件;

2、对第一步的结果进行过滤,去除修改时间超过24小时的文件;

3、将结果打印至控制台;


我们借助Phaser完成个任务的定义。

我们定义了两个类:

FileSearcher

Core

FileSearcher是任务的执行类;

Core是程序的入口

代码如下:


package com.ali.concurrency.phaser;

import java.io.File;
import java.util.Date;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class FileSearcher implements Runnable{
	
	private final String dir;
	
	private final String end;
	
	private List<File> results = new ArrayList<File>();
	
	private final Phaser controller;
	
	public FileSearcher(String dir, String end, Phaser controller){
		
		this.dir = dir;
		
		this.end = end;
		
		this.controller = controller;
		
	}

	@Override
	public void run() {
		
		<u>controller.arriveAndAwaitAdvance();</u>
		
		System.out.println(Thread.currentThread().getName() + "start...");
		
		File root = new File(dir);
		
		if(root.isDirectory()){
			
			dirProcess(dir);
			
		}

		if(!checkResult()) return;
		
		
		filterFile();
		
		if(!checkResult()) return;
		
		showInfo();
		
		<u>controller.arriveAndDeregister();</u>
		
		System.out.println(Thread.currentThread().getName() + " Work comeleted.");
		
		
	}
	
	private void showInfo(){
		
		for(File f : results){
			
			System.out.println(Thread.currentThread().getName() + " : " + f.getAbsolutePath());
			
		}
		
		<u>controller.arriveAndAwaitAdvance();</u>
		
	}
	
	private boolean checkResult(){
		
		if(results.size() == 0){
			System.out.println(Thread.currentThread().getName() + " Phaser " + controller.getPhase() + " : 0 results");
			System.out.println(Thread.currentThread().getName() + " Phaser " + controller.getPhase() + " : End ");
			<u>controller.arriveAndDeregister();</u>
			return false;
		}else{
			System.out.println(Thread.currentThread().getName() + " Phaser " + controller.getPhase() + " : " + results.size() + "results");
			<u>controller.arriveAndAwaitAdvance();</u>
			return true;
		}
		
	}
	
	private void dirProcess(String dir){
		
		<u>controller.arriveAndAwaitAdvance();</u>
		File dirFile = new File(dir);
		File[] dirFileArr = dirFile.listFiles();
		for(int i = 0; i < dirFileArr.length; i ++){
			
			if(dirFileArr[i].isFile()){
				fileProcess(dirFileArr[i]);
			}else if(dirFileArr[i].isDirectory()){
				dirProcess(dirFileArr[i].getPath());
			}
			
		}
		
	}
	
	private void fileProcess(File f){
		
		if(f.getName().endsWith(end)){
			results.add(f);
		}
		
	}
	
	private void filterFile(){
		
		List<File> newResults = new ArrayList<File>();
		
		Date now = new Date();
		
		for(File f : results){
			
			if(f.lastModified() - now.getTime() < TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS)){
				newResults.add(f);
			}
			
		}
		
		results = newResults;
		
	}
	
}

package com.ali.concurrency.phaser;

import java.util.concurrent.Phaser;

public class Core {

	public static void main(String[] args){
		
		Phaser controller = new Phaser(3);
		
		FileSearcher system = new FileSearcher("C:\\testfile\\1", "log", controller);
		FileSearcher apps = new FileSearcher("C:\\testfile\\2", "log", controller);
		FileSearcher documents = new FileSearcher("C:\\testfile\\3", "log", controller);
		
		Thread systemThread = new Thread(system,"System");
		Thread appsThread  = new Thread(apps,"apps");
		Thread documentsThread = new Thread(documents,"documents");
		
		systemThread.start();
		appsThread.start();
		documentsThread.start();
		
		try{
			systemThread.join();
			appsThread.join();
			documentsThread.join();
		}catch(InterruptedException e){
			e.printStackTrace();
		}
		
		System.out.println("Terminated: " + controller.isTerminated());
		
		
		
	}
	
}


执行结果:

documentsstart...
appsstart...
Systemstart...
documents Phaser 2 : 0 results
documents Phaser 2 : End 
apps Phaser 2 : 0 results
apps Phaser 2 : End 
System Phaser 2 : 4results
System Phaser 3 : 4results
System : C:\testfile\1\1_333.log
System : C:\testfile\1\1_456.log
System : C:\testfile\1\1_666.log
System : C:\testfile\1\1_777.log
System Work comeleted.
Terminated: true


我们从run方法中理逻辑:

程序先在第一阶段等待,目的是等待所有线程开始;

然后执行dirProcess筛选出.log类型的文件,然后checkResult

接着filterFile过滤掉空结果的任务

然后checkResult

接着showInfo,其中showInfo在打印后再次等待

接着任务从phaser中注销

打印终止信息为true。


一个Phaser有两种状态:

1、活跃态(Active),当存在参与同步的线程的时候,Phaser就是活跃的,并且在每个阶段结束的时候进行同步。

在这种状态中,Phaser的执行如前文所述。

2、终止态(Termination),当所有参与同步的线程都取消注册的时候,Phaser就处于终止态。在这种状态下Phaser没有任何参与者。

更具体的说,当onAdvance()方法返回true的时候,就处于了终止态。通过覆盖这个方法可以改变默认的行为。当Phaser处于

终止态的时候,同步方法arriveAndAwaitAdvance()方法会立即返回,而且不会做任何的同步操作。

其他方法请参考java7API

并发多阶段任务的执行——Phaser

标签:并发多阶段任务   phaser   java并发   并发   

原文地址:http://blog.csdn.net/u014783753/article/details/44944975

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!