void Master::_accept(
????const FrameworkID& frameworkId,
????const SlaveID& slaveId,
????const Resources& offeredResources,
????const scheduler::Call::Accept& accept,
????const Future<list<Future<bool>>>& _authorizations)
??Framework* framework = getFramework(frameworkId);
??Slave* slave = slaves.registered.get(slaveId);
??if (slave == NULL || !slave->connected) {
????foreach (const Offer::Operation& operation, accept.operations()) {
??????if (operation.type() != Offer::Operation::LAUNCH) {
??????foreach (const TaskInfo& task, operation.launch().task_infos()) {
????????const TaskStatus::Reason reason =
????????????slave == NULL ? TaskStatus::REASON_SLAVE_REMOVED
??????????????????????????: TaskStatus::REASON_SLAVE_DISCONNECTED;
????????const StatusUpdate& update = protobuf::createStatusUpdate(
????????????slave == NULL ? "Slave removed" : "Slave disconnected",
????????forward(update, UPID(), framework);
????// Tell the allocator about the recovered resources.
??// Some offer operations update the offered resources. We keep
??// updated offered resources here. When a task is successfully
??// launched, we remove its resource from offered resources.
??Resources _offeredResources = offeredResources;
??// The order of `authorizations` must match the order of the operations in
??// `accept.operations()`, as they are iterated through simultaneously.
??list<Future<bool>> authorizations = _authorizations.get();
??foreach (const Offer::Operation& operation, accept.operations()) {
????switch (operation.type()) {
??????// The RESERVE operation allows a principal to reserve resources.
??????case Offer::Operation::RESERVE: {
????????Future<bool> authorization = authorizations.front();
????????if (authorization.isFailed()) {
??????????// TODO(greggomann): We may want to retry this failed authorization
??????????// request rather than dropping it immediately.
???????????????"Authorization of principal ‘" + framework->info.principal() +
???????????????"‘ to reserve resources failed: " +
????????} else
if (!authorization.get()) {
???????????????"Not authorized to reserve resources as ‘" +
?????????????????framework->info.principal() + "‘");
????????Option<string> principal = framework->info.has_principal()
??????????? framework->info.principal()
??????????: Option<string>::none();
????????// Make sure this reserve operation is valid.
????????Option<Error> error = validation::operation::validate(
????????????operation.reserve(), principal);
????????if (error.isSome()) {
??????????drop(framework, operation, error.get().message);
????????// Test the given operation on the included resources.
????????Try<Resources> resources = _offeredResources.apply(operation);
????????if (resources.isError()) {
??????????drop(framework, operation, resources.error());
????????_offeredResources = resources.get();
????????LOG(INFO) << "Applying RESERVE operation for resources "
??????????????????<< operation.reserve().resources() << " from framework "
??????????????????<< *framework << " to slave " << *slave;
????????apply(framework, slave, operation);
??????// The UNRESERVE operation allows a principal to unreserve resources.
??????case Offer::Operation::UNRESERVE: {
????????Future<bool> authorization = authorizations.front();
????????if (authorization.isFailed()) {
??????????// TODO(greggomann): We may want to retry this failed authorization
??????????// request rather than dropping it immediately.
???????????????"Authorization of principal ‘" + framework->info.principal() +
???????????????"‘ to unreserve resources failed: " +
????????} else
if (!authorization.get()) {
???????????????"Not authorized to unreserve resources as ‘" +
?????????????????framework->info.principal() + "‘");
????????// Make sure this unreserve operation is valid.
????????Option<Error> error = validation::operation::validate(
????????if (error.isSome()) {
??????????drop(framework, operation, error.get().message);
????????// Test the given operation on the included resources.
????????Try<Resources> resources = _offeredResources.apply(operation);
????????if (resources.isError()) {
??????????drop(framework, operation, resources.error());
????????_offeredResources = resources.get();
????????LOG(INFO) << "Applying UNRESERVE operation for resources "
??????????????????<< operation.unreserve().resources() << " from framework "
??????????????????<< *framework << " to slave " << *slave;
????????apply(framework, slave, operation);
??????case Offer::Operation::CREATE: {
????????Future<bool> authorization = authorizations.front();
????????if (authorization.isFailed()) {
??????????// TODO(greggomann): We may want to retry this failed authorization
??????????// request rather than dropping it immediately.
???????????????"Authorization of principal ‘" + framework->info.principal() +
???????????????"‘ to create persistent volumes failed: " +
????????} else
if (!authorization.get()) {
???????????????"Not authorized to create persistent volumes as ‘" +
?????????????????framework->info.principal() + "‘");
????????// Make sure this create operation is valid.
????????Option<Error> error = validation::operation::validate(
????????????operation.create(), slave->checkpointedResources);
????????if (error.isSome()) {
??????????drop(framework, operation, error.get().message);
????????Try<Resources> resources = _offeredResources.apply(operation);
????????if (resources.isError()) {
??????????drop(framework, operation, resources.error());
????????_offeredResources = resources.get();
????????LOG(INFO) << "Applying CREATE operation for volumes "
??????????????????<< operation.create().volumes() << " from framework "
??????????????????<< *framework << " to slave " << *slave;
????????apply(framework, slave, operation);
??????case Offer::Operation::DESTROY: {
????????Future<bool> authorization = authorizations.front();
????????if (authorization.isFailed()) {
??????????// TODO(greggomann): We may want to retry this failed authorization
??????????// request rather than dropping it immediately.
???????????????"Authorization of principal ‘" + framework->info.principal() +
???????????????"‘ to destroy persistent volumes failed: " +
????????} else
if (!authorization.get()) {
???????????????"Not authorized to destroy persistent volumes as ‘" +
?????????????????framework->info.principal() + "‘");
????????// Make sure this destroy operation is valid.
????????Option<Error> error = validation::operation::validate(
????????????operation.destroy(), slave->checkpointedResources);
????????if (error.isSome()) {
??????????drop(framework, operation, error.get().message);
????????Try<Resources> resources = _offeredResources.apply(operation);
????????if (resources.isError()) {
??????????drop(framework, operation, resources.error());
????????_offeredResources = resources.get();
????????LOG(INFO) << "Applying DESTROY operation for volumes "
??????????????????<< operation.create().volumes() << " from framework "
??????????????????<< *framework << " to slave " << *slave;
????????apply(framework, slave, operation);
??????case Offer::Operation::LAUNCH: {
????????foreach (const TaskInfo& task, operation.launch().task_infos()) {
??????????Future<bool> authorization = authorizations.front();
??????????// NOTE: The task will not be in ‘pendingTasks‘ if
??????????// ‘killTask()‘ for the task was called before we are here.
??????????// No need to launch the task if it‘s no longer pending.
??????????// However, we still need to check the authorization result
??????????// and do the validation so that we can send status update
??????????// in case the task has duplicated ID.
??????????bool pending = framework->pendingTasks.contains(task.task_id());
??????????// Remove from pending tasks.
??????????if (authorization.isFailed() || !authorization.get()) {
????????????string user = framework->info.user(); // Default user.
????????????if (task.has_command() && task.command().has_user()) {
??????????????user = task.command().user();
????????????} else
if (task.has_executor() &&
???????????????????????task.executor().command().has_user()) {
??????????????user = task.executor().command().user();
????????????const StatusUpdate& update = protobuf::createStatusUpdate(
????????????????authorization.isFailed() ?
????????????????????"Authorization failure: " + authorization.failure() :
????????????????????"Not authorized to launch as user ‘" + user + "‘",
????????????forward(update, UPID(), framework);
??????????// Validate the task.
??????????// Make a copy of the original task so that we can
??????????// fill the missing `framework_id` in ExecutorInfo
??????????// if needed. This field was added to the API later
??????????// and thus was made optional.
??????????TaskInfo task_(task);
??????????if (task.has_executor() && !task.executor().has_framework_id()) {
??????????const Option<Error>& validationError = validation::task::validate(
??????????if (validationError.isSome()) {
????????????const StatusUpdate& update = protobuf::createStatusUpdate(
????????????forward(update, UPID(), framework);
??????????// Add task.
??????????if (pending) {
????????????_offeredResources -= addTask(task_, framework, slave);
????????????// TODO(bmahler): Consider updating this log message to
????????????// indicate when the executor is also being launched.
????????????LOG(INFO) << "Launching task " << task_.task_id()
??????????????????????<< " of framework " << *framework
??????????????????????<< " with resources " << task_.resources()
??????????????????????<< " on slave " << *slave;
????????????RunTaskMessage message;
????????????// TODO(anand): We set ‘pid‘ to UPID() for http frameworks
????????????// as ‘pid‘ was made optional in 0.24.0. In 0.25.0, we
????????????// no longer have to set pid here for http frameworks.
????????????if (HookManager::hooksAvailable()) {
??????????????// Set labels retrieved from label-decorator hooks.
????????????send(slave->pid, message);
????????LOG(ERROR) << "Unsupported offer operation " << operation.type();
??if (!_offeredResources.empty()) {
????// Tell the allocator about the unused (e.g., refused) resources.