标签:如何 out tps 实现 app 本地 并且 blocking bug
有时候我们需要某个请求下的所有的traceId都是一致的,以获得统一解析的日志文件。便于排查问题。
为每一个请求分配同一个traceId据我所知有两种方式:MDC和ThreadLocal,MDC的内部实现也是ThreadLocal,下面分别介绍这两种方式。
一、MDC
MDC(Mapped Diagnostic Contexts),翻译过来就是:映射的诊断上下文。意思是:在日志中(映射的)请求ID(requestId),可以作为我们定位(诊断)问题的关键字(上下文)。
有了MDC工具,只要在接口或切面植入 put 和 remove 代码,就可以在定位问题时,根据映射的唯一 requestID 快速过滤出某次请求的所有日志。
slf4j的MDC机制其内部基于ThreadLocal实现,可参见Java基础下的 ThreadLocal这篇博客,https://www.cnblogs.com/yangyongjie/p/10574591.html。
1、请求没有子线程的情况下代码实现:
1)使用Aop拦截请求
/** * 为每一个的HTTP请求添加线程号 * * @author yangyongjie * @date 2019/9/2 * @desc */ @Aspect @Component public class LogAspect { private static final String STR_THREAD_ID = "threadId"; @Pointcut(value = "@annotation(org.springframework.web.bind.annotation.RequestMapping)") private void webPointcut() { // doNothing } /** * 为所有的HTTP请求添加线程号 * * @param joinPoint * @throws Throwable */ @Around(value = "webPointcut()") public void around(ProceedingJoinPoint joinPoint) throws Throwable { // 方法执行前加上线程号 MDC.put(STR_THREAD_ID, UUID.randomUUID().toString().replaceAll("-", "")); // 执行拦截的方法 joinPoint.proceed(); // 方法执行结束移除线程号 MDC.remove(STR_THREAD_ID); } }
2)log4j日志配置
log4j.appender.stdout.layout.ConversionPattern=[%-5p]%d{yyyy-MM-dd HH:mm:ss.SSS}[%t]%X{threadId}[%c:%L] - %m%n
需要注意日志红色中字符串 threadId 需要和 日志拦截中MDC put的key是一样的。
2、请求有子线程的情况
slf4j的MDC机制其内部基于ThreadLocal实现,可参见Java基础下的 ThreadLocal这篇博客,https://www.cnblogs.com/yangyongjie/p/10574591.html。所以我们调用 MDC.put()方法传入
的请求ID只在当前线程有效。所以,主线程中设置的MDC数据,在其子线程(线程池)中是无法获取的。那么主线程如何将MDC数据传递给子线程?
官方建议
1)在父线程新建子线程之前调用MDC.getCopyOfContextMap()方法将MDC内容取出来传给子线程
2)子线程在执行操作前先调用MDC.setContextMap()方法将父线程的MDC内容设置到子线程
代码实现
1)使用Aop拦截请求,与上面相同
2)log4j日志配置与上面相同
3)装饰器模式装饰子线程,有两种方式:
方式一:使用装饰器模式,对Runnable接口进行一层装饰,在创建MDCRunnable类对Runnable接口进行一层装饰。
在创建MDCRunnable类时保存当前线程的MDC值,再执行run()方法
装饰器MDCRunnable装饰Runnable:
import org.slf4j.MDC; import java.util.Map; /** * 装饰器模式装饰Runnable,传递父线程的线程号 * * @author yangyongjie * @date 2020/3/9 * @desc */ public class MDCRunnable implements Runnable { private Runnable runnable; /** * 保存当前主线程的MDC值 */ private final Map<String, String> mainMdcMap; public MDCRunnable(Runnable runnable) { this.runnable = runnable; this.mainMdcMap = MDC.getCopyOfContextMap(); } @Override public void run() { // 将父线程的MDC值赋给子线程 for (Map.Entry<String, String> entry : mainMdcMap.entrySet()) { MDC.put(entry.getKey(), entry.getValue()); } // 执行被装饰的线程run方法 runnable.run(); // 执行结束移除MDC值 for (Map.Entry<String, String> entry : mainMdcMap.entrySet()) { MDC.put(entry.getKey(), entry.getValue()); } } }
使用MDCRunnable代替Runnable:
// 异步线程打印日志,用MDCRunnable装饰Runnable new Thread(new MDCRunnable(new Runnable() { @Override public void run() { logger.debug("log in other thread"); } })).start(); // 异步线程池打印日志,用MDCRunnable装饰Runnable EXECUTOR.execute(new MDCRunnable(new Runnable() { @Override public void run() { logger.debug("log in other thread pool"); } })); EXECUTOR.shutdown();
方式二:装饰线程池
/** * 装饰ThreadPoolExecutor,将父线程的MDC内容传给子线程 * @author yangyongjie * @date 2020/3/19 * @desc */ public class MDCThreadPoolExecutor extends ThreadPoolExecutor { private static final Logger LOGGER= LoggerFactory.getLogger(MDCThreadPoolExecutor.class); public MDCThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } @Override public void execute(final Runnable runnable) { // 获取父线程MDC中的内容,必须在run方法之前,否则等异步线程执行的时候有可能MDC里面的值已经被清空了,这个时候就会返回null final Map<String, String> context = MDC.getCopyOfContextMap(); super.execute(new Runnable() { @Override public void run() { // 将父线程的MDC内容传给子线程 MDC.setContextMap(context); try { // 执行异步操作 runnable.run(); } finally { // 清空MDC内容 MDC.clear(); } } }); } }
用MDCThreadPoolExecutor 代替ThreadPoolExecutor :
private static final MDCThreadPoolExecutor MDCEXECUTORS=new MDCThreadPoolExecutor(1,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(600), new CustomThreadFactory("mdcThreadPoolTest"), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 打印日志,并且重启一个线程执行被拒绝的任务 LOGGER.error("Task:{},rejected from:{}", r.toString(), executor.toString()); // 直接执行被拒绝的任务,JVM另起线程执行 r.run(); } }); LOGGER.info("父线程日志"); MDCEXECUTORS.execute(new Runnable() { @Override public void run() { LOGGER.info("子线程日志"); } });
二、ThreadLocal方式
ThreadLocal可以用于在同一个线程内,跨类、跨方法传递数据。因此可以用来透传全局上下文
1、没有子线程的情况
1)创建线程的请求上下文
/** * 线程上下文,一个线程内所需的上下文变量参数,使用ThreadLocal保存副本 * * @author yangyongjie * @date 2019/9/12 * @desc */ public class ThreadContext { /** * 每个线程的私有变量,每个线程都有独立的变量副本,所以使用private static final修饰,因为都需要复制进入本地线程 */ private static final ThreadLocal<ThreadContext> THREAD_LOCAL = new ThreadLocal<ThreadContext>() { @Override protected ThreadContext initialValue() { return new ThreadContext(); } }; public static ThreadContext currentThreadContext() { /*ThreadContext threadContext = THREAD_LOCAL.get(); if (threadContext == null) { THREAD_LOCAL.set(new ThreadContext()); threadContext = THREAD_LOCAL.get(); } return threadContext;*/ return THREAD_LOCAL.get(); } public static void remove() { THREAD_LOCAL.remove(); } private String threadId; public String getThreadId() { return threadId; } public void setThreadId(String threadId) { this.threadId = threadId; } @Override public String toString() { return JacksonJsonUtil.toString(this); } }
2)使用Aop拦截请求,给每个请求线程ThreadLocalMap添加线程号
/** * 为每一个的HTTP请求添加线程号 * * @author yangyongjie * @date 2019/9/2 * @desc */ @Aspect @Component public class LogAspect { private static final Logger LOGGER = LoggerFactory.getLogger(LogAspect.class); @Pointcut(value = "@annotation(org.springframework.web.bind.annotation.RequestMapping)") private void webPointcut() { // doNothing } /** * 为所有的HTTP请求添加线程号 * * @param joinPoint * @throws Throwable */ @Around(value = "webPointcut()") public Object around(ProceedingJoinPoint joinPoint) throws Throwable { // 方法执行前加上线程号,并将线程号放到线程本地变量中 ThreadContext.currentThreadContext().setThreadId(StringUtil.uuid()); // 执行拦截的方法 Object result; try { result = joinPoint.proceed(); } finally { // 方法执行结束移除线程号,并移除线程本地变量,防止内存泄漏 ThreadContext.remove(); } return result; } }
3)获取线程号
String threadId = ThreadContext.currentThreadContext().getThreadId();
2、请求有子线程的情况
END
标签:如何 out tps 实现 app 本地 并且 blocking bug
原文地址:https://www.cnblogs.com/yangyongjie/p/12523567.html