FailoverSinkProcessor顾名思义是flume中sink输出容错的处理器
继承自AbstractSinkProcessor
先看下整体源码
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.flume.sink; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.PriorityQueue; import java.util.Queue; import java.util.SortedMap; import java.util.TreeMap; import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.Sink; import org.apache.flume.Sink.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * FailoverSinkProcessor maintains a prioritized list of sinks, * guarranteeing that so long as one is available events will be processed. * * The failover mechanism works by relegating failed sinks to a pool * where they are assigned a cooldown period, increasing with sequential * failures before they are retried. Once a sink succesfully sends an * event it is restored to the live pool. * * FailoverSinkProcessor is in no way thread safe and expects to be run via * SinkRunner Additionally, setSinks must be called before configure, and * additional sinks cannot be added while running * * To configure, set a sink groups processor to "failover" and set priorities * for individual sinks, all priorities must be unique. 
Furthermore, an * upper limit to failover time can be set(in miliseconds) using maxpenalty * * Ex) * * host1.sinkgroups = group1 * * host1.sinkgroups.group1.sinks = sink1 sink2 * host1.sinkgroups.group1.processor.type = failover * host1.sinkgroups.group1.processor.priority.sink1 = 5 * host1.sinkgroups.group1.processor.priority.sink2 = 10 * host1.sinkgroups.group1.processor.maxpenalty = 10000 * */ public class FailoverSinkProcessor extends AbstractSinkProcessor { private static final int FAILURE_PENALTY = 1000; private static final int DEFAULT_MAX_PENALTY = 30000; private class FailedSink implements Comparable<FailedSink> { private Long refresh; private Integer priority; private Sink sink; private Integer sequentialFailures; public FailedSink(Integer priority, Sink sink, int seqFailures) { this.sink = sink; this.priority = priority; this.sequentialFailures = seqFailures; adjustRefresh(); } @Override public int compareTo(FailedSink arg0) { return refresh.compareTo(arg0.refresh); } public Long getRefresh() { return refresh; } public Sink getSink() { return sink; } public Integer getPriority() { return priority; } public void incFails() { sequentialFailures++; adjustRefresh(); logger.debug("Sink {} failed again, new refresh is at {}, " + "current time {}", new Object[] { sink.getName(), refresh, System.currentTimeMillis()}); } private void adjustRefresh() { refresh = System.currentTimeMillis() + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY); } } private static final Logger logger = LoggerFactory .getLogger(FailoverSinkProcessor.class); private static final String PRIORITY_PREFIX = "priority."; private static final String MAX_PENALTY_PREFIX = "maxpenalty"; private Map<String, Sink> sinks; private Sink activeSink; private SortedMap<Integer, Sink> liveSinks; private Queue<FailedSink> failedSinks; private int maxPenalty; @Override public void configure(Context context) { liveSinks = new TreeMap<Integer, Sink>(); failedSinks = new 
PriorityQueue<FailedSink>(); Integer nextPrio = 0; String maxPenaltyStr = context.getString(MAX_PENALTY_PREFIX); if(maxPenaltyStr == null) { maxPenalty = DEFAULT_MAX_PENALTY; } else { try { maxPenalty = Integer.parseInt(maxPenaltyStr); } catch (NumberFormatException e) { logger.warn("{} is not a valid value for {}", new Object[] { maxPenaltyStr, MAX_PENALTY_PREFIX }); maxPenalty = DEFAULT_MAX_PENALTY; } } for (Entry<String, Sink> entry : sinks.entrySet()) { String priStr = PRIORITY_PREFIX + entry.getKey(); Integer priority; try { priority = Integer.parseInt(context.getString(priStr)); } catch (Exception e) { priority = --nextPrio; } if(!liveSinks.containsKey(priority)) { liveSinks.put(priority, sinks.get(entry.getKey())); } else { logger.warn("Sink {} not added to FailverSinkProcessor as priority" + "duplicates that of sink {}", entry.getKey(), liveSinks.get(priority)); } } activeSink = liveSinks.get(liveSinks.lastKey()); } @Override public Status process() throws EventDeliveryException { // Retry any failed sinks that have gone through their "cooldown" period Long now = System.currentTimeMillis(); while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) { FailedSink cur = failedSinks.poll(); Status s; try { s = cur.getSink().process(); if (s == Status.READY) { liveSinks.put(cur.getPriority(), cur.getSink()); activeSink = liveSinks.get(liveSinks.lastKey()); logger.debug("Sink {} was recovered from the fail list", cur.getSink().getName()); } else { // if it's a backoff it needn't be penalized. 
failedSinks.add(cur); } return s; } catch (Exception e) { cur.incFails(); failedSinks.add(cur); } } Status ret = null; while(activeSink != null) { try { ret = activeSink.process(); return ret; } catch (Exception e) { logger.warn("Sink {} failed and has been sent to failover list", activeSink.getName(), e); activeSink = moveActiveToDeadAndGetNext(); } } throw new EventDeliveryException("All sinks failed to process, " + "nothing left to failover to"); } private Sink moveActiveToDeadAndGetNext() { Integer key = liveSinks.lastKey(); failedSinks.add(new FailedSink(key, activeSink, 1)); liveSinks.remove(key); if(liveSinks.isEmpty()) return null; if(liveSinks.lastKey() != null) { return liveSinks.get(liveSinks.lastKey()); } else { return null; } } @Override public void setSinks(List<Sink> sinks) { // needed to implement the start/stop functionality super.setSinks(sinks); this.sinks = new HashMap<String, Sink>(); for (Sink sink : sinks) { this.sinks.put(sink.getName(), sink); } } }
该类中有个内部类FailedSink,是对失败的sink的定义
private Long refresh; private Integer priority; private Sink sink; private Integer sequentialFailures;这是变量的定义
1、refresh:该失败sink下一次允许被重试的时间点(当前系统时间加上惩罚时长)
2、sink优先级
3、具体sink
4、连续失败次数
public void incFails() { sequentialFailures++; adjustRefresh(); logger.debug("Sink {} failed again, new refresh is at {}, " + "current time {}", new Object[] { sink.getName(), refresh, System.currentTimeMillis()}); }该方法是sink失败时触发的方法。
private static final String PRIORITY_PREFIX = "priority."; private static final String MAX_PENALTY_PREFIX = "maxpenalty"; private Map<String, Sink> sinks; private Sink activeSink; private SortedMap<Integer, Sink> liveSinks; private Queue<FailedSink> failedSinks; private int maxPenalty;变量定义。
public void configure(Context context) { liveSinks = new TreeMap<Integer, Sink>(); failedSinks = new PriorityQueue<FailedSink>(); Integer nextPrio = 0; String maxPenaltyStr = context.getString(MAX_PENALTY_PREFIX); if(maxPenaltyStr == null) { maxPenalty = DEFAULT_MAX_PENALTY; } else { try { maxPenalty = Integer.parseInt(maxPenaltyStr); } catch (NumberFormatException e) { logger.warn("{} is not a valid value for {}", new Object[] { maxPenaltyStr, MAX_PENALTY_PREFIX }); maxPenalty = DEFAULT_MAX_PENALTY; } } for (Entry<String, Sink> entry : sinks.entrySet()) { String priStr = PRIORITY_PREFIX + entry.getKey(); Integer priority; try { priority = Integer.parseInt(context.getString(priStr)); } catch (Exception e) { priority = --nextPrio; } if(!liveSinks.containsKey(priority)) { liveSinks.put(priority, sinks.get(entry.getKey())); } else { logger.warn("Sink {} not added to FailverSinkProcessor as priority" + "duplicates that of sink {}", entry.getKey(), liveSinks.get(priority)); } } activeSink = liveSinks.get(liveSinks.lastKey()); }该方法主要是读取配置,并且初始化一些变量
1、liveSinks,failedSinks初始化为空的map和queue
2、读取maxpenalty
3、初始化sinks,这里通过setSinks方法初始化的,内部逻辑是读取conf配置文件的【具体过程可以查看源码AbstractConfigurationProvider.getConfiguration(),FlumeConfiguration.getConfigurationFor()一步步往后看就知道了】
4、初始化liveSinks赋值,将配置中的sink全部添加到liveSinks中
5、从liveSinks中选出key最大(即优先级最高)的sink作为当前激活状态的sink来处理数据输出
下面再来看具体的处理逻辑:
public Status process() throws EventDeliveryException { // Retry any failed sinks that have gone through their "cooldown" period Long now = System.currentTimeMillis(); while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) { FailedSink cur = failedSinks.poll(); Status s; try { s = cur.getSink().process(); if (s == Status.READY) { liveSinks.put(cur.getPriority(), cur.getSink()); activeSink = liveSinks.get(liveSinks.lastKey()); logger.debug("Sink {} was recovered from the fail list", cur.getSink().getName()); } else { // if it's a backoff it needn't be penalized. failedSinks.add(cur); } return s; } catch (Exception e) { cur.incFails(); failedSinks.add(cur); } } Status ret = null; while(activeSink != null) { try { ret = activeSink.process(); return ret; } catch (Exception e) { logger.warn("Sink {} failed and has been sent to failover list", activeSink.getName(), e); activeSink = moveActiveToDeadAndGetNext(); } } throw new EventDeliveryException("All sinks failed to process, " + "nothing left to failover to"); }到目前为止failedSinks还是空的,所以优先执行后半部分代码
Status ret = null; while(activeSink != null) { try { ret = activeSink.process(); return ret; } catch (Exception e) { logger.warn("Sink {} failed and has been sent to failover list", activeSink.getName(), e); activeSink = moveActiveToDeadAndGetNext(); } }1、当前激活状态的sink不为空
2、调用当前sink进行处理
3、如果处理发生异常,则将当前的sink添加到failedSinks中,并从liveSinks中删除
/**
 * Moves the entry with the greatest key out of the live pool into the
 * failed queue (with one recorded failure) and returns the sink that now
 * has the greatest key.
 *
 * @return the next live sink, or null when the live pool is empty
 */
private Sink moveActiveToDeadAndGetNext() {
  Integer key = liveSinks.lastKey();
  failedSinks.add(new FailedSink(key, activeSink, 1));
  liveSinks.remove(key);
  if (liveSinks.isEmpty()) {
    return null;
  }
  // lastKey() on a non-empty map never yields null, so no further check is needed.
  return liveSinks.get(liveSinks.lastKey());
}
4、返回一个可用的sink
如果出现了一次失败,再来看process中的前半部分代码的执行逻辑:
Long now = System.currentTimeMillis(); while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) { FailedSink cur = failedSinks.poll(); Status s; try { s = cur.getSink().process(); if (s == Status.READY) { liveSinks.put(cur.getPriority(), cur.getSink()); activeSink = liveSinks.get(liveSinks.lastKey()); logger.debug("Sink {} was recovered from the fail list", cur.getSink().getName()); } else { // if it's a backoff it needn't be penalized. failedSinks.add(cur); } return s; } catch (Exception e) { cur.incFails(); failedSinks.add(cur); } }
前提条件:failedSinks不为空且队头的sink的激活时间小于当前时间
1、poll出队列的头个FailedSink
2、使用该sink处理,如果处理成功(返回READY),则将该sink重新添加到liveSinks中,并将activeSink重新赋值为liveSinks中优先级最高的sink
3、如果返回的状态不是READY(例如BACKOFF),则不加惩罚地重新添加回failedSinks队列中
4、异常情况,则触发incFails(),同样重新添加回failedSinks队列中
以上这段逻辑是核心内容,也就是一个backoff的机制,如果说failedSinks队列中的sink可以继续处理了,我会回收使用,并不会去惩罚它
private void adjustRefresh() { refresh = System.currentTimeMillis() + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY); }一个失败过的sink会不会被再次选中来处理,得看上面的条件refresh<now才行
【Flume】flume中FailoverSinkProcessor容错处理机制源码分析
原文地址:http://blog.csdn.net/simonchi/article/details/42520193