码迷,mamicode.com
首页 > 其他好文 > 详细

大数据学习之手写MR框架(WordCount程序开发)08

时间:2019-05-01 01:46:46      阅读:255      评论:0      收藏:0      [点我收藏+]

标签:str   exists   remote   file   一个   string   try   edr   java   

简介:这里先手写一个MR程序,大致实现一个单词计数程序。帮助后面学习MapReduce组件。

1:先自定义一个Mapper接口

package it.dawn.HDFSPra.HandWritingMR;
/**
 * @author Dawn
 * @date 2019年4月30日23:28:00
 * @version 1.0
 * 
 * 思路?
 * 接口设计
 */
public interface Mapper {
	//通用方法
	public void map(String line,Context context);
}

2:定义一个Context类:

 

该类主要实现数据的传输,和数据的封装(这里用的一个HashMap进行封装的)

 

 

package it.dawn.HDFSPra.HandWritingMR;
/**
 * @author Dawn
 * @date 2019年4月30日23:18:13
 * @version 1.0
 * 
 * 思路?
 * 数据传输的类
 * 封装数据
 * 集合
 * <单词,1>
 */

import java.util.HashMap;

public class Context {
	//数据封装
	private HashMap<Object, Object> contextMap=new HashMap<>();
	
	//写数据
	public void write(Object key,Object value) {
		//放数据到map中
		contextMap.put(key, value);
	}
	
	//定义根据key拿到值方法
	public Object get(Object key) {
		return contextMap.get(key);
	}
	
	//拿到map当中的数据内容
	public HashMap<Object, Object> getContextMap(){
		return contextMap;
	}
}

 

3:实现Mapper类(其实这里就是简化的MapReduce阶段)

 

package it.dawn.HDFSPra.HandWritingMR;
/**
 * @author Dawn
 * @date 2019年4月30日23:22:35
 * @version 1.0
 * 
 * 思路:
 * 	添加一个map方法  单词切分 相同key的value ++
 */
public class WordCountMapper implements Mapper{

	@Override
	public void map(String line, Context context) {
		//1.拿到这行数据 切分
		String[] words=line.split(" ");
		
		//2.拿到单词 相同的key value++  hello 1 itstar 1
		for(String word:words) {
			Object value=context.get(word);
			//相对于插入数据到HashMap中
			if(null==value) {
				context.write(word, 1);
			}else {
				//HashMap不为空 
				int v=(int) value;
				context.write(word, v+1);
			}
		}
	}
	
}

 

4:写一个总程序将这几个串起来(相当于是一个MR中的那个Driver程序,指定MapReduce的类。总程序入口)

 

package it.dawn.HDFSPra.HandWritingMR;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

/**
 * @author Dawn
 * @date 2019年4月30日23:07:18
 * @version 1.0
 * 
 * 需求:文件(hello itstar hello hunter hello hunter henshuai ) 统计每个单词出现的次数?
 * 	   数据存储在hdfs、统计出来的结果存储到hdfs
 * 
 * 2004google:dfs/bigtable/mapreduce
 * 
 * 大数据解决的问题?
 * 	1.海量数据的存储
 * 		hdfs
 *  2.海量数据的计算
 *  	mapreduce
 *  
 * 思路?
 * 	 hello 1
 *   itstar 1
 *   hello 1
 *   ...
 *   
 * 基于用户体验:
 * 	用户输入数据(hdfs)
 *  用户处理的方式
 *  用户指定结果数据存储位置
 * 
 */
public class HdfsWordCount {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException, InterruptedException, URISyntaxException {
		//反射
		Properties pro=new Properties();
		//加载配置文件
		pro.load(HdfsWordCount.class.getClassLoader().getResourceAsStream("job.properties"));
		
		Path inpath=new Path(pro.getProperty("IN_PATH"));
		Path outpath=new Path(pro.getProperty("OUT_PATH"));
		
		Class<?> mapper_class=Class.forName(pro.getProperty("MAPPER_CLASS"));
		//实例化
		Mapper mapper=(Mapper) mapper_class.newInstance();
		
		Context context=new Context();
		
		//1.构建hdfs客户端对象
		Configuration conf=new Configuration();
		FileSystem fs=FileSystem.get(new URI("hdfs://bigdata11:9000"), conf, "root");
		
		//2.读取用户输入的文件
		//读取到的是改文件下的所有的txt文件
		RemoteIterator<LocatedFileStatus> iter=fs.listFiles(inpath, false);
		
		while(iter.hasNext()) {
			LocatedFileStatus file=iter.next();
			//打开路径 获取输入流
			FSDataInputStream in=fs.open(file.getPath());
			BufferedReader br=new BufferedReader(new InputStreamReader(in, "utf-8"));
			
			String line = null;
			
			while((line=br.readLine()) != null) {
				//调用map方法执行业务逻辑
				mapper.map(line, context);
			}
			
			br.close();
			in.close();
		}
		
		//如果用户输入的结果路径不存在 则创建一个
		Path out = new Path("/wc/out/");
		if(!fs.exists(out))
			fs.mkdirs(out);
		
		//将缓存的结果放入hdfs中存储 
		HashMap<Object, Object> contextMap=context.getContextMap();
		FSDataOutputStream out1=fs.create(outpath);
		
		//遍历HashMap
		Set<Entry<Object, Object>> entrySet = contextMap.entrySet();
		for(Entry<Object, Object> entry:entrySet) {
			//写数据
			out1.write((entry.getKey().toString()+"\t"+entry.getValue()+"\n").getBytes());
		}
		
		out1.close();
		fs.close();
		
		System.out.println("数据统计结果完成....");
	}

}

 

  

配置文件(job.properties)如下:

IN_PATH=/dawn
OUT_PATH=/wc/out/rs.txt
MAPPER_CLASS=it.dawn.HDFSPra.HandWritingMR.WordCountMapper

 

 

 

  

大数据学习之手写MR框架(WordCount程序开发)08

标签:str   exists   remote   file   一个   string   try   edr   java   

原文地址:https://www.cnblogs.com/hidamowang/p/10798796.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!