码迷,mamicode.com
首页 > Web开发 > 详细

nbnet

时间:2017-01-27 22:59:03      阅读:534      评论:0      收藏:0      [点我收藏+]

标签:stream   out   end   ddr   pos   response   print   pdb   pytho   

  1 #!/usr/bin/env python
  2 # coding: utf-8
  3 
import errno
import logging
import pdb
import select
import socket
import time

from daemon import Daemon
  9 
 10 __all__ = ["nbNet", "sendData_mh"]
 11 #DEBUG = True
 12 
 13 from nbNetUtils import *
 14 
 15 class nbNetBase:
 16     ‘‘‘non-blocking Net‘‘‘
 17     def setFd(self, sock):
 18         """sock is class object of socket"""
 19         #dbgPrint("\n -- setFd start!")
 20         tmp_state = STATE()
 21         tmp_state.sock_obj = sock
 22         self.conn_state[sock.fileno()] = tmp_state
 23         #self.conn_state[sock.fileno()].printState()
 24         #dbgPrint("\n -- setFd end!")
 25 
 26     def accept(self, fd): 
 27         """fd is fileno() of socket"""
 28         #dbgPrint("\n -- accept start!")
 29         sock_state = self.conn_state[fd]
 30         sock = sock_state.sock_obj
 31         conn, addr = sock.accept()
 32         # set to non-blocking: 0
 33         conn.setblocking(0)
 34         return conn
 35     
 36     def close(self, fd):
 37         """fd is fileno() of socket"""
 38         #pdb.set_trace()
 39         print "closing", fd, self.conn_state
 40         try:
 41             # cancel of listen to event
 42             sock = self.conn_state[fd].sock_obj
 43             self.epoll_sock.unregister(fd)
 44             sock.close()
 45             self.conn_state.pop(fd)
 46             tmp_pipe = self.popen_pipe
 47             self.popen_pipe = 0
 48             tmp_pipe.close()
 49         except:
 50             #dbgPrint("Close fd: %s abnormal" % fd)
 51             pass
 52     #@profile
 53     def read(self, fd):
 54         """fd is fileno() of socket"""
 55         #pdb.set_trace()
 56         try:
 57             sock_state = self.conn_state[fd]
 58             conn = sock_state.sock_obj
 59             if sock_state.need_read <= 0:
 60                 raise socket.error
 61 
 62             one_read = conn.recv(sock_state.need_read)
 63             #dbgPrint("\tread func fd: %d, one_read: %s, need_read: %d" % (fd, one_read, sock_state.need_read))
 64             if len(one_read) == 0:
 65                 raise socket.error
 66             # process received data
 67             sock_state.buff_read += one_read
 68             sock_state.have_read += len(one_read)
 69             sock_state.need_read -= len(one_read)
 70             #sock_state.printState()
 71 
 72             # read protocol header
 73             if sock_state.have_read == 10:
 74                 header_said_need_read = int(sock_state.buff_read)
 75                 if header_said_need_read <= 0:
 76                     raise socket.error
 77                 sock_state.need_read += header_said_need_read
 78                 sock_state.buff_read = ‘‘
 79                 # call state machine, current state is read. 
 80                 # after protocol header haven readed, read the real cmd content, 
 81                 # call machine instead of call read() it self in common.
 82                 #sock_state.printState()
 83                 return "readcontent"
 84             elif sock_state.need_read == 0:
 85                 # recv complete, change state to process it
 86                 return "process"
 87             else:
 88                 return "readmore"
 89         except (socket.error, ValueError), msg:
 90             try:
 91                 if msg.errno == 11:
 92                     #dbgPrint("11 " + msg)
 93                     return "retry"
 94             except:
 95                 pass
 96             return closing
 97         
 98 
 99     #@profile
