标签:html5 springmvc mybatis bootstrap shiro
在使用kafka high-level的consumer,使用多线程消费数据时报错,简单分析一下原因下载 ,ConsumerIterator取不到消息时会阻塞,并且将内部状态置为FAILED,当其他线程访问时就会抛出异常。
def hasNext(): Boolean = {
if(state == FAILED) //处于FAILED状态时,另外线程访问会直接异常
throw new IllegalStateException("Iterator is in failed state")
state match {
case DONE => false
case READY => true
case _ => maybeComputeNext()
}
}
def maybeComputeNext(): Boolean = {
state = FAILED //重置了状态
nextItem = Some(makeNext())
if(state == DONE) {
false
} else {
state = READY
true
}
}
protected def makeNext(): MessageAndMetadata[K, V] = {
var currentDataChunk: FetchedDataChunk = null
// if we don‘t have an iterator, get one
var localCurrent = current.get()
if(localCurrent == null || !localCurrent.hasNext) {
if (consumerTimeoutMs < 0)
currentDataChunk = channel.take //channel是BlockingQueue这里会阻塞
else {
currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
if (currentDataChunk == null) {
// reset state to make the iterator re-iterable
resetState()
throw new ConsumerTimeoutException
}
}
//省略部分代码
}
kafka high-level consumer 多线程访问异常
标签:html5 springmvc mybatis bootstrap shiro
原文地址:http://12262093.blog.51cto.com/12252093/1869961