标签:
Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:
项目实例:https://github.com/windwant/kafka-test
kafka.properties
value.serializer=org.apache.kafka.common.serialization.StringSerializer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
request.required.acks=1
bootstrap.servers=localhost:9092
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=test-consumer-group
Producer:
package org.windwant.kafka;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Simple Kafka producer example: loads connection settings from
 * {@code kafka.properties} on the classpath and synchronously sends 100
 * string records to the {@code mykafka} topic, one per second.
 */
public class MyKafkaProducer {

    private final Properties props;

    public static void main(String[] args) throws ConfigurationException {
        new MyKafkaProducer().start();
    }

    /**
     * Reads producer settings from {@code kafka.properties}.
     *
     * @throws ConfigurationException if the properties file cannot be loaded
     */
    public MyKafkaProducer() throws ConfigurationException {
        props = new Properties();
        PropertiesConfiguration config = new PropertiesConfiguration("kafka.properties");
        // Re-read the file automatically when it changes on disk.
        config.setReloadingStrategy(new FileChangedReloadingStrategy());
        // Auto-save: persist programmatic changes back to the file.
        config.setAutoSave(true);
        props.put("value.serializer", config.getString("value.serializer"));
        props.put("key.serializer", config.getString("key.serializer"));
        props.put("request.required.acks", config.getString("request.required.acks"));
        props.put("bootstrap.servers", config.getString("bootstrap.servers"));
    }

    /**
     * Sends 100 records; {@code send(...).get()} blocks until the broker
     * acknowledges each record, and the resulting metadata is printed.
     */
    public void start() {
        // try-with-resources guarantees the producer is closed (flushing
        // buffered records and releasing network resources) even if a send fails.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                RecordMetadata result = producer.send(new ProducerRecord<>(
                        "mykafka",
                        "kafka key: " + Integer.toString(i),
                        "kafka value: " + Integer.toString(i))).get();
                System.out.println("producer send: " + result);
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
Consumer:
package org.windwant.kafka;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Simple Kafka consumer example: loads connection settings from
 * {@code kafka.properties} on the classpath, subscribes to the
 * {@code mykafka} topic, and prints every record it receives.
 */
public class MyKafkaConsumer {

    private final Properties props;

    public static void main(String[] args) throws ConfigurationException {
        new MyKafkaConsumer().start();
    }

    /**
     * Reads consumer settings from {@code kafka.properties}.
     *
     * @throws ConfigurationException if the properties file cannot be loaded
     */
    public MyKafkaConsumer() throws ConfigurationException {
        props = new Properties();
        PropertiesConfiguration config = new PropertiesConfiguration("kafka.properties");
        // Re-read the file automatically when it changes on disk.
        config.setReloadingStrategy(new FileChangedReloadingStrategy());
        // Auto-save: persist programmatic changes back to the file.
        config.setAutoSave(true);
        props.put("value.deserializer", config.getString("value.deserializer"));
        props.put("key.deserializer", config.getString("key.deserializer"));
        props.put("bootstrap.servers", config.getString("bootstrap.servers"));
        props.put("group.id", config.getString("group.id"));
    }

    /**
     * Polls the {@code mykafka} topic forever, printing offset, key and
     * value for each record received.
     */
    public void start() {
        // try-with-resources closes the consumer (leaving its group cleanly
        // and releasing sockets) if poll ever throws.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("mykafka"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // %n emits the platform line separator, matching println.
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
标签:
原文地址:http://www.cnblogs.com/niejunlei/p/5978279.html