标签:
最近在学习python-twisted库,之前做异步并发编程一直都是使用c++,比如linux下的epoll机制,windows的IOCP机制,到后来经常使用的Boost::Asio库,比较搞的是c++上的异步经验反而有点阻碍我开始学习twisted库,原因如下:
不管是epoll还是IOCP,都是当读操作或者写操作可执行时(IOCP则是完成时)会得到一个通知,然后可以执行自己的下一步代码。这种情景下如果你是做较大数据的收发的话,就可以在得到通知时进行必要的处理后继续发送或者接受下一个数据块,但是twisted的库则不是这样设计的,以twisted.protocols.basic.LineReceiver为例,当异步收到数据后,该类的lineReceived函数或者rawDataReceived(取决于当前的模式)会被触发,但是它并不包含异步写函数,也就是说你无法这样操作:
1、调用异步发送函数发送数据
2、数据发送完成时触发一个回调函数
3、在该回调函数中你决定后续的操作,比如继续发送下一块数据
那么twisted的解决方案是什么?答案是producer/consumer ,producer负责源源不断的产生数据(如不断从file中读取数据),consumer负责处理数据(通过socket发送出去),当consumer在发送时缓存满了它会通知producer暂停提供数据,当缓存清空后继续通知producer产生数据(这就类似我们上面提到的,当异步发送完成时调用一个回调函数),这个的过程不需要我们在代码中显示的控制。由于文件发送应用的普遍性,twisted提供了一个文件发送类FileSender,好啦,描述性的内容就到这里,后续关于producer/consumer我会进一步介绍,下面直接上代码
from twisted.internet.protocol import Protocol,Factory from twisted.protocols.basic import LineReceiver from twisted.protocols.basic import FileSender from twisted.internet.defer import Deferred import pickle,struct import multiprocessing.pool import os,json class TransferFileProtocol(LineReceiver): port_ = 0 def __init__(self): self.handle_f_ = None #-------------------------- self.instruction_ = None self.command_ = None self.size_remain_ = 0 #self.setRawMode() def _monitor(self,data): self.size_remain_-=len(data) return data def cbTransferOver(self, lastSent): print(‘download over!‘) self.transport.loseConnection() def connectionMade(self): print(‘Got Connection from ‘,self.transport.client) print ‘work on port:%d‘ % (self.port_,) def connectionLost(self,reason): print(self.transport.client, ‘ disconnected!‘) if self.handle_f_ != None: self.handle_f_.close() if self.size_remain_ != 0: print ‘transfer file fail!‘ # if self.command_ == ‘put‘: # os. def lineReceived(self, line): self.instruction_ = json.loads(line) self.command_ = self.instruction_[‘command‘] name_file = self.instruction_[‘name_file‘] if self.command_ == ‘put‘: self.size_remain_ = self.instruction_[‘size_file‘] try: self.handle_f_ = open(‘files/%s‘ % (name_file,),‘wb‘) except: print ‘open %s fail!‘ % (name_file,) self.handle_f_ = None # self.transfer.loseConnection() else: self.setRawMode() elif ‘get‘ == self.command_: self.size_remain_ = 0 if os.path.exists(‘files/%s‘ % (name_file,)): self.size_remain_ = os.stat(‘files/%s‘ % (name_file,)).st_size instruction = dict(size_file=self.size_remain_) if self.size_remain_>0: try: self.handle_f_ = open(‘files/%s‘ % (name_file,),‘rb‘) except: self.size_remain_ = 0 self.handle_f_ = None else: self.setRawMode() self.transport.write(json.dumps(instruction)+‘\r\n‘) if self.size_remain_>0: sender = FileSender() sender.CHUNCK_SIZE = 2**16 d = sender.beginFileTransfer(self.handle_f_,self.transport,self._monitor) d.addCallback(self.cbTransferOver) def rawDataReceived(self, data): print ‘length of data:%d‘ % (len(data),) if ‘put‘ == self.command_ and self.handle_f_ != None: self.handle_f_.write(data) self.size_remain_ -= len(data) def recv_func(port): from twisted.internet import epollreactor epollreactor.install() from twisted.internet import reactor TransferFileProtocol.port_ = port factory = Factory() factory.protocol = TransferFileProtocol reactor.listenTCP(port,factory) reactor.run() if __name__ ==‘__main__‘: ports = [6200,6202,6204,6206] pool = multiprocessing.pool.Pool(len(ports)) pool.map(recv_func,ports) # recv_func(6200)
标签:
原文地址:http://www.cnblogs.com/loongxu/p/4465833.html