void Master::_accept(
????const FrameworkID& frameworkId,
????const SlaveID& slaveId,
????const Resources& offeredResources,
????const scheduler::Call::Accept& accept,
????const Future<list<Future<bool>>>& _authorizations)
{
??Framework* framework = getFramework(frameworkId);
……
?
??Slave* slave = slaves.registered.get(slaveId);
?
??if (slave == NULL || !slave->connected) {
????foreach (const Offer::Operation& operation, accept.operations()) {
??????if (operation.type() != Offer::Operation::LAUNCH) {
????????continue;
??????}
?
??????foreach (const TaskInfo& task, operation.launch().task_infos()) {
????????const TaskStatus::Reason reason =
????????????slave == NULL ? TaskStatus::REASON_SLAVE_REMOVED
??????????????????????????: TaskStatus::REASON_SLAVE_DISCONNECTED;
????????const StatusUpdate& update = protobuf::createStatusUpdate(
????????????framework->id(),
????????????task.slave_id(),
????????????task.task_id(),
????????????TASK_LOST,
????????????TaskStatus::SOURCE_MASTER,
????????????None(),
????????????slave == NULL ? "Slave removed" : "Slave disconnected",
????????????reason);
?
????????metrics->tasks_lost++;
?
????????metrics->incrementTasksStates(
????????????TASK_LOST,
????????????TaskStatus::SOURCE_MASTER,
????????????reason);
?
????????forward(update, UPID(), framework);
??????}
????}
?
????// Tell the allocator about the recovered resources.
????allocator->recoverResources(
????????frameworkId,
????????slaveId,
????????offeredResources,
????????None());
?
????return;
??}
?
??// Some offer operations update the offered resources. We keep
??// updated offered resources here. When a task is successfully
??// launched, we remove its resource from offered resources.
??Resources _offeredResources = offeredResources;
?
??// The order of `authorizations` must match the order of the operations in
??// `accept.operations()`, as they are iterated through simultaneously.
??CHECK_READY(_authorizations);
??list<Future<bool>> authorizations = _authorizations.get();
?
??foreach (const Offer::Operation& operation, accept.operations()) {
????switch (operation.type()) {
??????// The RESERVE operation allows a principal to reserve resources.
??????case Offer::Operation::RESERVE: {
????????Future<bool> authorization = authorizations.front();
????????authorizations.pop_front();
?
????????CHECK(!authorization.isDiscarded());
?
????????if (authorization.isFailed()) {
??????????// TODO(greggomann): We may want to retry this failed authorization
??????????// request rather than dropping it immediately.
??????????drop(framework,
???????????????operation,
???????????????"Authorization of principal ‘" + framework->info.principal() +
???????????????"‘ to reserve resources failed: " +
???????????????authorization.failure());
?
??????????continue;
????????} else
if (!authorization.get()) {
??????????drop(framework,
???????????????operation,
???????????????"Not authorized to reserve resources as ‘" +
?????????????????framework->info.principal() + "‘");
?
??????????continue;
????????}
?
????????Option<string> principal = framework->info.has_principal()
??????????? framework->info.principal()
??????????: Option<string>::none();
?
????????// Make sure this reserve operation is valid.
????????Option<Error> error = validation::operation::validate(
????????????operation.reserve(), principal);
?
????????if (error.isSome()) {
??????????drop(framework, operation, error.get().message);
??????????continue;
????????}
?
????????// Test the given operation on the included resources.
????????Try<Resources> resources = _offeredResources.apply(operation);
????????if (resources.isError()) {
??????????drop(framework, operation, resources.error());
??????????continue;
????????}
?
????????_offeredResources = resources.get();
?
????????LOG(INFO) << "Applying RESERVE operation for resources "
??????????????????<< operation.reserve().resources() << " from framework "
??????????????????<< *framework << " to slave " << *slave;
?
????????apply(framework, slave, operation);
????????break;
??????}
?
??????// The UNRESERVE operation allows a principal to unreserve resources.
??????case Offer::Operation::UNRESERVE: {
????????Future<bool> authorization = authorizations.front();
????????authorizations.pop_front();
?
????????CHECK(!authorization.isDiscarded());
?
????????if (authorization.isFailed()) {
??????????// TODO(greggomann): We may want to retry this failed authorization
??????????// request rather than dropping it immediately.
??????????drop(framework,
???????????????operation,
???????????????"Authorization of principal ‘" + framework->info.principal() +
???????????????"‘ to unreserve resources failed: " +
???????????????authorization.failure());
?
??????????continue;
????????} else
if (!authorization.get()) {
??????????drop(framework,
???????????????operation,
???????????????"Not authorized to unreserve resources as ‘" +
?????????????????framework->info.principal() + "‘");
?
??????????continue;
????????}
?
????????// Make sure this unreserve operation is valid.
????????Option<Error> error = validation::operation::validate(
????????????operation.unreserve());
?
????????if (error.isSome()) {
??????????drop(framework, operation, error.get().message);
??????????continue;
????????}
?
????????// Test the given operation on the included resources.
????????Try<Resources> resources = _offeredResources.apply(operation);
????????if (resources.isError()) {
??????????drop(framework, operation, resources.error());
??????????continue;
????????}
?
????????_offeredResources = resources.get();
?
????????LOG(INFO) << "Applying UNRESERVE operation for resources "
??????????????????<< operation.unreserve().resources() << " from framework "
??????????????????<< *framework << " to slave " << *slave;
?
????????apply(framework, slave, operation);
????????break;
??????}
?
??????case Offer::Operation::CREATE: {
????????Future<bool> authorization = authorizations.front();
????????authorizations.pop_front();
?
????????CHECK(!authorization.isDiscarded());
?
????????if (authorization.isFailed()) {
??????????// TODO(greggomann): We may want to retry this failed authorization
??????????// request rather than dropping it immediately.
??????????drop(framework,
???????????????operation,
???????????????"Authorization of principal ‘" + framework->info.principal() +
???????????????"‘ to create persistent volumes failed: " +
???????????????authorization.failure());
?
??????????continue;
????????} else
if (!authorization.get()) {
??????????drop(framework,
???????????????operation,
???????????????"Not authorized to create persistent volumes as ‘" +
?????????????????framework->info.principal() + "‘");
?
??????????continue;
????????}
?
????????// Make sure this create operation is valid.
????????Option<Error> error = validation::operation::validate(
????????????operation.create(), slave->checkpointedResources);
?
????????if (error.isSome()) {
??????????drop(framework, operation, error.get().message);
??????????continue;
????????}
?
????????Try<Resources> resources = _offeredResources.apply(operation);
????????if (resources.isError()) {
??????????drop(framework, operation, resources.error());
??????????continue;
????????}
?
????????_offeredResources = resources.get();
?
????????LOG(INFO) << "Applying CREATE operation for volumes "
??????????????????<< operation.create().volumes() << " from framework "
??????????????????<< *framework << " to slave " << *slave;
?
????????apply(framework, slave, operation);
????????break;
??????}
?
??????case Offer::Operation::DESTROY: {
????????Future<bool> authorization = authorizations.front();
????????authorizations.pop_front();
?
????????CHECK(!authorization.isDiscarded());
?
????????if (authorization.isFailed()) {
??????????// TODO(greggomann): We may want to retry this failed authorization
??????????// request rather than dropping it immediately.
??????????drop(framework,
???????????????operation,
???????????????"Authorization of principal ‘" + framework->info.principal() +
???????????????"‘ to destroy persistent volumes failed: " +
???????????????authorization.failure());
?
??????????continue;
????????} else
if (!authorization.get()) {
??????????drop(framework,
???????????????operation,
???????????????"Not authorized to destroy persistent volumes as ‘" +
?????????????????framework->info.principal() + "‘");
?
??????????continue;
????????}
?
????????// Make sure this destroy operation is valid.
????????Option<Error> error = validation::operation::validate(
????????????operation.destroy(), slave->checkpointedResources);
?
????????if (error.isSome()) {
??????????drop(framework, operation, error.get().message);
??????????continue;
????????}
?
????????Try<Resources> resources = _offeredResources.apply(operation);
????????if (resources.isError()) {
??????????drop(framework, operation, resources.error());
??????????continue;
????????}
?
????????_offeredResources = resources.get();
?
????????LOG(INFO) << "Applying DESTROY operation for volumes "
??????????????????<< operation.create().volumes() << " from framework "
??????????????????<< *framework << " to slave " << *slave;
?
????????apply(framework, slave, operation);
????????break;
??????}
?
??????case Offer::Operation::LAUNCH: {
????????foreach (const TaskInfo& task, operation.launch().task_infos()) {
??????????Future<bool> authorization = authorizations.front();
??????????authorizations.pop_front();
?
??????????// NOTE: The task will not be in ‘pendingTasks‘ if
??????????// ‘killTask()‘ for the task was called before we are here.
??????????// No need to launch the task if it‘s no longer pending.
??????????// However, we still need to check the authorization result
??????????// and do the validation so that we can send status update
??????????// in case the task has duplicated ID.
??????????bool pending = framework->pendingTasks.contains(task.task_id());
?
??????????// Remove from pending tasks.
??????????framework->pendingTasks.erase(task.task_id());
?
??????????CHECK(!authorization.isDiscarded());
?
??????????if (authorization.isFailed() || !authorization.get()) {
????????????string user = framework->info.user(); // Default user.
????????????if (task.has_command() && task.command().has_user()) {
??????????????user = task.command().user();
????????????} else
if (task.has_executor() &&
???????????????????????task.executor().command().has_user()) {
??????????????user = task.executor().command().user();
????????????}
?
????????????const StatusUpdate& update = protobuf::createStatusUpdate(
????????????????framework->id(),
????????????????task.slave_id(),
????????????????task.task_id(),
????????????????TASK_ERROR,
????????????????TaskStatus::SOURCE_MASTER,
????????????????None(),
????????????????authorization.isFailed() ?
????????????????????"Authorization failure: " + authorization.failure() :
????????????????????"Not authorized to launch as user ‘" + user + "‘",
????????????????TaskStatus::REASON_TASK_UNAUTHORIZED);
?
????????????metrics->tasks_error++;
?
????????????metrics->incrementTasksStates(
????????????????TASK_ERROR,
????????????????TaskStatus::SOURCE_MASTER,
????????????????TaskStatus::REASON_TASK_UNAUTHORIZED);
?
????????????forward(update, UPID(), framework);
?
????????????continue;
??????????}
?
??????????// Validate the task.
?
??????????// Make a copy of the original task so that we can
??????????// fill the missing `framework_id` in ExecutorInfo
??????????// if needed. This field was added to the API later
??????????// and thus was made optional.
??????????TaskInfo task_(task);
??????????if (task.has_executor() && !task.executor().has_framework_id()) {
????????????task_.mutable_executor()
????????????????->mutable_framework_id()->CopyFrom(framework->id());
??????????}
?
??????????const Option<Error>& validationError = validation::task::validate(
??????????????task_,
??????????????framework,
??????????????slave,
??????????????_offeredResources);
?
??????????if (validationError.isSome()) {
????????????const StatusUpdate& update = protobuf::createStatusUpdate(
????????????????framework->id(),
????????????????task_.slave_id(),
????????????????task_.task_id(),
????????????????TASK_ERROR,
????????????????TaskStatus::SOURCE_MASTER,
????????????????None(),
????????????????validationError.get().message,
????????????????TaskStatus::REASON_TASK_INVALID);
?
????????????metrics->tasks_error++;
?
????????????metrics->incrementTasksStates(
????????????????TASK_ERROR,
????????????????TaskStatus::SOURCE_MASTER,
????????????????TaskStatus::REASON_TASK_INVALID);
?
????????????forward(update, UPID(), framework);
?
????????????continue;
??????????}
?
??????????// Add task.
??????????if (pending) {
????????????_offeredResources -= addTask(task_, framework, slave);
?
????????????// TODO(bmahler): Consider updating this log message to
????????????// indicate when the executor is also being launched.
????????????LOG(INFO) << "Launching task " << task_.task_id()
??????????????????????<< " of framework " << *framework
??????????????????????<< " with resources " << task_.resources()
??????????????????????<< " on slave " << *slave;
?
????????????RunTaskMessage message;
????????????message.mutable_framework()->MergeFrom(framework->info);
?
????????????// TODO(anand): We set ‘pid‘ to UPID() for http frameworks
????????????// as ‘pid‘ was made optional in 0.24.0. In 0.25.0, we
????????????// no longer have to set pid here for http frameworks.
????????????message.set_pid(framework->pid.getOrElse(UPID()));
????????????message.mutable_task()->MergeFrom(task_);
?
????????????if (HookManager::hooksAvailable()) {
??????????????// Set labels retrieved from label-decorator hooks.
??????????????message.mutable_task()->mutable_labels()->CopyFrom(
??????????????????HookManager::masterLaunchTaskLabelDecorator(
??????????????????????task_,
??????????????????????framework->info,
??????????????????????slave->info));
????????????}
?
????????????send(slave->pid, message);
??????????}
????????}
????????break;
??????}
?
??????default:
????????LOG(ERROR) << "Unsupported offer operation " << operation.type();
????????break;
????}
??}
?
??if (!_offeredResources.empty()) {
????// Tell the allocator about the unused (e.g., refused) resources.
????allocator->recoverResources(
????????frameworkId,
????????slaveId,
????????_offeredResources,
????????accept.filters());
??}
}