标签:时长 group actor pen 配置 sts 同步 完成 创建工程
??在日常开发中,为了提高主线程的效率,往往需要采用异步调用处理,例如系统日志等。在实际业务场景中,可以使用消息中间件如RabbitMQ、RocketMQ、Kafka等来解决。假如对高可用没有太高的要求,也可以使用线程池或者队列来解决。
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.c3stones</groupId>
<artifactId>spring-boot-async-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-async-demo</name>
<description>Spring Boot Simple Demo</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
<maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
</properties>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.2-jre</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class SimpleDemo {
// 创建固定数目线程的线程池
// 阿里巴巴Java开发手册中提到可能存在OOM(OutOfMemory)内存溢出异常
// private static ExecutorService executorService = Executors.newFixedThreadPool(5);
// 推荐使用com.google.guava的ThreadFactoryBuilder来创建线程池
private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("simple-threadpool-%d")
.build();
/**
* java.util.concurrent.ThreadPoolExecutor.ThreadPoolExecutor(int corePoolSize,
* int maximumPoolSize, long keepAliveTime, TimeUnit unit,
* BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
* RejectedExecutionHandler handler)
*
* @param corePoolSize 线程池大小
* @param maximumPoolSize 最大线程数
* @param keepAliveTime 当线程大于线程池大小时,最长等待时间
* @param unit 时间单位
* @param workQueue 任务队列
* @param threadFactory 指定线程工厂
* @param handler 当线程池界限和队列容量时,阻止线程处理
*/
private static ExecutorService threadPool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), threadFactory, new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
// 提交线程到线程池
for (int i = 0; i < 20; i++) {
threadPool.execute(new SimpleThread1());
threadPool.execute(new SimpleThread2());
}
// 关闭
threadPool.shutdown();
}
}
class SimpleThread1 implements Runnable {
private static Logger logger = LoggerFactory.getLogger(SimpleThread1.class);
@Override
public void run() {
try {
logger.info("线程 SimpleThread1 被调用!");
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class SimpleThread2 implements Runnable {
private static Logger logger = LoggerFactory.getLogger(SimpleThread2.class);
@Override
public void run() {
try {
logger.info("线程 SimpleThread2 被调用!");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
??问题:
??当进程被异常关闭,会导致存储在线程池或者队列的线程丢失。
??但是消息队列中的消息不会因为JVM进程关闭而丢失,依然存储在消息队列所在服务器上。
/**
* 启动类
*
* @author CL
*
*/
@SpringBootApplication
@EnableAsync // 开启异步支持
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;
/**
* 快速入门
*
* @author CL
*
*/
@Service
@Slf4j
public class TestQuickService {
@SneakyThrows
public String get() {
log.info("调用UserService.get()!");
Thread.sleep(2000);
return "get";
}
@SneakyThrows
public String get2() {
log.info("调用UserService.get2()!");
Thread.sleep(5000);
return "get2";
}
}
/**
* 测试快速入门
*
* @author CL
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class TestQuick {
@Autowired
private TestQuickService quickService;
/**
* 测试同步调用
*/
@Test
public void testSync() {
LocalDateTime startTime = LocalDateTime.now();
quickService.get();
quickService.get2();
LocalDateTime endTime = LocalDateTime.now();
log.info("总耗时:" + Duration.between(startTime, endTime).toMillis() + " ms");
}
}
控制台打印:
2020-05-28 16:39:36.853 INFO 6220 --- [ main] com.c3stones.quick.TestQuickService : 调用UserService.get()!
2020-05-28 16:39:38.853 INFO 6220 --- [ main] com.c3stones.quick.TestQuickService : 调用UserService.get2()!
2020-05-28 16:39:43.857 INFO 6220 --- [ main] com.c3stones.test.TestQuick : 同步调用,总耗时:7000 ms
??可以看出总耗时为两个业务方法的总耗时,并且在主线程中执行。
@Async
public String asyncGet() {
return this.get();
}
@Async
public String asyncGet2() {
return this.get2();
}
/**
* 测试异步调用
*/
@Test
public void testAsync() {
LocalDateTime startTime = LocalDateTime.now();
quickService.asyncGet();
quickService.asyncGet2();
LocalDateTime endTime = LocalDateTime.now();
log.info("异步调用,总耗时:" + Duration.between(startTime, endTime).toMillis() + " ms");
}
控制台打印:
2020-05-28 17:11:10.550 INFO 908 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService ‘applicationTaskExecutor‘
2020-05-28 17:11:10.563 INFO 908 --- [ main] com.c3stones.test.TestQuick : 异步调用,总耗时:45 ms
2020-05-28 17:11:10.574 INFO 908 --- [ task-2] com.c3stones.quick.TestQuickService : 调用UserService.get2()!
2020-05-28 17:11:10.575 INFO 908 --- [ task-1] com.c3stones.quick.TestQuickService : 调用UserService.get()!
2020-05-28 17:11:10.587 INFO 908 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService ‘applicationTaskExecutor‘
??可以看出总耗时不受业务方法时间的影响,并且都在异步线程池中执行的。
??注意:实际两个方法并没有执行完成。
??实际场景中肯定不可能出现上述的问题。肯定希望既能异步调用,并且主线程能阻塞,直到方法执行完成。
@Async
public Future<String> asyncFutureGet() {
return AsyncResult.forValue(this.get());
}
@Async
public Future<String> asyncFutureGet2() {
return AsyncResult.forValue(this.get2());
}
/**
* 测试异步调用并阻塞主线程
*/
@Test
@SneakyThrows
public void testAsyncFuture() {
LocalDateTime startTime = LocalDateTime.now();
// 执行
Future<String> getFuture = quickService.asyncFutureGet();
Future<String> get2Future = quickService.asyncFutureGet2();
// 阻塞等待执行结果
getFuture.get();
get2Future.get();
LocalDateTime endTime = LocalDateTime.now();
log.info("异步调用,总耗时:" + Duration.between(startTime, endTime).toMillis() + " ms");
}
控制台打印:
2020-05-28 17:14:57.434 INFO 5784 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService ‘applicationTaskExecutor‘
2020-05-28 17:14:57.468 INFO 5784 --- [ task-2] com.c3stones.quick.TestQuickService : 调用UserService.get2()!
2020-05-28 17:14:57.470 INFO 5784 --- [ task-1] com.c3stones.quick.TestQuickService : 调用UserService.get()!
2020-05-28 17:15:02.474 INFO 5784 --- [ main] com.c3stones.test.TestQuick : 异步调用,总耗时:5066 ms
2020-05-28 17:15:02.511 INFO 5784 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService ‘applicationTaskExecutor‘
??可以看出总耗时由耗时较长的方法决定,并且在异步线程池中执行。
spring:
task: # Spring执行器配置,对应TaskExecutionProperties配置类。对于Spring异步任务,会使用该执行器。
execution:
thread-name-prefix: task- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
pool: # 线程池相关
core-size: 8 # 核心线程数,线程池创建时初始化的线程数。默认为 8 。
max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE。
keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒。
queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE。
allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
shutdown:
await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 。
??spring.task.execution.shutdown
配置项是为了实现Spring Task异步任务的优雅关闭。
??在异步任务执行过程中,如果应用开始关闭,把异步任务需要使用到的Spring Bean一并销毁,例如数据库连接池等,但是异步任务还在执行中,当需要访问数据库时,就会导致报错。所以通过配置await-termination = true
来实现应用关闭时,等待异步任务执行完成。这样应用在关闭时Spring 会优先等待ThreadPoolTaskSchedule 执行完任务之后,再开始Spring Bean的销毁。
??同时,又考虑到我们不可能无限等待异步任务全部执行结束,因此可以配置await-termination-period = 60
,等待任务完成的最大时长,单位为秒。具体设置需与根据业务场景决定。
标签:时长 group actor pen 配置 sts 同步 完成 创建工程
原文地址:https://www.cnblogs.com/cao-lei/p/12967238.html