标签:pat cep else capacity timer result 抛出异常 没有 拒绝策略
1> CorePoolSize 核心线程数
2> MaxPoolSize 最大线程数
3> QueueCapacity 队列容量
4> KeepAliveSecond 线程空闲(没有任务)时的存活时间
5> TimeUnit 时间单位
6> rejectedExecutionHandler 拒绝策略
7> threadFactory 线程工厂,一般使用默认的即可
1>ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
2>ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
3>ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
4>ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
1>项目启动类添加@EnableScheduling注解开启
2>定时类上加@Component注解
3>定时方法上加@Scheduled(fixedDelay = 5000) 或者@Scheduled(cron = "0/3 * * * * *")
@Slf4j
@Component
public class DataMarketComponent implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
dynamicTimerComponent.register("proxy_concurrent_req", new DataMarketComponent.ProxyConcurrentReq(), "30 */1 * * * ?");
}
/***
* 根据proxyType proxyid统计每分钟的请求量
*/
public class ProxyConcurrentReq implements Runnable {
@Override
public void run() {
//多线程池
ExecutorService exs = null;
try {
//代理统计对象集合
if (tableList.size() > 0){
List<ProxyConReqLog> proxyConReqLogList = new ArrayList<>();
ProxyConReqLog proxyConReqLog;
List<Future<Map<String,Map<String,Long>>>> futureMap = new ArrayList<>();
String startTimeStr = DateUtil.getStringFormat(startTime, DateUtil.DATETIMEFORMAT2);
String endTimeStr = DateUtil.getStringFormat(endTime, DateUtil.DATETIMEFORMAT2);
//开启多线程
exs = Executors.newFixedThreadPool(tableList.size());
for (String otsTableName : tableList){
if (otsTableName.contains("ods_output_proxy_")){
try {
taskId = otsTableName.replaceAll("ods_output_proxy_","").replaceAll("_","");
String tableIndex = "_index_" + taskId;
if (null != dispatcherService.getOne(new Dispatcher(taskId,null))){
//1.高速提交任务,每个任务返回一个Future入list
futureMap.add(exs.submit(new CallableTaskAgg(startTimeStr, endTimeStr, otsTableName, tableIndex)));
}
} catch (Exception e) {
log.error("提交任务发生错误", e.getMessage());
}
}
}
//所有表聚合到一起的结果
Map<String,Long> httpMap = new HashMap<>();
Map<String,Long> tcpMap = new HashMap<>();
for (Future<Map<String,Map<String,Long>>> future : futureMap) {
while (true) {
if (future.isDone()&& !future.isCancelled()) {
Map<String, Map<String, Long>> tableResult = future.get();
Map<String, Long> singleTcpMap = tableResult.get(ProxyTypeEnum.TCP.code);
Map<String, Long> singleHttpMap = tableResult.get(ProxyTypeEnum.HTTP.code);
if (null != singleTcpMap && !CollectionUtils.isEmpty(singleTcpMap)){
singleTcpMap.forEach((key,value) -> tcpMap.merge(key,value,Long::sum));
}
if (null != singleHttpMap && !CollectionUtils.isEmpty(singleHttpMap)){
singleHttpMap.forEach((key,value) -> httpMap.merge(key,value,Long::sum));
}
break;
} else {
//每次轮询休息1毫秒(CPU纳秒级),避免CPU高速轮循耗空CPU
Thread.sleep(1);
}
}
}
}
} catch (Exception e) {
log.error("run exception, message is : {} ", e.getMessage());
} finally {
exs.shutdown();
}
}
}
private String generatorId(ProxyConReqLog proxyConReqLog) {
String dataVersion = DateUtil.getDateFormat(new Date(),"yyyyMMddHHmmss");
String uuid = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase();
String bn = dataVersion + uuid.substring(0,8);
return bn;
}
class CallableTaskAgg implements Callable{
String otsTableName;
String tableIndex;
String startTime;
String endTime;
public CallableTaskAgg(String startTime, String endTime, String otsTableName, String tableIndex) {
this.startTime = startTime;
this.endTime = endTime;
this.otsTableName = otsTableName;
this.tableIndex = tableIndex;
}
@Override
public Map<String,Map<String,Long>> call() {
Map map = otsQueryService.subGroupByFilter(startTime, endTime, otsTableName, tableIndex, "proxy_type", "proxy_id");
return map;
}
}
}
标签:pat cep else capacity timer result 抛出异常 没有 拒绝策略
原文地址:https://www.cnblogs.com/yyqxxy/p/11317153.html