1) Configure the user name, password, database and other parameters in db.properties.
2) For the first Tomcat start, set the hibernate.hbm2ddl.auto value in hibernate.cfg.xml to create; change it to update for the second and later starts.
3) Open the cluster-parameters page and click "Initialize" to initialize the cluster parameters; if they do not match your actual cluster, adjust them accordingly. For now the cluster parameters are read from a configuration file; to switch to database-backed configuration, change the Utisl.dbOrFile parameter. In other words, for the moment only utisl.properties needs to be edited (a sketch of such a properties file follows this list).
4) Copy the algorithm jar built by the Spark_MLlib_Algorithm_1.6.0 project to the path given by spark.jar in 3).
5) Copy the cluster's yarn-site.xml to the path given by spark.files in 3).
6) Copy spark-assembly-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar to the path given by spark.yarn.jar in 3).
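For reference, a hypothetical excerpt of utisl.properties is shown below. The key names follow the parameters mentioned in the steps above (the dbOrFile switch, spark.jar, spark.files, spark.yarn.jar), but the exact keys and values in the actual project may differ, and all paths are placeholders only.

# Hypothetical excerpt of utisl.properties -- key names and values are illustrative only
# read cluster parameters from this file ("file") or from the database ("db")
utisl.dbOrFile=file
# path that the algorithm jar built from Spark_MLlib_Algorithm_1.6.0 is copied to (step 4)
spark.jar=/path/to/spark.jar
# path that the cluster's yarn-site.xml is copied to (step 5)
spark.files=/path/to/yarn-site.xml
# path that the Spark assembly jar is copied to (step 6)
spark.yarn.jar=/path/to/spark-assembly-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar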
package com.fz.classification

import com.fz.util.Utils
import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, LogisticRegressionWithLBFGS}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Logistic regression wrapper algorithm
 * Labels used in Logistic Regression should be {0, 1, ..., k - 1} for k classes multi-label classification problem
 * Parameters:
 * testOrNot    : whether this is a test run; normally false
 * input        : input data path
 * minPartitions : minimum number of partitions for the input data
 * output       : output path
 * targetIndex  : index of the target (label) column, starting from 1
 * splitter     : field separator of the input data
 * method       : logistic regression solver to use, "SGD" or "LBFGS"
 * hasIntercept : whether to fit an intercept
 * numClasses   : number of classes of the target column
 * Created by fanzhe on 2016/12/19.
 */
object LogisticRegression {

  def main (args: Array[String]) {
    if (args.length != 9) {
      println("Usage: com.fz.classification.LogisticRegression testOrNot input minPartitions output targetIndex " +
        "splitter method hasIntercept numClasses")
      System.exit(-1)
    }
    val testOrNot = args(0).toBoolean // test run or not; the SparkContext is obtained differently, true means test
    val input = args(1)
    val minPartitions = args(2).toInt
    val output = args(3)
    val targetIndex = args(4).toInt // starts from 1, not 0 -- be careful
    val splitter = args(5)
    val method = args(6) // should be "SGD" or "LBFGS"
    val hasIntercept = args(7).toBoolean
    val numClasses = args(8).toInt

    val sc = Utils.getSparkContext(testOrNot, "Logistic Create Model")

    // construct data
    // Load and parse the data
    val training = Utils.getLabeledPointData(sc, input, minPartitions, splitter, targetIndex).cache()

    // Run training algorithm to build the model
    val model = method match {
      case "SGD" => new LogisticRegressionWithSGD()
        .setIntercept(hasIntercept)
        .run(training)
      case "LBFGS" => new LogisticRegressionWithLBFGS().setNumClasses(numClasses)
        .setIntercept(hasIntercept)
        .run(training)
      case _ => throw new RuntimeException("no method")
    }

    // save model
    model.save(sc, output)

    sc.stop()
  }
}

The header comment in the code above explains each parameter: what it means and which values it can take.
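Once the model has been written to the output path, any downstream code can load it back with the standard MLlib API. The following is a minimal Java sketch (local master, placeholder path and feature values) of how the saved LogisticRegressionModel could be loaded and used for prediction; the path and vector length must of course match your own run.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.linalg.Vectors;

public class LoadLogisticModel {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Load Logistic Model").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // the path must match the "output" argument that was passed to LogisticRegression.main
        LogisticRegressionModel model = LogisticRegressionModel.load(jsc.sc(), "./target/logistic/tmp1");
        // placeholder feature vector; its length must match the number of feature columns in the training data
        double prediction = model.predict(Vectors.dense(1.0, 2.0));
        System.out.println("prediction = " + prediction);
        jsc.stop();
    }
}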
package com.fz.classification

import java.io.File
import com.fz.util.Utils
import org.junit.{Assert, Test}
import Assert._

/**
 * Tests for the Logistic Regression algorithm
 * Created by fanzhe on 2016/12/19.
 */
@Test
class LogisticRegressionTest {

  @Test
  def testMain1() = {
    // testOrNot input minPartitions output targetIndex splitter method hasIntercept numClasses
    val args = Array(
      "true",
      "./src/data/classification_regression/logistic.dat",
      "2",
      "./target/logistic/tmp1",
      "1",
      " ",
      "SGD",
      "true",
      "2" // this parameter is useless for SGD
    )
    // delete the output directory
    Utils.deleteOutput(args(3))

    LogisticRegression.main(args)
    assertTrue(Utils.fileContainsClassName(args(3) + "/metadata/part-00000",
      "org.apache.spark.mllib.classification.LogisticRegressionModel"))
  }

  @Test
  def testMain2() = {
    // testOrNot input minPartitions output targetIndex splitter method hasIntercept numClasses
    val args = Array(
      "true",
      "./src/data/classification_regression/logistic.dat",
      "2",
      "./target/logistic/tmp2",
      "1",
      " ",
      "LBFGS",
      "true",
      "2"
    )
    // delete the output directory
    Utils.deleteOutput(args(3))

    LogisticRegression.main(args)
    assertTrue(Utils.fileContainsClassName(args(3) + "/metadata/part-00000",
      "org.apache.spark.mllib.classification.LogisticRegressionModel"))
  }
}

Each test method follows the same pattern: first build the algorithm arguments, then call the main method, and finally check whether the output contains the expected model metadata.
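The tests use a space as the splitter and targetIndex 1, so logistic.dat is expected to be space-separated with the label (0 or 1, since numClasses is 2) in the first column. A couple of hypothetical lines, for illustration only (the actual file under src/data may differ):

1 2.3 4.5 1.2
0 0.4 2.1 1.8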
The job-monitoring logic works as follows:
1) Fetch the most recent records entries from JobInfo;
2) Among them, find the entries whose isFinished field is false;
3) For the entries found in 2), query YARN for their current status, update the data from 1) accordingly, and write it back to the database;
4) Return the data as JSON, paginated according to the row and page fields. The code is shown below:
public void getJobInfo() {
    Map<String, Object> jsonMap = new HashMap<String, Object>();
    // 1.
    List<Object> jobInfos = dBService.getLastNRows("JobInfo", "jobId", true, records);
    // 2, 3
    List<Object> list = null;
    try {
        list = HUtils.updateJobInfo(jobInfos);
        if (list != null && list.size() > 0) {
            dBService.updateTableData(list);
        }
    } catch (Exception e) {
        e.printStackTrace();
        log.warn("更新任务状态异常!");
        jsonMap.put("total", 0);
        jsonMap.put("rows", null);
        Utils.write2PrintWriter(JSON.toJSONString(jsonMap));
        return;
    }
    // 4.
    jsonMap.put("total", list.size());
    jsonMap.put("rows", Utils.getSubList(list, page, rows));
    Utils.write2PrintWriter(JSON.toJSONString(jsonMap));
}

The first step fetches the given number of records through dBService; the second step updates those records; the last step pages the result before writing it out as JSON.
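Utils.getSubList, used in step 4, is not shown in the post; a minimal pagination helper consistent with how it is called (page number plus page size) might look like the sketch below, assuming page is 1-based.

import java.util.ArrayList;
import java.util.List;

public class Utils {
    // Hypothetical sketch of Utils.getSubList: returns the "page"-th page of at most "rows" entries from list
    public static List<Object> getSubList(List<Object> list, int page, int rows) {
        int from = (page - 1) * rows;
        if (list == null || from < 0 || from >= list.size()) {
            return new ArrayList<>();
        }
        int to = Math.min(from + rows, list.size());
        return new ArrayList<>(list.subList(from, to));
    }
}

Now let's look at the implementation of HUtils.updateJobInfo: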
public static List<Object> updateJobInfo(List<Object> jobInfos) throws YarnException, IOException {
    List<Object> list = new ArrayList<>();
    JobInfo jobInfo;
    for (Object o : jobInfos) {
        jobInfo = (JobInfo) o;
        if (!jobInfo.isFinished()) { // if the job has not finished, check its latest status
            ApplicationReport appReport = null;
            try {
                appReport = getClient().getApplicationReport(SparkUtils.getAppId(jobInfo.getJobId()));
            } catch (YarnException | IOException e) {
                e.printStackTrace();
                throw e;
            }
            /**
             * YarnApplicationState ordinals:
             * NEW,        0
             * NEW_SAVING, 1
             * SUBMITTED,  2
             * ACCEPTED,   3
             * RUNNING,    4
             * FINISHED,   5
             * FAILED,     6
             * KILLED      7
             */
            switch (appReport.getYarnApplicationState().ordinal()) {
                case 0: case 1: case 2: case 3: // all mapped to the Accepted state
                    jobInfo.setRunState(JobState.ACCETPED);
                    break;
                case 4:
                    jobInfo.setRunState(JobState.RUNNING);
                    break;
                case 5:
                    // FinalApplicationStatus ordinals:
                    // UNDEFINED, 0
                    // SUCCEEDED, 1
                    // FAILED,    2
                    // KILLED     3
                    switch (appReport.getFinalApplicationStatus().ordinal()) {
                        case 1:
                            jobInfo.setRunState(JobState.SUCCESSED);
                            SparkUtils.cleanupStagingDir(jobInfo.getJobId());
                            jobInfo.setFinished(true);
                            break;
                        case 2:
                            jobInfo.setRunState(JobState.FAILED);
                            SparkUtils.cleanupStagingDir(jobInfo.getJobId());
                            jobInfo.setErrorInfo(appReport.getDiagnostics().substring(0, Utils.EXCEPTIONMESSAGELENGTH));
                            jobInfo.setFinished(true);
                            break;
                        case 3:
                            jobInfo.setRunState(JobState.KILLED);
                            SparkUtils.cleanupStagingDir(jobInfo.getJobId());
                            jobInfo.setFinished(true);
                            break;
                        default:
                            log.warn("App:" + jobInfo.getJobId() + "获取任务状态异常! "
                                    + "appReport.getFinalApplicationStatus():" + appReport.getFinalApplicationStatus().name()
                                    + ",ordinal:" + appReport.getFinalApplicationStatus().ordinal());
                    }
                    break;
                case 6:
                    jobInfo.setRunState(JobState.FAILED);
                    SparkUtils.cleanupStagingDir(jobInfo.getJobId());
                    jobInfo.setErrorInfo(appReport.getDiagnostics().substring(0, Utils.EXCEPTIONMESSAGELENGTH));
                    jobInfo.setFinished(true);
                    break;
                case 7:
                    jobInfo.setRunState(JobState.KILLED);
                    SparkUtils.cleanupStagingDir(jobInfo.getJobId());
                    jobInfo.setFinished(true);
                    break;
                default:
                    log.warn("App:" + jobInfo.getJobId() + "获取任务状态异常!"
                            + "appReport.getYarnApplicationState():" + appReport.getYarnApplicationState().name()
                            + ",ordinal:" + appReport.getYarnApplicationState().ordinal());
            }
            jobInfo.setModifiedTime(new Date());
        }
        list.add(jobInfo); // add the updated (or original) JobInfo to the list
    }
    return list;
}

This method looks only at jobs that the database still marks as unfinished, queries YARN for their latest status, updates those job records accordingly, and finally adds every job (updated or untouched) to the list that is returned.
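getClient() and SparkUtils.getAppId() are not shown in the post either. Presumably the first returns a started YarnClient and the second converts the stored job id string back into a YARN ApplicationId; a plausible sketch, assuming the job id is stored in the standard application_<clusterTimestamp>_<id> form:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class YarnHelper {
    private static YarnClient client;

    // Lazily create and start a single YarnClient; relies on yarn-site.xml being on the classpath
    // (see step 5 of the deployment notes above)
    public static synchronized YarnClient getClient() {
        if (client == null) {
            client = YarnClient.createYarnClient();
            client.init(new Configuration());
            client.start();
        }
        return client;
    }

    // Convert a job id string such as "application_1490000000000_0001" into an ApplicationId
    public static ApplicationId getAppId(String jobId) {
        return ConverterUtils.toApplicationId(jobId);
    }
}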
To add a new algorithm to the platform:
1) Write the Thread corresponding to the algorithm under src/main/java/ (a rough skeleton is sketched after this list);
2) Write the corresponding page under webapp;
3) Write the corresponding js under webapp/js;
4) Modify webapp/preprocess/upload.jsp to add a data-upload record, and add the corresponding data under main/data;
5) Start the project, upload the data on the page, then select the algorithm and set its parameters to submit the task; once submitted, its running status can be watched on the monitoring page.
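The Thread classes themselves are not shown in the post. As a rough, hypothetical skeleton of what step 1) involves (the class name, fields and submission helper are illustrative only), an algorithm thread mainly assembles the argument array expected by the corresponding Scala object and hands it to the platform's job-submission utility:

// Hypothetical skeleton only; the real Thread classes and submission helper in the project are not shown in the post
public class LogisticRegressionThread extends Thread {

    private final String input;   // path of the uploaded data
    private final String output;  // path for the generated model

    public LogisticRegressionThread(String input, String output) {
        this.input = input;
        this.output = output;
    }

    @Override
    public void run() {
        // The nine arguments expected by com.fz.classification.LogisticRegression, in order:
        // testOrNot, input, minPartitions, output, targetIndex, splitter, method, hasIntercept, numClasses
        String[] args = {"false", input, "2", output, "1", " ", "SGD", "true", "2"};
        // Hand the arguments to whatever utility the platform uses to submit the job to YARN,
        // e.g. something like SparkSubmitUtils.submit("com.fz.classification.LogisticRegression", args);
        // (hypothetical call -- the real submission code is not shown here)
    }
}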
Original article: http://blog.csdn.net/fansy1990/article/details/62238078