码迷,mamicode.com
首页 > 其他好文 > 详细

Hadoop基础(十八):MapReduce框架原理(二)切片机制(二)

时间:2020-07-17 09:33:01      阅读:73      评论:0      收藏:0      [点我收藏+]

标签:二进制   init   输入   etl   mat   over   小文件   sda   esper   

1.5 CombineTextInputFormat案例实操

1.需求

将输入的大量小文件合并成一个切片统一处理。

(1)输入数据

准备4个小文件

(2)期望

期望一个切片处理4个文件

2.实现过程

(1)不做任何处理,运行1.6的WordCount案例程序,观察切片个数为4

 

(2)在WordcountDriver增加如下代码运行程序,并观察运行的切片个数为3

(a)驱动类中添加代码如下:

// 如果不设置InputFormat,它默认用的是TextInputFormat.class

job.setInputFormatClass(CombineTextInputFormat.class);

 

//虚拟存储切片最大值设置4m

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

(b)运行如果为3个切片。

 

3在WordcountDriver增加如下代码运行程序,并观察运行的切片个数为1

(a)驱动中添加代码如下:

// 如果不设置InputFormat,它默认用的是TextInputFormat.class

job.setInputFormatClass(CombineTextInputFormat.class);

 

//虚拟存储切片最大值设置20m

CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);

(b)运行如果为1个切片。

 

1.6 FileInputFormat实现

 技术图片

 

 

 技术图片

 

 

 技术图片

 

 

 技术图片

 

 

 

1.7 KeyValueTextInputFormat使用案例

 

1.需求

统计输入文件中每一行的第一个单词相同的行数。

1)输入数据

 

banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang

 

2)期望结果数据

 

banzhang    2
xihuan    2

 

2.需求分析

技术图片

 

 

3.代码实现

1编写Mapper类

package com.atguigu.mapreduce.KeyValueTextInputFormat;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KVTextMapper extends Mapper<Text, Text, Text, LongWritable>{
    
// 1 设置value
   LongWritable v = new LongWritable(1);  
    
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {

// banzhang ni hao
        
        // 2 写出
        context.write(key, v);  
    }
}

2)编写Reducer

 

package com.atguigu.mapreduce.KeyValueTextInputFormat;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class KVTextReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
    
    LongWritable v = new LongWritable();  
    
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,    Context context) throws IOException, InterruptedException {
        
         long sum = 0L;  

         // 1 汇总统计
        for (LongWritable value : values) {  
            sum += value.get();  
        }
         
        v.set(sum);  
         
        // 2 输出
        context.write(key, v);  
    }
}

 

3)编写Driver

 

package com.atguigu.mapreduce.keyvaleTextInputFormat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KVTextDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        
        Configuration conf = new Configuration();
        // 设置切割符
    conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
        // 1 获取job对象
        Job job = Job.getInstance(conf);
        
        // 2 设置jar包位置,关联mapper和reducer
        job.setJarByClass(KVTextDriver.class);
        job.setMapperClass(KVTextMapper.class);
job.setReducerClass(KVTextReducer.class);
                
        // 3 设置map输出kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 4 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
        
        // 5 设置输入输出数据路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        
        // 设置输入格式
    job.setInputFormatClass(KeyValueTextInputFormat.class);
        
        // 6 设置输出数据路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        // 7 提交job
        job.waitForCompletion(true);
    }
}

 

1.8 NLineInputFormat使用案例

1需求

对每个单词进行个数统计,要求根据每个输入文件的行数规定输出多少个切片。此案例要求行放入一个切片中。

1)输入数据

 

banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang banzhang ni hao
xihuan hadoop banzhang

 

2)期望输出数据

 

Number of splits:4

2.需求分析

技术图片

 

 

3.代码实现

1)编写Mapper

 

package com.atguigu.mapreduce.nline;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NLineMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
    
    private Text k = new Text();
    private LongWritable v = new LongWritable(1);
    
    @Override
    protected void map(LongWritable key, Text value, Context context)    throws IOException, InterruptedException {
        
         // 1 获取一行
        String line = value.toString();
        
        // 2 切割
        String[] splited = line.split(" ");
        
        // 3 循环写出
        for (int i = 0; i < splited.length; i++) {
            
            k.set(splited[i]);
            
           context.write(k, v);
        }
    }
}

 

2)编写Reducer

 

package com.atguigu.mapreduce.nline;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class NLineReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
    
    LongWritable v = new LongWritable();
    
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,    Context context) throws IOException, InterruptedException {
        
        long sum = 0l;

        // 1 汇总
        for (LongWritable value : values) {
            sum += value.get();
        }  
        
        v.set(sum);
        
        // 2 输出
        context.write(key, v);
    }
}

 

