标签:code 自己的 let except 否则 ant play href conf
在上一篇kafka入门的基础之上,本篇主要介绍Kafka的生产者和消费者。
kafka Producer发布消息记录到Kakfa集群。生产者是线程安全的,在线程之间共享生产者实例。一个简单的例子,使用producer发送一个有序的key/value(键值对),放到java的main
方法里就能直接运行,
public class ProducerDemo { private static final String KAFKA_TOPIC="kafka-topic"; public static void main(String[] args) { Map<String, Object> configs = new HashMap<String, Object>(); configs.put("bootstrap.servers", "localhost:9092"); configs.put("acks", "all"); configs.put("retries", 0); configs.put("batch.size", 16384); configs.put("linger.ms", 1); configs.put("buffer.memory", 33554432); configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer=new KafkaProducer<String, String>(configs); ProducerRecord<String, String> record=null; for (int i = 0; i <10; i++) { record=new ProducerRecord<String, String>(KAFKA_TOPIC, "record-"+i); Future<RecordMetadata> future=producer.send(record); try { RecordMetadata recordMetadata=future.get(); System.out.format("PARTITION: %d OFFSET: %d\n", recordMetadata.partition(),recordMetadata.offset()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } producer.close(); } }
kafka的producer的send()方法提供多种重载:
send是异步的,一旦消息被保存在等待发送的消息缓存中,此方法就立即返回,这样可以你并行发送多条消息而不阻塞去等待每一条消息的响应。发送的结果是一个RecordMetadata,它指定了消息发送的分区,分配的offset和消息的时间戳。如果topic使用的是CreateTime,则使用用户提供的时间戳或发送的时间,如果topic使用的是LogAppendTime,时间戳是broker的本地时间。由于send调用是异步的,它将为分配消息的此消息的RecordMetadata返回一个Future。如果future调用get(),则将阻塞,直到相关请求完成并返回该消息的metadata,或抛出发送异常。如果要模拟一个简单的阻塞调用,你可以调用get()方法。
byte[] key = "key".getBytes(); byte[] value = "value".getBytes(); ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value) producer.send(record).get();
完全无阻塞的话,可以利用参数提供的回调函数处理请求完成时的回调通知。
record=new ProducerRecord<String, String>(KAFKA_TOPIC, "CallbackRecord-"+i); producer.send(record,new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { System.out.format("PARTITION: %d OFFSET: %d\n", metadata.partition(),metadata.offset()); } });
发送到同一个分区的消息回调保证按一定的顺序执行,也就是说,在下面的例子中 callback1 保证执行 callback2 之前:
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1); producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
注意:callback一般在生产者的I/O线程中执行,所以是相当的快的,否则将延迟其他的线程的消息发送。如果需要执行阻塞或耗时的回调,建议在callback主体中使用自己的Executor来并行处理。
标签:code 自己的 let except 否则 ant play href conf
原文地址:http://www.cnblogs.com/wxgblogs/p/6718940.html