
Combining Flume and Kafka

Posted: 2015-11-27 17:43:28


Todo:

Refactor Flume's sink so that it calls Kafka's producer to send messages;

In a Storm spout, implement the IRichSpout interface and call Kafka's consumer to receive messages, then pass them through a few custom Bolts and print the customized output (a sketch of such a spout follows below).
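
The rest of this post focuses on the Flume sink, and the Storm side is only listed as a Todo. Below is a minimal sketch of such a spout, assuming Kafka 0.8's high-level consumer and the classic backtype.storm API; the class name KafkaSpout, the consumer group storm_group and the output field name line are illustrative. It extends BaseRichSpout, which implements IRichSpout:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
 
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
 
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
 
public class KafkaSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private ConsumerIterator<byte[], byte[]> it;
 
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "storm_group");        // illustrative consumer group
        props.put("auto.offset.reset", "smallest");  // read the topic from the beginning
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // one stream for the flume_test topic
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(Collections.singletonMap("flume_test", 1));
        it = streams.get("flume_test").get(0).iterator();
    }
 
    public void nextTuple() {
        // hasNext() blocks until a message arrives unless consumer.timeout.ms is set;
        // a production topology would rather use the official storm-kafka spout
        if (it.hasNext()) {
            collector.emit(new Values(new String(it.next().message())));
        }
    }
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}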

 

Writing the KafkaSink

Copy the following from $KAFKA_HOME/lib:

kafka_2.10-0.8.2.1.jar

kafka-clients-0.8.2.1.jar

scala-library-2.10.4.jar

to $FLUME_HOME/lib.


Create a new project in Eclipse and import the following from $FLUME_HOME/lib:

commons-logging-1.1.1.jar

flume-ng-configuration-1.6.0.jar

flume-ng-core-1.6.0.jar

flume-ng-sdk-1.6.0.jar

zkclient-0.3.jar

kafka_2.10-0.8.2.1.jar

kafka-clients-0.8.2.1.jar

scala-library-2.10.4.jar

into the project.

 

Create a new file, KafkaSink.java:

import java.util.Properties;
 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
 
 
public class KafkaSink extends AbstractSink implements Configurable {
    private static final Log logger = LogFactory.getLog(KafkaSink.class);
 
    private String topic;
    private Producer<String, String> producer;
 
    public void configure(Context context) {
        topic = "flume_test";
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "localhost:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("zookeeper.connect", "localhost:2181");
        props.setProperty("num.partitions", "4"); // 
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
        logger.info("KafkaSink初始化完成.");
 
    }
 
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            Event e = channel.take();
            if (e == null) {
                tx.rollback();
                return Status.BACKOFF;
            }
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, new String(e.getBody()));
            producer.send(data);
            logger.info("flume向kafka发送消息:" + new String(e.getBody()));
            tx.commit();
            return Status.READY;
        } catch (Exception e) {
            logger.error("Flume KafkaSinkException:", e);
            tx.rollback();
            return Status.BACKOFF;
        } finally {
            tx.close();
        }
    }
}
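
In the code above the topic name and broker list are hardcoded. As a possible refinement (not part of the original post), configure() could read them from the agent configuration through the Flume Context; the property names topic and brokerList below are illustrative:

    public void configure(Context context) {
        // hypothetical variant: read settings from kafka.conf
        // (e.g. a1.sinks.k1.topic / a1.sinks.k1.brokerList) instead of hardcoding them
        topic = context.getString("topic", "flume_test");
        String brokerList = context.getString("brokerList", "localhost:9092");
 
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", brokerList);
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        producer = new Producer<String, String>(new ProducerConfig(props));
        logger.info("KafkaSink initialized with topic " + topic);
    }

With that variant, kafka.conf would carry two extra lines such as a1.sinks.k1.topic = flume_test and a1.sinks.k1.brokerList = localhost:9092.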

Export the project as a jar and place it in $FLUME_HOME/lib

(File->Export->Jar File 全部默认参数)

 

 

Create kafka.conf:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
 
# Describe the sink (the custom KafkaSink exported above; the simple class
# name works here because the class is in the default package)
a1.sinks.k1.type = KafkaSink
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

 

Testing

Start Kafka:

cd ~/app/kafka
./bin/zookeeper-server-start.sh ./config/zookeeper.properties > /dev/null &
./bin/kafka-server-start.sh ./config/server.properties > /dev/null &

 

Create the topic:

~/app/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4  --topic flume_test

 

Start a console consumer:

~/app/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic flume_test  --from-beginning

 

Start the Flume agent:

flume-ng agent -c conf  -f ~/test/kafka.conf --name a1 -Dflume.root.logger=INFO,console

 

Send a couple of messages (the console consumer should print them back):

echo "hey manhua" |nc localhost 5140
echo "nice shot" |nc localhost 5140

 

A ready-made Flume/Kafka integration plugin:

https://github.com/kevinjmh/flumeng-kafka-plugin/tree/master/flumeng-kafka-plugin/src/main/java/org/apache/flume/plugins

 

Original article: http://www.cnblogs.com/manhua/p/5000939.html
