标签:message let metadata ams i++ rgs 分区 core creat
创建一个Kafka的主题,连接到zk集群,副本因子3,分区3,主题名是test111
[root@h5 kafka]# bin/kafka-topics.sh --create --zookeeper h5:2181 --topic test111 --replication-factor 3 --partitions 3
查看Kafka的主题详情
[root@h5 kafka]# bin/kafka-topics.sh --describe --zookeeper h5:2181 --topic test111
查看Kafka所有的主题
[root@h5 kafka]# bin/kafka-topics.sh --list --zookeeper h5:2181
删除Kafka指定的主题
[root@h5 kafka]# bin/kafka-topics.sh --delete --zookeeper h5:2181,h6:2181,h7:2181 --topic test111
如删除时提示
Topic guowang1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
请修改Kafka/config/server.properties新增delete.topic.enable=true
kafka_2.10-0.8.2.0.jar
kafka-clients-0.8.2.0.jar
metrics-core-2.2.0.jar
scala-library-2.10.4.jar
zkclient-0.3.jar
zookeeper-3.4.6.jar
1、生产者
1 package storm.test.kafka;
2
3 import java.util.Properties;
4
5 import kafka.javaapi.producer.Producer;
6 import kafka.producer.KeyedMessage;
7 import kafka.producer.ProducerConfig;
8 import kafka.serializer.StringEncoder;
9
10 public class TestProducer {
11
12 public static void main(String[] args) throws Exception {
13 Properties prop = new Properties();
14 prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
15 prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
16 prop.put("serializer.class", StringEncoder.class.getName());
17 Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(prop));
18 int i = 0;
19 while(true){
20 producer.send(new KeyedMessage<String, String>("test111", "msg:"+i++));
21 Thread.sleep(1000);
22 }
23 }
24
25 }
2、消费者
1 package storm.test.kafka;
2
3 import java.util.HashMap;
4 import java.util.List;
5 import java.util.Map;
6 import java.util.Properties;
7
8 import kafka.consumer.Consumer;
9 import kafka.consumer.ConsumerConfig;
10 import kafka.consumer.ConsumerIterator;
11 import kafka.consumer.KafkaStream;
12 import kafka.javaapi.consumer.ConsumerConnector;
13 import kafka.serializer.StringEncoder;
14
15 public class TestConsumer {
16
17 static final String topic = "test111";
18
19 public static void main(String[] args) {
20 Properties prop = new Properties();
21 prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
22 prop.put("serializer.class", StringEncoder.class.getName());
23 prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
24 prop.put("group.id", "group1");
25 ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
26 Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
27 topicCountMap.put(topic, 1);
28 Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
29 final KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);
30 ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
31 while (iterator.hasNext()) {
32 String msg = new String(iterator.next().message());
33 System.out.println("收到消息:"+msg);
34 }
35 }
36
37 }
标签:message let metadata ams i++ rgs 分区 core creat
原文地址:http://www.cnblogs.com/kms1989/p/6701067.html