本文介绍flume读取kafka数据的方法
代码:
/*******************************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *******************************************************************************/
package org.apache.flume.source.kafka;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.source.SyslogParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
?* A Source for Kafka which reads messages from kafka. I use this in company production environment
?* and its performance is good. Over 100k messages per second can be read from kafka in one source.<p>
?* <tt>zookeeper.connect: </tt> the zookeeper ip kafka use.<p>
?* <tt>topic: </tt> the topic to read from kafka.<p>
?* <tt>group.id: </tt> the groupid of consumer group.<p>
?*/
public class KafkaSource extends AbstractSource implements Configurable, PollableSource {
?? ?private static final Logger log = LoggerFactory.getLogger(KafkaSource.class);
?? ?private ConsumerConnector consumer;
?? ?private ConsumerIterator<byte[], byte[]> it;
?? ?private String topic;
?? ?
?? ?public Status process() throws EventDeliveryException {
?? ??? ?List<Event> eventList = new ArrayList<Event>();
??????? MessageAndMetadata<byte[],byte[]> message;
?? ??? ?Event event;
?? ??? ?Map<String, String> headers;
??????? String strMessage;
??????? try {
?? ??? ??? ?if(it.hasNext()) {
?? ??? ??? ??? ?message = it.next();
?? ??? ??? ??? ?event = new SimpleEvent();
?? ??? ??? ??? ?headers = new HashMap<String, String>();
?? ??? ??? ??? ?headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
??????????????? strMessage =? String.valueOf(System.currentTimeMillis()) + "|" + new String(message.message());
??????????????? log.debug("Message: {}", strMessage);
??????????????? event.setBody(strMessage.getBytes());
??????????????? //event.setBody(message.message());
?? ??? ??? ??? ?event.setHeaders(headers);
?? ??? ??? ??? ?eventList.add(event);
?? ??? ??? ?}
?? ??? ??? ?getChannelProcessor().processEventBatch(eventList);
?? ??? ??? ?return Status.READY;
?? ??? ?} catch (Exception e) {
?? ??? ??? ?log.error("KafkaSource EXCEPTION, {}", e.getMessage());
?? ??? ??? ?return Status.BACKOFF;
?? ??? ?}
?? ?}
?? ?public void configure(Context context) {
?? ??? ?topic = context.getString("topic");
?? ??? ?if(topic == null) {
?? ??? ??? ?throw new ConfigurationException("Kafka topic must be specified.");
?? ??? ?}
?? ??? ?try {
?? ??? ??? ?this.consumer = KafkaSourceUtil.getConsumer(context);
?? ??? ?} catch (IOException e) {
?? ??? ??? ?log.error("IOException occur, {}", e.getMessage());
?? ??? ?} catch (InterruptedException e) {
?? ??? ??? ?log.error("InterruptedException occur, {}", e.getMessage());
?? ??? ?}
?? ??? ?Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
?? ??? ?topicCountMap.put(topic, new Integer(1));
?? ??? ?Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
?? ??? ?if(consumerMap == null) {
?? ??? ??? ?throw new ConfigurationException("topicCountMap is null");
?? ??? ?}
?? ??? ?List<KafkaStream<byte[], byte[]>> topicList = consumerMap.get(topic);
?? ??? ?if(topicList == null || topicList.isEmpty()) {
?? ??? ??? ?throw new ConfigurationException("topicList is null or empty");
?? ??? ?}
?? ???? KafkaStream<byte[], byte[]> stream =? topicList.get(0);
?? ???? it = stream.iterator();
?? ?}
?? ?@Override
?? ?public synchronized void stop() {
?? ??? ?consumer.shutdown();
?? ??? ?super.stop();
?? ?}
}
/*******************************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *******************************************************************************/
package org.apache.flume.source.kafka;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import com.google.common.collect.ImmutableMap;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.flume.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaSourceUtil {
?? ?private static final Logger log = LoggerFactory.getLogger(KafkaSourceUtil.class);
?? ?public static Properties getKafkaConfigProperties(Context context) {
?? ??? ?log.info("context={}",context.toString());
?? ??? ?Properties props = new Properties();
??????? ImmutableMap<String, String> contextMap = context.getParameters();
??????? for (Map.Entry<String,String> entry : contextMap.entrySet()) {
??????????? String key = entry.getKey();
??????????? if (!key.equals("type") && !key.equals("channel")) {
??????????????? props.setProperty(entry.getKey(), entry.getValue());
??????????????? log.info("key={},value={}", entry.getKey(), entry.getValue());
??????????? }
??????? }
?? ??? ?return props;
?? ?}
?? ?public static ConsumerConnector getConsumer(Context context) throws IOException, InterruptedException {
?? ??? ?ConsumerConfig consumerConfig = new ConsumerConfig(getKafkaConfigProperties(context));
?? ??? ?ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
?? ??? ?return consumer;
?? ?}
}
配置文件:( /etc/flume/conf/flume-kafka-file.properties)
agent_log.sources = kafka0
agent_log.channels = ch0
agent_log.sinks = sink0
agent_log.sources.kafka0.channels = ch0
agent_log.sinks.sink0.channel = ch0
agent_log.sources.kafka0.type = org.apache.flume.source.kafka.KafkaSource
agent_log.sources.kafka0.zookeeper.connect = node3:2181,node4:2181,node5:2181
agent_log.sources.kafka0.topic = kkt-test-topic
agent_log.sources.kafka0.group.id= test
agent_log.channels.ch0.type = memory
agent_log.channels.ch0.capacity = 2048
agent_log.channels.ch0.transactionCapacity = 1000
agent_log.sinks.sink0.type=file_roll
agent_log.sinks.sink0.sink.directory=/data/flumeng/data/test
agent_log.sinks.sink0.sink.rollInterval=300
启动脚本:
sudo su -l -s /bin/bash flume -c '/usr/lib/flume/bin/flume-ng agent --conf /etc/flume/conf --conf-file /etc/flume/conf/flume-kafka-file.properties -name agent_log -Dflume.root.logger=INFO,console'
注意: 代码中在消息体前拼接 System.currentTimeMillis() 和 "|" 的部分（原文以红色字体标出）的功能是对原来数据增加时间戳
版本号: flume-1.4.0.2.1.1.0 + kafka_2.8.0-0.8.0
参考资料: https://github.com/baniuyao/flume-kafka
编译用到的库:
    flume-ng-configuration-1.4.0.2.1.1.0-385
    flume-ng-core-1.4.0.2.1.1.0-385
    flume-ng-sdk-1.4.0.2.1.1.0-385
    flume-tools-1.4.0.2.1.1.0-385
    guava-11.0.2
    kafka_2.8.0-0.8.0
    log4j-1.2.15
    scala-compiler
    scala-library
    slf4j-api-1.6.1
    slf4j-log4j12-1.6.1
    zkclient-0.3
    zookeeper-3.3.4