码迷,mamicode.com
首页 > 编程语言 > 详细

深入浅出Java并发编程(一):线程池的使用

时间:2016-05-20 17:42:25      阅读:305      评论:0      收藏:0      [点我收藏+]

标签:java   线程池   并发   

    我们在使用线程的时候就去建立一个线程,这样实现起来非常简便,但是会有一个问题:

    如果并发的线程数量很多,并且每个线程都是执行一个时间段很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率。

    那么如何解决此类问题呢?

    在Java中可以通过线程池来解决这样的效果。前面有文章简单提到过线程池的使用。今天我们来详细讲解下Java的线程池,由易而难,循序渐进,步骤如下:

  • 首先我们从最核心的ThreadPoolExecutor类的方法讲起

  • 然后讲述它的实现原理

  • 接着给出了相应的示例

  • 最后我们讨论下如何合理配置线程池的大小



一、Java中的ThreadPoolExecutor类

    在了解ThreadPoolExecutor类之前我们先来看下ThreadPoolExecutor的具体实现源代码

package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.*;

public class ThreadPoolExecutor extends AbstractExecutorService {
	 ....//这里只给出构造器的源码,
	 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,
                              long keepAliveTime,TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
	
	public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                              long keepAliveTime,TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
	
	public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,
                              long keepAliveTime,TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
	
	 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,
                              long keepAliveTime,TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
}

从上面代码可以看到,继承了AbstractExecutorService类的ThreadPoolExecutor听了四个构造器,但最终执行的是第四个构造器

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

    其中,corePoolSize:核心池的大小,创建线程池后,在默认情况下没有任何线程,而是等待有任务到来才创建线程。这个参数的意义:当线程池中线程的数据到达corePoolSize后,就会把到达的任务放到缓存队列中去。

    maximumPoolSize:线程池最大线程数,表示在线程池中最多能够创建多少个线程;

    keepAliveTime:表示线程没有任务执行时最多保持多久会终止。

    unit:参数keepAlivetime的时间单位。

    workQueue:一个阻塞队列,用来存储等待执行的任务

    threadFactory:线程工厂,主要用来创建线程

    handler:表示当拒绝处理任务时的策略,主要有

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

在ThreadPoolExecutor类中有几个非常重要的方法:

execute()
submit()
shutdown()
shutdownNow()

execute()方法实际上是在ThreadPoolExecutor的曾祖父类Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果

shutdown()和shutdownNow()是用来关闭线程池的。

使用示例:

public class Test {
     public static void main(String[] args) {   
         ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                 new ArrayBlockingQueue<Runnable>(5));
          
         for(int i=0;i<15;i++){
             MyTask myTask = new MyTask(i);
             executor.execute(myTask);
             System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
             executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
         }
         executor.shutdown();
     }
}
 
 
class MyTask implements Runnable {
    private int taskNum;
     
    public MyTask(int num) {
        this.taskNum = num;
    }
     
    @Override
    public void run() {
        System.out.println("正在执行task "+taskNum);
        try {
            Thread.currentThread().sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task "+taskNum+"执行完毕");
    }
}

但是在Java Doc中并建议我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池

Executors.newCachedThreadPool();        //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor();   //创建容量为1的缓冲池
Executors.newFixedThreadPool(int);    //创建固定容量大小的缓冲池

具体示例,请参照:http://aku28907.blog.51cto.com/5668513/1773073

2、如何合理配置线程池的大小

    

一般需要根据任务的类型来配置线程池大小:

  如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1

  如果是IO密集型任务,参考值可以设置为2*NCPU

  当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。

任务性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池。IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。

依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。

建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。有一次我们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞住,任务积压在线程池里。如果当时我们设置成无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然我们的系统所有的任务是用的单独的服务器部署的,而我们使用不同规模的线程池跑不同类型的任务,但是出现这样问题时也会影响到其他任务。



本文出自 “阿酷博客源” 博客,请务必保留此出处http://aku28907.blog.51cto.com/5668513/1775415

深入浅出Java并发编程(一):线程池的使用

标签:java   线程池   并发   

原文地址:http://aku28907.blog.51cto.com/5668513/1775415

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!