码迷,mamicode.com
首页 > 其他好文 > 详细

Mesos源码分析(12): Mesos-Slave接收到RunTask消息

时间:2016-08-01 01:38:14      阅读:203      评论:0      收藏:0      [点我收藏+]

标签:

前文《Mesos源码分析(8): Mesos-Slave的初始化》中提到,Mesos-Slave接收到RunTaskMessage消息后,会调用Slave::runTask。

?

  1. void Slave::runTask(
  2. ????const UPID& from,
  3. ????const FrameworkInfo& frameworkInfo,
  4. ????const FrameworkID& frameworkId_,
  5. ????const UPID& pid,
  6. ????TaskInfo task)
  7. {
  8. ……
  9. ??// Create frameworkId alias to use in the rest of the function.
  10. ??const FrameworkID frameworkId = frameworkInfo.id();
  11. ?
  12. ??LOG(INFO) << "Got assigned task " << task.task_id()
  13. ????????????<< " for framework " << frameworkId;
  14. ……
  15. ?
  16. ??CHECK(state == RECOVERING || state == DISCONNECTED ||
  17. ????????state == RUNNING || state == TERMINATING)
  18. ????<< state;
  19. ?
  20. ??// TODO(bmahler): Also ignore if we‘re DISCONNECTED.
  21. ??if (state == RECOVERING || state == TERMINATING) {
  22. ????LOG(WARNING) << "Ignoring task " << task.task_id()
  23. ?????????????????<< " because the slave is " << state;
  24. ????// TODO(vinod): Consider sending a TASK_LOST here.
  25. ????// Currently it is tricky because ‘statusUpdate()‘
  26. ????// ignores updates for unknown frameworks.
  27. ????return;
  28. ??}
  29. ?
  30. ??Future<bool> unschedule = true;
  31. ?
  32. ??// If we are about to create a new framework, unschedule the work
  33. ??// and meta directories from getting gc‘ed.
  34. ??Framework* framework = getFramework(frameworkId);
  35. ??if (framework == NULL) {
  36. ????// Unschedule framework work directory.
  37. ????string path = paths::getFrameworkPath(
  38. ????????flags.work_dir, info.id(), frameworkId);
  39. ?
  40. ????if (os::exists(path)) {
  41. ??????unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
  42. ????}
  43. ?
  44. ????// Unschedule framework meta directory.
  45. ????path = paths::getFrameworkPath(metaDir, info.id(), frameworkId);
  46. ????if (os::exists(path)) {
  47. ??????unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
  48. ????}
  49. ?
  50. ????Option<UPID> frameworkPid = None();
  51. ?
  52. ????if (pid != UPID()) {
  53. ??????frameworkPid = pid;
  54. ????}
  55. ?
  56. ????framework = new Framework(this, frameworkInfo, frameworkPid);
  57. ????frameworks[frameworkId] = framework;
  58. ????if (frameworkInfo.checkpoint()) {
  59. ??????framework->checkpointFramework();
  60. ????}
  61. ?
  62. ????// Is this same framework in completedFrameworks? If so, move the completed
  63. ????// executors to this framework and remove it from that list.
  64. ????// TODO(brenden): Consider using stout/cache.hpp instead of boost
  65. ????// circular_buffer.
  66. ????for (auto it = completedFrameworks.begin(), end = completedFrameworks.end();
  67. ?????????it != end;
  68. ?????????++it) {
  69. ??????if ((*it)->id() == frameworkId) {
  70. ????????framework->completedExecutors = (*it)->completedExecutors;
  71. ????????completedFrameworks.erase(it);
  72. ????????break;
  73. ??????}
  74. ????}
  75. ??}
  76. ?
  77. ??const ExecutorInfo executorInfo = getExecutorInfo(frameworkInfo, task);
  78. ??const ExecutorID& executorId = executorInfo.executor_id();
  79. ?
  80. ??if (HookManager::hooksAvailable()) {
  81. ????// Set task labels from run task label decorator.
  82. ????task.mutable_labels()->CopyFrom(HookManager::slaveRunTaskLabelDecorator(
  83. ????????task, executorInfo, frameworkInfo, info));
  84. ??}
  85. ?
  86. ??// We add the task to ‘pending‘ to ensure the framework is not
  87. ??// removed and the framework and top level executor directories
  88. ??// are not scheduled for deletion before ‘_runTask()‘ is called.
  89. ??CHECK_NOTNULL(framework);
  90. ??framework->pending[executorId][task.task_id()] = task;
  91. ?
  92. ??// If we are about to create a new executor, unschedule the top
  93. ??// level work and meta directories from getting gc‘ed.
  94. ??Executor* executor = framework->getExecutor(executorId);
  95. ??if (executor == NULL) {
  96. ????// Unschedule executor work directory.
  97. ????string path = paths::getExecutorPath(
  98. ????????flags.work_dir, info.id(), frameworkId, executorId);
  99. ?
  100. ????if (os::exists(path)) {
  101. ??????unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
  102. ????}
  103. ?
  104. ????// Unschedule executor meta directory.
  105. ????path = paths::getExecutorPath(metaDir, info.id(), frameworkId, executorId);
  106. ?
  107. ????if (os::exists(path)) {
  108. ??????unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
  109. ????}
  110. ??}
  111. ?
  112. ??// Run the task after the unschedules are done.
  113. ??unschedule.onAny(
  114. ??????defer(self(), &Self::_runTask, lambda::_1, frameworkInfo, task));
  115. }

