标签:
使用java实现Kafka的生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 | package com.lisg.kafkatest; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.Partitioner; import kafka.producer.ProducerConfig; import kafka.serializer.StringEncoder; /** * Kafka生产者 * @author lisg * */ public class KafkaProducer { public static void main(String[] args) { Properties props = new Properties(); //根据这个配置获取metadata,不必是kafka集群上的所有broker,但最好至少有两个 props.put( "metadata.broker.list" , "vm1:9092,vm2:9092" ); //消息传递到broker时的序列化方式 props.put( "serializer.class" , StringEncoder. class .getName()); //zk集群 props.put( "zookeeper.connect" , "vm1:2181" ); //是否获取反馈 //0是不获取反馈(消息有可能传输失败) //1是获取消息传递给leader后反馈(其他副本有可能接受消息失败) //-1是所有in-sync replicas接受到消息时的反馈 props.put( "request.required.acks" , "1" ); // props.put("partitioner.class", MyPartition.class.getName()); //创建Kafka的生产者, key是消息的key的类型, value是消息的类型 Producer<Integer, String> producer = new Producer<Integer, String>( new ProducerConfig(props)); int count = 0 ; while ( true ) { String message = "message-" + ++count; //消息主题是test KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>( "test" , message); //message可以带key, 根据key来将消息分配到指定区, 如果没有key则随机分配到某个区 // KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>("test", 1, message); producer.send(keyedMessage); System.out.println( "send: " + message); try { Thread.sleep( 1000 ); } catch (InterruptedException e) { e.printStackTrace(); } } // producer.close(); } } /** * 自定义分区类 * */ class MyPartition implements Partitioner { public int partition(Object key, int numPartitions) { return key.hashCode()%numPartitions; } } |
标签:
原文地址:http://www.cnblogs.com/lishouguang/p/4560559.html