标签:des style blog http java color
软件版本:
windows7: Tomcat7、JDK7、Spring4.0.2、Struts2.3、Hibernate4.3、myeclipse10.0、easyui;Linux(centos6.5):Hadoop2.4、Mahout1.0、JDK7;
使用Web工程调用Mahout的相关算法,提供监控,查看任务的执行状态。
自建Web项目,项目首页如下:
[root@node33 data]# jps 6033 NodeManager 5543 NameNode 5629 DataNode 5942 ResourceManager 41611 Jps 5800 SecondaryNameNode 6412 JobHistoryServer
<property> <name>mapreduce.jobhistory.address</name> <value>node33:10020</value> <description>MapReduce JobHistory Server IPC host:port</description> </property>
yarn-default.xml:
<property> <name>yarn.application.classpath</name> <value> $HADOOP_CONF_DIR, $HADOOP_COMMON_HOME/share/hadoop/common/*, $HADOOP_COMMON_HOME/share/hadoop/common/lib/*, $HADOOP_HDFS_HOME/share/hadoop/hdfs/*, $HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*, $HADOOP_YARN_HOME/share/hadoop/yarn/*, $HADOOP_YARN_HOME/share/hadoop/yarn/lib/* </value> </property>
<property> <name>yarn.resourcemanager.hostname</name> <value>node33</value> </property>
注意classpath的路径是集群的相应路径;
mvn -Dhadoop2.version=2.4.1 -DskipTests clean install)导入的包有:
<Context path ="/mh" docBase ="D:\workspase\hadoop_hbase\MahoutAlgorithmPlatform2.1\WebRoot" privileged ="true" reloadable ="false" > </Context>
public int checkConnection(String fsStr,String rm) throws IOException{ Configuration conf = new Configuration(); conf.set("fs.defaultFS", fsStr); conf.set("yarn.resourcemanager.address", rm); conf.set("mapreduce.framework.name", "yarn"); FileSystem fs = FileSystem.get(conf); boolean fsOnline=fs.exists(new Path("/")); if(!fsOnline){ return 1; } JobClient jc = new JobClient(conf); ClusterStatus cs = jc.getClusterStatus(); if(!"RUNNING".equals(cs.getJobTrackerStatus().toString())){ return 0; } // 集群验证成功 HadoopUtils.setConf(conf); HadoopUtils.setFs(fs); // 通过判断Hadoop.getConf()是否为null来确定是否已经配置过集群 return 3; }主要通过两个方面:1、检查HDFS文件;2、检查集群状态是否是running;
public static void initialCurrentJobs(int nextJobNum) throws IOException{ /*if(list!=null&&list.size()==10){ list.clear(); }*/ list.clear(); // 清空上次遗留 JobStatus[] jbs=getJc().getAllJobs(); JobID jID = findLastJob(jbs).getJobID(); if(jID==null){ // the first time start the cluster , will be fixed next time // TODO fix the bug log.info("The cluster is started before and not running any job !!!"); } log.info("The last job id is :{}", jID.toString()); for(int i=1;i<=nextJobNum;i++){ CurrentJobInfo cj = new CurrentJobInfo(); cj.setJobId(new JobID(jID.getJtIdentifier(),jID.getId()+i)); list.add(cj); } }这里需要注意的是,如果集群是第一次启动,且没有运行MR任务的话,那么获取的任务ID为空,无法初始化(这个在下个版本修复);
public static List<CurrentJobInfo> getCurrentJobs() throws IOException{ for(int i=0;i<list.size();i++){ CurrentJobInfo iJob = list.get(i); RunningJob runningJob =findGivenJob(iJob.getJobId().toString()); if(runningJob==null){ break; } if(i==list.size()-1){ // 放在设置的前面 finished=runningJob.isComplete(); } iJob.setJobName(runningJob.getJobName()); iJob.setJobIdStr(runningJob.getJobStatus().getJobID().toString()); iJob.setMapProgress(Utils.toPercent(runningJob.mapProgress(),2)); iJob.setRedProgress(Utils.toPercent(runningJob.reduceProgress(), 2)); iJob.setState(JobStatus.getJobRunState(runningJob.getJobState())); // 有时map和reduce都到1时,此值仍是Running,需处理 } return list; }获取到任务信息后,在任务监控界面就可以监控到任务的运行状态。
/**
 * Reads the cluster-center vectors stored under a directory and formats them as text.
 *
 * <p>All files matching {@code part-*} in {@code centerPathDir} are read as
 * sequence files of {@code ClusterWritable} values; each value is rendered with
 * {@code asFormatString(null)} and followed by a newline.
 *
 * @param conf          Hadoop configuration used to open the sequence files
 * @param centerPathDir directory that holds the {@code part-*} files
 * @return the formatted centers, one per line, or a message saying the input
 *         does not exist when no {@code part-*} file matches
 * @throws IOException if the file system cannot be queried or a file cannot be read
 */
public static String readCenter(Configuration conf, String centerPathDir) throws IOException {
    Path input = new Path(centerPathDir, "part-*");

    // exists() does not expand glob patterns, so it would look for a file literally
    // named "part-*"; globStatus() resolves the pattern against the file system.
    org.apache.hadoop.fs.FileStatus[] matches = HadoopUtils.getFs().globStatus(input);
    if (matches == null || matches.length == 0) {
        return input + " not exist ,please check the input";
    }

    StringBuilder buff = new StringBuilder();
    for (ClusterWritable cl
            : new SequenceFileDirValueIterable<ClusterWritable>(input, PathType.GLOB, conf)) {
        buff.append(cl.getValue().asFormatString(null)).append("\n");
    }
    return buff.toString();
}
Mahout算法调用展示平台2.1,布布扣,bubuko.com
标签:des style blog http java color
原文地址:http://blog.csdn.net/fansy1990/article/details/37339079