标签:executors throw lock add find public get string 提交
//线程池创建,线程池提交任务使用Callable
// Create the thread pool; tasks are handed to it as Callables.
int corePoolSize = 3;
// Clamp to at least corePoolSize: on a single-CPU host availableProcessors() * 2 is 2, and
// ThreadPoolExecutor throws IllegalArgumentException when maximumPoolSize < corePoolSize.
int maximumPoolSize = Math.max(corePoolSize, Runtime.getRuntime().availableProcessors() * 2);
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512);
// AbortPolicy rather than DiscardPolicy: a silently discarded task leaves the Future returned by
// submit() incomplete forever, so any code waiting on that Future would never return.
RejectedExecutionHandler policy = new ThreadPoolExecutor.AbortPolicy();
ExecutorService threadPoolExecutor =
        new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, queue, policy);
try {
    submitTisks(gatherDate, hbaseConfig, corePoolSize, threadPoolExecutor);
} finally {
    // Stop accepting new tasks and let the worker threads exit once the queue is drained.
    threadPoolExecutor.shutdown();
}

/************************************ Using the Callable interface **************************************/

/**
 * Submits {@code corePoolSize} {@link GPSTask} instances to the given executor and blocks until
 * every one of them has finished (normally or with an exception).
 *
 * <p>A task that fails has its stack trace printed and does not stop the wait for the remaining
 * tasks. If the calling thread is interrupted while waiting, the interrupt flag is restored and
 * the method returns without waiting for the tasks that are still running.
 *
 * <p>NOTE(review): {@code xikangTidList} and {@code rootPath} are not parameters; they are
 * resolved from the enclosing scope, which is not visible in this snippet.
 *
 * @param gatherDate         record date passed to every task
 * @param hbaseConfig        HBase configuration passed to every task
 * @param corePoolSize       number of tasks to submit
 * @param threadPoolExecutor executor that runs the tasks
 */
private static void submitTisks(String gatherDate, Configuration hbaseConfig, int corePoolSize, ExecutorService threadPoolExecutor) {
    List<Future<Boolean>> taskFutureList = new ArrayList<>(corePoolSize);
    for (int i = 0; i < corePoolSize; i++) {
        // Submit the task; the pool schedules and runs it. Keep the Future so the outcome of
        // every task can be checked once all of them have been submitted.
        Future<Boolean> gpsFuture =
                threadPoolExecutor.submit(new GPSTask(hbaseConfig, gatherDate, xikangTidList, rootPath));
        taskFutureList.add(gpsFuture);
    }

    // Block on each Future in turn instead of polling isDone() with a sleep: get() returns as
    // soon as the task completes, so no CPU is burned and no extra second is wasted at the end.
    for (Future<Boolean> taskFuture : taskFutureList) {
        try {
            taskFuture.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption, then stop waiting.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return;
        } catch (java.util.concurrent.ExecutionException e) {
            // The task itself threw; report it and keep waiting for the others.
            e.printStackTrace();
        }
    }
}

/**
 * GPS task: delegates to {@code FindDataUtil.getData4HbaseGPSSQ} and returns its result.
 */
static class GPSTask implements Callable<Boolean> {
    final Configuration config;
    final String recordDate;
    final List<Long> xkDatanoList;
    final String rootPath;
    // Thread that executes call(); assigned at the start of call().
    Thread currentThread;

    public GPSTask(Configuration config, String recordDate, List<Long> xkDatanoList, String rootPath) {
        this.config = config;
        this.recordDate = recordDate;
        this.xkDatanoList = xkDatanoList;
        this.rootPath = rootPath;
    }

    @Override
    public Boolean call() throws Exception {
        this.currentThread = Thread.currentThread();
        return FindDataUtil.getData4HbaseGPSSQ(config, recordDate, xkDatanoList, rootPath, currentThread);
    }
}
//线程池创建,线程池提交任务使用Thread
int corePoolSize = 3; int maximumPoolSize = Runtime.getRuntime().availableProcessors() * 2; BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512); RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy(); //什么也不做,直接忽略 ExecutorService threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, queue, policy); GPSRunTask gpsRunTask = new GPSRunTask(hbaseConfig, gatherDate, xikangTidList, rootPath); threadPoolExecutor.execute(gpsRunTask); try { gpsRunTask.join(); } catch (InterruptedException e) { e.printStackTrace(); } // 等待已提交的任务全部结束 不再接受新的任务 threadPoolExecutor.shutdown(); /** * gps线程 */ static class GPSRunTask extends Thread { Configuration config; String recordDate; List<Long> xkDatanoList; String rootPath; Thread currentThread; public GPSRunTask(Configuration config, String recordDate, List<Long> xkDatanoList,String rootPath) { this.config = config; this.recordDate = recordDate; this.xkDatanoList = xkDatanoList; this.rootPath = rootPath; } @Override public void run(){ this.currentThread = Thread.currentThread(); //logger.info("gps线程:"+currentThread.getName()); FindDataUtil.getData4HbaseGPSSQ(config, recordDate, xkDatanoList,rootPath,currentThread); } }
标签:executors throw lock add find public get string 提交
原文地址:https://www.cnblogs.com/zyanrong/p/13299641.html