PartitionManager is arguably the core class of storm-kafka, so let's walk through it briefly. As in the previous posts, a quick disclaimer up front: the metrics-related code is not analyzed here.
PartitionManager is mainly responsible for emitting messages and for fault tolerance, so it maintains three collections for this bookkeeping.
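For reference, the three collections show up in the PartitionManager source roughly as the fields below (a paraphrased sketch, not a verbatim copy of the class):

SortedSet<Long> _pending = new TreeSet<Long>();      // offsets emitted but not yet acked
SortedSet<Long> failed = new TreeSet<Long>();        // offsets whose tuples failed and must be read again
LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>(); // messages fetched from Kafka, waiting to be emitted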
lastCompletedOffset() derives the highest offset that is safe to commit: everything before the smallest offset that is still pending.

public long lastCompletedOffset() {
    if (_pending.isEmpty()) {
        return _emittedToOffset;
    } else {
        return _pending.first();
    }
}
The constructor is mostly about deciding which offset to start consuming from:

public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
    _partition = id;
    _connections = connections;
    _spoutConfig = spoutConfig;
    _topologyInstanceId = topologyInstanceId;
    _consumer = connections.register(id.host, id.partition);
    _state = state;
    _stormConf = stormConf;
    numberAcked = numberFailed = 0;

    String jsonTopologyId = null;
    Long jsonOffset = null;
    String path = committedPath();
    try {
        Map<Object, Object> json = _state.readJSON(path); // read the committed offset from ZK
        LOG.info("Read partition information from: " + path + " --> " + json );
        if (json != null) {
            jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
            jsonOffset = (Long) json.get("offset");
        }
    } catch (Throwable e) {
        LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
    }

    /**
     * Resolve an offset according to the user-configured startOffsetTime
     * (-2 = earliest message in Kafka, -1 = latest, 0 = none, i.e. fall back to what is stored in ZK)
     **/
    Long currentOffset = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);

    if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
        _committedTo = currentOffset;
        LOG.info("No partition information found, using configuration to determine offset");
    } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
        _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
        LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
    } else {
        _committedTo = jsonOffset;
        LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
    }

    /**
     * The if check below: if the offset just read from Kafka and the offset committed to ZK
     * differ by more than maxOffsetBehind (which now defaults to Long.MAX_VALUE), assume a huge
     * stretch of messages was emitted but never committed, and give up on that stretch to avoid
     * re-emitting it, by setting _committedTo = currentOffset. This relates to a recently fixed
     * bug: the old default of maxOffsetBehind (100000, if I remember right) was far too small.
     * I'll read through that issue later and explain this part more clearly; I'm still a bit
     * fuzzy on it.
     **/
    if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
        LOG.info("Last commit offset from zookeeper: " + _committedTo);
        _committedTo = currentOffset;
        LOG.info("Commit offset " + _committedTo + " is more than " + spoutConfig.maxOffsetBehind + " behind, resetting to startOffsetTime=" + spoutConfig.startOffsetTime);
    }

    LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
    _emittedToOffset = _committedTo;

    _fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
    _fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
    _fetchAPICallCount = new CountMetric();
    _fetchAPIMessageCount = new CountMetric();
}
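As a quick illustration of the settings this constructor consults, here is a hedged sketch of configuring a SpoutConfig; the ZK connect string, topic, zkRoot and id below are placeholders, not values from the original post:

import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

BrokerHosts hosts = new ZkHosts("zk1:2181");   // placeholder ZooKeeper connect string
SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/kafka-spout", "my-spout-id");
// -2 (EarliestTime) = start from the oldest message in Kafka, -1 (LatestTime) = start from the newest.
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
// When false, an offset already committed to ZK by the same topology takes precedence.
spoutConfig.forceFromStart = false;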
When the PartitionManager starts up, messages first have to be read and placed into _waitingToEmit; this is the fill() step:

private void fill() {
    long start = System.nanoTime();
    long offset;

    // First check whether any offsets have failed; if so, reading has to restart from the
    // smallest failed offset, which means some messages may be re-emitted.
    final boolean had_failed = !failed.isEmpty();

    // Are there failed tuples? If so, fetch those first.
    if (had_failed) {
        offset = failed.first(); // the smallest failed offset
    } else {
        offset = _emittedToOffset;
    }

    ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
    long end = System.nanoTime();
    long millis = (end - start) / 1000000;
    _fetchAPILatencyMax.update(millis);
    _fetchAPILatencyMean.update(millis);
    _fetchAPICallCount.incr();
    if (msgs != null) {
        int numMessages = 0;

        for (MessageAndOffset msg : msgs) {
            final Long cur_offset = msg.offset();
            if (cur_offset < offset) {
                // Skip any old offsets.
                continue;
            }
            /**
             * Queue the message for emission either when nothing has failed, or when this
             * particular offset is in the failed set (the fetch only restarted from the
             * smallest failed offset, so the batch may also contain offsets that never failed).
             **/
            if (!had_failed || failed.contains(cur_offset)) {
                numMessages += 1;
                _pending.add(cur_offset);
                _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
                _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
                if (had_failed) {
                    // This offset is about to be re-emitted, so remove it from the failed set.
                    failed.remove(cur_offset);
                }
            }
        }
        _fetchAPIMessageCount.incrBy(numMessages);
    }
}
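To make the selection logic concrete, here is a toy, self-contained walk-through of what fill() effectively does when there are failed offsets, using plain collections instead of Kafka (all offsets are made up):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Offsets 101 and 103 previously failed; the fetch restarts at failed.first() == 101,
// but only offsets present in the failed set are queued again.
public class FillSelectionDemo {
    public static void main(String[] args) {
        SortedSet<Long> failed = new TreeSet<Long>(Arrays.asList(101L, 103L));
        List<Long> requeued = new ArrayList<Long>();
        for (long offset = failed.first(); offset <= 104L; offset++) { // pretend the fetch returned 101..104
            if (failed.contains(offset)) {
                requeued.add(offset);
                failed.remove(offset);
            }
        }
        System.out.println(requeued); // [101, 103] -- 102 and 104 are not re-emitted a second time
    }
}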
fill() delegates the actual network call to KafkaUtils.fetchMessages:

public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) {
    ByteBufferMessageSet msgs = null;
    String topic = config.topic;
    int partitionId = partition.partition;
    for (int errors = 0; errors < 2 && msgs == null; errors++) { // at most two attempts: one retry is tolerated
        FetchRequestBuilder builder = new FetchRequestBuilder();
        FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
                clientId(config.clientId).maxWait(config.fetchMaxWait).build();
        FetchResponse fetchResponse;
        try {
            fetchResponse = consumer.fetch(fetchRequest);
        } catch (Exception e) {
            if (e instanceof ConnectException ||
                    e instanceof SocketTimeoutException ||
                    e instanceof IOException ||
                    e instanceof UnresolvedAddressException
                    ) {
                LOG.warn("Network error when fetching messages:", e);
                throw new FailedFetchException(e);
            } else {
                throw new RuntimeException(e);
            }
        }
        if (fetchResponse.hasError()) {
            // Mainly handles the OFFSET_OUT_OF_RANGE case: re-resolve the offset via getOffset
            // (earliest or latest) and retry once.
            KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
            if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange && errors == 0) {
                long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime);
                LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
                        "retrying with default start offset time from configuration. " +
                        "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]");
                offset = startOffset;
            } else {
                String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
                LOG.error(message);
                throw new FailedFetchException(message);
            }
        } else {
            msgs = fetchResponse.messageSet(topic, partitionId);
        }
    }
    return msgs;
}
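The knobs fetchMessages reads all live on the config object. Continuing the spoutConfig sketch from above, these are the fetch-related settings (the values shown are only illustrative, not recommendations):

spoutConfig.fetchSizeBytes = 1024 * 1024;   // upper bound on bytes returned per fetch request
spoutConfig.fetchMaxWait = 10000;           // max time (ms) the broker may hold the fetch request
spoutConfig.clientId = "storm";             // client id stamped on the fetch requests
// On OFFSET_OUT_OF_RANGE, retry once from the offset resolved via startOffsetTime.
spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;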
The emit side lives in next(), which KafkaSpout drives from its nextTuple loop:

// returns false if it's reached the end of current batch
public EmitState next(SpoutOutputCollector collector) {
    if (_waitingToEmit.isEmpty()) {
        fill(); // fetch messages when the queue is empty
    }
    while (true) {
        MessageAndRealOffset toEmit = _waitingToEmit.pollFirst(); // take one message at a time
        if (toEmit == null) {
            return EmitState.NO_EMITTED;
        }
        // If this looks unfamiliar, go back to the post on custom Schemes:
        // http://blog.csdn.net/wzhg0508/article/details/40874155
        Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
        if (tups != null) {
            for (List<Object> tup : tups) {
                // Mentioned when discussing custom Schemes: the message id carries the partition and offset.
                collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
            }
            // Break after each successfully emitted message; the EmitState goes back to
            // KafkaSpout.nextTuple, which uses it to decide what to do next and to periodically
            // commit the fully processed offset to ZK.
            break;
        } else {
            ack(toEmit.offset); // nothing to emit for this message; ack() just cleans up the bookkeeping
        }
    }
    if (!_waitingToEmit.isEmpty()) {
        return EmitState.EMITTED_MORE_LEFT;
    } else {
        return EmitState.EMITTED_END;
    }
}
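For reference, the three return values used above come from a small enum nested in PartitionManager; reconstructed here (with my own comments) so the states in next() are easier to read:

static enum EmitState {
    EMITTED_MORE_LEFT, // emitted a tuple and there are still messages waiting in _waitingToEmit
    EMITTED_END,       // emitted a tuple and the current batch is exhausted
    NO_EMITTED         // nothing was emitted (nothing left to read right now)
}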
When a tuple is acked, its offset is dropped from _pending; if _pending has fallen too far behind, the oldest entries are pruned wholesale:

public void ack(Long offset) {
    if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) {
        // Too many things pending!
        _pending.headSet(offset - _spoutConfig.maxOffsetBehind).clear();
    }
    _pending.remove(offset);
    numberAcked++;
}
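A tiny, self-contained demonstration of that headSet(...).clear() pruning, with made-up offsets:

import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;

public class HeadSetPruneDemo {
    public static void main(String[] args) {
        SortedSet<Long> pending = new TreeSet<Long>(Arrays.asList(1L, 2L, 50L, 120L));
        long maxOffsetBehind = 100;
        long ackedOffset = 130;
        // Everything below ackedOffset - maxOffsetBehind (= 30) is considered hopelessly old
        // and is dropped in one shot instead of waiting for individual acks.
        pending.headSet(ackedOffset - maxOffsetBehind).clear();
        System.out.println(pending); // prints [50, 120]
        pending.remove(ackedOffset); // the acked offset itself is removed (a no-op here)
    }
}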
commit() is what periodically writes the last completed offset back to ZooKeeper:

public void commit() {
    long lastCompletedOffset = lastCompletedOffset();
    if (_committedTo != lastCompletedOffset) {
        LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
        Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                        "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                .put("offset", lastCompletedOffset)
                .put("partition", _partition.partition)
                .put("broker", ImmutableMap.of("host", _partition.host.host,
                        "port", _partition.host.port))
                .put("topic", _spoutConfig.topic).build();
        _state.writeJSON(committedPath(), data);

        _committedTo = lastCompletedOffset;
        LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
    } else {
        LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
    }
}
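Put together, the ZK node written by commit() ends up holding JSON along these lines (all values below are made-up placeholders, only the key layout follows the code above):

{
  "topology": {"id": "topo-instance-id", "name": "my-topology"},
  "offset": 12345,
  "partition": 0,
  "broker": {"host": "kafka01", "port": 9092},
  "topic": "my-topic"
}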
fail() is the other half of the fault-tolerance story:

public void fail(Long offset) {
    if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
        LOG.info(
                "Skipping failed tuple at offset=" + offset +
                        " because it's more than maxOffsetBehind=" + _spoutConfig.maxOffsetBehind +
                        " behind _emittedToOffset=" + _emittedToOffset
        );
    } else {
        LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset);
        failed.add(offset);
        numberFailed++;
        if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
            throw new RuntimeException("Too many tuple failures");
        }
    }
}

The earlier storm-kafka-0.8-plus version handled fail differently (the points below are taken from the post "storm-kafka-0.8-plus source code analysis"):
First, the author does not cache messages, only offsets.
So on fail it cannot replay a message directly; the comments explain that messages are not cached for fear of blowing up memory.
Instead, when an offset fails, _emittedToOffset is simply rolled back to that failed offset.
The next fetch from Kafka then starts again at _emittedToOffset. The upside is that replay is delegated to Kafka; the downside is duplicate messages.
So when using it, you must consider whether duplicates are acceptable.
public void fail(Long offset) {
    //TODO: should it use in-memory ack set to skip anything that's been acked but not committed???
    // things might get crazy with lots of timeouts
    if (_emittedToOffset > offset) {
        _emittedToOffset = offset;
        _pending.tailSet(offset).clear();
    }
}
(Part 6) storm-kafka source code walkthrough: PartitionManager
Original post: http://blog.csdn.net/wzhg0508/article/details/40928125