标签:subclass 跳转 result ntc 定义 func depend .com accept
Twisted是用Python实现的基于事件驱动的网络引擎框架,是python中一个强大的异步IO库。理解twisted的一个前提是弄清楚twisted中几个核心的概念: reactor, Protocl, ProtocolFactory, Deffered
1 reactor
twisted.internet.reactor
https://twistedmatrix.com/documents/current/core/howto/reactor-basics.html
reactor是twisted异步框架中的核心组件,是一个基于select,poll或epoll的事件循环,其监听socket的状态,当socket状态有变化时(有新的连接请求,或接受到数据等)时,调用相应的组件来进行处理。如下图的reactor loop一样,不断的循环扫描socket列表中监听对象的状态,当有特定事件发生时(socket状态变化)调用回调函数callback,来处理事件,这时候执行权限交给回调函数,当我们的代码处理完事件后,将执行权返回给reactor,继续进行循环监听。
2,Factory和Protocol
twisted.internet.protocol.Factory
twisted.internet.protocol.Protocol
参考文章:
https://twistedmatrix.com/documents/current/core/howto/servers.html
https://twistedmatrix.com/documents/current/core/howto/clients.html
https://zhuanlan.zhihu.com/p/28763807?utm_source=wechat_session&utm_medium=social
Factory和Protocol都是用来处理一些配置和协议相关的底层业务,如socket之间连接,数据的发送格式等。Factory设置持久的,多个socket可共享的通用配置,Protocol为设置单个socket的特定配置。当有一个socket连接请求时,Factory创建一个Protocol实例,并将该实例的factory属性指向自己,请求断开时,protocol即被销毁。如下第一幅图中,基于twisted的一个异步服务器,其中endpiont为绑定的ip和端口,reactor监听其socket的状态,当有连接请求时,reactor调用Factory来设置相关配置,其随后会创建(buildProtocol)Protocol实例,Protocol实例的Transport属性会处理客户端socket的请求,并执行相应的回调函数。在第二幅图中可以看到,reactor共监听四个socket,一个是服务端listening socket(其绑定的ip和port),和三个客户端socket,而每个客户端socket都有自己的Protocol来处理相应的数据交互请求,这些Protocol都由Factory创建。(也可以有多个Factory,每个Factory创建多个Protocol)
Factory: 主要用来创建protocol,也可以定义其他操作
twisted.internet.protocol.Factory
Factory类的源码如下,其有属性protocol和方法buildProtocol()较为重要,其中protocol指向需要创建的Protocol类,从buildProtocol()方法可以看到其创建了Protocol实例,并且将该实例的factory属性指向了Factory 实例。startFactory()和stopFactory()相当于钩子函数,在factory和端口连接和断开时调用。在实际应用时,一般选择继承Factory的子类,并实现相应的方法,如ClientFactory,SeverFactory。
@implementer(interfaces.IProtocolFactory, interfaces.ILoggingContext) class Factory: """ This is a factory which produces protocols. By default, buildProtocol will create a protocol of the class given in self.protocol. """ # put a subclass of Protocol here: protocol = None numPorts = 0 noisy = True @classmethod def forProtocol(cls, protocol, *args, **kwargs): """ Create a factory for the given protocol. It sets the C{protocol} attribute and returns the constructed factory instance. @param protocol: A L{Protocol} subclass @param args: Positional arguments for the factory. @param kwargs: Keyword arguments for the factory. @return: A L{Factory} instance wired up to C{protocol}. """ factory = cls(*args, **kwargs) factory.protocol = protocol return factory def logPrefix(self): """ Describe this factory for log messages. """ return self.__class__.__name__ def doStart(self): """Make sure startFactory is called. Users should not call this function themselves! """ if not self.numPorts: if self.noisy: _loggerFor(self).info("Starting factory {factory!r}", factory=self) self.startFactory() self.numPorts = self.numPorts + 1 def doStop(self): """Make sure stopFactory is called. Users should not call this function themselves! """ if self.numPorts == 0: # this shouldn‘t happen, but does sometimes and this is better # than blowing up in assert as we did previously. return self.numPorts = self.numPorts - 1 if not self.numPorts: if self.noisy: _loggerFor(self).info("Stopping factory {factory!r}", factory=self) self.stopFactory() def startFactory(self): """This will be called before I begin listening on a Port or Connector. It will only be called once, even if the factory is connected to multiple ports. This can be used to perform ‘unserialization‘ tasks that are best put off until things are actually running, such as connecting to a database, opening files, etcetera. """ def stopFactory(self): """This will be called before I stop listening on all Ports/Connectors. This can be overridden to perform ‘shutdown‘ tasks such as disconnecting database connections, closing files, etc. It will be called, for example, before an application shuts down, if it was connected to a port. User code should not call this function directly. """ def buildProtocol(self, addr): """ Create an instance of a subclass of Protocol. The returned instance will handle input on an incoming server connection, and an attribute "factory" pointing to the creating factory. Alternatively, L{None} may be returned to immediately close the new connection. Override this method to alter how Protocol instances get created. @param addr: an object implementing L{twisted.internet.interfaces.IAddress} """ p = self.protocol() p.factory = self return p
继承Factory类,创建protocol实例,有两种方式,一是设置其属性protocol,不覆盖buildProtocol()方法;二是不设置属性protocol,覆盖buildProtocol()方法,在方法内部创建protocol实例,并返回。代码如下:
from twisted.internet.protocol import Factory, Protocol from twisted.internet.endpoints import TCP4ServerEndpoint from twisted.internet import reactor class QOTD(Protocol): def connectionMade(self): # self.factory was set by the factory‘s default buildProtocol: self.transport.write(self.factory.quote + ‘\r\n‘) self.transport.loseConnection() class QOTDFactory(Factory): # This will be used by the default buildProtocol to create new protocols: protocol = QOTD def __init__(self, quote=None): self.quote = quote or ‘An apple a day keeps the doctor away‘ endpoint = TCP4ServerEndpoint(reactor, 8007) endpoint.listen(QOTDFactory("configurable quote")) reactor.run()
from twisted.internet.protocol import Factory from twisted.internet.endpoints import TCP4ServerEndpoint from twisted.internet import reactor class QOTD(Protocol): def connectionMade(self): # self.factory was set by the factory‘s default buildProtocol: self.transport.write(self.factory.quote + ‘\r\n‘) self.transport.loseConnection() class QOTDFactory(Factory): def buildProtocol(self, addr): return QOTD() # 8007 is the port you want to run under. Choose something >1024 endpoint = TCP4ServerEndpoint(reactor, 8007) endpoint.listen(QOTDFactory()) reactor.run()
startFactory()和stopFactory()示例
from twisted.internet.protocol import Factory from twisted.protocols.basic import LineReceiver #LineReceiver为一种protocol class LoggingProtocol(LineReceiver): def lineReceived(self, line): self.factory.fp.write(line + ‘\n‘) class LogfileFactory(Factory): protocol = LoggingProtocol def __init__(self, fileName): self.file = fileName def startFactory(self): self.fp = open(self.file, ‘a‘) def stopFactory(self): self.fp.close()
Protocol:主要用来处理连接建立和断开时的操作,以及数据的接受和发送操作
twisted.internet.protocol.Protocol
Protocol继承了BaseProtocol, BaseProtocol和Protocol的源码如下:
class BaseProtocol: """ This is the abstract superclass of all protocols. Some methods have helpful default implementations here so that they can easily be shared, but otherwise the direct subclasses of this class are more interesting, L{Protocol} and L{ProcessProtocol}. """ connected = 0 transport = None def makeConnection(self, transport): """Make a connection to a transport and a server. This sets the ‘transport‘ attribute of this Protocol, and calls the connectionMade() callback. """ self.connected = 1 self.transport = transport self.connectionMade() def connectionMade(self): """Called when a connection is made. This may be considered the initializer of the protocol, because it is called when the connection is completed. For clients, this is called once the connection to the server has been established; for servers, this is called after an accept() call stops blocking and a socket has been received. If you need to send any greeting or initial message, do it here. """
@implementer(interfaces.IProtocol, interfaces.ILoggingContext) class Protocol(BaseProtocol): """ This is the base class for streaming connection-oriented protocols. If you are going to write a new connection-oriented protocol for Twisted, start here. Any protocol implementation, either client or server, should be a subclass of this class. The API is quite simple. Implement L{dataReceived} to handle both event-based and synchronous input; output can be sent through the ‘transport‘ attribute, which is to be an instance that implements L{twisted.internet.interfaces.ITransport}. Override C{connectionLost} to be notified when the connection ends. Some subclasses exist already to help you write common types of protocols: see the L{twisted.protocols.basic} module for a few of them. """ def logPrefix(self): """ Return a prefix matching the class name, to identify log messages related to this protocol instance. """ return self.__class__.__name__ def dataReceived(self, data): """Called whenever data is received. Use this method to translate to a higher-level message. Usually, some callback will be made upon the receipt of each complete protocol message. @param data: a string of indeterminate length. Please keep in mind that you will probably need to buffer some data, as partial (or multiple) protocol messages may be received! I recommend that unit tests for protocols call through to this method with differing chunk sizes, down to one byte at a time. """ def connectionLost(self, reason=connectionDone): """Called when the connection is shut down. Clear any circular references here, and any external references to this Protocol. The connection has been closed. @type reason: L{twisted.python.failure.Failure} """
BaseProtocol有两个属性(connected和transport),从其makeConnection()中可以看到,每创建一个连接,conencted值加一,为transport赋值,并调用钩子函数connectionMade().
Protocol有两个钩子函数dataReceived()和connectionLost(), 分别在接受到客户端数据和断开连接时调用,自定义相应的代码来处理数据和断开连接时的清理工作。
twisted.protocols.basic
twisted.words.protocols
除了继承Protocol来定义操作外,还可以继承其他的协议,如twisted.protocols.basic中的LineReceiver, LineReceiver等,twisted.words.protocols.irc中的IRCClient等
可以参见:https://twistedmatrix.com/documents/18.9.0/api/twisted.protocols.basic.html
https://twistedmatrix.com/documents/current/api/twisted.words.protocols.irc.html
继承LineReciver
from twisted.protocols.basic import LineReceiver class Answer(LineReceiver): answers = {‘How are you?‘: ‘Fine‘, None: "I don‘t know what you mean"} def lineReceived(self, line): if line in self.answers: self.sendLine(self.answers[line]) else: self.sendLine(self.answers[None])
基于twisted的服务端
from twisted.internet.protocol import Factory from twisted.protocols.basic import LineReceiver from twisted.internet import reactor class Chat(LineReceiver): def __init__(self, users): self.users = users self.name = None self.state = "GETNAME" def connectionMade(self): self.sendLine("What‘s your name?") def connectionLost(self, reason): if self.name in self.users: del self.users[self.name] def lineReceived(self, line): if self.state == "GETNAME": self.handle_GETNAME(line) else: self.handle_CHAT(line) def handle_GETNAME(self, name): if name in self.users: self.sendLine("Name taken, please choose another.") return self.sendLine("Welcome, %s!" % (name,)) self.name = name self.users[name] = self self.state = "CHAT" def handle_CHAT(self, message): message = "<%s> %s" % (self.name, message) for name, protocol in self.users.iteritems(): if protocol != self: protocol.sendLine(message) class ChatFactory(Factory): def __init__(self): self.users = {} # maps user names to Chat instances def buildProtocol(self, addr): return Chat(self.users) reactor.listenTCP(8123, ChatFactory()) reactor.run()
继承IRCClient
from twisted.words.protocols import irc from twisted.internet import protocol class LogBot(irc.IRCClient): def connectionMade(self): irc.IRCClient.connectionMade(self) self.logger = MessageLogger(open(self.factory.filename, "a")) self.logger.log("[connected at %s]" % time.asctime(time.localtime(time.time()))) def signedOn(self): self.join(self.factory.channel) class LogBotFactory(protocol.ClientFactory): def __init__(self, channel, filename): self.channel = channel self.filename = filename def buildProtocol(self, addr): p = LogBot() p.factory = self return p
基于twisted的客户端
# Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details. """ An example IRC log bot - logs a channel‘s events to a file. If someone says the bot‘s name in the channel followed by a ‘:‘, e.g. <foo> logbot: hello! the bot will reply: <logbot> foo: I am a log bot Run this script with two arguments, the channel name the bot should connect to, and file to log to, e.g.: $ python ircLogBot.py test test.log will log channel #test to the file ‘test.log‘. To run the script: $ python ircLogBot.py <channel> <file> """ from __future__ import print_function # twisted imports from twisted.words.protocols import irc from twisted.internet import reactor, protocol from twisted.python import log # system imports import time, sys class MessageLogger: """ An independent logger class (because separation of application and protocol logic is a good thing). """ def __init__(self, file): self.file = file def log(self, message): """Write a message to the file.""" timestamp = time.strftime("[%H:%M:%S]", time.localtime(time.time())) self.file.write(‘%s %s\n‘ % (timestamp, message)) self.file.flush() def close(self): self.file.close() class LogBot(irc.IRCClient): """A logging IRC bot.""" nickname = "twistedbot" def connectionMade(self): irc.IRCClient.connectionMade(self) self.logger = MessageLogger(open(self.factory.filename, "a")) self.logger.log("[connected at %s]" % time.asctime(time.localtime(time.time()))) def connectionLost(self, reason): irc.IRCClient.connectionLost(self, reason) self.logger.log("[disconnected at %s]" % time.asctime(time.localtime(time.time()))) self.logger.close() # callbacks for events def signedOn(self): """Called when bot has successfully signed on to server.""" self.join(self.factory.channel) def joined(self, channel): """This will get called when the bot joins the channel.""" self.logger.log("[I have joined %s]" % channel) def privmsg(self, user, channel, msg): """This will get called when the bot receives a message.""" user = user.split(‘!‘, 1)[0] self.logger.log("<%s> %s" % (user, msg)) # Check to see if they‘re sending me a private message if channel == self.nickname: msg = "It isn‘t nice to whisper! Play nice with the group." self.msg(user, msg) return # Otherwise check to see if it is a message directed at me if msg.startswith(self.nickname + ":"): msg = "%s: I am a log bot" % user self.msg(channel, msg) self.logger.log("<%s> %s" % (self.nickname, msg)) def action(self, user, channel, msg): """This will get called when the bot sees someone do an action.""" user = user.split(‘!‘, 1)[0] self.logger.log("* %s %s" % (user, msg)) # irc callbacks def irc_NICK(self, prefix, params): """Called when an IRC user changes their nickname.""" old_nick = prefix.split(‘!‘)[0] new_nick = params[0] self.logger.log("%s is now known as %s" % (old_nick, new_nick)) # For fun, override the method that determines how a nickname is changed on # collisions. The default method appends an underscore. def alterCollidedNick(self, nickname): """ Generate an altered version of a nickname that caused a collision in an effort to create an unused related name for subsequent registration. """ return nickname + ‘^‘ class LogBotFactory(protocol.ClientFactory): """A factory for LogBots. A new protocol instance will be created each time we connect to the server. """ def __init__(self, channel, filename): self.channel = channel self.filename = filename def buildProtocol(self, addr): p = LogBot() p.factory = self return p def clientConnectionLost(self, connector, reason): """If we get disconnected, reconnect to server.""" connector.connect() def clientConnectionFailed(self, connector, reason): print("connection failed:", reason) reactor.stop() if __name__ == ‘__main__‘: # initialize logging log.startLogging(sys.stdout) # create factory protocol and application f = LogBotFactory(sys.argv[1], sys.argv[2]) # connect factory to this host and port reactor.connectTCP("irc.freenode.net", 6667, f) # run bot reactor.run()
3,Deferred 和 DeferredList
参考文章:
https://twistedmatrix.com/documents/current/core/howto/defer.html
https://twistedmatrix.com/documents/current/core/howto/gendefer.html
Deferred
twisted.internet.defer.Deferred
http://krondo.com/a-second-interlude-deferred/
https://twistedmatrix.com/documents/current/api/twisted.internet.defer.Deferred.html
Deferred用来管理回调函数,可以实现回调函数的链式执行。Deferred需要同时加入一对回调函数,一个calllback,正常执行时调用,一个errback,执行异常时调用。可以按顺序,依次加入多个函数对,执行时根据加入顺序依次链式执行。当只加入一个函数时,Deferred会默认加入一个另一个函数(但这个函数什么事情也不做,相当于pass),简单示例如下:
from twisted.internet import reactor from twisted.internet.protocol import Protocol,Factory from twisted.internet.defer import Deferred def down_page(url): print ‘去下载url网页‘ def down_failed(): print ‘若下载网页失败时执行down_failed()‘ def save_page(): print ‘保存网页‘ def close_task(): reactor.stop() d = Deferred() d.addCallbacks(down_page,down_failed) #相当于d.addCallback(down_page)和d.addErrback(down_failed) d.addCallback(save_page) # Deferred默认加入d.addErrback(save_failed),但save_failed什么也不做 d.addBoth(close_task) #相当于d.addCallbacks(close_task,close_task) reactor.callWhenRunning(d.callback,‘www.baidu.com‘)
reactor.run()
Deferred的执行流如下左图所示:其有两条链,一条callbacks链,一条errbacks链,正常情况下,其会按照callback链依次执行加入的每一个callback函数,但若发生异常时,则会跳过该callback,而执行errback。下面的每一个箭头都代表一条可能的执行路径。如右图中的一条路径,依次执行第一个,第二个callback,但由于第二个callback抛出异常,执行第三对回调函数的errback,该errback函数捕获了异常,继续执行第四对回调函数的callback。(若该errback未能处理异常,而继续传递异常,将执行第四对回调函数的errback)
Deferred在执行回调函数时,其内部也会再返回一个Deferred对象,此时外部Deferred会暂停执行,将执行权交给reactor,当reactor执行内部Deferred的回调函数时,内部Deferred最后会调用外部Deferred的回调函数而切换到外部Deferred继续执行。其执行流示意图如下:
Reactor和Deferred间的执行权限切换如下:
DeferredList
twisted.internet.defer.DeferredList
参考文章
http://krondo.com/deferreds-en-masse/
https://twistedmatrix.com/documents/current/api/twisted.internet.defer.DeferredList.html
deferredList用来管理多个deferred对象,监听其状态,得到所有deferred对象的回调函数最后的返回值或异常。deferredList也可以像defer一样添加回调函数,当其监听的所有deferred对象执行完成后,调用deferredList的回调函数。
4,inclineCallbacks
twisted.internet.defer.inlineCallbacks
参考文章:
http://krondo.com/just-another-way-to-spell-callback/
inclineCallbacks可以理解为innerdeferred的回调函数
理解inclineCallbacks,先来看看generator,下面的generator代码的执行结果如下,执行流程中可以看到,每次yield时,就从generator函数内部跳转到函数外部执行,函数外部执行后将结果send到generator内部,或者将异常throw到generator内部,generator内部函数继续执行,此处我们可以想象为generator函数执行到yield时,内部函数挂起,调用外部函数,外部函数执行完成后将结果返回给generator处理,然后内部函数继续执行。
#coding:utf-8 class Malfunction(Exception): pass def my_generator(): print ‘starting up‘ val = yield 1 print ‘got:‘, val val = yield 2 print ‘got:‘, val try: yield 3 except Malfunction: print ‘malfunction!‘ yield 4 print ‘done‘ gen = my_generator() #gen通过send,throw向my_generator函数内部发送值,赋值给val;my_generator函数内部通过yield返回值给gen print gen.next(), ‘from yield‘ # start the generator print gen.send(10), ‘from yield‘ # send the value 10 print gen.send(20), ‘from yield‘ # send the value 20 print gen.throw(Malfunction()), ‘from yield‘ # raise an exception inside the generator try: gen.next() except StopIteration: pass
因此我们可以写出如下的代码,实现内部函数调用。
#coding:utf-8 def func1(): print ‘执行回调函数1‘ return ‘result1‘ def func2(): print ‘执行回调函数2‘ return ‘result2‘ def my_generator(): print ‘内部函数开始执行‘ val1 = yield 1 print ‘回调函数1执行完成,返回结果:‘,val1 val2 = yield 2 print ‘回调函数2执行完成,返回结果:‘,val2 yield 3 print ‘内部函数结束执行‘ gen = my_generator() gen.next() t1 = func1() gen.send(t1) t2 = func2() gen.send(t2) try: gen.next() except StopIteration as e: pass
装饰器@inclineCallbacks必须和yield搭配使用,其作用相当于gen.next, send , throw, 当generator函数内部yield时,其负责拿到外部函数结果并返回给generator。若yield一个单独的值时,inclineCallbacks立即返回该值,继续执行generator函数,若yield 一个deferred对象时,内部函数挂起,等deferred的回调函数执行完毕后,将回调函数的结果或异常返回,generator才继续执行(若时异常时不捕获,yield会抛出该异常)。具体流程如下面代码:
from twisted.internet.defer import inlineCallbacks, Deferred @inlineCallbacks def my_callbacks(): from twisted.internet import reactor print ‘first callback‘ result = yield 1 # yielded values that aren‘t deferred come right back print ‘second callback got‘, result d = Deferred() reactor.callLater(5, d.callback, 2) #d的回调函数在5秒钟后才执行,yield d 会使generator等待d执行完毕 result = yield d # yielded deferreds will pause the generator print ‘third callback got‘, result # the result of the deferred d = Deferred() reactor.callLater(5, d.errback, Exception(3)) try: yield d except Exception, e: result = e print ‘fourth callback got‘, repr(result) # the exception from the deferred reactor.stop() from twisted.internet import reactor reactor.callWhenRunning(my_callbacks) reactor.run()
另外,对于装饰器@inclineCallbacks装饰的generator的返回值也是一个deferred对象:
from twisted.internet.defer import inlineCallbacks, Deferred from twisted.internet import reactor @inlineCallbacks def my_generator(): yield 1 d = Deferred() reactor.callLater(5, d.callback, 2) yield d yield 2 d = my_generator() print d, type(d) reactor.run()
5,框架使用
https://twistedmatrix.com/documents/current/core/howto/application.html
参考:
学习教程:http://krondo.com/an-introduction-to-asynchronous-programming-and-twisted/
官方文档:https://twistedmatrix.com/trac/wiki/Documentation
标签:subclass 跳转 result ntc 定义 func depend .com accept
原文地址:https://www.cnblogs.com/silence-cho/p/9898984.html