标签:mapreduce cdh5 jdk1.8 string bin ati oat etc combine
Hadoop2.6.0(IDEA中源码编译使用CDH5.7.3,对应Hadoop2.6.0),集群使用原生Hadoop2.6.4,JDK1.8,Intellij IDEA 14 。源码可以在https://github.com/fansy1990/linear_regression 下载。
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if(randN <= 0) { // 如果randN 比0小,那么不再次打乱数据 context.write(randFloatKey,value); return ; } if(++countI >= randN){// 如果randN等于1,那么每次随机的值都是不一样的 randFloatKey.set(random.nextFloat()); countI =0; } context.write(randFloatKey,value); }
theta0 = theta0 - alpha*(h(x)-y) ,theta1 = theta1 - alpha*(h(x)-y)*x ,其中 h(x) = theta0 + theta1 * x(注意:对截距项 theta0 求偏导不含 x 因子);同时,需要注意这里的更新是同步更新(theta0 和 theta1 均使用同一组旧参数值计算梯度),其核心代码如下所示:
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { float[] xy = Utils.str2float(value.toString().split(splitter)); float x = xy[0]; float y = xy[1]; // 同步更新 theta0 and theta1 lastTheta0 = theta0; theta0 -= alpha *(theta0+theta1* x - y) * x; // 保持theta0 和theta1 不变 theta1 -= alpha *(lastTheta0 + theta1 * x -y) * x;// 保持theta0 和theta1 不变 }
protected void cleanup(Context context) throws IOException, InterruptedException {
    // Emit this mapper's final model once, serialized as "theta0<splitter>theta1".
    String model = theta0 + splitter + theta1;
    theta0_1.set(model);
    context.write(theta0_1, NullWritable.get());
}
// Cap the input split size so the file is divided across multiple mappers.
conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 700L);
// Map-only job: in the collapsed original this statement was swallowed by the
// line comment above it, so it never ran — restored as its own statement.
job.setNumReduceTasks(0);
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { float[] xy = Utils.str2float(value.toString().split(splitter)); for(int i =0;i<thetas.size() ;i++){ // error = (theta0 + theta1 * x - y) ^2 thetaErrors[i] += (thetas.get(i)[0]+ thetas.get(i)[1] * xy[0] -xy[1]) * (thetas.get(i)[0]+ thetas.get(i)[1] * xy[0] -xy[1]) ; thetaNumbers[i]+= 1; } }
protected void cleanup(Context context) throws IOException, InterruptedException {
    // Flush one (theta, (squaredErrorSum, recordCount)) pair per candidate model.
    for (int i = 0; i < thetas.size(); i++) {
        theta.set(thetas.get(i));
        floatAndLong.set(thetaErrors[i], thetaNumbers[i]);
        context.write(theta, floatAndLong);
    }
}
protected void reduce(FloatAndFloat key, Iterable<FloatAndLong> values, Context context) throws IOException, InterruptedException {
    // Aggregate the per-mapper squared-error sums and counts for one theta pair,
    // then record its overall RMSE.
    float sumF = 0.0f;
    long sumL = 0L;
    for (FloatAndLong value : values) {
        sumF += value.getSumFloat();
        sumL += value.getSumLong();
    }
    // Compute the RMSE once so the stored and logged values are identical
    // (the original recomputed sqrt with float division for the log message,
    // which could print a slightly different number than it stored).
    float error = (float) Math.sqrt((double) sumF / sumL);
    theta_error.add(new float[]{key.getTheta0(), key.getTheta1(), error});
    logger.info("theta:{}, error:{}", new Object[]{key.toString(), error});
}
protected void cleanup(Context context) throws IOException, InterruptedException { // 如何加权? // 方式1:如果误差越小,那么说明权重应该越大; // 方式2:直接平均值 float [] theta_all = new float[2]; if("average".equals(method)){ // theta_all = theta_error.get(0); for(int i=0;i< theta_error.size();i++){ theta_all[0] += theta_error.get(i)[0]; theta_all[1] += theta_error.get(i)[1]; } theta_all[0] /= theta_error.size(); theta_all[1] /= theta_error.size(); } else { float sumErrors = 0.0f; for(float[] d:theta_error){ sumErrors += 1/d[2]; } for(float[] d: theta_error){ theta_all[0] += d[0] * 1/d[2] /sumErrors; theta_all[1] += d[1] * 1/d[2] /sumErrors; } } context.write(new FloatAndFloat(theta_all),NullWritable.get()); }
public static void main(String[] args) throws Exception {
    // <input> <output> <randN>
    // Fall back to the demo arguments only when none are supplied on the
    // command line; the original unconditionally clobbered real arguments.
    if (args == null || args.length == 0) {
        args = new String[]{
                "hdfs://master:8020/user/fanzhe/linear_regression.txt",
                "hdfs://master:8020/user/fanzhe/shuffle_out",
                "1"
        };
    }
    ToolRunner.run(Utils.getConf(), new ShuffleDataJob(), args);
}
6.1101,17.592 5.5277,9.1302 8.5186,13.662 。。。Shuffle输出:
public static void main(String[] args) throws Exception { // <input> <output> <theta0;theta1;alpha> <splitter> // 注意第三个参数使用分号分割 args = new String[]{ "hdfs://master:8020/user/fanzhe/shuffle_out", "hdfs://master:8020/user/fanzhe/linear_regression", "1;0;0.01", "," } ; ToolRunner.run(Utils.getConf(),new LinearRegressionJob(),args); }查看输出结果:
public static void main(String[] args) throws Exception { // <input> <output> <theta_path> <splitter> <average|weight> args = new String[]{ "hdfs://master:8020/user/fanzhe/shuffle_out", "hdfs://master:8020/user/fanzhe/single_linear_regression_error", "hdfs://master:8020/user/fanzhe/linear_regression", ",", "weight" } ; ToolRunner.run(Utils.getConf(),new SingleLinearRegressionError(),args); }这里设置的合并theta值的方式使用加权,读者可以设置为average,从而使用平均值;
public static void main(String[] args) throws Exception { // <input> <output> <theta_path> <splitter> args = new String[]{ "hdfs://master:8020/user/fanzhe/shuffle_out", "hdfs://master:8020/user/fanzhe/last_linear_regression_error", "hdfs://master:8020/user/fanzhe/single_linear_regression_error", ",", } ; ToolRunner.run(Utils.getConf(),new LastLinearRegressionError(),args); }输出结果为:
分享,成长,快乐
转载请注明blog地址:http://blog.csdn.net/fansy1990
标签:mapreduce cdh5 jdk1.8 string bin ati oat etc combine
原文地址:http://blog.csdn.net/fansy1990/article/details/52962863