| 
 @InterfaceAudience.Public 
@InterfaceStability.Stable 
public interface Source extends LifecycleAware, NamedComponent { 
  /** 
   * Specifies which channel processor will handle this source‘s events. 
   * 
   * @param channelProcessor 
   */ 
  public void setChannelProcessor(ChannelProcessor channelProcessor); 
  /** 
   * Returns the channel processor that will handle this source‘s events. 
   */ 
  public ChannelProcessor getChannelProcessor(); 
} 
 | 
| 
 public enum LifecycleState {
  IDLE,
  START,
  STOP,
  ERROR;

  /** Two-element array holding {@link #START} followed by {@link #ERROR}. */
  public static final LifecycleState[] START_OR_ERROR = { START, ERROR };

  /** Two-element array holding {@link #STOP} followed by {@link #ERROR}. */
  public static final LifecycleState[] STOP_OR_ERROR = { STOP, ERROR };
}
 | 
| 
 /**
  * Starts this exec source: creates a single-thread executor, builds the
  * {@code ExecRunnable} for the configured command, submits it, and only
  * then starts the counter and delegates to {@code super.start()}.
  */
 public void start() {
    logger.info("Exec source starting with command:{}", command);
    // One worker thread is enough: exactly one runner is submitted below.
    executor = Executors.newSingleThreadExecutor();
    runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
        restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);
    // FIXME: Use a callback-like executor / future to signal us upon failure.
    // The future is kept in a field; this method does not wait on it.
    runnerFuture = executor.submit(runner);
    /*
     * NB: This comes at the end rather than the beginning of the method because
     * it sets our state to running. We want to make sure the executor is alive
     * and well first.
     */
    sourceCounter.start();
    super.start();
    logger.debug("Exec source started");
  }
 | 
| 
 @InterfaceAudience.Public 
@InterfaceStability.Stable 
public interface Channel extends LifecycleAware, NamedComponent { 
  /** 
   * <p>Puts the given event into the channel.</p> 
   * <p><strong>Note</strong>: This method must be invoked within an active 
   * {@link Transaction} boundary. Failure to do so can lead to unpredictable 
   * results.</p> 
   * @param event the event to transport. 
   * @throws ChannelException in case this operation fails. 
   * @see org.apache.flume.Transaction#begin() 
   */ 
  public void put(Event event) throws ChannelException; 
  /** 
   * <p>Returns the next event from the channel if available. If the channel 
   * does not have any events available, this method must return {@code null}. 
   * </p> 
   * <p><strong>Note</strong>: This method must be invoked within an active 
   * {@link Transaction} boundary. Failure to do so can lead to unpredictable 
   * results.</p> 
   * @return the next available event or {@code null} if no events are 
   * available. 
   * @throws ChannelException in case this operation fails. 
   * @see org.apache.flume.Transaction#begin() 
   */ 
  public Event take() throws ChannelException; 
  /** 
   * @return the transaction instance associated with this channel. 
   */ 
  public Transaction getTransaction(); 
} 
 | 
| 
 public interface Transaction {

  /** Names of the states a transaction can be reported in. */
  enum TransactionState { Started, Committed, RolledBack, Closed }

  /**
   * <p>Opens a transaction boundary for the current channel operation. When
   * a transaction is already in progress, the call joins that transaction
   * by means of reference counting.</p>
   * <p><strong>Note</strong>: every call to this method must be paired with
   * a call to {@linkplain #close()}. Unpaired calls can leave dangling
   * transactions behind and produce unpredictable results.</p>
   */
  void begin();

  /**
   * Signals that the transaction can be committed successfully. A
   * transaction must be in progress when this method is called.
   */
  void commit();

  /**
   * Signals that the transaction must be aborted. A transaction must be
   * in progress when this method is called.
   */
  void rollback();

  /**
   * <p>Closes a transaction boundary for the current channel operation.
   * Boundaries are reference counted: the transaction is completed only
   * once no references to it remain.</p>
   * <p><strong>Note</strong>: every call to this method must be paired with
   * a call to {@linkplain #begin()}. Unpaired calls can leave dangling
   * transactions behind and produce unpredictable results.</p>
   */
  void close();
}
 | 
| 
  // Extension points for a concrete transaction implementation. The two
  // with empty bodies (doBegin, doClose) are optional to override; the four
  // abstract ones must be supplied by the subclass.
  // NOTE(review): presumably each doXxx() is driven by the matching public
  // transaction operation (begin/put/take/commit/rollback/close) — the
  // callers are not visible in this excerpt, confirm against the enclosing class.

  /** Optional hook; the default implementation does nothing. */
  protected void doBegin() throws InterruptedException {}
  /** Subclass-provided handling for putting {@code event}. */
  protected abstract void doPut(Event event) throws InterruptedException;
  /** Subclass-provided handling for taking an event; returns the event taken. */
  protected abstract Event doTake() throws InterruptedException;
  /** Subclass-provided commit handling. */
  protected abstract void doCommit() throws InterruptedException;
  /** Subclass-provided rollback handling. */
  protected abstract void doRollback() throws InterruptedException;
  /** Optional hook; the default implementation does nothing and declares no checked exception. */
  protected void doClose() {}
 | 
