标签:线程 i++ 服务 ons stat hand service forkjoin execution
两种创建线程池的方法
ThreadPoolExecutor 和 FokJoinPool
1.ThreadPoolExecutor 创建线程池
1) 无返回值的情况
import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * 无返回值的线程池 */ public class ThreadPoolTest { /** * 任务先拿corePoolSize里面的 2个线程,然后往 workQueue里面放两个,然后往这里面放 maxPoolSize - corePoolSize = 2个 * 这样就达到了线程的饱和数,其他的任务来了没有线程执行,阻塞队列也满了所以其他任务都会被拒绝 * */ public static void main(String[] args) { //核心线程池的大小 int corePoolSize = 2; // 线程池的最大线程数 int maxPoolSize = 4; //线程最大空闲数 long keepAliveTime = 2; //时间单位 TimeUnit unit = TimeUnit.SECONDS; //阻塞对垒 容量为2 BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2); //线程创建工厂 ThreadFactory threadFactory = new NameThreadFactory(); //线程池拒绝策略 RejectedExecutionHandler handler = new MyIgnorePolicy(); ThreadPoolExecutor executor = null; try{ //推荐的使用创建线程池方式 //不推荐使用现成的api创建,会默认最大线程事2的31次方,容易导致服务器挂 executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); //预开启所有的核心线程 提升效率 executor.prestartAllCoreThreads(); //任务数量 int count = 12; for (int i = 1; i <= count; i++) { //执行任务 RunnableTask task = new RunnableTask(String.valueOf(i)); executor.submit(task); System.out.println("i::"+i+" active::"+executor.getActiveCount()+" core::"+executor.getCorePoolSize()+" size::"+executor.getPoolSize()+" queue::"+executor.getQueue().size()); System.out.println(); } }finally{ if(executor !=null){ executor.shutdown(); } } } /** * @Description 线程工厂 * @Param */ static class NameThreadFactory implements ThreadFactory { //线程id private final AtomicInteger threadId = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { Thread t = new Thread(r,"线程—"+threadId.getAndIncrement()); System.out.println(t.getName()+" 已经被创建"); return t; } } /** * @Description 线程池拒绝策略 * @Param */ static class MyIgnorePolicy implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //打印被拒绝的任务 System.out.println(r.toString() + " 被拒绝"); } } static class RunnableTask implements Runnable{ private String name ; RunnableTask(String name ){ this.name = name ; } @Override public void run() { System.out.println(this.toString() + " is running!"); // 让任务执行慢点 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString(){ return "RunnableTask [name = "+ name + "]"; } } }
2)有返回值的情况
import java.util.concurrent.*; /** * 有返回值的线程池 */ public class ThreadPoolCallableTest { public static void main(String[] args) throws ExecutionException { ExecutorService executorService = null ; int cout = 10 ; try{ //不推荐这种创建方式,这是默认创建线程池,默认线程数事2的31次方 executorService = Executors.newCachedThreadPool(); //保存执行完成的结果数据 CompletionService<Object> completionService = new ExecutorCompletionService<>(executorService); for(int i = 1 ; i<= cout ;i++){ FactoryCalculator factoryCalculator = new FactoryCalculator(i); completionService.submit(factoryCalculator); } for(int i = 1 ; i<= cout ;i++){ Future<Object> future = completionService.take(); System.out.println(future.get()); } }catch (InterruptedException e) { e.printStackTrace(); }finally{ if(executorService != null ){ executorService.shutdown(); } } } static class FactoryCalculator implements Callable { Object val = null; public FactoryCalculator(Object val){ this.val = val; } @Override public Object call() throws Exception { return Thread.currentThread().getName() + "::"+val; } } }
2.ForkJoinPool 创建线程池
1)无返回值的情况
import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; /** * 无返回值 */ public class ForkJoinPoolAction { private static Set<Integer> resultSet = new CopyOnWriteArraySet<>(); public static void main(String[] args) throws InterruptedException { //打印1-3000 的数字 PrintTask task = new PrintTask(0,3000); //穿见线程池,这里线程数默认为服务器当前处理器数 ForkJoinPool pool = new ForkJoinPool(); pool.submit(task); // 线程阻塞 等待所有的任务完成,这里有个等待时间 pool.awaitTermination(2,TimeUnit.SECONDS); System.out.printf("result_set 的大小 = %s",resultSet.size()); pool.shutdown(); } static class PrintTask extends RecursiveAction { private int start ; private int end; private static final int THRESHOLD = 50 ; @Override protected void compute() { //小于50 就直接将50个数字放集合里 if(end -start < THRESHOLD){ for(int i = start; i < end ; i++ ){ ForkJoinPoolAction.resultSet.add(i); System.out.println(Thread.currentThread().getName()+ "的i 值 \t" + i); } } //如果大约50就递归切分 else{ //递归切分 int mid = (start+end )/2; PrintTask leftTask = new PrintTask(start,mid); PrintTask rightTask = new PrintTask(mid,end); leftTask.fork(); rightTask.fork(); } } public PrintTask(int start,int end){ this.start = start; this.end = end; } } }
2)有返回值的情况
import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; /** * 有返回的情况 */ public class ForkJoinPoolTask { public static void main(String[] args) throws ExecutionException, InterruptedException { //需求: 对长度为10000的元素数组进行累加 int[] nums = new int[10000]; Random random = new Random(); int total = 0; //获取纳秒时间 long start = System.nanoTime(); for(int i = 0 ;i < nums.length; i++){ int temp = random.nextInt(100); nums[i] = temp; total += nums[i]; } long end = System.nanoTime(); System.out.println("初始化数组用时:"+ (end-start)+" 纳秒,初始化数组总和:"+total); long startTask = System.nanoTime(); SumTask task = new SumTask(nums,0,nums.length); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(task); long endTask = System.nanoTime(); System.out.println("线程计算用时::"+(endTask - startTask)+ " 纳秒,线程执行结果:"+ forkJoinTask.get()); forkJoinPool.shutdown(); } static class SumTask extends RecursiveTask<Integer> { private int [] nums; private int start; private int end ; private static final int THRESHOLD = 50 ; SumTask( int [] nums, int start,int end){ this.nums = nums; this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; if(end -start < THRESHOLD){ for(int i = start; i < end ; i++ ){ sum+= nums[i]; } return sum; } else{ //递归切分 int mid = (start+end )/2; SumTask leftTask = new SumTask(nums,start,mid); SumTask rightTask = new SumTask(nums,mid,end); leftTask.fork(); rightTask.fork(); //把两个小人物累加合并 return leftTask.join() + rightTask.join(); } } } }
两种创建线程池方式对比:
ThreadPoolExecutor 适用于IO密集型任务如:
1.http
2.rpc
3.DB
4.Redis 等
ForkJoinPool 适用于CPU密集型任务
1.处理大量的文本数据
标签:线程 i++ 服务 ons stat hand service forkjoin execution
原文地址:https://www.cnblogs.com/w-ting/p/12775139.html