现在开始介绍KafkaSpout源码了。
开始时,在open方法中做一些初始化:
........................ _state = new ZkState(stateConf); _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig)); // using TransactionalState like this is a hack int totalTasks = context.getComponentTasks(context.getThisComponentId()).size(); if (_spoutConfig.hosts instanceof StaticHosts) { _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid); } else { _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid); } ............
/**
 * Creates the reader and performs the first broker lookup.
 *
 * <p>A {@code DynamicBrokersReader} is built from {@code conf}, {@code hosts.brokerZkStr},
 * {@code hosts.brokerZkPath} and {@code topic}; its broker info is cached, the current time is
 * stored as the last refresh time, and the refresh interval is set to
 * {@code hosts.refreshFreqSecs} converted to milliseconds.
 *
 * <p>A {@code SocketTimeoutException} raised along the way is only logged at WARN level: the
 * constructor still returns normally, and every field that had not been assigned yet keeps its
 * default value.
 *
 * @param conf  configuration map handed to the {@code DynamicBrokersReader}
 * @param topic topic name handed to the {@code DynamicBrokersReader}
 * @param hosts supplies {@code brokerZkStr}, {@code brokerZkPath} and {@code refreshFreqSecs}
 */
public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) {
    try {
        // Order matters: a timeout in either of the first two statements must leave the
        // timing fields below untouched.
        reader = new DynamicBrokersReader(
                conf,
                hosts.brokerZkStr,
                hosts.brokerZkPath,
                topic);
        cachedBrokers = reader.getBrokerInfo();

        lastRefreshTimeMs = System.currentTimeMillis();
        // 1000L forces the multiplication to be done in long arithmetic.
        refreshMillis = hosts.refreshFreqSecs * 1000L;
    } catch (java.net.SocketTimeoutException timeout) {
        LOG.warn("Failed to update brokers", timeout);
    }
}
//ZkBrokerReader @Override public GlobalPartitionInformation getCurrentBrokers() { long currTime = System.currentTimeMillis(); if (currTime > lastRefreshTimeMs + refreshMillis) { // 当前时间大于和上次更新时间之差大于refreshMillis try { LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired"); cachedBrokers = reader.getBrokerInfo(); lastRefreshTimeMs = currTime; } catch (java.net.SocketTimeoutException e) { LOG.warn("Failed to update brokers", e); } } return cachedBrokers; } // 下面是调用DynamicBrokersReader 的代码 /** * Get all partitions with their current leaders */ public GlobalPartitionInformation getBrokerInfo() throws SocketTimeoutException { GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(); try { int numPartitionsForTopic = getNumPartitions(); String brokerInfoPath = brokerPath(); for (int partition = 0; partition < numPartitionsForTopic; partition++) { int leader = getLeaderFor(partition); String path = brokerInfoPath + "/" + leader; try { byte[] brokerData = _curator.getData().forPath(path); Broker hp = getBrokerHost(brokerData); globalPartitionInformation.addPartition(partition, hp); } catch (org.apache.zookeeper.KeeperException.NoNodeException e) { LOG.error("Node {} does not exist ", path); } } } catch (SocketTimeoutException e) { throw e; } catch (Exception e) { throw new RuntimeException(e); } LOG.info("Read partition info from zookeeper: " + globalPartitionInformation); return globalPartitionInformation; }
//storm.kafka.DynamicPartitionConnections static class ConnectionInfo { SimpleConsumer consumer; Set<Integer> partitions = new HashSet(); public ConnectionInfo(SimpleConsumer consumer) { this.consumer = consumer; } }
再看ZkCoordinator类,看其构造函数
//storm.kafka.ZkCoordinator public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) { _spoutConfig = spoutConfig; _connections = connections; _taskIndex = taskIndex; _totalTasks = totalTasks; _topologyInstanceId = topologyInstanceId; _stormConf = stormConf; _state = state; ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts; _refreshFreqMs = brokerConf.refreshFreqSecs * 1000; _reader = reader; }_refreshFreqMs就是定时更新zk partition到本地的操作,在kafkaSpout中nextTuple方法中每次都会去调用ZkCoordinator的getMyManagedPartitions方法。该方法根据_refreshFreqMs参数定时更新partition信息
//storm.kafka.ZkCoordinator @Override public List<PartitionManager> getMyManagedPartitions() { if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) { refresh(); _lastRefreshTime = System.currentTimeMillis(); } return _cachedList; } @Override public void refresh() { try { LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections"); GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo(); List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex); Set<Partition> curr = _managers.keySet(); Set<Partition> newPartitions = new HashSet<Partition>(mine); newPartitions.removeAll(curr); Set<Partition> deletedPartitions = new HashSet<Partition>(curr); deletedPartitions.removeAll(mine); LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString()); for (Partition id : deletedPartitions) { PartitionManager man = _managers.remove(id); man.close(); } LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString()); for (Partition id : newPartitions) { PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id); _managers.put(id, man); } } catch (Exception e) { throw new RuntimeException(e); } _cachedList = new ArrayList<PartitionManager>(_managers.values()); LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing"); }
主要做的工作就是获取并行的task数,与当前的partition做比较,得出一个Consumer要负责哪些partition的读取,具体算法请参考kafka文档。
以上在KafkaSpout中做完了初始化操作,下面开始取数据发射数据了,来看nextTuple方法
// storm.kafka.KafkaSpout @Override public void nextTuple() { List<PartitionManager> managers = _coordinator.getMyManagedPartitions(); for (int i = 0; i < managers.size(); i++) { try { // in case the number of managers decreased _currPartitionIndex = _currPartitionIndex % managers.size(); EmitState state = managers.get(_currPartitionIndex).next(_collector); if (state != EmitState.EMITTED_MORE_LEFT) { _currPartitionIndex = (_currPartitionIndex + 1) % managers.size(); } if (state != EmitState.NO_EMITTED) { break; } } catch (FailedFetchException e) { LOG.warn("Fetch failed", e); _coordinator.refresh(); } } long now = System.currentTimeMillis(); if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) { commit(); } }看完上述代码可知,所有的操作都是在PartitionManager中进行的,PartitionManager中会读取message信息,然后进行发射,主要逻辑在PartitionManager的next方法中
//returns false if it's reached the end of current batch public EmitState next(SpoutOutputCollector collector) { if (_waitingToEmit.isEmpty()) { fill(); } while (true) { MessageAndRealOffset toEmit = _waitingToEmit.pollFirst(); if (toEmit == null) { return EmitState.NO_EMITTED; } Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg); if (tups != null) { for (List<Object> tup : tups) { collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset)); } break; } else { ack(toEmit.offset); } } if (!_waitingToEmit.isEmpty()) { return EmitState.EMITTED_MORE_LEFT; } else { return EmitState.EMITTED_END; } }如果_waitingToEmit列表为空,则去读取msg,然后进行逐条发射,每发射一条,break一下,返回EMIT_MORE_LEFT给KafkaSpout的nextTuple方法中,,然后进行判断是否该paritition读取的一次读取的message buffer size是否已发射完毕,如果发射完毕就进行下一个partition 数据读取和发射,
注意的一点是,并不是一次把该partition的所有待发射的msg都发射完再commit offset到zk,而是发射一条,判断一下是否到了该commit的时候了(开始时设置的定时commit时间间隔),笔者认为这样做的原因是为了好控制fail
KafkaSpout中的ack,fail,commit操作全部交给了PartitionManager来做,看代码
/**
 * Forwards an ack to the partition manager responsible for the message.
 * Nothing happens when the coordinator has no manager for that partition.
 *
 * @param msgId the message id; cast to {@code KafkaMessageId}
 */
