码迷,mamicode.com
首页 > 其他好文 > 详细

Mesos源码分析(11): Mesos-Master接收到launchTasks消息

时间:2016-08-01 00:13:52      阅读:203      评论:0      收藏:0      [点我收藏+]

标签:

根据《Mesos源码分析(6): Mesos Master的初始化》中的代码分析，当Mesos-Master接收到launchTasks消息时，会调用Master::launchTasks函数。

?

  1. void Master::launchTasks(
  2. ????const UPID& from,
  3. ????const FrameworkID& frameworkId,
  4. ????const vector<TaskInfo>& tasks,
  5. ????const Filters& filters,
  6. ????const vector<OfferID>& offerIds)
  7. {
  8. ??Framework* framework = getFramework(frameworkId);
  9. ?
  10. ??if (framework == NULL) {
  11. ????LOG(WARNING)
  12. ??????<< "Ignoring launch tasks message for offers " << stringify(offerIds)
  13. ??????<< " of framework " << frameworkId
  14. ??????<< " because the framework cannot be found";
  15. ?
  16. ????return;
  17. ??}
  18. ?
  19. ??if (framework->pid != from) {
  20. ????LOG(WARNING)
  21. ??????<< "Ignoring launch tasks message for offers " << stringify(offerIds)
  22. ??????<< " from ‘" << from << "‘ because it is not from the"
  23. ??????<< " registered framework " << *framework;
  24. ?
  25. ????return;
  26. ??}
  27. ?
  28. ??// Currently when no tasks are specified in the launchTasks message
  29. ??// it is implicitly considered a decline of the offers.
  30. ??if (!tasks.empty()) {
  31. ????scheduler::Call::Accept message;
  32. ????message.mutable_filters()->CopyFrom(filters);
  33. ?
  34. ????Offer::Operation* operation = message.add_operations();
  35. ????operation->set_type(Offer::Operation::LAUNCH);
  36. ?
  37. ????foreach (const TaskInfo& task, tasks) {
  38. ??????operation->mutable_launch()->add_task_infos()->CopyFrom(task);
  39. ????}
  40. ?
  41. ????foreach (const OfferID& offerId, offerIds) {
  42. ??????message.add_offer_ids()->CopyFrom(offerId);
  43. ????}
  44. ?
  45. ????accept(framework, message);
  46. ??} else {
  47. ????scheduler::Call::Decline message;
  48. ????message.mutable_filters()->CopyFrom(filters);
  49. ?
  50. ????foreach (const OfferID& offerId, offerIds) {
  51. ??????message.add_offer_ids()->CopyFrom(offerId);
  52. ????}
  53. ?
  54. ????decline(framework, message);
  55. ??}
  56. }

?

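对于来自已注册framework的合法消息：若任务列表非空，它会将消息转换为一个包含LAUNCH操作的ACCEPT调用，并进一步调用Master::accept函数；若任务列表为空，则视为拒绝这些offer，转而调用Master::decline函数。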

  1. void Master::accept(
  2. ????Framework* framework,
  3. ????const scheduler::Call::Accept& accept)
  4. {
  5. ??CHECK_NOTNULL(framework);
  6. ?
  7. ??foreach (const Offer::Operation& operation, accept.operations()) {
  8. ????if (operation.type() == Offer::Operation::LAUNCH) {
  9. ??????if (operation.launch().task_infos().size() > 0) {
  10. ????????++metrics->messages_launch_tasks;
  11. ??????} else {
  12. ????????++metrics->messages_decline_offers;
  13. ??????}
  14. ????}
  15. ?
  16. ????// TODO(jieyu): Add metrics for non launch operations.
  17. ??}
  18. ?
  19. ??// TODO(bmahler): We currently only support using multiple offers
  20. ??// for a single slave.
  21. ??Resources offeredResources;
  22. ??Option<SlaveID> slaveId = None();
  23. ??Option<Error> error = None();
  24. ?
  25. ??if (accept.offer_ids().size() == 0) {
  26. ????error = Error("No offers specified");
  27. ??} else {
  28. ????// Validate the offers.
  29. ????error = validation::offer::validate(accept.offer_ids(), this, framework);
  30. ?
  31. ????// Compute offered resources and remove the offers. If the
  32. ????// validation failed, return resources to the allocator.
  33. ????foreach (const OfferID& offerId, accept.offer_ids()) {
  34. ??????Offer* offer = getOffer(offerId);
  35. ?
  36. ??????// Since we re-use `OfferID`s, it is possible to arrive here with either
  37. ??????// a resource offer, or an inverse offer. We first try as a resource offer
  38. ??????// and if that fails, then we assume it is an inverse offer. This is
  39. ??????// correct as those are currently the only 2 ways to get an `OfferID`.
  40. ??????if (offer != NULL) {
  41. ????????slaveId = offer->slave_id();
  42. ????????offeredResources += offer->resources();
  43. ?
  44. ????????if (error.isSome()) {
  45. ??????????allocator->recoverResources(
  46. ??????????????offer->framework_id(),
  47. ??????????????offer->slave_id(),
  48. ??????????????offer->resources(),
  49. ??????????????None());
  50. ????????}
  51. ????????removeOffer(offer);
  52. ????????continue;
  53. ??????}
  54. ?
  55. ??????// Try it as an inverse offer. If this fails then the offer is no longer
  56. ??????// valid.
  57. ??????InverseOffer* inverseOffer = getInverseOffer(offerId);
  58. ??????if (inverseOffer != NULL) {
  59. ????????mesos::master::InverseOfferStatus status;
  60. ????????status.set_status(mesos::master::InverseOfferStatus::ACCEPT);
  61. ????????status.mutable_framework_id()->CopyFrom(inverseOffer->framework_id());
  62. ????????status.mutable_timestamp()->CopyFrom(protobuf::getCurrentTime());
  63. ?
  64. ????????allocator->updateInverseOffer(
  65. ????????????inverseOffer->slave_id(),
  66. ????????????inverseOffer->framework_id(),
  67. ????????????UnavailableResources{
  68. ????????????????inverseOffer->resources(),
  69. ????????????????inverseOffer->unavailability()},
  70. ????????????status,
  71. ????????????accept.filters());
  72. ?
  73. ????????removeInverseOffer(inverseOffer);
  74. ????????continue;
  75. ??????}
  76. ?
  77. ??????// If the offer was neither in our offer or inverse offer sets, then this
  78. ??????// offer is no longer valid.
  79. ??????LOG(WARNING) << "Ignoring accept of offer " << offerId
  80. ???????????????????<< " since it is no longer valid";
  81. ????}
  82. ??}
  83. ?
  84. ??// If invalid, send TASK_LOST for the launch attempts.
  85. ??// TODO(jieyu): Consider adding a ‘drop‘ overload for ACCEPT call to
  86. ??// consistently handle message dropping. It would be ideal if the
  87. ??// ‘drop‘ overload can handle both resource recovery and lost task
  88. ??// notifications.
  89. ??if (error.isSome()) {
  90. ????LOG(WARNING) << "ACCEPT call used invalid offers ‘" << accept.offer_ids()
  91. ?????????????????<< "‘: " << error.get().message;
  92. ?
  93. ????foreach (const Offer::Operation& operation, accept.operations()) {
  94. ??????if (operation.type() != Offer::Operation::LAUNCH) {
  95. ????????continue;
  96. ??????}
  97. ?
  98. ??????foreach (const TaskInfo& task, operation.launch().task_infos()) {
  99. ????????const StatusUpdate& update = protobuf::createStatusUpdate(
  100. ????????????framework->id(),
  101. ????????????task.slave_id(),
  102. ????????????task.task_id(),
  103. ????????????TASK_LOST,
  104. ????????????TaskStatus::SOURCE_MASTER,
  105. ????????????None(),
  106. ????????????"Task launched with invalid offers: " + error.get().message,
  107. ????????????TaskStatus::REASON_INVALID_OFFERS);
  108. ?
  109. ????????metrics->tasks_lost++;
  110. ?
  111. ????????metrics->incrementTasksStates(
  112. ????????????TASK_LOST,
  113. ????????????TaskStatus::SOURCE_MASTER,
  114. ????????????TaskStatus::REASON_INVALID_OFFERS);
  115. ?
  116. ????????forward(update, UPID(), framework);
  117. ??????}
  118. ????}
  119. ?
  120. ????return;
  121. ??}
  122. ?
  123. ??CHECK_SOME(slaveId);
  124. ??Slave* slave = slaves.registered.get(slaveId.get());
  125. ??CHECK_NOTNULL(slave);
  126. ?
  127. ??LOG(INFO) << "Processing ACCEPT call for offers: " << accept.offer_ids()
  128. ????????????<< " on slave " << *slave << " for framework " << *framework;
  129. ?
  130. ??list<Future<bool>> futures;
  131. ??foreach (const Offer::Operation& operation, accept.operations()) {
  132. ????switch (operation.type()) {
  133. ??????case Offer::Operation::LAUNCH: {
  134. ????????// Authorize the tasks. A task is in ‘framework->pendingTasks‘
  135. ????????// before it is authorized.
  136. ????????foreach (const TaskInfo& task, operation.launch().task_infos()) {
  137. ??????????futures.push_back(authorizeTask(task, framework));
  138. ?
  139. ??????????// Add to pending tasks.
  140. ??????????//
  141. ??????????// NOTE: The task ID here hasn‘t been validated yet, but it
  142. ??????????// doesn‘t matter. If the task ID is not valid, the task won‘t
  143. ??????????// be launched anyway. If two tasks have the same ID, the second
  144. ??????????// one will not be put into ‘framework->pendingTasks‘, therefore
  145. ??????????// will not be launched.
  146. ??????????if (!framework->pendingTasks.contains(task.task_id())) {
  147. ????????????framework->pendingTasks[task.task_id()] = task;
  148. ??????????}
  149. ????????}
  150. ????????break;
  151. ??????}
  152. ?
  153. ??????// NOTE: When handling RESERVE and UNRESERVE operations, authorization
  154. ??????// will proceed even if no principal is specified, although currently
  155. ??????// resources cannot be reserved or unreserved unless a principal is
  156. ??????// provided. Any RESERVE/UNRESERVE operation with no associated principal
  157. ??????// will be found invalid when `validate()` is called in `_accept()` below.
  158. ?
  159. ??????// The RESERVE operation allows a principal to reserve resources.
  160. ??????case Offer::Operation::RESERVE: {
  161. ????????Option<string> principal = framework->info.has_principal()
  162. ??????????? framework->info.principal()
  163. ??????????: Option<string>::none();
  164. ?
  165. ????????futures.push_back(
  166. ????????????authorizeReserveResources(
  167. ????????????????operation.reserve(), principal));
  168. ?
  169. ????????break;
  170. ??????}
  171. ?
  172. ??????// The UNRESERVE operation allows a principal to unreserve resources.
  173. ??????case Offer::Operation::UNRESERVE: {
  174. ????????Option<string> principal = framework->info.has_principal()
  175. ??????????? framework->info.principal()
  176. ??????????: Option<string>::none();
  177. ?
  178. ????????futures.push_back(
  179. ????????????authorizeUnreserveResources(
  180. ????????????????operation.unreserve(), principal));
  181. ?
  182. ????????break;
  183. ??????}
  184. ?
  185. ??????// The CREATE operation allows the creation of a persistent volume.
  186. ??????case Offer::Operation::CREATE: {
  187. ????????Option<string> principal = framework->info.has_principal()
  188. ??????????? framework->info.principal()
  189. ??????????: Option<string>::none();
  190. ?
  191. ????????futures.push_back(
  192. ????????????authorizeCreateVolume(
  193. ????????????????operation.create(), principal));
  194. ?
  195. ????????break;
  196. ??????}
  197. ?
  198. ??????// The DESTROY operation allows the destruction of a persistent volume.
  199. ??????case Offer::Operation::DESTROY: {
  200. ????????Option<string> principal = framework->info.has_principal()
  201. ??????????? framework->info.principal()
  202. ??????????: Option<string>::none();
  203. ?
  204. ????????futures.push_back(
  205. ????????????authorizeDestroyVolume(
  206. ????????????????operation.destroy(), principal));
  207. ?
  208. ????????break;
  209. ??????}
  210. ????}
  211. ??}
  212. ?
  213. ??// Wait for all the tasks to be authorized.
  214. ??await(futures)
  215. ????.onAny(defer(self(),
  216. ?????????????????&Master::_accept,
  217. ?????????????????framework->id(),
  218. ?????????????????slaveId.get(),
  219. ?????????????????offeredResources,
  220. ?????????????????accept,
  221. ?????????????????lambda::_1));
  222. }

?

待所有鉴权请求都完成后（无论鉴权成功、失败还是被拒绝），Master会通过defer回调调用Master::_accept；鉴权失败或未通过的操作和任务在Master::_accept中统一处理。

  1. void Master::_accept(
  2. ????const FrameworkID& frameworkId,
  3. ????const SlaveID& slaveId,
  4. ????const Resources& offeredResources,
  5. ????const scheduler::Call::Accept& accept,
  6. ????const Future<list<Future<bool>>>& _authorizations)
  7. {
  8. ??Framework* framework = getFramework(frameworkId);
  9. ……
  10. ?
  11. ??Slave* slave = slaves.registered.get(slaveId);
  12. ?
  13. ??if (slave == NULL || !slave->connected) {
  14. ????foreach (const Offer::Operation& operation, accept.operations()) {
  15. ??????if (operation.type() != Offer::Operation::LAUNCH) {
  16. ????????continue;
  17. ??????}
  18. ?
  19. ??????foreach (const TaskInfo& task, operation.launch().task_infos()) {
  20. ????????const TaskStatus::Reason reason =
  21. ????????????slave == NULL ? TaskStatus::REASON_SLAVE_REMOVED
  22. ??????????????????????????: TaskStatus::REASON_SLAVE_DISCONNECTED;
  23. ????????const StatusUpdate& update = protobuf::createStatusUpdate(
  24. ????????????framework->id(),
  25. ????????????task.slave_id(),
  26. ????????????task.task_id(),
  27. ????????????TASK_LOST,
  28. ????????????TaskStatus::SOURCE_MASTER,
  29. ????????????None(),
  30. ????????????slave == NULL ? "Slave removed" : "Slave disconnected",
  31. ????????????reason);
  32. ?
  33. ????????metrics->tasks_lost++;
  34. ?
  35. ????????metrics->incrementTasksStates(
  36. ????????????TASK_LOST,
  37. ????????????TaskStatus::SOURCE_MASTER,
  38. ????????????reason);
  39. ?
  40. ????????forward(update, UPID(), framework);
  41. ??????}
  42. ????}
  43. ?
  44. ????// Tell the allocator about the recovered resources.
  45. ????allocator->recoverResources(
  46. ????????frameworkId,
  47. ????????slaveId,
  48. ????????offeredResources,
  49. ????????None());
  50. ?
  51. ????return;
  52. ??}
  53. ?
  54. ??// Some offer operations update the offered resources. We keep
  55. ??// updated offered resources here. When a task is successfully
  56. ??// launched, we remove its resource from offered resources.
  57. ??Resources _offeredResources = offeredResources;
  58. ?
  59. ??// The order of `authorizations` must match the order of the operations in
  60. ??// `accept.operations()`, as they are iterated through simultaneously.
  61. ??CHECK_READY(_authorizations);
  62. ??list<Future<bool>> authorizations = _authorizations.get();
  63. ?
  64. ??foreach (const Offer::Operation& operation, accept.operations()) {
  65. ????switch (operation.type()) {
  66. ??????// The RESERVE operation allows a principal to reserve resources.
  67. ??????case Offer::Operation::RESERVE: {
  68. ????????Future<bool> authorization = authorizations.front();
  69. ????????authorizations.pop_front();
  70. ?
  71. ????????CHECK(!authorization.isDiscarded());
  72. ?
  73. ????????if (authorization.isFailed()) {
  74. ??????????// TODO(greggomann): We may want to retry this failed authorization
  75. ??????????// request rather than dropping it immediately.
  76. ??????????drop(framework,
  77. ???????????????operation,
  78. ???????????????"Authorization of principal ‘" + framework->info.principal() +
  79. ???????????????"‘ to reserve resources failed: " +
  80. ???????????????authorization.failure());
  81. ?
  82. ??????????continue;
  83. ????????} else if (!authorization.get()) {
  84. ??????????drop(framework,
  85. ???????????????operation,
  86. ???????????????"Not authorized to reserve resources as ‘" +
  87. ?????????????????framework->info.principal() + "");
  88. ?
  89. ??????????continue;
  90. ????????}
  91. ?
  92. ????????Option<string> principal = framework->info.has_principal()
  93. ??????????? framework->info.principal()
  94. ??????????: Option<string>::none();
  95. ?
  96. ????????// Make sure this reserve operation is valid.
  97. ????????Option<Error> error = validation::operation::validate(
  98. ????????????operation.reserve(), principal);
  99. ?
  100. ????????if (error.isSome()) {
  101. ??????????drop(framework, operation, error.get().message);
  102. ??????????continue;
  103. ????????}
  104. ?
  105. ????????// Test the given operation on the included resources.
  106. ????????Try<Resources> resources = _offeredResources.apply(operation);
  107. ????????if (resources.isError()) {
  108. ??????????drop(framework, operation, resources.error());
  109. ??????????continue;
  110. ????????}
  111. ?
  112. ????????_offeredResources = resources.get();
  113. ?
  114. ????????LOG(INFO) << "Applying RESERVE operation for resources "
  115. ??????????????????<< operation.reserve().resources() << " from framework "
  116. ??????????????????<< *framework << " to slave " << *slave;
  117. ?
  118. ????????apply(framework, slave, operation);
  119. ????????break;
  120. ??????}
  121. ?
  122. ??????// The UNRESERVE operation allows a principal to unreserve resources.
  123. ??????case Offer::Operation::UNRESERVE: {
  124. ????????Future<bool> authorization = authorizations.front();
  125. ????????authorizations.pop_front();
  126. ?
  127. ????????CHECK(!authorization.isDiscarded());
  128. ?
  129. ????????if (authorization.isFailed()) {
  130. ??????????// TODO(greggomann): We may want to retry this failed authorization
  131. ??????????// request rather than dropping it immediately.
  132. ??????????drop(framework,
  133. ???????????????operation,
  134. ???????????????"Authorization of principal ‘" + framework->info.principal() +
  135. ???????????????"‘ to unreserve resources failed: " +
  136. ???????????????authorization.failure());
  137. ?
  138. ??????????continue;
  139. ????????} else if (!authorization.get()) {
  140. ??????????drop(framework,
  141. ???????????????operation,
  142. ???????????????"Not authorized to unreserve resources as ‘" +
  143. ?????????????????framework->info.principal() + "");
  144. ?
  145. ??????????continue;
  146. ????????}
  147. ?
  148. ????????// Make sure this unreserve operation is valid.
  149. ????????Option<Error> error = validation::operation::validate(
  150. ????????????operation.unreserve());
  151. ?
  152. ????????if (error.isSome()) {
  153. ??????????drop(framework, operation, error.get().message);
  154. ??????????continue;
  155. ????????}
  156. ?
  157. ????????// Test the given operation on the included resources.
  158. ????????Try<Resources> resources = _offeredResources.apply(operation);
  159. ????????if (resources.isError()) {
  160. ??????????drop(framework, operation, resources.error());
  161. ??????????continue;
  162. ????????}
  163. ?
  164. ????????_offeredResources = resources.get();
  165. ?
  166. ????????LOG(INFO) << "Applying UNRESERVE operation for resources "
  167. ??????????????????<< operation.unreserve().resources() << " from framework "
  168. ??????????????????<< *framework << " to slave " << *slave;
  169. ?
  170. ????????apply(framework, slave, operation);
  171. ????????break;
  172. ??????}
  173. ?
  174. ??????case Offer::Operation::CREATE: {
  175. ????????Future<bool> authorization = authorizations.front();
  176. ????????authorizations.pop_front();
  177. ?
  178. ????????CHECK(!authorization.isDiscarded());
  179. ?
  180. ????????if (authorization.isFailed()) {
  181. ??????????// TODO(greggomann): We may want to retry this failed authorization
  182. ??????????// request rather than dropping it immediately.
  183. ??????????drop(framework,
  184. ???????????????operation,
  185. ???????????????"Authorization of principal ‘" + framework->info.principal() +
  186. ???????????????"‘ to create persistent volumes failed: " +
  187. ???????????????authorization.failure());
  188. ?
  189. ??????????continue;
  190. ????????} else if (!authorization.get()) {
  191. ??????????drop(framework,
  192. ???????????????operation,
  193. ???????????????"Not authorized to create persistent volumes as ‘" +
  194. ?????????????????framework->info.principal() + "");
  195. ?
  196. ??????????continue;
  197. ????????}
  198. ?
  199. ????????// Make sure this create operation is valid.
  200. ????????Option<Error> error = validation::operation::validate(
  201. ????????????operation.create(), slave->checkpointedResources);
  202. ?
  203. ????????if (error.isSome()) {
  204. ??????????drop(framework, operation, error.get().message);
  205. ??????????continue;
  206. ????????}
  207. ?
  208. ????????Try<Resources> resources = _offeredResources.apply(operation);
  209. ????????if (resources.isError()) {
  210. ??????????drop(framework, operation, resources.error());
  211. ??????????continue;
  212. ????????}
  213. ?
  214. ????????_offeredResources = resources.get();
  215. ?
  216. ????????LOG(INFO) << "Applying CREATE operation for volumes "
  217. ??????????????????<< operation.create().volumes() << " from framework "
  218. ??????????????????<< *framework << " to slave " << *slave;
  219. ?
  220. ????????apply(framework, slave, operation);
  221. ????????break;
  222. ??????}
  223. ?
  224. ??????case Offer::Operation::DESTROY: {
  225. ????????Future<bool> authorization = authorizations.front();
  226. ????????authorizations.pop_front();
  227. ?
  228. ????????CHECK(!authorization.isDiscarded());
  229. ?
  230. ????????if (authorization.isFailed()) {
  231. ??????????// TODO(greggomann): We may want to retry this failed authorization
  232. ??????????// request rather than dropping it immediately.
  233. ??????????drop(framework,
  234. ???????????????operation,
  235. ???????????????"Authorization of principal ‘" + framework->info.principal() +
  236. ???????????????"‘ to destroy persistent volumes failed: " +
  237. ???????????????authorization.failure());
  238. ?
  239. ??????????continue;
  240. ????????} else if (!authorization.get()) {
  241. ??????????drop(framework,
  242. ???????????????operation,
  243. ???????????????"Not authorized to destroy persistent volumes as ‘" +
  244. ?????????????????framework->info.principal() + "");
  245. ?
  246. ??????????continue;
  247. ????????}
  248. ?
  249. ????????// Make sure this destroy operation is valid.
  250. ????????Option<Error> error = validation::operation::validate(
  251. ????????????operation.destroy(), slave->checkpointedResources);
  252. ?
  253. ????????if (error.isSome()) {
  254. ??????????drop(framework, operation, error.get().message);
  255. ??????????continue;
  256. ????????}
  257. ?
  258. ????????Try<Resources> resources = _offeredResources.apply(operation);
  259. ????????if (resources.isError()) {
  260. ??????????drop(framework, operation, resources.error());
  261. ??????????continue;
  262. ????????}
  263. ?
  264. ????????_offeredResources = resources.get();
  265. ?
  266. ????????LOG(INFO) << "Applying DESTROY operation for volumes "
  267. ??????????????????<< operation.create().volumes() << " from framework "
  268. ??????????????????<< *framework << " to slave " << *slave;
  269. ?
  270. ????????apply(framework, slave, operation);
  271. ????????break;
  272. ??????}
  273. ?
  274. ??????case Offer::Operation::LAUNCH: {
  275. ????????foreach (const TaskInfo& task, operation.launch().task_infos()) {
  276. ??????????Future<bool> authorization = authorizations.front();
  277. ??????????authorizations.pop_front();
  278. ?
  279. ??????????// NOTE: The task will not be in ‘pendingTasks‘ if
  280. ??????????// ‘killTask()‘ for the task was called before we are here.
  281. ??????????// No need to launch the task if it‘s no longer pending.
  282. ??????????// However, we still need to check the authorization result
  283. ??????????// and do the validation so that we can send status update
  284. ??????????// in case the task has duplicated ID.
  285. ??????????bool pending = framework->pendingTasks.contains(task.task_id());
  286. ?
  287. ??????????// Remove from pending tasks.
  288. ??????????framework->pendingTasks.erase(task.task_id());
  289. ?
  290. ??????????CHECK(!authorization.isDiscarded());
  291. ?
  292. ??????????if (authorization.isFailed() || !authorization.get()) {
  293. ????????????string user = framework->info.user(); // Default user.
  294. ????????????if (task.has_command() && task.command().has_user()) {
  295. ??????????????user = task.command().user();
  296. ????????????} else if (task.has_executor() &&
  297. ???????????????????????task.executor().command().has_user()) {
  298. ??????????????user = task.executor().command().user();
  299. ????????????}
  300. ?
  301. ????????????const StatusUpdate& update = protobuf::createStatusUpdate(
  302. ????????????????framework->id(),
  303. ????????????????task.slave_id(),
  304. ????????????????task.task_id(),
  305. ????????????????TASK_ERROR,
  306. ????????????????TaskStatus::SOURCE_MASTER,
  307. ????????????????None(),
  308. ????????????????authorization.isFailed() ?
  309. ????????????????????"Authorization failure: " + authorization.failure() :
  310. ????????????????????"Not authorized to launch as user ‘" + user + "",
  311. ????????????????TaskStatus::REASON_TASK_UNAUTHORIZED);
  312. ?
  313. ????????????metrics->tasks_error++;
  314. ?
  315. ????????????metrics->incrementTasksStates(
  316. ????????????????TASK_ERROR,
  317. ????????????????TaskStatus::SOURCE_MASTER,
  318. ????????????????TaskStatus::REASON_TASK_UNAUTHORIZED);
  319. ?
  320. ????????????forward(update, UPID(), framework);
  321. ?
  322. ????????????continue;
  323. ??????????}
  324. ?
  325. ??????????// Validate the task.
  326. ?
  327. ??????????// Make a copy of the original task so that we can
  328. ??????????// fill the missing `framework_id` in ExecutorInfo
  329. ??????????// if needed. This field was added to the API later
  330. ??????????// and thus was made optional.
  331. ??????????TaskInfo task_(task);
  332. ??????????if (task.has_executor() && !task.executor().has_framework_id()) {
  333. ????????????task_.mutable_executor()
  334. ????????????????->mutable_framework_id()->CopyFrom(framework->id());
  335. ??????????}
  336. ?
  337. ??????????const Option<Error>& validationError = validation::task::validate(
  338. ??????????????task_,
  339. ??????????????framework,
  340. ??????????????slave,
  341. ??????????????_offeredResources);
  342. ?
  343. ??????????if (validationError.isSome()) {
  344. ????????????const StatusUpdate& update = protobuf::createStatusUpdate(
  345. ????????????????framework->id(),
  346. ????????????????task_.slave_id(),
  347. ????????????????task_.task_id(),
  348. ????????????????TASK_ERROR,
  349. ????????????????TaskStatus::SOURCE_MASTER,
  350. ????????????????None(),
  351. ????????????????validationError.get().message,
  352. ????????????????TaskStatus::REASON_TASK_INVALID);
  353. ?
  354. ????????????metrics->tasks_error++;
  355. ?
  356. ????????????metrics->incrementTasksStates(
  357. ????????????????TASK_ERROR,
  358. ????????????????TaskStatus::SOURCE_MASTER,
  359. ????????????????TaskStatus::REASON_TASK_INVALID);
  360. ?
  361. ????????????forward(update, UPID(), framework);
  362. ?
  363. ????????????continue;
  364. ??????????}
  365. ?
  366. ??????????// Add task.
  367. ??????????if (pending) {
  368. ????????????_offeredResources -= addTask(task_, framework, slave);
  369. ?
  370. ????????????// TODO(bmahler): Consider updating this log message to
  371. ????????????// indicate when the executor is also being launched.
  372. ????????????LOG(INFO) << "Launching task " << task_.task_id()
  373. ??????????????????????<< " of framework " << *framework
  374. ??????????????????????<< " with resources " << task_.resources()
  375. ??????????????????????<< " on slave " << *slave;
  376. ?
  377. ????????????RunTaskMessage message;
  378. ????????????message.mutable_framework()->MergeFrom(framework->info);
  379. ?
  380. ????????????// TODO(anand): We set ‘pid‘ to UPID() for http frameworks
  381. ????????????// as ‘pid‘ was made optional in 0.24.0. In 0.25.0, we
  382. ????????????// no longer have to set pid here for http frameworks.
  383. ????????????message.set_pid(framework->pid.getOrElse(UPID()));
  384. ????????????message.mutable_task()->MergeFrom(task_);
  385. ?
  386. ????????????if (HookManager::hooksAvailable()) {
  387. ??????????????// Set labels retrieved from label-decorator hooks.
  388. ??????????????message.mutable_task()->mutable_labels()->CopyFrom(
  389. ??????????????????HookManager::masterLaunchTaskLabelDecorator(
  390. ??????????????????????task_,
  391. ??????????????????????framework->info,
  392. ??????????????????????slave->info));
  393. ????????????}
  394. ?
  395. ????????????send(slave->pid, message);
  396. ??????????}
  397. ????????}
  398. ????????break;
  399. ??????}
  400. ?
  401. ??????default:
  402. ????????LOG(ERROR) << "Unsupported offer operation " << operation.type();
  403. ????????break;
  404. ????}
  405. ??}
  406. ?
  407. ??if (!_offeredResources.empty()) {
  408. ????// Tell the allocator about the unused (e.g., refused) resources.
  409. ????allocator->recoverResources(
  410. ????????frameworkId,
  411. ????????slaveId,
  412. ????????_offeredResources,
  413. ????????accept.filters());
  414. ??}
  415. }

?

最终，对于每个通过鉴权和校验、且仍处于pending状态的任务，Mesos-Master会将RunTaskMessage消息发送给Mesos-Slave。

Mesos源码分析(11): Mesos-Master接收到launchTasks消息

标签:

原文地址:http://www.cnblogs.com/popsuper1982/p/5724166.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!