3)编写Driver

 

package com.atguigu.mapreduce.nline;
import java.io.IOException;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NLineDriver {
    
    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[] { "e:/input/inputword", "e:/output1" };

         // 1 获取job对象
         Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        
        // 7设置每个切片InputSplit中划分三条记录
        NLineInputFormat.setNumLinesPerSplit(job, 3);
          
        // 8使用NLineInputFormat处理记录数  
        job.setInputFormatClass(NLineInputFormat.class);  
          
        // 2设置jar包位置,关联mapper和reducer
        job.setJarByClass(NLineDriver.class);  
        job.setMapperClass(NLineMapper.class);  
        job.setReducerClass(NLineReducer.class);  
        
        // 3设置map输出kv类型
        job.setMapOutputKeyClass(Text.class);  
        job.setMapOutputValueClass(LongWritable.class);  
        
        // 4设置最终输出kv类型
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(LongWritable.class);  
          
        // 5设置输入输出数据路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));  
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  
          
        // 6提交job
        job.waitForCompletion(true);  
    }
}

 

4.测试

1)输入数据

 

banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang banzhang ni hao
xihuan hadoop banzhang

 

2)输出结果的切片,如图4-10所示

技术图片

1.9 自定义InputFormat

技术图片

1.10 自定义InputFormat案例实操

无论HDFS还是MapReduce,在处理小文件时效率非常,但又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。可以自定义InputFormat实现小文件的合并。

1需求

多个小文件合并成一个SequenceFile文件SequenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key文件内容为value

1)输入数据

 

技术图片 技术图片 技术图片

2)期望输出文件格式

技术图片

2.需求分析

技术图片

3.程序实现

1)自定义InputFromat

 

package com.atguigu.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// 定义类继承FileInputFormat
public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable>{
    
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)    throws IOException, InterruptedException {
        
        WholeRecordReader recordReader = new WholeRecordReader();
        recordReader.initialize(split, context);
        
        return recordReader;
    }
}

 

2)自定义RecordReader

 

package com.atguigu.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeRecordReader extends RecordReader<Text, BytesWritable>{

    private Configuration configuration;
    private FileSplit split;
    
    private boolean isProgress= true;
    private BytesWritable value = new BytesWritable();
    private Text k = new Text();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        
        this.split = (FileSplit)split;
        configuration = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        
        if (isProgress) {

            // 1 定义缓存区
            byte[] contents = new byte[(int)split.getLength()];
            
            FileSystem fs = null;
            FSDataInputStream fis = null;
            
            try {
                // 2 获取文件系统
                Path path = split.getPath();
                fs = path.getFileSystem(configuration);
                
                // 3 读取数据
                fis = fs.open(path);
                
                // 4 读取文件内容
                IOUtils.readFully(fis, contents, 0, contents.length);
                
                // 5 输出文件内容
                value.set(contents, 0, contents.length);

// 6 获取文件路径及名称
String name = split.getPath().toString();

// 7 设置输出的key值
k.set(name);

            } catch (Exception e) {
                
            }finally {
                IOUtils.closeStream(fis);
            }
            
            isProgress = false;
            
            return true;
        }
        
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return k;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
    }
}

 

3)编写SequenceFileMapper类处理流程

 

package com.atguigu.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{
    
    @Override
    protected void map(Text key, BytesWritable value,            Context context)        throws IOException, InterruptedException {

        context.write(key, value);
    }
}

 

4)编写SequenceFileReducer类处理流程

 

package com.atguigu.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context)        throws IOException, InterruptedException {

        context.write(key, values.iterator().next());
    }
}

 

5)编写SequenceFileDriver类处理流程

 

package com.atguigu.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        
       // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args = new String[] { "e:/input/inputinputformat", "e:/output1" };

       // 1 获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

       // 2 设置jar包存储位置、关联自定义的mapper和reducer
        job.setJarByClass(SequenceFileDriver.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);

       // 7设置输入的inputFormat
        job.setInputFormatClass(WholeFileInputformat.class);

       // 8设置输出的outputFormat
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
       
// 3 设置map输出端的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        
       // 4 设置最终输出端的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

       // 5 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

       // 6 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

 

Hadoop基础(十八):MapReduce框架原理(二)切片机制(二)

标签:二进制   init   输入   etl   mat   over   小文件   sda   esper   

原文地址:https://www.cnblogs.com/qiu-hua/p/13326492.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!