标签:
最近想测试一下kafka的性能,折腾了大半天才把kafka安装到window上。下文提供安装的整个过程,绝对是可用的完整的,同时提供完整的kafka java客户端代码,用于与kafka沟通。在这里必须吐槽一下,网上大部分关于如何把kafka安装到window上的文章,要么不完整,要么kafka客户端代码是错误的或者不是基于0.8版本的。但是必须提醒一下,这篇文章只是介绍了其中的一种安装方法,可能不是最简洁的。
package com.kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class Sender { public static void main(String[] args) { Properties prop = new Properties(); prop.put("metadata.broker.list", "127.0.0.1:9092"); prop.put("serializer.class", "kafka.serializer.StringEncoder"); ProducerConfig producerConfig = new ProducerConfig(prop); Producer<String, String> producer = new<String, String> Producer(producerConfig); String topic = "hellotest"; KeyedMessage<String, String> message = new<String, String> KeyedMessage(topic, "Sam Hello Test message2"); producer.send(message); producer.close(); } }
package com.kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class ConsumerDemo { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerDemo(String zookeeper, String groupid, String aTopic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerProps(zookeeper, groupid)); this.topic = aTopic; } public void run(int threads) { Map<String, Integer> topicMap = new HashMap<String, Integer>(); topicMap.put(topic, new Integer(threads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); executor = Executors.newFixedThreadPool(threads); int numThread = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerDemoRun(stream, numThread)); numThread++; } } private static ConsumerConfig ConsumerProps(String zookeeper, String groupid) { Properties properties = new Properties(); // config properties file properties.put("zookeeper.connect", zookeeper); properties.put("group.id", groupid); properties.put("zookeeper.session.timeout.ms", "400"); properties.put("zookeeper.sync.time.ms", "200"); properties.put("auto.commit.interval.ms", "1000"); properties.put("auto.offset.reset", "smallest"); return new ConsumerConfig(properties); } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } } catch (InterruptedException e) { System.out.println("Interrupted during shutdown, exiting uncleanly"); } } public static void main(String[] args) { String zookeeper = "localhost:2181"; String groupid = "group1"; String topic = "hellotest"; int threads = 1; ConsumerDemo test = new ConsumerDemo(zookeeper, groupid, topic); test.run(threads); try { Thread.sleep(10000); } catch (InterruptedException ie) { } test.shutdown(); } }
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> <version>0.8.2.1</version> </dependency>
标签:
原文地址:http://blog.csdn.net/rrz634171/article/details/51443732