Producer.java
package com.favccxx.isoft.favkafka; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Hello Producer * * @author favccxx * */ public class Producers { private static Logger logger = LoggerFactory.getLogger(Producers.class); public static void main(String[] args) { try { // Get the Producer Properties props = new Properties(); props.put("bootstrap.servers", "10.0.10.5:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<String, String>(props); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("fav-topic", "boy-" + i, "gril-" + i)); producer.close(); } catch (Exception e) { e.printStackTrace(); } } }
输出内容
[TRACE~16:30][main] Sending record ProducerRecord(topic=fav-topic, partition=null, key=boy-1, value=gril-1 with callback null to topic fav-topic partition 0 [org.apache.kafka.clients.producer.KafkaProducer][2016-03-31] [TRACE~16:30][main] Sending record ProducerRecord(topic=fav-topic, partition=null, key=boy-2, value=gril-2 with callback null to topic fav-topic partition 0 [org.apache.kafka.clients.producer.KafkaProducer][2016-03-31] [TRACE~16:30][main] Sending record ProducerRecord(topic=fav-topic, partition=null, key=boy-3, value=gril-3 with callback null to topic fav-topic partition 0 [org.apache.kafka.clients.producer.KafkaProducer][2016-03-31] ... |
Consumer.java
package com.favccxx.isoft.favkafka; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Consumer { static Logger logger = LoggerFactory.getLogger(Consumer.class); private static KafkaConsumer<String, String> consumer; public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "10.0.10.5:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList("fav-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) logger.debug("@@@@@@@@@@@@@@@@@@" + record.key() + ":" + record.value()); } } }
输出内容
[DEBUG~16:30][main] @@@@@@@@@@@@@@@@@@boy-0:gril-0 [com.favccxx.isoft.favkafka.Consumer][2016-03-31] [DEBUG~16:30][main] @@@@@@@@@@@@@@@@@@boy-1:gril-1 [com.favccxx.isoft.favkafka.Consumer][2016-03-31] [DEBUG~16:30][main] @@@@@@@@@@@@@@@@@@boy-2:gril-2 [com.favccxx.isoft.favkafka.Consumer][2016-03-31] ... |
备注:如果在练习过程中出现java.net.ConnectException: Connection timed out: no further information这样的信息,一个是服务器地址不正确,另外可能就是启动Kafka服务器时没有指定host.name和advertised.host.name,具体操作在上篇文章中有详细描述。
本文出自 “这个人的IT世界” 博客,请务必保留此出处http://favccxx.blog.51cto.com/2890523/1763531
原文地址:http://favccxx.blog.51cto.com/2890523/1763531