标签:blog io ar os java on 文件 数据 div
在处理大数据文件时,利用"生产者-消费者"线程模型进行处理,代码实现如下:
/** * 文件处理类 * */ public class FileProcessor { /**读取文件的路径*/ private String path = ""; /**指定默认工作队列的大小*/ public static final int MAXWORKQUEUESIZE = 2 << 12; /**工作线程队列*/ private BlockingQueue<Runnable> workQueue = null; /**数据处理线程池*/ private ThreadPoolExecutor excutor = null; public FileProcessor(String file) { this.path = file; workQueue = new LinkedBlockingQueue<Runnable>(MAXWORKQUEUESIZE); excutor = new ThreadPoolExecutor(10 , 15 , 5 * 60 * 1000L , TimeUnit.MILLISECONDS, workQueue); } public FileProcessor(BlockingQueue<Runnable> workQueue,String file) { this.path = file; excutor = new ThreadPoolExecutor(10 , 15 , 5 * 60 * 1000L , TimeUnit.MILLISECONDS, workQueue); } public void process() { /**开启文件读取线程*/ FileReaderProcessor fileReaderProcessor = new FileReaderProcessor(path,excutor); excutor.execute(fileReaderProcessor); } public static void main(String []args) { FileProcessor proc = new FileProcessor("D://test"); proc.process(); } } /********************************************************/ /**读取文件线程*/ public class FileReaderProcessor implements Runnable { /**读取文件路径*/ private String path = ""; private ThreadPoolExecutor excutor = null; public FileReaderProcessor(String file, ThreadPoolExecutor excutor) { this.path = file; this.excutor = excutor; } @Override public void run() { // TODO Auto-generated method stub FileReader reader = null; BufferedReader br = null; int lineNumber = 0; try { reader = new FileReader(path); br = new BufferedReader(reader); String str = null; while((str = br.readLine()) != null) { ++lineNumber; System.out.println("[" + Thread.currentThread().getName() + "] read " + lineNumber + " rows"); /**防止读入过快,导致工作队列已满无法接受任务,则超过工作队列0.75时,暂停提交*/ if(excutor.getQueue().size() >= FileProcessor.MAXWORKQUEUESIZE * 0.75) { System.out.println("[" + Thread.currentThread().getName() + "] sleep 5 seconds"); TimeUnit.SECONDS.sleep(5); /**休眠五秒中*/ } excutor.submit(new DateHandlerProcessor(str)); } } catch (FileNotFoundException e) { // TODO Auto-generated catch block System.out.println("File Not Find Error : " + e.getMessage()); } catch (IOException e) { // TODO Auto-generated catch block System.out.println("Read File Io Error : " + e.getMessage()); } catch (InterruptedException e) { // TODO Auto-generated catch block System.out.println("Thread Interrupt Error : " + e.getMessage()); } finally { /**关闭资源*/ this.close(br, reader, excutor); } } public void close(BufferedReader br, FileReader reader, ThreadPoolExecutor executor) { try { if(br != null) { br.close(); } if(reader != null) { reader.close(); } /**关闭线程池*/ while(excutor.getQueue().size() != 0) { TimeUnit.SECONDS.sleep(1); } excutor.shutdown(); if(!excutor.awaitTermination(5 * 60 * 1000L, TimeUnit.MILLISECONDS)) { excutor.shutdownNow(); } } catch(Exception e) { System.out.println("Close Error : " + e.getMessage()); } } } /*********************************************************/ /**数据处理类*/ public class DateHandlerProcessor implements Runnable { /**处理文件一行内容*/ private String line = ""; public DateHandlerProcessor(String line) { this.line = line; } @Override public void run() { // TODO Auto-generated method stub try { System.out.println("Thread[" + Thread.currentThread().getName() + "] Get Line " + line); } catch (Exception e) { // TODO Auto-generated catch block System.out.println("Thread[" + Thread.currentThread().getName() + "] Interrupt : " + e.getMessage()); } } }
标签:blog io ar os java on 文件 数据 div
原文地址:http://www.cnblogs.com/hanfight/p/4146158.html