标签:
定义一个producer
package cn.vko.common.kafka;

import java.util.Properties;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.vko.common.utils.mybatis.GenCreateInterceptor;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Thin wrapper around the Kafka 0.8 {@code kafka.javaapi.producer.Producer}
 * that sends String messages (without a key) to a topic.
 *
 * <p>One underlying Kafka producer is created per instance in the constructor;
 * call {@link #close()} when the instance is no longer needed so that the
 * producer's resources are released.
 */
public class VkoProducer {

    /** Broker list this producer was configured with (value of {@code metadata.broker.list}). */
    private final String metadataBrokerList;

    /** Underlying Kafka producer, created once in the constructor. */
    private final Producer<String, String> producer;

    /**
     * Creates a producer connected to the given brokers.
     *
     * @param metadataBrokerList value for Kafka's {@code metadata.broker.list} setting
     * @throws IllegalArgumentException if {@code metadataBrokerList} is null or empty
     */
    public VkoProducer(String metadataBrokerList) {
        if (StringUtils.isEmpty(metadataBrokerList)) {
            // IllegalArgumentException is a RuntimeException, so callers that
            // caught the previously thrown RuntimeException keep working.
            throw new IllegalArgumentException("metadataBrokerList 不可以为空");
        }
        this.metadataBrokerList = metadataBrokerList;

        // Producer configuration.
        Properties props = new Properties();
        props.put("metadata.broker.list", metadataBrokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");

        // Turns on the acknowledgement mechanism; without it sending is
        // fire-and-forget and data may be lost. Allowed values are 0, 1 and -1,
        // see http://kafka.apache.org/08/configuration.html
        props.put("request.required.acks", "1");

        // "producer.type" is intentionally not set, so Kafka's default applies.
        // NOTE(review): the Kafka 0.8 documentation lists the four settings below
        // as async-producer options - confirm whether they have any effect here.
        props.put("queue.buffering.max.ms", "5000");
        props.put("queue.buffering.max.messages", "30000");
        props.put("queue.enqueue.timeout.ms", "-1");
        props.put("batch.num.messages", "1");

        // "partitioner.class" is optional; when it is not configured the
        // default partitioner is used.

        this.producer = new Producer<String, String>(new ProducerConfig(props));
    }

    /**
     * Sends a single message to the given topic.
     *
     * @param topic topic to send to
     * @param msg   message payload
     * @return the constant string {@code "ok"} once the underlying
     *         {@code send} call has returned
     */
    public String send(String topic, String msg) {
        producer.send(new KeyedMessage<String, String>(topic, msg));
        return "ok";
    }

    /**
     * Closes the underlying Kafka producer. The instance must not be used for
     * sending afterwards.
     */
    public void close() {
        producer.close();
    }
}
定义一个receiver
package cn.vko.common.kafka;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.vko.common.utils.mybatis.GenCreateInterceptor;
import cn.vko.component.pageframework.util.StringUtil;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Consumes one Kafka topic through the Kafka 0.8 high-level consumer and hands
 * every message, decoded as a String, to a {@link VkoConsumer}.
 *
 * <p>Constructing an instance immediately connects and starts one worker
 * thread per message stream. Call {@link #shutdown()} to release the consumer
 * connector when the receiver is no longer needed.
 */
public class Receiver {

    private static final Logger log = LoggerFactory.getLogger(Receiver.class);

    /** Streams requested when the caller gives no count; the original code assumed 2 partitions per topic. */
    private static final int DEFAULT_STREAM_COUNT = 2;

    /**
     * Charset used to decode message payloads. Kafka's StringEncoder writes
     * UTF-8 unless configured otherwise, so decoding must not depend on the
     * platform default charset.
     */
    private static final Charset MESSAGE_CHARSET = Charset.forName("UTF-8");

    private final String zookeeperConnect;
    private final String groupId;
    private final String topic;
    private final VkoConsumer vkoConsumer;
    private final int streamCount;
    private final ConsumerConnector connector;

    /**
     * Creates a receiver that requests the default number of streams (2) and
     * starts consuming immediately.
     *
     * @param zookeeperConnect ZooKeeper cluster addresses, comma separated
     * @param groupId          consumer group id
     * @param topic            topic to consume
     * @param vkoConsumer      handler invoked for every message
     * @throws IllegalArgumentException if any argument is null or empty
     */
    public Receiver(String zookeeperConnect, String groupId, String topic, VkoConsumer vkoConsumer) {
        this(zookeeperConnect, groupId, topic, vkoConsumer, DEFAULT_STREAM_COUNT);
    }

    /**
     * Creates a receiver with an explicit number of streams (one worker thread
     * is started per stream) and starts consuming immediately.
     *
     * @param zookeeperConnect ZooKeeper cluster addresses, comma separated
     * @param groupId          consumer group id
     * @param topic            topic to consume
     * @param vkoConsumer      handler invoked for every message
     * @param streamCount      number of message streams to request for the topic; must be positive
     * @throws IllegalArgumentException if a String argument is null or empty,
     *                                  {@code vkoConsumer} is null, or
     *                                  {@code streamCount} is not positive
     */
    public Receiver(String zookeeperConnect, String groupId, String topic, VkoConsumer vkoConsumer,
            int streamCount) {
        if (StringUtil.isEmpty(zookeeperConnect)) {
            throw invalid("zookeeperConnect 不可以为空");
        }
        if (StringUtil.isEmpty(groupId)) {
            throw invalid("groupId 不可以为空");
        }
        if (StringUtil.isEmpty(topic)) {
            throw invalid("topic 不可以为空");
        }
        if (vkoConsumer == null) {
            throw invalid("vkoConsumer 不可以为空");
        }
        if (streamCount <= 0) {
            throw invalid("streamCount must be positive: " + streamCount);
        }
        this.zookeeperConnect = zookeeperConnect;
        this.groupId = groupId;
        this.topic = topic;
        this.vkoConsumer = vkoConsumer;
        this.streamCount = streamCount;
        log.info("kafka vkoConsumer 创建完成:groupId:{},topic:{},zookeeperConnect:{}", groupId, topic, zookeeperConnect);

        this.connector = Consumer.createJavaConsumerConnector(buildConfig());
        try {
            receive();
        } catch (RuntimeException e) {
            // Do not leak the connector when stream setup fails.
            connector.shutdown();
            throw e;
        }
    }

    /**
     * Shuts down the underlying consumer connector.
     * NOTE(review): per the Kafka consumer documentation the stream iterators
     * then stop returning messages, which lets the worker threads finish -
     * confirm against the Kafka version in use.
     */
    public void shutdown() {
        connector.shutdown();
    }

    /** Logs the validation message and returns the exception for the caller to throw. */
    private static IllegalArgumentException invalid(String message) {
        log.error(message);
        // IllegalArgumentException is a RuntimeException, so callers that
        // caught the previously thrown RuntimeException keep working.
        return new IllegalArgumentException(message);
    }

    /** Builds the high-level consumer configuration from this receiver's settings. */
    private ConsumerConfig buildConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeperConnect);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "14000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    /** Creates the message streams for the topic and starts one worker thread per stream. */
    private void receive() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, streamCount);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = connector.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        int index = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            Thread worker = new Thread(new Runnable() {
                public void run() {
                    consume(stream);
                }
            }, "kafka-receiver-" + topic + "-" + index);
            index++;
            worker.start();
            log.info("kafka vkoConsumer 启动完成:groupId:{},topic:{},zookeeperConnect:{}", groupId, topic, zookeeperConnect);
        }
        log.info("kafka vkoConsumer 准备接收消息:groupId:{},topic:{},zookeeperConnect:{}", groupId, topic, zookeeperConnect);
    }

    /**
     * Reads messages from one stream until its iterator reports no more
     * messages, passing each one to the handler. A handler exception is logged
     * and does not stop the loop.
     */
    private void consume(KafkaStream<byte[], byte[]> stream) {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String msg = new String(it.next().message(), MESSAGE_CHARSET);
            try {
                vkoConsumer.dealMsg(msg);
            } catch (Exception e) {
                log.error("kafka vkoConsumer topic:{} 收到消息:{} 消费异常 xxxxxxxxxxxxxxxxxx", topic, msg, e);
            }
            log.info("kafka vkoConsumer topic:{} 收到消息:{}", topic, msg);
        }
    }
}
package cn.vko.common.kafka;

