标签:
flume提供了一个度量框架,可以通过http的方式进行展现,当启动agent的时候通过传递参数 -Dflume.monitoring.type=http参数给flume agent:
1
|
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1
|
这样flume会在5653端口上启动一个HTTP服务器,访问如下地址,将返回JSON格式的flume相关指标参数:
1
|
demo:
|
Flume也可发送度量信息给Ganglia,用来监控Flume。在任何时候只能启用一个Ganglia或HTTP监控。Flume默认一分钟一次周期性的向Ganglia报告度量:
1
|
demo:
|
1
|
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
|
Flume本身可插拔的架构设计,使得开发自定义插件变得很容易。Flume本身提供了非常丰富的source、channel、sink以及拦截器等插件可供选择,基本可以满足生产需要。具体可以参考Flume用户文档.
plugins.d
是flume事先约定的存放自定义组件的目录。flume在启动的时候会自动将该目录下的文件添加到classpath下,当然你也可以在flume-ng 启动时通过指定--classpath,-C <cp>
参数将自己的文件手动添加到classpath下。
相关目录说明:
1
|
plugins.d/xxx/lib - 插件jar
|
拦截器(Interceptor)是简单插件式组件,设置在Source和Source写入数据的Channel之间。Source接收到的事件在写入对应的Channel之前,拦截器都可以转换或删除这些事件。每个拦截器实例只处理同一个Source接收的事件。拦截器可以基于任意标准删除或转换事件,但是拦截器必须返回尽可能多(尽可能少)的事件,如同原始传递过来的事件.因为拦截器必须在事件写入Channel之前完成操作,只有当拦截器已成功转换事件后,RPC Source(和任何其他可能产生超时的Source)才会响应发送事件的客户端或Sink。因此尽量不要在拦截器中做大量耗时的处理操作。如果不得已这么处理了,那么需要相应的调整超时时间属性。Flume自身提供了多种类型的拦截器,比如:时间戳拦截器、主机拦截器、正则过滤拦截器等等。更多内容可以参考Flume Interceptors
拦截器一般用于分析事件以及在需要的时候丢弃事件。编写拦截器时,实现者只需要写以一个实现Interceptor接口的类,同时实现Interceptor$Builder接口的Builer类。所有的Builder类必须有一个公共无参的构造方法,Flume使用该方法来进行实例化。可以使用传递到Builder类的Context实例配置拦截器。所有需要的参数都要传递到Context实例。下面是时间戳拦截器的实现:
1
|
public class TimestampInterceptor implements Interceptor {
|
注:
自定义拦截器的配置方式,interceptors type配置的是XXXInterceptor$Builder
:
1
|
#自定义拦截器 --producer agent名称 --src-1 source名称 —-i1 拦截器名称
|
将自定义代码打包放置到前面的plugins.d/ext-interceptors(可以自己命名)/lib
目录下,启动flume时会自动加载该jar到classpath
Source使用嵌入式的反序列化器读取监控目录下的文件(这里以Spooling Directory Source为例),默认的反序列化器是LineDeserializer。该反序列化器会按行读取文件中的内容,封装成一个Event消息。默认一次读取的最大长度是2048个字符,你可以通过如下配置参数设置改值:
1
|
# --producer agent名称 --src-1 source名称
|
因此在使用LineDeserializer时对源文件内容有个粗略的估计,否则,当某行的内容超出最大长度时。该行内容会被截取成两个部分,封装成两个Event发送到channel中。这样,在某些场景下该行消息相当于非法消息了。如,某个文件按行记录一个http请求的所有内容,而事先我们无法预知一行http请求的最大长度(当然理论上你可以将maxLineLength设置成一个较大的值,解决该问题)。但是这里要说的是另外一种解决方案,很简单,参考LineDeserializer实现一个不限制最大长度的解析器(flume之所以这么设计是出于什么角度考虑?)。反序列化器的定义和前面的拦截器基本相同:
1
|
public class LineDeserializer implements EventDeserializer {
|
接下来的步骤和拦截器一致
1
|
#自定义解析器
|
Flume提供了丰富的source类型,如Avro Source、Exec Source、Spooling Directory Source ….
这里要说的是实际使用过程中遇到的一个问题。还是前面记录http请求内容的场景,为了及时分析http请求的数据,我们将记录http请求的原始文件按照分钟进行切割,然后移动到spooling directory监控目录(如/tmp-logs)下。但是由于一些原因,会出现监控目录下文件重名的情况.
1
|
/tmp-logs/access_2015_10_01_16_30.log.COMPLETED #flume处理完的文件会自动进行重命名.COMPLETED
|
这种情况下后进来的access_2015_10_01_16_30.log,在flume读取完成后会对其进行重命名,但是该文件名已经被占用了,flume就会抛出如下的异常信息,停止处理该监控目录下的其他文件。
1
|
25 九月 2015 16:48:59,228 INFO [pool-22-thread-1] (org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile:348) - Preparing to move file /opt/nginx/tmp_logs/access-2015-09-25-13-51.log to /opt/nginx/tmp_logs/access-2015-09-25-13-51.log.COMPLETED
|
跟踪抛出异常的源码,SpoolDirectorySource会启动一个线程轮询监控目录下的目标文件,当读取完该文件(readEvents)之后会对该文件进行重名(rollCurrentFile),当重命名失败时会抛出IllegalStateException,被SpoolDirectoryRunnable catch重新抛出RuntimeException,导致当前线程退出,从源码看SpoolDirectoryRunnable是单线程执行的,因此线程结束后,监控目录下其他文件不再被处理:
1
|
# SpoolDirectorySource 启动SpoolDirectoryRunnable
|
现在基本清楚了异常栈的调用逻辑,那么和前面自定义解析器一样,我们可以重写ReliableSpoolingFileEventReader以及SpoolDirectorySource的相关实现,也就是自定义一个spooling source,在rollCurrentFile()重命名失败时,做些处理措施,比如将该文件重新命名为access_2015_10_01_16_30.log(2).COMPLETED(此时文件内容已经读取完毕了)继续处理(注意要是.COMPLETED结尾,不然flume会再次读取该文件)。
改写完成之后,就和前面自定义解析器的处理步骤一样了,打包放在plugins.d目录下,配置:
1
|
producer.sources.src-1.type = com.networkbench.flume.source.SpoolDirectoryExtSource
|
基本上flume的各种组件都可以自定义开发,本人使用flume时间也没多久,截止到目前为止遇到问题还有以下几个:
这个坑其实是自己挖的,当时想当然的理解flume的配置参数#producer.sinks.sink-1.requiredAcks = 1(默认是1),我设置成了10,当时使用的kafka sink,由于某个kafka节点出现了问题(还没有仔细验证,是否kafka正常时也会出现该问题?),导致flume一直重发某个时间点的数据,而最新的数据一直被阻塞(可能是被缓存在了channel中)。导致后台接收的一直是某个时间点的消息。后台想到自己改动的这个参数,改回1之后就正常了。下面是官方文档对该参数的说明:
requiredAcks 1 (默认值) How many replicas must acknowledge a message before its considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) Set this to -1 to avoid data loss in some cases of leader failure.
chanel溢出是因为前面的消息重发导致的,当时使用的channle是File Channel,其中有几个配置项值得注意:
配置项 | 默认值 | 说明 |
---|---|---|
transactionCapacity | 10000 | 单个事务中可以写入或读取的事务的最大数量 |
maxFileSize | 2146435071 | 每个数据文件的最大大小(字节),一旦文件达到这个大小(或一旦写入下个文件达到这个大小),该文件保存关闭并在那个目录下创建一个新的数据文件。如果此值设置为高于默认值,仍以默认值为准 |
minimumRequiredSpace | 524288000 | channel继续操作时每个卷所需的最少空间(字节),如果任何一个挂载数据目录的卷只有这么多空间剩余,channel将停止操作来防止损坏和避免不完整的数据被写入 |
capacity | 1000000 | channel可以保存的提交事件的最大数量 |
keep-alive | 3 | 每次写入或读取应该等待完成的最大的时间周期(秒) |
前面的channel溢出推测就是由capacity的达到了限制造成的。
标签:
原文地址:http://www.cnblogs.com/breg/p/5649363.html