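As we all know, a single address can host many companies. This case study takes two types of input file, address and company, and performs a one-to-many join to obtain the association between an address name (for example, Beijing) and company names (for example, Beijing JD and Beijing Red Star).
Hardware environment: 4 CentOS 6.5 servers (one Master node and three Slave nodes)
Software environment: Java 1.7.0_45, hadoop-1.2.1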
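First, the default TextInputFormat class processes the input files, producing the byte offset and content of each line as a <key,value> pair, for example <0,"1:Beijing">. The Map phase then handles each record according to the type of input file it came from. For an address file, the value "1:Beijing" becomes <"1","address:Beijing">; for a company file, the value "Beijing Red Star:1" becomes <"1","company:Beijing Red Star">, as shown in the figure:
The core Map-side code is shown below; for the full source, see CompanyJoinAddress\src\com\zonesion\tablejoin\CompanyJoinAddress.java.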
public static class MapClass extends Mapper<LongWritable, Text, Text, Text>{
    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        Text addressId = new Text();
        Text info = new Text();
        String[] line = value.toString().split(":");// read one line of the file and split it on ":"
        String path = ((FileSplit) context.getInputSplit()).getPath().toString();// path of the file this line came from
        if (line.length < 2) {
            return;// skip malformed lines
        }
        if (path.indexOf("company") >= 0) {// handle a value from a company file: "Beijing Red Star:1"
            addressId.set(line[1]);//"1"
            info.set("company" + ":" + line[0]);//"company:Beijing Red Star"
            context.write(addressId,info);//<key,value> --<"1","company:Beijing Red Star">
        } else if (path.indexOf("address") >= 0) {// handle a value from an address file: "1:Beijing"
            addressId.set(line[0]);//"1"
            info.set("address" + ":" + line[1]);//"address:Beijing"
            context.write(addressId,info);//<key,value> --<"1","address:Beijing">
        }
    }
}
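The tagging step can be tried without a cluster. The following is a minimal sketch in plain Java, assuming the same branch logic as MapClass; the class TaggingSketch, the helper tag, and the file names company.txt and address.txt are hypothetical and are not part of the original source.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

class TaggingSketch {
    // Hypothetical helper mirroring the branches of MapClass.map without Hadoop types.
    // Returns null when the line is malformed or the path matches neither source.
    static Map.Entry<String, String> tag(String path, String line) {
        String[] fields = line.split(":");
        if (fields.length < 2) {
            return null;
        }
        if (path.indexOf("company") >= 0) {
            // "Beijing Red Star:1" -> <"1","company:Beijing Red Star">
            return new SimpleEntry<String, String>(fields[1], "company:" + fields[0]);
        } else if (path.indexOf("address") >= 0) {
            // "1:Beijing" -> <"1","address:Beijing">
            return new SimpleEntry<String, String>(fields[0], "address:" + fields[1]);
        }
        return null;
    }

    public static void main(String[] args) {
        // File names below are assumptions; the tutorial only shows the directories.
        String companyPath = "/user/hadoop/CompanyJoinAddress/input/company/company.txt";
        String addressPath = "/user/hadoop/CompanyJoinAddress/input/address/address.txt";
        System.out.println(tag(companyPath, "Beijing Red Star:1"));
        System.out.println(tag(addressPath, "1:Beijing"));
    }
}
```

Note that the check is a case-sensitive substring match on the whole path. It works here because the lowercase strings "company" and "address" appear only in the input directory and file names, not in the capitalized CompanyJoinAddress segment; a differently named parent directory could send records down the wrong branch.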
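The Reduce phase first iterates over the values of its input <key,values>, for example <"1",["company:Beijing Red Star","company:Beijing JD","address:Beijing"]>, taking one value at a time (for example "company:Beijing Red Star"). Based on the tag in each value (company or address), it stores company names in a company list and address names in an address list. Finally, it computes the Cartesian product of the two lists to obtain the company-to-address relation and writes it out, as shown in the figure.
The core Reduce-side code is shown below; for the full source, see CompanyJoinAddress\src\com\zonesion\tablejoin\CompanyJoinAddress.java.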
public static class ReduceClass extends Reducer<Text, Text, Text, Text>{
    @Override
    protected void reduce(Text key, Iterable<Text> values,Context context)
            throws IOException, InterruptedException {
        List<String> companys = new ArrayList<String>();
        List<String> addresses = new ArrayList<String>();
        // values: ["company:Beijing Red Star","company:Beijing JD","address:Beijing"]
        Iterator<Text> it = values.iterator();
        while(it.hasNext()){
            String value = it.next().toString();//"company:Beijing Red Star"
            String[] result = value.split(":");
            if(result.length >= 2){
                if(result[0].equals("company")){
                    companys.add(result[1]);
                }else if(result[0].equals("address")){
                    addresses.add(result[1]);
                }
            }
        }
        // compute the Cartesian product; keys with no company or no address produce no output
        if(0 != companys.size()&& 0 != addresses.size()){
            for(int i=0;i<companys.size();i++){
                for(int j=0;j<addresses.size();j++){
                    context.write(new Text(companys.get(i)), new Text(addresses.get(j)));//<key,value>--<"Beijing JD","Beijing">
                }
            }
        }
    }
}
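The path from the Map output to the Reduce output can also be sketched in plain Java. The example below is an illustration under stated assumptions, not the framework's implementation: groupByKey stands in for the shuffle, which collects all values that share a key, and join mirrors ReduceClass.reduce for one key. The class and method names, and the record with id 9, are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ReduceJoinSketch {
    // Stand-in for the shuffle: collects every map-output value under its key.
    static Map<String, List<String>> groupByKey(String[][] mapOutput) {
        Map<String, List<String>> grouped = new TreeMap<String, List<String>>();
        for (String[] pair : mapOutput) {
            List<String> values = grouped.get(pair[0]);
            if (values == null) {
                values = new ArrayList<String>();
                grouped.put(pair[0], values);
            }
            values.add(pair[1]);
        }
        return grouped;
    }

    // Mirrors ReduceClass.reduce for one key: split values by tag, then pair
    // every company with every address (Cartesian product).
    static List<String> join(List<String> values) {
        List<String> companies = new ArrayList<String>();
        List<String> addresses = new ArrayList<String>();
        for (String value : values) {
            String[] parts = value.split(":");
            if (parts.length < 2) {
                continue;
            }
            if (parts[0].equals("company")) {
                companies.add(parts[1]);
            } else if (parts[0].equals("address")) {
                addresses.add(parts[1]);
            }
        }
        List<String> joined = new ArrayList<String>();
        for (String company : companies) {
            for (String address : addresses) {
                joined.add(company + "\t" + address);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        String[][] mapOutput = {
            {"1", "company:Beijing Red Star"},
            {"1", "company:Beijing JD"},
            {"1", "address:Beijing"},
            {"9", "company:No Such Address Ltd"} // hypothetical: id 9 has no address record
        };
        for (Map.Entry<String, List<String>> entry : groupByKey(mapOutput).entrySet()) {
            for (String row : join(entry.getValue())) {
                System.out.println(row);
            }
        }
    }
}
```

Because a key produces output only when both lists are non-empty, the join behaves like an inner join: the hypothetical company with id 9 is dropped, just as it would be by the size check in ReduceClass.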
The core driver code is shown below; for the full source, see CompanyJoinAddress\src\com\zonesion\tablejoin\CompanyJoinAddress.java.
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 3) {
        System.err.println("Usage: company Join address <companyTableDir> <addressTableDir> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "company Join address");
    // set the job's entry class
    job.setJarByClass(CompanyJoinAddress.class);
    // set the Map and Reduce classes
    job.setMapperClass(MapClass.class);
    job.setReducerClass(ReduceClass.class);
    // set the output key and value types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    // set the input and output directories
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//companyTableDir
    FileInputFormat.addInputPath(job, new Path(otherArgs[1]));//addressTableDir
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));//out
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
[hadoop@K-Master ~]$ start-dfs.sh
[hadoop@K-Master ~]$ start-mapred.sh
[hadoop@K-Master ~]$ jps
5283 SecondaryNameNode
5445 JobTracker
5578 Jps
5109 NameNode
# Set up the working environment
[hadoop@K-Master ~]$ mkdir -p /usr/hadoop/workspace/MapReduce
# Deploy the source code
Copy the CompanyJoinAddress folder to /usr/hadoop/workspace/MapReduce/;
… You can also download CompanyJoinAddress directly
# Change to the working directory
[hadoop@K-Master ~]$ cd /usr/hadoop/workspace/MapReduce/CompanyJoinAddress
# Compile the source file
[hadoop@K-Master CompanyJoinAddress]$ javac -classpath /usr/hadoop/hadoop-core-1.2.1.jar:/usr/hadoop/lib/commons-cli-1.2.jar -d bin src/com/zonesion/tablejoin/CompanyJoinAddress.java
[hadoop@K-Master CompanyJoinAddress]$ ls bin/com/zonesion/tablejoin/* -la
-rw-rw-r-- 1 hadoop hadoop 1909 8月 1 10:29 bin/com/zonesion/tablejoin/CompanyJoinAddress.class
-rw-rw-r-- 1 hadoop hadoop 2199 8月 1 10:29 bin/com/zonesion/tablejoin/CompanyJoinAddress$MapClass.class
-rw-rw-r-- 1 hadoop hadoop 2242 8月 1 10:29 bin/com/zonesion/tablejoin/CompanyJoinAddress$ReduceClass.class
[hadoop@K-Master CompanyJoinAddress]$ jar -cvf CompanyJoinAddress.jar -C bin/ .
added manifest
adding: com/(in = 0) (out= 0)(stored 0%)
adding: com/zonesion/(in = 0) (out= 0)(stored 0%)
adding: com/zonesion/tablejoin/(in = 0) (out= 0)(stored 0%)
adding: com/zonesion/tablejoin/CompanyJoinAddress$MapClass.class(in = 2273) (out= 951)(deflated 58%)
adding: com/zonesion/tablejoin/CompanyJoinAddress$ReduceClass.class(in = 2242) (out= 1029)(deflated 54%)
adding: com/zonesion/tablejoin/CompanyJoinAddress.class(in = 1909) (out= 983)(deflated 48%)
# Create the company input directory
[hadoop@K-Master CompanyJoinAddress]$ hadoop fs -mkdir CompanyJoinAddress/input/company/
# Create the address input directory
[hadoop@K-Master CompanyJoinAddress]$ hadoop fs -mkdir CompanyJoinAddress/input/address/
# Upload files to the company input directory
[hadoop@K-Master CompanyJoinAddress]$ hadoop fs -put input/company* CompanyJoinAddress/input/company/
# Upload files to the address input directory
[hadoop@K-Master CompanyJoinAddress]$ hadoop fs -put input/address* CompanyJoinAddress/input/address/
[hadoop@K-Master CompanyJoinAddress]$ hadoop jar CompanyJoinAddress.jar com.zonesion.tablejoin.CompanyJoinAddress CompanyJoinAddress/input/company/ CompanyJoinAddress/input/address/ CompanyJoinAddress/output
14/08/01 10:50:05 INFO input.FileInputFormat: Total input paths to process : 4
14/08/01 10:50:05 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/08/01 10:50:05 WARN snappy.LoadSnappy: Snappy native library not loaded
14/08/01 10:50:05 INFO mapred.JobClient: Running job: job_201408010921_0008
14/08/01 10:50:06 INFO mapred.JobClient: map 0% reduce 0%
14/08/01 10:50:09 INFO mapred.JobClient: map 50% reduce 0%
14/08/01 10:50:10 INFO mapred.JobClient: map 100% reduce 0%
14/08/01 10:50:17 INFO mapred.JobClient: map 100% reduce 100%
14/08/01 10:50:17 INFO mapred.JobClient: Job complete: job_201408010921_0008
14/08/01 10:50:17 INFO mapred.JobClient: Counters: 29
......
[hadoop@K-Master CompanyJoinAddress]$ hadoop fs -ls CompanyJoinAddress/output
Found 3 items
-rw-r--r-- 1 hadoop supergroup 0 2014-08-01 10:50 /user/hadoop/CompanyJoinAddress/output/_SUCCESS
drwxr-xr-x - hadoop supergroup 0 2014-08-01 10:50 /user/hadoop/CompanyJoinAddress/output/_logs
-rw-r--r-- 1 hadoop supergroup 241 2014-08-01 10:50 /user/hadoop/CompanyJoinAddress/output/part-r-00000
[hadoop@K-Master CompanyJoinAddress]$ hadoop fs -cat CompanyJoinAddress/output/part-r-00000
Beijing Red Star Beijing
Beijing Rising Beijing
Back of Beijing Beijing
Beijing JD Beijing
xiaomi Beijing
Guangzhou Honda Guangzhou
Guangzhou Development Bank Guangzhou
Shenzhen Thunder Shenzhen
Tencent Shenzhen
aiplay hangzhou
huawei wuhan
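Each output line holds one company and its address. The driver does not set an output format, so the default TextOutputFormat applies, which separates key and value with a tab. To view the result as one address with many companies, the lines can be regrouped by address. The following is a minimal sketch; the class OutputGroupingSketch and the method companiesByAddress are hypothetical, and the sample lines are taken from the output above with the tab written explicitly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class OutputGroupingSketch {
    // Regroups "company<TAB>address" lines into address -> list of companies.
    // Splitting on the tab matters because company names contain spaces.
    static Map<String, List<String>> companiesByAddress(List<String> outputLines) {
        Map<String, List<String>> byAddress = new LinkedHashMap<String, List<String>>();
        for (String line : outputLines) {
            int tab = line.indexOf('\t');
            if (tab < 0) {
                continue; // not a key/value line
            }
            String company = line.substring(0, tab);
            String address = line.substring(tab + 1);
            List<String> companies = byAddress.get(address);
            if (companies == null) {
                companies = new ArrayList<String>();
                byAddress.put(address, companies);
            }
            companies.add(company);
        }
        return byAddress;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "Beijing Red Star\tBeijing",
            "Beijing JD\tBeijing",
            "Tencent\tShenzhen");
        for (Map.Entry<String, List<String>> entry : companiesByAddress(lines).entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}
```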
Original article: http://blog.csdn.net/andie_guo/article/details/44086183