码迷,mamicode.com
首页 > 编程语言 > 详细

2018-07-30期 MapReduce对象排序(单列排序)

时间:2018-07-30 11:33:51      阅读:187      评论:0      收藏:0      [点我收藏+]

标签:stat   float   ons   nmap   员工   instance   row   import   writable   

1、EmpSalaryBean1 对象类

package cn.sjq.mr.sort;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**

* 定义一个员工薪水的JavaBean,并实现MapReduce的WritableComparable<EmpSalaryBean>序列化及比较器接口

* @author songjq

*

*/

public class EmpSalaryBean1 implements WritableComparable<EmpSalaryBean1> {

/*

  定义成员属性

  c_oid

  c_employee_name

  c_second_company_name

  c_third_company_name

  c_fourth_company_name

  c_company_name

  c_dept_name

  c_sub_total

  c_com_fund_payamt

*/

private int seq;

private String c_oid;

private String c_employee_name;

private String c_second_company_name;

private String c_third_company_name;

private String c_fourth_company_name;

private String c_company_name;

private String c_dept_name;

private float c_sub_total;

private float c_com_fund_payamt;

public int getSeq() {

return seq;

}

public void setSeq(int seq) {

this.seq = seq;

}

public String getC_oid() {

return c_oid;

}

public void setC_oid(String c_oid) {

this.c_oid = c_oid;

}

public String getC_employee_name() {

return c_employee_name;

}

public void setC_employee_name(String c_employee_name) {

this.c_employee_name = c_employee_name;

}

public String getC_second_company_name() {

return c_second_company_name;

}

public void setC_second_company_name(String c_second_company_name) {

this.c_second_company_name = c_second_company_name;

}

public String getC_third_company_name() {

return c_third_company_name;

}

public void setC_third_company_name(String c_third_company_name) {

this.c_third_company_name = c_third_company_name;

}

public String getC_fourth_company_name() {

return c_fourth_company_name;

}

public void setC_fourth_company_name(String c_fourth_company_name) {

this.c_fourth_company_name = c_fourth_company_name;

}

public String getC_company_name() {

return c_company_name;

}

public void setC_company_name(String c_company_name) {

this.c_company_name = c_company_name;

}

public String getC_dept_name() {

return c_dept_name;

}

public void setC_dept_name(String c_dept_name) {

this.c_dept_name = c_dept_name;

}

public float getC_sub_total() {

return c_sub_total;

}

public void setC_sub_total(float c_sub_total) {

this.c_sub_total = c_sub_total;

}

public float getC_com_fund_payamt() {

return c_com_fund_payamt;

}

public void setC_com_fund_payamt(float c_com_fund_payamt) {

this.c_com_fund_payamt = c_com_fund_payamt;

}

//方序列化方法

@Override

public void readFields(DataInput in) throws IOException {

this.seq = in.readInt();

this.c_oid = in.readUTF();

this.c_employee_name = in.readUTF();

this.c_second_company_name = in.readUTF();

this.c_third_company_name = in.readUTF();

this.c_fourth_company_name = in.readUTF();

this.c_company_name = in.readUTF();

this.c_dept_name = in.readUTF();

this.c_sub_total = in.readFloat();

this.c_com_fund_payamt = in.readFloat();

}

//序列化方法

@Override

public void write(DataOutput out) throws IOException {

out.writeInt(this.seq);

out.writeUTF(this.c_oid);

out.writeUTF(this.c_employee_name);

out.writeUTF(this.c_second_company_name);

out.writeUTF(this.c_third_company_name);

out.writeUTF(this.c_fourth_company_name);

out.writeUTF(this.c_company_name);

out.writeUTF(this.c_dept_name);

out.writeFloat(this.c_sub_total);

out.writeFloat(this.c_com_fund_payamt);

}

@Override

public String toString() {

return this.seq+"\t"+this.c_oid+"\t"+

  this.c_employee_name+"\t"+this.c_second_company_name+"\t"+

  this.c_third_company_name+"\t"+this.c_fourth_company_name+"\t"+

  this.c_company_name+"\t"+this.c_dept_name+"\t"+

  this.c_sub_total+"\t"+this.c_com_fund_payamt+"\t";

}

/*

* 实现compareTo方法

* (non-Javadoc)

* @see java.lang.Comparable#compareTo(java.lang.Object)

*/

@Override

public int compareTo(EmpSalaryBean1 o) {

// 单列排序,这里根据部门名称c_dept_name进行升序排序,部门号为字符串

if (this.c_dept_name.compareTo(o.getC_dept_name()) >= 0) {

// 当前c_dept_name大于等于传入的c_dept_name,则返回1

return 1;

} else {

// 当前c_dept_name小于传入的c_dept_name,则返回-1

return -1;

}

}

}

