标签:
1. 样例数据011990-99999 SIHCCAJAVRI 012650-99999 TYNSET-HANSMOEN
012650-99999 194903241200 111 012650-99999 194903241800 78 011990-99999 195005150700 0 011990-99999 195005151200 22 011990-99999 195005151800 -11
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableUtils; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * 组合键,此例中用于辅助排序,包括气象站ID和“标记”。 * “标记”是一个虚拟字段,其唯一目的是对记录排序,使气象站的记录比天气记录先到达。 * 虽然可以不指定数据传输次序,并将待处理的记录缓存在内存之中,但应该尽量避免这种情况, * 因为其中任何一组的记录数量都可能非常庞大,远远超出 reducer 的可用内存量 */ public class TextPair implements WritableComparable<TextPair> { private Text first; private Text second; public TextPair() { set(new Text(), new Text()); } public TextPair(String first, String second) { set(new Text(first), new Text(second)); } public TextPair(Text first, Text second) { set(first, second); } public void set(Text first, Text second) { this.first = first; this.second = second; } public Text getFirst() { return first; } public Text getSecond() { return second; } public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } @Override public int hashCode() { return first.hashCode() * 163 + second.hashCode(); } @Override public boolean equals(Object obj) { if (obj instanceof TextPair) { TextPair tp = (TextPair) obj; return first.equals(tp.first) && second.equals(tp.second); } return false; } @Override public String toString() { return first + "\t" + second; } public int compareTo(TextPair o) { int cmp = first.compareTo(o.first); if (cmp == 0) { cmp = second.compareTo(o.second); } return cmp; } // RawComparator 允许直接比较数据流中的记录,无须先把数据流反序列化为对象,这样避免了新建对象的额外开销 // WritableComparator 是对继承自 WritableComparable 类的 RawComparator 的一个通用实现。 public static class FirstComparator extends WritableComparator { private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator(); public FirstComparator() { super(TextPair.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { // firstL1、firstL2 表示每个字节流中第一个 Text 字段的长度 int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); } catch (IOException e) { throw new IllegalArgumentException(e); } } @Override public int compare(WritableComparable a, WritableComparable b) { if (a instanceof TextPair && b instanceof TextPair) { return ((TextPair) a).first.compareTo(((TextPair) b).first); } return super.compare(a, b); } } }
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * 标记气象站记录的 mapper */ public class JoinStationMapper extends Mapper<LongWritable, Text, TextPair, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] val = value.toString().split("\\t"); if (val.length == 2) { context.write(new TextPair(val[0], "0"), new Text(val[1])); } } }
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * 标记天气记录的 mapper */ public class JoinRecordMapper extends Mapper<LongWritable, Text, TextPair, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] val = value.toString().split("\\t"); if (val.length == 3) { context.write(new TextPair(val[0], "1"), new Text(val[1] + "\t" + val[2])); } } }
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Iterator; /** * 连接已标记的气象站记录和天气记录的 reducer */ public class JoinReducer extends Reducer<TextPair, Text, Text, Text> { @Override protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Iterator<Text> iter = values.iterator(); Text stationName = new Text(iter.next()); // reducer 会先接收气象站记录(这里千万不能写成 Text stationName = iter.next(); ) while (iter.hasNext()) { Text record = iter.next(); Text outValue = new Text(stationName.toString() + "\t" + record.toString()); context.write(key.getFirst(), outValue); } } }
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class JoinRecordWithStationName { static class KeyPartitioner extends Partitioner<TextPair, Text> { @Override public int getPartition(TextPair textPair, Text text, int numPartitions) { return (textPair.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 3) { System.err.println("Parameter number is wrong, please enter three parameters:<ncdc input> <station input> <output>"); System.exit(-1); } Path ncdcInputPath = new Path(otherArgs[0]); Path stationInputPath = new Path(otherArgs[1]); Path outputPath = new Path(otherArgs[2]); //conf.set("fs.defaultFS", "hdfs://vmnode.zhch:9000"); Job job = Job.getInstance(conf, "JoinRecordWithStationName"); //job.setJar("F:/workspace/AssistRanking/target/AssistRanking-1.0-SNAPSHOT.jar"); MultipleInputs.addInputPath(job, ncdcInputPath, TextInputFormat.class, JoinRecordMapper.class); MultipleInputs.addInputPath(job, stationInputPath, TextInputFormat.class, JoinStationMapper.class); FileOutputFormat.setOutputPath(job, outputPath); //仅按照 first(气象台ID) 分区、分组 (同一分区的记录将被同一个Reducer处理,同一区同一组的记录将被同一个Reducer在同一次reduce()函数调用中处理) job.setPartitionerClass(KeyPartitioner.class); job.setGroupingComparatorClass(TextPair.FirstComparator.class); job.setMapOutputKeyClass(TextPair.class); job.setReducerClass(JoinReducer.class); job.setOutputKeyClass(Text.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
标签:
原文地址:http://my.oschina.net/zc741520/blog/528207