Software used in the project: MyEclipse 10.0, JDK 1.7, Hadoop 2.6, MySQL 5.6, EasyUI 1.3.6, jQuery 2.0, Spring 4.1.3, Hibernate 4.3.1, Struts 2.3.1, Tomcat 7, Maven 3.2.1.
Project download: https://github.com/fansy1990/friend_find ; deployment reference: http://blog.csdn.net/fansy1990/article/details/46481409 .
public interface ObjectInterface {
    /**
     * Instead of writing one method per table, the entity is resolved from the table name
     * and populated generically from a map keyed by column name.
     * @param map
     * @return
     */
    public Object setObjectByMap(Map<String, Object> map);
}
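For illustration, a hypothetical entity could satisfy this contract as follows. This is a minimal sketch: the field names (id, userName) and their types are assumptions, not the project's actual entity definition.

import java.util.Map;

// Hypothetical entity; the fields are illustrative only.
public class DemoUser implements ObjectInterface {
    private Integer id;
    private String userName;

    @Override
    public Object setObjectByMap(Map<String, Object> map) {
        // copy values out of the column-name-keyed map produced from the JSON string
        if (map.get("id") != null) {
            this.id = Integer.valueOf(map.get("id").toString());
        }
        if (map.get("userName") != null) {
            this.userName = map.get("userName").toString();
        }
        return this;
    }
}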
/**
 * Insert into or update a table.
 * Instead of one method per table, the entity is resolved automatically from the table name.
 * @param tableName
 * @param json
 * @return
 */
public boolean updateOrSave(String tableName, String json) {
    try {
        // resolve the entity class from the table name and populate it from the JSON string
        Object o = Utils.getEntity(Utils.getEntityPackages(tableName), json);
        baseDao.saveOrUpdate(o);
        log.info("Saved table {}!", new Object[]{tableName});
    } catch (Exception e) {
        e.printStackTrace();
        return false;
    }
    return true;
}

/**
 * Obtain an entity instance from the class name and populate it from a JSON string.
 * @param tableName
 * @param json
 * @return
 * @throws ClassNotFoundException
 * @throws IllegalAccessException
 * @throws InstantiationException
 * @throws IOException
 * @throws JsonMappingException
 * @throws JsonParseException
 */
@SuppressWarnings("unchecked")
public static Object getEntity(String tableName, String json) throws ClassNotFoundException, InstantiationException, IllegalAccessException, JsonParseException, JsonMappingException, IOException {
    Class<?> cl = Class.forName(tableName);
    ObjectInterface o = (ObjectInterface) cl.newInstance();
    Map<String, Object> map = new HashMap<String, Object>();
    ObjectMapper mapper = new ObjectMapper();
    try {
        // convert the JSON string to a Map
        map = mapper.readValue(json, Map.class);
        return o.setObjectByMap(map);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}

Uploading the data file to HDFS comes down to a single FileSystem call:

fs.copyFromLocalFile(src, dst);
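For context, a minimal sketch of how an upload action might obtain the FileSystem handle and run this copy. The configuration setup, the cluster address, and the paths here are assumptions, not the project's actual Cloud action:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper; fs.defaultFS and the two paths are illustrative only.
public class HdfsUploadSketch {
    public static void upload(String localFile, String hdfsDir) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://node101:8020"); // assumed cluster address
        FileSystem fs = FileSystem.get(conf);
        Path src = new Path(localFile);   // e.g. the value read from #localFileId
        Path dst = new Path(hdfsDir);     // e.g. the job input directory
        fs.copyFromLocalFile(src, dst);   // same call as used above
        fs.close();
    }
}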
// =====uploadId: bind the click handler of the data-upload button
$('#uploadId').bind('click', function(){
    var input_i = $('#localFileId').val();
    // pop up a progress dialog
    popupProgressbar('Data upload', 'Uploading data...', 1000);
    // submit the task asynchronously via AJAX
    callByAJax('cloud/cloud_upload.action', {input: input_i});
});

The AJAX call goes through a reusable wrapper that all later pages can call as well:

// submit the task asynchronously via AJAX
// if the task returns success, show a success message; otherwise show the failure reason
function callByAJax(url, data_){
    $.ajax({
        url : url,
        data: data_,
        async: true,
        dataType: "json",
        context : document.body,
        success : function(data) {
            // $.messager.progress('close');
            closeProgressbar();
            console.info("data.flag:" + data.flag);
            var retMsg;
            if("true" == data.flag){
                retMsg = 'Operation succeeded!';
            }else{
                retMsg = 'Operation failed! Reason: ' + data.msg;
            }
            $.messager.show({
                title : 'Notice',
                msg : retMsg
            });
            if("true" == data.flag && "true" == data.monitor){// open the monitoring page
                // open it in a separate tab
                layout_center_addTabFun({
                    title : 'MR job monitor',
                    closable : true,
                    // iconCls : node.iconCls,
                    href : 'cluster/monitor_one.jsp'
                });
            }
        }
    });
}

The backend returns JSON. To stay compatible with the cloud-platform job monitoring (for generality), the success handler also opens the monitoring tab.

/**
 * Submit the deduplication job
 */
public void deduplicate(){
    Map<String, Object> map = new HashMap<String, Object>();
    try{
        HUtils.setJobStartTime(System.currentTimeMillis() - 10000);
        HUtils.JOBNUM = 1;
        new Thread(new Deduplicate(input, output)).start();
        map.put("flag", "true");
        map.put("monitor", "true");
    } catch (Exception e) {
        e.printStackTrace();
        map.put("flag", "false");
        map.put("monitor", "false");
        map.put("msg", e.getMessage());
    }
    Utils.write2PrintWriter(JSON.toJSONString(map));
}

First the start time of the whole job group is set, pushed back by 10 s to guard against clock differences between the Tomcat host and the cluster (around 2 s is enough, and no offset is needed if the clocks agree); then the total number of jobs is set; finally the MR job is launched in a separate thread.
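A minimal sketch of the kind of bookkeeping this relies on; only JOBNUM and setJobStartTime appear in the code above, so the other member names and visibilities are assumptions about what the project's HUtils class might hold:

// Sketch of the monitoring bookkeeping, not the project's actual HUtils.
public class HUtilsSketch {
    /** number of MR jobs the current job group is expected to run */
    public static int JOBNUM = 1;
    /** jobs that started after this timestamp are treated as part of the current group */
    private static long jobStartTime = 0L;

    public static void setJobStartTime(long time) {
        jobStartTime = time;
    }

    public static long getJobStartTime() {
        return jobStartTime;
    }
}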
<script type="text/javascript">
// refresh automatically every 3 seconds
var monitor_cf_interval = setInterval("monitor_one_refresh()", 3000);
</script>

function monitor_one_refresh(){
    $.ajax({ // AJAX poll
        url : 'cloud/cloud_monitorone.action',
        dataType : "json",
        success : function(data) {
            if (data.finished == 'error') {// failed to fetch the job info (returned values are set to 0); otherwise data comes back normally
                clearInterval(monitor_cf_interval);
                setJobInfoValues(data);
                console.info("monitor,finished:" + data.finished);
                $.messager.show({
                    title : 'Notice',
                    msg : 'Job failed!'
                });
            } else if(data.finished == 'true'){
                // all jobs finished successfully, so stop the timer
                console.info('monitor,data.finished=' + data.finished);
                setJobInfoValues(data);
                clearInterval(monitor_cf_interval);
                $.messager.show({
                    title : 'Notice',
                    msg : 'All jobs finished successfully!'
                });
            }else{
                // still running: update the page, showing one row per job
                setJobInfoValues(data);
            }
        }
    });
}

/**
 * Monitor a single job group
 * @throws IOException
 */
public void monitorone() throws IOException{
    Map<String, Object> jsonMap = new HashMap<String, Object>();
    List<CurrentJobInfo> currJobList = null;
    try{
        currJobList = HUtils.getJobs();
        // jsonMap.put("rows", currJobList);// attach the data
        jsonMap.put("jobnums", HUtils.JOBNUM);
        // The group is finished only when the number of fetched jobs equals JOBNUM and the last job has completed:
        // true  - all jobs finished
        // false - jobs still running
        // error - a job failed, stop monitoring
        if(currJobList.size() >= HUtils.JOBNUM){// only when the list has JOBNUM entries can the group be finished
            if("success".equals(HUtils.hasFinished(currJobList.get(currJobList.size()-1)))){
                jsonMap.put("finished", "true");
                // finished, reset the reference time
                HUtils.setJobStartTime(System.currentTimeMillis());
            }else if("running".equals(HUtils.hasFinished(currJobList.get(currJobList.size()-1)))){
                jsonMap.put("finished", "false");
            }else{// fail or kill maps to error
                jsonMap.put("finished", "error");
                HUtils.setJobStartTime(System.currentTimeMillis());
            }
        }else if(currJobList.size() > 0){
            if("fail".equals(HUtils.hasFinished(currJobList.get(currJobList.size()-1)))||
                    "kill".equals(HUtils.hasFinished(currJobList.get(currJobList.size()-1)))){
                jsonMap.put("finished", "error");
                HUtils.setJobStartTime(System.currentTimeMillis());
            }else{
                jsonMap.put("finished", "false");
            }
        }
        if(currJobList.size() == 0){
            jsonMap.put("finished", "false");
            // return ;
        }else{
            if(jsonMap.get("finished").equals("error")){
                CurrentJobInfo cj = currJobList.get(currJobList.size()-1);
                cj.setRunState("Error!");
                jsonMap.put("rows", cj);
            }else{
                jsonMap.put("rows", currJobList.get(currJobList.size()-1));
            }
        }
        jsonMap.put("currjob", currJobList.size());
    }catch(Exception e){
        e.printStackTrace();
        jsonMap.put("finished", "error");
        HUtils.setJobStartTime(System.currentTimeMillis());
    }
    System.out.println(new java.util.Date() + ":" + JSON.toJSONString(jsonMap));
    Utils.write2PrintWriter(JSON.toJSONString(jsonMap));// the result is transferred as JSON
    return ;
}

/**
 * Jobs are selected by start time and their statuses are used for monitoring. A job's start time and the
 * value of System.currentTimeMillis() are on the same clock, so time-zone differences are not an issue.
*
* @return
* @throws IOException
*/
public static List<CurrentJobInfo> getJobs() throws IOException {
JobStatus[] jss = getJobClient().getAllJobs();
List<CurrentJobInfo> jsList = new ArrayList<CurrentJobInfo>();
jsList.clear();
for (JobStatus js : jss) {
if (js.getStartTime() > jobStartTime) {
jsList.add(new CurrentJobInfo(getJobClient().getJob(
js.getJobID()), js.getStartTime(), js.getRunState()));
}
}
Collections.sort(jsList);
return jsList;
}

When a group contains several jobs this monitor still works; only HUtils.JOBNUM needs to be set to the right count.

Downloading the result files from HDFS back to the local file system uses:

fs.copyToLocalFile(false, file.getPath(), new Path(dst, "hdfs_" + (i++) + HUtils.DOWNLOAD_EXTENSION), true);

4. Importing the data into the database
/**
 * Batch-insert the data under xmlPath
 * @param xmlPath
 * @return
 */
public Map<String, Object> insertUserData(String xmlPath){
    Map<String, Object> map = new HashMap<String, Object>();
    try{
        baseDao.executeHql("delete UserData");
        // if(!Utils.changeDat2Xml(xmlPath)){
        //     map.put("flag", "false");
        //     map.put("msg", "failed to convert the HDFS file to xml");
        //     return map;
        // }
        // List<String[]> strings = Utils.parseXmlFolder2StrArr(xmlPath);
        // --- no xml parsing; a custom parser handles the files directly
        List<String[]> strings = Utils.parseDatFolder2StrArr(xmlPath);
        List<Object> uds = new ArrayList<Object>();
        for(String[] s : strings){
            uds.add(new UserData(s));
        }
        int ret = baseDao.saveBatch(uds);
        log.info("Batch-inserted {} records into the user table!", ret);
    }catch(Exception e){
        e.printStackTrace();
        map.put("flag", "false");
        map.put("msg", e.getMessage());
        return map;
    }
    map.put("flag", "true");
    return map;
}

public Integer saveBatch(List<Object> lists) {
    Session session = this.getCurrentSession();
    // org.hibernate.Transaction tx = session.beginTransaction();
    int i = 0;
    try{
        for (Object l : lists) {
            i++;
            session.save(l);
            if (i % 50 == 0) { // same as the JDBC batch size
                // flush a batch of inserts and release memory
                session.flush();
                session.clear();
                if (i % 1000 == 0) {
                    System.out.println(new java.util.Date() + ": pre-inserted " + i + " records...");
                }
            }
        }
    }catch(Exception e){
        e.printStackTrace();
    }
    // tx.commit();
    // session.close();
    Utils.simpleLog("Number of inserted records: " + i);
    return i;
}

private static boolean db2hdfs(List<Object> list, Path path) throws IOException {
    boolean flag = false;
    int recordNum = 0;
    SequenceFile.Writer writer = null;
    Configuration conf = getConf();
    try {
        Option optPath = SequenceFile.Writer.file(path);
        Option optKey = SequenceFile.Writer.keyClass(IntWritable.class);
        Option optVal = SequenceFile.Writer.valueClass(DoubleArrIntWritable.class);
        writer = SequenceFile.createWriter(conf, optPath, optKey, optVal);
        DoubleArrIntWritable dVal = new DoubleArrIntWritable();
        IntWritable dKey = new IntWritable();
        for (Object user : list) {
            if(!checkUser(user)){
                continue; // skip records that do not pass the validity check
            }
            dVal.setValue(getDoubleArr(user), -1);
            dKey.set(getIntVal(user));
            // key: user id, value: <type, the user's valid vector>; the classification step later
            // needs a unified format, so the order is arranged the other way round here
            writer.append(dKey, dVal);
            recordNum++;
        }
    } catch (IOException e) {
        Utils.simpleLog("db2HDFS failed, hdfs file:" + path.toString());
        e.printStackTrace();
        flag = false;
        throw e;
    } finally {
        IOUtils.closeStream(writer);
    }
    flag = true;
    Utils.simpleLog("db2HDFS finished, hdfs file:" + path.toString() + ", records:" + recordNum);
    return flag;
}

public void map(IntWritable key, DoubleArrIntWritable value, Context cxt) throws InterruptedException, IOException {
    cxt.getCounter(FilterCounter.MAP_COUNTER).increment(1L);
    if(cxt.getCounter(FilterCounter.MAP_COUNTER).getValue() % 3000 == 0){
        log.info("Map processed {} records...", cxt.getCounter(FilterCounter.MAP_COUNTER).getValue());
        log.info("Map emitted {} records...", cxt.getCounter(FilterCounter.MAP_OUT_COUNTER).getValue());
    }
    Configuration conf = cxt.getConfiguration();
    SequenceFile.Reader reader = null;
    FileStatus[] fss = input.getFileSystem(conf).listStatus(input);
    for(FileStatus f : fss){
        if(!f.toString().contains("part")){
            continue; // skip everything that is not a part-* file
        }
        try {
            reader = new SequenceFile.Reader(conf, Reader.file(f.getPath()),
                    Reader.bufferSize(4096), Reader.start(0));
            IntWritable dKey = (IntWritable) ReflectionUtils.newInstance(
                    reader.getKeyClass(), conf);
            DoubleArrIntWritable dVal = (DoubleArrIntWritable) ReflectionUtils.newInstance(
                    reader.getValueClass(), conf);
            while (reader.next(dKey, dVal)) {// read the file record by record
                // only emit pairs where the current id is smaller than dKey, so each distance is computed once
                if(key.get() < dKey.get()){
                    cxt.getCounter(FilterCounter.MAP_OUT_COUNTER).increment(1L);
                    double dis = HUtils.getDistance(value.getDoubleArr(), dVal.getDoubleArr());
                    newKey.set(dis);
                    newValue.setValue(key.get(), dKey.get());
                    cxt.write(newKey, newValue);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}

The Reducer's reduce function simply writes the pairs straight through:

public void reduce(DoubleWritable key, Iterable<IntPairWritable> values, Context cxt) throws InterruptedException, IOException {
for(IntPairWritable v:values){
cxt.getCounter(FilterCounter.REDUCE_COUNTER).increment(1);
cxt.write(key, v);
}
}

/**
 * Return the distance threshold dc for a given percentile
 *
 * @param percent
 *            usually 1% to 2%
* @return
*/
public static double findInitDC(double percent, String path, long iNPUT_RECORDS2) {
    Path input = null;
    if (path == null) {
        input = new Path(HUtils.getHDFSPath(HUtils.FILTER_CALDISTANCE + "/part-r-00000"));
    } else {
        input = new Path(HUtils.getHDFSPath(path + "/part-r-00000"));
    }
    Configuration conf = HUtils.getConf();
    SequenceFile.Reader reader = null;
    long counter = 0;
    long percent_ = (long) (percent * iNPUT_RECORDS2);
    try {
        reader = new SequenceFile.Reader(conf, Reader.file(input),
                Reader.bufferSize(4096), Reader.start(0));
        DoubleWritable dkey = (DoubleWritable) ReflectionUtils.newInstance(
                reader.getKeyClass(), conf);
        Writable dvalue = (Writable) ReflectionUtils.newInstance(
                reader.getValueClass(), conf);
        while (reader.next(dkey, dvalue)) {// read the sorted distances record by record
            counter++;
            if(counter % 1000 == 0){
                Utils.simpleLog("read " + counter + " records...");
            }
            if (counter >= percent_) {
                HUtils.DELTA_DC = dkey.get();// take this distance as the dc threshold
                break;
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        IOUtils.closeStream(reader);
    }
    return HUtils.DELTA_DC;
}
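As a worked example of the percentile: if the distance job produced 499,500 pairwise-distance records (1,000 users give 1000 x 999 / 2 pairs) and percent = 0.02, then percent_ = 0.02 x 499,500 = 9,990, so dc is taken as the 9,990-th smallest distance in the sorted part-r-00000 file. These figures are illustrative, not taken from the project's data set.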
/**
 * Find the local density of every point vector
 *
 * Input: <key,value> --> <distance, <id_i, id_j>>
 *        <distance, <index of vector i, index of vector j>>
 *
 * Mapper:
 *   emits (index of vector i, 1)
 *         (index of vector j, 1)
 * Reducer:
 *   emits (index of vector i, local density)
 * Some vectors have no local density; this happens when all of a vector's distances to the
 * other points are greater than the given threshold dc.
 * @author fansy
 * @date 2015-7-3
 */

The Mapper logic is as follows:
/**
 * Input: <distance d_ij, <index of vector i, index of vector j>>
 * If d_ij is smaller than the threshold dc, emit
 *   (index of vector i, 1)
 *   (index of vector j, 1)
 * @author fansy
 * @date 2015-7-3
 */

The map function:
public void map(DoubleWritable key, IntPairWritable value, Context cxt) throws InterruptedException, IOException {
    double distance = key.get();
    if(method.equals("gaussian")){
        one.set(Math.pow(Math.E, -(distance / dc) * (distance / dc)));
    }
    if(distance < dc){
        vectorId.set(value.getFirst());
        cxt.write(vectorId, one);
        vectorId.set(value.getSecond());
        cxt.write(vectorId, one);
    }
}

The density can be computed in two ways, chosen by a parameter passed in from the front end; the default is cut-off, where every point within dc adds exactly 1 to the local density.
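Written out, with $\chi(\cdot)$ the indicator function, the two kernels as this map function applies them are (note that in this code the Gaussian weight is likewise only accumulated for pairs with $d_{ij} < d_c$):

$$\rho_i^{\text{cut-off}} = \sum_{j \neq i} \chi(d_{ij} < d_c), \qquad \rho_i^{\text{gaussian}} = \sum_{j \neq i,\; d_{ij} < d_c} e^{-(d_{ij}/d_c)^2}$$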
public void reduce(IntWritable key, Iterable<DoubleWritable> values, Context cxt)
        throws IOException, InterruptedException {
    double sum = 0;
    for(DoubleWritable v : values){
        sum += v.get();
    }
    sumAll.set(sum);//
    cxt.write(key, sumAll);
    Utils.simpleLog("vectorI:" + key.get() + ",density:" + sumAll);
}

2) Minimum-distance MR

/**
 * Find the delta distance of every point, i.e. the minimum distance to any other vector with a higher density.
 * Mapper input:
 *   <distance d_ij, <index of vector i, index of vector j>>
 *   The output of LocalDensityJob (i, density_i) is loaded into a map, which the mapper uses to compare
 *   the two local densities and decide whether to emit.
 * Mapper output:
 *   i, <density_i, min_distance_j>                                  (IntWritable, DoublePairWritable)
 * Reducer output:
 *   <density_i * min_distance_j>, <density_i, min_distance_j, i>    (DoubleWritable, IntDoublePairWritable)
 * @author fansy
 * @date 2015-7-3
 */

Here the reducer outputs, for every point (i.e. every user), the product of its local density and its minimum distance. One way to choose the number of cluster centers is to sort this product in descending order, plot the points as a line chart, find the point where the slope changes most sharply, and take the number of points before it as the number of cluster centers.
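In symbols, the sorted quantity is $\gamma_i = \rho_i \times \delta_i$, where $\rho_i$ is the local density of point $i$ and $\delta_i$ is its minimum distance to any point of higher density; the candidate cluster centers are the points with the largest $\gamma_i$.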
/**
 *
 */
package com.fz.fastcluster.keytype;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * A custom DoubleWritable whose comparison is reversed so that values sort from largest to smallest.
 * @author fansy
 * @date 2015-7-3
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class CustomDoubleWritable implements WritableComparable<CustomDoubleWritable> {
private double value = 0.0;
public CustomDoubleWritable() {
}
public CustomDoubleWritable(double value) {
set(value);
}
@Override
public void readFields(DataInput in) throws IOException {
value = in.readDouble();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeDouble(value);
}
public void set(double value) { this.value = value; }
public double get() { return value; }
/**
 * Returns true iff <code>o</code> is a CustomDoubleWritable with the same value.
*/
@Override
public boolean equals(Object o) {
if (!(o instanceof CustomDoubleWritable)) {
return false;
}
CustomDoubleWritable other = (CustomDoubleWritable)o;
return this.value == other.value;
}
@Override
public int hashCode() {
return (int)Double.doubleToLongBits(value);
}
@Override
public int compareTo(CustomDoubleWritable o) {// this is the only change needed: the comparison is reversed
return (value < o.value ? 1 : (value == o.value ? 0 : -1));
}
@Override
public String toString() {
return Double.toString(value);
}
/** A raw-bytes Comparator for CustomDoubleWritable, also in descending order. */
public static class Comparator extends WritableComparator {
public Comparator() {
super(CustomDoubleWritable.class);
}
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
double thisValue = readDouble(b1, s1);
double thatValue = readDouble(b2, s2);
return (thisValue < thatValue ? 1 : (thisValue == thatValue ? 0 : -1));
}
}
static { // register this comparator
WritableComparator.define(CustomDoubleWritable.class, new Comparator());
}
}
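Because the comparator is registered statically, any job that uses this class as its map output key sorts in descending order automatically. A minimal sketch of how the sort job might wire it in; the class and job names here are assumptions, while HUtils.getConf() is the project's own helper:

import org.apache.hadoop.mapreduce.Job;
import com.fz.fastcluster.keytype.CustomDoubleWritable;

// Hypothetical driver fragment, not the project's actual sort job.
public class SortJobSketch {
    public static Job buildJob() throws Exception {
        Job job = Job.getInstance(HUtils.getConf(), "sort gamma descending");
        // the registered Comparator makes keys of this type sort from largest to smallest
        job.setMapOutputKeyClass(CustomDoubleWritable.class);
        job.setOutputKeyClass(CustomDoubleWritable.class);
        return job;
    }
}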
/**
 * Find the cluster-center vectors for the given thresholds and write them to HDFS.
 * This is not an MR task, so no monitoring is needed; note the return value.
 */
public void center2hdfs(){
    // localfile:method
    // 1. read the output of SortJob and, among the first k records, collect the ids whose
    //    local density and minimum distance both exceed the thresholds;
    // 2. look up the record for each id;
    // 3. convert each record to a double[];
    // 4. write the vectors to HDFS;
    // 5. write the vectors to a local file for later inspection.
    Map<String, Object> retMap = new HashMap<String, Object>();
    Map<Object, Object> firstK = null;
    List<Integer> ids = null;
    List<UserData> users = null;
    try{
        firstK = HUtils.readSeq(input == null ? HUtils.SORTOUTPUT + "/part-r-00000" : input,
                100);// read the first 100 records by default
        ids = HUtils.getCentIds(firstK, numReducerDensity, numReducerDistance);
        // 2
        users = dBService.getTableData("UserData", ids);
        Utils.simpleLog("There are " + users.size() + " cluster-center vectors!");
        // 3, 4, 5
        HUtils.writecenter2hdfs(users, method, output);
    }catch(Exception e){
        e.printStackTrace();
        retMap.put("flag", "false");
        retMap.put("msg", e.getMessage());
        Utils.write2PrintWriter(JSON.toJSONString(retMap));
        return ;
    }
    retMap.put("flag", "true");
    Utils.write2PrintWriter(JSON.toJSONString(retMap));
    return ;
}

The cluster-center vectors are written both to HDFS and to a local file. The Mapper that then assigns each remaining point to a cluster is:

public void map(IntWritable key, DoubleArrIntWritable value, Context cxt){
double[] inputI= value.getDoubleArr();
// hdfs
Configuration conf = cxt.getConfiguration();
FileSystem fs = null;
Path path = null;
SequenceFile.Reader reader = null;
try {
fs = FileSystem.get(conf);
// read all before center files
String parentFolder =null;
double smallDistance = Double.MAX_VALUE;
int smallDistanceType=-1;
double distance;
// if iter_i !=0,then start i with 1,else start with 0
for(int i=start;i<iter_i;i++){// all files are clustered points
parentFolder=HUtils.CENTERPATH+"/iter_"+i+"/clustered";
RemoteIterator<LocatedFileStatus> files=fs.listFiles(new Path(parentFolder), false);
while(files.hasNext()){
path = files.next().getPath();
if(!path.toString().contains("part")){
continue; // skip everything that is not a part-* file
}
reader = new SequenceFile.Reader(conf, Reader.file(path),
Reader.bufferSize(4096), Reader.start(0));
IntWritable dkey = (IntWritable) ReflectionUtils.newInstance(
reader.getKeyClass(), conf);
DoubleArrIntWritable dvalue = (DoubleArrIntWritable) ReflectionUtils.newInstance(
reader.getValueClass(), conf);
while (reader.next(dkey, dvalue)) {// read the file record by record
distance = HUtils.getDistance(inputI, dvalue.getDoubleArr());
if(distance>dc){// ignore points farther away than dc
continue;
}
// find the nearest clustered point with distance <= dc and take over its cluster type
if(distance<smallDistance){
smallDistance=distance;
smallDistanceType=dvalue.getIdentifier();
}
}// while
}// while
}// for
vectorI.set(key.get());// user id
typeDoubleArr.setValue(inputI,smallDistanceType);
if(smallDistanceType!=-1){
log.info("clustered-->vectorI:{},typeDoubleArr:{}",new Object[]{vectorI,typeDoubleArr.toString()});
cxt.getCounter(ClusterCounter.CLUSTERED).increment(1);
out.write("clustered", vectorI, typeDoubleArr,"clustered/part");
}else{
log.info("unclustered---->vectorI:{},typeDoubleArr:{}",new Object[]{vectorI,typeDoubleArr.toString()});
cxt.getCounter(ClusterCounter.UNCLUSTERED).increment(1);
out.write("unclustered", vectorI, typeDoubleArr,"unclustered/part");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
IOUtils.closeStream(reader);
}
}

public void run() {
    input = input == null ? HUtils.FILTER_PREPAREVECTORS : input;
    // delete all files under iter_i (i>0)
    try {
        HUtils.clearCenter((output == null ? HUtils.CENTERPATH : output));
    } catch (FileNotFoundException e2) {
        e2.printStackTrace();
    } catch (IOException e2) {
        e2.printStackTrace();
    }
    output = output == null ? HUtils.CENTERPATHPREFIX : output + "/iter_";
    // extra step: copy the data under /user/root/preparevectors to /user/root/_center/iter_0/unclustered
    HUtils.copy(input, output + "0/unclustered");
    try {
        Thread.sleep(200);// pause for 200 ms
    } catch (InterruptedException e1) {
        e1.printStackTrace();
    }
    // derive the dc threshold (the delta value); it does not have to be passed in here
    // how best to pick the threshold is still open for discussion; for now the passed-in value is used
    // double dc = dcs[0];
    // read the cluster-center file
    Map<Object, Object> vectorsMap = HUtils.readSeq(output + "0/clustered/part-m-00000", Integer.parseInt(k));
    double[][] vectors = HUtils.getCenterVector(vectorsMap);
    double[] distances = Utils.getDistances(vectors);
    // the passed-in threshold is not used here
    int iter_i = 0;
    int ret = 0;
    double tmpDelta = 0;
    int kInt = Integer.parseInt(k);
    try {
        do{
            if(iter_i >= distances.length){
                // delta = String.valueOf(distances[distances.length-1]/2);
                // a better strategy is still undecided; for now use the following:
                tmpDelta = Double.parseDouble(delta);
                while(kInt-- > 0){// stop growing after k times
                    tmpDelta *= 2;// double it each time
                }
                delta = String.valueOf(tmpDelta);
            }else{
                delta = String.valueOf(distances[iter_i] / 2);
            }
            log.info("this is the {} iteration,with dc:{}", new Object[]{iter_i, delta});
            String[] ar = {
                HUtils.getHDFSPath(output) + iter_i + "/unclustered",
                HUtils.getHDFSPath(output) + (iter_i + 1),//output
                //HUtils.getHDFSPath(HUtils.CENTERPATHPREFIX)+iter_i+"/clustered/part-m-00000",//center file
                k,
                delta,
                String.valueOf((iter_i + 1))
            };
            try{
                ret = ToolRunner.run(HUtils.getConf(), new ClusterDataJob(), ar);
                if(ret != 0){
                    log.info("ClusterDataJob failed, with iteration {}", new Object[]{iter_i});
                    break;
                }
            }catch(Exception e){
                e.printStackTrace();
            }
            iter_i++;
            HUtils.JOBNUM++;// one more job per loop iteration
        }while(shouldRunNextIter());
    } catch (IllegalArgumentException e) {
        e.printStackTrace();
    }
    if(ret == 0){
        log.info("All cluster Job finished with iteration {}", new Object[]{iter_i});
    }
}

public void runCluster2(){
    Map<String, Object> map = new HashMap<String, Object>();
    try {
        // the basic flow of submitting a group of Hadoop MR jobs:
        // 1. set the start-time threshold and the number of jobs in this group
        //    (the current time minus 10 s, in case the server and the cluster clocks differ)
        HUtils.setJobStartTime(System.currentTimeMillis() - 10000);//
        // since the number of iterations is unknown, JOBNUM starts at 2 and is incremented on every loop;
        // when all iterations are done, subtracting 2 from it stops the monitoring loop
        HUtils.JOBNUM = 2;
        // 2. start the group of MR jobs in a Thread
        new Thread(new RunCluster2(input, output, delta, record)).start();
        // 3. once started, return straight to the monitor, which polls the backend periodically and renders the data
        map.put("flag", "true");
        map.put("monitor", "true");
    } catch (Exception e) {
        e.printStackTrace();
        map.put("flag", "false");
        map.put("monitor", "false");
        map.put("msg", e.getMessage());
    }
    Utils.write2PrintWriter(JSON.toJSONString(map));
}

When the MR loop ends, resetting JOBNUM is what stops the monitoring loop:

/**
 * Should the next iteration run?
 * Decided directly from the clustered and unclustered record counts.
 * @throws IOException
 * @throws IllegalArgumentException
 */
private boolean shouldRunNextIter() {
    if(HUtils.UNCLUSTERED == 0 || HUtils.CLUSTERED == 0){
        HUtils.JOBNUM -= 2;// no more monitoring needed, so subtract 2
        return false;
    }
    return true;
}

On the classification results page, the classified data is parsed into a list as follows:

/**
 * Parse the classified data into a list
 * @param path
 * @return
 */
private static Collection<? extends UserGroup> resolve(Path path) {
    // TODO Auto-generated method stub
    List<UserGroup> list = new ArrayList<UserGroup>();
    Configuration conf = HUtils.getConf();
    SequenceFile.Reader reader = null;
    int i = 0;
    try {
        reader = new SequenceFile.Reader(conf, Reader.file(path),
                Reader.bufferSize(4096), Reader.start(0));
        IntWritable dkey = (IntWritable) ReflectionUtils
                .newInstance(reader.getKeyClass(), conf);
        DoubleArrIntWritable dvalue = (DoubleArrIntWritable) ReflectionUtils
                .newInstance(reader.getValueClass(), conf);
        while (reader.next(dkey, dvalue)) {// read the file record by record
            // clone each record into the list
            list.add(new UserGroup(i++, dkey.get(), dvalue.getIdentifier()));
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        IOUtils.closeStream(reader);
    }
    Utils.simpleLog("Read " + list.size() + " records from file: " + path.toString());
    return list;
}

Copyright notice: this is the author's original article and may not be reproduced without the author's permission.
Original post: http://blog.csdn.net/fansy1990/article/details/46943657