标签:prope cond try bytes ide rop unp serial apr
kafka_2.12-2.0.0.jar、kafka-clients-2.0.0.jar、log4j-1.2.17.jar、slf4j-api-1.7.25.jar、slf4j-log4j12-1.7.25.jar
package kafka_proj; public interface IkafkaConstants { public static String KAFKA_BROKERS = "192.168.65.130:9092"; // public static String KAFKA_BROKERS = "192.168.65.130:9092, 192.168.65.131:9092, 192.168.65.132:9092"; public static Integer MESSAGE_COUNT = 1000; public static String CLIENT_ID = "0"; public static String TOPIC_NAME = "java"; public static String GROUP_ID_CONFIG = "Group1"; public static Integer MAX_NO_MESSAGE_FOUND_COUNT = 100; public static String OFFSET_RESET_LATEST = "latest"; public static String OFFSET_RESET_EARLIER = "earliest"; public static Integer MAX_POLL_RECORDS = 1; }
package kafka_proj; import java.util.Collections; import java.util.Properties; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import kafka_proj.IkafkaConstants; public class ConsumerCreator { public static Consumer<Long, String> createConsumer() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, IkafkaConstants.KAFKA_BROKERS); props.put(ConsumerConfig.GROUP_ID_CONFIG, IkafkaConstants.GROUP_ID_CONFIG); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, IkafkaConstants.MAX_POLL_RECORDS); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, IkafkaConstants.OFFSET_RESET_EARLIER); Consumer<Long, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList(IkafkaConstants.TOPIC_NAME)); return consumer; } }
package kafka_proj; import java.util.Map; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; public class CustomPartitioner implements Partitioner{ private static final int PARTITION_COUNT=6; @Override public void configure(Map<String, ?> configs) { } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { Integer keyInt=Integer.parseInt(key.toString()); return keyInt % PARTITION_COUNT; } @Override public void close() { } }
package kafka_proj; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; import kafka_proj.IkafkaConstants; public class ProducerCreator { public static Producer<Long, String> createProducer(){ Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IkafkaConstants.KAFKA_BROKERS); props.put(ProducerConfig.CLIENT_ID_CONFIG, IkafkaConstants.CLIENT_ID); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName()); return new KafkaProducer<>(props); } }
package kafka_proj; import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import kafka_proj.IkafkaConstants; import kafka_proj.ConsumerCreator; import kafka_proj.ProducerCreator; public class App { public static void main(String[] args) { //runProducer(); runConsumer(); } static void runConsumer() { Consumer<Long, String> consumer = ConsumerCreator.createConsumer(); int noMessageFound = 0; while (true) { System.out.println("true"); ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000); // 1000 is the time in milliseconds consumer will wait if no record is found at broker. if (consumerRecords.count() == 0) { noMessageFound++; if (noMessageFound > IkafkaConstants.MAX_NO_MESSAGE_FOUND_COUNT) // If no message found count is reached to threshold exit loop. break; else continue; } //print each record. consumerRecords.forEach(record -> { System.out.println("Record Key " + record.key()); System.out.println("Record value " + record.value()); System.out.println("Record partition " + record.partition()); System.out.println("Record offset " + record.offset()); }); // commits the offset of record to broker. consumer.commitAsync(); } consumer.close(); } static void runProducer() { Producer<Long, String> producer = ProducerCreator.createProducer(); for (int index = 0; index < IkafkaConstants.MESSAGE_COUNT; index++) { ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(IkafkaConstants.TOPIC_NAME, "This is record " + index); try { RecordMetadata metadata = producer.send(record).get(); System.out.println("Record sent with key " + index + " to partition " + metadata.partition() + " with offset " + metadata.offset()); } catch (ExecutionException e) { System.out.println("Error in sending record"); System.out.println(e); } catch (InterruptedException e) { System.out.println("Error in sending record"); System.out.println(e); } } } }
标签:prope cond try bytes ide rop unp serial apr
原文地址:https://www.cnblogs.com/ryu-manager/p/9436708.html