[root@com21 apache-flume-1.5.2-bin]# sh bin/flume-ng agent -c conf -f conf/server.conf -n a1 Info: Sourcing environment configuration script /home/flume/apache-flume-1.5.2-bin/conf/flume-env.sh + exec /home/flume/jdk1.7.0_71/bin/java -server -Xms2048m -Xmx2048m -Xss256K -XX:PermSize=32M -XX:MaxPermSize=512M -XX:+UseConcMarkSweepGC -XX:+DisableExplicitGC -XX:+UseParNewGC -XX:+CMSClassUnloadingEnabled -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+UseCompressedOops -XX:CMSInitiatingOccupancyFraction=70 -XX:+HeapDumpOnOutOfMemoryError -XX:SurvivorRatio=8 -cp '/home/flume/apache-flume-1.5.2-bin/conf:/home/flume/apache-flume-1.5.2-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application -f conf/server.conf -n a1
从这里可以看出flume的启动入口是:org.apache.flume.node.Application
下面我们就来看该入口程序是如何来运行的
附:flume每次启动都会先判断有没有与当前配置的三大组件同名的组件存在,存在的话先停掉该组件,顺序为source,sink,channel
其次是启动所有当前配置的组件,启动顺序为channel,sink,source
通过这个启动停止的顺序可以看出flume也是对数据一致性做了保证的。
if(reload) { EventBus eventBus = new EventBus(agentName + "-event-bus"); PollingPropertiesFileConfigurationProvider configurationProvider = new PollingPropertiesFileConfigurationProvider(agentName, configurationFile, eventBus, 30); components.add(configurationProvider); application = new Application(components); eventBus.register(application); } else { PropertiesFileConfigurationProvider configurationProvider = new PropertiesFileConfigurationProvider(agentName, configurationFile); application = new Application(); application.handleConfigurationEvent(configurationProvider.getConfiguration()); }这个if的作用就是是否30秒读一下配置,判断是否有更新
主要看一下对于配置内容的处理,两个分支虽然从代码上看逻辑不一样,但是处理的逻辑是一样的
我们看else分支的代码吧:
public MaterializedConfiguration getConfiguration() { MaterializedConfiguration conf = new SimpleMaterializedConfiguration(); FlumeConfiguration fconfig = getFlumeConfiguration(); AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName()); if (agentConf != null) { Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap(); Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap(); Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap(); try { loadChannels(agentConf, channelComponentMap); loadSources(agentConf, channelComponentMap, sourceRunnerMap); loadSinks(agentConf, channelComponentMap, sinkRunnerMap); Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet()); for(String channelName : channelNames) { ChannelComponent channelComponent = channelComponentMap. get(channelName); if(channelComponent.components.isEmpty()) { LOGGER.warn(String.format("Channel %s has no components connected" + " and has been removed.", channelName)); channelComponentMap.remove(channelName); Map<String, Channel> nameChannelMap = channelCache. get(channelComponent.channel.getClass()); if(nameChannelMap != null) { nameChannelMap.remove(channelName); } } else { LOGGER.info(String.format("Channel %s connected to %s", channelName, channelComponent.components.toString())); conf.addChannel(channelName, channelComponent.channel); } } for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) { conf.addSourceRunner(entry.getKey(), entry.getValue()); } for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) { conf.addSinkRunner(entry.getKey(), entry.getValue()); } } catch (InstantiationException ex) { LOGGER.error("Failed to instantiate component", ex); } finally { channelComponentMap.clear(); sourceRunnerMap.clear(); sinkRunnerMap.clear(); } } else { LOGGER.warn("No configuration found for this host:{}", getAgentName()); } return conf; }
public static SourceRunner forSource(Source source) { SourceRunner runner = null; if (source instanceof PollableSource) { runner = new PollableSourceRunner(); ((PollableSourceRunner) runner).setSource((PollableSource) source); } else if (source instanceof EventDrivenSource) { runner = new EventDrivenSourceRunner(); ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source); } else { throw new IllegalArgumentException("No known runner type for source " + source); } return runner; }这个方法里面通过对source的类型判断来选择使用哪种SourceRunner
public void start() { Source source = getSource(); ChannelProcessor cp = source.getChannelProcessor(); cp.initialize(); source.start(); lifecycleState = LifecycleState.START; }
那么什么时候来调呢?一旦调用这个方法,source与channel的交互就开始了
switch (supervisoree.status.desiredState) { case START: try { lifecycleAware.start();上面的代码出现在LifecycleSupervisor类中的内部静态类MonitorRunnable的run方法中,再来看这个线程类谁来调用?
MonitorRunnable monitorRunnable = new MonitorRunnable(); monitorRunnable.lifecycleAware = lifecycleAware; monitorRunnable.supervisoree = process; monitorRunnable.monitorService = monitorService; supervisedProcesses.put(lifecycleAware, process); ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay( monitorRunnable, 0, 3, TimeUnit.SECONDS); monitorFutures.put(lifecycleAware, future);在LifecycleSupervisor类中supervise方法
Supervisoree process = new Supervisoree(); process.status = new Status(); process.policy = policy; process.status.desiredState = desiredState; process.status.error = false;那么上面的代码所在方法又是被谁调用的呢?
是Application
public synchronized void start() { for(LifecycleAware component : components) { supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } }这样的话,整个链就串起来了
看完source和channel的交互,再来看sink和channel的交互
到这里再看sink就很简单了,因为flume中三大组件都实现自接口LifecycleAware
所以从flume的入口Application来看,从start开始最终都是到LifecycleSupervisor类的supervise方法,而该方法同样:
MonitorRunnable monitorRunnable = new MonitorRunnable(); monitorRunnable.lifecycleAware = lifecycleAware; monitorRunnable.supervisoree = process; monitorRunnable.monitorService = monitorService; supervisedProcesses.put(lifecycleAware, process); ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay( monitorRunnable, 0, 3, TimeUnit.SECONDS); monitorFutures.put(lifecycleAware, future);这串逻辑,不分具体的source,sink,同样是3秒执行一次。
至此,flume中三大组件的交互以及交互频率就说完了,望各位网友不吝指教!!
【Flume】从入口Application来分析flume的source和sink是如何与channel交互的
原文地址:http://blog.csdn.net/simonchi/article/details/42970373