最近发生了很多事情,甚至对自己的技术能力和学习方式产生了怀疑,所以有一段时间没更新文章了,估计以后更新的频率会越来越少,希望有更多的沉淀而不是简单地分享。让我有感悟的是,最近看到一篇关于ES集群状态更新的文章Elasticsearch Distributed Consistency Principles Analysis (2) - Meta,和 “提交给线程池的Runnable任务是以怎样的顺序执行的?”这个问题,因此,结合ES6.3.2源码,分析一下ES的Master节点是如何更新集群状态的。
在ES中,各个模块发生一些事件,会导致集群状态变化,并由org.elasticsearch.cluster.service.ClusterService#submitStateUpdateTask(java.lang.String, T)
//PrioritizedRunnable 实现了Comparable接口,compareTo方法比较任务的优先级
public abstract class PrioritizedRunnable implements Runnable, Comparable<PrioritizedRunnable> {
private final Priority priority;//Runnable任务优先级
private final long creationDate;
private final LongSupplier relativeTimeProvider;
public int compareTo(PrioritizedRunnable pr) {
return priority.compareTo(pr.priority);
public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) {
//core pool size == max pool size ==1,说明该线程池里面只有一个工作线程
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder, timer);
而线程池的任务队列则是采用:PriorityBlockingQueue(底层是个数组,数据结构是:堆 Heap),通过compareTo方法比较Priority,从而决定任务的排队顺序。
PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(), threadFactory, contextHolder);
this.timer = timer;
这里想提一下这种只采用一个线程执行任务状态更新的思路,它与Redis采用单线程执行Client的操作命令是一致的。各个Redis Client向Redis Server发起操作请求,Redis Server最终是以一个线程来"顺序地"执行各个命令。单线程执行方式,避免了数据并发操作导致的不一致性,并且不需要线程同步。毕竟同步需要加锁,而加锁会影响程序性能。
在这里,我想插一个问题:JDK线程池执行任务的顺序是怎样的?通过java.util.concurrent.ThreadPoolExecutor#execute方法先提交到线程池中的任务,一定会优先执行吗?这个问题经常被人问到,哈哈。但是,真正地理解,却不容易。因为它涉及到线程池参数,core pool size、max pool size 、任务队列的长度以及任务到来的时机。其实JDK源码中的注释已经讲得很清楚了:
* Proceed in 3 steps:
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.*;
* @author psj
* @date 2019/11/14
public class ThreadPoolTest {
public static void main(String[] args) throws InterruptedException{
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("test-%d").build();
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(4);
ThreadPoolExecutor executorSevice = new ThreadPoolExecutor(1, 4, 0, TimeUnit.HOURS,
workQueue, threadFactory, new ThreadPoolExecutor.DiscardPolicy());
for (int i = 1; i <=8; i++) {
MyRunnable task = new MyRunnable(i, workQueue);
System.out.println("submit: " + i + ", queue size:" + workQueue.size() + ", active count:" + executorSevice.getActiveCount());
public static class MyRunnable implements Runnable {
private int sequence;
private BlockingQueue taskQueue;
public MyRunnable(int sequence, BlockingQueue taskQueue) {
this.sequence = sequence;
this.taskQueue = taskQueue;
public void run() {
System.out.println("task :" + sequence + " finished, current queue size:" + taskQueue.size());
public static void sleepMills(int mills) {
try {
} catch (InterruptedException e) {
OK,分析完了线程池执行任务的顺序,再看看ES的PrioritizedEsThreadPoolExecutor线程池的参数:将 core pool size 和 max pool size 都设置成1,避免了这种"插队"的现象。各个模块触发的集群状态更新最终在org.elasticsearch.cluster.service.MasterService#submitStateUpdateTasks方法中构造UpdateTask对象实例,并通过submitTasks方法提交任务执行。额外需要注意的是:集群状态更新任务可以以批量执行方式提交,具体看org.elasticsearch.cluster.service.TaskBatcher的实现吧。
try {
List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream()
.map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue()), executor))
taskBatcher.submitTasks(safeTasks, config.timeout());
} catch (EsRejectedExecutionException e) {
// ignore cases where we are shutting down..., there is really nothing interesting
// to be done here...
if (!lifecycle.stoppedOrClosed()) {
throw e;
最后来分析一下 org.elasticsearch.cluster.service.ClusterService类,在ES节点启动的时候,在Node#start()方法中会启动ClusterService,当其它各个模块执行一些操作触发集群状态改变时,就是通过ClusterService来提交集群状态更新任务。而ClusterService其实就是封装了 MasterService和ClusterApplierService,MasterService提供任务提交接口,内部维护一个线程池处理更新任务,而ClusterApplierService则负责通知各个模块应用新生成的集群状态。
