标签:
一、说明
有时候我们在提交任务以后,能够获取一个MapReduce任务的ID,一般为Job_**********_xxxx的组合,下面将介绍如何获取JobID,与通过其他程序与JOBID停止一个正在运行的任务。
二、流程
1、提交任务并获取ID值。
通常情况下,我们进行远程提交时,都会使用job.waitForCompletion(true);函数去提交一个任务并且在eclipse中远程监听任务的执行情况。但是如果使用job.submit()方法后,则只是将任务提交到集群结束,即不在远程坚挺集群上任务执行的情况。
job.submit(); System.out.println(job.getJobID());
通过job.submit()方法,我们可以在提交任务后,直接结束客户端的提交行为。但是job在提交到集群前的相关数据已经封装到了job对象中。因此我们可以通过job.getJobID()获取刚刚提交的任务的ID值。
2、通过jobid停止任务
想要通过jobid停止任务,则需要重新获取集群(cluster)上的任务的示例,因此需要通过JobClient去完成相关的任务。通过下述代码可以完成对集群上正在运行任务的ID值:
package mr; import java.io.IOException; import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobStatus; public class MRKillJob { public static void main(String[] args) { MRKillJob test=new MRKillJob(); test.killJob("job_1461739723866_0094"); } /** * @author wozipa * @Date 2016-6-9 15:02 * @see 删除某一个任务 * @param jobId */ public void killJob(String id) { Configuration conf=new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop1:9000"); conf.set("yarn.resourcemanager.address", "hadoop1:8032"); conf.set("mapreduce.jobhistory.address", "192.98.12.234:10020"); conf.set("yarn.resourcemanager.scheduler.address", "hadoop1:8030"); conf.set("mapreduce.framework.name", "yarn"); conf.set("mapreduce.app-submission.cross-platform", "true"); try { JobClient client=new JobClient(new InetSocketAddress("192.98.12.234",8032), conf); RunningJob job=client.getJob(id); System.out.println(job.setupProgress()); System.out.println(job.cleanupProgress()); System.out.println(job.mapProgress()); System.out.println(job.reduceProgress()); System.out.println(job.getJobName()); System.out.println(job.getJobState()); System.out.println(job.isComplete()); System.out.println(job.getFailureInfo()); System.out.println(job.getHistoryUrl().toString()); System.out.println(job.getID()); JobStatus status=job.getJobStatus(); System.out.println(status.toString()); System.out.println(job.getTrackingURL()); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
在Hadoop2.x版本中,有两种执行MapReduce的方式,MR V1与YARN。因为在这里我的集群使用的YARN的模式(mapreduce.framework.name=yarn);因此任务的调度都是有YARN中的ResourceManager进程完成,因此想要获取JobClienti对象与集群的连接,就需要与ResourceMnager进行进行交互。因此在创建JobClient对象时的网络位置写的是ResouceManager进程的IP地址与端口号。
并且conf中需要制定ResourceManager进程的位置(id+port),因此这里照搬了任务远程提交时的配置。
当获取JobClient对象后,可以通过
RunningJob job=client.getJob(id);函数获取相应正在执行的任务句柄,然后就可以相应的再行执行的任务了。
在测试过程中发现,当任务执行完成后,jobclient对象会自动去Job History Server上获取任务的执行信息。如果没有早conf中指定history的地址的话,jobclient会去本地0.0.0.0:10020位置上去寻找历史服务器,因此需要在conf中指定历史服务器的位置。因此可知为何在conf中设置resoucemanager的ip+port了。
如果需要停止任务的话,则使用
job.killJob();
就可以完成对任务的终止了。
RunningJob对象中也含有任务执行过程中的其他信息,可以通过job对象中的方法进行获取,例如setupProgress属性是指setup程序完成的百分比,一次类推就知道mapProgress、reduceProgress、cleanupProgress的含义了。其他属性用户可以自己去进行测试。
通过MapReduce JobID 停止(kill)指定任务
标签:
原文地址:http://blog.csdn.net/u011518678/article/details/51356210