上一篇请参考【Flume NG用户指南】(1)设置
前边的文章已经介绍过了,Flume Agent配置是从一个具有分层属性的Java属性文件格式的文件中读取的。
要在一个Flume Agent中定义数据流,你需要通过一个Channel将Source和Sink连接起来。你需要列出给定Agent的Source、Sink和Channel。一个Source可以指定多个Channel,但是一个Sink只能指定一个Channel。格式如下:
# list the sources, sinks and channels for the agent <Agent>.sources = <Source> <Agent>.sinks = <Sink> <Agent>.channels = <Channel1> <Channel2> # set channel for source <Agent>.sources.<Source>.channels = <Channel1> <Channel2> ... # set channel for sink <Agent>.sinks.<Sink>.channel = <Channel1>例如,一个叫做agent_foo的Agent从一个外部的Avro客户端读取数据,然后通过Memory Channel将数据发送到HDFS上。配置文件如下:
# list the sources, sinks and channels for the agent agent_foo.sources = avro-appserver-src-1 agent_foo.sinks = hdfs-sink-1 agent_foo.channels = mem-channel-1 # set channel for source agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1 # set channel for sink agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1这个配置将会使Event通过一个叫mem-channel-1的Memory Channel从avro-AppSrv-source流向hdfs-Cluster1-sink。当Agent使用此配置文件启动的时候,它就会实例化这个数据流。
定义数据流之后,你需要设置每一个Source、Sink和Channel的属性。属性位于每个组件类型配置的层次命名空间下。
# properties for sources <Agent>.sources.<Source>.<someProperty> = <someValue> # properties for channels <Agent>.channel.<Channel>.<someProperty> = <someValue> # properties for sinks <Agent>.sources.<Sink>.<someProperty> = <someValue>
Flume的每一个组件都需要设置“type”属性,以便理解到底需要的是那种组件对象。每一个Source、Sink和Channel类型都有它自己的属性集。这些属性都需要根据需要设置。就像前边的一个通过mem-channel-1的Memory Channel从avro-AppSrv-source流向hdfs-Cluster1-sink的数据流的例子,下边是这些组件配置的例子:
agent_foo.sources = avro-AppSrv-source agent_foo.sinks = hdfs-Cluster1-sink agent_foo.channels = mem-channel-1 # set channel for sources, sinks # properties of avro-AppSrv-source agent_foo.sources.avro-AppSrv-source.type = avro agent_foo.sources.avro-AppSrv-source.bind = localhost agent_foo.sources.avro-AppSrv-source.port = 10000 # properties of mem-channel-1 agent_foo.channels.mem-channel-1.type = memory agent_foo.channels.mem-channel-1.capacity = 1000 agent_foo.channels.mem-channel-1.transactionCapacity = 100 # properties of hdfs-Cluster1-sink agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata #...
一个简单的Flume Agent可以包含多个独立的数据流配置。你可以在配置中列出多个Source、Sink和Channel。这些组件能够互联起来形成数据流:
# list the sources, sinks and channels for the agent <Agent>.sources = <Source1> <Source2> <Agent>.sinks = <Sink1> <Sink2> <Agent>.channels = <Channel1> <Channel2>之后你就可以连接Source和Sink到它们各自的Channel上,从而形成不同的数据流。例如,如果你需要在一个Agent中设置两个数据流,一个从外部Avro客户端流向外部HDFS,另一个从tail的输出流向Avro Sink,下边就是实现这么个数据流的配置:
# list the sources, sinks and channels in the agent agent_foo.sources = avro-AppSrv-source1 exec-tail-source2 agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2 agent_foo.channels = mem-channel-1 file-channel-2 # flow #1 configuration agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1 # flow #2 configuration agent_foo.sources.exec-tail-source2.channels = file-channel-2 agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
为设置一个多层的数据流,你需要在第一跳上使用一个Avro/Thrift Sink指向下一跳的Avro/Thrift Source。这就会使第一个Flume Agent传输Event到下一个Flume Agent。例如,如果你在使用Avro客户端周期性的发送文件(每个Event包含一个文件)到本地Flume Agent,然后本地Agent又把Event传输到另一个和最终存储系统挂载的Flume Agent上。看下边的例子
Weblog Agent配置:
# list sources, sinks and channels in the agent agent_foo.sources = avro-AppSrv-source agent_foo.sinks = avro-forward-sink agent_foo.channels = file-channel # define the flow agent_foo.sources.avro-AppSrv-source.channels = file-channel agent_foo.sinks.avro-forward-sink.channel = file-channel # avro sink properties agent_foo.sources.avro-forward-sink.type = avro agent_foo.sources.avro-forward-sink.hostname = 10.1.1.100 agent_foo.sources.avro-forward-sink.port = 10000 # configure other pieces #...HDFS Agent配置:
# list sources, sinks and channels in the agent agent_foo.sources = avro-collection-source agent_foo.sinks = hdfs-sink agent_foo.channels = mem-channel # define the flow agent_foo.sources.avro-collection-source.channels = mem-channel agent_foo.sinks.hdfs-sink.channel = mem-channel # avro sink properties agent_foo.sources.avro-collection-source.type = avro agent_foo.sources.avro-collection-source.bind = 10.1.1.100 agent_foo.sources.avro-collection-source.port = 10000 # configure other pieces #...这里我们连接Weblog Agent的avro-forward-sink到HDFS Agent的avro-collection-source。这就会使来自外部appserver Source的Event最终存储到HDFS中。
前边的章节已经讨论过,Flume支持从一个Source扇出数据到多个Channel中。有两种扇出模式:复制(replicating)和多路(multiplexing)。在复制流中,Event被发送到所有配置Channel中。在多路流中,Event被发送到一个特定的子集。为扇出数据流,你需要为这个Source指定一个Channel列表以及扇出的策略。这是通过添加一个Channel “selector”(可以是复制或者多路)实现的。然后如果多路的模式,还需进一步指定选择规则。如果不指定“selector”,默认是复制:
# List the sources, sinks and channels for the agent <Agent>.sources = <Source1> <Agent>.sinks = <Sink1> <Sink2> <Agent>.channels = <Channel1> <Channel2> # set list of channels for source (separated by space) <Agent>.sources.<Source1>.channels = <Channel1> <Channel2> # set channel for sinks <Agent>.sinks.<Sink1>.channel = <Channel1> <Agent>.sinks.<Sink2>.channel = <Channel2> <Agent>.sources.<Source1>.selector.type = replicating
多路选择还有一些其他的配置来使数据流分叉。这就需要指定一个Event属性到一个Channel集合的映射。选择器会检查Event Header上配置的每一个属性。如果它匹配了指定的值,那么这个Event就会被发送到跟这个值映射的所有Channel上。如果没有任何匹配,这个Event就会被发送到发送到默认配置的Channel集合上:
# Mapping for multiplexing selector <Agent>.sources.<Source1>.selector.type = multiplexing <Agent>.sources.<Source1>.selector.header = <someHeader> <Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1> <Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2> <Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2> #... <Agent>.sources.<Source1>.selector.default = <Channel2>为每个值的映射的Channel集合允许重叠。
下边的例子是一个单一流,它多路输出到两个路径上。名叫agent_foo的Agent有一个单一的Avro Source和两个Channel连接到两个Sink上:
# list the sources, sinks and channels in the agent agent_foo.sources = avro-AppSrv-source1 agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2 agent_foo.channels = mem-channel-1 file-channel-2 # set channels for source agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2 # set channel for sinks agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1 agent_foo.sinks.avro-forward-sink2.channel = file-channel-2 # channel selector configuration agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing agent_foo.sources.avro-AppSrv-source1.selector.header = State agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1 agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2 agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2 agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1多路选择器检查Event中一个叫做“State”的Header属性。如果它的值是“CA”,这个Event就会被发送到mem-channel-1,如果它是“AZ”,这个Event就会被发送到file-channel-2,或者如果它是“NY”,那么这个Event就会被发送到这两个Channel上。如果“State” Header属性没有设置或者没有匹配上以上3个的任何一个,这个Event就被发送到mem-channel-1上,它是默认的。
多路选择器也支持可选的Channel。为了为Header指定可选的Channel,“optional”配置参数需要像下边的方式一样使用:
# channel selector configuration agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing agent_foo.sources.avro-AppSrv-source1.selector.header = State agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1 agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2 agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2 agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2 agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2 agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1多路选择器将会首先尝试向必需的Channel上写入,即使这些必需的Channel中有任何一个没有成功消费Event,这整个事务将会失败。事务将会在所有的必需的Channel上重试,一旦多有必需的Channel都成功的消费了Event,多路选择器才会尝试向可选的Channel上写入。并且可选的Channel中有任何消费Event失败的,Flume也会简单忽略它并且不会重试。
如果一个特殊Header的可选Channel集合和必选Channel集合有重叠的,那么这些Channel就被认为是必选的,那自然在这些Channel的失败会导致所有Channel的重试。例如,上边的例子中,“CA” Header对应的mem-channel-1就被认为是必选的Channel,尽管它同时被标记为必选和可选的,对这个Channel的写入失败将会导致跟这个选择器关联的所有Channel上重试。
注意如果一个Header并没有配置任何必选的Channel,那么这个Event将会被写入默认的Channel上,并且将会尝试写入到跟这个Header关联的可选Channel上。如果指定了可选的Channel,而没有指定必选的Channel,依然会导致Event被发送到默认的Channel上。如果没有Channel被指定为默认的并且也没有必选的,选择器会尝试将Event写入到可选的Channel中。这种情况下,任何的失败都会被简单忽略并且不在重试。
下一篇请参考【Flume NG用户指南】(3)Flume Sources
【Flume NG用户指南】(2)配置,布布扣,bubuko.com
原文地址:http://blog.csdn.net/zhoubangtao/article/details/28277575