标签:
继续Flink Fault Tolerance机制剖析。上一篇文章我们结合代码讲解了Flink中检查点是如何应用的(如何根据快照做失败恢复,以及检查点被应用的场景),这篇我们来谈谈检查点的触发机制以及基于Actor
的消息驱动的协同机制。这篇涉及到一个非常关键的类——CheckpointCoordinator
。
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
该类可以理解为检查点的协调器,用来协调operator
和state
的分布式快照。
检查点的触发机制是基于定时器的周期性触发。这涉及到一个定时器的实现类ScheduledTrigger
触发检查点的定时任务类。其实现就是调用triggerCheckpoint
方法。这个方法后面会具体介绍。
public void run() {
try {
triggerCheckpoint(System.currentTimeMillis());
}
catch (Exception e) {
LOG.error("Exception while triggering checkpoint", e);
}
}
启动触发检查点的定时任务的方法实现:
public void startCheckpointScheduler() {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
}
// make sure all prior timers are cancelled
stopCheckpointScheduler();
try {
// Multiple start calls are OK
checkpointIdCounter.start();
} catch (Exception e) {
String msg = "Failed to start checkpoint ID counter: " + e.getMessage();
throw new RuntimeException(msg, e);
}
periodicScheduling = true;
currentPeriodicTrigger = new ScheduledTrigger();
timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval);
}
}
方法的实现包含两个主要动作:
checkpointIdCounter
关闭定时任务的方法,用来释放资源,重置一些标记变量。
该方法是触发一个新的检查点的核心逻辑。
首先,方法中会去判断一个flag:triggerRequestQueued
。该标识表示是否一个检查点的触发请求不能被立即执行。
// sanity check: there should never be more than one trigger request queued
if (triggerRequestQueued) {
LOG.warn("Trying to trigger another checkpoint while one was queued already");
return false;
}
如果不能被立即执行,则直接返回。
不能被立即执行的原因是:还有其他处理没有完成。
接着检查正在并发处理的未完成的检查点:
// if too many checkpoints are currently in progress, we need to mark that a request is queued
if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
triggerRequestQueued = true;
if (currentPeriodicTrigger != null) {
currentPeriodicTrigger.cancel();
currentPeriodicTrigger = null;
}
return false;
}
如果未完成的检查点过多,大于允许的并发处理的检查点数目的阈值,则将当前检查点的触发请求设置为不能立即执行,如果定时任务已经启动,则取消定时任务的执行,并返回。
以上这些检查处于基于锁机制实现的同步代码块中。
接着检查需要被触发检查点的task
是否都处于运行状态:
ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
if (ee != null && ee.getState() == ExecutionState.RUNNING) {
triggerIDs[i] = ee.getAttemptId();
} else {
LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
tasksToTrigger[i].getSimpleName());
return false;
}
}
只要有一个task
不满足条件,则不会触发检查点,并立即返回。
然后检查是否所有需要ack检查点的task
都处于运行状态:
Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);
for (ExecutionVertex ev : tasksToWaitFor) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ackTasks.put(ee.getAttemptId(), ev);
} else {
LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
ev.getSimpleName());
return false;
}
}
如果有一个task
不满足条件,则不会触发检查点,并立即返回。
以上条件都满足(即没有return false;
),才具备触发一个检查点的基本条件。
下一步,获得checkpointId
:
final long checkpointID;
if (nextCheckpointId < 0) {
try {
// this must happen outside the locked scope, because it communicates
// with external services (in HA mode) and may block for a while.
checkpointID = checkpointIdCounter.getAndIncrement();
}
catch (Throwable t) {
int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
return false;
}
}
else {
checkpointID = nextCheckpointId;
}
这依赖于该方法的另一个参数nextCheckpointId
,如果其值为-1
,则起到标识的作用,指示checkpointId
将从外部获取(比如Zookeeper
,后续文章会谈及检查点ID的生成机制)。
接着创建一个PendingCheckpoint
对象:
final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);
该类表示一个待处理的检查点。
与此同时,会定义一个针对当前检查点超时进行资源清理的取消器canceller
。该取消器主要是针对检查点没有释放资源的情况进行资源释放操作,同时还会调用triggerQueuedRequests
方法启动一个触发检查点的定时任务,如果有的话(取决于triggerRequestQueued
是否为true)。
然后会再次进入同步代码段,对上面的是否新建检查点的判断条件做二次检查,防止产生竞态条件。这里做二次检查的原因是,中间有一段关于获得checkpointId
的代码,不在同步块中。
检查后,如果触发检查点的条件仍然是满足的,那么将上面创建的PendingCheckpoint
对象加入集合中:
pendingCheckpoints.put(checkpointID, checkpoint);
同时会启动针对当前检查点的超时取消器:
timer.schedule(canceller, checkpointTimeout);
接下来会发送消息给task
以真正触发检查点(基于消息驱动的协同机制):
for (int i = 0; i < tasksToTrigger.length; i++) {
ExecutionAttemptID id = triggerIDs[i];
TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
}
上面已经谈到了检查点的触发机制是基于定时任务的周期性触发,那么定时任务的启停机制又是什么?Flink使用的是基于AKKA的Actor模型的消息驱动机制。
类CheckpointCoordinatorDeActivator
是一个Actor
的实现,它用于基于消息来驱动检查点的定时任务的启停:
public void handleMessage(Object message) {
if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus();
if (status == JobStatus.RUNNING) {
// start the checkpoint scheduler
coordinator.startCheckpointScheduler();
} else {
// anything else should stop the trigger for now
coordinator.stopCheckpointScheduler();
}
}
// we ignore all other messages
}
该Actor
会收到Job
状态的变化通知:JobStatusChanged
。一旦变成RUNNING
,那么检查点的定时任务会被立即启动;否则会被立即关闭。
该Actor
被创建的代码是CheckpointCoordinator
中的createActivatorDeactivator
方法:
public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID leaderSessionID) {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
}
if (jobStatusListener == null) {
Props props = Props.create(CheckpointCoordinatorDeActivator.class, this, leaderSessionID);
// wrap the ActorRef in a AkkaActorGateway to support message decoration
jobStatusListener = new AkkaActorGateway(actorSystem.actorOf(props), leaderSessionID);
}
return jobStatusListener;
}
}
既然,是基于消息驱动机制,那么就需要各种类型的消息对应不同的业务逻辑。这些消息在Flink中被定义在package:org.apache.flink.runtime.messages.checkpoint
中。
类图如下:
检查点消息的基础抽象类,提供了三个公共属性(从构造器注入):
JobID
的实例,表示当前这条消息实例的归属;ExecutionAttemptID
的实例,表示检查点的源/目的任务除此之外,该实现仅仅override了hashCode
和equals
方法。
该消息由JobManager
发送给TaskManager
,用于告诉一个task
触发它的检查点。
位于CheckpointCoordinator
类的triggerCheckpoint
中,上面已经提及过。
for (int i = 0; i < tasksToTrigger.length; i++) {
ExecutionAttemptID id = triggerIDs[i];
TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
}
TaskManager
的handleCheckpointingMessage
实现:
case message: TriggerCheckpoint =>
val taskExecutionId = message.getTaskExecutionId
val checkpointId = message.getCheckpointId
val timestamp = message.getTimestamp
log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")
val task = runningTasks.get(taskExecutionId)
if (task != null) {
task.triggerCheckpointBarrier(checkpointId, timestamp)
} else {
log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
}
主要是触发检查点屏障Barrier
。
该消息由TaskManager
发送给JobManager
,用于告诉检查点协调器:检查点的请求还没有能够被处理。这种情况通常发生于:某task
已处于RUNNING
状态,但在内部可能还没有准备好执行检查点。
它除了AbstractCheckpointMessage
需要的三个属性外,还需要用于关联检查点的timestamp
。
位于Task
类的triggerCheckpointBarrier
方法中:
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
boolean success = statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
if (!success) {
DeclineCheckpoint decline = new DeclineCheckpoint(jobId, getExecutionId(), checkpointID, checkpointTimestamp);
jobManager.tell(decline);
}
}
catch (Throwable t) {
if (getExecutionState() == ExecutionState.RUNNING) {
failExternally(new RuntimeException(
"Error while triggering checkpoint for " + taskName,
t));
}
}
}
};
位于JobManager
的handleCheckpointMessage
中
具体的实现在CheckpointCoordinator
的receiveDeclineMessage
中:
首先从接收的消息中(DeclineCheckpoint
)获得检查点编号:
final long checkpointId = message.getCheckpointId();
接下来的逻辑是判断当前检查点是否是未完成的检查点:isPendingCheckpoint
接下来分为三种情况对待:
discarded
)isPendingCheckpoint = true;
pendingCheckpoints.remove(checkpointId);
checkpoint.discard(userClassLoader);
rememberRecentCheckpointId(checkpointId);
置isPendingCheckpoint
为true
,根据检查点编号,将检查点从未完成的检查点集合中移除,discard
检查点,记住最近的检查点(将其保持到到一个最近的检查点列表中)。
接下来查找是否还有待处理的检查点,根据检查点时间戳来判断:
boolean haveMoreRecentPending = false;
Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
while (entries.hasNext()) {
PendingCheckpoint p = entries.next().getValue();
if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
haveMoreRecentPending = true;
break;
}
}
根据标识haveMoreRecentPending
来进入不同的处理逻辑:
if (!haveMoreRecentPending && !triggerRequestQueued) {
LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
triggerCheckpoint(System.currentTimeMillis());
} else if (!haveMoreRecentPending) {
LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);
triggerQueuedRequests();
}
如果有需要处理的检查点,并且当前能立即处理,则立即触发检查点定时任务;如果有需要处理的检查点,但不能立即处理,则触发入队的定时任务。
discarded
抛出IllegalStateException
异常
如果在最近未完成的检查点列表中找到,则有可能表示消息来迟了,将isPendingCheckpoint
置为true
,否则将isPendingCheckpoint
置为false
.
最后返回isPendingCheckpoint
。
该消息是一个应答信号,表示某个独立的task
的检查点已经完成。也是由TaskManager
发送给JobManager
。该消息会携带task
的状态:
RuntimeEnvironment
类的acknowledgeCheckpoint
方法。
具体的实现在CheckpointCoordinator
的receiveAcknowledgeMessage
中,开始的实现同receiveDeclineMessage
,也是判断当前接收到的消息中包含的检查点是否是待处理的检查点。如果是,并且也没有discard
掉,则执行如下逻辑:
if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize())) {
if (checkpoint.isFullyAcknowledged()) {
completed = checkpoint.toCompletedCheckpoint();
completedCheckpointStore.addCheckpoint(completed);
LOG.info("Completed checkpoint " + checkpointId + " (in " +
completed.getDuration() + " ms)");
LOG.debug(completed.getStates().toString());
pendingCheckpoints.remove(checkpointId);
rememberRecentCheckpointId(checkpointId);
dropSubsumedCheckpoints(completed.getTimestamp());
onFullyAcknowledgedCheckpoint(completed);
triggerQueuedRequests();
}
}
检查点首先应答相关的task
,如果检查点已经完全应答完成,则将检查点转换成CompletedCheckpoint
,然后将其加入completedCheckpointStore
列表,并从pendingCheckpoints
中移除。然后调用dropSubsumedCheckpoints
它会从pendingCheckpoints
中diacard
所有时间戳小于当前检查点的时间戳,并从集合中移除。
最后,如果该检查点被转化为已完成的检查点,则:
if (completed != null) {
final long timestamp = completed.getTimestamp();
for (ExecutionVertex ev : tasksToCommitTo) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ExecutionAttemptID attemptId = ee.getAttemptId();
NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp);
ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId());
}
}
statsTracker.onCompletedCheckpoint(completed);
}
迭代所有待commit的task
,发送NotifyCheckpointComplete
消息。同时触发状态跟踪器的onCompletedCheckpoint
回调方法。
该消息由JobManager
发送给TaskManager
,用于告诉一个task
它的检查点已经得到完成确认,task
可以向第三方提交该检查点。
位于CheckpointCoordinator
类的receiveAcknowledgeMessage
方法中,当检查点acktask
完成,转化为CompletedCheckpoint
之后
if (completed != null) {
final long timestamp = completed.getTimestamp();
for (ExecutionVertex ev : tasksToCommitTo) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ExecutionAttemptID attemptId = ee.getAttemptId();
NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp);
ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId());
}
}
statsTracker.onCompletedCheckpoint(completed);
}
TaskManager
的handleCheckpointingMessage
实现:
case message: NotifyCheckpointComplete =>
val taskExecutionId = message.getTaskExecutionId
val checkpointId = message.getCheckpointId
val timestamp = message.getTimestamp
log.debug(s"Receiver ConfirmCheckpoint $checkpointId@$timestamp for $taskExecutionId.")
val task = runningTasks.get(taskExecutionId)
if (task != null) {
task.notifyCheckpointComplete(checkpointId)
} else {
log.debug(
s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.")
}
主要是触发task
的notifyCheckpointComplete
方法。
这篇文章主要讲解了检查点的基于定时任务的周期性的触发机制,以及基于Akka的Actor
模型的消息驱动的协同处理机制。
微信扫码关注公众号:Apache_Flink
QQ扫码关注QQ群:Apache Flink学习交流群(123414680)
Apache Flink fault tolerance源码剖析(二)
标签:
原文地址:http://blog.csdn.net/yanghua_kobe/article/details/51533957