在最近的项目中,需要用到flume。使用的是非常常见的结构:netcat source开启监听端口,接收发送来的报文消息,通过memory channel与sink(重写的roll file sink)写到本地磁盘。特别的是,这里需要根据报文的类型来发往不同的sink(暂且命名为sink1与sink2)。根据该需求,考虑有两种解决方案。
在一个flume的agent中,启用2个source,2个channel以及2个sink。组成两条独立的flow。一条flow接收一种报文类型,互不干扰。这种方案无需重写任何flume的组件,仅需修改flume的配置文件。发送方根据报文类型的不同(这里要求发送方自己必须了解报文类型)发往不同的flume监听端口(即不同flow的netcat source)。
采用selector multiplexing的方式进行选择。对收到的报文进行分类,发往不同的channel,最终送给相应的sink。
官网对于selector multiplexing的介绍大致是:selector会根据event中某个header对应的value来将event发往不同的channel(header与value就是KV结构)。刚看到这里的时候我就有个疑惑,这个header在哪里进行设置的呢?
后来查看源码后,我猜测是source在收到报文后,封装event时,打入的header。这也就意味着如果是这样的话,需要改写项目中的netcat source。netcat source需要能够区分报文的类型,或者能够得到报文发送方提供的报文类型信息,并将报文类型设置到event的header中。完成以上功能,将flume提供的NetcatSource中原来生成event的地方修改为:
bytes.get(body);
String line = new String(body);
String[] records = line.split("\t", 2);
String header = records[0];
String strBody = records[1];
Map<String, String> headers = new HashMap<String, String>();
headers.put("LOG_FILE", header);
这个headers就是一个KV结构的map。
改写好之后,只需修改配置文件即可实现
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called ‘agent‘
agent1.sources = seqGenSrc
agent1.channels = memoryChannel1 memoryChannel2
agent1.sinks = msgRollingSink1 msgRollingSink2
# For each one of the sources, the type is defined
agent1.sources.seqGenSrc.type = com.flume.source.NetcatSource
agent1.sources.seqGenSrc.bind = 192.168.19.107
agent1.sources.seqGenSrc.port = 44444
agent1.sources.seqGenSrc.header = LOG_TYPE
agent1.sources.seqGenSrc.selector.type = multiplexing
agent1.sources.seqGenSrc.selector.header = LOG_TYPE
agent1.sources.seqGenSrc.selector.mapping.CREDIT = memoryChannel1
agent1.sources.seqGenSrc.selector.mapping.OTHER = memoryChannel2
agent1.sources.seqGenSrc.selector.default = memoryChannel2
# The channel can be defined as follows.
agent1.sources.seqGenSrc.channels = memoryChannel1 memoryChannel2
# Each sink‘s type must be defined
#agent1.sinks.msgRollingSink.type = logger
agent1.sinks.msgRollingSink1.type = com.flume.sink.RollingFileSink
agent1.sinks.msgRollingSink1.sink.directory = /home/disk1/somebody/multiplexing/credit_log
#agent1.sinks.msgRollingSink.sink.directory = /home/somebody/realtime-charge-stat/input_test
agent1.sinks.msgRollingSink1.sink.rollInterval = 60
#Specify the channel the sink should use
agent1.sinks.msgRollingSink1.channel = memoryChannel1
根据如上配置文件。客户端在发送报文到flume服务器的时候,仅需在报文正文前加上CREDIT
或OTHER
的报文头,与报文正文用"\t"分隔开来。这样改写的netcat
source即可将报文头打入event的header,而后selector再根据header发往不同的channel/sink。
flume-ng 中 selector multiplexing 的使用,布布扣,bubuko.com
flume-ng 中 selector multiplexing 的使用
原文地址:http://blog.csdn.net/churylin/article/details/38732323