码迷,mamicode.com
首页 > Web开发 > 详细

flume-ng自定义插件sink

时间:2015-09-25 11:30:11      阅读:340      评论:0      收藏:0      [点我收藏+]

标签:

sink写入到rabbitmq的实例

package org.apache.flume;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class RabbitMQSink extends AbstractSink implements Configurable {
  private static final Logger log = LoggerFactory.getLogger(RabbitMQSink.class);
  private Channel Channel;
  private Connection Connection;
  private ConnectionFactory ConnectionFactory;
  private int connectionTimeout;
  private final CounterGroup CounterGroup;
  // private String ExchangeName;
  private String hostName;
  private String password;
  private String queueName;
  private String userName;
  private String virtualHost;
  // private Map<String, RandomAccessFile> randomAccessFiles = null;
  // private final List<String> fileNames = new ArrayList<String>();
  // private String monitorPath;
  public RabbitMQSink() {
    CounterGroup = new CounterGroup();
  }
  // private Map<String, RandomAccessFile> getRandomAccessFiles() {
  // Map<String, RandomAccessFile> randomAccessFiles = new HashMap<String, RandomAccessFile>();
  // for (int i = 0; i < fileNames.size(); i++) {
  // RandomAccessFile rf;
  // try {
  // rf = new RandomAccessFile(monitorPath + fileNames.get(i) + "_lastFileSize.txt", "rw");
  // randomAccessFiles.put(fileNames.get(i), rf);
  // } catch (FileNotFoundException e) {
  // log.error("failed to initialize RandomAccessFile!" + e);
  // }
  // }
  // return randomAccessFiles;
  // }
  public void close(Connection Connection, Channel Channel) {
    try {
      if (Connection != null) {
        Connection.close();
      }
      if (Channel != null) {
        Channel.close();
      }
    } catch (Exception e) {
      log.error("Exception thrown while closing connection", e);
    }
  }
  @Override
  public void configure(Context context) {
    // this.ExchangeName =
    // context.getString(RabbitMQConstants.CONFIG_EXCHANGENAME,
    // RabbitMQConstants.DEFAULT_EXCHANGENAME);
    // this.monitorPath = context.getString(RabbitMQConstants.CONFIG_MONITORPATH);
    // this.fileNames =
    // Arrays.asList(context.getString(RabbitMQConstants.CONFIG_FILENAMES).split(","));
    this.hostName =
        context.getString(RabbitMQConstants.CONFIG_HOSTNAME, RabbitMQConstants.DEFAULT_HOSTNAME);
    this.virtualHost =
        context.getString(RabbitMQConstants.CONFIG_VIRTUALHOST,
            RabbitMQConstants.DEFAULT_VIRTUALHOST);
    this.userName =
        context.getString(RabbitMQConstants.CONFIG_USERNAME, RabbitMQConstants.DEFAULT_USERNAME);
    this.password =
        context.getString(RabbitMQConstants.CONFIG_PASSWORD, RabbitMQConstants.DEFAULT_PASSWORD);
    this.queueName =
        context.getString(RabbitMQConstants.CONFIG_QUEUENAME, RabbitMQConstants.DEFAULT_QUEUENAME);
    this.connectionTimeout =
        context.getInteger(RabbitMQConstants.CONFIG_CONNECTIONTIMEOUT,
            RabbitMQConstants.DEFAULT_CONNECTIONTIMEOUT);
    ConnectionFactory = new ConnectionFactory();
    ConnectionFactory.setHost(hostName);
    ConnectionFactory.setVirtualHost(virtualHost);
    ConnectionFactory.setUsername(userName);
    ConnectionFactory.setPassword(password);
    ConnectionFactory.setConnectionTimeout(connectionTimeout);
    ConnectionFactory.setPort(5672);
    // randomAccessFiles = getRandomAccessFiles();
  }
  @Override
  public Status process() throws EventDeliveryException {
    if (null == Connection) {
      try {
        if (log.isInfoEnabled()) {
          log.info(this.getName() + " - Opening connection to " + ConnectionFactory.getHost() + ":"
              + ConnectionFactory.getPort());
        }
        Connection = ConnectionFactory.newConnection();
        CounterGroup.incrementAndGet(RabbitMQConstants.COUNTER_NEW_CONNECTION);
        Channel = null;
      } catch (Exception ex) {
        if (log.isErrorEnabled()) {
          log.error(this.getName() + " - Exception while establishing connection.", ex);
        }
        resetConnection();
        return Status.BACKOFF;
      }
    }
    if (null == Channel) {
      try {
        if (log.isInfoEnabled()) {
          log.info(this.getName() + " - creating channel...");
        }
        Channel = Connection.createChannel();
        CounterGroup.incrementAndGet(RabbitMQConstants.COUNTER_NEW_CHANNEL);
        if (log.isInfoEnabled()) {
          log.info(this.getName() + " - Connected to " + ConnectionFactory.getHost() + ":"
              + ConnectionFactory.getPort());
        }
      } catch (Exception ex) {
        if (log.isErrorEnabled()) {
          log.error(this.getName() + " - Exception while creating channel.", ex);
        }
        resetConnection();
        return Status.BACKOFF;
      }
    }
    Transaction tx = getChannel().getTransaction();
    try {
      tx.begin();
      Event e = getChannel().take();
      if (e == null) {
        tx.rollback();
        return Status.BACKOFF;
      }
      try {
        String data = new String(e.getBody());
        // String[] datas = data.split("#");
        // String routingKey = null;
        // // 判断如果是以heartbeat开头的就是心跳信息
        // if (datas[0].equals("heartbeat")) {
        // routingKey = "heartbeat";
        // data = datas[1];
        // } else {
        // routingKey = "log";
        // String[] logInfo = data.split(",");
        // data = logInfo[0];
        // String filePrefix = logInfo[1];
        // String dateString = logInfo[2];
        // String rowNum = logInfo[3];
        // // writeCurrentRownumAnyLogFiles(filePrefix, dateString, rowNum);
        // }
        // Channel.basicPublish(ExchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN,
        // data.getBytes());
        Channel.queueDeclare(queueName, true, false, false, null);
        Channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN,
            data.getBytes());
        tx.commit();
        CounterGroup.incrementAndGet(RabbitMQConstants.COUNTER_PUBLISH);
      } catch (Exception ex) {
        resetConnection();
        throw ex;
      }
      return Status.READY;
    } catch (Exception ex) {
      tx.rollback();
      if (log.isErrorEnabled()) {
        log.error(this.getName() + " - Exception while publishing...", ex);
      }
      return Status.BACKOFF;
    } finally {
      tx.close();
    }
  }
  private void resetConnection() {
    CounterGroup.incrementAndGet(RabbitMQConstants.COUNTER_EXCEPTION);
    if (log.isWarnEnabled()) {
      log.warn(this.getName() + " - Closing RabbitMQ connection and channel due to exception.");
    }
    close(Connection, Channel);
    Connection = null;
    Channel = null;
  }
  @Override
  public synchronized void stop() {
    close(Connection, Channel);
    log.warn("the method stop is called" + this.getName()
        + " - Closing RabbitMQ connection and channel due to exception.");
    super.stop();
  }
  // public void writeCurrentRownumAnyLogFiles(String logFileName, String date, String row) {
  // RandomAccessFile raf = randomAccessFiles.get(logFileName);
  // try {
  // raf.seek(0);
  // // 此处写入的时候加10个空格,为了避免当写入的行数位数小于已有的行数的位数时候不能覆盖问题
  // raf.write((date + "," + row + "          ").getBytes());
  // } catch (IOException e) {
  // log.error("record rowNum is error" + e);
  // }
  // }
}

主要是extends AbstractSink implements Configurable,实现Configurable即可从配置文件取的配置参数,继承AbstractSink即可从channel中取得event。

参考:Flume(ng) 自定义sink实现和属性注入

flume-ng自定义插件sink

标签:

原文地址:http://my.oschina.net/cjun/blog/511020

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!