?

等上面的unschedule操作全部完成之后,最终会调用Slave::_runTask。

  1. void Slave::_runTask(
  2. ????const Future<bool>& future,
  3. ????const FrameworkInfo& frameworkInfo,
  4. ????const TaskInfo& task)
  5. {
  6. ??const FrameworkID frameworkId = frameworkInfo.id();
  7. ?
  8. ??LOG(INFO) << "Launching task " << task.task_id()
  9. ????????????<< " for framework " << frameworkId;
  10. ?
  11. ??Framework* framework = getFramework(frameworkId);
  12. ??const ExecutorInfo executorInfo = getExecutorInfo(frameworkInfo, task);
  13. ??const ExecutorID& executorId = executorInfo.executor_id();
  14. ?
  15. ??if (framework->pending.contains(executorId) &&
  16. ??????framework->pending[executorId].contains(task.task_id())) {
  17. ????framework->pending[executorId].erase(task.task_id());
  18. ????if (framework->pending[executorId].empty()) {
  19. ??????framework->pending.erase(executorId);
  20. ??????// NOTE: Ideally we would perform the following check here:
  21. ??????//
  22. ??????// if (framework->executors.empty() &&
  23. ??????// framework->pending.empty()) {
  24. ??????// removeFramework(framework);
  25. ??????// }
  26. ??????//
  27. ??????// However, we need ‘framework‘ to stay valid for the rest of
  28. ??????// this function. As such, we perform the check before each of
  29. ??????// the ‘return‘ statements below.
  30. ????}
  31. ??} else {
  32. ????LOG(WARNING) << "Ignoring run task " << task.task_id()
  33. ?????????????????<< " of framework " << frameworkId
  34. ?????????????????<< " because the task has been killed in the meantime";
  35. ????return;
  36. ??}
  37. ?
  38. ??// We don‘t send a status update here because a terminating
  39. ??// framework cannot send acknowledgements.
  40. ??if (framework->state == Framework::TERMINATING) {
  41. ????LOG(WARNING) << "Ignoring run task " << task.task_id()
  42. ?????????????????<< " of framework " << frameworkId
  43. ?????????????????<< " because the framework is terminating";
  44. ?
  45. ????// Refer to the comment after ‘framework->pending.erase‘ above
  46. ????// for why we need this.
  47. ????if (framework->executors.empty() && framework->pending.empty()) {
  48. ??????removeFramework(framework);
  49. ????}
  50. ?
  51. ????return;
  52. ??}
  53. ?
  54. ??if (!future.isReady()) {
  55. ????LOG(ERROR) << "Failed to unschedule directories scheduled for gc: "
  56. ???????????????<< (future.isFailed() ? future.failure() : "future discarded");
  57. ?
  58. ????const StatusUpdate update = protobuf::createStatusUpdate(
  59. ????????frameworkId,
  60. ????????info.id(),
  61. ????????task.task_id(),
  62. ????????TASK_LOST,
  63. ????????TaskStatus::SOURCE_SLAVE,
  64. ????????UUID::random(),
  65. ????????"Could not launch the task because we failed to unschedule directories"
  66. ????????" scheduled for gc",
  67. ????????TaskStatus::REASON_GC_ERROR);
  68. ?
  69. ????// TODO(vinod): Ensure that the status update manager reliably
  70. ????// delivers this update. Currently, we don‘t guarantee this
  71. ????// because removal of the framework causes the status update
  72. ????// manager to stop retrying for its un-acked updates.
  73. ????statusUpdate(update, UPID());
  74. ?
  75. ????// Refer to the comment after ‘framework->pending.erase‘ above
  76. ????// for why we need this.
  77. ????if (framework->executors.empty() && framework->pending.empty()) {
  78. ??????removeFramework(framework);
  79. ????}
  80. ?
  81. ????return;
  82. ??}
  83. ?
  84. ??// NOTE: If the task or executor uses resources that are
  85. ??// checkpointed on the slave (e.g. persistent volumes), we should
  86. ??// already know about it. If the slave doesn‘t know about them (e.g.
  87. ??// CheckpointResourcesMessage was dropped or came out of order),
  88. ??// we send TASK_LOST status updates here since restarting the task
  89. ??// may succeed in the event that CheckpointResourcesMessage arrives
  90. ??// out of order.
  91. ??Resources checkpointedTaskResources =
  92. ????Resources(task.resources()).filter(needCheckpointing);
  93. ?
  94. ??foreach (const Resource& resource, checkpointedTaskResources) {
  95. ????if (!checkpointedResources.contains(resource)) {
  96. ??????LOG(WARNING) << "Unknown checkpointed resource " << resource
  97. ???????????????????<< " for task " << task.task_id()
  98. ???????????????????<< " of framework " << frameworkId;
  99. ?
  100. ??????const StatusUpdate update = protobuf::createStatusUpdate(
  101. ??????????frameworkId,
  102. ??????????info.id(),
  103. ??????????task.task_id(),
  104. ??????????TASK_LOST,
  105. ??????????TaskStatus::SOURCE_SLAVE,
  106. ??????????UUID::random(),
  107. ??????????"The checkpointed resources being used by the task are unknown to "
  108. ??????????"the slave",
  109. ??????????TaskStatus::REASON_RESOURCES_UNKNOWN);
  110. ?
  111. ??????statusUpdate(update, UPID());
  112. ?
  113. ??????// Refer to the comment after ‘framework->pending.erase‘ above
  114. ??????// for why we need this.
  115. ??????if (framework->executors.empty() && framework->pending.empty()) {
  116. ????????removeFramework(framework);
  117. ??????}
  118. ?
  119. ??????return;
  120. ????}
  121. ??}
  122. ?
  123. ??if (task.has_executor()) {
  124. ????Resources checkpointedExecutorResources =
  125. ??????Resources(task.executor().resources()).filter(needCheckpointing);
  126. ?
  127. ????foreach (const Resource& resource, checkpointedExecutorResources) {
  128. ??????if (!checkpointedResources.contains(resource)) {
  129. ????????LOG(WARNING) << "Unknown checkpointed resource " << resource
  130. ?????????????????????<< " for executor ‘" << task.executor().executor_id()
  131. ?????????????????????<< "‘ of framework " << frameworkId;
  132. ?
  133. ????????const StatusUpdate update = protobuf::createStatusUpdate(
  134. ????????????frameworkId,
  135. ????????????info.id(),
  136. ????????????task.task_id(),
  137. ????????????TASK_LOST,
  138. ????????????TaskStatus::SOURCE_SLAVE,
  139. ????????????UUID::random(),
  140. ????????????"The checkpointed resources being used by the executor are unknown "
  141. ????????????"to the slave",
  142. ????????????TaskStatus::REASON_RESOURCES_UNKNOWN,
  143. ????????????task.executor().executor_id());
  144. ?
  145. ????????statusUpdate(update, UPID());
  146. ?
  147. ????????// Refer to the comment after ‘framework->pending.erase‘ above
  148. ????????// for why we need this.
  149. ????????if (framework->executors.empty() && framework->pending.empty()) {
  150. ??????????removeFramework(framework);
  151. ????????}
  152. ?
  153. ????????return;
  154. ??????}
  155. ????}
  156. ??}
  157. ?
  158. ??// NOTE: The slave cannot be in ‘RECOVERING‘ because the task would
  159. ??// have been rejected in ‘runTask()‘ in that case.
  160. ??CHECK(state == DISCONNECTED || state == RUNNING || state == TERMINATING)
  161. ????<< state;
  162. ?
  163. ??if (state == TERMINATING) {
  164. ????LOG(WARNING) << "Ignoring run task " << task.task_id()
  165. ?????????????????<< " of framework " << frameworkId
  166. ?????????????????<< " because the slave is terminating";
  167. ?
  168. ????// Refer to the comment after ‘framework->pending.erase‘ above
  169. ????// for why we need this.
  170. ????if (framework->executors.empty() && framework->pending.empty()) {
  171. ??????removeFramework(framework);
  172. ????}
  173. ?
  174. ????// We don‘t send a TASK_LOST here because the slave is
  175. ????// terminating.
  176. ????return;
  177. ??}
  178. ?
  179. ??CHECK(framework->state == Framework::RUNNING) << framework->state;
  180. ?
  181. ??// Either send the task to an executor or start a new executor
  182. ??// and queue the task until the executor has started.
  183. ??Executor* executor = framework->getExecutor(executorId);
  184. ?
  185. ??if (executor == NULL) {
  186. ????executor = framework->launchExecutor(executorInfo, task);
  187. ??}
  188. ?
  189. ??CHECK_NOTNULL(executor);
  190. ?
  191. ??switch (executor->state) {
  192. ????case Executor::TERMINATING:
  193. ????case Executor::TERMINATED: {
  194. ??????LOG(WARNING) << "Asked to run task ‘" << task.task_id()
  195. ???????????????????<< "‘ for framework " << frameworkId
  196. ???????????????????<< " with executor ‘" << executorId
  197. ???????????????????<< "‘ which is terminating/terminated";
  198. ?
  199. ??????const StatusUpdate update = protobuf::createStatusUpdate(
  200. ??????????frameworkId,
  201. ??????????info.id(),
  202. ??????????task.task_id(),
  203. ??????????TASK_LOST,
  204. ??????????TaskStatus::SOURCE_SLAVE,
  205. ??????????UUID::random(),
  206. ??????????"Executor terminating/terminated",
  207. ??????????TaskStatus::REASON_EXECUTOR_TERMINATED);
  208. ?
  209. ??????statusUpdate(update, UPID());
  210. ??????break;
  211. ????}
  212. ????case Executor::REGISTERING:
  213. ??????// Checkpoint the task before we do anything else.
  214. ??????if (executor->checkpoint) {
  215. ????????executor->checkpointTask(task);
  216. ??????}
  217. ?
  218. ??????// Queue task if the executor has not yet registered.
  219. ??????LOG(INFO) << "Queuing task ‘" << task.task_id()
  220. ????????????????<< "‘ for executor " << *executor;
  221. ?
  222. ??????executor->queuedTasks[task.task_id()] = task;
  223. ??????break;
  224. ????case Executor::RUNNING: {
  225. ??????// Checkpoint the task before we do anything else.
  226. ??????if (executor->checkpoint) {
  227. ????????executor->checkpointTask(task);
  228. ??????}
  229. ?
  230. ??????// Queue task until the containerizer is updated with new
  231. ??????// resource limits (MESOS-998).
  232. ??????LOG(INFO) << "Queuing task ‘" << task.task_id()
  233. ????????????????<< "‘ for executor " << *executor;
  234. ?
  235. ??????executor->queuedTasks[task.task_id()] = task;
  236. ?
  237. ??????// Update the resource limits for the container. Note that the
  238. ??????// resource limits include the currently queued tasks because we
  239. ??????// want the container to have enough resources to hold the
  240. ??????// upcoming tasks.
  241. ??????Resources resources = executor->resources;
  242. ?
  243. ??????// TODO(jieyu): Use foreachvalue instead once LinkedHashmap
  244. ??????// supports it.
  245. ??????foreach (const TaskInfo& task, executor->queuedTasks.values()) {
  246. ????????resources += task.resources();
  247. ??????}
  248. ?
  249. ??????containerizer->update(executor->containerId, resources)
  250. ????????.onAny(defer(self(),
  251. ?????????????????????&Self::runTasks,
  252. ?????????????????????lambda::_1,
  253. ?????????????????????frameworkId,
  254. ?????????????????????executorId,
  255. ?????????????????????executor->containerId,
  256. ?????????????????????list<TaskInfo>({task})));
  257. ??????break;
  258. ????}
  259. ????default:
  260. ??????LOG(FATAL) << "Executor " << *executor << " is in unexpected state "
  261. ?????????????????<< executor->state;
  262. ??????break;
  263. ??}
  264. ?
  265. ??// We don‘t perform the checks for ‘removeFramework‘ here since
  266. ??// we‘re guaranteed by ‘launchExecutor‘ that ‘framework->executors‘
  267. ??// will be non-empty.
  268. ??CHECK(!framework->executors.empty());
  269. }

