标签:
生产者
1 package com; 2 import java.util.Properties; 3 import java.util.concurrent.TimeUnit; 4 5 import kafka.javaapi.producer.Producer; 6 import kafka.producer.KeyedMessage; 7 import kafka.producer.ProducerConfig; 8 import kafka.serializer.StringEncoder; 9 10 11 12 13 public class kafkaProducer extends Thread{ 14 15 private String topic; 16 17 public kafkaProducer(String topic){ 18 super(); 19 this.topic = topic; 20 } 21 22 23 @Override 24 public void run() { 25 Producer producer = createProducer(); 26 int i=0; 27 while(true){ 28 producer.send(new KeyedMessage<Integer, String>(topic, "message: " + i++)); 29 System.out.println("message: " + i); 30 try { 31 TimeUnit.SECONDS.sleep(1); 32 } catch (InterruptedException e) { 33 e.printStackTrace(); 34 } 35 } 36 } 37 38 private Producer createProducer() { 39 Properties properties = new Properties(); 40 properties.put("zookeeper.connect", "192.168.130.130:2181");//声明zk 41 properties.put("serializer.class", StringEncoder.class.getName()); 42 properties.put("metadata.broker.list", "192.168.130.130:9092");// 声明kafka broker 43 return new Producer<Integer, String>(new ProducerConfig(properties)); 44 } 45 46 47 public static void main(String[] args) { 48 new kafkaProducer("test").start();// 使用kafka集群中创建好的主题 test 49 // System.out.println("111"); 50 51 } 52 53 } 54
消费者
1 package com; 2 import java.util.HashMap; 3 import java.util.List; 4 import java.util.Map; 5 import java.util.Properties; 6 7 import kafka.consumer.Consumer; 8 import kafka.consumer.ConsumerConfig; 9 import kafka.consumer.ConsumerIterator; 10 import kafka.consumer.KafkaStream; 11 import kafka.javaapi.consumer.ConsumerConnector; 12 13 14 15 16 /** 17 * 接收数据 18 * 接收到: message: 10 19 接收到: message: 11 20 接收到: message: 12 21 接收到: message: 13 22 接收到: message: 14 23 * @author zm 24 * 25 */ 26 public class kafkaConsumer extends Thread{ 27 28 private String topic; 29 30 public kafkaConsumer(String topic){ 31 super(); 32 this.topic = topic; 33 } 34 35 36 @Override 37 public void run() { 38 ConsumerConnector consumer = createConsumer(); 39 Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 40 topicCountMap.put(topic, 1); // 一次从主题中获取一个数据 41 Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap); 42 KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据 43 ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); 44 while(iterator.hasNext()){ 45 String message = new String(iterator.next().message()); 46 System.out.println("接收到: " + message); 47 } 48 } 49 50 private ConsumerConnector createConsumer() { 51 Properties properties = new Properties(); 52 //设置提交时间 53 properties.put("auto.commit.interval.ms", "1000"); 54 properties.put("zookeeper.connect", "192.168.130.130:2181");//声明zk 55 properties.put("group.id", "group2");// 必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据 56 return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); 57 } 58 59 60 public static void main(String[] args) { 61 new kafkaConsumer("test").start();// 使用kafka集群中创建好的主题 test 62 63 } 64 65 }
----解决:关闭 Linux 的防火墙
---/etc/init.d/iptables status 会得到一系列信息,说明防火墙开着
---/etc/rc.d/init.d/iptables stop 关闭防火墙
----进到kafka bin 目录下输入命令查看消费状态
sufi@bogon bin]$ ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect 192.168.130.130:2181 --group group1
注意
标签:
原文地址:http://www.cnblogs.com/sufi/p/5201748.html