// Tags: override atomic close next logs put rand comm getc
package cn.xiaojf.kafka.producer; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.utils.Utils; import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; /** * 自定义分区方式 */ public class CustomPartitioner implements Partitioner { private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap(); public CustomPartitioner() { } public void configure(Map<String, ?> configs) { } /** * 自定义分区规则 * @param topic * @param key * @param keyBytes * @param value * @param valueBytes * @param cluster * @return */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if(keyBytes == null) { int nextValue = this.nextValue(topic); List availablePartitions = cluster.availablePartitionsForTopic(topic); if(availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return ((PartitionInfo)availablePartitions.get(part)).partition(); } else { return Utils.toPositive(nextValue) % numPartitions; } } else { return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } private int nextValue(String topic) { AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic); if(null == counter) { counter = new AtomicInteger((new Random()).nextInt()); AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter); if(currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement(); } public void close() { } }
package cn.xiaojf.kafka.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Utils; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; /** * 消息生产者 * @author xiaojf 2017/3/22 14:27 */ public class MsgProducer extends Thread { private final KafkaProducer<String, String> producer; private final String topic; private final boolean isAsync; public MsgProducer(String topic, boolean isAsync) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.59.130:9092");//broker 集群地址 properties.put(ProducerConfig.CLIENT_ID_CONFIG, "MsgProducer");//自定义客户端id properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//key 序列号方式 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//value 序列号方式 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CustomPartitioner.class.getCanonicalName());//自定义分区函数 // properties.load("properties配置文件"); this.producer = new KafkaProducer<String, String>(properties); this.topic = topic; this.isAsync = isAsync; } @Override public void run() { int msgNo = 0; while (true) { String msg = "Msg: " + msgNo; String key = msgNo + ""; if (isAsync) {//异步 producer.send(new ProducerRecord<String, String>(this.topic,msg)); // producer.send(new ProducerRecord<String, String>(this.topic, key, msg)); } else {//同步 producer.send(new ProducerRecord<String, String>(this.topic, key, msg), new MsgProducerCallback(System.currentTimeMillis(), key, msg)); } } } /** * 消息发送后的回调函数 */ class MsgProducerCallback implements Callback { private final long startTime; private final String key; private final String msg; 
public MsgProducerCallback(long startTime, String key, String msg) { this.startTime = startTime; this.key = key; this.msg = msg; } public void onCompletion(RecordMetadata recordMetadata, Exception e) { long elapsedTime = System.currentTimeMillis() - startTime; if (recordMetadata != null) { System.out.println(msg + " be sended to partition no : " + recordMetadata.partition()); } } } public static void main(String args[]) { new MsgProducer("my-replicated-topic",true).start();//开始发送消息 } }
// Tags: override atomic close next logs put rand comm getc
// Original article: http://www.cnblogs.com/xiaojf/p/6602691.html