标签:怎么办 表示 并行 消费 过多 实例 depend util 服务端
让我们先看个图, Taobao的PDP(Product Detail Page)页.
打开Chrome Network面板, 让我们来看taobao是怎么加载这个页面数据的. 根据经验, 一般是异步加载的, 要么是XHR,要么就是js(jsonp), 你应该很快可以找到
还能看到这个接口的性能
神奇的是, taobao竟然在一次请求中拉下了整个PDP页的完整数据, 而且服务端处理耗时不到125ms
首先, 这么做有什么好处?
然后, 这又是怎么做到的呢?
你可能会说缓存, 但你要知道, 这样一个对电商极为重要的页面, 绝对涉及到了非常多的团队, 比如:
即使每个团队的数据全都是缓存的, 你一个个去拿, 要在125ms内拿完也不容易. 而且作为跟钱相关的页面, 部分数据必须保证绝对实时有效, 能用缓存的地方不多. 怎么办, 如果是你, 你会怎么做? 离线打标? 数据预热? etc..
此时, 并行调用不失为一种好办法.
分析一下这个页面, 你会发现, 每一个模块除了属于同一个商品(入参相同), 其实各个模块的数据之间, 并没有依赖性, 完全可以并行去获取.
并行获取数据, 可以提高我们的接口性能. 但也会引入一些问题, 如:
下面, 我们来讨论下如何简单\易用\高效的并行获取数据; 如何解决上述异步问题.
假如你现在需要用户的基础信息\博客列表\粉丝列表 3份数据. 哪么你有哪些方式可以并行获取呢?
最简单原始的办法, 直接使用Java提供了的线程池和Future机制.
public User getUserDataByParallel(Long userId) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CountDownLatch countDownLatch = new CountDownLatch(3);
Future<User> userFuture = executorService.submit(() -> {
try{
return userService.get(userId);
}finally {
countDownLatch.countDown();
}
});
Future<List<Post>> postsFuture = executorService.submit(() -> {
try{
return postService.getPosts(userId);
}finally {
countDownLatch.countDown();
}
});
Future<List<User>> followersFuture = executorService.submit(() -> {
try{
return followService.getFollowers(userId);
}finally {
countDownLatch.countDown();
}
});
countDownLatch.await();
User user = userFuture.get();
user.setFollowers(followersFuture.get());
user.setPosts(postsFuture.get());
return user;
}
我们知道, Spring支持@Async注解, 可以方便的实现异步, 并且支持获取返回值. 参考: https://www.baeldung.com/spring-async#2-methods-with-return-type
@Async实现的原理实际是在Bean的代理类的方法中, 拦截方法调用, 向taskExecutor Bean中提交Callable任务. 原理跟自己用Java ThreadPool写其实区别不大.
那么要用Spring Async实现上述功能. 首先需要修改下面3个方法的返回值, 并且修改返回值类型, 并为方法添加 @Async注解
class UserServiceImpl implements UserService {
@Async
public Future<User> get(Long userId) {
// ... something
}
}
class PostServiceImpl implements PostService {
@Async
public Future<List<Post> getPosts(Long userId) {
// ... something
}
}
class FollowServiceImpl implements FollowService {
@Async
public Future<List<User> getFollowers(Long userId) {
// ... something
}
}
并行获取3份用户数据然后聚合, 代码如下:
public User getUserDataByParallel(Long userId) throws InterruptedException, ExecutionException {
Future<User> userFuture = userService.get(userId);
Future<List<Post>> postsFuture = postService.getPosts(userId);
Future<List<User>> followersFuture = followService.getFollowers(userId);
User user = whileGet(userFuture);
user.setFollowers(whileGet(followersFuture));
user.setPosts(whileGet(postsFuture));
return user;
}
private <T> T whileGet(Future<T> future) throws ExecutionException, InterruptedException {
while(true) {
if (future.isDone()) {
break;
}
}
return future.get();
}
这里使用自旋去获取异步数据. 当然你也可以像前面那样, 传递一个闭锁(CountDownLatch)到Service中去, 然后让主调线程在一个闭锁上面等待.
上面2种方式的确能实现功能, 但首先, 它们都很不直观, 而且没有处理前面讲到的异步问题, 一旦出现超时\异常\ThreadLocal, 代码可能不会按照你预期的方式工作. 那有没有更简单方便可靠的方法呢?
试想这样一种方式, 如果你需要的数据, 都可以通过方法入参自动并行获取, 然后传递给你, 那是不是很方便? 就像这样:
@Component
public class UserAggregate {
@DataProvider("userWithPosts")
public User userWithPosts(
@DataConsumer("user") User user,
@DataConsumer("posts") List<Post> posts,
@DataConsumer("followers") List<User> followers) {
user.setPosts(posts);
user.setFollowers(followers);
return user;
}
}
这里的@DataConsumer
声明了你要异步获取的数据id. @DataProvider
声明了这个方法提供数据, 并且id为userWithPosts.
或者你不想写这样一个Aggregate类, 你不需要复用, 你想直接创建一个"匿名Provider". 那么你可以直接在任何地方像下面这样调用拿结果
User user = dataBeanAggregateQueryFacade.get(
Collections.singletonMap("userId", 1L),
new Function3<User, List<Post>,List<User>, User>() {
@Override
public User apply(@DataConsumer("user") User user,
@DataConsumer("posts") List<Post> posts,
@DataConsumer("followers") List<User> followers) {
user.setPosts(posts);
user.setFollowers(followers);
return user;
}
});
Assert.notNull(user,"user not null");
Assert.notNull(user.getPosts(),"user posts not null");
这里的Function3接收4个泛型参数, 最后一个User表示返回值类型, 前3个参数依次对应apply方法的3个入参类型. 项目预定义了Function2-Function5, 支持不超过5个参数, 如果你需要更多参数, 可以编写一个接口(FunctionInterface), 继承MultipleArgumentsFunction接口即可.
很显然
@DataConsumer
只会对应一个 @DataProvider
.@DataProvider
可能被多个 @DataConsumer
消费 .@DataProvider
通过多个 @DataConsumer
依赖上多个 @DataProvider
.你不用care底层如何实现. 只有在你有定制化的需求时, 才去关心一些配置参数. 去扩展一些能力.
@DataProvider
和 @DataConsumer
注解. 分析记录下依赖关系(有向非连通图), 并且记录好@DataProvider
和Spring Bean的映射关系.@DataConsumer
注解的入参, 将从此map中取值.@DataProvider
和 @DataConsumer
注解可以支持一些参数, 用来控制超时时间\异常处理方式\是否幂等缓存等等.@DataProvider
注解支持 timeout
参数, 用来控制超时. 实现原理是通过闭锁的超时等待方法.
java.util.concurrent.CountDownLatch#await(long, java.util.concurrent.TimeUnit)
对异常提供两种处理方式: 吞没或者向上层抛出.
@DataConsumer
注解支持exceptionProcessingMethod
参数, 用来表示这个Consumer想怎么处理Provider抛出的异常.
当然, 也支持在全局维度配置. 全局配置的优先级低于(<)Consumer配置的优先级.
Spring Bean初始化, 因为Bean创建和Bean属性赋值分了两步走, 因此可以用所谓的"早期引用"解决循环依赖的问题.
但如果你循环依赖的Bean, 依赖关系定义在构造函数入参上, 那么是没法解决循环依赖的问题的.
同理, 我们通过方法入参, 异步注入依赖数据, 在方法入参没有变化的情况下, 也是无法结束死循环的. 因此必须禁止循环依赖.
那么问题变为了怎么禁止循环依赖. 或者说, 怎么检测有向非联通图中的循环依赖, 两个办法:
这里我们用领接表+DFS染色搜索, 来实现环的检查
private void checkCycle(Map<String,Set<String>> graphAdjMap) {
Map<String,Integer> visitStatusMap = new HashMap<>(graphAdjMap.size() * 2);
for (Map.Entry<String, Set<String>> item : graphAdjMap.entrySet()) {
if (visitStatusMap.containsKey(item.getKey())) {
continue;
}
dfs(graphAdjMap,visitStatusMap,item.getKey());
}
}
private void dfs(Map<String,Set<String>> graphAdjMap,Map<String,Integer> visitStatusMap, String node) {
if (visitStatusMap.containsKey(node)) {
if(visitStatusMap.get(node) == 1) {
List<String> relatedNodes = new ArrayList<>();
for (Map.Entry<String,Integer> item : visitStatusMap.entrySet()) {
if (item.getValue() == 1) {
relatedNodes.add(item.getKey());
}
}
throw new IllegalStateException("There are loops in the dependency graph. Related nodes:" + StringUtils.join(relatedNodes));
}
return ;
}
visitStatusMap.put(node,1);
log.info("visited:{}", node);
for (String relateNode : graphAdjMap.get(node)) {
dfs(graphAdjMap,visitStatusMap,relateNode);
}
visitStatusMap.put(node,2);
}
许多的框架都使用了ThreadLocal来实现Context来保存单次请求中的一些共享数据, Spring也不例外.
众所周知, ThreadLocal实际是访问Thread中一个特殊Map的入口. ThreadLocal只能访问当前Thread的数据(副本), 如果跨越了线程, 是拿不到到其他ThreadLocalMap的数据的.
如图
这里, 我们先定义一个接口, 来描述这3个动作
public interface AsyncQueryTaskWrapper {
/**
* 任务提交之前执行. 此方法在提交任务的那个线程中执行
*/
void beforeSubmit();
/**
* 任务开始执行前执行. 此方法在异步线程中执行
* @param taskFrom 提交任务的那个线程
*/
void beforeExecute(Thread taskFrom);
/**
* 任务执行结束后执行. 此方法在异步线程中执行
* 注意, 不管用户的方法抛出何种异常, 此方法都会执行.
* @param taskFrom 提交任务的那个线程
*/
void afterExecute(Thread taskFrom);
}
为了让我们定义的3个动作起作用. 我们需要重写一下 java.util.concurrent.Callable#call方法.
public abstract class AsyncQueryTask<T> implements Callable<T> {
Thread taskFromThread;
AsyncQueryTaskWrapper asyncQueryTaskWrapper;
public AsyncQueryTask(Thread taskFromThread, AsyncQueryTaskWrapper asyncQueryTaskWrapper) {
this.taskFromThread = taskFromThread;
this.asyncQueryTaskWrapper = asyncQueryTaskWrapper;
}
@Override
public T call() throws Exception {
try {
if(asyncQueryTaskWrapper != null) {
asyncQueryTaskWrapper.beforeExecute(taskFromThread);
}
return execute();
} finally {
if (asyncQueryTaskWrapper != null) {
asyncQueryTaskWrapper.afterExecute(taskFromThread);
}
}
}
/**
* 提交任务时, 业务方实现这个替代方法
*
* @return
* @throws Exception
*/
public abstract T execute() throws Exception;
}
接下来, 向线程池提交任务时, 不再直接提交Callable匿名类实例, 而是提交AsyncQueryTask实例. 并且在提交前触发 taskWrapper.beforeSubmit();
AsyncQueryTaskWrapper taskWrapper = new CustomAsyncQueryTaskWrapper();
// 任务提交前执行动作.
taskWrapper.beforeSubmit();
Future<?> future = executorService.submit(new AsyncQueryTask<Object>(Thread.currentThread(),taskWrapper) {
@Override
public Object execute() throws Exception {
try {
// something to do
} finally {
stopDownLatch.countDown();
}
}
});
你只需要定义一个类, 实现这个接口, 并将这个类加到配置文件中去.
@Slf4j
public class CustomAsyncQueryTaskWrapper implements AsyncQueryTaskWrapper {
/**
* "捆绑" 在任务实例中的数据
*/
private Long tenantId;
private User user;
@Override
public void beforeSubmit() {
/* 提交任务前, 先从当前线程拷贝出ThreadLocal中的数据到任务中 */
log.info("asyncTask beforeSubmit. threadName: {}",Thread.currentThread().getName());
this.tenantId = RequestContext.getTenantId();
this.user = ExampleAppContext.getUser();
}
@Override
public void beforeExecute(Thread taskFrom) {
/* 任务提交后, 执行前, 在异步线程中用数据恢复ThreadLocal(Context) */
log.info("asyncTask beforeExecute. threadName: {}, taskFrom: {}",Thread.currentThread().getName(),taskFrom.getName());
RequestContext.setTenantId(tenantId);
ExampleAppContext.setLoggedUser(user);
}
@Override
public void afterExecute(Thread taskFrom) {
/* 任务执行完成后, 清理异步线程中的ThreadLocal(Context) */
log.info("asyncTask afterExecute. threadName: {}, taskFrom: {}",Thread.currentThread().getName(),taskFrom.getName());
RequestContext.removeTenantId();
ExampleAppContext.remove();
}
}
添加配置使TaskWapper生效.
io.github.lvyahui8.spring.task-wrapper-class=io.github.lvyahui8.spring.example.wrapper.CustomAsyncQueryTaskWrapper
我们先把一次查询, 分为以下几个生命周期
转换成接口如下
public interface AggregateQueryInterceptor {
/**
* 查询正常提交, Context已经创建
*
* @param aggregationContext 查询上下文
* @return 返回为true才继续执行
*/
boolean querySubmitted(AggregationContext aggregationContext) ;
/**
* 每个Provider方法执行前, 将调用此方法. 存在并发调用
*
* @param aggregationContext 查询上下文
* @param provideDefinition 将被执行的Provider
*/
void queryBefore(AggregationContext aggregationContext, DataProvideDefinition provideDefinition);
/**
* 每个Provider方法执行成功之后, 调用此方法. 存在并发调用
*
* @param aggregationContext 查询上下文
* @param provideDefinition 被执行的Provider
* @param result 查询结果
* @return 返回结果, 如不修改不, 请直接返回参数中的result
*/
Object queryAfter(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Object result);
/**
* 每个Provider执行时, 如果抛出异常, 将调用此方法. 存在并发调用
*
* @param aggregationContext 查询上下文
* @param provideDefinition 被执行的Provider
* @param e Provider抛出的异常
*/
void exceptionHandle(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Exception e);
/**
* 一次查询全部完成.
*
* @param aggregationContext 查询上下文
*/
void queryFinished(AggregationContext aggregationContext);
}
在Spring应用启动之初, 获取所有实现了AggregateQueryInterceptor接口的Bean, 并按照Order注解排序, 作为拦截器链.
至于拦截器如何执行. 很简单, 在递归提交查询任务时, 插入执行一些钩子(hook)函数即可. 涉及到的代码很多, 就不贴在这里, 感兴趣的可以去github clone代码查看.
你可以实现一个拦截器, 在拦截器中输出日志, 监控节点执行状态(耗时, 出入参), 如下:
@Component
@Order(2)
@Slf4j
public class SampleAggregateQueryInterceptor implements AggregateQueryInterceptor {
@Override
public boolean querySubmitted(AggregationContext aggregationContext) {
log.info("begin query. root:{}",aggregationContext.getRootProvideDefinition().getMethod().getName());
return true;
}
@Override
public void queryBefore(AggregationContext aggregationContext, DataProvideDefinition provideDefinition) {
log.info("query before. provider:{}",provideDefinition.getMethod().getName());
}
@Override
public Object queryAfter(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Object result) {
log.info("query after. provider:{},result:{}",provideDefinition.getMethod().getName(),result.toString());
return result;
}
@Override
public void exceptionHandle(AggregationContext aggregationContext, DataProvideDefinition provideDefinition, Exception e) {
log.error(e.getMessage());
}
@Override
public void queryFinished(AggregationContext aggregationContext) {
log.info("query finish. root: {}",aggregationContext.getRootProvideDefinition().getMethod().getName());
}
}
最后, 再次贴一下项目地址: . spring-boot-data-aggregator
欢迎拍砖, 欢迎star, 欢迎使用
当Parallel遇上了DI - Spring并行数据聚合最佳实践
标签:怎么办 表示 并行 消费 过多 实例 depend util 服务端
原文地址:https://www.cnblogs.com/lvyahui/p/12169119.html