?

在这个函数中,如果该Task对应的Executor还不存在,就会调用Framework::launchExecutor去创建并启动一个Executor。

  1. // Create and launch an executor.
  2. Executor* Framework::launchExecutor(
  3. ????const ExecutorInfo& executorInfo,
  4. ????const TaskInfo& taskInfo)
  5. {
  6. ??// Generate an ID for the executor‘s container.
  7. ??// TODO(idownes) This should be done by the containerizer but we
  8. ??// need the ContainerID to create the executor‘s directory. Fix
  9. ??// this when ‘launchExecutor()‘ is handled asynchronously.
  10. ??ContainerID containerId;
  11. ??containerId.set_value(UUID::random().toString());
  12. ?
  13. ??Option<string> user = None();
  14. ??// Create a directory for the executor.
  15. ??const string directory = paths::createExecutorDirectory(
  16. ??????slave->flags.work_dir,
  17. ??????slave->info.id(),
  18. ??????id(),
  19. ??????executorInfo.executor_id(),
  20. ??????containerId,
  21. ??????user);
  22. ?
  23. ??Executor* executor = new Executor(
  24. ??????slave, id(), executorInfo, containerId, directory, info.checkpoint());
  25. ?
  26. ??if (executor->checkpoint) {
  27. ????executor->checkpointExecutor();
  28. ??}
  29. ?
  30. ??CHECK(!executors.contains(executorInfo.executor_id()))
  31. ????<< "Unknown executor " << executorInfo.executor_id();
  32. ?
  33. ??executors[executorInfo.executor_id()] = executor;
  34. ?
  35. ??LOG(INFO) << "Launching executor " << executorInfo.executor_id()
  36. ????????????<< " of framework " << id()
  37. ????????????<< " with resources " << executorInfo.resources()
  38. ????????????<< " in work directory ‘" << directory << "";
  39. ?
  40. ??slave->files->attach(executor->directory, executor->directory)
  41. ????.onAny(defer(slave, &Slave::fileAttached, lambda::_1, executor->directory));
  42. ?
  43. ??// Tell the containerizer to launch the executor.
  44. ??// NOTE: We modify the ExecutorInfo to include the task‘s
  45. ??// resources when launching the executor so that the containerizer
  46. ??// has non-zero resources to work with when the executor has
  47. ??// no resources. This should be revisited after MESOS-600.
  48. ??ExecutorInfo executorInfo_ = executor->info;
  49. ??Resources resources = executorInfo_.resources();
  50. ??resources += taskInfo.resources();
  51. ??executorInfo_.mutable_resources()->CopyFrom(resources);
  52. ?
  53. ??// Launch the container.
  54. ??Future<bool> launch;
  55. ??if (!executor->isCommandExecutor()) {
  56. ????// If the executor is _not_ a command executor, this means that
  57. ????// the task will include the executor to run. The actual task to
  58. ????// run will be enqueued and subsequently handled by the executor
  59. ????// when it has registered to the slave.
  60. ????launch = slave->containerizer->launch(
  61. ????????containerId,
  62. ????????executorInfo_, // Modified to include the task‘s resources, see above.
  63. ????????executor->directory,
  64. ????????user,
  65. ????????slave->info.id(),
  66. ????????slave->self(),
  67. ????????info.checkpoint());
  68. ??} else {
  69. ????// An executor has _not_ been provided by the task and will
  70. ????// instead define a command and/or container to run. Right now,
  71. ????// these tasks will require an executor anyway and the slave
  72. ????// creates a command executor. However, it is up to the
  73. ????// containerizer how to execute those tasks and the generated
  74. ????// executor info works as a placeholder.
  75. ????// TODO(nnielsen): Obsolete the requirement for executors to run
  76. ????// one-off tasks.
  77. ????launch = slave->containerizer->launch(
  78. ????????containerId,
  79. ????????taskInfo,
  80. ????????executorInfo_, // Modified to include the task‘s resources, see above.
  81. ????????executor->directory,
  82. ????????user,
  83. ????????slave->info.id(),
  84. ????????slave->self(),
  85. ????????info.checkpoint());
  86. ??}
  87. ?
  88. ??launch.onAny(defer(slave,
  89. ?????????????????????&Slave::executorLaunched,
  90. ?????????????????????id(),
  91. ?????????????????????executor->id,
  92. ?????????????????????containerId,
  93. ?????????????????????lambda::_1));
  94. ?
  95. ??// Make sure the executor registers within the given timeout.
  96. ??delay(slave->flags.executor_registration_timeout,
  97. ????????slave,
  98. ????????&Slave::registerExecutorTimeout,
  99. ????????id(),
  100. ????????executor->id,
  101. ????????containerId);
  102. ?
  103. ??return executor;
  104. }

