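1. asyncore.loop([timeout[, use_poll[, map[, count]]]])
Enters a polling loop that terminates after count passes or when all open channels have been closed. All arguments are optional. The count parameter defaults to None, in which case the loop terminates only when all channels have been closed. The timeout argument sets the timeout for the underlying select() or poll() call, in seconds; the default is 30 seconds. The use_poll parameter, if true, indicates that poll() should be used in preference to select(); the default is False. The map parameter is a dictionary containing the channels to watch. A channel is removed from the map when it is closed. If map is not specified, a global map is used. Channels (instances of asyncore.dispatcher, asynchat.async_chat and their subclasses) can be freely mixed in the map.
2. class asyncore.dispatcher
The dispatcher class is a thin wrapper around a low-level socket object. It has a few event-handling methods that are called from the asynchronous loop. Otherwise, it can be treated as a normal non-blocking socket object. The firing of low-level events, or a change in connection state, tells the asynchronous loop that a higher-level event has taken place. The higher-level events are:
- handle_connect(): implied by the first read or write event.
- handle_close(): implied by a read event with no data available.
- handle_accept(): implied by a read event on a listening socket.
3. Additional methods of asyncore.dispatcher
During asynchronous processing, each mapped channel's readable() and writable() methods are used to determine whether the channel's socket should be added to the list of channels passed to select() or poll() for read and write events. Thus there are more channel events than basic socket events. The methods to override in a subclass include:
- handle_read(): called when the asynchronous loop detects that a read() call on the channel's socket will succeed.
- handle_write(): called when the asynchronous loop detects that a write() on the channel's socket will succeed. This method often implements the buffering needed for performance. For example:

    def handle_write(self):
        sent = self.send(self.buffer)
        self.buffer = self.buffer[sent:]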
The Python asyncore and asynchat modules
The Python standard library provides two modules, asyncore and asynchat, to help in writing concurrent network servers using event-based designs. The documentation does not give good examples, so I am making some notes.
1. Overview
The basic idea behind the asyncore module is this:
- there is a function, asyncore.loop(), that does select() on a bunch of ‘channels’. Channels are thin wrappers around sockets;
- when select() returns, it reports which sockets have data waiting to be read, which ones are now free to send more data, and which ones have errors; loop() examines the event and the socket’s state to create a higher level event;
- it then calls a method on the channel corresponding to the higher level event.
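To make the idea concrete, here is a minimal sketch of one pass of such a loop, written with the plain select and socket modules rather than asyncore itself. The names EchoChannel and poll_once are made up for this illustration, and a socket pair stands in for a real network connection; the real loop() does considerably more than this.

```python
import select
import socket


class EchoChannel:
    """Hypothetical stand-in for a channel: a thin wrapper around a socket."""

    def __init__(self, sock):
        self.sock = sock
        self.received = []

    def fileno(self):
        # select() accepts any object that has a fileno() method.
        return self.sock.fileno()

    def handle_read(self):
        # Higher-level event: data is waiting, so recv() will not block.
        self.received.append(self.sock.recv(1024))


def poll_once(channels, timeout=1.0):
    """One pass of a select()-based loop: find ready channels, call handlers."""
    readable, _, _ = select.select(channels, [], [], timeout)
    for channel in readable:
        channel.handle_read()
    return len(readable)


# A connected socket pair stands in for a client and a server-side socket.
left, right = socket.socketpair()
channel = EchoChannel(right)
left.sendall(b"ping")
handled = poll_once([channel])
left.close()
right.close()
```

Calling poll_once repeatedly until no channels remain would give roughly the shape of a full event loop.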
asyncore provides a low-level, but flexible API to build network servers. asynchat builds upon asyncore and provides an API that is more suitable for request/response type of protocols.
2. asyncore
The asyncore module’s API consists of:
- the loop() function, to be called by a driver program written by you;
- the dispatcher class, to be subclassed by you to do useful stuff. The dispatcher class is what is called ‘channel’ elsewhere.

    +-------------+           +--------+
    | driver code |---------> | loop() |
    +-------------+           +--------+
           |                      |
           |                      | loop-dispatcher API (a)
           |                      |
           |              +--------------+
           |              |  dispatcher  |
           +------------->|   subclass   |
                          +--------------+
                                  |
                                  | dispatcher-logic API (b)
                                  |
                          +--------------+
                          | server logic |
                          +--------------+

This is all packaged nicely in an object oriented way. So, we have the dispatcher class, which wraps the socket class (from the socket module in the Python standard library). It provides all the socket class’ methods, as well as methods to handle the higher level events. You are supposed to subclass dispatcher and implement the event handling methods to do something useful.
3. The loop-dispatcher API
The loop function looks like this:

    loop([timeout[, use_poll[, map[, count]]]])
What is the map? It is a dictionary whose keys are the file-descriptors, or fds, of the socket (i.e., socket.fileno()), and whose values are the dispatcher objects which you want to handle events on that socket/fd.
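As an illustration of that shape, the sketch below builds such a dictionary by hand and uses it to go from a ready fd back to its handler. It uses only select and socket; Handler and ready_handlers are hypothetical names for this example, and Handler merely stands in for a dispatcher.

```python
import select
import socket


class Handler:
    """Hypothetical handler object; stands in for a dispatcher."""

    def __init__(self, name, sock):
        self.name = name
        self.sock = sock


def ready_handlers(socket_map, timeout=1.0):
    """select() on the map's keys (the fds) and look up the ready handlers."""
    readable, _, _ = select.select(list(socket_map), [], [], timeout)
    return [socket_map[fd] for fd in readable]


a_left, a_right = socket.socketpair()
b_left, b_right = socket.socketpair()

# Keys are file descriptors, values are the objects that handle events.
socket_map = {
    a_right.fileno(): Handler("a", a_right),
    b_right.fileno(): Handler("b", b_right),
}

b_left.sendall(b"x")  # only the "b" connection has data waiting
names = [handler.name for handler in ready_handlers(socket_map)]

for sock in (a_left, a_right, b_left, b_right):
    sock.close()
```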
When we create a new dispatcher object, it automatically gets added to a global map of sockets (asyncore.socket_map, which is managed behind the scenes). By default, the loop() function does a select() on the sockets in this map.
We can override the map that loop() looks at by providing an explicit map. But then we need to make sure the dispatchers we create are added to (and removed from) this map rather than the global one; the dispatcher constructor accepts an optional map argument for this. (Hmm… we might always want to use explicit maps; then our loop calls will be thread safe and we will be able to launch multiple threads, each calling loop on different maps.)
4. Methods a dispatcher subclass should implement
loop() needs the dispatcher to implement some methods:
- readable(): should return True if you want the fd to be observed for read events;
- writable(): should return True if you want the fd to be observed for write events.
If either readable or writable returns True, the corresponding fd will be examined for errors also. Obviously, it makes no sense to have a dispatcher which returns False for both readable and writable.
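A rough sketch of how these two methods might feed the three lists handed to select(). The function build_select_lists and the StubChannel class are made up for illustration; this is not asyncore's actual code, only the rule described above.

```python
class StubChannel:
    """Hypothetical channel with fixed answers for readable()/writable()."""

    def __init__(self, wants_read, wants_write):
        self.wants_read = wants_read
        self.wants_write = wants_write

    def readable(self):
        return self.wants_read

    def writable(self):
        return self.wants_write


def build_select_lists(socket_map):
    """Return (read, write, error) fd lists for one select() call."""
    read_fds, write_fds, error_fds = [], [], []
    for fd, channel in socket_map.items():
        is_readable = channel.readable()
        is_writable = channel.writable()
        if is_readable:
            read_fds.append(fd)
        if is_writable:
            write_fds.append(fd)
        if is_readable or is_writable:
            # Watched for errors whenever it is watched for anything.
            error_fds.append(fd)
    return read_fds, write_fds, error_fds


# The fd numbers here are arbitrary placeholders, not real sockets.
socket_map = {
    3: StubChannel(True, False),
    4: StubChannel(False, True),
    5: StubChannel(False, False),  # never observed at all
}
read_fds, write_fds, error_fds = build_select_lists(socket_map)
```

The third channel ends up in none of the lists, which is why a dispatcher that returns False from both methods is pointless.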
Some other methods that loop calls on dispatchers are:
- handle_read: socket is readable; dispatcher.recv() can be used to actually get the data
- handle_write: socket is writable; dispatcher.send(data) can be used to actually send the data
- handle_error: socket encountered an error
- handle_expt: socket received OOB data (not really used in practice)
- handle_close: socket was closed remotely or locally
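One detail worth spelling out for handle_write: on a non-blocking socket, send() may accept only part of the data, so the handler has to keep the unsent remainder for the next write event. The sketch below shows that pattern with a fake channel; FakeChannel and its limit parameter are invented for this example and only imitate a socket that takes a few bytes per call.

```python
class FakeChannel:
    """Hypothetical channel whose send() accepts at most `limit` bytes."""

    def __init__(self, data, limit):
        self.buffer = data
        self.limit = limit
        self.wire = b""  # everything "sent" so far

    def send(self, data):
        chunk = data[:self.limit]
        self.wire += chunk
        return len(chunk)  # like socket.send(): number of bytes accepted

    def writable(self):
        # Only ask for write events while there is something left to send.
        return len(self.buffer) > 0

    def handle_write(self):
        sent = self.send(self.buffer)
        self.buffer = self.buffer[sent:]  # keep the unsent tail


channel = FakeChannel(b"hello world", limit=4)
calls = 0
while channel.writable():
    channel.handle_write()
    calls += 1
```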
For server dispatchers, loop calls one more event:
- handle_accept: a new incoming connection can be accept()ed. Call the accept() method to really accept the connection.
To create a server socket, call the bind() and listen() methods on it first.
Client sockets get this event:
- handle_connect: connection to remote endpoint has been made.
To initiate the connection, first call the connect() method on it. Client sockets are discussed in the asyncore documentation so I will not discuss them here.
Other socket methods are available in dispatcher: create_socket, close, set_reuse_addr. They are not called by loop but are available so that your code can call them when it needs to create a new socket, close an existing socket, or tell the OS to set the SO_REUSEADDR flag on the server socket.
5. How to write a server using asyncore
The standard library documentation gives a client example, but not a server example. Here are some notes on the latter.
1. Subclass dispatcher to create a listening socket.
2. In its handle_accept method, create new dispatchers. They’ll get added to the global socket map.
Note: the handlers must not block or take too much time… or the server won’t be concurrent. This is because when multiple sockets get an event, loop calls their dispatchers one-by-one, in the same thread.
The socket-like functions that dispatcher provides should not be bypassed in order to access the low level socket functions. They do funky things to detect higher level events. For example, how does asyncore figure out that the socket is closed? If I remember correctly, there are two ways to detect whether a non-blocking socket is closed:
- select() returns a read event, but when you call recv()/read() you get zero bytes;
- you call send()/write() and it fails with an error (sending zero bytes is not an error).
(I wish I had a copy of Unix Network Programming by Stevens handy right now.) dispatcher will detect both events above and, if either one occurs, will call handle_close. This frees you from having to look at low-level events and lets you think in terms of higher level events.
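The first of these two cases can be observed with plain sockets, without asyncore. This is a small sketch, assuming a local socket pair behaves like a real connection for this purpose: one end is closed, select() reports the other end as readable, and recv() then returns zero bytes.

```python
import select
import socket

left, right = socket.socketpair()
left.close()  # the "remote" end goes away

# select() reports the surviving end as readable...
readable, _, _ = select.select([right], [], [], 1.0)

# ...but reading yields zero bytes, which is how a closed peer shows up.
data = right.recv(1024) if readable else None
peer_closed = bool(readable) and data == b""

right.close()
```

This zero-byte read is the low-level condition that dispatcher turns into a handle_close call.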
The code for a server based on asyncore is below:

asyncore_echo_server.py

    import logging
    import asyncore
    import socket

    logging.basicConfig(level=logging.DEBUG,
                        format="%(created)-15s %(msecs)d %(levelname)8s "
                               "%(thread)d %(name)s %(message)s")
    log = logging.getLogger(__name__)

    BACKLOG = 5
    SIZE = 1024


    class EchoHandler(asyncore.dispatcher):

        def __init__(self, conn_sock, client_address, server):
            self.server = server
            self.client_address = client_address
            # Bytes, because recv() returns bytes on Python 3
            self.buffer = b""
            # We dont have anything to write, to start with
            self.is_writable = False
            # Create ourselves, but with an already provided socket
            asyncore.dispatcher.__init__(self, conn_sock)
            log.debug("created handler; waiting for loop")

        def readable(self):
            return True     # We are always happy to read

        def writable(self):
            # But we might not have anything to send all the time
            return self.is_writable

        def handle_read(self):
            log.debug("handle_read")
            data = self.recv(SIZE)
            log.debug("after recv")
            if data:
                log.debug("got data")
                self.buffer += data
                self.is_writable = True     # sth to send back now
            else:
                log.debug("got null data")

        def handle_write(self):
            log.debug("handle_write")
            if self.buffer:
                # send() may accept only part of the buffer
                sent = self.send(self.buffer)
                log.debug("sent data")
                self.buffer = self.buffer[sent:]
            else:
                log.debug("nothing to send")
            if len(self.buffer) == 0:
                self.is_writable = False

        # Will this ever get called? Does loop() call
        # handle_close() if we called close, to start with?
        def handle_close(self):
            log.debug("handle_close")
            log.info("conn_closed: client_address=%s:%s" %
                     (self.client_address[0], self.client_address[1]))
            self.close()


    class EchoServer(asyncore.dispatcher):

        allow_reuse_address = False
        request_queue_size = 5
        address_family = socket.AF_INET
        socket_type = socket.SOCK_STREAM

        def __init__(self, address, handlerClass=EchoHandler):
            self.address = address
            self.handlerClass = handlerClass

            asyncore.dispatcher.__init__(self)
            self.create_socket(self.address_family, self.socket_type)

            if self.allow_reuse_address:
                self.set_reuse_addr()

            self.server_bind()
            self.server_activate()

        def server_bind(self):
            self.bind(self.address)
            log.debug("bind: address=%s:%s" %
                      (self.address[0], self.address[1]))

        def server_activate(self):
            self.listen(self.request_queue_size)
            log.debug("listen: backlog=%d" % self.request_queue_size)

        def fileno(self):
            return self.socket.fileno()

        def serve_forever(self):
            asyncore.loop()

        # TODO: try to implement handle_request()

        # Internal use
        def handle_accept(self):
            pair = self.accept()
            if pair is None:
                # accept() can return None if the connection went away
                return
            conn_sock, client_address = pair
            if self.verify_request(conn_sock, client_address):
                self.process_request(conn_sock, client_address)

        def verify_request(self, conn_sock, client_address):
            return True

        def process_request(self, conn_sock, client_address):
            log.info("conn_made: client_address=%s:%s" %
                     (client_address[0], client_address[1]))
            # The new handler registers itself in the global socket map
            self.handlerClass(conn_sock, client_address, self)

        def handle_close(self):
            self.close()

and to use it:

    import asyncore_echo_server

    interface = "0.0.0.0"
    port = 8080
    server = asyncore_echo_server.EchoServer((interface, port))
    server.serve_forever()