码迷,mamicode.com
首页 > 编程语言 > 详细

kafka java实例

时间:2016-02-19 19:04:14      阅读:191      评论:0      收藏:0      [点我收藏+]

标签:

生产者

 1 package com;
 2 import java.util.Properties;  
 3 import java.util.concurrent.TimeUnit;  
 4   
 5 import kafka.javaapi.producer.Producer;  
 6 import kafka.producer.KeyedMessage;  
 7 import kafka.producer.ProducerConfig;  
 8 import kafka.serializer.StringEncoder;  
 9   
10   
11   
12   
13 public class kafkaProducer extends Thread{  
14   
15     private String topic;  
16       
17     public kafkaProducer(String topic){  
18         super();  
19         this.topic = topic;  
20     }  
21       
22       
23     @Override  
24     public void run() {  
25         Producer producer = createProducer();  
26         int i=0;  
27         while(true){  
28             producer.send(new KeyedMessage<Integer, String>(topic, "message: " + i++)); 
29             System.out.println("message: " + i);
30             try {  
31                 TimeUnit.SECONDS.sleep(1);  
32             } catch (InterruptedException e) {  
33                 e.printStackTrace();  
34             }  
35         }  
36     }  
37   
38     private Producer createProducer() {  
39         Properties properties = new Properties();  
40         properties.put("zookeeper.connect", "192.168.130.130:2181");//声明zk  
41         properties.put("serializer.class", StringEncoder.class.getName());  
42         properties.put("metadata.broker.list", "192.168.130.130:9092");// 声明kafka broker  
43         return new Producer<Integer, String>(new ProducerConfig(properties));  
44      }  
45       
46       
47     public static void main(String[] args) {  
48         new kafkaProducer("test").start();// 使用kafka集群中创建好的主题 test 
49        // System.out.println("111");
50           
51     }  
52        
53 }  
54   

 

消费者

 1 package com;
 2 import java.util.HashMap;  
 3 import java.util.List;  
 4 import java.util.Map;  
 5 import java.util.Properties;  
 6   
 7 import kafka.consumer.Consumer;  
 8 import kafka.consumer.ConsumerConfig;  
 9 import kafka.consumer.ConsumerIterator;  
10 import kafka.consumer.KafkaStream;  
11 import kafka.javaapi.consumer.ConsumerConnector;  
12   
13   
14   
15   
16 /** 
17  * 接收数据 
18  * 接收到: message: 10 
19 接收到: message: 11 
20 接收到: message: 12 
21 接收到: message: 13 
22 接收到: message: 14 
23  * @author zm 
24  * 
25  */  
26 public class kafkaConsumer extends Thread{  
27   
28     private String topic;  
29       
30     public kafkaConsumer(String topic){  
31         super();  
32         this.topic = topic;  
33     }  
34       
35       
36     @Override  
37     public void run() {  
38         ConsumerConnector consumer = createConsumer();  
39         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
40         topicCountMap.put(topic, 1); // 一次从主题中获取一个数据  
41          Map<String, List<KafkaStream<byte[], byte[]>>>  messageStreams = consumer.createMessageStreams(topicCountMap);  
42          KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据  
43          ConsumerIterator<byte[], byte[]> iterator =  stream.iterator();  
44          while(iterator.hasNext()){  
45              String message = new String(iterator.next().message());  
46              System.out.println("接收到: " + message);  
47          }  
48     }  
49   
50     private ConsumerConnector createConsumer() {  
51         Properties properties = new Properties();  
52         //设置提交时间
53         properties.put("auto.commit.interval.ms", "1000");
54         properties.put("zookeeper.connect", "192.168.130.130:2181");//声明zk  
55         properties.put("group.id", "group2");// 必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据  
56         return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));  
57      }  
58       
59       
60     public static void main(String[] args) {  
61         new kafkaConsumer("test").start();// 使用kafka集群中创建好的主题 test   
62           
63     }  
64        
65 }  

 

技术分享

   ----解决:关闭 Linux 的防火墙

         ---/etc/init.d/iptables status 会得到一系列信息,说明防火墙开着

         ---/etc/rc.d/init.d/iptables stop 关闭防火墙

 

   ----进到kafka  bin 目录下输入命令查看消费状态

sufi@bogon bin]$ ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect 192.168.130.130:2181 --group group1

技术分享

 

 

注意

技术分享

kafka java实例

标签:

原文地址:http://www.cnblogs.com/sufi/p/5201748.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!