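When JStorm uses Kafka as a spout, multithreading errors can occur under high concurrency.
The two classes below rely on collections that are not thread-safe (TreeMap and PriorityQueue) and need modest changes to avoid the problem: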
storm.kafka.PartitionManager
storm.kafka.ExponentialBackoffMsgRetryManager
1. Changes to storm.kafka.PartitionManager
// Change the field
private SortedMap<Long, Long> _pending = new TreeMap();
// to:
private SortedMap<Long, Long> _pending = Collections.synchronizedSortedMap(new TreeMap<Long, Long>());
/**----------------------------------------------------------------------------------------------------**/
// Change the method
public long lastCompletedOffset() {
    return this._pending.isEmpty()?this._emittedToOffset.longValue():((Long)this._pending.firstKey()).longValue();
}
// to (the isEmpty() check and the firstKey() read now happen under one lock):
public long lastCompletedOffset() {
    synchronized (_pending) {
        if (_pending.isEmpty()) {
            return _emittedToOffset;
        } else {
            return _pending.firstKey();
        }
    }
}
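Wrapping the map alone does not make lastCompletedOffset() safe: each call on the wrapper is synchronized individually, so another thread could empty the map between isEmpty() and firstKey(). The sketch below illustrates that check-then-read pattern held under one lock. It is a simplified illustration, not the real PartitionManager; the class PendingOffsetsSketch and its emit and ack methods are hypothetical names introduced only for this example.

```java
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical stand-in for the offset bookkeeping; not the real PartitionManager.
class PendingOffsetsSketch {
    // Each single call on the wrapper locks on the wrapper object itself.
    private final SortedMap<Long, Long> pending =
            Collections.synchronizedSortedMap(new TreeMap<Long, Long>());
    // Only read or written while holding the lock on 'pending'.
    private long emittedToOffset;

    PendingOffsetsSketch(long startOffset) {
        this.emittedToOffset = startOffset;
    }

    // Records an emitted offset and advances the high-water mark.
    void emit(long offset) {
        synchronized (pending) {
            pending.put(offset, System.currentTimeMillis());
            emittedToOffset = Math.max(emittedToOffset, offset + 1);
        }
    }

    // Marks an offset as completed.
    void ack(long offset) {
        pending.remove(offset);
    }

    // Compound check-then-read: both steps must run under the same lock.
    long lastCompletedOffset() {
        synchronized (pending) {
            if (pending.isEmpty()) {
                return emittedToOffset;
            }
            return pending.firstKey();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final PendingOffsetsSketch offsets = new PendingOffsetsSketch(0L);
        Thread writer = new Thread(() -> {
            for (long i = 0; i < 1000; i++) {
                offsets.emit(i);
                offsets.ack(i);
            }
        });
        Thread reader = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                // Without the synchronized block, firstKey() could throw
                // NoSuchElementException if the map is emptied after isEmpty().
                offsets.lastCompletedOffset();
            }
        });
        writer.start();
        reader.start();
        writer.join();
        reader.join();
        System.out.println(offsets.lastCompletedOffset());
    }
}
```

The same rule applies to any other compound operation on the wrapped map, including iteration over it or over its headMap/tailMap views: the Javadoc for Collections.synchronizedSortedMap requires the caller to synchronize on the returned map manually in those cases.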
2. Changes to storm.kafka.ExponentialBackoffMsgRetryManager
// Change the fields
private Queue<ExponentialBackoffMsgRetryManager.MessageRetryRecord> waiting = new PriorityQueue(11, new ExponentialBackoffMsgRetryManager.RetryTimeComparator());
private Map<Long, ExponentialBackoffMsgRetryManager.MessageRetryRecord> records = new ConcurrentHashMap();
// to:
private Queue<MessageRetryRecord> waiting = new PriorityBlockingQueue<MessageRetryRecord>(11, new RetryTimeComparator());
private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>();
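java.util.PriorityQueue is documented as unsynchronized, and its Javadoc points to PriorityBlockingQueue for concurrent use. The following sketch shows the replacement queue accepting concurrent inserts while still handing back the record with the earliest retry time first. It is a simplified illustration under stated assumptions: RetryQueueSketch, RetryRecord, schedule and pollEarliest are hypothetical names, and the record fields are a guess at the minimum needed, not the real MessageRetryRecord.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical, simplified stand-in; not the real ExponentialBackoffMsgRetryManager.
class RetryQueueSketch {
    // Simplified record: a message offset and the time at which it may be retried.
    static final class RetryRecord {
        final long offset;
        final long retryTimeMs;

        RetryRecord(long offset, long retryTimeMs) {
            this.offset = offset;
            this.retryTimeMs = retryTimeMs;
        }
    }

    // Orders records so the earliest retry time sits at the head of the queue.
    static final class RetryTimeComparator implements Comparator<RetryRecord> {
        @Override
        public int compare(RetryRecord a, RetryRecord b) {
            return Long.compare(a.retryTimeMs, b.retryTimeMs);
        }
    }

    private final Queue<RetryRecord> waiting =
            new PriorityBlockingQueue<RetryRecord>(11, new RetryTimeComparator());
    private final Map<Long, RetryRecord> records =
            new ConcurrentHashMap<Long, RetryRecord>();

    // Registers a failed offset for a later retry.
    void schedule(long offset, long retryTimeMs) {
        RetryRecord record = new RetryRecord(offset, retryTimeMs);
        records.put(offset, record);
        waiting.add(record);
    }

    // Removes and returns the offset with the earliest retry time, or null if empty.
    Long pollEarliest() {
        RetryRecord head = waiting.poll();
        if (head == null) {
            return null;
        }
        records.remove(head.offset);
        return head.offset;
    }

    int size() {
        return waiting.size();
    }

    public static void main(String[] args) throws InterruptedException {
        final RetryQueueSketch queue = new RetryQueueSketch();
        // Two threads insert concurrently; later offsets get earlier retry times.
        Thread first = new Thread(() -> {
            for (long i = 0; i < 500; i++) {
                queue.schedule(i, 1000 - i);
            }
        });
        Thread second = new Thread(() -> {
            for (long i = 500; i < 1000; i++) {
                queue.schedule(i, 1000 - i);
            }
        });
        first.start();
        second.start();
        first.join();
        second.join();
        System.out.println(queue.size() + " records, earliest offset " + queue.pollEarliest());
    }
}
```

Each call on the two collections is thread-safe on its own, but a sequence that touches both waiting and records is still not atomic as a whole; whether that matters depends on how the surrounding spout code calls the retry manager.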
Original article: http://www.cnblogs.com/Stubborn-Ant/p/7293987.html