标签 (Tags): 简单的 (simple), text, with, ext, over, nio, job, style, 设计 (design)
package com.zhyea.dev;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

/**
 * MapReduce job that compares the lines of two input files.
 *
 * <p>The mapper tags every input line with the name of the file it was read from. The
 * configured reducer then decides, per distinct line, whether to emit it:
 * <ul>
 *   <li>{@link UnionReducer}: every distinct line (A union B)</li>
 *   <li>{@link InterReducer}: lines present in both A and B (A intersect B)</li>
 *   <li>{@link DiffAReducer}: lines present in A but not in B (A minus B)</li>
 * </ul>
 *
 * <p>Tags are the bare file names. For the intersection and difference reducers to
 * match anything, the input files must therefore be named exactly {@value #SOURCE_A}
 * and {@value #SOURCE_B}.
 */
public class ContentCompare {

    /** File name identifying lines that come from the first input. */
    static final String SOURCE_A = "A";

    /** File name identifying lines that come from the second input. */
    static final String SOURCE_B = "B";

    /** Emits {@code (line, name of the file containing the line)} for every input line. */
    public static class SplitterMapper extends Mapper<Object, Text, Text, Text> {
        private final Text fileTag = new Text();

        /**
         * Resolves the source file name once per task instead of once per record.
         * Fails fast (ClassCastException) if the split is not a {@link FileSplit},
         * instead of silently emitting nothing.
         */
        @Override
        protected void setup(Context context) {
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            fileTag.set(fileName);
        }

        /**
         * Tags the line with its source file name.
         *
         * @throws IOException          if the record cannot be written
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, fileTag);
        }
    }

    /** Emits every distinct line seen in any input (set union). */
    public static class UnionReducer extends Reducer<Text, Text, Text, NullWritable> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    /** Emits lines that occur in both {@value #SOURCE_A} and {@value #SOURCE_B}. */
    public static class InterReducer extends Reducer<Text, Text, Text, NullWritable> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Set<String> sources = collectSources(values);
            if (sources.contains(SOURCE_A) && sources.contains(SOURCE_B)) {
                context.write(key, NullWritable.get());
            }
        }
    }

    /** Emits lines that occur in {@value #SOURCE_A} but not in {@value #SOURCE_B}. */
    public static class DiffAReducer extends Reducer<Text, Text, Text, NullWritable> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Set<String> sources = collectSources(values);
            if (sources.contains(SOURCE_A) && !sources.contains(SOURCE_B)) {
                context.write(key, NullWritable.get());
            }
        }
    }

    /**
     * Collects the distinct file-name tags attached to one line, in a single pass over
     * {@code values}.
     *
     * @param values the tags emitted by {@link SplitterMapper} for one line
     * @return the distinct tags; empty if {@code values} is empty
     */
    private static Set<String> collectSources(Iterable<Text> values) {
        Set<String> sources = new HashSet<String>();
        for (Text value : values) {
            sources.add(value.toString());
        }
        return sources;
    }

    /**
     * Maps a command-line mode to the reducer implementing it.
     *
     * @throws IllegalArgumentException if {@code mode} is not one of union, inter, diff
     */
    private static Class<? extends Reducer<Text, Text, Text, NullWritable>> reducerFor(String mode) {
        if ("union".equals(mode)) {
            return UnionReducer.class;
        }
        if ("inter".equals(mode)) {
            return InterReducer.class;
        }
        if ("diff".equals(mode)) {
            return DiffAReducer.class;
        }
        throw new IllegalArgumentException(
                "Unknown mode '" + mode + "', expected one of: union, inter, diff");
    }

    /**
     * Configures and runs the comparison job; exits with 0 on success and 1 on job failure.
     *
     * @param args {@code args[0]} input path, {@code args[1]} output path, optional
     *             {@code args[2]} mode ({@code union}, {@code inter} or {@code diff});
     *             the default is {@code diff} (lines in A but not in B), matching the
     *             previous hard-wired behavior
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: ContentCompare <input> <output> [union|inter|diff]");
            System.exit(2);
        }
        String mode = args.length > 2 ? args[2] : "diff";

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "content-compare");
        job.setJarByClass(ContentCompare.class);
        job.setMapperClass(SplitterMapper.class);
        job.setReducerClass(reducerFor(mode));
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // A single reducer yields one output file containing the whole result.
        job.setNumReduceTasks(1);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
package com.talkingdata.campaign

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark job that writes the lines common to the two input files (set intersection).
 *
 * Each line is tagged with the name of the file it was read from. The distinct set of
 * files per line is aggregated, and a line is kept when it occurs in exactly two distinct
 * files. Repeats inside a single file count once, so a line duplicated in one file but
 * absent from the other is not reported as common.
 */
object ContentCompare {

  /**
   * @param args args(0) input path, args(1) output path
   */
  def main(args: Array[String]): Unit = {
    require(args.length >= 2, "Usage: ContentCompare <inputPath> <outputPath>")
    val inputPath = args(0)
    val outputPath = args(1)

    val conf = new SparkConf().setAppName("content compare")
    val sc = new SparkContext(conf)
    try {
      val data = sc.hadoopFile[LongWritable, Text, TextInputFormat](inputPath)
      // The cast assumes hadoopFile returns a HadoopRDD. It is needed for
      // mapPartitionsWithInputSplit, which gives each partition its input split.
      val hadoopRDD = data.asInstanceOf[HadoopRDD[LongWritable, Text]]
      hadoopRDD.mapPartitionsWithInputSplit[(String, String)](readFile)
        // Distinct source files per line; concatenating names would confuse "AA" with "AB".
        .aggregateByKey(Set.empty[String])(_ + _, _ ++ _)
        .filter { case (_, files) => files.size == 2 }
        .keys
        .repartition(1)
        .saveAsTextFile(outputPath)
    } finally {
      sc.stop()
    }
  }

  /**
   * Tags every record of a split with the name of the file the split belongs to.
   *
   * @return an iterator of (line text, file name) pairs
   */
  private def readFile(inputSplit: InputSplit,
                       itr: Iterator[(LongWritable, Text)]): Iterator[(String, String)] = {
    val fileName = inputSplit.asInstanceOf[FileSplit].getPath.getName
    itr.map(p => (p._2.toString, fileName))
  }
}
// Pipeline variant emitting lines present in file "A" but absent from file "B" (A minus B).
// It uses the hadoopRDD, readFile and outputPath identifiers defined in ContentCompare.main.
// The distinct source files are collected per line, so a line repeated inside "A" is still
// reported. The condition mirrors the Java DiffAReducer check.
hadoopRDD.mapPartitionsWithInputSplit[(String, String)](readFile)
  .aggregateByKey(Set.empty[String])(_ + _, _ ++ _)
  .filter { case (_, files) => files.contains("A") && !files.contains("B") }
  .keys
  .repartition(1)
  .saveAsTextFile(outputPath)
