Todo:
Refactor Flume's sink to call Kafka's message producer (Producer) and send messages;
In a Storm spout, implement the IRichSpout interface and call Kafka's message consumer (Consumer) to receive the messages, then pass them through several custom Bolts and emit the processed content.
Writing KafkaSink
Copy from $KAFKA_HOME/lib:
kafka_2.10-0.8.2.1.jar
kafka-clients-0.8.2.1.jar
scala-library-2.10.4.jar
to $FLUME_HOME/lib.
Create a new project in Eclipse and import from $FLUME_HOME/lib:
commons-logging-1.1.1.jar
flume-ng-configuration-1.6.0.jar
flume-ng-core-1.6.0.jar
flume-ng-sdk-1.6.0.jar
zkclient-0.3.jar
kafka_2.10-0.8.2.1.jar
kafka-clients-0.8.2.1.jar
scala-library-2.10.4.jar
into the project.
Create a new file, KafkaSink.java:
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class KafkaSink extends AbstractSink implements Configurable {

    private static final Log logger = LogFactory.getLog(KafkaSink.class);
    private String topic;
    private Producer<String, String> producer;

    @Override
    public void configure(Context context) {
        topic = "flume_test";
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "localhost:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("zookeeper.connect", "localhost:2181");
        props.setProperty("num.partitions", "4");
        // props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
        logger.info("KafkaSink initialized.");
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            Event e = channel.take();
            if (e == null) {
                tx.rollback();
                return Status.BACKOFF;
            }
            // Note: new String(byte[]) decodes with the platform default charset.
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>(topic, new String(e.getBody()));
            producer.send(data);
            logger.info("Flume sent message to Kafka: " + new String(e.getBody()));
            tx.commit();
            return Status.READY;
        } catch (Exception e) {
            logger.error("Flume KafkaSinkException:", e);
            tx.rollback();
            return Status.BACKOFF;
        } finally {
            tx.close();
        }
    }
}
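The heart of process() is the Flume channel transaction: begin, take an event, commit after a successful send, and roll back so the event stays in the channel when anything fails. That control flow can be sketched with a minimal stand-in that has no Flume or Kafka dependency (all names here are illustrative, not part of the real Flume API):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal stand-in for the channel transaction pattern in process():
// take an event inside a transaction, commit on success, roll back otherwise.
public class TxDemo {
    static Deque<String> channel = new ArrayDeque<>();

    // Returns "READY" if an event was taken and "sent", "BACKOFF" otherwise.
    static String process() {
        Deque<String> snapshot = new ArrayDeque<>(channel); // begin: remember state
        try {
            String event = channel.poll();                  // take
            if (event == null) {
                channel = snapshot;                         // rollback: nothing to do
                return "BACKOFF";
            }
            // producer.send(event) would happen here
            return "READY";                                 // commit
        } catch (RuntimeException ex) {
            channel = snapshot;                             // rollback: keep the event
            return "BACKOFF";
        }
    }

    public static void main(String[] args) {
        channel.add("hey manhua");
        System.out.println(process()); // READY
        System.out.println(process()); // BACKOFF (channel now empty)
    }
}
```

The snapshot-and-restore here only mimics what a real Flume Transaction guarantees; in the sink itself the channel implementation handles that bookkeeping.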
Export the jar and place it in $FLUME_HOME/lib
(File -> Export -> Jar File, all default options)
Create kafka.conf:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = KafkaSink

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
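One detail worth noting (my addition, not from the original post): Flume resolves a custom sink by its fully qualified class name. `a1.sinks.k1.type = KafkaSink` works here only because KafkaSink.java above has no package declaration. If the class lived in a package, the line would need the full name, for example (package name hypothetical):

```
a1.sinks.k1.type = com.example.flume.KafkaSink
```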
Testing
Start Kafka
cd ~/app/kafka
./bin/zookeeper-server-start.sh ./config/zookeeper.properties > /dev/null &
./bin/kafka-server-start.sh ./config/server.properties > /dev/null &
Create the topic
~/app/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic flume_test
Start a console consumer
~/app/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic flume_test --from-beginning
Start the Flume agent
flume-ng agent -c conf -f ~/test/kafka.conf --name a1 -Dflume.root.logger=INFO,console
Send messages
echo "hey manhua" | nc localhost 5140
echo "nice shot" | nc localhost 5140
A ready-made tool that integrates Flume and Kafka:
https://github.com/kevinjmh/flumeng-kafka-plugin/tree/master/flumeng-kafka-plugin/src/main/java/org/apache/flume/plugins
Original post: http://www.cnblogs.com/manhua/p/5000939.html