void Slave::_runTask(
    const Future<bool>& future,
    const FrameworkInfo& frameworkInfo,
    const TaskInfo& task)
{
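  // This is the continuation of 'runTask()': it runs once the future
  // for unscheduling the task's directories from gc has completed
  // (see the 'future.isReady()' check below).
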
  const FrameworkID frameworkId = frameworkInfo.id();

  LOG(INFO) << "Launching task " << task.task_id()
            << " for framework " << frameworkId;

  Framework* framework = getFramework(frameworkId);

  // The framework may have been removed while this continuation was
  // pending, so check for NULL before dereferencing.
  if (framework == NULL) {
    LOG(WARNING) << "Ignoring run task " << task.task_id()
                 << " because the framework " << frameworkId
                 << " does not exist";
    return;
  }

  const ExecutorInfo executorInfo = getExecutorInfo(frameworkInfo, task);
  const ExecutorID& executorId = executorInfo.executor_id();

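  // 'runTask()' records the task in 'framework->pending' before this
  // continuation runs; if the entry is gone, the task was killed
  // while the gc unschedule was in flight.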
  if (framework->pending.contains(executorId) &&
      framework->pending[executorId].contains(task.task_id())) {
    framework->pending[executorId].erase(task.task_id());
    if (framework->pending[executorId].empty()) {
      framework->pending.erase(executorId);
      // NOTE: Ideally we would perform the following check here:
      //
      //   if (framework->executors.empty() &&
      //       framework->pending.empty()) {
      //     removeFramework(framework);
      //   }
      //
      // However, we need 'framework' to stay valid for the rest of
      // this function. As such, we perform the check before each of
      // the 'return' statements below.
    }
  } else {
    LOG(WARNING) << "Ignoring run task " << task.task_id()
                 << " of framework " << frameworkId
                 << " because the task has been killed in the meantime";
    return;
  }

  // We don't send a status update here because a terminating
  // framework cannot send acknowledgements.
  if (framework->state == Framework::TERMINATING) {
    LOG(WARNING) << "Ignoring run task " << task.task_id()
                 << " of framework " << frameworkId
                 << " because the framework is terminating";

    // Refer to the comment after 'framework->pending.erase' above
    // for why we need this.
    if (framework->executors.empty() && framework->pending.empty()) {
      removeFramework(framework);
    }

    return;
  }

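  // 'future' holds the result of unscheduling the task's directories
  // from gc; it is not ready if that operation failed or was
  // discarded.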
  if (!future.isReady()) {
    LOG(ERROR) << "Failed to unschedule directories scheduled for gc: "
               << (future.isFailed() ? future.failure() : "future discarded");

    const StatusUpdate update = protobuf::createStatusUpdate(
        frameworkId,
        info.id(),
        task.task_id(),
        TASK_LOST,
        TaskStatus::SOURCE_SLAVE,
        UUID::random(),
        "Could not launch the task because we failed to unschedule directories"
        " scheduled for gc",
        TaskStatus::REASON_GC_ERROR);

    // TODO(vinod): Ensure that the status update manager reliably
    // delivers this update. Currently, we don't guarantee this
    // because removal of the framework causes the status update
    // manager to stop retrying for its un-acked updates.
    statusUpdate(update, UPID());

    // Refer to the comment after 'framework->pending.erase' above
    // for why we need this.
    if (framework->executors.empty() && framework->pending.empty()) {
      removeFramework(framework);
    }

    return;
  }

  // NOTE: If the task or executor uses resources that are
  // checkpointed on the slave (e.g. persistent volumes), we should
  // already know about it. If the slave doesn't know about them (e.g.
  // CheckpointResourcesMessage was dropped or came out of order),
  // we send TASK_LOST status updates here since restarting the task
  // may succeed in the event that CheckpointResourcesMessage arrives
  // out of order.
  Resources checkpointedTaskResources =
    Resources(task.resources()).filter(needCheckpointing);

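  // Every checkpointed resource the task declares must already be in
  // the slave's 'checkpointedResources'; otherwise the launch is
  // rejected with TASK_LOST below.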
  foreach (const Resource& resource, checkpointedTaskResources) {
    if (!checkpointedResources.contains(resource)) {
      LOG(WARNING) << "Unknown checkpointed resource " << resource
                   << " for task " << task.task_id()
                   << " of framework " << frameworkId;

      const StatusUpdate update = protobuf::createStatusUpdate(
          frameworkId,
          info.id(),
          task.task_id(),
          TASK_LOST,
          TaskStatus::SOURCE_SLAVE,
          UUID::random(),
          "The checkpointed resources being used by the task are unknown to "
          "the slave",
          TaskStatus::REASON_RESOURCES_UNKNOWN);

      statusUpdate(update, UPID());

      // Refer to the comment after 'framework->pending.erase' above
      // for why we need this.
      if (framework->executors.empty() && framework->pending.empty()) {
        removeFramework(framework);
      }

      return;
    }
  }

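  // Repeat the same check for any checkpointed resources declared by
  // the task's custom executor, if one is specified.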
  if (task.has_executor()) {
    Resources checkpointedExecutorResources =
      Resources(task.executor().resources()).filter(needCheckpointing);

    foreach (const Resource& resource, checkpointedExecutorResources) {
      if (!checkpointedResources.contains(resource)) {
        LOG(WARNING) << "Unknown checkpointed resource " << resource
                     << " for executor '" << task.executor().executor_id()
                     << "' of framework " << frameworkId;

        const StatusUpdate update = protobuf::createStatusUpdate(
            frameworkId,
            info.id(),
            task.task_id(),
            TASK_LOST,
            TaskStatus::SOURCE_SLAVE,
            UUID::random(),
            "The checkpointed resources being used by the executor are unknown "
            "to the slave",
            TaskStatus::REASON_RESOURCES_UNKNOWN,
            task.executor().executor_id());

        statusUpdate(update, UPID());

        // Refer to the comment after 'framework->pending.erase' above
        // for why we need this.
        if (framework->executors.empty() && framework->pending.empty()) {
          removeFramework(framework);
        }

        return;
      }
    }
  }

  // NOTE: The slave cannot be in 'RECOVERING' because the task would
  // have been rejected in 'runTask()' in that case.
  CHECK(state == DISCONNECTED || state == RUNNING || state == TERMINATING)
    << state;

  if (state == TERMINATING) {
    LOG(WARNING) << "Ignoring run task " << task.task_id()
                 << " of framework " << frameworkId
                 << " because the slave is terminating";

    // Refer to the comment after 'framework->pending.erase' above
    // for why we need this.
    if (framework->executors.empty() && framework->pending.empty()) {
      removeFramework(framework);
    }

    // We don't send a TASK_LOST here because the slave is
    // terminating.
    return;
  }

  CHECK(framework->state == Framework::RUNNING) << framework->state;

  // Either send the task to an executor or start a new executor
  // and queue the task until the executor has started.
  Executor* executor = framework->getExecutor(executorId);

  if (executor == NULL) {
    executor = framework->launchExecutor(executorInfo, task);
  }

  CHECK_NOTNULL(executor);

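  // A freshly launched executor starts out in the 'REGISTERING'
  // state, so a just-created executor takes the 'REGISTERING' branch
  // below.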
  switch (executor->state) {
    case Executor::TERMINATING:
    case Executor::TERMINATED: {
      LOG(WARNING) << "Asked to run task '" << task.task_id()
                   << "' for framework " << frameworkId
                   << " with executor '" << executorId
                   << "' which is terminating/terminated";

      const StatusUpdate update = protobuf::createStatusUpdate(
          frameworkId,
          info.id(),
          task.task_id(),
          TASK_LOST,
          TaskStatus::SOURCE_SLAVE,
          UUID::random(),
          "Executor terminating/terminated",
          TaskStatus::REASON_EXECUTOR_TERMINATED);

      statusUpdate(update, UPID());
      break;
    }
    case Executor::REGISTERING:
      // Checkpoint the task before we do anything else.
      if (executor->checkpoint) {
        executor->checkpointTask(task);
      }

      // Queue task if the executor has not yet registered.
      LOG(INFO) << "Queuing task '" << task.task_id()
                << "' for executor " << *executor;

      executor->queuedTasks[task.task_id()] = task;
      break;
    case Executor::RUNNING: {
      // Checkpoint the task before we do anything else.
      if (executor->checkpoint) {
        executor->checkpointTask(task);
      }

      // Queue task until the containerizer is updated with new
      // resource limits (MESOS-998).
      LOG(INFO) << "Queuing task '" << task.task_id()
                << "' for executor " << *executor;

      executor->queuedTasks[task.task_id()] = task;

      // Update the resource limits for the container. Note that the
      // resource limits include the currently queued tasks because we
      // want the container to have enough resources to hold the
      // upcoming tasks.
      Resources resources = executor->resources;

      // TODO(jieyu): Use foreachvalue instead once LinkedHashmap
      // supports it.
      foreach (const TaskInfo& task, executor->queuedTasks.values()) {
        resources += task.resources();
      }

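      // Once the containerizer has applied the new limits,
      // 'Self::runTasks' (deferred to this actor) delivers the queued
      // task to the executor.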
      containerizer->update(executor->containerId, resources)
        .onAny(defer(self(),
                     &Self::runTasks,
                     lambda::_1,
                     frameworkId,
                     executorId,
                     executor->containerId,
                     list<TaskInfo>({task})));
      break;
    }
    default:
      LOG(FATAL) << "Executor " << *executor << " is in unexpected state "
                 << executor->state;
      break;
  }

  // We don't perform the checks for 'removeFramework' here since
  // we're guaranteed by 'launchExecutor' that 'framework->executors'
  // will be non-empty.
  CHECK(!framework->executors.empty());
}