@Override
public void ack(Object msgId) {
    final KafkaMessageId messageId = (KafkaMessageId) msgId;
    final PartitionManager owner = _coordinator.getManager(messageId.partition);
    if (owner == null) {
        return;
    }
    owner.ack(messageId.offset);
}

/**
 * Forwards a failure to the partition manager responsible for the message.
 * Nothing happens when the coordinator has no manager for that partition.
 *
 * @param msgId the message id; cast to {@code KafkaMessageId}
 */
@Override
public void fail(Object msgId) {
    final KafkaMessageId messageId = (KafkaMessageId) msgId;
    final PartitionManager owner = _coordinator.getManager(messageId.partition);
    if (owner == null) {
        return;
    }
    owner.fail(messageId.offset);
}

/** Commits the state of all managed partitions on deactivation. */
@Override
public void deactivate() {
    commit();
}

/**
 * Declares the output fields reported by the configured scheme.
 *
 * @param declarer receives the fields from {@code _spoutConfig.scheme.getOutputFields()}
 */
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(_spoutConfig.scheme.getOutputFields());
}

/**
 * Records the current time as the last update time, then asks every partition manager
 * returned by the coordinator to commit.
 */
private void commit() {
    // The timestamp is taken before the commits start, not after they finish.
    _lastUpdateMs = System.currentTimeMillis();

    final List<PartitionManager> managed = _coordinator.getMyManagedPartitions();
    for (PartitionManager partitionManager : managed) {
        partitionManager.commit();
    }
}
原文地址:http://blog.csdn.net/wzhg0508/article/details/40903919