Welcome to Lu Chunli's work notes. Learning is a kind of faith; let time test the power of persistence.
Kafka itself is implemented in Scala, but it also provides a Java API.
A message producer implemented in Java:
package com.lucl.kafka.simple;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.log4j.Logger;

/**
 * <p> Copyright: Copyright (c) 2015 </p>
 * <p> Date : 2015-11-17 21:42:50 </p>
 * <p> Description : JavaApi for kafka producer </p>
 *
 * @author luchunli
 * @version 1.0
 */
public class SimpleKafkaProducer {
    private static final Logger logger = Logger.getLogger(SimpleKafkaProducer.class);

    private void execMsgSend() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.137.117:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "0");

        ProducerConfig config = new ProducerConfig(props);
        logger.info("set config info(" + config + ") ok.");

        Producer<String, String> producer = new Producer<>(config);
        String topic = "mytopic";
        for (int i = 1; i <= 10; i++) {
            String value = "value_" + i;
            // No key is supplied, so the broker-side partition is chosen for us.
            KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, value);
            producer.send(msg);
        }
        logger.info("send message over.");
        producer.close();
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        SimpleKafkaProducer simpleProducer = new SimpleKafkaProducer();
        simpleProducer.execMsgSend();
    }
}
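The producer above sets request.required.acks to "0", which means fire-and-forget: the producer does not wait for any broker acknowledgement, so messages can be lost. As a sketch (the helper method and class name here are my own, not part of Kafka), the three durability levels in the 0.8-era producer config can be captured like this:

```java
import java.util.Properties;

public class AckConfigSketch {
    // Build 0.8-style producer properties for a given durability level:
    //   "0"  - no acknowledgement (fastest, messages may be lost)
    //   "1"  - leader acknowledges after writing to its local log
    //   "-1" - leader waits for all in-sync replicas before acknowledging
    static Properties producerProps(String brokerList, String acks) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", acks);
        return props;
    }

    public static void main(String[] args) {
        Properties p = producerProps("192.168.137.117:9092", "1");
        // prints "1": leader-only acknowledgement
        System.out.println(p.getProperty("request.required.acks"));
    }
}
```

Raising the value to "1" or "-1" trades throughput for stronger delivery guarantees.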
With a consumer started from the console at this point, you can see the data produced by the producer being consumed:
[hadoop@nnode kafka0.8.2.1]$ bin/kafka-console-consumer.sh --zookeeper nnode:2181,dnode1:2181,dnode2:2181 --topic mytopic --from-beginning
hello world
this is my first message
value_1
value_2
value_3
value_4
value_5
value_6
value_7
value_8
value_9
value_10
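The examples assume the topic mytopic already exists (or that automatic topic creation is enabled on the brokers). With the stock 0.8 CLI it can be created up front, for example with a single partition and no replication (the counts here are illustrative):

```shell
bin/kafka-topics.sh --create --zookeeper nnode:2181,dnode1:2181,dnode2:2181 \
  --topic mytopic --replication-factor 1 --partitions 1
```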
A message consumer implemented in Java:
package com.lucl.kafka.simple;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import org.apache.log4j.Logger;

/**
 * <p> Copyright: Copyright (c) 2015 </p>
 * <p> Date : 2015-11-17 21:42:50 </p>
 * <p> Description : JavaApi for kafka consumer </p>
 *
 * @author luchunli
 * @version 1.0
 */
public class SimpleKafkaConsumer {
    private static final Logger logger = Logger.getLogger(SimpleKafkaConsumer.class);

    private void execMsgConsume() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "nnode:2181,dnode1:2181,dnode2:2181");
        props.put("group.id", "group-1");
        // Producer-side property; the consumer ignores it (kept from the original config).
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);

        // Request one stream (one consuming thread) for topic "mytopic".
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("mytopic", 1);

        Decoder<String> keyDecoder = new StringDecoder(new VerifiableProperties());
        Decoder<String> valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> createMessageStreams =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        for (Iterator<String> it = createMessageStreams.keySet().iterator(); it.hasNext(); ) {
            String key = it.next();
            logger.info("The key of the createMessageStreams is " + key);
            List<KafkaStream<String, String>> values = createMessageStreams.get(key);
            for (KafkaStream<String, String> value : values) {
                ConsumerIterator<String, String> consumerIt = value.iterator();
                // Blocks until the next message arrives.
                while (consumerIt.hasNext()) {
                    MessageAndMetadata<String, String> data = consumerIt.next();
                    logger.info("The message got by consumer is " + data.message());
                }
            }
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        SimpleKafkaConsumer simpleConsumer = new SimpleKafkaConsumer();
        simpleConsumer.execMsgConsume();
    }
}
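The producer in this post sends key-less messages, so 0.8 assigns them to a partition on its own; when a key is supplied, Kafka 0.8's default partitioner picks the partition from a non-negative hash of the key modulo the partition count. A rough re-implementation, for illustration only (the class and method names are mine, not Kafka's):

```java
public class PartitionerSketch {
    // Rough sketch of Kafka 0.8's default partitioning: mask the hash
    // to a non-negative value, then take it modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // With a single partition every key maps to partition 0,
        // consistent with the [mytopic,0] seen in the consumer log below.
        System.out.println(partitionFor("value_1", 1)); // prints 0
    }
}
```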
Start the Consumer program first, then start the Producer program; the Consumer output looks like this:
23:37:30,411 INFO SimpleKafkaConsumer:55 - The key of the createMessageStreams is mytopic
23:37:30,433 INFO VerifiableProperties:68 - Verifying properties
23:37:30,433 INFO VerifiableProperties:68 - Property client.id is overridden to group-1
23:37:30,433 INFO VerifiableProperties:68 - Property metadata.broker.list is overridden to nnode:9092
23:37:30,433 INFO VerifiableProperties:68 - Property request.timeout.ms is overridden to 30000
23:37:30,451 INFO ClientUtils$:68 - Fetching metadata from broker id:117,host:nnode,port:9092 with correlation id 0 for 1 topic(s) Set(mytopic)
23:37:30,453 INFO SyncProducer:68 - Connected to nnode:9092 for producing
23:37:30,486 INFO SyncProducer:68 - Disconnecting from nnode:9092
23:37:30,528 INFO ConsumerFetcherThread:68 - [ConsumerFetcherThread-group-1_LuchunliPC-1447947448911-f949268d-0-117], Starting
23:37:30,546 INFO ConsumerFetcherManager:68 - [ConsumerFetcherManager-1447947449115] Added fetcher for partitions ArrayBuffer([[mytopic,0], initOffset -1 to broker id:117,host:nnode,port:9092] )
23:37:52,466 INFO SimpleKafkaConsumer:61 - The message got by consumer is value_1
23:37:52,466 INFO SimpleKafkaConsumer:61 - The message got by consumer is value_2
23:37:52,466 INFO SimpleKafkaConsumer:61 - The message got by consumer is value_3
23:37:52,466 INFO SimpleKafkaConsumer:61 - The message got by consumer is value_4
23:37:52,466 INFO SimpleKafkaConsumer:61 - The message got by consumer is value_5
23:37:52,466 INFO SimpleKafkaConsumer:61 - The message got by consumer is value_6
23:37:52,466 INFO SimpleKafkaConsumer:61 - The message got by consumer is value_7
23:37:52,469 INFO SimpleKafkaConsumer:61 - The message got by consumer is value_8
23:37:52,469 INFO SimpleKafkaConsumer:61 - The message got by consumer is value_9
23:37:52,469 INFO SimpleKafkaConsumer:61 - The message got by consumer is value_10
23:39:11,351 INFO ClientCnxn:1096 - Client session timed out, have not heard from server in 4000ms for sessionid 0x3512026596f0001, closing socket connection and attempting reconnect
23:39:11,452 INFO ZkClient:449 - zookeeper state changed (Disconnected)
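The tail of the log shows the client losing its ZooKeeper session after 4000 ms of silence. If this happens regularly, the old consumer's ZooKeeper timeouts can be raised in the same Properties object; the property names are from the Kafka 0.8 consumer configuration, while the helper class and the 10000 ms values are my own illustration:

```java
import java.util.Properties;

public class ConsumerTimeoutSketch {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "nnode:2181,dnode1:2181,dnode2:2181");
        props.put("group.id", "group-1");
        // Raise the ZooKeeper session timeout (illustrative value).
        props.put("zookeeper.session.timeout.ms", "10000");
        // Allow more time for the initial ZooKeeper connection as well.
        props.put("zookeeper.connection.timeout.ms", "10000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("zookeeper.session.timeout.ms"));
    }
}
```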
This article is from the blog "闷葫芦的世界"; please retain this attribution: http://luchunli.blog.51cto.com/2368057/1714857
Kafka-2.11 Study Notes (3): Accessing Kafka via the Java API