码迷,mamicode.com
首页 > 编程语言 > 详细

Kafka-2.11学习笔记(三)JavaApi访问kafka

时间:2015-11-20 00:18:40      阅读:382      评论:0      收藏:0      [点我收藏+]

标签:java api   kafka   

欢迎访问:鲁春利的工作笔记,学习是一种信仰,让时间考验坚持的力量。



Kafka底层是基于Scala语言实现的,但是也提供了Java的API接口。

Java实现的消息生产者

package com.lucl.kafka.simple;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.log4j.Logger;

/**
 * <p> Copyright: Copyright (c) 2015 </p>
 * 
 * <p> Date : 2015-11-17 21:42:50 </p>
 * 
 * <p> Description : JavaApi for kafka producer </p>
 *
 * @author luchunli
 * 
 * @version 1.0
 *
 */
public class SimpleKafkaProducer {
    private static final Logger logger = Logger.getLogger(SimpleKafkaProducer.class);
    /**
     * 
     */
    private void execMsgSend() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.137.117:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "0");
        
        ProducerConfig config = new ProducerConfig(props); 
        
        logger.info("set config info(" + config + ") ok.");
        
        Producer<String, String> procuder = new Producer<>(config);
        
        String topic = "mytopic";
        for (int i = 1; i <= 10; i++) {
            String value = "value_" + i;
            KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, value);
            procuder.send(msg);
        }
        logger.info("send message over.");
            
        procuder.close();
    }
    
    /**
     * @param args
     */
    public static void main(String[] args) {
        SimpleKafkaProducer simpleProducer = new SimpleKafkaProducer();
        simpleProducer.execMsgSend();
    }

}

此时通过控制台方式启动的消费者,可以看到生产者生产的数据被消费者消费了:

[hadoop@nnode kafka0.8.2.1]$ bin/kafka-console-consumer.sh --zookeeper nnode:2181,dnode1:2181,dnode2:2181 --topic mytopic --from-beginning
hello world
this is my first message
value_1
value_2
value_3
value_4
value_5
value_6
value_7
value_8
value_9
value_10


Java实现的消息消费者

package com.lucl.kafka.simple;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import org.apache.log4j.Logger;

/**
 * <p> Copyright: Copyright (c) 2015 </p>
 * 
 * <p> Date : 2015-11-17 21:42:50 </p>
 * 
 * <p> Description : JavaApi for kafka consumer </p>
 *
 * @author luchunli
 * 
 * @version 1.0
 *
 */
public class SimpleKafkaConsumer {
    private static final Logger logger = Logger.getLogger(SimpleKafkaConsumer.class);
    
    /**
     * 
     */
    private void execMsgConsume() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "nnode:2181,dnode1:2181,dnode2:2181");
        props.put("group.id", "group-1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        
        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
        
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("mytopic", 1);
        Decoder<String> keyDecoder = new StringDecoder(new VerifiableProperties());
        Decoder<String> valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> createMessageStreams = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        for (Iterator<String> it = createMessageStreams.keySet().iterator(); it.hasNext(); ) {
            String key = it.next();
            logger.info("The key of the createMessageStreams is " + key);
            List<KafkaStream<String, String>> values = createMessageStreams.get(key);
            for (KafkaStream<String, String> value : values) {
                 ConsumerIterator<String, String> consumerIt = value.iterator();
                 while (consumerIt.hasNext()) {
                     MessageAndMetadata<String, String> data = consumerIt.next();
                     logger.info("The message got by consuer is " + data.message());
                 }
            }
        }
        
    }
    
    /**
     * @param args
     */
    public static void main(String[] args) {
        SimpleKafkaConsumer simpleConsumer = new SimpleKafkaConsumer();
        simpleConsumer.execMsgConsume();
    }

}

启动Consumer程序,然后再启动Producer程序,此时Consumer端输出内容如下:

23:37:30,411  INFO SimpleKafkaConsumer:55 - The key of the createMessageStreams is mytopic
23:37:30,433  INFO VerifiableProperties:68 - Verifying properties
23:37:30,433  INFO VerifiableProperties:68 - Property client.id is overridden to group-1
23:37:30,433  INFO VerifiableProperties:68 - Property metadata.broker.list is overridden to nnode:9092
23:37:30,433  INFO VerifiableProperties:68 - Property request.timeout.ms is overridden to 30000
23:37:30,451  INFO ClientUtils$:68 - Fetching metadata from broker id:117,host:nnode,port:9092 with correlation id 0 for 1 topic(s) Set(mytopic)
23:37:30,453  INFO SyncProducer:68 - Connected to nnode:9092 for producing
23:37:30,486  INFO SyncProducer:68 - Disconnecting from nnode:9092
23:37:30,528  INFO ConsumerFetcherThread:68 - [ConsumerFetcherThread-group-1_LuchunliPC-1447947448911-f949268d-0-117], Starting 
23:37:30,546  INFO ConsumerFetcherManager:68 - [ConsumerFetcherManager-1447947449115] Added fetcher for partitions ArrayBuffer([[mytopic,0], initOffset -1 to broker id:117,host:nnode,port:9092] )
23:37:52,466  INFO SimpleKafkaConsumer:61 - The message got by consuer is value_1
23:37:52,466  INFO SimpleKafkaConsumer:61 - The message got by consuer is value_2
23:37:52,466  INFO SimpleKafkaConsumer:61 - The message got by consuer is value_3
23:37:52,466  INFO SimpleKafkaConsumer:61 - The message got by consuer is value_4
23:37:52,466  INFO SimpleKafkaConsumer:61 - The message got by consuer is value_5
23:37:52,466  INFO SimpleKafkaConsumer:61 - The message got by consuer is value_6
23:37:52,466  INFO SimpleKafkaConsumer:61 - The message got by consuer is value_7
23:37:52,469  INFO SimpleKafkaConsumer:61 - The message got by consuer is value_8
23:37:52,469  INFO SimpleKafkaConsumer:61 - The message got by consuer is value_9
23:37:52,469  INFO SimpleKafkaConsumer:61 - The message got by consuer is value_10
23:39:11,351  INFO ClientCnxn:1096 - Client session timed out, have not heard from server in 4000ms for sessionid 0x3512026596f0001, closing socket connection and attempting reconnect
23:39:11,452  INFO ZkClient:449 - zookeeper state changed (Disconnected)


本文出自 “闷葫芦的世界” 博客,请务必保留此出处http://luchunli.blog.51cto.com/2368057/1714857

Kafka-2.11学习笔记(三)JavaApi访问kafka

标签:java api   kafka   

原文地址:http://luchunli.blog.51cto.com/2368057/1714857

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!