2、ObjectSortSingleClumn实现类

package cn.sjq.mr.sort;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Sorts {@code EmpSalaryBean1} records with MapReduce.
 *
 * <p>The job consists of a mapper only: each input line is parsed into an
 * {@code EmpSalaryBean1}, which is emitted as the map output key so that the framework orders
 * the records by the bean's {@code compareTo}.
 *
 * @author songjq
 */
public class ObjectSortSingleClumn {

    /** Input path used when no command-line argument is supplied. */
    private static final String DEFAULT_INPUT = "D:\\test\\tmp\\sort\\empsalary.csv";

    /** Output path used when fewer than two command-line arguments are supplied. */
    private static final String DEFAULT_OUTPUT = "D:\\test\\tmp\\sort\\out7";

    /**
     * Sorting mapper.
     *
     * <p>Because MapReduce sorts on the map output key, the {@code EmpSalaryBean1} is emitted
     * as the key and {@code NullWritable} as the value. No custom Reducer class is written
     * for this job.
     *
     * @author songjq
     */
    static class ObjectSortSingleClumnMapper
            extends Mapper<LongWritable, Text, EmpSalaryBean1, NullWritable> {

        /** Number of comma-separated fields read from each line (indexes 0 to 9). */
        private static final int EXPECTED_FIELDS = 10;

        /**
         * Parses one comma-separated line into an {@code EmpSalaryBean1} and emits it as the
         * output key.
         *
         * <p>Blank lines are skipped. A line with fewer than {@value #EXPECTED_FIELDS}
         * fields, or with a non-numeric value in field 0, 8 or 9, fails with an
         * {@code IOException} that names the offending offset.
         *
         * @param key1 input key supplied by the framework; used here only in error messages
         * @param value1 one line of input text
         * @param context used to emit the (bean, NullWritable) pair
         * @throws IOException if the line is malformed or emitting the record fails
         * @throws InterruptedException if emitting the record is interrupted
         */
        @Override
        protected void map(LongWritable key1, Text value1, Context context)
                throws IOException, InterruptedException {
            // Fetch one line.
            String line = value1.toString();
            if (line.trim().isEmpty()) {
                // Nothing to parse; previously this failed with ArrayIndexOutOfBoundsException.
                return;
            }

            // Tokenize on commas.
            String[] fds = StringUtils.split(line, ",");
            if (fds.length < EXPECTED_FIELDS) {
                throw new IOException("Malformed record at offset " + key1 + ": expected at least "
                        + EXPECTED_FIELDS + " fields but found " + fds.length + ": " + line);
            }

            // Populate the bean from the tokens.
            EmpSalaryBean1 es = new EmpSalaryBean1();
            try {
                // parseInt/parseFloat replace the deprecated Integer(String)/Float(String)
                // constructors and accept exactly the same input.
                es.setSeq(Integer.parseInt(fds[0]));
                es.setC_sub_total(Float.parseFloat(fds[8]));
                es.setC_com_fund_payamt(Float.parseFloat(fds[9]));
            } catch (NumberFormatException e) {
                throw new IOException(
                        "Non-numeric field in record at offset " + key1 + ": " + line, e);
            }
            es.setC_oid(fds[1]);
            es.setC_employee_name(fds[2]);
            es.setC_second_company_name(fds[3]);
            es.setC_third_company_name(fds[4]);
            es.setC_fourth_company_name(fds[5]);
            es.setC_company_name(fds[6]);
            es.setC_dept_name(fds[7]);

            // Emit the bean as the key.
            context.write(es, NullWritable.get());
        }
    }

    /**
     * Configures and submits the job, then waits for it to finish.
     *
     * @param args optional: {@code args[0]} is the input path and {@code args[1]} the output
     *        path; each falls back to the built-in default when absent
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        String input = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT;
        String output = (args != null && args.length > 1) ? args[1] : DEFAULT_OUTPUT;

        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(ObjectSortSingleClumn.class);

        job.setMapperClass(ObjectSortSingleClumnMapper.class);
        job.setMapOutputKeyClass(EmpSalaryBean1.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(EmpSalaryBean1.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // Surface a failed job through the process exit status instead of ignoring the result.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }

}


2018-07-30期 MapReduce对象排序(单列排序)

标签:stat   float   ons   nmap   员工   instance   row   import   writable   

原文地址:http://blog.51cto.com/2951890/2151972

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!