标签:根据 adl 行数据 big style tco shm ado hashmap
一、数据处理类
package com.css.hdfs; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; /** * 需求:文件(hello world hello teacher hello john tom ) 统计每个单词出现的次数? * 数据存储在hdfs、统计出来的结果存储到hdfs * * 2004google:dfs/bigtable/mapreduce * * 大数据解决的问题? * 1.海量数据的存储 * hdfs * 2.海量数据的计算 * mapreduce * * 思路? * hello 2 * world 1 * hello 1 * ... * * 基于用户体验: * 用户输入数据 * 用户处理的方式 * 用户指定结果数据存储位置 */ public class HdfsWordCount { public static void main(String[] args) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException, InterruptedException, URISyntaxException { // 反射 Properties pro = new Properties(); // 加载配置文件 pro.load(HdfsWordCount.class.getClassLoader().getResourceAsStream("job.properties")); Path inPath = new Path(pro.getProperty("IN_PATH")); Path outPath = new Path(pro.getProperty("OUT_PATH")); Class<?> mapper_class = Class.forName(pro.getProperty("MAPPER_CLASS")); // 实例化 Mapper mapper = (Mapper) mapper_class.newInstance(); Context context = new Context(); // 构建hdfs客户端对象 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://192.168.146.132:9000/"), conf, "root"); // 读取用户输入的文件 RemoteIterator<LocatedFileStatus> iter = fs.listFiles(inPath, false); while (iter.hasNext()) { LocatedFileStatus file = iter.next(); // 打开路径 获取输入流 FSDataInputStream in = fs.open(file.getPath()); BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8")); String line = null; while ((line = br.readLine()) != null) { // 调用map方法执行业务逻辑 mapper.map(line, context); } // 关闭资源 br.close(); in.close(); } // 如果用户输入的结果路径不存在 则创建一个 Path out = new Path("/wc/out/"); if (!fs.exists(out)) { fs.mkdirs(out); } // 将缓存的结果放入hdfs中存储 HashMap<Object, Object> contextMap = context.getContextMap(); FSDataOutputStream out1 = fs.create(outPath); // 遍历hashmap Set<Entry<Object, Object>> entrySet = contextMap.entrySet(); for (Entry<Object, Object> entry : entrySet) { // 写数据 out1.write((entry.getKey().toString() + "\t" + entry.getValue() + "\n").getBytes()); } // 关闭资源 out1.close(); fs.close(); System.out.println("数据统计结果完成......"); } }
二、接口类
package com.css.hdfs; /** * 思路: * 接口设计 */ public interface Mapper { // 调用方法 public void map(String line, Context context); }
三、数据传输类
package com.css.hdfs; import java.util.HashMap; /** * 思路: * 数据传输的类 * 封装数据 * 集合 * <单词,1> */ public class Context { // 数据封装 private HashMap<Object, Object> contextMap = new HashMap<>(); // 写数据 public void write(Object key, Object value){ // 放数据到map中 contextMap.put(key, value); } // 定义根据key拿到值方法 public Object get(Object key){ return contextMap.get(key); } // 拿到map中的数据内容 public HashMap<Object, Object> getContextMap(){ return contextMap; } }
四、单词计数类
package com.css.hdfs; /** * 思路: * 添加一个map方法 单词切分 相同key的value ++ */ public class WordCountMapper implements Mapper{ @Override public void map(String line, Context context) { // 拿到这行数据 切分 String[] words = line.split(" "); // 拿到单词 相同的key value++ hello 1 world 1 for (String word : words) { Object value = context.get(word); if (null == value) { context.write(word, 1); }else { // 不为空 int v = (int)value; context.write(word, v+1); } } } }
五、配置文件job.properties
IN_PATH=/wc/in OUT_PATH=/wc/out/rs.txt MAPPER_CLASS=com.css.hdfs.WordCountMapper
标签:根据 adl 行数据 big style tco shm ado hashmap
原文地址:https://www.cnblogs.com/areyouready/p/9813978.html