?

launchExecutor会先调用paths::createExecutorDirectory,根据work_dir、Slave ID、Framework ID、Executor ID和Container ID为Executor创建一个目录。

最终会调用containerizer->launch。

根据前面Mesos源码分析(7): Mesos-Slave的启动中所说,这里的containerizer是指ComposingContainerizer

  1. Future<bool> ComposingContainerizer::launch(
  2. ????const ContainerID& containerId,
  3. ????const TaskInfo& taskInfo,
  4. ????const ExecutorInfo& executorInfo,
  5. ????const string& directory,
  6. ????const Option<string>& user,
  7. ????const SlaveID& slaveId,
  8. ????const PID<Slave>& slavePid,
  9. ????bool checkpoint)
  10. {
  11. ??return dispatch(process,
  12. ??????????????????&ComposingContainerizerProcess::launch,
  13. ??????????????????containerId,
  14. ??????????????????taskInfo,
  15. ??????????????????executorInfo,
  16. ??????????????????directory,
  17. ??????????????????user,
  18. ??????????????????slaveId,
  19. ??????????????????slavePid,
  20. ??????????????????checkpoint);
  21. }

?

ComposingContainerizer调用ComposingContainerizerProcess::launch

  1. Future<bool> ComposingContainerizerProcess::launch(
  2. ????const ContainerID& containerId,
  3. ????const TaskInfo& taskInfo,
  4. ????const ExecutorInfo& executorInfo,
  5. ????const string& directory,
  6. ????const Option<string>& user,
  7. ????const SlaveID& slaveId,
  8. ????const PID<Slave>& slavePid,
  9. ????bool checkpoint)
  10. {
  11. ??if (containers_.contains(containerId)) {
  12. ????return Failure("Container ‘" + stringify(containerId) +
  13. ???????????????????"‘ is already launching");
  14. ??}
  15. ?
  16. ??// Try each containerizer. If none of them handle the
  17. ??// TaskInfo/ExecutorInfo then return a Failure.
  18. ??vector<Containerizer*>::iterator containerizer = containerizers_.begin();
  19. ?
  20. ??Container* container = new Container();
  21. ??container->state = LAUNCHING;
  22. ??container->containerizer = *containerizer;
  23. ??containers_[containerId] = container;
  24. ?
  25. ??return (*containerizer)->launch(
  26. ??????containerId,
  27. ??????taskInfo,
  28. ??????executorInfo,
  29. ??????directory,
  30. ??????user,
  31. ??????slaveId,
  32. ??????slavePid,
  33. ??????checkpoint)
  34. ????.then(defer(self(),
  35. ????????????????&Self::_launch,
  36. ????????????????containerId,
  37. ????????????????taskInfo,
  38. ????????????????executorInfo,
  39. ????????????????directory,
  40. ????????????????user,
  41. ????????????????slaveId,
  42. ????????????????slavePid,
  43. ????????????????checkpoint,
  44. ????????????????containerizer,
  45. ????????????????lambda::_1));
  46. }

