Tags: string target kafka byte cli clu log lan client
Custom partitioner:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;

/**
 * @author Teacher King
 */
public class SelfPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Look up the partitions of the target topic
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        // Number of partitions
        int num = partitionInfos.size();
        // Partition ID = hash of the value modulo the partition count.
        // This assumes the record value is a String. Math.floorMod keeps the
        // result in [0, num) even when hashCode() is negative, whereas a plain
        // % could return a negative (invalid) partition ID.
        return Math.floorMod(((String) value).hashCode(), num);
    }

    @Override
    public void close() {
        // do nothing
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // do nothing
    }
}
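One detail in the modulo step is easy to miss: in Java, `%` takes the sign of the dividend, so a negative `hashCode()` gives a negative remainder, which is not a valid partition ID. The following is a minimal sketch of the arithmetic only, with no Kafka dependency. The class name `PartitionIdSketch`, the helper `partitionFor`, the sample values, and the partition count of 3 are all hypothetical and chosen for illustration.

```java
final class PartitionIdSketch {

    // Hypothetical helper: maps a value to an index in [0, numPartitions).
    // Math.floorMod always returns a non-negative result for a positive divisor.
    static int partitionFor(String value, int numPartitions) {
        return Math.floorMod(value.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count, for illustration only
        String[] samples = {"order-1", "order-2", "polygenelubricants"};
        for (String v : samples) {
            int hash = v.hashCode();
            // Print the plain remainder next to the floorMod result for comparison
            System.out.println(v + ": hash % n = " + (hash % numPartitions)
                    + ", floorMod = " + partitionFor(v, numPartitions));
        }
    }
}
```

Running `main` prints both results side by side, so any value whose hash is negative shows the difference directly.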
Register it in the Kafka producer configuration:
ProducerConfig.PARTITIONER_CLASS_CONFIG, "cn.enjoyedu.selfpartition.SelfPartitioner"
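As a rough sketch of where that key/value pair goes, the producer properties might be assembled as below. To keep the example free of the Kafka dependency, it uses the literal key `"partitioner.class"`, which is the string value of `ProducerConfig.PARTITIONER_CLASS_CONFIG`. The class name `ProducerPropsSketch`, the helper `producerProps`, the broker address `localhost:9092`, and the choice of `StringSerializer` are assumptions for illustration and do not come from the original post.

```java
import java.util.Properties;

final class ProducerPropsSketch {

    // Hypothetical helper: builds producer properties that reference the custom partitioner.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Assumed serializers; SelfPartitioner casts the value to String,
        // so String record values are expected.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Same key as ProducerConfig.PARTITIONER_CLASS_CONFIG
        props.put("partitioner.class", "cn.enjoyedu.selfpartition.SelfPartitioner");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is an assumed broker address
        Properties props = producerProps("localhost:9092");
        System.out.println(props.getProperty("partitioner.class"));
    }
}
```

With the `kafka-clients` library on the classpath, these properties would be passed to `new KafkaProducer<>(props)`; the partitioner class must be on the producer's classpath under exactly that fully qualified name.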
For more configuration options, see "Kafka 2.5.0 Producer and Consumer Configuration Explained".
end.
Original article: https://www.cnblogs.com/zhuwenjoyce/p/13191646.html