标签:解释 step 目的 ready 推断 并发 auto code 设置
前些天在公司这边写了个豌豆荚的爬虫。用到了分区思想和自己实现的线程池。我自己认为从这个过程中学到了非常多东西。包含怎样去设计接口和方便扩展以及代码的规范化。之前用小数据量測试了发现没什么问题,后来拿了W级以上的问题,发现插入的数码条目的量级和输入量级有非常大差异,就算算上失效的URL也不应出现这种情况,于是開始排查。反重复复看各个模块的代码。相应日志信息查看。最后发现时死锁问题导致的。
死锁(英语:Deadlock),又译为死锁。计算机科学名词。
当两个以上的运算单元,两方都在等待对方停止执行,以取得系统资源,可是>没有一方提前退出时,这样的状况,就称为死锁。在多任务操作系统中,操作系统为了协调不同进程,是否能取得系统资源时。为了让系统运
作,就必需要解决问题。
这里指的是进程死锁,是个计算机技术名词。它是操作系统或软件执行的一种状态:在多任务系统下,当一个或多个进程等待系统资源,
而资源又被进程本身或其他进程占用时,就形成了死锁。
上面的解释引自维基,死锁有进程间的死锁和线程间的死锁。仅仅要是并发情况。而且两方都在占有资源的情况下等待对方的资源。就会发生死锁。在实际情况下。非常easy不注意锁。条件变量的时候而导致死锁。
这次死锁发生在什么情况下呢?在最開始写线程池的时候。我设计了线程是可重用的。主要是通过Event信号实现,通过在每一个线程核心工作代码运行完成后会将自己归还到池中,然后等待Event信号。
主线程会以循环超时堵塞的方式监视一个任务队列,当发现有任务时便会从线程池中取出一个线程。并设置它的任务和目标函数,然后去start或者resume,resume就是会设置Event信号让线程不再堵塞,这里,从池中取线程的方法_get和归还线程方法returnThread都已经加锁。_get和returnThread使用同一把相互排斥锁。由于在_get和returnThread方法里面对线程池对象以及分区对象都有状态改动而且有些操作有条件推断,因此必须加锁保证线程安全和同步。
这样就真正线程安全了吗?能够顺利依照预期运行了吗?看起来好像没有问题。而且我这里设置的分区数目是4。分区的初始容量是5,最大容量为20,故池的总大小为4*20=80,这样对于小数目的測试确实发现不了死锁问题。
考虑以下一种情况:
我们通过上面的描写叙述,发现死锁的发生是由于条件等待时没有释放锁资源。细致思考这句话。会发现事实上我们也熟知的Condition就是为了解决问题的。
python里面的threading.Condition里面会内置Lock/RLock锁。而且可以条件等待时释放锁资源,这样,将之前的淡出的相互排斥锁改成condition,而且在queue.get方法时。先推断条件是否满足(有可用线程),假设可用则直接往后运行,否则cond.wait堵塞而且释放锁。还有一方面,正在运行的thread通过returnThread时。也是通过cond.acquire来加锁,然后这样当主线程cond.wait的时候可以有机会获得锁,然后运行。当returnThread快要结束,已经归还后。cond.notify/conf.notify_all来通知在等待该条件的主线程。
这样就行顺利运行。
def get(self):
self.cond.acquire()
try:
if self._shutdown:
raise RuntimeError(‘ThreadPool already shutdown.‘)
else:
for partition in self.partitions:
logger.info("parition #%d status:" % partition.get_partition_no())
logger.info("current load: %.2f" % partition.get_load())
logger.info("used size: %d" % partition.get_used_size())
logger.info("max size: %d" % partition.get_max_size())
partition = self.get_best_partition()
logger.info("best partition: %d" % partition.get_partition_no())
if partition.get_load() >= self.config.get_partition_upper_load_factor():
self.expand_pool_for_partition(partition, self.config.get_partition_increase_step())
logger.debug("partition avail size: %d" % partition.get_avail_size())
if partition.get_avail_size() == 0:
self.cond.wait()
tid = partition.take()
thread = self.object_pool[tid]
del self.object_pool[tid]
#update access time
thread.set_atime(time.time())
partition.increase_active_thread_count()
logger.debug("active thread count after get: %d" % self.get_active_thread_count())
return thread
finally:
self.cond.release()
def _return(self,thread):
self.cond.acquire()
logger.info("return back...")
try:
if (time.time() - thread.get_atime()) > self.config.get_timeout():
logger.info("destroy thread #%d" % thread.ident)
self.factory.destroy(thread)
else:
self.object_pool[thread.tid] = thread
self.partitions[thread.get_partition()].put(thread.tid)
self.partitions[thread.get_partition()].decrease_active_thread_count()
logger.info("return thread #%d back to pool on partition #%d" % (thread.ident,thread.get_partition()))
for partition in self.partitions:
logger.info("partition #%d status:" % partition.get_partition_no())
logger.info("current load: %.2f" % partition.get_load())
logger.info("used size: %d" % partition.get_used_size())
logger.info("max size: %d" % partition.get_max_size())
logger.debug("active thread count after return: %d" % self.get_active_thread_count())
self.cond.notify()
except Exception,e:
print e
#if return error, we should kill this thread
self.factory.destroy(thread)
finally:
self.cond.release()
今天后面測试的时候发如今抓了8000条左右后出现了堵塞。仍然是在池中的线程用完的时候,大家不知道从上面的代码中发现问题没有?
原因在于上面的代码尽管有wait和notify,可是假设return的线程并非best partition中的线程,那么partition.take依旧会堵塞,这就是问题所在!
??确实非常easy看花眼添加一个get_avail_parition函数。查找pool中第一个可用的parition,然后从这个分区取
?def get_avail_partition(self):
for partition in self.partitions:
if partition.get_avail_size()>0:
return partition
if partition.get_avail_size()==0:
self.cond.wait()
** partition = self.get_avail_partition()**
标签:解释 step 目的 ready 推断 并发 auto code 设置
原文地址:http://www.cnblogs.com/llguanli/p/7111346.html