// 011990-99999 SIHCCAJAVRI 195005151800 -11
// 011990-99999 SIHCCAJAVRI 195005151200 22
// 011990-99999 SIHCCAJAVRI 195005150700 0
// 012650-99999 TRNSET-HANSMOEN 194903241800 78
// 012650-99999 TRNSET-HANSMOEN 194903241200 111
/**
* 自定义分区方法,将气象站id相同的记录分到相同的reducer中
*
*/
publicstaticclass KeyPartitioner extends Partitioner<
TextPair,Text>{
publicint getPartition(TextPair
key,Text value,int numPartitions){
//根据气象站id进行选择分区,而不是组合键的整体
return (key.getFirst().hashCode()&Integer.MAX_VALUE )
% numPartitions;
}
}
/**
* 自定义分组,将气象站id相同的key放到同一个reducer中执行。然后再通过TextPair进行内部比较排序
*/
publicstaticclass GroupingComparator extends WritableComparator{
public GroupingComparator()
{
super (TextPair.class,true);
}
@SuppressWarnings ("rawtypes" )
@Override
publicint compare(WritableComparable
w1, WritableComparable w2) {
TextPair tp1=(TextPair)w1;
TextPair tp2=(TextPair)w2;
Text f1= tp1.getFirst();
Text f2= tp2.getFirst();
return f1.compareTo(f2);
}
}
@Override
publicint run(String[]
args) throws Exception{
Configuration conf = new Configuration();//
读取配置文件
Job job = Job. getInstance();//
新建一个任务
job.setJarByClass(JoinRecordWithStationName. class );//
主类
Path recordInputPath = new Path(args[0]);//天气记录数据源
Path stationInputPath = new Path(args[1]);//气象站数据源
Path outputPath = new Path(args[2]);//输出路径
//如果输出路径存在就删除
FileSystem fs = outputPath.getFileSystem(conf);
if (fs.isDirectory(outputPath)){
fs.delete(outputPath, true );
}
MultipleInputs. addInputPath(job,recordInputPath,TextInputFormat. class ,JoinRecordMapper.class);//读取天气记录Mapper
MultipleInputs. addInputPath(job,stationInputPath,TextInputFormat. class ,JoinStationMapper.class);//读取气象站Mapper
FileOutputFormat. setOutputPath(job,outputPath);
job.setPartitionerClass(KeyPartitioner. class );//自定义分区
job.setGroupingComparatorClass(GroupingComparator. class );//自定义分组
job.setMapOutputKeyClass(TextPair. class );
job.setReducerClass(JoinReducer. class );//
Reducer
job.setOutputKeyClass(Text. class );
return job.waitForCompletion(true)?0:1;
}
/**
 * Command-line entry point; runs the join through {@link ToolRunner} and exits with its status.
 *
 * <p>If no arguments are given, the sample HDFS paths below are used; otherwise the supplied
 * arguments are passed through (after ToolRunner strips generic options) to {@code run}.
 *
 * @param args optional {@code <recordInput> <stationInput> <output>} paths
 * @throws Exception if the job cannot be run
 */
public static void main(String[] args) throws Exception {
    String[] defaultArgs = {
        "hdfs://yun-11:9000/join/records.txt",
        "hdfs://yun-11:9000/join/station.txt",
        "hdfs://yun-11:9000/join/out"
    };
    String[] effectiveArgs = args.length > 0 ? args : defaultArgs;
    int exitCode = ToolRunner.run(new JoinRecordWithStationName(), effectiveArgs);
    System.exit(exitCode);
}
}
自定义组合键:TextPair
一种简单的做法是：对于气象站记录，“标记”的值设为 0；对于天气记录，“标记”的值设为 1。由于 TextPair 先按气象站 id、再按标记排序，同一气象站 id 下的气象站记录会排在所有天气记录之前，因此（若该 id 存在气象站记录）reducer 在每个分组中首先读到的就是气象站记录。
------------------------------------------------------------------------------------------------------------------------------
package com.tan.join1;
import java.io.*;
import org.apache.hadoop.io.*;
publicclassTextPair implements WritableComparable<TextPair>
{
private Text first; //Text
类型的实例变量 first --气象站id
private Text second; //Text
类型的实例变量 second--标记符号
public TextPair()
{
set( new Text(),new Text());
}
public TextPair(String
first, String second) {
set( new Text(first),new Text(second));
}
public TextPair(Text
first, Text second) {
set(first, second);
}
publicvoid set(Text
first, Text second) {
this .first =
first;
this .second =
second;
}
public Text
getFirst() {
returnfirst ;
}
public Text
getSecond() {
returnsecond ;
}
//将对象转换为字节流并写入到输出流out中
@Override
publicvoid write(DataOutput
out)throws IOException {
first.write(out);
second.write(out);
}
//从输入流in中读取字节流反序列化为对象
@Override
publicvoid readFields(DataInput
in)throws IOException {
first.readFields(in);
second.readFields(in);
}
@Override
publicint hashCode()
{
returnfirst .hashCode()
*163+ second.hashCode();
}
@Override
publicboolean equals(Object
o) {
if (o instanceof TextPair)
{
TextPair tp = (TextPair) o;
returnfirst .equals(tp.first)
&& second .equals(tp.second );
}
returnfalse ;
}
@Override
public String
toString() {
returnfirst +"\t" + second ;
}
//排序
@Override
publicint compareTo(TextPair
tp) {
//根据第一个字段进行对比,如果相等再根据第二个字段进行排序
if (!first .equals(tp.first))
{
returnfirst .compareTo(tp.first);
} elseif (!second .equals(tp.second)){
returnsecond .compareTo(tp.second);
}
return 0;
}
}
Map端连接
---------------------------------------------------------------------------------------