标签:异步消息处理 executorservice simpleasynctaskexecutor
一、前言说明
任务分发器是相对于RabbitMQ消息处理的一种简化,在项目底层不依赖其他服务时可以使用该方案,特点是随项目分布式部署时以服务器性能决定处理速度,简单、高效、安全、可扩展性等。
实现原理比较简单,使用SimpleAsyncTaskExecutor随项目启动时初始化异步调度的任务,通过配置的异步调度任务创建守护进程,依赖守护进程来初始化ExecutorService线程池和BlockingQueue阻塞队列,以守护进程的线程从数据库读取待处理数据放入队列,线程池创建线程作为消费者去读消息并进行业务处理。
二、Spring配置
<?xml version="1.0" encoding="UTF-8"?> <beans default-autowire="byName" xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd" xmlns:context="http://www.springframework.org/schema/context"> <!-- 使用annotation 自动注册bean,并保证@Required,@Autowired的属性被注入 --> <context:component-scan base-package="com.my.test"/> <!-- dispatcherStarter --> <bean id="dispatcherStarter" class="com.my.test.manager.dispatch.DispatcherStart" init-method="init"> <property name="name" value="PointCoreDispatcher"/> <property name="cmdManager" ref="baseCmdManager"/> <property name="dispatcherMap"> <map> <!-- 异步调度1 --> <entry key="asyncPayeeDispatcher" value-ref="asyncDispatcher1" /> <!-- 异步调度2 --> <entry key="asyncAllDispatcher" value-ref="asyncDispatcher2" /> <!-- 异步调度3 --> <entry key="asyncDayEndDispatcher" value-ref="asyncDispatcher3" /> </map> </property> </bean> <!-- 异步调度1 --> <bean id="asyncDispatcher1" class="com.my.test.manager.dispatch.Dispatcher" destroy-method="destroy"> <property name="name" value="DISPATCHER_1"/> <property name="cmdManager" ref="baseCmdManager"/> <property name="cmdHandler" ref="asyncCmdHandler1"/> <!-- 无命令处理时休息时常(秒)(default 5) --> <property name="noTaskSleepSeconds" value="${dispatch.noTaskSleepSeconds}"/> <!-- 初始线程处理数(default 0) --> <property name="maxThreads" value="${dispatch.maxSize}"/> <!-- 线程池接收新任务阀值(default 2) --> <property name="hungrySize" value="${dispatch.hungrySize}"/> <!-- queueSize(default 1000) --> <property name="queueSize" value="${dispatch.queueSize}"/> </bean> <!-- 异步调度2 --> <bean id="asyncDispatcher2" class="com.my.test.manager.dispatch.Dispatcher" destroy-method="destroy"> <property name="name" value="DISPATCHER_2"/> <property name="cmdManager" ref="baseCmdManager"/> <property name="cmdHandler" ref="asyncCmdHandler2"/> <!-- 无命令处理时休息时常(秒)(default 5) --> <property name="noTaskSleepSeconds" value="${dispatch.noTaskSleepSeconds}"/> <!-- 最大线程数(default 5) --> <property name="maxThreads" value="${dispatch.maxSize}"/> <!-- 线程池接收新任务阀值(default 2) --> <property name="hungrySize" value="${dispatch.hungrySize}"/> <!-- queueSize(default 1000) --> <property name="queueSize" value="${dispatch.queueSize}"/> </bean> <!-- 异步调度3 --> <bean id="asyncDispatcher3" class="com.my.test.manager.dispatch.Dispatcher" destroy-method="destroy"> <property name="name" value="DISPATCHER_3"/> <property name="cmdManager" ref="baseCmdManager"/> <property name="cmdHandler" ref="asyncCmdHandler3"/> <!-- 无命令处理时休息时常(秒)(default 5) --> <property name="noTaskSleepSeconds" value="${dispatch.noTaskSleepSeconds}"/> <!-- 初始线程处理数(default 0) --> <!-- 最大线程数(default 5) --> <property name="maxThreads" value="${dispatch.maxSize}"/> <!-- 线程池接收新任务阀值(default 2) --> <property name="hungrySize" value="${dispatch.hungrySize}"/> <!-- queueSize(default 1000) --> <property name="queueSize" value="${dispatch.queueSize}"/> </bean> <!-- 异步调度1 --> <bean id="asyncALLCmdHandler" class="com.my.test.service.dispatch.AsyncCmdHandler1"> <property name="handlerName" value="异步调度1"/> <!--重试间隔 --> <property name="retryInterval" value="60"/> </bean> <!-- 异步调度2 --> <bean id="asyncPayeeCmdHandler" class="com.my.test.service.dispatch.AsyncCmdHandler2"> <property name="handlerName" value="异步调度2"/> <!--重试间隔 --> <property name="retryInterval" value="60"/> </bean> <!-- 异步调度3 --> <bean id="asyncDailyHandler" class="com.my.test.service.dispatch.AsyncCmdHandler3"> <property name="handlerName" value="异步调度3"/> <!--重试间隔 --> <property name="retryInterval" value="60"/> </bean> </beans>
三、Dispatch 入口
import lombok.extern.slf4j.Slf4j; import org.springframework.core.task.SimpleAsyncTaskExecutor; import java.util.Map; /** * Dispatch 入口 * <p> * 1、随容器启动而启动,创建dispath守护进程 * 2、激活命令 * </p> * */ @Slf4j(topic = "TASK") public class DispatcherStart { /** 分发器线程名称 */ private String name; /** 分发器列表,Key:分发器线程名称前缀;Value:分发器实例 */ private Map<String, Dispatcher> dispatcherMap; /** 异步组件业务处理接口 */ private CmdManager cmdManager; /** * 随容器启动而启动,创建dispath守护进程 */ public void init() { log.info("------- 【{}】正在启动...... -------",getName()); reactiveCommandServerIP(); SimpleAsyncTaskExecutor cmdDispatchExecutor = new SimpleAsyncTaskExecutor(); cmdDispatchExecutor.setDaemon(true); cmdDispatchExecutor.setConcurrencyLimit(dispatcherMap.size()); for (String dispatcherName : dispatcherMap.keySet()) { cmdDispatchExecutor.setThreadNamePrefix(dispatcherName); cmdDispatchExecutor.execute(dispatcherMap.get(dispatcherName)); } log.info("------- 【{}】启动完毕! -------",getName()); } /** * 激活命令 * 多机部署的情况由于某台机器挂掉需要把该台机器执行中的命令激活,并把失效日期进行延后 */ private void reactiveCommandServerIP(){ try{ int count = cmdManager.reactiveCommandServerIP(); log.info("激活命令条数:"+count); }catch (Exception e){ log.warn("激活命令失败!",e); } } public void setDispatcherMap(Map<String, Dispatcher> dispatcherMap) { this.dispatcherMap = dispatcherMap; } public void setName(String name) { this.name = name; } public String getName() { return name; } public CmdManager getCmdManager() { return cmdManager; } public void setCmdManager(CmdManager cmdManager) { this.cmdManager = cmdManager; } }
四、任务分发器
import com.my.test.dal.model.BizCmdDO; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.concurrent.*; /** * 任务分发器 * <p> * 1、线程任务执行方法 * 2、中断线程 * 3、任务处理线程池 * </p> * */ @Getter @Setter @Slf4j(topic = "TASK") public class Dispatcher implements Runnable { /** 业务类型名称 */ private String name; /** 最大线程数 */ private int maxThreads = 10; /** 最大队列数 */ private int queueSize=2000; /** 无命令处理时休息时常(秒) */ private long noTaskSleepSeconds = 5; /** 线程池接收新任务阀值 */ private int hungrySize; final ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("Dispatch-"+name+"-%d") .setDaemon(true) .build(); /** 线程池 */ private ExecutorService pool; /** 任务处理列 */ private BlockingQueue<BizCmdDO> queue; /** 命令管理器,查询待处理命令 */ private CmdManager cmdManager; /** 命令处理Handler */ private CmdHandler cmdHandler; /** * 线程任务执行方法 */ @Override public void run() { log.info("分发器[" + getName() + "]启动 maxThreads {},queueSize:{},hungrySize:{}...", maxThreads,queueSize,hungrySize); //初始化队列 queue = new ArrayBlockingQueue<>(queueSize); //初始化线程池 pool = Executors.newFixedThreadPool(maxThreads,threadFactory); //初始化数据读入队列线程 Scanner scanner = new Scanner(cmdManager,queue,hungrySize); //启动数据读入线程 new Thread(scanner).start(); //从队列中获取数据执行 while (true) { try { if (scanner.queue.isEmpty()) { sleep(); continue; } BizCmdDO command = scanner.queue.poll(100L,TimeUnit.MILLISECONDS); while (((ThreadPoolExecutor)pool).getActiveCount() >= maxThreads) { try { log.info("Dispatcher :{} 线程数不足,休眠100 毫秒",name); Thread.sleep(100L); } catch (InterruptedException e) { log.warn("Dispatcher {} 休眠被打断。",name); } } pool.execute(new HandlerWrapper(command, cmdHandler)); } catch (Exception e) { log.error("分发器[{}]执行失败:{}", getName(),e.getMessage()); } } } private class Scanner implements Runnable { private CmdManager cmdManager; /** 任务处理列 */ private BlockingQueue<BizCmdDO> queue; /** 线程池接收新任务阀值 */ private int hungrySize; Scanner (CmdManager manager,BlockingQueue<BizCmdDO> queue,int hungrySize) { this.cmdManager = manager; this.queue = queue; this.hungrySize = hungrySize; } @Override public void run() { while (true){ try { if (queue.size() >= hungrySize) { log.debug("queue.size:{},hungrySize:{} read thread sleep ...",queue.size(),hungrySize); sleep(); continue; } List<BizCmdDO> commands = cmdManager.lockAndListCommands(name, 100); log.debug("分发器:[{}] query list size => {}",getName(),commands.size()); if (commands.isEmpty()) { log.debug("lockAndListCommands.size:{},read thread sleep ...",commands.size()); sleep(); continue; } commands.forEach(command -> queue.add(command)); } catch (Exception e) { log.error("分发器[{}] 读取命令失败",getName(),e.getMessage()); } } } } /** * 中断线程 */ public void destroy() { log.warn("收到分发器[{}]停止通知!!",getName()); pool.shutdown(); Thread.interrupted(); } /** * 线程睡眠 */ private void sleep() { try { TimeUnit.SECONDS.sleep(noTaskSleepSeconds); } catch (InterruptedException e) { log.info("分发器[{}]休眠被打断!",getName(), e.getMessage()); } } }
五、任务执行线程管理器
import com.my.test.dal.model.BizCmdDO; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; /** * 任务执行线程管理器 * * */ @Slf4j(topic = "TASK") @AllArgsConstructor public class HandlerWrapper implements Runnable { /** 处理命令对象 */ private BizCmdDO command; /** 具体命令处理类 */ private CmdHandler cmdHandler; @Override public void run() { try { cmdHandler.execute(command); } catch (Exception t) { log.error(t.getMessage() +"executeCmdTread 异常:{}", t); cmdHandler.handlerException(command, t.getMessage()); } } }
六、命令处理对象
import lombok.Getter; import lombok.Setter; import lombok.ToString; import java.util.Date; /** * 线程任务model * */ @Getter @Setter @ToString(callSuper = true) public class BizCmdDO extends BaseDO { /** * 序列化ID */ private static final long serialVersionUID = -8489428944604351417L; /** * 业务ID */ private String bizId; /** * 业务类型 */ private String bizType; /** * 执行的server地址 */ private String serverIP; /** * 最后一次执行失败的原因 */ private String failReason; /** * 环境标签 */ private String envTag; /** * 任务状态 db_column: STATUS */ private String status; /** * 是否正在处理中 db_column: IS_DOING */ private String isDoing; /** * 重试次数 db_column: RETRY_TIMES */ private Long retryTimes = 0L; /** * 最大重试次数,负数表示无限次 */ private Long maxRetryTimes = -1L; /** * 下次执行时间 db_column: NEXT_EXE_TIME */ private Date nextExeTime; /** * 命令执行起始时间 db_column: ENABLE_START_DATE */ private Date enableStartDate; /** * 命令执行终止时间 db_column: ENABLE_END_DATE */ private Date enableEndDate; }
七、异步组件业务处理Manager接口
import com.my.test.dal.model.BizCmdDO; import java.util.Date; import java.util.List; /** * 异步组件业务处理Manager接口 * <p> * 1、查询待处理命令 * 2、添加命令 * 3、更新命令 * 4、激活命令 * 5、ServerIP激活命令 * 6、延迟未处理成功的结束时间 * </p> * */ public interface CmdManager { /** * 查询待处理命令 * * @param bizType 业务类型 * @param cmdNum 查询返回数 * @return 待处理命令集合 * @throws RuntimeException */ List<BizCmdDO> lockAndListCommands(String bizType, int cmdNum) throws RuntimeException; /** * 添加命令 * * @param cmdObject 待处理命令 * @throws RuntimeException */ void insert(BizCmdDO cmdObject) throws RuntimeException; /** * 更新命令 * * @param cmdObject 命令 * @throws RuntimeException */ void update(BizCmdDO cmdObject) throws RuntimeException; /** * 激活命令 * * @param date 日期 * @return count */ int reactiveCommandBeforeDate(Date date); /** * ServerIP激活命令 * * @return count */ int reactiveCommandServerIP(); /** * 延迟未处理成功的结束时间 * * @return int */ int delayEndDate(); }
八、异步调度业务处理
import com.my.test.dal.model.BizCmdDO; import com.my.test.manager.dispatch.CmdHandler; import com.my.test.manager.dispatch.CmdManager; import com.my.test.manager.dispatch.StatusUtils; import com.my.test.manager.dispatch.core.DisErrorCode; import com.my.test.manager.dispatch.core.DisException; import com.my.test.manager.enums.BufferFlagEnum; import com.my.test.manager.enums.DispatchDoingEnum; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; /** * Dispatcher基本命令处理handler虚拟类 * * */ @Setter @Slf4j(topic = "TASK") public abstract class BaseCmdHandler implements CmdHandler { /** 异步组件业务处理接口 */ @Autowired private CmdManager cmdManager; /** 重试间隔 */ protected int retryInterval; /** Spring 事务模板 */ @Autowired private TransactionTemplate transactionTemplate; /** * handler执行方法 * * @param command 基础命令对象 * @throws Exception */ @Override public void execute(BizCmdDO command) throws Exception { if (StatusUtils.isCmdEnd(command.getStatus())) { return; } try { log.info("任务[{}]开始,参数[{}]", getHandlerName(), command.toString()); long startTime = System.currentTimeMillis(); doCmd(command); success(command); long endTime = System.currentTimeMillis(); log.info("任务[{}]结束[{}ms],参数[{}]", getHandlerName(), endTime - startTime, command.toString()); } catch (DisException e) { log.error("任务[{}]异常, 参数[{}], 错误码[{}], 异常信息[{}]", getHandlerName(), command.toString(), e.getErrorCode(), e.getMessage()); throw e; } catch (Exception e) { log.error("任务[{}]异常, 参数[{}], 错误码[{}], 异常信息[{}]",getHandlerName(), command.toString(), DisErrorCode.SYSTEM_INNER_ERROR, e.getMessage()); throw e; } } /** * 错误处理 * * @param command 基础命令对象 * @param failReason 失败原因 */ @Override public void handlerException(final BizCmdDO command,String failReason) { transactionTemplate.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(TransactionStatus status) { String reason = failReason; //获取执行的错误原因,如果有的话 if (StringUtils.isNotBlank(reason) && reason.length() > 512) { reason = reason.substring(0, 511); } command.setFailReason(reason); if (needRetry(command)) { retry(command); } else { fail(command); failedFinally(command); } } }); } /** * 获取handler虚拟类 * * @return */ protected abstract String getHandlerName(); /** * 业务执行方法 * * @param command 基础命令对象 * @throws Exception */ protected abstract void doCmd(BizCmdDO command) throws Exception; /** * 执行失败处理方法 * * @param command 基础命令对象 */ protected abstract void failedFinally(BizCmdDO command); /** * 更新执行结果为成功 * * @param command 基础命令对象 */ private void success(BizCmdDO command) { command.setStatus(BufferFlagEnum.CmdStatusEnums.SUCCESS.getCode()); command.setIsDoing(DispatchDoingEnum.NO.getCode()); cmdManager.update(command); } /** * 重试判断 * * @param command 基础命令对象 * @return true/false */ private boolean needRetry(BizCmdDO command) { if (command.getEnableEndDate() != null && DateUtils.addSeconds(command.getNextExeTime(), retryInterval).after(command.getEnableEndDate())) { return false; } if (command.getMaxRetryTimes() > 0 && command.getRetryTimes() >= command.getMaxRetryTimes()) { return false; } return true; } /** * 重试 * * @param command 基础命令对象 */ protected void retry(BizCmdDO command) { command.setStatus(BufferFlagEnum.CmdStatusEnums.WAIT.getCode()); command.setIsDoing(DispatchDoingEnum.NO.getCode()); command.setRetryTimes(command.getRetryTimes() + 1); //modify on 2014-01-15 系统时间和数据库时间不相同时会出现问题,取上一次执行时间 command.setNextExeTime(DateUtils.addSeconds(command.getNextExeTime(), retryInterval)); cmdManager.update(command); } /** * 更新执行结果为失败 * * @param command 基础命令对象 */ private void fail(BizCmdDO command) { command.setStatus(BufferFlagEnum.CmdStatusEnums.FAILURE.getCode()); command.setIsDoing(DispatchDoingEnum.NO.getCode()); cmdManager.update(command); } /** * 新增执行命令 * * @param command 基础命令对象 */ protected void createCommand(BizCmdDO command) { cmdManager.insert(command); } } ----------------------------------------------------------------------------------------------- import com.my.test.biz.async.AsyncBusiness1; import com.my.test.dal.model.BizCmdDO; import com.my.test.manager.dispatch.impl.BaseCmdHandler; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * Dispatcher 异步调度1 * */ @Slf4j @Setter @Getter @Component public class AsyncCmdHandler1 extends BaseCmdHandler { /** 业务处理类 */ @Autowired private AsyncBusiness1 asyncBusiness1; /** 用于日志显示 */ private String handlerName; /** * 异步执行入口 * * @param command 基础命令对象 */ @Override protected void doCmd(BizCmdDO command) throws Exception { log.info("exec doCmd: {} {} {}", getHandlerName(), command.getBizId(), command.getBizType()); asyncBusiness1.execute(command.getBizId()); log.info("SUCCESS to doCmd, Parameters:{}", command.getBizId()); } @Override protected void failedFinally(BizCmdDO command) { } } ----------------------------------------------------------------------------------------------- import com.my.test.biz.async.AsyncBusiness2; import com.my.test.dal.model.BizCmdDO; import com.my.test.manager.dispatch.impl.BaseCmdHandler; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * Dispatcher 异步调度2 * */ @Slf4j @Setter @Getter @Component public class AsyncCmdHandler2 extends BaseCmdHandler { /** 业务处理类 */ @Autowired private AsyncBusiness2 asyncBusiness2; /** 用于日志显示 */ private String handlerName; /** * 异步执行入口 * * @param command 基础命令对象 */ @Override protected void doCmd(BizCmdDO command) throws Exception { log.info("exec doCmd: {} {} {}", getHandlerName(), command.getBizId(), command.getBizType()); asyncBusiness2.execute(command.getBizId()); log.info("SUCCESS to doCmd, Parameters:{}", command.getBizId()); } @Override protected void failedFinally(BizCmdDO command) { } } ----------------------------------------------------------------------------------------------- import com.my.test.biz.async.AsyncBusiness3; import com.my.test.dal.model.BizCmdDO; import com.my.test.manager.dispatch.impl.BaseCmdHandler; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * Dispatcher 异步调度3 * */ @Slf4j @Setter @Getter @Component public class AsyncCmdHandler3 extends BaseCmdHandler { /** 业务处理类 */ @Autowired private AsyncBusiness3 asyncBusiness3; /** 用于日志显示 */ private String handlerName; /** * 异步执行入口 * * @param command 基础命令对象 */ @Override protected void doCmd(BizCmdDO command) throws Exception { log.info("exec doCmd: {} {} {}", getHandlerName(), command.getBizId(), command.getBizType()); asyncBusiness3.execute(command.getBizId()); log.info("SUCCESS to doCmd, Parameters:{}", command.getBizId()); } @Override protected void failedFinally(BizCmdDO command) { } }
本文出自 “让希望不再失望!” 博客,请务必保留此出处http://peterz2011.blog.51cto.com/3186140/1838509
标签:异步消息处理 executorservice simpleasynctaskexecutor
原文地址:http://peterz2011.blog.51cto.com/3186140/1838509