| 
  // Pointers of events taken during this transaction; doTake() offers each
  // pointer here and doRollback() pushes them back onto the queue head.
  private final LinkedBlockingDeque<FlumeEventPointer> takeList;
    // Pointers of events put during this transaction; doPut() offers each
    // pointer here and doCommit() moves them to the queue tail.
    private final LinkedBlockingDeque<FlumeEventPointer> putList;
 | 
| 
 protected void doPut(Event event) throws InterruptedException { 
      channelCounter.incrementEventPutAttemptCount(); 
      if(putList.remainingCapacity() == 0) { 
        throw new ChannelException("Put queue for FileBackedTransaction " + 
            "of capacity " + putList.size() + " full, consider " + 
            "committing more frequently, increasing capacity or " + 
            "increasing thread count. " + channelNameDescriptor); 
      } 
      // this does not need to be in the critical section as it does not 
      // modify the structure of the log or queue. 
      if(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) { 
        throw new ChannelFullException("The channel has reached it‘s capacity. " 
            + "This might be the result of a sink on the channel having too " 
            + "low of batch size, a downstream system running slower than " 
            + "normal, or that the channel capacity is just too low. " 
            + channelNameDescriptor); 
      } 
      boolean success = false; 
      log.lockShared(); 
      try { 
        FlumeEventPointer ptr = log.put(transactionID, event); 
        Preconditions.checkState(putList.offer(ptr), "putList offer failed " 
          + channelNameDescriptor); 
        queue.addWithoutCommit(ptr, transactionID); 
        success = true; 
      } catch (IOException e) { 
        throw new ChannelException("Put failed due to IO error " 
                + channelNameDescriptor, e); 
      } finally { 
        log.unlockShared(); 
        if(!success) { 
          // release slot obtained in the case 
          // the put fails for any reason 
          queueRemaining.release(); 
        } 
      } 
    } 
 | 
| 
  protected Event doTake() throws InterruptedException { 
      channelCounter.incrementEventTakeAttemptCount(); 
      if(takeList.remainingCapacity() == 0) { 
        throw new ChannelException("Take list for FileBackedTransaction, capacity " + 
            takeList.size() + " full, consider committing more frequently, " + 
            "increasing capacity, or increasing thread count. " 
               + channelNameDescriptor); 
      } 
      log.lockShared(); 
      /* 
       * 1. Take an event which is in the queue. 
       * 2. If getting that event does not throw NoopRecordException, 
       * then return it. 
       * 3. Else try to retrieve the next event from the queue 
       * 4. Repeat 2 and 3 until queue is empty or an event is returned. 
       */ 
      try { 
        while (true) { 
          FlumeEventPointer ptr = queue.removeHead(transactionID); 
          if (ptr == null) { 
            return null; 
          } else { 
            try { 
              // first add to takeList so that if write to disk 
              // fails rollback actually does it‘s work 
              Preconditions.checkState(takeList.offer(ptr), 
                "takeList offer failed " 
                  + channelNameDescriptor); 
              log.take(transactionID, ptr); // write take to disk 
              Event event = log.get(ptr); 
              return event; 
            } catch (IOException e) { 
              throw new ChannelException("Take failed due to IO error " 
                + channelNameDescriptor, e); 
            } catch (NoopRecordException e) { 
              LOG.warn("Corrupt record replaced by File Channel Integrity " + 
                "tool found. Will retrieve next event", e); 
              takeList.remove(ptr); 
            } catch (CorruptEventException ex) { 
              if (fsyncPerTransaction) { 
                throw new ChannelException(ex); 
              } 
              LOG.warn("Corrupt record found. Event will be " + 
                "skipped, and next event will be read.", ex); 
              takeList.remove(ptr); 
            } 
          } 
        } 
      } finally { 
        log.unlockShared(); 
      } 
    } 
 | 
