标签:des blog java os io for ar art div
package com.libc; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class Process { public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> { private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub String datas = ""; try { datas = new String(value.getBytes(), 0, value.getLength(), "GBK"); } catch (UnsupportedEncodingException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } // datas = value.toString(); try { String[] split = datas.split(" time="); // 处理头中包含空格的字段 Pattern p = Pattern.compile("phonemodel=\"(.*?)\""); String pm = getIndex(split[0], p); split[0] = split[0].replaceAll(pm, pm.replace(" ", "")); Pattern p1 = Pattern.compile("networktype=\"(.*?)\""); String nt = getIndex(split[0], p1); split[0] = split[0].replaceAll(nt, nt.replace(" ", "")); for (int i = 1; i < split.length; i++) { String[] codes = split[i].split(" ", 4); int headLen = split[0].split(" ").length; if (headLen != 20) { // 丢掉错误日志 continue; } // 处理旧版本日志判别标准:| if (codes[2].equals("code=\"100\"")){ if(codes[3].indexOf("contact_name")>-1){ codes[3] = process100(codes[3]); } codes[3] = codes[3].replace(‘ ‘, ‘#‘); }else if(codes[2].equals("code=\"101\"") ){ if(codes[3].indexOf("message_to_")>-1){ codes[3] = process101(codes[3]); } codes[3] = 
codes[3].replace(‘ ‘, ‘#‘); } else if(codes[2].equals("code=\"102\"")){ if(codes[3].indexOf("caller_n")>-1||codes[3].indexOf("caller_d")>-1){ codes[3] = process102(codes[3]); } codes[3] = codes[3].replace(‘ ‘, ‘#‘); }else{ codes[3] = codes[3].replace(" ", " "); } String collect = split[0] + " time=" + codes[0] + " " + codes[1] + " " + codes[2] + " " + codes[3]; word.set(collect); context.write(word, new Text("")); } } catch (Exception e) { // TODO Auto-generated catch block } } } public static String process100(String code) throws Exception{ String[] codes = code.split(" "); HashMap<String, Contact> hs = new HashMap<String, Process.Contact>(); Pattern p0 = Pattern.compile("_(\\d*)="); Pattern p1 = Pattern.compile("\"(.*)\""); for (int i = 0; i < codes.length; i++) { if (codes[i].equals("")) continue; String index = getIndex(codes[i], p0); if (index == null) continue; String value = getIndex(codes[i], p1); Contact contact = null; if (hs.containsKey(index)) { contact = hs.get(index); } else { contact = new Contact(); } if (codes[i].startsWith("contact_name_")) { contact.contactName = value; } else if (codes[i].startsWith("contact_num_")) { contact.contactNum = value; } contact.index = index; hs.put(index, contact); } return printToString(hs); } public static String process101(String code) throws Exception{ String[] codes = code.split("\" "); HashMap<String, Message> hs = new HashMap<String, Process.Message>(); Pattern p = Pattern.compile("_(\\d*)="); Pattern p1 = Pattern.compile("\"(.*)"); for (int i = 0; i < codes.length; i++) { String index = getIndex(codes[i], p); String value = getIndex(codes[i], p1); if (index == null) continue; Message message = null; if (hs.containsKey(index)) { message = hs.get(index); } else { message = new Message(); } if (codes[i].startsWith("message_time_")) { message.messageTime = value; } else if (codes[i].startsWith("message_to_")) { message.messageTo = value; } message.index = index; hs.put(index, message); } return printToString(hs); 
} public static String process102(String code) throws Exception{ String[] codes = code.split("\" "); HashMap<String, CallLog> hs = new HashMap<String, Process.CallLog>(); Pattern p = Pattern.compile("_(\\d*)="); Pattern p1 = Pattern.compile("\"(.*)"); for (int i = 0; i < codes.length; i++) { String index = getIndex(codes[i], p); if (index == null) continue; String value = getIndex(codes[i], p1); CallLog callLog = null; if (hs.containsKey(index)) { callLog = hs.get(index); } else { callLog = new CallLog(); } if (codes[i].startsWith("caller_date_")) { callLog.callerDate = value; } else if (codes[i].startsWith("caller_duration_")) { callLog.callerDuration = value; } else if (codes[i].startsWith("caller_name_")) { callLog.callerName = value; } else if (codes[i].startsWith("caller_num_")) { callLog.callerNum = value; } callLog.index = index; hs.put(index, callLog); } return printToString(hs); } public static String printToString(Map hs) { Set set = hs.keySet(); Iterator<String> it = set.iterator(); String result = ""; while (it.hasNext()) { result = result + hs.get(it.next()).toString() + "|"; } return result; } public static String getIndex(String code, Pattern p) { String index = null; Matcher matcher = p.matcher(code); if (matcher.find()) { index = matcher.group(1); } return index; } public static class IntSumReducer extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Text rr, Context context) throws IOException, InterruptedException { context.write(key, new Text("")); } } public static class Contact { public String index; public String contactName; public String contactNum; @Override public String toString() { // TODO Auto-generated method stub return "contact_" + index + "=" + this.contactName + ";" + this.contactNum; } } public static class Message { public String index; public String messageTime; public String messageTo; @Override public String toString() { // TODO Auto-generated method stub return "message_" + this.index + "=" + this.messageTo 
+ ";" + this.messageTime; } } public static class CallLog { public String index; public String callerDuration; public String callerNum; public String callerName; public String callerDate; @Override public String toString() { // TODO Auto-generated method stub return "callLog_" + this.index + "=" + this.callerName + ";" + this.callerNum + ";" + this.callerDate + ";" + this.callerDuration; } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: process <in> <out>"); System.exit(2); } Job job = new Job(conf, "process"); job.setJarByClass(Process.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
此版本为第一版。运行几天后,服务器日志量暴增,导致出现堆栈溢出错误;
因此后来修改出了第二版,第二版可以对 JVM 内存进行自定义配置。
标签:des blog java os io for ar art div
原文地址:http://www.cnblogs.com/charlie-badegg/p/3932626.html