码迷,mamicode.com
首页 > 编程语言 > 详细

Java基础之ThreadPoolExecutor详解

时间:2015-03-02 20:37:37      阅读:211      评论:0      收藏:0      [点我收藏+]

标签:

Spring中的ThreadPoolTaskExecutor是借助于JDK并发包中的java.util.concurrent.ThreadPoolExecutor来实现的。基于ThreadPoolExecutor可以很容易将一个Runnable接口的任务放入线程池中。

ThreadPoolExecutor的构建参数:

 

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

1. 参数解释
corePoolSize:         核心线程数,会一直存活,即使没有任务,线程池也会维护线程的最少数量
maximumPoolSize: 线程池维护线程的最大数量
keepAliveTime:      线程池维护线程所允许的空闲时间,当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0。
unit: 线程池维护线程所允许的空闲时间的单位、可选参数值为:TimeUnit中的几个静态属性:NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
workQueue: 线程池所使用的缓冲队列,常用的是:java.util.concurrent.ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue
handler: 线程池中的数量大于maximumPoolSize,对拒绝任务的处理策略,默认值ThreadPoolExecutor.AbortPolicy()。

2.ThreadPoolExecutor内运转机制:

具体流程如下:当一个任务通过execute(Runnable)方法欲添加到线程池时:

  • 当池子大小小于corePoolSize就新建线程,并且处理请求。
  • 当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去从workQueue中取任务并处理。
  • 当workQueue放不下新任务时,新建线程入池,并处理请求,如果池子的大小撑到maximumPoolSize就用RejectedExecutionHandler拒绝处理。
  • 另外,当池子的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁。

 

 

技术分享

 

3.spingMVC中使用线程池,用做批处理

 

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:oscache="http://www.springmodules.org/schema/oscache"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springmodules.org/schema/oscache http://www.springmodules.org/schema/cache/springmodules-oscache.xsd">
    <!-- spring 加载资源文件的配置 -->
    <context:property-placeholder
        location="classpath:conf/custom/env/config.properties"
        ignore-unresolvable="true" />
            
    <!-- spring 线程池配置 start -->
    <!-- insurance线程池 -->
    <bean id="insuranceTaskExecutor"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="${insurance.taskexecutor.corePoolSize}" />
        <property name="maxPoolSize" value="${insurance.taskexecutor.maxPoolSize}" />
        <property name="keepAliveSeconds" value="${insurance.taskexecutor.keepAliveSeconds}" />
        <property name="queueCapacity" value="${insurance.taskexecutor.queueCapacity}" />
    </bean>

</beans>

 config里面对于线程池的配置:

#insurance task setting
insurance.taskexecutor.corePoolSize=2
insurance.taskexecutor.maxPoolSize=2
insurance.taskexecutor.keepAliveSeconds=30
insurance.taskexecutor.queueCapacity=1000

 

Controller 层部分代码: 

 

/**
     * 
     * sendInsuranceBatch
     * 
     * @Title: sendInsuranceBatch
     * @Description: TODO 投保批处理
     * @param pojo
     * @param errors
     * @param request
     * @return
     */
    @RequestMapping("/ihotel_insurance_deliver")
    @ResponseBody
    public String sendInsuranceBatch(
            @ModelAttribute("pojo") IhotelBatchPojo pojo, Errors errors,
            HttpServletRequest request) {
        String remoteIp = WebUtils.getRemoteIpAddress(request);
        String serverIP = IPUtil.getServerIp();
        BaseResultInfo baseResultInfo = null;
        // 验证入参是否正确
        try {
            // 1,校验参数
            pojo.validate(pojo, errors);
            if (errors.hasErrors()) {
                baseResultInfo = new BaseResultInfo(
                        ResultType.PARAMETER_VERIFY_FAILUE);
                baseResultInfo.appendRetdesc("#" + getErrMsg(errors));
                return JsonUtil.BeanToJson(baseResultInfo);
            }
            ihotelInsuranceBatchService.processIhotelInsuredItem(pojo, remoteIp);
            baseResultInfo = new BaseResultInfo();
            baseResultInfo.setRetcode(0);
            baseResultInfo.setRetdesc("success");
            baseResultInfo.setServerIP(serverIP);
            return JSON.toJSONString(baseResultInfo);
        } catch (Exception e) {
            IHotelLoggerUtil.error("[国际酒店保险投保批处理异常]", e);
            throw e;
        }
    }

Service层部分代码:  

@Service("ihotelInsuranceBatchService")
public class IhotelInsuranceBatchService {
    // 保险线程池
    @Resource
    private ThreadPoolTaskExecutor insuranceTaskExecutor;
    

public void processIhotelInsuredItem(IhotelBatchPojo pojo, String remoteIp) {
        // 根据入参条件取出要投保批处理的数据
        List<InsuredHotelItem> isuredHotelList = this.findListByCond(pojo);
        for (InsuredHotelItem ihotelItem : isuredHotelList) {
//将任务放入线程池 insuranceTaskExecutor.execute(
new IhotelInsureDeliverTask( ihotelItem, this, remoteIp)); } }
}

 线程类

 

/**
 * @Title: IhotelInsureDeliverTask.java
 * @Package com.elong.ihotel.service.insurance
 * @Description: TODO
 * Copyright: Copyright (c) 2014 
 * Email: songbin0819@163.com
 * 
 * @author user
 * @date 2014年11月7日 下午4:04:02
 * @version V1.0
 */

package com.elong.ihotel.service.insurance;

import com.elong.ihotel.model.insurance.InsuredHotelItem;
import com.elong.ihotel.util.IHotelLoggerUtil;

/**
 * IhotelInsureDeliverTask
 * 
 * @Title: IhotelInsureDeliverTask
 * @Description: TODO 保险投保批处理线程类
 * @author Peng.Li
 * @date 2014年11月7日 下午4:04:02
 *
 */

public class IhotelInsureDeliverTask implements Runnable {
    /**
     * 需要批处理的保险中间表实体
     */
    private InsuredHotelItem ihotelItem;

    private IhotelInsuranceBatchService ihotelInsuranceBatchService;
    /**
     * ip
     */
    private String remoteIp;

    /**
     * Constructor for IhotelInsureDeliverTask.
     * <p>
     * Title:
     * </p>
     * <p>
     * Description:
     * </p>
     */
    public IhotelInsureDeliverTask() {
    }

    /**
     * Constructor for IhotelInsureDeliverTask.
     * <p>
     * Title:
     * </p>
     * <p>
     * Description:
     * </p>
     * 
     * @param ihotelItem
     * @param ihotelInsuranceBatchService
     * @param remoteIp
     */
    public IhotelInsureDeliverTask(InsuredHotelItem ihotelItem,
        IhotelInsuranceBatchService ihotelInsuranceBatchService, String remoteIp) {
    this.ihotelItem = ihotelItem;
    this.ihotelInsuranceBatchService = ihotelInsuranceBatchService;
    this.remoteIp = remoteIp;
    }

    /**
     * 
     * override run
     * <p>
     * Title: run
     * </p>
     * <p>
     * Description:
     * </p>
     */
    @Override
    public void run() {
    try {
        ihotelInsuranceBatchService.sendInsuranceBatch(ihotelItem, remoteIp);
    } catch (Exception e) {
        IHotelLoggerUtil.error("获取保险确认号失败", e);
    }

    }

}

 

 

 

Java基础之ThreadPoolExecutor详解

标签:

原文地址:http://www.cnblogs.com/200911/p/4309512.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!