/**
 * Callback for handling messages; an implementation is passed to a
 * {@code Receiver}, which calls it once for each message it receives.
 */
public interface VkoConsumer {

    /**
     * Handles a single message.
     *
     * @param strings the message content as a String
     */
    void dealMsg(String strings);
}
在需要consumer的程序中定义一个消费实现类(实现VkoConsumer接口),并注入到receiver中。这样spring容器启动时会自动创建一个receiver,对特定的topic进行消费
<!-- Message handler -->
<bean id="sellStatConsumer" class="cn.vko.sell.kafka.consumer.SellStatConsumer" ></bean>
<!-- Receiver; its four constructor arguments are passed by index -->
<bean id="sellStatReceiver" class="cn.vko.common.kafka.Receiver">
    <constructor-arg index="0" value="${zookeeper.connect}" /><!-- ZooKeeper cluster addresses, e.g. zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181 -->
    <constructor-arg index="1" value="${sellstat.group.id}" /><!-- consumer group id string, e.g. vko_group_article_read_count -->
    <constructor-arg index="2" value="${kafka.sellstat.topics}"/><!-- topic to consume, e.g. vko_group -->
    <constructor-arg index="3" ref="sellStatConsumer" /> <!-- the message handler defined above -->
</bean>
标签:
原文地址:http://www.cnblogs.com/lilixin/p/5725593.html