?

上面这个函数会从第一个containerizer开始,调用它的launch函数,并在launch返回结果之后调用ComposingContainerizerProcess::_launch函数。

  1. Future<bool> ComposingContainerizerProcess::_launch(
  2. ????const ContainerID& containerId,
  3. ????const Option<TaskInfo>& taskInfo,
  4. ????const ExecutorInfo& executorInfo,
  5. ????const string& directory,
  6. ????const Option<string>& user,
  7. ????const SlaveID& slaveId,
  8. ????const PID<Slave>& slavePid,
  9. ????bool checkpoint,
  10. ????vector<Containerizer*>::iterator containerizer,
  11. ????bool launched)
  12. {
  13. ??// The container struct won‘t be cleaned up by destroy because
  14. ??// in destroy we only forward the destroy, and wait until the
  15. ??// launch returns and clean up here.
  16. ??CHECK(containers_.contains(containerId));
  17. ??Container* container = containers_[containerId];
  18. ??if (container->state == DESTROYED) {
  19. ????containers_.erase(containerId);
  20. ????delete container;
  21. ????return Failure("Container was destroyed while launching");
  22. ??}
  23. ?
  24. ??if (launched) {
  25. ????container->state = LAUNCHED;
  26. ????return true;
  27. ??}
  28. ?
  29. ??// Try the next containerizer.
  30. ??++containerizer;
  31. ?
  32. ??if (containerizer == containerizers_.end()) {
  33. ????containers_.erase(containerId);
  34. ????delete container;
  35. ????return false;
  36. ??}
  37. ?
  38. ??container->containerizer = *containerizer;
  39. ?
  40. ??Future<bool> f = taskInfo.isSome() ?
  41. ??????(*containerizer)->launch(
  42. ??????????containerId,
  43. ??????????taskInfo.get(),
  44. ??????????executorInfo,
  45. ??????????directory,
  46. ??????????user,
  47. ??????????slaveId,
  48. ??????????slavePid,
  49. ??????????checkpoint) :
  50. ??????(*containerizer)->launch(
  51. ??????????containerId,
  52. ??????????executorInfo,
  53. ??????????directory,
  54. ??????????user,
  55. ??????????slaveId,
  56. ??????????slavePid,
  57. ??????????checkpoint);
  58. ?
  59. ??return f.then(
  60. ??????defer(self(),
  61. ????????????&Self::_launch,
  62. ????????????containerId,
  63. ????????????taskInfo,
  64. ????????????executorInfo,
  65. ????????????directory,
  66. ????????????user,
  67. ????????????slaveId,
  68. ????????????slavePid,
  69. ????????????checkpoint,
  70. ????????????containerizer,
  71. ????????????lambda::_1));
  72. }

?

如果当前的containerizer没有成功launch(返回false),ComposingContainerizerProcess::_launch函数会继续调用下一个Containerizer的launch函数,直到某个containerizer成功launch,或者试完最后一个containerizer为止(此时返回false)。

Mesos源码分析(12): Mesos-Slave接收到RunTask消息

标签:

原文地址:http://www.cnblogs.com/popsuper1982/p/5724360.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!