标签:
public class MySpout extends BaseRichSpout { private static final long serialVersionUID = 5028304756439810609L; // key:messageId,Data private HashMap<String, String> waitAck = new HashMap<String, String>(); private SpoutOutputCollector collector; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } public void nextTuple() { String sentence = "the cow jumped over the moon"; String messageId = UUID.randomUUID().toString().replaceAll("-", ""); waitAck.put(messageId, sentence); //指定messageId,开启ackfail机制 collector.emit(new Values(sentence), messageId); } @Override public void ack(Object msgId) { System.out.println("消息处理成功:" + msgId); System.out.println("删除缓存中的数据..."); waitAck.remove(msgId); } @Override public void fail(Object msgId) { System.out.println("消息处理失败:" + msgId); System.out.println("重新发送失败的信息..."); //重发如果不开启ackfail机制,那么spout的map对象中的该数据不会被删除的,而且下游 collector.emit(new Values(waitAck.get(msgId)),msgId); } }
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
public void ack(Object msgId) { KafkaMessageId id = (KafkaMessageId) msgId; PartitionManager m = _coordinator.getManager(id.partition); if (m != null) { m.ack(id.offset); } } m.ack(id.offset); public void ack(Long offset) { _pending.remove(offset);//处理成功移除offset numberAcked++; }
public void fail(Object msgId) { KafkaMessageId id = (KafkaMessageId) msgId; PartitionManager m = _coordinator.getManager(id.partition); if (m != null) { m.fail(id.offset); } } m.fail(id.offset); public void fail(Long offset) { failed.add(offset);//处理失败添加offset numberFailed++; } SortedSet<Long> _pending = new TreeSet<Long>(); SortedSet<Long> failed = new TreeSet<Long>();
关于kafkaspot的源码解析大家可以看这边博客:http://www.cnblogs.com/cruze/p/4241181.html
源码解析中涉及了很多kafka的概念,所以仅仅理解kafka的概念想完全理解kafkaspot源码是很难的,如果不理解kafka概念,那么就只需要在理解storm的ack机制上明白kafkaspot做了上面的两件事就可以了。
标签:
原文地址:http://www.cnblogs.com/intsmaze/p/5947078.html