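org.apache.flume.channel.ChannelProcessor performs the actual delivery of Events into Channels and is used by Sources. You can think of it as a proxy in front of the channels: it decides which Channels an Event is put into and how it is put (as a batch or one at a time), and it runs the Interceptors over the Event before the put.
Which Channels an Event is put into is decided by the ChannelSelector. Depending on the selector setting, there are currently two main types:
REPLICATING -> org.apache.flume.channel.ReplicatingChannelSelector, MULTIPLEXING -> org.apache.flume.channel.MultiplexingChannelSelector;
REPLICATING sends the Event to every configured channel, so each channel holds a complete copy.
MULTIPLEXING sends the Event to the channels it is mapped to, similar to hashing, so each channel holds only part of the events.
org.apache.flume.channel.MultiplexingChannelSelector picks the channels based on the header (default: flume.selector.header), mapping, default and optional settings.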
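The difference between the two strategies can be shown with a small stand-alone sketch. This is not Flume code: SelectorSketch, replicate and multiplex are hypothetical names, channels are represented by plain strings, and the fallback to the default channels when the header is missing or unmapped is an assumption about how the multiplexing lookup behaves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the two selector strategies; not the Flume classes.
final class SelectorSketch {

    // REPLICATING: every event goes to every configured channel.
    static List<String> replicate(List<String> allChannels) {
        return new ArrayList<>(allChannels);
    }

    // MULTIPLEXING: look up the value of one event header in a mapping.
    // Assumption: fall back to the default channels when the header is
    // absent or its value has no mapping.
    static List<String> multiplex(Map<String, String> eventHeaders,
                                  String headerName,
                                  Map<String, List<String>> mapping,
                                  List<String> defaultChannels) {
        String value = eventHeaders.get(headerName);
        List<String> mapped = (value == null) ? null : mapping.get(value);
        return (mapped != null) ? mapped : defaultChannels;
    }
}
```

With a mapping of "us" to c1 and a default of c2, an event whose flume.selector.header is "us" would go to c1 only, while an event without that header would go to c2.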
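Now look at the implementation of org.apache.flume.channel.ReplicatingChannelSelector. It keeps two channel lists, optional and required, which are returned by getOptionalChannels and getRequiredChannels respectively. If optional is set, optionalChannels holds the channels named there, and requiredChannels is everything from getAllChannels minus optionalChannels: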
public void configure(Context context) { // configure() builds requiredChannels and optionalChannels
  String optionalList = context.getString(CONFIG_OPTIONAL); // read the optional setting
  requiredChannels = new ArrayList<Channel>(getAllChannels()); // initially requiredChannels is everything from getAllChannels
  Map<String, Channel> channelNameMap = getChannelNameMap();
  if (optionalList != null && !optionalList.isEmpty()) { // only if the optional setting is not empty
    for (String optional : optionalList.split("\\s+")) { // split the optional setting on whitespace
      Channel optionalChannel = channelNameMap.get(optional);
      requiredChannels.remove(optionalChannel); // remove the optional channel from requiredChannels
      if (!optionalChannels.contains(optionalChannel)) {
        optionalChannels.add(optionalChannel); // and add it to optionalChannels
      }
    }
  }
}
ChannelProcessor is first invoked from a SourceRunner, for example in the start method of org.apache.flume.source.EventDrivenSourceRunner:
public void start() {
  Source source = getSource(); // get the Source object via getSource
  ChannelProcessor cp = source.getChannelProcessor(); // get its ChannelProcessor
  cp.initialize(); // call ChannelProcessor.initialize
  source.start(); // call Source.start
  lifecycleState = LifecycleState.START;
}
The org.apache.flume.source.ExecSource.ExecRunnable class, in turn, calls the ChannelProcessor's processEventBatch method to insert data in batches:
while ((line = reader.readLine()) != null) {
  counterGroup.incrementAndGet("exec.lines.read");
  eventList.add(EventBuilder.withBody(line.getBytes(charset)));
  if (eventList.size() >= bufferCount) {
    channelProcessor.processEventBatch(eventList);
    eventList.clear();
  }
}
Now look at the implementation of ChannelProcessor itself.
First, its two important fields:
private final ChannelSelector selector;
private final InterceptorChain interceptorChain;
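To make the role of interceptorChain more concrete, here is a minimal stand-alone sketch of how a chain of interceptors could be applied to one event. It is an illustration under assumptions, not the Flume InterceptorChain class: InterceptorChainSketch is a hypothetical name, events are plain strings, and each interceptor is modeled as a UnaryOperator that may return null to drop the event.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model of an interceptor chain; events are plain strings here.
final class InterceptorChainSketch {

    // Applies the interceptors in order. If any interceptor returns null,
    // the event is considered dropped and the remaining ones are skipped.
    static String intercept(List<UnaryOperator<String>> interceptors, String event) {
        for (UnaryOperator<String> interceptor : interceptors) {
            if (event == null) {
                return null;
            }
            event = interceptor.apply(event);
        }
        return event;
    }
}
```

A null result corresponds to the case where the caller simply returns without putting anything on a channel.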
The initialize method calls InterceptorChain.initialize to initialize the interceptorChain:
public void initialize() {
  interceptorChain.initialize();
}
The configure method calls configureInterceptors, which builds the InterceptorChain from the interceptors setting:
private void configureInterceptors(Context context) {
  List<Interceptor> interceptors = Lists.newLinkedList();
  String interceptorListStr = context.getString("interceptors", ""); // read the interceptors setting
  if (interceptorListStr.isEmpty()) {
    return;
  }
  String[] interceptorNames = interceptorListStr.split("\\s+"); // split on whitespace
  Context interceptorContexts =
      new Context(context.getSubProperties("interceptors."));
  // run through and instantiate all the interceptors specified in the Context
  InterceptorBuilderFactory factory = new InterceptorBuilderFactory();
  for (String interceptorName : interceptorNames) {
    Context interceptorContext = new Context(
        interceptorContexts.getSubProperties(interceptorName + "."));
    String type = interceptorContext.getString("type");
    if (type == null) {
      LOG.error("Type not specified for interceptor " + interceptorName);
      throw new FlumeException("Interceptor.Type not specified for " +
          interceptorName);
    }
    try {
      Interceptor.Builder builder = factory.newInstance(type); // get the Interceptor builder for the configured type
      builder.configure(interceptorContext);
      interceptors.add(builder.build());
    ......
  }
  interceptorChain.setInterceptors(interceptors);
}
In addition, there are two methods for inserting data, processEventBatch and processEvent. processEventBatch inserts a batch of Events (its parameter is List<Event> events), while processEvent inserts a single Event.
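The batch variant is not shown in this article. As a rough sketch of the idea, and assuming that a batch is first grouped by target channel so that each channel can receive its share in one transaction, the grouping step might look like the following. BatchGroupingSketch and groupByChannel are hypothetical names, events and channels are plain strings, and the selector is modeled as a simple function.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of grouping a batch by channel; not the Flume code.
final class BatchGroupingSketch {

    // Builds a channel -> events map, so that each channel's events
    // could then be put within a single transaction per channel.
    static Map<String, List<String>> groupByChannel(
            List<String> events, Function<String, List<String>> selector) {
        Map<String, List<String>> queue = new LinkedHashMap<>();
        for (String event : events) {
            for (String channel : selector.apply(event)) {
                queue.computeIfAbsent(channel, k -> new ArrayList<>()).add(event);
            }
        }
        return queue;
    }
}
```

Compared with calling the single-event method in a loop, this kind of grouping would reduce the number of transactions from one per event per channel to one per channel per batch.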
Here is the implementation of processEvent:
public void processEvent(Event event) {
  event = interceptorChain.intercept(event); // run the Event through InterceptorChain.intercept
  if (event == null) {
    return;
  }
  // Process required channels
  List<Channel> requiredChannels = selector.getRequiredChannels(event); // ask the ChannelSelector for the required channels
  for (Channel reqChannel : requiredChannels) { // put the event on each required channel, each put in its own transaction
    Transaction tx = reqChannel.getTransaction();
    Preconditions.checkNotNull(tx, "Transaction object must not be null");
    try {
      tx.begin();
      reqChannel.put(event);
      tx.commit();
    } catch (Throwable t) {
      tx.rollback();
      if (t instanceof Error) {
        LOG.error("Error while writing to required channel: " +
            reqChannel, t);
        throw (Error) t;
      } else {
        throw new ChannelException("Unable to put event on required " +
            "channel: " + reqChannel, t);
      }
    } finally {
      if (tx != null) {
        tx.close();
      }
    }
  }
  // Process optional channels
  List<Channel> optionalChannels = selector.getOptionalChannels(event); // same steps for the optional channels, except that a failure is only logged (unless it is an Error)
  for (Channel optChannel : optionalChannels) {
    Transaction tx = null;
    try {
      tx = optChannel.getTransaction();
      tx.begin();
      optChannel.put(event);
      tx.commit();
    } catch (Throwable t) {
      tx.rollback();
      LOG.error("Unable to put event on optional channel: " + optChannel, t);
      if (t instanceof Error) {
        throw (Error) t;
      }
    } finally {
      if (tx != null) {
        tx.close();
      }
    }
  }
}
This article comes from the "菜光光的博客" blog; please be sure to keep this source attribution: http://caiguangguang.blog.51cto.com/1652935/1618438