标签:flume ng
上一篇说了利用ExecSource从本地日志文件异步的收集日志,这篇说说采用RPC方式同步收集日志的方式。笔者对Thrift比较熟悉,所以用ThriftSource来介绍RPC的日志收集方式。
整体的结构图如下:
1. ThriftSource包含了一个Thrift Server,以及一个Thrift Service服务的实现。这里的Thrift Service是由ThriftSourceProtocol定义
2. 应用程序调用Thrift Service的客户端,以RPC的方式将日志发送到ThriftSource的Thrfit Service的服务端,完成应用层序日志的收集
先来看下ThriftSourceProtocol定义的Thrfit服务。Thrift服务定义在flume-ng-sdk工程的flume.thrift中
1. 定义了ThriftFlumeEvent数据结构,日志封装成Event来Flume NG中传递
2. 定义了ThriftSourceProtocol服务,有两个接口,append和appendBatch
namespace java org.apache.flume.thrift struct ThriftFlumeEvent { 1: required map <string, string> headers, 2: required binary body, } enum Status { OK, FAILED, ERROR, UNKNOWN } service ThriftSourceProtocol { Status append(1: ThriftFlumeEvent event), Status appendBatch(1: list<ThriftFlumeEvent> events), }
public class ThriftSourceProtocol { public interface Iface { public Status append(ThriftFlumeEvent event) throws org.apache.thrift.TException; public Status appendBatch(List<ThriftFlumeEvent> events) throws org.apache.thrift.TException; } 。。。。。。 }
1. 把ThriftFlumeEvent转化成SimpleEvent
2. 修改计数器
3. 把SimpleEvent交给ChannelProcessor来处理,传递到下游的Channel中去
可以看到ThriftSouceHandler的实现逻辑和ExecRunnable的逻辑基本是一样的
private class ThriftSourceHandler implements ThriftSourceProtocol.Iface { @Override public Status append(ThriftFlumeEvent event) throws TException { Event flumeEvent = EventBuilder.withBody(event.getBody(), event.getHeaders()); sourceCounter.incrementAppendReceivedCount(); sourceCounter.incrementEventReceivedCount(); try { getChannelProcessor().processEvent(flumeEvent); } catch (ChannelException ex) { logger.warn("Thrift source " + getName() + " could not append events " + "to the channel.", ex); return Status.FAILED; } sourceCounter.incrementAppendAcceptedCount(); sourceCounter.incrementEventAcceptedCount(); return Status.OK; } @Override public Status appendBatch(List<ThriftFlumeEvent> events) throws TException { sourceCounter.incrementAppendBatchReceivedCount(); sourceCounter.addToEventReceivedCount(events.size()); List<Event> flumeEvents = Lists.newArrayList(); for(ThriftFlumeEvent event : events) { flumeEvents.add(EventBuilder.withBody(event.getBody(), event.getHeaders())); } try { getChannelProcessor().processEventBatch(flumeEvents); } catch (ChannelException ex) { logger.warn("Thrift source %s could not append events to the " + "channel.", getName()); return Status.FAILED; } sourceCounter.incrementAppendBatchAcceptedCount(); sourceCounter.addToEventAcceptedCount(events.size()); return Status.OK; } } }
有了Thrfit服务实现后,ThrfitSource定义了Thrfit Server。默认是TThreadedSelectorServer,当TThreadedSelectorServer ClassNotFound后,创建TThreadPoolServer,还是没找到的话,那么ThriftSource启动失败。
关于Thrfit Server的更多介绍可以看这篇Thrift源码分析(七)-- TServer服务器分析
Thrift Server的创建和启动主要做了几件事情
1. 创建ServerSocket,这里是TNonblockingServerSocket,非阻塞的ServerSocket
2. 创建服务器参数类TNonblockingServer.AbstractNonblockingServerArgs,所有的服务器的属性设置都是在Args类里传递的
3. TThreadedSelectorServer是一个Reactor模式的服务器实现,需要传递一个线程池。这里是Executors.newFixedThreadPool(maxThreads, threadFactory);
4. 设置编解码协议,这里是TFastFramedTransport协议
5. 设置Thrift服务的实现类Processor,这里是上面定义的ThrfitSourceHandler类
6. 启动Thrift服务器,这里在单独的线程中启动了Thrift服务器。servingExecutor.submit(new Runnable() {public void run() {server.serve();}})
在单独的线程启动Thrift服务器主要的目的是在原来的线程中可以处理一下Thrfit服务器停止后的清理工作。
Class<?> serverClass = null; Class<?> argsClass = null; TServer.AbstractServerArgs args = null; /* * Use reflection to determine if TThreadedSelectServer is available. If * it is not available, use TThreadPoolServer */ try { serverClass = Class.forName("org.apache.thrift" + ".server.TThreadedSelectorServer"); argsClass = Class.forName("org.apache.thrift" + ".server.TThreadedSelectorServer$Args"); // Looks like TThreadedSelectorServer is available, so continue.. ExecutorService sourceService; ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat( "Flume Thrift IPC Thread %d").build(); if (maxThreads == 0) { sourceService = Executors.newCachedThreadPool(threadFactory); } else { sourceService = Executors.newFixedThreadPool(maxThreads, threadFactory); } serverTransport = new TNonblockingServerSocket( new InetSocketAddress(bindAddress, port)); args = (TNonblockingServer.AbstractNonblockingServerArgs) argsClass .getConstructor(TNonblockingServerTransport.class) .newInstance(serverTransport); Method m = argsClass.getDeclaredMethod("executorService", ExecutorService.class); m.invoke(args, sourceService); } try { args.protocolFactory(new TCompactProtocol.Factory()); args.inputTransportFactory(new TFastFramedTransport.Factory()); args.outputTransportFactory(new TFastFramedTransport.Factory()); args.processor(new ThriftSourceProtocol .Processor<ThriftSourceHandler>(new ThriftSourceHandler())); server = (TServer) serverClass.getConstructor(argsClass).newInstance (args); } catch (Throwable ex) { throw new FlumeException("Cannot start Thrift Source.", ex); } servingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("Flume Thrift Source I/O Boss") .build()); /** * Start serving. */ servingExecutor.submit(new Runnable() { @Override public void run() { server.serve(); } });
1. 定义RPC服务来收集日志
2. 实现RPC服务,并提供客户端给应用程序。应用程序使用客户端来将日志封装成Event,通过RPC调用传递给RPC类型的Source
3. RPC类型的Source启动RPC Server,提供RPC服务,将接收到的Event传递给下游的Channel
Flume NG源码分析(五)使用ThriftSource通过RPC方式收集日志
标签:flume ng
原文地址:http://blog.csdn.net/iter_zc/article/details/46535251