码迷,mamicode.com
首页 > 编程语言 > 详细

使用java创建kafka的生产者和消费者

时间:2015-05-24 17:20:41      阅读:3423      评论:0      收藏:0      [点我收藏+]

标签:

    创建一个Kafka的主题,连接到zk集群,副本因子3,分区3,主题名是test111
        [root@h5 kafka]# bin/kafka-topics.sh --create --zookeeper h5:2181 --topic test111 --replication-factor 3 --partitions 3
    查看Kafka的主题详情
        [root@h5 kafka]# bin/kafka-topics.sh --describe --zookeeper h5:2181 --topic test111
    查看Kafka所有的主题    
        [root@h5 kafka]# bin/kafka-topics.sh --list --zookeeper h5:2181
    添加如下依赖jar
        kafka_2.10-0.8.2.0.jar
        kafka-clients-0.8.2.0.jar
        metrics-core-2.2.0.jar
        scala-library-2.10.4.jar
        zkclient-0.3.jar
        zookeeper-3.4.6.jar
    
    1、生产者
        package storm.test.kafka;

        import java.util.Properties;

        import kafka.javaapi.producer.Producer;
        import kafka.producer.KeyedMessage;
        import kafka.producer.ProducerConfig;
        import kafka.serializer.StringEncoder;

        public class TestProducer {

            public static void main(String[] args) throws Exception {
                Properties prop = new Properties();
                prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
                prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
                prop.put("serializer.class", StringEncoder.class.getName());
                Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(prop));
                int i = 0;
                while(true){
                    producer.send(new KeyedMessage<String, String>("test111", "msg:"+i++));
                    Thread.sleep(1000);
                }
            }

        }
    2、消费者
        package storm.test.kafka;

        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
        import java.util.Properties;

        import kafka.consumer.Consumer;
        import kafka.consumer.ConsumerConfig;
        import kafka.consumer.ConsumerIterator;
        import kafka.consumer.KafkaStream;
        import kafka.javaapi.consumer.ConsumerConnector;
        import kafka.serializer.StringEncoder;

        public class TestConsumer {

            static final String topic = "test111";
            
            public static void main(String[] args) {
                Properties prop = new Properties();
                prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
                prop.put("serializer.class", StringEncoder.class.getName());
                prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
                prop.put("group.id", "group1");
                ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
                Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
                topicCountMap.put(topic, 1);
                Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
                final KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);
                ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
                while (iterator.hasNext()) {
                    String msg = new String(iterator.next().message());
                    System.out.println("收到消息:"+msg);
                }
            }

        }

使用java创建kafka的生产者和消费者

标签:

原文地址:http://www.cnblogs.com/mengyao/p/4526075.html

(2)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!