标签:规范 stream init adp read 设置 pool reac des
线程池的作用是初始化一些线程,当有任务的时候,就从中启动一个来执行相关任务,执行完后,线程资源重新回收到线程池中,达到复用的效果,从而减少资源的开销
在JDK中,Executors
类已经帮我们封装了创建线程池的方法。
Executors.newFixedThreadPool();
Executors.newCachedThreadPool();
Executors.newScheduledThreadPool();
但是点进去看的话,
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
它的内部实现还是基于ThreadPoolExecutor
来实现的。通过阿里代码规范插件扫描会提示我们用ThreadPoolExecutor
去实现线程池。通过查看ThreadPoolExecutor
的构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
do something
...
}
我觉得有以下几方面的原因。
keepAliveTime
(当线程池中线程数大于corePoolSize
的数m, 为这m个线程设置的最长等待时间 ),节约系统资源。workQueue
:线程等待队列,在Executors
中默认的是LinkedBlockingQueue
。可以理解是一种无界的数组,当有不断有线程来的时候,可能会撑爆机器内存。ThreadFactory
类。ThreadFactory
public class NacosSyncThreadFactory implements ThreadFactory {
private final AtomicInteger threadNum = new AtomicInteger(1);
private String threadPrefix = null;
private ThreadGroup threadGroup;
public NacosSyncThreadFactory(String prefix) {
this.threadPrefix = "thread" + "-" + prefix + "-" ;
threadGroup = Thread.currentThread().getThreadGroup();
}
public NacosSyncThreadFactory() {
this("pool");
}
@Override
public Thread newThread(Runnable r) {
String name = threadPrefix + threadNum.incrementAndGet();
Thread thread = new Thread(threadGroup, r, name);
return thread;
}
}
public class MyThreadPool {
private ThreadFactory threadFactory;
private int threadNum;
private BlockingQueue blockingQueue;
private RejectedExecutionHandler handler;
public MyThreadPool(ThreadFactory threadFactory, int threadNum,
BlockingQueue blockingQueue,
RejectedExecutionHandler handler ) {
this.threadFactory = threadFactory;
this.threadNum = threadNum;
this.blockingQueue = blockingQueue;
this.handler = handler;
}
public MyThreadPool() {
this(Executors.defaultThreadFactory(), 10,
new ArrayBlockingQueue(10), new ThreadPoolExecutor.AbortPolicy());
}
public ThreadPoolExecutor initThreadPool(ThreadFactory threadFactory, int threadNum, BlockingQueue blockingQueue, RejectedExecutionHandler handler) {
if (handler == null) {
handler = new ThreadPoolExecutor.AbortPolicy();
}
return new ThreadPoolExecutor(1, threadNum, 5, TimeUnit.SECONDS, blockingQueue, threadFactory, handler);
}
}
初始化线程池类
MyThreadPool myThreadPool = new MyThreadPool();
threadPoolExecutor = myThreadPool.initThreadPool(
new NacosSyncThreadFactory("nacos-sync"),
threadNum,
new ArrayBlockingQueue(10),
new ThreadPoolExecutor.DiscardPolicy()
);
}
创建Callable(FutureTask)
/**
* 分页获取task信息
* @return
*/
private List<Task> getTask(int pageNum) {
IPage<Task> page = new Page(pageNum, 25);
IPage<Task> taskIPage = this.taskService.page(page);
if (null == taskIPage || CollectionUtils.isEmpty(taskIPage.getRecords())) {
return null;
}
return taskIPage.getRecords();
}
// 执行任务
private FutureTask<String> assembleTaskFuture(Task task) {
FutureTask<String> futureTask = new FutureTask(() -> {
// 执行任务
this.doSyncWork(task);
return "success";
});
return futureTask;
}
执行任务(FutureTask)
public void zkSync() {
// 获取数据总数,得到线程数
int count = this.taskService.count();
int pageSize = 25;
int num = count / pageSize;
int pageTotal = count % pageSize == 0 ? num : num + 1;
log.info("========总记录数:{}=====总页数:{}", count, pageTotal);
for (int i = 1; i <= pageTotal; i++) {
List<Task> taskList = this.getTask(i);
if (CollectionUtils.isEmpty(taskList)) {
break;
}
List<Integer> collect = taskList.stream().map(task -> task.getId()).collect(Collectors.toList());
taskList.forEach(task -> {
FutureTask<String> futureTask = this.assembleTaskFuture(task);
threadPoolExecutor.execute(futureTask);
});
}
threadPoolExecutor.shutdown();
}
标签:规范 stream init adp read 设置 pool reac des
原文地址:https://www.cnblogs.com/KevinStark/p/12430091.html