agent.sources = origin
agent.channels = memorychannel
agent.sinks = target

agent.sources.origin.type = spooldir
agent.sources.origin.spoolDir = /export/data/trivial/weblogs
agent.sources.origin.channels = memorychannel
agent.sources.origin.deserializer.maxLineLength = 2048

agent.sources.origin.interceptors = i2
agent.sources.origin.interceptors.i2.type = host
agent.sources.origin.interceptors.i2.hostHeader = hostname

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 10000

agent.sinks.target.type = avro
agent.sinks.target.channel = memorychannel
agent.sinks.target.hostname = 172.16.124.130
agent.sinks.target.port = 4545

A few of these parameters deserve explanation. On the Flume agent's source, the deserializer.maxLineLength property caps the size of each event; by default each event is 2048 bytes. The memory channel's byte capacity defaults to 80% of the memory available to the JVM on the local server, and it can be tuned with the byteCapacityBufferPercentage and byteCapacity parameters.
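As a hedged illustration of that tuning knob, the snippet below shows how byteCapacity and byteCapacityBufferPercentage might be set on this memory channel. The concrete values (roughly 100 MB, 20%) are illustrative assumptions, not figures from the original setup.

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 10000
# Cap the channel at roughly 100 MB of event body data (assumed example value)
agent.channels.memorychannel.byteCapacity = 104857600
# Reserve 20% of byteCapacity as headroom for event headers (20 is also Flume's default)
agent.channels.memorychannel.byteCapacityBufferPercentage = 20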
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target

agent.sources.origin.type = avro
agent.sources.origin.channels = memorychannel
agent.sources.origin.bind = 0.0.0.0
agent.sources.origin.port = 4545

#agent.sources.origin.interceptors = i1 i2
#agent.sources.origin.interceptors.i1.type = timestamp
#agent.sources.origin.interceptors.i2.type = host
#agent.sources.origin.interceptors.i2.hostHeader = hostname

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel

agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 5000000
agent.channels.memorychannel.transactionCapacity = 1000000

agent.sinks.target.type = hdfs
agent.sinks.target.channel = memorychannel
agent.sinks.target.hdfs.path = /flume/events/%y-%m-%d/%H%M%S
agent.sinks.target.hdfs.filePrefix = data-%{hostname}
agent.sinks.target.hdfs.rollInterval = 60
agent.sinks.target.hdfs.rollSize = 1073741824
agent.sinks.target.hdfs.rollCount = 1000000
agent.sinks.target.hdfs.round = true
agent.sinks.target.hdfs.roundValue = 10
agent.sinks.target.hdfs.roundUnit = minute
agent.sinks.target.hdfs.useLocalTimeStamp = true
agent.sinks.target.hdfs.minBlockReplicas = 1
agent.sinks.target.hdfs.writeFormat = Text
agent.sinks.target.hdfs.fileType = DataStream
package com.guludada.clickstream;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.guludada.dataparser.WebLogParser;

public class logClean {

    public static class cleanMap extends Mapper<Object, Text, Text, NullWritable> {

        private NullWritable v = NullWritable.get();
        private Text word = new Text();
        WebLogParser webLogParser = new WebLogParser();

        public void map(Object key, Text value, Context context) {
            // Turn the input line into a String and run it through the parser
            String line = value.toString();
            String cleanContent = webLogParser.parser(line);
            // Only emit records the parser accepted (use isEmpty(), not !=, to compare Strings)
            if (!cleanContent.isEmpty()) {
                word.set(cleanContent);
                try {
                    context.write(word, v);
                } catch (IOException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000");

        Job job = Job.getInstance(conf);
        job.setJarByClass(logClean.class);

        // Mapper class for this job (map-only, no reducer)
        job.setMapperClass(cleanMap.class);

        // Key/value types emitted by the mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Input directory: today's raw logs collected by Flume
        Date curDate = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd");
        String dateStr = sdf.format(curDate);
        FileInputFormat.setInputPaths(job, new Path("/flume/events/" + dateStr + "/*/*"));

        // Output directory for the cleaned data
        FileOutputFormat.setOutputPath(job, new Path("/clickstream/cleandata/" + dateStr + "/"));

        // Submit the job (and its jar) to YARN and wait for completion
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
package com.guludada.dataparser;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.guludada.javabean.WebLogBean;

/**
 * Uses regular expressions to extract the fields of a valid log record;
 * returns an empty string for records that do not match.
 */
public class WebLogParser {

    public String parser(String weblog_origin) {

        WebLogBean weblogbean = new WebLogBean();

        // Extract the client IP address (dots are escaped so they only match a literal '.')
        Pattern IPPattern = Pattern.compile("\\d+\\.\\d+\\.\\d+\\.\\d+");
        Matcher IPMatcher = IPPattern.matcher(weblog_origin);
        if (IPMatcher.find()) {
            String IPAddr = IPMatcher.group(0);
            weblogbean.setIP_addr(IPAddr);
        } else {
            return "";
        }

        // Extract the timestamp between the square brackets
        Pattern TimePattern = Pattern.compile("\\[(.+)\\]");
        Matcher TimeMatcher = TimePattern.matcher(weblog_origin);
        if (TimeMatcher.find()) {
            String time = TimeMatcher.group(1);
            String[] cleanTime = time.split(" ");
            weblogbean.setTime(cleanTime[0]);
        } else {
            return "";
        }

        // Extract the remaining request information: request line, status code,
        // response size, referer and user agent
        Pattern InfoPattern = Pattern.compile(
                "(\\\"(?:POST|GET).+?\\\") (\\d+) (\\d+).+?(\\\".+?\\\") (\\\".+?\\\")");
        Matcher InfoMatcher = InfoPattern.matcher(weblog_origin);
        if (InfoMatcher.find()) {
            String requestInfo = InfoMatcher.group(1).replace('\"', ' ').trim();
            String[] requestInfoArry = requestInfo.split(" ");
            weblogbean.setMethod(requestInfoArry[0]);
            weblogbean.setRequest_URL(requestInfoArry[1]);
            weblogbean.setRequest_protocol(requestInfoArry[2]);

            String status_code = InfoMatcher.group(2);
            weblogbean.setRespond_code(status_code);

            String respond_data = InfoMatcher.group(3);
            weblogbean.setRespond_data(respond_data);

            String request_come_from = InfoMatcher.group(4).replace('\"', ' ').trim();
            weblogbean.setRequst_come_from(request_come_from);

            String browserInfo = InfoMatcher.group(5).replace('\"', ' ').trim();
            weblogbean.setBrowser(browserInfo);
        } else {
            return "";
        }

        return weblogbean.toString();
    }
}
package com.guludada.javabean;

public class WebLogBean {

    String IP_addr;
    String time;
    String method;
    String request_URL;
    String request_protocol;
    String respond_code;
    String respond_data;
    String requst_come_from;
    String browser;

    public String getIP_addr() { return IP_addr; }
    public void setIP_addr(String iP_addr) { IP_addr = iP_addr; }

    public String getTime() { return time; }
    public void setTime(String time) { this.time = time; }

    public String getMethod() { return method; }
    public void setMethod(String method) { this.method = method; }

    public String getRequest_URL() { return request_URL; }
    public void setRequest_URL(String request_URL) { this.request_URL = request_URL; }

    public String getRequest_protocol() { return request_protocol; }
    public void setRequest_protocol(String request_protocol) { this.request_protocol = request_protocol; }

    public String getRespond_code() { return respond_code; }
    public void setRespond_code(String respond_code) { this.respond_code = respond_code; }

    public String getRespond_data() { return respond_data; }
    public void setRespond_data(String respond_data) { this.respond_data = respond_data; }

    public String getRequst_come_from() { return requst_come_from; }
    public void setRequst_come_from(String requst_come_from) { this.requst_come_from = requst_come_from; }

    public String getBrowser() { return browser; }
    public void setBrowser(String browser) { this.browser = browser; }

    @Override
    public String toString() {
        return IP_addr + " " + time + " " + method + " " + request_URL + " "
                + request_protocol + " " + respond_code + " " + respond_data + " "
                + requst_come_from + " " + browser;
    }
}
package com.guludada.clickstream;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.guludada.dataparser.SessionParser;
import com.guludada.javabean.WebLogSessionBean;

public class logSession {

    public static class sessionMapper extends Mapper<Object, Text, Text, Text> {

        private Text IPAddr = new Text();
        private Text content = new Text();

        public void map(Object key, Text value, Context context) {
            // The cleaned log line starts with the client IP; group the records by IP
            String line = value.toString();
            String[] weblogArry = line.split(" ");

            IPAddr.set(weblogArry[0]);
            content.set(line);
            try {
                context.write(IPAddr, content);
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class sessionReducer extends Reducer<Text, Text, Text, NullWritable> {

        private Text content = new Text();
        private NullWritable v = NullWritable.get();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        SessionParser sessionParser = new SessionParser();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            Date sessionStartTime = null;
            String sessionID = UUID.randomUUID().toString();

            // Sort all browsing records of this IP address by time
            ArrayList<WebLogSessionBean> sessionBeanGroup = new ArrayList<WebLogSessionBean>();
            for (Text browseHistory : values) {
                WebLogSessionBean sessionBean = sessionParser.loadBean(browseHistory.toString());
                sessionBeanGroup.add(sessionBean);
            }
            Collections.sort(sessionBeanGroup, new Comparator<WebLogSessionBean>() {
                public int compare(WebLogSessionBean sessionBean1, WebLogSessionBean sessionBean2) {
                    Date date1 = sessionBean1.getTimeWithDateFormat();
                    Date date2 = sessionBean2.getTimeWithDateFormat();
                    if (date1 == null || date2 == null) return 0;
                    return date1.compareTo(date2);
                }
            });

            for (WebLogSessionBean sessionBean : sessionBeanGroup) {
                if (sessionStartTime == null) {
                    // The user's first visit in today's logs starts the first session
                    sessionStartTime = timeTransform(sessionBean.getTime());
                } else {
                    Date sessionEndTime = timeTransform(sessionBean.getTime());
                    long sessionStayTime = timeDiffer(sessionStartTime, sessionEndTime);
                    if (sessionStayTime > 30 * 60 * 1000) {
                        // More than 30 minutes since the session started:
                        // this record becomes the start of a new session
                        sessionStartTime = timeTransform(sessionBean.getTime());
                        sessionID = UUID.randomUUID().toString();
                    }
                }
                content.set(sessionParser.parser(sessionBean, sessionID));
                context.write(content, v);
            }
        }

        private Date timeTransform(String time) {
            Date standard_time = null;
            try {
                standard_time = sdf.parse(time);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return standard_time;
        }

        private long timeDiffer(Date start_time, Date end_time) {
            return end_time.getTime() - start_time.getTime();
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000");

        Job job = Job.getInstance(conf);
        job.setJarByClass(logSession.class);

        // Mapper/Reducer classes for this job
        job.setMapperClass(sessionMapper.class);
        job.setReducerClass(sessionReducer.class);

        // Key/value types emitted by the mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        Date curDate = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd");
        String dateStr = sdf.format(curDate);

        // Input directory: today's cleaned data
        FileInputFormat.setInputPaths(job, new Path("/clickstream/cleandata/" + dateStr + "/*"));
        // Output directory for the session data
        FileOutputFormat.setOutputPath(job, new Path("/clickstream/sessiondata/" + dateStr + "/"));

        // Submit the job (and its jar) to YARN and wait for completion
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
package com.guludada.dataparser;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import com.guludada.javabean.WebLogSessionBean;

public class SessionParser {

    SimpleDateFormat sdf_origin = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
    SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public String parser(WebLogSessionBean sessionBean, String sessionID) {
        sessionBean.setSession(sessionID);
        return sessionBean.toString();
    }

    public WebLogSessionBean loadBean(String sessionContent) {
        WebLogSessionBean weblogSession = new WebLogSessionBean();

        // Cleaned-log line layout:
        // [0] IP, [1] time, [2] method, [3] URL, [4] protocol, [5] status, [6] bytes, [7] referer, [8] user agent
        String[] contents = sessionContent.split(" ");
        weblogSession.setTime(timeTransform(contents[1]));
        weblogSession.setIP_addr(contents[0]);
        weblogSession.setRequest_URL(contents[3]);
        weblogSession.setReferal(contents[7]);

        return weblogSession;
    }

    private String timeTransform(String time) {
        Date standard_time = null;
        try {
            standard_time = sdf_origin.parse(time);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return sdf_final.format(standard_time);
    }
}
package com.guludada.javabean;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class WebLogSessionBean {

    String time;
    String IP_addr;
    String session;
    String request_URL;
    String referal;

    public String getTime() { return time; }
    public void setTime(String time) { this.time = time; }

    public String getIP_addr() { return IP_addr; }
    public void setIP_addr(String iP_addr) { IP_addr = iP_addr; }

    public String getSession() { return session; }
    public void setSession(String session) { this.session = session; }

    public String getRequest_URL() { return request_URL; }
    public void setRequest_URL(String request_URL) { this.request_URL = request_URL; }

    public String getReferal() { return referal; }
    public void setReferal(String referal) { this.referal = referal; }

    public Date getTimeWithDateFormat() {
        SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        if (this.time != null && !this.time.isEmpty()) {
            try {
                return sdf_final.parse(this.time);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        return null;
    }

    @Override
    public String toString() {
        return time + " " + IP_addr + " " + session + " " + request_URL + " " + referal;
    }
}
Time | IP | SessionID | Requested page URL | Referral URL |
2015-05-30 19:38:00 | 192.168.12.130 | Session1 | /blog/me | www.baidu.com |
2015-05-30 19:39:00 | 192.168.12.130 | Session1 | /blog/me/details | www.mysite.com/blog/me |
2015-05-30 19:38:00 | 192.168.12.40 | Session2 | /blog/me | www.baidu.com |
package com.guludada.clickstream;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.guludada.dataparser.PageViewsParser;
import com.guludada.javabean.PageViewsBean;

public class PageViews {

    public static class pageMapper extends Mapper<Object, Text, Text, Text> {

        private Text word = new Text();

        public void map(Object key, Text value, Context context) {
            // Session-data line layout: [0] date, [1] time, [2] IP, [3] sessionID, [4] URL, [5] referer.
            // Group the records by sessionID.
            String line = value.toString();
            String[] webLogContents = line.split(" ");
            word.set(webLogContents[3]);
            try {
                context.write(word, value);
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class pageReducer extends Reducer<Text, Text, Text, NullWritable> {

        private Text content = new Text();
        private NullWritable v = NullWritable.get();
        PageViewsParser pageViewsParser = new PageViewsParser();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            // Sort all page views of this session by time
            ArrayList<PageViewsBean> pageViewsBeanGroup = new ArrayList<PageViewsBean>();
            for (Text pageView : values) {
                PageViewsBean pageViewsBean = pageViewsParser.loadBean(pageView.toString());
                pageViewsBeanGroup.add(pageViewsBean);
            }
            Collections.sort(pageViewsBeanGroup, new Comparator<PageViewsBean>() {
                public int compare(PageViewsBean pageViewsBean1, PageViewsBean pageViewsBean2) {
                    Date date1 = pageViewsBean1.getTimeWithDateFormat();
                    Date date2 = pageViewsBean2.getTimeWithDateFormat();
                    if (date1 == null || date2 == null) return 0;
                    return date1.compareTo(date2);
                }
            });

            // Compute the stay time of each page. A page view is written out only after
            // the next page view has been seen, so its stay time is already known.
            // The previous page view and its timestamp are local variables, so no state
            // leaks between sessions.
            PageViewsBean lastStayPageBean = null;
            Date lastVisitTime = null;
            int step = 0;
            for (PageViewsBean pageViewsBean : pageViewsBeanGroup) {
                Date curVisitTime = pageViewsBean.getTimeWithDateFormat();
                // Step number of the current page within the session
                step++;
                pageViewsBean.setStep(step + "");

                if (lastStayPageBean != null) {
                    // Stay time of the previous page = time difference to the current page view, in seconds
                    Integer timeDiff = (int) ((curVisitTime.getTime() - lastVisitTime.getTime()) / 1000);
                    lastStayPageBean.setStayTime(timeDiff.toString());
                    content.set(pageViewsParser.parser(lastStayPageBean));
                    context.write(content, v);
                }
                lastStayPageBean = pageViewsBean;
                lastVisitTime = curVisitTime;
            }
            // The last page of the session keeps its default stay time of 0
            if (lastStayPageBean != null) {
                content.set(pageViewsParser.parser(lastStayPageBean));
                context.write(content, v);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000");

        Job job = Job.getInstance(conf);
        job.setJarByClass(PageViews.class);

        // Mapper/Reducer classes for this job
        job.setMapperClass(pageMapper.class);
        job.setReducerClass(pageReducer.class);

        // Key/value types emitted by the mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        Date curDate = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd");
        String dateStr = sdf.format(curDate);

        // Input directory: today's session data
        FileInputFormat.setInputPaths(job, new Path("/clickstream/sessiondata/" + dateStr + "/*"));
        // Output directory for the PageViews data
        FileOutputFormat.setOutputPath(job, new Path("/clickstream/pageviews/" + dateStr + "/"));

        // Submit the job (and its jar) to YARN and wait for completion
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
package com.guludada.dataparser;

import com.guludada.javabean.PageViewsBean;

public class PageViewsParser {

    /**
     * Loads a PageViewsBean from one line of the logSession job's output.
     */
    public PageViewsBean loadBean(String sessionContent) {
        PageViewsBean pageViewsBean = new PageViewsBean();

        // Session-data line layout: [0] date, [1] time, [2] IP, [3] sessionID, [4] URL, [5] referer
        String[] contents = sessionContent.split(" ");
        pageViewsBean.setTime(contents[0] + " " + contents[1]);
        pageViewsBean.setIP_addr(contents[2]);
        pageViewsBean.setSession(contents[3]);
        pageViewsBean.setVisit_URL(contents[4]);
        pageViewsBean.setStayTime("0");
        pageViewsBean.setStep("0");

        return pageViewsBean;
    }

    public String parser(PageViewsBean pageBean) {
        return pageBean.toString();
    }
}
package com.guludada.javabean;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class PageViewsBean {

    String session;
    String IP_addr;
    String time;
    String visit_URL;
    String stayTime;
    String step;

    public String getSession() { return session; }
    public void setSession(String session) { this.session = session; }

    public String getIP_addr() { return IP_addr; }
    public void setIP_addr(String iP_addr) { IP_addr = iP_addr; }

    public String getTime() { return time; }
    public void setTime(String time) { this.time = time; }

    public String getVisit_URL() { return visit_URL; }
    public void setVisit_URL(String visit_URL) { this.visit_URL = visit_URL; }

    public String getStayTime() { return stayTime; }
    public void setStayTime(String stayTime) { this.stayTime = stayTime; }

    public String getStep() { return step; }
    public void setStep(String step) { this.step = step; }

    public Date getTimeWithDateFormat() {
        SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        if (this.time != null && !this.time.isEmpty()) {
            try {
                return sdf_final.parse(this.time);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        return null;
    }

    @Override
    public String toString() {
        return session + " " + IP_addr + " " + time + " " + visit_URL + " " + stayTime + " " + step;
    }
}

The PageViews data produced by this third cleaning pass has the following structure:
SessionID | IP | Visit time | Page visited | Stay time | Step |
Session1 | 192.168.12.130 | 2016-05-30 15:17:30 | /blog/me | 30000 | 1 |
Session1 | 192.168.12.130 | 2016-05-30 15:18:00 | /blog/me/admin | 30000 | 2 |
Session1 | 192.168.12.130 | 2016-05-30 15:18:30 | /home | 30000 | 3 |
Session2 | 192.168.12.150 | 2016-05-30 15:16:30 | /products | 30000 | 1 |
Session2 | 192.168.12.150 | 2016-05-30 15:17:00 | /products/details | 30000 | 2 |
package com.guludada.clickstream;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.guludada.dataparser.VisitsInfoParser;

public class VisitsInfo {

    public static class visitMapper extends Mapper<Object, Text, Text, Text> {

        private Text word = new Text();

        public void map(Object key, Text value, Context context) {
            // Session-data line layout: [0] date, [1] time, [2] IP, [3] sessionID, [4] URL, [5] referer.
            // Group the records by sessionID.
            String line = value.toString();
            String[] webLogContents = line.split(" ");
            word.set(webLogContents[3]);
            try {
                context.write(word, value);
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class visitReducer extends Reducer<Text, Text, Text, NullWritable> {

        private Text content = new Text();
        private NullWritable v = NullWritable.get();
        VisitsInfoParser visitsParser = new VisitsInfoParser();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            // Sort all browsing records of this session by time
            ArrayList<String> browseInfoGroup = new ArrayList<String>();
            for (Text browseInfo : values) {
                browseInfoGroup.add(browseInfo.toString());
            }
            Collections.sort(browseInfoGroup, new Comparator<String>() {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                public int compare(String browseInfo1, String browseInfo2) {
                    String dateStr1 = browseInfo1.split(" ")[0] + " " + browseInfo1.split(" ")[1];
                    String dateStr2 = browseInfo2.split(" ")[0] + " " + browseInfo2.split(" ")[1];
                    try {
                        Date date1 = sdf.parse(dateStr1);
                        Date date2 = sdf.parse(dateStr2);
                        return date1.compareTo(date2);
                    } catch (ParseException e) {
                        e.printStackTrace();
                        return 0;
                    }
                }
            });

            // Count the distinct pages visited in this session; the parser derives the
            // entry page, leave page and start/end times from the sorted group.
            // The map is local to this call, so counts do not leak between sessions.
            Map<String, Integer> viewedPagesMap = new HashMap<String, Integer>();
            for (String browseInfo : browseInfoGroup) {
                String[] browseInfoStrArr = browseInfo.split(" ");
                String curVisitURL = browseInfoStrArr[4];
                if (!viewedPagesMap.containsKey(curVisitURL)) {
                    viewedPagesMap.put(curVisitURL, 1);
                }
            }
            int total_visit_pages = viewedPagesMap.size();

            String visitsInfo = visitsParser.parser(browseInfoGroup, total_visit_pages + "");
            content.set(visitsInfo);
            context.write(content, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000");

        Job job = Job.getInstance(conf);
        job.setJarByClass(VisitsInfo.class);

        // Mapper/Reducer classes for this job
        job.setMapperClass(visitMapper.class);
        job.setReducerClass(visitReducer.class);

        // Key/value types emitted by the mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        Date curDate = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd");
        String dateStr = sdf.format(curDate);

        // Input directory: today's session data
        FileInputFormat.setInputPaths(job, new Path("/clickstream/sessiondata/" + dateStr + "/*"));
        // Output directory for the visits data
        FileOutputFormat.setOutputPath(job, new Path("/clickstream/visitsinfo/" + dateStr + "/"));

        // Submit the job (and its jar) to YARN and wait for completion
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
package com.guludada.dataparser;

import java.util.ArrayList;

import com.guludada.javabean.VisitsInfoBean;

public class VisitsInfoParser {

    public String parser(ArrayList<String> pageViewsGroup, String totalVisitNum) {
        VisitsInfoBean visitsBean = new VisitsInfoBean();

        String entryPage = pageViewsGroup.get(0).split(" ")[4];
        String leavePage = pageViewsGroup.get(pageViewsGroup.size() - 1).split(" ")[4];
        String startTime = pageViewsGroup.get(0).split(" ")[0] + " "
                + pageViewsGroup.get(0).split(" ")[1];
        String endTime = pageViewsGroup.get(pageViewsGroup.size() - 1).split(" ")[0] + " "
                + pageViewsGroup.get(pageViewsGroup.size() - 1).split(" ")[1];
        String session = pageViewsGroup.get(0).split(" ")[3];
        String IP = pageViewsGroup.get(0).split(" ")[2];
        String referal = pageViewsGroup.get(0).split(" ")[5];

        visitsBean.setSession(session);
        visitsBean.setStart_time(startTime);
        visitsBean.setEnd_time(endTime);
        visitsBean.setEntry_page(entryPage);
        visitsBean.setLeave_page(leavePage);
        visitsBean.setVisit_page_num(totalVisitNum);
        visitsBean.setIP_addr(IP);
        visitsBean.setReferal(referal);

        return visitsBean.toString();
    }
}
package com.guludada.javabean;

public class VisitsInfoBean {

    String session;
    String start_time;
    String end_time;
    String entry_page;
    String leave_page;
    String visit_page_num;
    String IP_addr;
    String referal;

    public String getSession() { return session; }
    public void setSession(String session) { this.session = session; }

    public String getStart_time() { return start_time; }
    public void setStart_time(String start_time) { this.start_time = start_time; }

    public String getEnd_time() { return end_time; }
    public void setEnd_time(String end_time) { this.end_time = end_time; }

    public String getEntry_page() { return entry_page; }
    public void setEntry_page(String entry_page) { this.entry_page = entry_page; }

    public String getLeave_page() { return leave_page; }
    public void setLeave_page(String leave_page) { this.leave_page = leave_page; }

    public String getVisit_page_num() { return visit_page_num; }
    public void setVisit_page_num(String visit_page_num) { this.visit_page_num = visit_page_num; }

    public String getIP_addr() { return IP_addr; }
    public void setIP_addr(String iP_addr) { IP_addr = iP_addr; }

    public String getReferal() { return referal; }
    public void setReferal(String referal) { this.referal = referal; }

    @Override
    public String toString() {
        return session + " " + start_time + " " + end_time + " " + entry_page + " "
                + leave_page + " " + visit_page_num + " " + IP_addr + " " + referal;
    }
}
SessionID | Start time | End time | Entry page | Leave page | Pages visited | IP | Referral |
Session1 | 2016-05-30 15:17:00 | 2016-05-30 15:19:00 | /blog/me | /blog/others | 5 | 192.168.12.130 | www.baidu.com |
Session2 | 2016-05-30 14:17:00 | 2016-05-30 15:19:38 | /home | /profile | 10 | 192.168.12.140 | www.178.com |
Session3 | 2016-05-30 12:17:00 | 2016-05-30 15:40:00 | /products | /detail | 6 | 192.168.12.150 | www.78dm.net |
#!/bin/bash

export HADOOP_HOME=/home/ymh/apps/hadoop-2.6.4

#start hdfs
/home/ymh/apps/hadoop-2.6.4/sbin/start-dfs.sh

#start yarn
if [[ 0 == $? ]]
then
    /home/ymh/apps/hadoop-2.6.4/sbin/start-yarn.sh
fi

#start flume
#if [[ 0 == $? ]]
#then
#    nohup ~/apache-flume-1.6.0-bin/bin/flume-ng agent -n agent -c conf -f ~/apache-flume-1.6.0-bin/conf/flume-conf.properties &
#fi

#start mysql
if [ 0 = $? ]
then
    service mysqld start
fi

#start HIVE SERVER
if [ 0 = $? ]
then
    nohup /apps/apache-hive-1.2.1-bin/bin/hiveserver2 &
fi
#!/bin/bash

CURDATE=$(date +%y-%m-%d)
CURDATEHIVE=$(date +%Y-%m-%d)

#check that today's Flume output directory exists before running the pipeline
/home/ymh/apps/hadoop-2.6.4/bin/hdfs dfs -df /flume/events/$CURDATE

if [[ 1 -ne $? ]]
then
    /home/ymh/apps/hadoop-2.6.4/bin/hadoop jar /export/data/mydata/clickstream.jar com.guludada.clickstream.logClean
fi

if [[ 1 -ne $? ]]
then
    /home/ymh/apps/hadoop-2.6.4/bin/hadoop jar /export/data/mydata/clickstream.jar com.guludada.clickstream.logSession
fi

if [[ 1 -ne $? ]]
then
    /home/ymh/apps/hadoop-2.6.4/bin/hadoop jar /export/data/mydata/clickstream.jar com.guludada.clickstream.PageViews
fi

#Load today's data
if [[ 1 -ne $? ]]
then
    /home/ymh/apps/hadoop-2.6.4/bin/hdfs dfs -chmod 777 /clickstream/pageviews/$CURDATE/
    echo "load data inpath '/clickstream/pageviews/$CURDATE/' into table pageviews partition(inputDate='$CURDATEHIVE');" | /apps/apache-hive-1.2.1-bin/bin/beeline -u jdbc:hive2://localhost:10000
fi

#Populate the fact table and its dimension tables
if [[ 1 -ne $? ]]
then
    echo "insert into table ods_pageviews partition(inputDate='$CURDATEHIVE') select pv.session,pv.ip,concat(pv.requestdate,'-',pv.requesttime) as viewtime,pv.visitpage,pv.staytime,pv.step from pageviews as pv where pv.inputDate='$CURDATEHIVE';" | /apps/apache-hive-1.2.1-bin/bin/beeline -u jdbc:hive2://localhost:10000
fi

if [[ 1 -ne $? ]]
then
    echo "insert into table ods_dim_pageviews_time partition(inputDate='$CURDATEHIVE') select distinct pv.viewtime, substring(pv.viewtime,0,4),substring(pv.viewtime,6,2),substring(pv.viewtime,9,2),substring(pv.viewtime,12,2),substring(pv.viewtime,15,2),substring(pv.viewtime,18,2) from ods_pageviews as pv;" | /apps/apache-hive-1.2.1-bin/bin/beeline -u jdbc:hive2://localhost:10000
fi

if [[ 1 -ne $? ]]
then
    echo "insert into table ods_dim_pageviews_url partition(inputDate='$CURDATEHIVE') select distinct pv.visitpage,b.host,b.path,b.query from pageviews pv lateral view parse_url_tuple(concat('https://localhost',pv.visitpage),'HOST','PATH','QUERY') b as host,path,query;" | /apps/apache-hive-1.2.1-bin/bin/beeline -u jdbc:hive2://localhost:10000
fi
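The script above assumes the Hive tables already exist. As a hedged sketch only, the DDL below shows one way the pageviews staging table could be declared so that the load and insert statements line up with the PageViews job's output; the column names are taken from the insert statement above, while the STRING types, the space delimiter and the TEXTFILE storage format are assumptions rather than definitions from the original post.

CREATE TABLE IF NOT EXISTS pageviews (
    session     STRING,
    ip          STRING,
    requestdate STRING,   -- yyyy-MM-dd part of the view time
    requesttime STRING,   -- HH:mm:ss part of the view time
    visitpage   STRING,
    staytime    STRING,
    step        STRING
)
PARTITIONED BY (inputDate STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
STORED AS TEXTFILE;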
$ vi root_crontab_hadoop
$ echo "0 1 * * * /myShells/dataAnalyseTask.sh" >> root_crontab_hadoop
$ crontab root_crontab_hadoop
Original post: http://blog.csdn.net/ymh198816/article/details/51540715