码迷,mamicode.com
首页 > 其他好文 > 详细

大数据学习第12天

时间:2019-06-20 09:16:39      阅读:107      评论:0      收藏:0      [点我收藏+]

标签:turn   好友   put   mapred   throws   key   on()   jar   run   

package com.bjsxt.mgqq;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.laoxiao.mr.weather.MyKey;

public class RunJob {

/**
 * Driver: runs two chained MapReduce jobs.
 * Job 1 reads "/input/friend" and counts common friends for every pair of users
 * that are NOT already direct friends. Job 2 reads job 1's output and, per user,
 * lists recommended friends ordered by common-friend count (descending).
 */
public static void main(String[] args) {
    System.setProperty("hadoop.home.dir", "E:\\peiyou\\hadoop-2.6.5");
    Configuration conf = new Configuration();
    // Spill the map-side sort buffer at 60% instead of the 80% default.
    conf.setFloat("mapreduce.map.sort.spill.percent", 0.6f);
    try {
        FileSystem fs = FileSystem.get(conf);

        // Job 1: emit candidate FOF pairs and count common friends.
        Job job1 = newJob(conf, Mapper1.class, Reducer1.class, IntWritable.class);
        FileInputFormat.addInputPath(job1, new Path("/input/friend"));
        prepareOutputPath(fs, job1, new Path("/output/f1/"));
        if (!job1.waitForCompletion(true)) {
            return; // first stage failed; nothing to feed the second stage
        }
        System.out.println("mapreduce程序1执行成功");

        // Job 2: per user, sort recommendations by common-friend count (desc).
        Job job2 = newJob(conf, Mapper2.class, Reducer2.class, Text.class);
        job2.setSortComparatorClass(SortComparator.class);
        job2.setGroupingComparatorClass(GroupComparator.class);
        FileInputFormat.addInputPath(job2, new Path("/output/f1/"));
        prepareOutputPath(fs, job2, new Path("/output/f2/"));
        if (job2.waitForCompletion(true)) {
            System.out.println("mapreduce程序2执行成功");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

/**
 * Builds a Job with the configuration shared by both stages: job name, jar,
 * mapper/reducer classes, map output types (key is always Text), a single
 * reducer, and KeyValueTextInputFormat (splits each line at the first tab).
 */
private static Job newJob(Configuration conf, Class<? extends Mapper> mapper,
        Class<? extends Reducer> reducer, Class<?> mapOutputValueClass) throws IOException {
    Job job = Job.getInstance(conf);
    job.setJobName("wc");
    job.setJarByClass(RunJob.class);
    job.setMapperClass(mapper);
    job.setReducerClass(reducer);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(mapOutputValueClass);
    job.setNumReduceTasks(1);
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    return job;
}

/**
 * Registers the output directory on the job, deleting it first if it exists —
 * the framework errors out when the output directory is already present.
 */
private static void prepareOutputPath(FileSystem fs, Job job, Path outpath) throws IOException {
    if (fs.exists(outpath)) {
        fs.delete(outpath, true);
    }
    FileOutputFormat.setOutputPath(job, outpath);
}

static class Mapper1 extends Mapper<Text, Text, Text, IntWritable>{
protected void map(Text key, Text value,
Context context)
throws IOException, InterruptedException {
String user=key.toString();
String[] friends=value.toString().split("\t");
for (int i = 0; i < friends.length; i++) {
String f1 = friends[i];
String userAndFriend=getFof(user, f1);
context.write(new Text(userAndFriend), new IntWritable(-1));
for(int j=i+1;j<friends.length;j++){
String f2 = friends[j];
String fof=getFof(f1, f2);
context.write(new Text(fof), new IntWritable(1));
}
}
}

/**
* 保证一致的顺序
* @param user1
* @param user2
* @return
*/
private String getFof(String user1,String user2){
if(user1.compareTo(user2)>0){
return user1+":"+user2;
}else{
return user2+":"+user1;
}
}
}

static class Reducer1 extends Reducer<Text, IntWritable, Text, IntWritable>{
protected void reduce(Text key, Iterable<IntWritable> iter,
Context context)
throws IOException, InterruptedException {
int sum=0;
boolean flag=true; //判断是否存在直接好友的FOF
for(IntWritable i:iter){
if(i.get()==-1){
flag=false;
break;
}else{
sum+=i.get();
}
}
if(flag){
context.write(key, new IntWritable(sum));
}
}
}

static class Mapper2 extends Mapper<Text, Text, Text, Text>{
protected void map(Text key, Text value,
Context context)
throws IOException, InterruptedException {
String[] users=key.toString().split(":");
int count=Integer.parseInt(value.toString());
Text k1 =new Text(users[0]+","+count);
Text v1 =new Text(users[1]);
context.write(k1, v1);
Text k2 =new Text(users[1]+","+count);
Text v2 =new Text(users[0]);
context.write(k2, v2);
}
}

static class SortComparator extends WritableComparator{

//比较器必须有构造方法
public SortComparator() {
super(Text.class,true);
}

public int compare(WritableComparable a, WritableComparable b) {
Text k1 =(Text) a;
Text k2 =(Text) b;

String[] kk1 =k1.toString().split(",");
String[] kk2 =k2.toString().split(",");

int r1=kk1[0].compareTo(kk2[0]);
if(r1==0){
return -Integer.compare(Integer.parseInt(kk1[1]), Integer.parseInt(kk2[1]));
}
return r1;
}

}

static class GroupComparator extends WritableComparator{

public GroupComparator() {
super(Text.class,true);
}

public int compare(WritableComparable a, WritableComparable b) {
Text k1 =(Text) a;
Text k2 =(Text) b;
String[] kk1 =k1.toString().split(",");
String[] kk2 =k2.toString().split(",");
return kk1[0].compareTo(kk2[0]);
}
}

static class Reducer2 extends Reducer<Text, Text, Text, Text>{
protected void reduce(Text key, Iterable<Text> iter,
Context context)
throws IOException, InterruptedException {
StringBuffer sb =new StringBuffer();
String user =key.toString().split(",")[0];
for(Text v:iter){
sb.append(v.toString()).append("\t");
}
sb.substring(0, sb.length()-1);
context.write(new Text(user), new Text(sb.toString()));
}
}
}

大数据学习第12天

标签:turn   好友   put   mapred   throws   key   on()   jar   run   

原文地址:https://www.cnblogs.com/lkoooox/p/11056512.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!