标签:response utf-8 cep lis mac res ... normal mon
#!/usr/bin/env python # coding: utf-8 from daemon import Daemon import socket import select import time import pdb __all__ = ["nbNet", "sendData_mh"] #DEBUG = True from nbNetUtils import * class nbNet(): ‘‘‘non-blocking Net‘‘‘ def setFd(self, sock): tmp_state = STATE() tmp_state.sock_obj = sock self.conn_state[sock.fileno()] = tmp_state def close(self, fd): """fd is fileno() of socket""" #pdb.set_trace() print "closing", fd, self.conn_state try: # cancel of listen to event sock = self.conn_state[fd].sock_obj self.epoll_sock.unregister(fd) sock.close() self.conn_state.pop(fd) tmp_pipe = self.popen_pipe self.popen_pipe = 0 tmp_pipe.close() except: #dbgPrint("Close fd: %s abnormal" % fd) pass def read(self, fd): """fd is fileno() of socket""" #pdb.set_trace() try: sock_state = self.conn_state[fd] conn = sock_state.sock_obj if sock_state.need_read <= 0: raise socket.error one_read = conn.recv(sock_state.need_read) #dbgPrint("\tread func fd: %d, one_read: %s, need_read: %d" % (fd, one_read, sock_state.need_read)) if len(one_read) == 0: raise socket.error # process received data sock_state.buff_read += one_read sock_state.have_read += len(one_read) sock_state.need_read -= len(one_read) #sock_state.printState() # read protocol header if sock_state.have_read == 10: header_said_need_read = int(sock_state.buff_read) if header_said_need_read <= 0: raise socket.error sock_state.need_read += header_said_need_read sock_state.buff_read = ‘‘ # call state machine, current state is read. # after protocol header haven readed, read the real cmd content, # call machine instead of call read() it self in common. #sock_state.printState() return "readcontent" elif sock_state.need_read == 0: # recv complete, change state to process it return "process" else: return "readmore" except (socket.error, ValueError), msg: try: if msg.errno == 11: #dbgPrint("11 " + msg) return "retry" except: pass return ‘closing‘ def write(self, fd): sock_state = self.conn_state[fd] conn = sock_state.sock_obj #pdb.set_trace() if isinstance(sock_state.popen_pipe, file): try: output = sock_state.popen_pipe.read() #print output except (IOError, ValueError), msg: pass #have_send = conn.send("%010d%s" % (len(output), output)) #todo else: last_have_send = sock_state.have_write try: # to send some Bytes, but have_send is the return num of .send() have_send = conn.send(sock_state.buff_write[last_have_send:]) sock_state.have_write += have_send sock_state.need_write -= have_send # 不需要write 而且不是初始状态 表示write完毕 if sock_state.need_write == 0 and sock_state.have_write != 0: # send complete, re init status, and listen re-read #sock_state.printState() #dbgPrint(‘\n write data completed!‘) return "writecomplete" else: return "writemore" except socket.error, msg: return "closing" def run(self): while True: print ‘in loop ...‘ epoll_list = self.epoll_sock.poll() # loop handle every events for fd, events in epoll_list: print ‘all socket state:‘, self.conn_state print ‘poll get fd and events:‘, fd, events sock_state = self.conn_state[fd] if select.EPOLLHUP & events: print "EPOLLHUP" sock_state.state = "closing" elif select.EPOLLERR & events: print "EPOLLERR" sock_state.state = "closing" self.state_machine(fd) def state_machine(self, fd): sock_state = self.conn_state[fd] self.sm[sock_state.state](fd) def __init__(self, addr, port, logic): self.conn_state = {} self.listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) self.listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.listen_sock.bind((addr, port)) self.listen_sock.listen(10) self.setFd(self.listen_sock) self.epoll_sock = select.epoll() # LT for default, ET add ‘ | select.EPOLLET ‘ self.epoll_sock.register(self.listen_sock.fileno(), select.EPOLLIN ) self.logic = logic self.sm = { "accept" : self.accept2read, "read" : self.read2process, "write" : self.write2read, "process": self.process, "closing": self.close, } #dbgPrint(‘\n__init__: end, register no: %s‘ % self.listen_sock.fileno() ) def process(self, fd): sock_state = self.conn_state[fd] response = self.logic(sock_state.buff_read) #pdb.set_trace() if response == None: conn = sock_state.sock_obj self.setFd(conn) self.conn_state[fd].state = "read" self.epoll_sock.modify(fd, select.EPOLLIN) else: sock_state.buff_write = "%010d%s" % (len(response), response) sock_state.need_write = len(sock_state.buff_write) #sock_state.printState() #self.state_machine(fd) sock_state.state = "write" self.epoll_sock.modify(fd, select.EPOLLOUT) def accept2read(self, fd): sock_state = self.conn_state[fd] sock = sock_state.sock_obj conn, addr = sock.accept() conn.setblocking(0) self.epoll_sock.register(conn.fileno(), select.EPOLLIN) # new client connection fd be initilized self.setFd(conn) self.conn_state[conn.fileno()].state = "read" def read2process(self, fd): """fd is fileno() of socket""" #pdb.set_trace() read_ret = "" try: read_ret = self.read(fd) except (Exception), msg: #dbgPrint(msg) read_ret = "closing" if read_ret == "process": # recv complete, change state to process it #sock_state.state = "process" self.state_machine(fd) elif read_ret == "readcontent": pass elif read_ret == "readmore": pass elif read_ret == "retry": pass elif read_ret == "closing": self.conn_state[fd].state = ‘closing‘ # closing directly when error. self.state_machine(fd) else: raise Exception("impossible state returned by self.read") def write2read(self, fd): try: write_ret = self.write(fd) except socket.error, msg: write_ret = "closing" if write_ret == "writemore": pass elif write_ret == "writecomplete": sock_state = self.conn_state[fd] conn = sock_state.sock_obj self.setFd(conn) self.conn_state[fd].state = "read" self.epoll_sock.modify(fd, select.EPOLLIN) elif write_ret == "closing": #dbgPrint(msg) self.conn_state[fd].state = ‘closing‘ # closing directly when error. self.state_machine(fd) counter = 0 if __name__ == ‘__main__‘: def logic(d_in): global counter counter += 1 if counter % 100000 == 0: print counter, time.time() return("a") reverseD = nbNet(‘0.0.0.0‘, 9099, logic) reverseD.run()
标签:response utf-8 cep lis mac res ... normal mon
原文地址:http://www.cnblogs.com/newpython/p/6395212.html