| 
 protected void doCommit() throws InterruptedException { 
      int puts = putList.size(); 
      int takes = takeList.size(); 
      if(puts > 0) { 
        Preconditions.checkState(takes == 0, "nonzero puts and takes " 
                + channelNameDescriptor); 
        log.lockShared(); 
        try { 
          log.commitPut(transactionID); 
          channelCounter.addToEventPutSuccessCount(puts); 
          synchronized (queue) { 
            while(!putList.isEmpty()) { 
              if(!queue.addTail(putList.removeFirst())) { 
                StringBuilder msg = new StringBuilder(); 
                msg.append("Queue add failed, this shouldn‘t be able to "); 
                msg.append("happen. A portion of the transaction has been "); 
                msg.append("added to the queue but the remaining portion "); 
                msg.append("cannot be added. Those messages will be consumed "); 
                msg.append("despite this transaction failing. Please report."); 
                msg.append(channelNameDescriptor); 
                LOG.error(msg.toString()); 
                Preconditions.checkState(false, msg.toString()); 
              } 
            } 
            queue.completeTransaction(transactionID); 
          } 
        } catch (IOException e) { 
          throw new ChannelException("Commit failed due to IO error " 
                  + channelNameDescriptor, e); 
        } finally { 
          log.unlockShared(); 
        } 
      } else if (takes > 0) { 
        log.lockShared(); 
        try { 
          log.commitTake(transactionID); 
          queue.completeTransaction(transactionID); 
          channelCounter.addToEventTakeSuccessCount(takes); 
        } catch (IOException e) { 
          throw new ChannelException("Commit failed due to IO error " 
              + channelNameDescriptor, e); 
        } finally { 
          log.unlockShared(); 
        } 
        queueRemaining.release(takes); 
      } 
      putList.clear(); 
      takeList.clear(); 
      channelCounter.setChannelSize(queue.getSize()); 
    } 
 | 
| 
   protected void doRollback() throws InterruptedException { 
      int puts = putList.size(); 
      int takes = takeList.size(); 
      log.lockShared(); 
      try { 
        if(takes > 0) { 
          Preconditions.checkState(puts == 0, "nonzero puts and takes " 
              + channelNameDescriptor); 
          synchronized (queue) { 
            while (!takeList.isEmpty()) { 
              Preconditions.checkState(queue.addHead(takeList.removeLast()), 
                  "Queue add failed, this shouldn‘t be able to happen " 
                      + channelNameDescriptor); 
            } 
          } 
        } 
        putList.clear(); 
        takeList.clear(); 
        queue.completeTransaction(transactionID); 
        channelCounter.setChannelSize(queue.getSize()); 
        log.rollback(transactionID); 
      } catch (IOException e) { 
        throw new ChannelException("Commit failed due to IO error " 
            + channelNameDescriptor, e); 
      } finally { 
        log.unlockShared(); 
        // since rollback is being called, puts will never make it on 
        // to the queue and we need to be sure to release the resources 
        queueRemaining.release(puts); 
      } 
    } 
 | 
| 
 @InterfaceAudience.Public 
@InterfaceStability.Stable 
public interface Sink extends LifecycleAware, NamedComponent { 
  /** 
   * <p>Sets the channel the sink will consume from</p> 
   * @param channel The channel to be polled 
   */ 
  public void setChannel(Channel channel); 
  /** 
   * @return the channel associated with this sink 
   */ 
  public Channel getChannel(); 
  /** 
   * <p>Requests the sink to attempt to consume data from attached channel</p> 
   * <p><strong>Note</strong>: This method should be consuming from the channel 
   * within the bounds of a Transaction. On successful delivery, the transaction 
   * should be committed, and on failure it should be rolled back. 
   * @return READY if 1 or more Events were successfully delivered, BACKOFF if 
   * no data could be retrieved from the channel feeding this sink 
   * @throws EventDeliveryException In case of any kind of failure to 
   * deliver data to the next hop destination. 
   */ 
  public Status process() throws EventDeliveryException; 
  public static enum Status { 
    READY, BACKOFF 
  } 
} 
 | 
| 
 /**
  * Runs one delivery attempt inside a transaction obtained from the sink's
  * channel: the transaction is begun, committed on the success path, rolled
  * back in both catch handlers, and always closed in the finally clause.
  *
  * NOTE(review): the middle of this method (including the opening of the
  * try block and the computation of txnEventCount) is elided in this
  * excerpt, so only the visible transaction handling is described here.
  *
  * @return BACKOFF when fewer than one event was handled in the transaction
  *     or when an IOException occurred; READY otherwise
  * @throws EventDeliveryException wrapping any non-Error Throwable other
  *     than IOException; Errors are rethrown unchanged
  */
 public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    List<BucketWriter> writers = Lists.newArrayList();
    transaction.begin();
     …………………………
     transaction.commit();
      if (txnEventCount < 1) {
        return Status.BACKOFF;
      } else {
        sinkCounter.addToEventDrainSuccessCount(txnEventCount);
        return Status.READY;
      }
    } catch (IOException eIO) {
      // IO failures are logged and reported as BACKOFF rather than thrown.
      transaction.rollback();
      LOG.warn("HDFS IO error", eIO);
      return Status.BACKOFF;
    } catch (Throwable th) {
      transaction.rollback();
      LOG.error("process failed", th);
      if (th instanceof Error) {
        throw (Error) th;
      } else {
        throw new EventDeliveryException(th);
      }
    } finally {
      // Pairs with the begin() above on every exit path.
      transaction.close();
    }
  }
 | 
【Flume】【*】深入flume-ng的三大组件——source,channel,sink
原文地址:http://blog.csdn.net/simonchi/article/details/43308677