标签:tar 指定 extends exce oob term 解决问题 zab 请求
由于线程的创建?较昂贵,随意、没有控制地创建?量线程会造成性能问题,因此短平快的任务?般考虑使 ?线程池来处理,?不是直接创建线程。
通过三个?产事故,来看看使?线程池应该注意些什么。
Java中的Executors类定义了?些快捷的?具?法,来帮助我们快速创建线程池。《阿?巴巴Java开发? 册》中提到,禁?使?这些?法来创建线程池,?应该?动new ThreadPoolExecutor来创建线程池。
最典型的就是newFixedThreadPool和 newCachedThreadPool,可能因为资源耗尽导致OOM问题。
写?段测试代码,来初始化?个单线程的FixedThreadPool,循环1亿次向线程池提交任务,每个任务都会创建?个?较?的字符串然后休眠??时:
执?程序后不久,?志中就出现OOM
public void oom1() throws InterruptedException { ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); for (int i = 0; i < 100000000; i++) { threadPool.execute(() -> { String payload = IntStream.rangeClosed(1, 1000000) .mapToObj(__ -> "a") .collect(Collectors.joining("")) + UUID.randomUUID().toString(); try { TimeUnit.HOURS.sleep(1); } catch (InterruptedException e) { } }); } threadPool.shutdown(); threadPool.awaitTermination(1, TimeUnit.HOURS); }
看newFixedThreadPool?法的源码发现,线程池的?作队列直接new了?个 LinkedBlockingQueue,?默认构造?法的LinkedBlockingQueue是?个Integer.MAX_VALUE?度的队列,可以认为是?界的:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { ... /** * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } ... }
虽然使?newFixedThreadPool可以把?作线程控制在固定的数量上,但任务队列是?界的。如果任务较多 并且执?较慢的话,队列可能会快速积压,撑爆内存导致OOM。
把刚才的例?稍微改?下,改为使?newCachedThreadPool?法来获得线程池。程序运?不久后, 同样看到了OOM异常
这次OOM的原因是?法创建线程,翻看newCachedThreadPool的源码可以看到,这种线程池的最?线程数是Integer.MAX_VALUE,可以认为是没有上限的,?其?作队列 SynchronousQueue是?个没有存储空间的阻塞队列。这意味着,只要有请求到来,就必须找到?条?作线程来处理,如果当前没有空闲的线程就再创建?条新的。
由于任务需要1?时才能执?完成,?量的任务进来后会创建?量的线程。线程是需要分配?定的内存空间作为线程栈的,?如1MB,因此?限制创建线程必然会导致OOM
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
抱有侥幸?理,觉得只是使?线程池做?些轻量级的任务,不可能造成队列积压或开启?量线程。
??注册后,调??个外部服务去发送短信, 发送短信接?正常时可以在100毫秒内响应,TPS 100的注册量,CachedThreadPool能稳定在占?10个左 右线程的情况下满?需求。在某个时间点,外部短信服务不可?了,我们调?这个服务的超时?特别?, ?如1分钟,1分钟可能就进来了6000??,产?6000个发送短信的任务,需要6000个线程,没多久就因为?法创建线程导致了OOM,整个应?程序崩溃。
不建议使?Executors提供的两种快捷的线程池,原因如下:
线程池线程管理策略详解
除了?动声明线程池以外,还可以??些监控?段来观察线程池的状态。线程池这个组件除?是出现了拒绝策略,否则压?再?都不会抛出?个异常。如果我们能提前观察到线程池队列的积压,或者线程数量的快速膨胀,往往可以提早发现并解决问题。
??个printStats?法实现了最简陋的监控,每秒输出?次线程池的基本内部信息,包括线程数、活跃线程数、完成了多少任务,以及队列中还有多少积压任务等信息:
private void printStats(ThreadPoolExecutor threadPool) { Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { log.info("========================="); log.info("Pool Size: {}", threadPool.getPoolSize()); log.info("Active Threads: {}", threadPool.getActiveCount()); log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCount()); log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size()); log.info("========================="); }, 0, 1, TimeUnit.SECONDS); }
?定义?个线程池。这个线程池具有2个核?线程、5个最?线程、使?容量为10的ArrayBlockingQueue阻塞队列作为?作队列,使?默认的AbortPolicy拒绝策略,也就是任务添加到线程池失败会抛出RejectedExecutionException。此外,借助Jodd类库的ThreadFactoryBuilder?法来构造?个线程??,实现线程池线程的?定义命名。
写?段测试代码来观察线程池管理线程的策略。测试代码的逻辑为,每次间隔1秒向线程池提交 任务,循环20次,每个任务需要10秒才能执?完成,代码如下:
public static int right() throws InterruptedException { // 使用一个计数器跟踪完成的任务数 AtomicInteger atomicInteger = new AtomicInteger(); ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, 5, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").build(), new ThreadPoolExecutor.AbortPolicy() ); printStatus(threadPool); //每隔1s提交一次任务,一共提交20次 IntStream.rangeClosed(1, 20).forEach( i -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } int id = atomicInteger.incrementAndGet(); try { threadPool.submit(() -> { System.out.println(id + " started."); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { } System.out.println(id + " finished."); } ); } catch (Exception ex) { //提交出现异常的话,打印出错信息并为计数器减? System.out.println("error submitting task " + id + " " + ex.toString()); atomicInteger.decrementAndGet(); } } ); TimeUnit.SECONDS.sleep(60); return atomicInteger.intValue(); } private static void printStatus(ThreadPoolExecutor threadPool) { Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate( () -> { System.out.println("========================="); System.out.println("Pool Size: " + threadPool.getPoolSize()); System.out.println("Active Threads: " + threadPool.getActiveCount()); System.out.println("Number of Tasks Completed: " + threadPool.getCompletedTaskCount()); System.out.println(("Number of Tasks in Queue: " + threadPool.getQueue().size())); System.out.println("========================="); }, 0, 1, TimeUnit.SECONDS ); }
60秒后??输出了17,有3次提交失败了,把printStats?法打印出的?志绘制成图表,得出如下曲线:
可以总结出线程池默认的?作?为:
也可以 通过?些?段来改变线程池的默认?作?为:
标签:tar 指定 extends exce oob term 解决问题 zab 请求
原文地址:https://www.cnblogs.com/liekkas01/p/12791700.html