A brief analysis of how a Flume sink runs

Date: 2015-02-05 21:47:15

I did not actually run it. This is the basic sink execution flow, worked out by reading the source code.

SinkRunner is responsible for running the sink.

Its inner class:

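class PollingRunner implements Runnable {
   private SinkProcessor policy;  // the sink processor this runner drives, e.g. FailoverSinkProcessor
   // other fields and run() omitted
}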

PollingRunner is what actually runs the sink. Its run() method:

while (!shouldStop.get()) {
  try {
    // Ask the sink processor to handle one batch.
    if (policy.process().equals(Sink.Status.BACKOFF)) {
      counterGroup.incrementAndGet("runner.backoffs");
      // Linear backoff: consecutive backoffs * backoffSleepIncrement, capped at maxBackoffSleep.
      Thread.sleep(Math.min(
          counterGroup.incrementAndGet("runner.backoffs.consecutive") * backoffSleepIncrement,
          maxBackoffSleep));
    } else {
      // READY: reset the consecutive-backoff counter.
      counterGroup.set("runner.backoffs.consecutive", 0L);
    }
  } catch (InterruptedException e) {
    logger.debug("Interrupted while processing an event. Exiting.");
    counterGroup.incrementAndGet("runner.interruptions");
  } catch (Exception e) {
    logger.error("Unable to deliver event. Exception follows.", e);
    if (e instanceof EventDeliveryException) {
      counterGroup.incrementAndGet("runner.deliveryErrors");
    } else {
      counterGroup.incrementAndGet("runner.errors");
    }
    // On any other error, sleep for the maximum backoff before trying again.
    try {
      Thread.sleep(maxBackoffSleep);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }
  }
}
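The following stand-alone sketch isolates the backoff arithmetic from the loop above. It is not Flume code. The class name BackoffSketch and the sample values (a 1000 ms increment and a 5000 ms cap) are assumptions chosen for illustration. Only two things come from the loop itself: the formula Math.min(consecutive backoffs * backoffSleepIncrement, maxBackoffSleep), and the counter reset on a non-BACKOFF status.

```java
// Hypothetical stand-alone model of PollingRunner's backoff arithmetic (not Flume code).
class BackoffSketch {
    // Sleep time in ms after the given number of consecutive BACKOFF results.
    static long sleepFor(long consecutiveBackoffs, long backoffSleepIncrement, long maxBackoffSleep) {
        return Math.min(consecutiveBackoffs * backoffSleepIncrement, maxBackoffSleep);
    }

    public static void main(String[] args) {
        long increment = 1000;  // assumed sample backoffSleepIncrement
        long max = 5000;        // assumed sample maxBackoffSleep
        long consecutive = 0;
        // Six BACKOFF results in a row, then one READY.
        String[] results = {"BACKOFF", "BACKOFF", "BACKOFF", "BACKOFF", "BACKOFF", "BACKOFF", "READY"};
        for (String r : results) {
            if (r.equals("BACKOFF")) {
                consecutive++;  // mirrors incrementAndGet("runner.backoffs.consecutive")
                System.out.println("BACKOFF #" + consecutive + " -> sleep "
                        + sleepFor(consecutive, increment, max) + " ms");
            } else {
                consecutive = 0;  // mirrors set("runner.backoffs.consecutive", 0L)
                System.out.println("READY -> consecutive counter reset");
            }
        }
    }
}
```

Running main prints sleeps of 1000, 2000, 3000, 4000 and 5000 ms. The sixth backoff stays at 5000 ms because the cap has been reached, and the single READY then resets the counter. The backoff therefore grows linearly rather than exponentially, and it never exceeds maxBackoffSleep.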

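policy is the concrete sink processor. FailoverSinkProcessor is used as the example here.

For a detailed explanation of FailoverSinkProcessor, see http://blog.csdn.net/simonchi/article/details/42520193. Only a brief outline is given here.

The configure method: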

liveSinks = new TreeMap<Integer, Sink>();
failedSinks = new PriorityQueue<FailedSink>();

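It iterates over every sink defined in the configuration file and reads that sink's priority. It then puts the sink into liveSinks, keyed by priority, regardless of whether the sink is actually available.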

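Finally, activeSink = liveSinks.get(liveSinks.lastKey()); initializes activeSink. Because liveSinks is a TreeMap sorted by key, lastKey() returns the largest key, i.e. the highest priority. The highest-priority sink therefore becomes the active sink.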
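This priority selection can be sketched using only java.util.TreeMap. Sinks are represented by hypothetical name strings, and the priorities are made-up sample values. The only point being shown is that TreeMap keeps its keys in ascending order, so lastKey() is the largest priority.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of picking the highest-priority sink, as configure() does.
class PrioritySelectionSketch {
    // priorities: sink name -> priority (sample data, not a real Flume config)
    static String selectActive(Map<String, Integer> priorities) {
        TreeMap<Integer, String> liveSinks = new TreeMap<>();  // priority -> sink, ascending
        for (Map.Entry<String, Integer> e : priorities.entrySet()) {
            // Every configured sink goes in, reachable or not.
            // In this sketch, a duplicate priority would overwrite the earlier sink.
            liveSinks.put(e.getValue(), e.getKey());
        }
        // Same idea as activeSink = liveSinks.get(liveSinks.lastKey());
        return liveSinks.get(liveSinks.lastKey());
    }

    public static void main(String[] args) {
        Map<String, Integer> priorities = new TreeMap<>();
        priorities.put("k1", 5);
        priorities.put("k2", 10);
        priorities.put("k3", 1);
        System.out.println(selectActive(priorities));  // prints k2
    }
}
```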

The call policy.process().equals(Sink.Status.BACKOFF) in the run loop therefore executes FailoverSinkProcessor's process() method.

The process() method:

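It first runs a while loop over failedSinks. It takes out each failed sink whose retry (refresh) time has passed and calls its process(), with one of three outcomes:

If the call throws, the sink is still unreachable. It is put back into failedSinks with its refresh time pushed further out, and the loop moves on to the next failed sink.

If the call returns READY, the sink has recovered. It is put back into liveSinks, activeSink is re-selected as the highest-priority live sink, and the status is returned.

If the sink is reachable but the status is not READY, it is also put back into failedSinks, this time without refreshing its time, and that status is returned.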
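The retry step described above can be modeled in a small, self-contained class. This is a sketch under stated assumptions, not Flume's implementation. The following are all hypothetical choices made for illustration: SketchSink, the penalty constants BASE_PENALTY_MS and MAX_PENALTY_MS, the doubling penalty, and the explicit now parameter. Two features follow the text: failedSinks is a PriorityQueue of failed sinks ordered by refresh time, and READY versus non-READY results are handled differently. The sketch polls only the sinks whose retry time has passed.

```java
import java.util.PriorityQueue;
import java.util.TreeMap;

// Hypothetical model of the "retry failed sinks" step of a failover processor.
// SketchSink, the penalty constants and the explicit `now` clock are assumptions.
class FailedSinkRetrySketch {
    enum Status { READY, BACKOFF }

    interface SketchSink {
        Status process() throws Exception;  // throws when the sink is unreachable
    }

    static final long BASE_PENALTY_MS = 1000;   // assumed value
    static final long MAX_PENALTY_MS = 30000;   // assumed value

    static class FailedSink implements Comparable<FailedSink> {
        final int priority;
        final SketchSink sink;
        long refresh;  // earliest time (ms) at which this sink may be retried
        int fails;

        FailedSink(int priority, SketchSink sink, long now) {
            this.priority = priority;
            this.sink = sink;
            incFails(now);  // entering the failed set counts as the first failure
        }

        // Record one more failure and push the retry time further out (assumed doubling
        // penalty; the exponent is capped so the shift cannot overflow).
        void incFails(long now) {
            fails++;
            refresh = now + Math.min(MAX_PENALTY_MS, (1L << Math.min(fails, 20)) * BASE_PENALTY_MS);
        }

        @Override
        public int compareTo(FailedSink other) {
            return Long.compare(refresh, other.refresh);  // earliest retry time first
        }
    }

    final TreeMap<Integer, SketchSink> liveSinks = new TreeMap<>();
    final PriorityQueue<FailedSink> failedSinks = new PriorityQueue<>();
    SketchSink activeSink;

    // Retry failed sinks that are due; returns the first status obtained, or null if none answered.
    Status retryFailed(long now) {
        while (!failedSinks.isEmpty() && failedSinks.peek().refresh < now) {
            FailedSink cur = failedSinks.poll();
            try {
                Status s = cur.sink.process();
                if (s == Status.READY) {
                    // Recovered: back into liveSinks, then re-pick the highest priority.
                    liveSinks.put(cur.priority, cur.sink);
                    activeSink = liveSinks.get(liveSinks.lastKey());
                } else {
                    // Reachable but not ready: requeue without touching the refresh time.
                    failedSinks.add(cur);
                }
                return s;
            } catch (Exception e) {
                // Still unreachable: penalize and requeue, then try the next due sink.
                cur.incFails(now);
                failedSinks.add(cur);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        FailedSinkRetrySketch p = new FailedSinkRetrySketch();
        SketchSink down = () -> { throw new Exception("unreachable"); };
        SketchSink up = () -> Status.READY;
        p.failedSinks.add(new FailedSink(5, down, 0));   // due at 2000
        p.failedSinks.add(new FailedSink(9, up, 500));   // due at 2500
        System.out.println(p.retryFailed(1000));  // null: nothing is due yet
        System.out.println(p.retryFailed(3000));  // READY: "down" is requeued, "up" recovers
        System.out.println(p.failedSinks.peek().refresh);  // 7000 = 3000 + 2^2 * 1000
    }
}
```

The queue is ordered by refresh time rather than by priority. This ordering lets the loop stop at the first sink that is not yet due, without scanning the rest of the queue.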

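Next, the method loops while activeSink is not null, calling activeSink.process() on each pass. If the call succeeds, its status is returned. If it throws, the current active sink is moved into failedSinks, and the next active sink (the one with the next-highest priority) is taken from liveSinks. The while condition is then checked again.
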
The method ends by throwing an EventDeliveryException, which means no sink is left to fail over to.
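The active-sink loop and the final exception described above can be sketched without Flume. ActiveFailoverSketch, its SketchSink interface (which returns a status string) and the helper failOverToNext() are hypothetical names. IllegalStateException stands in for EventDeliveryException so that the example needs no Flume dependency. Also, failedSinks is a plain list here rather than a priority queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of the active-sink failover loop; not Flume's API.
class ActiveFailoverSketch {
    interface SketchSink {
        String process() throws Exception;  // returns a status; throws on failure
    }

    final TreeMap<Integer, SketchSink> liveSinks = new TreeMap<>();  // priority -> sink
    final List<SketchSink> failedSinks = new ArrayList<>();           // simplified: plain list
    SketchSink activeSink;

    // Invariant: the active sink is always the highest-priority live sink (or null if none).
    void selectHighest() {
        activeSink = liveSinks.isEmpty() ? null : liveSinks.get(liveSinks.lastKey());
    }

    // Move the current active sink (stored under the last key) to failedSinks and pick the next.
    SketchSink failOverToNext() {
        failedSinks.add(liveSinks.remove(liveSinks.lastKey()));
        selectHighest();
        return activeSink;
    }

    String process() {
        while (activeSink != null) {
            try {
                return activeSink.process();      // success: hand back the sink's status
            } catch (Exception e) {
                activeSink = failOverToNext();    // failure: fall back to the next priority
            }
        }
        // Stands in for the EventDeliveryException thrown when every sink has failed.
        throw new IllegalStateException("All sinks failed; nothing left to fail over to");
    }

    public static void main(String[] args) {
        ActiveFailoverSketch p = new ActiveFailoverSketch();
        p.liveSinks.put(10, () -> { throw new Exception("k1 down"); });
        p.liveSinks.put(5, () -> "READY");
        p.selectHighest();                          // the priority-10 sink starts as active
        System.out.println(p.process());            // READY, served by the priority-5 sink
        System.out.println(p.failedSinks.size());   // 1
    }
}
```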

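What does sink.process() do? It takes data out of the channel.

NullSink serves as the example here.

Within a transaction, it takes up to batchSize events from the channel and writes them to their destination. In NullSink's case, the events are simply discarded.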

public Status process() throws EventDeliveryException {
  Status status = Status.READY;

  Channel channel = getChannel();
  Transaction transaction = channel.getTransaction();
  Event event = null;
  long eventCounter = counterGroup.get("events.success");

  try {
    transaction.begin();
    int i = 0;
    // Take up to batchSize events inside a single transaction.
    for (i = 0; i < batchSize; i++) {
      event = channel.take();
      if (++eventCounter % logEveryNEvents == 0) {
        logger.info("Null sink {} successful processed {} events.", getName(), eventCounter);
      }
      // Channel is empty: stop early and tell the runner to back off.
      if (event == null) {
        status = Status.BACKOFF;
        break;
      }
      // NullSink does nothing with the event; it is simply dropped.
    }
    transaction.commit();
    counterGroup.addAndGet("events.success", (long) Math.min(batchSize, i));
    counterGroup.incrementAndGet("transaction.success");
  } catch (Exception ex) {
    // Rolling back undoes the takes, so the events stay in the channel.
    transaction.rollback();
    counterGroup.incrementAndGet("transaction.failed");
    logger.error("Failed to deliver event. Exception follows.", ex);
    throw new EventDeliveryException("Failed to deliver event: " + event, ex);
  } finally {
    transaction.close();
  }

  return status;
}
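The transactional batch pattern in the code above can be tried without Flume by using an in-memory stand-in. SketchChannel and its begin/take/commit/rollback methods are invented for this sketch and are not Flume's API. They only imitate the semantics of Flume's Channel and Transaction: taken events disappear on commit and return to the channel on rollback.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the take-in-a-transaction pattern NullSink.process() uses.
// SketchChannel is an invented in-memory stand-in, not Flume's Channel/Transaction API.
class NullSinkSketch {
    enum Status { READY, BACKOFF }

    static class SketchChannel {
        private final Deque<String> queue = new ArrayDeque<>();
        private final List<String> taken = new ArrayList<>();  // events taken in the open transaction

        void put(String event) { queue.addLast(event); }
        int size() { return queue.size(); }
        void begin() { taken.clear(); }

        // Returns null when the channel is empty.
        String take() {
            String e = queue.pollFirst();
            if (e != null) taken.add(e);
            return e;
        }

        void commit() { taken.clear(); }  // taken events are gone for good

        void rollback() {
            // Put taken events back at the head, preserving their original order.
            for (int i = taken.size() - 1; i >= 0; i--) queue.addFirst(taken.get(i));
            taken.clear();
        }
    }

    long eventsSuccess = 0;

    Status process(SketchChannel channel, int batchSize) {
        Status status = Status.READY;
        channel.begin();
        try {
            int i;
            for (i = 0; i < batchSize; i++) {
                String event = channel.take();
                if (event == null) {   // channel drained: ask the runner to back off
                    status = Status.BACKOFF;
                    break;
                }
                // A null sink simply discards the event here.
            }
            channel.commit();
            eventsSuccess += i;        // i events were actually taken
            return status;
        } catch (RuntimeException ex) {
            channel.rollback();        // undo the takes on failure
            throw ex;
        }
    }

    public static void main(String[] args) {
        SketchChannel ch = new SketchChannel();
        for (int k = 0; k < 5; k++) ch.put("event-" + k);
        NullSinkSketch sink = new NullSinkSketch();
        System.out.println(sink.process(ch, 3));  // READY (3 taken, 2 left)
        System.out.println(sink.process(ch, 3));  // BACKOFF (2 taken, channel empty)
        System.out.println(sink.eventsSuccess);   // 5
    }
}
```

Note that an empty channel is not treated as an error. The sink commits whatever it managed to take and returns BACKOFF, and that return value is what triggers the runner's backoff sleep.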

Original post: http://www.cnblogs.com/kanliwei/p/4275974.html
