标签:
最近想测试一下kafka的性能,折腾了大半天才把kafka安装到window上。下文提供安装的整个过程,绝对是可用的完整的,同时提供完整的kafka java客户端代码,用于与kafka沟通。在这里必须吐槽一下,网上大部分关于如何把kafka安装到window上的文章,要么不完整,要么kafka客户端代码是错误的或者不是基于0.8版本的。但是必须提醒一下,这篇文章只是介绍了其中的一种安装方法,可能不是最简洁的。
package com.kafka;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class Sender {
public static void main(String[] args) {
Properties prop = new Properties();
prop.put("metadata.broker.list", "127.0.0.1:9092");
prop.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig producerConfig = new ProducerConfig(prop);
Producer<String, String> producer = new<String, String> Producer(producerConfig);
String topic = "hellotest";
KeyedMessage<String, String> message = new<String, String> KeyedMessage(topic, "Sam Hello Test message2");
producer.send(message);
producer.close();
}
}
package com.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class ConsumerDemo {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public ConsumerDemo(String zookeeper, String groupid, String aTopic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerProps(zookeeper, groupid));
this.topic = aTopic;
}
public void run(int threads) {
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, new Integer(threads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(threads);
int numThread = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerDemoRun(stream, numThread));
numThread++;
}
}
private static ConsumerConfig ConsumerProps(String zookeeper, String groupid) {
Properties properties = new Properties(); // config properties file
properties.put("zookeeper.connect", zookeeper);
properties.put("group.id", groupid);
properties.put("zookeeper.session.timeout.ms", "400");
properties.put("zookeeper.sync.time.ms", "200");
properties.put("auto.commit.interval.ms", "1000");
properties.put("auto.offset.reset", "smallest");
return new ConsumerConfig(properties);
}
public void shutdown() {
if (consumer != null)
consumer.shutdown();
if (executor != null)
executor.shutdown();
try {
if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
}
} catch (InterruptedException e) {
System.out.println("Interrupted during shutdown, exiting uncleanly");
}
}
public static void main(String[] args) {
String zookeeper = "localhost:2181";
String groupid = "group1";
String topic = "hellotest";
int threads = 1;
ConsumerDemo test = new ConsumerDemo(zookeeper, groupid, topic);
test.run(threads);
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {
}
test.shutdown();
}
}
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> <version>0.8.2.1</version> </dependency>
标签:
原文地址:http://blog.csdn.net/rrz634171/article/details/51443732