100     def write(self, fd):
101         sock_state = self.conn_state[fd]
102         conn = sock_state.sock_obj
103         #pdb.set_trace()
104         
105         if isinstance(sock_state.popen_pipe, file):
106             try:
107                 output = sock_state.popen_pipe.read()
108                 #print output
109             except (IOError, ValueError), msg:
110                 pass
111             #have_send = conn.send("%010d%s" % (len(output), output))
112             #todo
113 
114         else:
115             last_have_send = sock_state.have_write
116             try:
117                 # to send some Bytes, but have_send is the return num of .send()
118                 have_send = conn.send(sock_state.buff_write[last_have_send:])
119                 sock_state.have_write += have_send
120                 sock_state.need_write -= have_send
121                 if sock_state.need_write == 0 and sock_state.have_write != 0:
122                     # send complete, re init status, and listen re-read
123                     #sock_state.printState()
124                     #dbgPrint(‘\n write data completed!‘)
125                     return "writecomplete"
126                 else:
127                     return "writemore"
128             except socket.error, msg:
129                 return "closing"
130 
131 
132     def run(self):
133         while True:
134             #dbgPrint("\nrun func loop:")
135             # print conn_state
136             #for i in self.conn_state.iterkeys():
137                 #dbgPrint("\n - state of fd: %d" % i)
138                 #self.conn_state[i].printState()
139 
140             epoll_list = self.epoll_sock.poll()
141             for fd, events in epoll_list:
142                 #dbgPrint(‘\n-- run epoll return fd: %d. event: %s‘ % (fd, events))
143                 print self.conn_state
144                 print fd, events
145                 sock_state = self.conn_state[fd]
146                 if select.EPOLLHUP & events:
147                     #dbgPrint("EPOLLHUP")
148                     sock_state.state = "closing"
149                 elif select.EPOLLERR & events:
150                     #dbgPrint("EPOLLERR")
151                     sock_state.state = "closing"
152                 self.state_machine(fd)
153 
154     def state_machine(self, fd):
155         #time.sleep(0.1)
156         #dbgPrint("\n - state machine: fd: %d, status: %s" % (fd, self.conn_state[fd].state))
157         sock_state = self.conn_state[fd]
158         self.sm[sock_state.state](fd)
159 
class nbNet(nbNetBase):
    """Epoll-based TCP server for a length-prefixed request/response protocol.

    Each response is framed as a 10-digit zero-padded decimal length
    followed by the payload. Requests are handed to the ``logic`` callback
    as ``logic(fd, data)``; a return value of ``None`` means "no reply",
    anything else is sent back to the client.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, addr, port, logic, backlog=10):
        """Create the listening socket and register it with epoll.

        Args:
            addr: local address to bind to.
            port: local TCP port to bind to.
            logic: callable invoked as ``logic(fd, data)`` for every
                complete request.
            backlog: value passed to ``listen()``; defaults to the
                previously hard-coded 10.

        Raises:
            socket.error: if the socket cannot be configured, bound or put
                into listening mode. The socket is closed before the error
                propagates.
        """
        self.conn_state = {}
        self.logic = logic
        self.listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        try:
            self.listen_sock.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.listen_sock.bind((addr, port))
            self.listen_sock.listen(backlog)
        except socket.error:
            self.listen_sock.close()
            raise

        self.setFd(self.listen_sock)
        self.epoll_sock = select.epoll()
        # Level-triggered by default; OR in select.EPOLLET for edge-triggered.
        self.epoll_sock.register(self.listen_sock.fileno(), select.EPOLLIN)

        # State name -> handler called with the file descriptor.
        self.sm = {
            "accept": self.accept2read,
            "read": self.read2process,
            "write": self.write2read,
            "process": self.process,
            "closing": self.close,
        }

    def _rearm_read(self, fd):
        """Give ``fd`` a fresh state object and wait for its next request."""
        self.setFd(self.conn_state[fd].sock_obj)
        self.conn_state[fd].state = "read"
        self.epoll_sock.modify(fd, select.EPOLLIN)

    def _close_now(self, fd):
        """Mark ``fd`` as closing and run its handler immediately."""
        self.conn_state[fd].state = "closing"
        self.state_machine(fd)

    def process(self, fd):
        """Pass the received request to ``logic`` and queue its reply.

        When ``logic`` returns ``None`` the connection goes straight back
        to reading. Otherwise the reply is framed with the length header
        and the connection switches to the ``"write"`` state. Exceptions
        raised by ``logic`` propagate to the caller.
        """
        sock_state = self.conn_state[fd]
        response = self.logic(fd, sock_state.buff_read)
        if response is None:
            self._rearm_read(fd)
            return

        sock_state.buff_write = "%010d%s" % (len(response), response)
        sock_state.need_write = len(sock_state.buff_write)
        sock_state.state = "write"
        self.epoll_sock.modify(fd, select.EPOLLOUT)

    def accept2read(self, fd):
        """Accept a client on listening socket ``fd`` and start reading from it.

        Only the new connection enters the ``"read"`` state; the listening
        socket's own state is left untouched so it keeps accepting. A
        failed ``accept()`` is logged and ignored instead of stopping the
        event loop.
        """
        try:
            conn = self.accept(fd)
        except socket.error as err:
            self._log.debug("accept on fd %s failed: %s", fd, err)
            return
        self.epoll_sock.register(conn.fileno(), select.EPOLLIN)
        self.setFd(conn)
        self.conn_state[conn.fileno()].state = "read"

    def read2process(self, fd):
        """Read from ``fd`` and act on the outcome reported by ``read``.

        A complete message is processed at once. Partial progress
        (``"readcontent"``, ``"readmore"``, ``"retry"``) simply waits for
        the next epoll event. Any failure closes the connection.

        Raises:
            Exception: if ``read`` returns a value outside its contract.
        """
        try:
            read_ret = self.read(fd)
        except Exception:
            # Boundary of the event loop: one bad connection must not take
            # the whole server down.
            self._log.exception("read failed on fd %s", fd)
            read_ret = "closing"

        if read_ret == "process":
            self.process(fd)
        elif read_ret in ("readcontent", "readmore", "retry"):
            pass
        elif read_ret == "closing":
            self._close_now(fd)
        else:
            raise Exception("impossible state returned by self.read")

    def write2read(self, fd):
        """Write to ``fd`` and return to reading once the reply is sent.

        ``"writemore"`` waits for the next epoll event and ``"closing"``
        closes the connection. Any other result from ``write`` (it returns
        ``None`` for its unfinished pipe branch) is ignored.
        """
        try:
            write_ret = self.write(fd)
        except socket.error:
            write_ret = "closing"

        if write_ret == "writecomplete":
            self._rearm_read(fd)
        elif write_ret == "closing":
            self._close_now(fd)
260     
# Number of requests handled so far by logic().
counter = 0


def logic(fd, d_in):
    """Demo request handler: count the request and reply with ``"a"``.

    The signature matches how ``nbNet.process`` invokes the callback,
    ``logic(fd, data)``; both arguments are ignored here. Every 100000th
    call prints the running count and the current time, which gives a
    rough throughput reading.
    """
    global counter
    counter += 1
    if counter % 100000 == 0:
        print("%s %s" % (counter, time.time()))
    return "a"


if __name__ == "__main__":
    reverseD = nbNet("0.0.0.0", 9099, logic)
    reverseD.run()

 

nbnet

标签:stream   out   end   ddr   pos   response   print   pdb   pytho   

原文地址:http://www.cnblogs.com/newpython/p/6354017.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!