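Flume, originally developed by Cloudera, is a highly available, highly reliable, distributed system for collecting, aggregating, and transporting large volumes of log data. It lets you plug custom data senders (sources) into a logging system to collect data, and it can apply simple processing to that data and write it to a variety of customizable data receivers (sinks). It is now maintained as a top-level Apache project.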
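Flume is usually deployed on Unix/Linux servers, but for testing and debugging it is sometimes convenient to install it on Windows as well. Flume 1.6, the latest release at the time of writing, is fairly easy to use on Windows because it ships with its own Windows launch scripts.
These scripts are written in PowerShell. The launcher, flume-ng.cmd, is in the apache-flume-1.6.0-bin\bin directory.
Open a command prompt in that directory and run flume-ng.cmd help to see the program's usage.
For example, an agent that collects logs from a spooling directory can be started with the following command (the value passed to --name must match the a1 prefix used in t1.conf):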
flume-ng.cmd agent --conf ..\conf --conf-file ..\conf\t1.conf --name a1
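When testing an agent like this, keep in mind that the spooldir source expects every file in the spooling directory to be complete and unchanging once it appears there, and file names should not be reused. The Java sketch below is one way to feed test files under those rules: it writes each file in a separate staging directory first and then moves it in atomically. The class name SpoolFeeder and the staging directory spool_staging are hypothetical; spool_dir mirrors the spoolDir value in t1.conf. Both are relative paths, so run the helper from the same working directory as flume-ng.cmd, or switch both sides to absolute paths.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that drops fully written test files into a spooling directory. */
public class SpoolFeeder {

    static Path drop(Path stagingDir, Path spoolDir, List<String> lines) throws IOException {
        Files.createDirectories(stagingDir);
        Files.createDirectories(spoolDir);
        // 1. Write the whole file outside the spooling directory.
        Path tmp = Files.createTempFile(stagingDir, "flume-test-", ".log");
        Files.write(tmp, lines, StandardCharsets.UTF_8);
        // 2. Reuse the temp file's randomly generated name, so names in spoolDir are very unlikely to repeat.
        Path target = spoolDir.resolve(tmp.getFileName());
        // 3. Move it in atomically; this requires both directories to be on the same volume.
        return Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path staging = Paths.get("spool_staging"); // hypothetical staging directory
        Path spool = Paths.get("spool_dir");       // same value as a1.sources.r1.spoolDir
        Path dropped = drop(staging, spool, Arrays.asList("test record 1", "test record 2"));
        System.out.println("Dropped " + dropped.toAbsolutePath());
    }
}
```

On Windows, "same volume" means the staging and spooling directories must be on the same drive; otherwise the atomic move fails with an exception instead of silently copying.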
t1.conf:
a1.sources = r1
a1.channels = memoryChannel
a1.sinks = spoolSink

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = spool_dir
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = memoryChannel

#interceptors
a1.sources.r1.interceptors = e1
a1.sources.r1.interceptors.e1.type = regex_extractor
a1.sources.r1.interceptors.e1.regex = ^([\\D]+[\\d]+[\\s]+[\\d\\:]+)\\s+([\\d\\-]+[\\s]+[\\d\\:]+[\\s]+[\\d\\:]+)\\s([\\S\\-]+)\\s([\\S\\/]+)\\s.[\\w]+\\S([\\d\\.]+)\\S\\s[\\w]+\\S([\\w]+)\\S\\s([\\d\\.]+\\S[\\d]+)\\S\\s([\\d\\.]+\\S[\\d]+)\\S\\s[\\W]+([\\d\\.]+\\S[\\d]+)\\S\\s\\S([\\d\\/]+[\\s]+[\\d\\:]+)\\s\\S\\s([\\d\\/]+[\\s][\\d\\:]+)\\S\\s[\\w\\s]+\\S([\\d]+)\\s[\\w\\s]+\\S([\\d]+)\\S\\s[\\w]+\\S([\\d\\#]+)[\\w\\s]+\\S([\\d\\.]+);$
a1.sources.r1.interceptors.e1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11 s12 s13 s14 s15
a1.sources.r1.interceptors.e1.serializers.s1.name = time1
a1.sources.r1.interceptors.e1.serializers.s2.name = time2
a1.sources.r1.interceptors.e1.serializers.s3.name = xy
a1.sources.r1.interceptors.e1.serializers.s4.name = session
a1.sources.r1.interceptors.e1.serializers.s5.name = devip
a1.sources.r1.interceptors.e1.serializers.s6.name = protocol
a1.sources.r1.interceptors.e1.serializers.s7.name = ip1
a1.sources.r1.interceptors.e1.serializers.s8.name = ip2
a1.sources.r1.interceptors.e1.serializers.s9.name = ip3
a1.sources.r1.interceptors.e1.serializers.s10.name = starttime
a1.sources.r1.interceptors.e1.serializers.s11.name = endtime
a1.sources.r1.interceptors.e1.serializers.s12.name = srcvpn
a1.sources.r1.interceptors.e1.serializers.s13.name = desvpn
a1.sources.r1.interceptors.e1.serializers.s14.name = status
a1.sources.r1.interceptors.e1.serializers.s15.name = username

#channels
a1.channels.memoryChannel.type = memory
a1.channels.memoryChannel.capacity = 300
a1.channels.memoryChannel.transactionCapacity = 300

#sink
a1.sinks.spoolSink.type = com.hzfi.flume.PatternTestSink
a1.sinks.spoolSink.channel = memoryChannel
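The example above uses a regex_extractor interceptor to match each record read from the spooling directory against a regular expression. The expression splits the record into 15 capture groups, and each group is added to the Flume event's headers under the name given by the corresponding serializer (time1 through username).
The sink is a custom class, PatternTestSink, which simply prints each event's headers and body. Its code is as follows: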
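To see what the interceptor puts into the headers, the group-to-name mapping can be reproduced in a few lines of plain Java. The sketch below is a simplified stand-in for regex_extractor, not its actual source: it assumes the first capture group goes to the header named by s1, the second to s2, and so on, as the serializer list in t1.conf suggests, and it uses a short hypothetical regex and log line instead of the 15-group pattern. It also shows why t1.conf writes every regex backslash twice: Flume reads the file as a Java properties file, and properties parsing turns \\d back into \d before the pattern is compiled.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexHeaderDemo {

    /**
     * Simplified stand-in for regex_extractor: capture group i+1 is stored under
     * headerNames.get(i). Extra groups or extra names are ignored.
     */
    static Map<String, String> extract(Pattern regex, List<String> headerNames, String body) {
        Map<String, String> headers = new LinkedHashMap<>();
        Matcher m = regex.matcher(body);
        if (m.find()) {
            int n = Math.min(m.groupCount(), headerNames.size());
            for (int i = 0; i < n; i++) {
                headers.put(headerNames.get(i), m.group(i + 1));
            }
        }
        return headers;
    }

    public static void main(String[] args) throws IOException {
        // Text of a hypothetical conf line, escaped like t1.conf (the file itself contains "\\d").
        String confText = "a1.sources.r1.interceptors.e1.regex = ^(\\\\d+)\\\\s+(\\\\w+)$\n";
        Properties props = new Properties();
        props.load(new StringReader(confText));
        String regex = props.getProperty("a1.sources.r1.interceptors.e1.regex");
        System.out.println(regex);   // prints ^(\d+)\s+(\w+)$

        Map<String, String> headers = extract(Pattern.compile(regex),
                Arrays.asList("status", "username"), "404 alice");
        System.out.println(headers); // prints {status=404, username=alice}
    }
}
```

A record that does not match produces no extra headers here, which is a quick way to spot a regex that is too strict for some log lines.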
package com.hzfi.flume;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

/**
 * Test sink that prints each event's headers and body to stdout,
 * so the output of the regex_extractor interceptor can be inspected.
 */
public class PatternTestSink extends AbstractSink implements Configurable {

    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event = null;
        try {
            transaction.begin();
            // Take one event; null means the channel is currently empty.
            event = channel.take();
            if (event != null) {
                String body = new String(event.getBody(), "UTF-8");
                System.out.println("----->event headers...");
                System.out.println("header content:[" + event.getHeaders().toString() + "]");
                System.out.println("----->event body...");
                System.out.println("body content:[" + body + "]");
            } else {
                // Nothing to consume: tell the sink runner to back off before polling again.
                result = Status.BACKOFF;
            }
            transaction.commit();
        } catch (Exception ex) {
            // Roll back so the event is returned to the channel instead of being lost.
            transaction.rollback();
            throw new EventDeliveryException("Failed to process pattern event: " + event, ex);
        } finally {
            transaction.close();
        }
        return result;
    }

    @Override
    public void configure(Context context) {
        // This test sink has no configuration parameters.
    }
}
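The begin/commit/rollback structure in process() matters because rolling back a take returns the event to the channel, so it is delivered again on a later call instead of being lost. The toy model below illustrates that contract with a plain deque. ToyChannel and its methods are hypothetical and heavily simplified (no capacity limits, no thread safety, one transaction at a time), so it is not Flume's MemoryChannel, only a way to reason about the sink's control flow.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Toy model (NOT Flume's MemoryChannel) of take/commit/rollback:
 * taken events are removed for good only on commit; on rollback they
 * return to the front of the queue in their original order.
 */
public class ToyChannel {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Deque<String> taken = new ArrayDeque<>(); // events taken in the open transaction

    void put(String event) {
        queue.addLast(event);
    }

    /** Returns the next event, or null if the channel is empty (the sink would report BACKOFF). */
    String take() {
        String e = queue.pollFirst();
        if (e != null) {
            taken.push(e);
        }
        return e;
    }

    void commit() {
        taken.clear();
    }

    /** Puts every taken event back at the front, preserving the original order. */
    void rollback() {
        while (!taken.isEmpty()) {
            queue.addFirst(taken.pop());
        }
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        ToyChannel ch = new ToyChannel();
        ch.put("line-1");
        ch.put("line-2");

        String first = ch.take();
        ch.rollback(); // e.g. printing the event threw an exception
        System.out.println(first + " redelivered: " + first.equals(ch.take())); // prints line-1 redelivered: true
        ch.commit();   // now line-1 is consumed for good
        System.out.println("remaining: " + ch.size());                        // prints remaining: 1
    }
}
```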
Original article: http://www.cnblogs.com/lyhero11/p/5104773.html