码迷,mamicode.com
首页 > Web开发 > 详细

nbNet

时间:2017-02-13 21:58:33      阅读:232      评论:0      收藏:0      [点我收藏+]

标签:response   utf-8   cep   lis   mac   res   ...   normal   mon   

#!/usr/bin/env python
# coding: utf-8

import errno
import pdb
import select
import socket
import time

from daemon import Daemon

__all__ = ["nbNet", "sendData_mh"]
#DEBUG = True

from nbNetUtils import *

class nbNet():
    """Non-blocking, epoll-driven TCP server organised as a state machine.

    Wire format (as implemented by ``read`` and ``process``): every message
    is a 10-character, zero-padded decimal length header followed by that
    many characters of payload.

    Per-connection bookkeeping lives in ``STATE`` objects (imported from
    ``nbNetUtils``) stored in ``self.conn_state`` and keyed by file
    descriptor.  This class relies on the following ``STATE`` attributes:
    ``sock_obj``, ``state``, ``need_read``, ``have_read``, ``buff_read``,
    ``need_write``, ``have_write``, ``buff_write`` and ``popen_pipe``.
    NOTE(review): a fresh ``STATE()`` presumably starts with
    ``state == "accept"`` and ``need_read == 10`` -- confirm in nbNetUtils.
    """

    def __init__(self, addr, port, logic):
        """Create the listening socket, the epoll object and the state table.

        addr  -- address to bind the listening socket to.
        port  -- TCP port to listen on.
        logic -- callable invoked with the payload of each complete request;
                 its return value is sent back as the response, or nothing
                 is sent when it returns None.
        """
        self.conn_state = {}
        self.listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        self.listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.listen_sock.bind((addr, port))
        self.listen_sock.listen(10)
        self.setFd(self.listen_sock)
        self.epoll_sock = select.epoll()
        # Level-triggered by default; add "| select.EPOLLET" for edge-triggered.
        self.epoll_sock.register(self.listen_sock.fileno(), select.EPOLLIN)
        self.logic = logic
        # State name -> handler; every handler takes the fd as its only argument.
        self.sm = {
            "accept": self.accept2read,
            "read": self.read2process,
            "write": self.write2read,
            "process": self.process,
            "closing": self.close,
        }

    def setFd(self, sock):
        """Install a fresh STATE for ``sock``, replacing any previous one."""
        tmp_state = STATE()
        tmp_state.sock_obj = sock
        self.conn_state[sock.fileno()] = tmp_state

    def _reset_to_read(self, fd):
        """Discard the finished request's state and wait for the next one."""
        conn = self.conn_state[fd].sock_obj
        self.setFd(conn)
        self.conn_state[fd].state = "read"
        self.epoll_sock.modify(fd, select.EPOLLIN)

    def close(self, fd):
        """Tear down the connection identified by ``fd`` (socket fileno()).

        Cleanup is best-effort: every step is attempted even if an earlier
        one fails, and an unknown ``fd`` is ignored.
        """
        print("closing %s %s" % (fd, self.conn_state))
        sock_state = self.conn_state.pop(fd, None)
        if sock_state is None:
            return
        try:
            # Stop listening for events on this descriptor.
            self.epoll_sock.unregister(fd)
        except (IOError, OSError, ValueError):
            pass  # not registered, or the epoll object is already closed
        try:
            sock_state.sock_obj.close()
        except (socket.error, IOError, OSError):
            pass
        # The pipe belongs to the connection state, not to the server object.
        pipe = getattr(sock_state, "popen_pipe", None)
        if hasattr(pipe, "close"):
            sock_state.popen_pipe = 0
            try:
                pipe.close()
            except (IOError, OSError, ValueError):
                pass

    def read(self, fd):
        """Receive up to ``need_read`` bytes for ``fd`` and report progress.

        Returns one of:
          "readcontent" -- the 10-byte header was just completed; the payload
                           length it announced has been added to need_read.
          "process"     -- the whole payload has arrived.
          "readmore"    -- more data is still expected.
          "retry"       -- the socket had nothing to deliver (EAGAIN).
          "closing"     -- peer closed, malformed header, or socket error.
        """
        try:
            sock_state = self.conn_state[fd]
            conn = sock_state.sock_obj
            if sock_state.need_read <= 0:
                raise socket.error

            one_read = conn.recv(sock_state.need_read)
            if len(one_read) == 0:
                # An empty read means the peer closed the connection.
                raise socket.error
            sock_state.buff_read += one_read
            sock_state.have_read += len(one_read)
            sock_state.need_read -= len(one_read)

            # Header complete: it is 10 characters, matching the "%010d"
            # prefix that process() writes in front of every response.
            if sock_state.have_read == 10:
                header_said_need_read = int(sock_state.buff_read)
                if header_said_need_read <= 0:
                    raise socket.error
                sock_state.need_read += header_said_need_read
                # Drop the header so buff_read ends up holding only payload.
                sock_state.buff_read = ""
                return "readcontent"
            elif sock_state.need_read == 0:
                return "process"
            else:
                return "readmore"
        except (socket.error, ValueError) as err:
            # ValueError (non-numeric header) carries no errno attribute.
            if getattr(err, "errno", None) in (errno.EAGAIN, errno.EWOULDBLOCK):
                return "retry"
            return "closing"

    def write(self, fd):
        """Send the pending part of ``buff_write`` for ``fd``.

        Returns "writecomplete", "writemore" or "closing".  When the state
        carries an open pipe the (unfinished) pipe branch runs instead and
        None is returned.
        """
        sock_state = self.conn_state[fd]
        conn = sock_state.sock_obj

        if isinstance(sock_state.popen_pipe, file):
            # TODO: pipe output is read but not sent to the client yet.
            try:
                output = sock_state.popen_pipe.read()
            except (IOError, ValueError):
                pass
            return None

        last_have_send = sock_state.have_write
        try:
            # send() may transmit only part of the buffer; it returns how
            # many bytes actually went out.
            have_send = conn.send(sock_state.buff_write[last_have_send:])
        except socket.error as err:
            if getattr(err, "errno", None) in (errno.EAGAIN, errno.EWOULDBLOCK):
                # Kernel buffer full: not an error, try again on next EPOLLOUT.
                return "writemore"
            return "closing"
        sock_state.have_write += have_send
        sock_state.need_write -= have_send
        # Nothing left to write and not in the initial state: write finished.
        if sock_state.need_write == 0 and sock_state.have_write != 0:
            return "writecomplete"
        return "writemore"

    def run(self):
        """Poll epoll forever and drive the state machine for each event."""
        while True:
            print("in loop ...")
            for fd, events in self.epoll_sock.poll():
                print("all socket state: %s" % (self.conn_state,))
                print("poll get fd and events: %s %s" % (fd, events))
                sock_state = self.conn_state.get(fd)
                if sock_state is None:
                    # fd was closed while handling an earlier event of
                    # this same batch.
                    continue
                if select.EPOLLHUP & events:
                    print("EPOLLHUP")
                    sock_state.state = "closing"
                elif select.EPOLLERR & events:
                    print("EPOLLERR")
                    sock_state.state = "closing"
                self.state_machine(fd)

    def state_machine(self, fd):
        """Invoke the handler registered for the current state of ``fd``."""
        sock_state = self.conn_state[fd]
        self.sm[sock_state.state](fd)

    def process(self, fd):
        """Run ``self.logic`` on the received payload and queue its response.

        A None response re-arms the connection for reading; anything else is
        framed with a 10-digit length header and the connection is switched
        to the "write" state.
        """
        sock_state = self.conn_state[fd]
        response = self.logic(sock_state.buff_read)
        if response is None:
            self._reset_to_read(fd)
        else:
            sock_state.buff_write = "%010d%s" % (len(response), response)
            sock_state.need_write = len(sock_state.buff_write)
            sock_state.state = "write"
            self.epoll_sock.modify(fd, select.EPOLLOUT)

    def accept2read(self, fd):
        """Accept one pending connection on listening ``fd`` and start reading."""
        listener = self.conn_state[fd].sock_obj
        try:
            conn, _ = listener.accept()
        except socket.error:
            # The pending connection vanished (or another transient error);
            # keep serving instead of taking the whole loop down.
            return
        conn.setblocking(0)

        self.epoll_sock.register(conn.fileno(), select.EPOLLIN)
        # Initialise state for the new client connection.
        self.setFd(conn)
        self.conn_state[conn.fileno()].state = "read"

    def read2process(self, fd):
        """Read from ``fd`` and advance to "process" or "closing" as needed."""
        try:
            read_ret = self.read(fd)
        except Exception:
            read_ret = "closing"

        if read_ret == "process":
            # Receive complete: switch state first, otherwise the dispatch
            # below would simply re-enter this read handler.
            self.conn_state[fd].state = "process"
            self.state_machine(fd)
        elif read_ret in ("readcontent", "readmore", "retry"):
            # Stay in "read"; the next EPOLLIN event continues the transfer.
            pass
        elif read_ret == "closing":
            self.conn_state[fd].state = "closing"
            # Close immediately on error.
            self.state_machine(fd)
        else:
            raise Exception("impossible state returned by self.read")

    def write2read(self, fd):
        """Write to ``fd``; when done go back to "read", on error close."""
        try:
            write_ret = self.write(fd)
        except socket.error:
            write_ret = "closing"

        if write_ret == "writemore":
            pass
        elif write_ret == "writecomplete":
            self._reset_to_read(fd)
        elif write_ret == "closing":
            self.conn_state[fd].state = "closing"
            # Close immediately on error.
            self.state_machine(fd)
    
# Number of requests handled so far by the demo logic() below.
counter = 0
if __name__ == "__main__":

    def logic(d_in):
        """Demo request handler: count requests and always answer "a".

        Prints the running count and the current time once every
        100000 requests as a crude throughput indicator.
        """
        global counter
        counter += 1
        if counter % 100000 == 0:
            print("%s %s" % (counter, time.time()))
        return "a"

    # Listen on all interfaces, port 9099, and serve forever.
    reverseD = nbNet("0.0.0.0", 9099, logic)
    reverseD.run()

 

nbNet

标签:response   utf-8   cep   lis   mac   res   ...   normal   mon   

原文地址:http://www.cnblogs.com/newpython/p/6395212.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!