码迷,mamicode.com
首页 > 其他好文 > 详细

大数据文件处理

时间:2014-12-06 00:03:30      阅读:252      评论:0      收藏:0      [点我收藏+]

标签:blog   io   ar   os   java   on   文件   数据   div   

在处理大数据文件时,利用"生产者-消费者"线程模型进行处理,代码实现如下:

/**
 * Entry point of the file-processing pipeline.
 *
 * <p>Owns the worker pool and, in {@link #process()}, hands a
 * {@code FileReaderProcessor} (the producer) to that pool. The producer
 * receives the same pool so it can submit one task per line it reads.
 */
public class FileProcessor {
	
	/** Path of the file to read. */
	private final String path;
	
	/** Default capacity of the work queue: {@code 2 << 12} = 8192 tasks. */
	public static final int MAXWORKQUEUESIZE = 2 << 12;
	
	/** Queue backing the worker pool. */
	private final BlockingQueue<Runnable> workQueue;
	
	/** Pool that runs both the file reader and the per-line handlers. */
	private final ThreadPoolExecutor excutor;
	
	/**
	 * Creates a processor backed by a bounded queue of
	 * {@link #MAXWORKQUEUESIZE} entries.
	 *
	 * @param file path of the file to read
	 */
	public FileProcessor(String file) {
		this(new LinkedBlockingQueue<Runnable>(MAXWORKQUEUESIZE), file);
	}
	
	/**
	 * Creates a processor backed by a caller-supplied work queue.
	 *
	 * <p>The pool is configured with 10 core threads, at most 15 threads and
	 * a keep-alive of five minutes for threads above the core size.
	 *
	 * @param workQueue queue the pool uses to hold pending tasks; the queue
	 *                  is now also stored in this instance (it used to be
	 *                  left {@code null} by this constructor)
	 * @param file      path of the file to read
	 */
	public FileProcessor(BlockingQueue<Runnable> workQueue,String file) {
		this.path = file;
		this.workQueue = workQueue;
		this.excutor = new ThreadPoolExecutor(10 , 15 , 5 * 60 * 1000L , TimeUnit.MILLISECONDS, workQueue);
	}
	
	/**
	 * Starts processing by submitting the file-reading task to the pool.
	 * Returns immediately; reading and line handling happen on pool threads.
	 */
	public void process() {
		FileReaderProcessor fileReaderProcessor = new FileReaderProcessor(path,excutor);
		excutor.execute(fileReaderProcessor);
	}
	
	/** Demo entry point: processes the file at {@code D://test}. */
	public static void main(String []args) {
		FileProcessor proc = new FileProcessor("D://test");
		proc.process();
	}
	
}

/********************************************************/
/**
 * Producer task: reads a text file line by line and submits one
 * {@code DateHandlerProcessor} per line to the given pool.
 */
public class FileReaderProcessor implements Runnable {

	/** Path of the file to read. */
	private String path = "";
	
	/** Pool that receives one handler task per line. */
	private ThreadPoolExecutor excutor = null;
	
	/**
	 * @param file    path of the file to read
	 * @param excutor pool the per-line handler tasks are submitted to
	 */
	public FileReaderProcessor(String file, ThreadPoolExecutor excutor) {
		this.path = file;
		this.excutor = excutor;
	}
	
	/**
	 * Reads the file and submits every line to the pool, then releases the
	 * file handles and shuts the pool down via
	 * {@link #close(BufferedReader, FileReader, ThreadPoolExecutor)}.
	 *
	 * <p>I/O failures and interruption are reported on standard output and
	 * end the read loop; the resources are released in every case.
	 */
	@Override
	public void run() {
		FileReader reader = null;
		BufferedReader br = null;
		int lineNumber = 0;
		try {
			reader = new FileReader(path);
			br = new BufferedReader(reader);
			String str = null;
			while((str = br.readLine()) != null) {
				++lineNumber;
				System.out.println("[" + Thread.currentThread().getName() + "] read " + lineNumber + " rows");
				// Throttle the producer: once the queue is at least 75% of the
				// default capacity, pause before submitting the next line so
				// the workers can catch up.
				if(excutor.getQueue().size() >= FileProcessor.MAXWORKQUEUESIZE * 0.75) {
					System.out.println("[" + Thread.currentThread().getName() + "] sleep 5 seconds");
					TimeUnit.SECONDS.sleep(5);
				}
				excutor.submit(new DateHandlerProcessor(str));
			}
		} catch (FileNotFoundException e) {
			System.out.println("File Not Find Error : " + e.getMessage());
		} catch (IOException e) {
			System.out.println("Read File Io Error : " + e.getMessage());
		} catch (InterruptedException e) {
			System.out.println("Thread Interrupt Error : " + e.getMessage());
			// Restore the flag so code further up the stack can still see
			// that this thread was asked to stop.
			Thread.currentThread().interrupt();
		} finally {
			this.close(br, reader, excutor);
		}
	}

	/**
	 * Closes the readers and initiates an orderly shutdown of the pool.
	 *
	 * <p>Each resource is released independently, so a failure while closing
	 * one of them no longer prevents the pool from being shut down.
	 *
	 * <p>This method deliberately does not wait for the pool to terminate.
	 * It is called from {@link #run()}, which {@code FileProcessor.process()}
	 * executes on a worker of this very pool; the pool cannot terminate
	 * while that worker is still blocked here, so waiting would always run
	 * into the timeout. {@code shutdown()} still lets every task that was
	 * already submitted run to completion.
	 *
	 * @param br       buffered reader to close, may be {@code null}
	 * @param reader   underlying file reader to close, may be {@code null}
	 * @param executor pool to shut down, may be {@code null}
	 */
	public void close(BufferedReader br, FileReader reader, ThreadPoolExecutor executor) {
		try {
			if(br != null) {
				br.close();
			}
		} catch(IOException e) {
			System.out.println("Close Error : " + e.getMessage());
		}
		try {
			if(reader != null) {
				reader.close();
			}
		} catch(IOException e) {
			System.out.println("Close Error : " + e.getMessage());
		}
		if(executor != null) {
			executor.shutdown();
		}
	}
	
}
/*********************************************************/
/**
 * Consumer task that handles a single line of the input file.
 * The current handling simply echoes the line to standard output.
 */
public class DateHandlerProcessor implements Runnable {

	/** The line of text this task is responsible for. */
	private final String line;

	/**
	 * @param line one line read from the input file
	 */
	public DateHandlerProcessor(String line) {
		this.line = line;
	}

	/**
	 * Prints the line, tagged with the name of the executing thread.
	 * Any exception raised while doing so is reported on standard output
	 * instead of being propagated.
	 */
	@Override
	public void run() {
		final String workerName = Thread.currentThread().getName();
		try {
			String report = "Thread[" + workerName + "] Get Line " + line;
			System.out.println(report);
		} catch (Exception failure) {
			String report = "Thread[" + workerName + "] Interrupt : " + failure.getMessage();
			System.out.println(report);
		}
	}

}

  

大数据文件处理

标签:blog   io   ar   os   java   on   文件   数据   div   

原文地址:http://www.cnblogs.com/hanfight/p/4146158.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!