标签:image type ima 目录 cap span 代码 min letter
Flume 用户自定义拦截器。创建名为 flume-demo 的 Maven 项目。
在项目的 pom.xml 中添加 flume-ng-core 依赖：
<!-- Flume core API (Context, Event, Interceptor) used by CustomInterceptor -->
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>
package com.kpwong.flume.interceptor; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.util.List; import java.util.Map; public class CustomInterceptor implements Interceptor { @Override public void initialize() { } //单个事件拦截 @Override public Event intercept(Event event) { Map<String, String> headers = event.getHeaders(); String body = new String( event.getBody()); if (body.contains("hello")){ headers.put("topic","letter"); } else { headers.put("topic","number"); } return event; } //多个事件拦截 @Override public List<Event> intercept(List<Event> list) { for (Event event : list) { intercept(event); } return list; } @Override public void close() { } public static class Builder implements Interceptor.Builder{ @Override public Interceptor build() { return new CustomInterceptor(); } @Override public void configure(Context context) { } } }
将项目打包成 jar 文件，并拷贝到 Flume 安装目录下的 lib 目录（本文为 /flume/lib）中。
编写 conf 配置文件。准备三台机器（hadoop202、hadoop203、hadoop204）：hadoop202 上的 a2 通过拦截器给每个事件打上 topic 标签，再按标签分发到 hadoop203 上的 a3 和 hadoop204 上的 a4。
在 hadoop202 上配置 flume2.conf（对应启动命令中的 job/interceptor/flume2.conf）：
# Name the components on this agent
a2.sources = r1
a2.sinks = k1 k2
a2.channels = c1 c2

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 44444

# Source interceptor: CustomInterceptor sets a "topic" header on every event
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = com.kpwong.flume.interceptor.CustomInterceptor$Builder

# Channel selector: route each event by its "topic" header (letter -> c1, number -> c2)
a2.sources.r1.selector.type = multiplexing
a2.sources.r1.selector.header = topic
a2.sources.r1.selector.mapping.letter = c1
a2.sources.r1.selector.mapping.number = c2

# Describe the sinks: k1 forwards to a3 on hadoop203, k2 forwards to a4 on hadoop204
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop203
a2.sinks.k1.port = 4141

a2.sinks.k2.type = avro
a2.sinks.k2.hostname = hadoop204
a2.sinks.k2.port = 4142

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sinks to the channels
a2.sources.r1.channels = c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2
其中，拦截器与多路复用（multiplexing）通道选择器的配置如下：
# Attach one interceptor, named i1, to source r1
a2.sources.r1.interceptors = i1
# i1 is created through the nested Builder class of CustomInterceptor ('$' separates the nested class name)
a2.sources.r1.interceptors.i1.type =com.kpwong.flume.interceptor.CustomInterceptor$Builder
# Multiplexing selector: pick the channel from the value of the "topic" header
a2.sources.r1.selector.type = multiplexing
a2.sources.r1.selector.header = topic
# topic=letter -> channel c1, topic=number -> channel c2
a2.sources.r1.selector.mapping.letter = c1
a2.sources.r1.selector.mapping.number = c2
在 hadoop203 上配置 flume3.conf：
# Agent a3 on hadoop203: receives topic=letter events from a2 over avro and logs them
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Avro source; a2's sink k1 connects to hadoop203:4141
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop203
a3.sources.r1.port = 4141

# Logger sink prints the received events
a3.sinks.k1.type = logger

# In-memory channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sinks.k1.channel = c1
a3.sources.r1.channels = c1
在 hadoop204 上配置 flume4.conf：
# Agent a4 on hadoop204: receives topic=number events from a2 over avro and logs them
a4.sources = r1
a4.sinks = k1
a4.channels = c1

# Avro source; a2's sink k2 connects to hadoop204:4142
a4.sources.r1.type = avro
a4.sources.r1.bind = hadoop204
a4.sources.r1.port = 4142

# Logger sink prints the received events
a4.sinks.k1.type = logger

# In-memory channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a4.sinks.k1.channel = c1
a4.sources.r1.channels = c1
先启动下游的 a4 和 a3（a2 的 avro sink 需要连接它们的 avro source），最后再启动 a2。在 hadoop204 上运行：
# Start agent a4 from flume4.conf; INFO logging goes to the console so the logger sink's output is visible
bin/flume-ng agent -c conf/ -f job/interceptor/flume4.conf -n a4 -Dflume.root.logger=INFO,console
在 hadoop203 上运行：
# Start agent a3 from flume3.conf; INFO logging goes to the console so the logger sink's output is visible
bin/flume-ng agent -c conf/ -f job/interceptor/flume3.conf -n a3 -Dflume.root.logger=INFO,console
最后在 hadoop202 上启动 a2，并用 nc 发送测试数据：
# Start agent a2 from flume2.conf; its sinks are avro, so the events themselves are printed by a3/a4
bin/flume-ng agent -c conf/ -f job/interceptor/flume2.conf -n a2
# a2's netcat source listens on localhost:44444, so run this on hadoop202
# (presumably in a second terminal, since the agent above keeps running — confirm for your setup)
nc localhost 44444
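实验结果（预期）：包含 “hello” 的输入行会被拦截器标记为 topic=letter，经 c1/k1 发往 hadoop203，由 a3 的 logger sink 打印在控制台上。其余所有输入行会被标记为 topic=number，经 c2/k2 发往 hadoop204，由 a4 打印。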
标签:image type ima 目录 cap span 代码 min letter
原文地址:https://www.cnblogs.com/kpwong/p/14504079.html