标签:xstream abort readline net 定义 模块 reads 特权 计算
3.1 ForkingMixIn - 多进程(限 linux)
以下例子算狭义上实现多用户访问服务,但都是同步执行,也就是一个用户连接关闭,下个用户才可以开始执行向服务发送请求数据。
其实现的核心是服务器接收连接部分写在死循环内,可以一直保持接收新用户端发起的请求的状态。
1 import socket 2 3 HOST = ‘127.0.0.1‘ 4 PORT = 50008 5 s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 6 s.bind((HOST, PORT)) 7 s.listen(5) 8 9 while True: 10 print ("开始进入监听状态...") 11 conn, addr = s.accept() 12 print ("接收到连接:", addr) 13 while True: 14 try: 15 data = conn.recv(1024) 16 if not data: 17 print("断开客户端连接") 18 break 19 print ("收到客户端数据:", data.decode("utf-8")) 20 msg = "这是一个循环版多连接服务测试" 21 conn.sendall(msg.encode("utf-8")) 22 except socket.error: 23 break 24 conn.close()
1 import socket 2 3 HOST = ‘127.0.0.1‘ 4 PORT = 50008 5 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 6 7 s.connect((HOST, PORT)) 8 times = 3 9 while times>0: 10 cmd = input("向服务器发送数据:") 11 s.sendall(cmd.encode("utf-8")) 12 data = s.recv(1024) 13 print ("接收到服务器端的数据:", data.decode("utf-8")) 14 times -= 1 15 s.close() # 关闭连接
服务端定义一个传输内容得规则;客户端按照此内容进行传输;服务端按照此内容进行解析。
1 import socket, time, socketserver, struct, os, threading 2 3 # 固定的server启动流程 4 host = ‘127.0.0.1‘ 5 port = 12307 6 # 定义socket类型 7 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 8 # 绑定需要监听的ip和端口号 9 s.bind((host, port)) 10 s.listen(1) 11 12 13 # 定义一个线程的任务函数 14 def conn_thread(connection, address): 15 while True: 16 try: 17 connection.settimeout(600) 18 # struct.calcsize--->计算12个字符和1个长整型的数据有多长 19 #12个字符对应的是客户端发来的文件名,1个长整型对应的是文件内容的长度大小 20 fileinfo_size = struct.calcsize(‘36sl‘) # 12s表示12个字符,l表示一个长整型数 21 buf = connection.recv(fileinfo_size) # 正好接收12个字符长度和一个整数的数据 22 # 如果不加这个if,第一个文件传输完成后会自动走到下一句并阻塞,需要拿到文件大小信息才可以继续执行 23 if buf: 24 filename, filesize = struct.unpack(‘36sl‘, buf) 25 filename_f = filename.decode("utf-8").strip(‘\00‘) # C语言中‘\0‘是一个ASCII码为0的字符,在python中表示占一个位置的空字符 26 print("****filename:",filename_f) 27 # 拿到文件名之后,我们要拼接一个新的文件绝对路径,在服务器上保存下来这个问题件 28 filenewname = os.path.join(‘e:\\‘, os.path.basename(filename_f)) 29 print(u‘文件名称: %s , 文件大小: %s‘ % (filenewname, filesize)) 30 recvd_size = 0 # 收的文件内容有多大了 31 file = open(filenewname,‘wb‘) 32 print(u"开始传输文件内容") 33 while not recvd_size == filesize: # 如果收到的文件长度不等于文件真实长度,就一直循环收 34 if filesize - recvd_size > 1024: # 文件大小和已经收的大小相差大于1024字节 35 rdata = connection.recv(1024) # 每次收1024个字节的内容 36 recvd_size += len(rdata) # 收到的长度在recvd_size累加 37 else: 38 # 用实际的文件大小减去已经收的差值去收(收最后剩余的大小) 39 rdata = connection.recv(filesize-recvd_size) 40 recvd_size = filesize # 把实际收到的长度相加,recvd_size == filesize 41 file.write(rdata) # 把文件的内容写进去 42 file.close() 43 print(‘receive done‘) 44 # connection.close() 45 except socket.timeout: 46 connection.close() 47 48 while True: 49 print(u"开始进入监听状态") 50 connection, address = s.accept() 51 print(‘Connected by ‘, address) 52 # 起一个子线程去收文件 53 thread = threading.Thread(target=conn_thread, args=(connection, address)) 54 thread.start() 55 thread.join() # 阻塞,等都收完了才会关掉连接 56 # s.close()
1 import socket, os, struct 2 3 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 4 s.connect((‘127.0.0.1‘, 12307)) 5 while True: 6 filepath = input(‘请输入要传输的文件绝对路径:‘) 7 print(type(filepath)) 8 print(len(filepath.encode("utf-8"))) 9 if os.path.isfile(filepath): # 判断文件在不在,在就传输。 10 #fileinfo_size = struct.calcsize(‘36sl‘) # 定义打包规则(需与服务器端规则一致) 11 # 定义文件头信息,包含文件名和文件大小 12 fhead = struct.pack(‘36sl‘, filepath.encode("utf-8"), os.stat(filepath).st_size) 13 print(os.stat(filepath).st_size) 14 s.send(fhead) 15 print (u‘文件路径:‘, filepath) 16 # with open(filepath,‘rb‘) as fo: 这样发送文件有问题,发送完成后还会发一些东西过去 17 fo = open(filepath, ‘rb‘) 18 while True: 19 filedata = fo.read(1024) 20 if not filedata: 21 break 22 s.send(filedata) 23 fo.close() 24 print (u‘传输成功‘) 25 # s.close()
Socket 编程在模块创建时无法进行多进程的处理,当有大量请求时,请求就会阻塞在队列中,甚至发生请求丢弃,如果需要大量 socket 就需要许多的 socket 绑定端口,写很多重复性得代码。
SocketServer 简化了网络服务器的编写。在进行 socket 创建时,使用 SocketServer 会大大减少创建的步骤,并且 SocketServer 使用了 select,它有4个类:TCPServer、UDPServer、UnixStreamServer和UnixDatagramServer,这4个类是同步进行处理的,另外通过 ForkingMixIn 和 ThreadingMixIn 类来支持异步。
ForkingMixIn 和 ThreadingMixIn 两个混合类,它们都提供 Server 类中 process_request 方法的新实现,前者在处理每次用户连接的时候都会开启新的进程,而后者会开启新的线程。想要让Server类实现并发处理,只用利用多继承即可,或者直接使用已经混合好的类。
使用步骤:
集成 ThreadingMixIn 类时需要处理异常关闭。daemon_threads 指示服务器是否要等待线程终止,要是线程互相独立,必须要设置为 True,默认是 False。
多个连接同时到达服务器端的时候,主进程都会生成一个子进程专门处理此连接,而主进程则依旧保持监听状态。
因主进程和子进程是同时进行的,所以不会阻塞新的连接。但由于创建进程所消耗的资源比较大,这种处理方式在有大量连接时会带来性能问题。
1 from socketserver import TCPServer, ForkingMixIn, StreamRequestHandler 2 import time 3 4 5 # 自定义服务器类 6 class Server(ForkingMixIn, TCPServer): 7 pass 8 9 # 处理请求的程序类 10 class MyHandler(StreamRequestHandler): 11 12 # 重写父类的handle函数 13 def handle(self): 14 addr = self.request.getpeername() # 获得客户端的地址 15 print(‘接收到连接:‘, addr) # 打印客户端地址 16 data = self.rfile.readline().strip() # 客户端发送的信息必须带有回车,否则会一直等待客户端继续发送数据 17 print("从客户端接收到的请求:", data.decode("utf-8")) 18 time.sleep(1) 19 if data: 20 self.wfile.write(‘这是从服务端进程中发出的消息‘.encode("utf-8")) # 给客户端发送信息 21 22 23 host = "" 24 port = 18001 25 # 实例化一个服务器类,传入服务器的地址和请求处理的程序类 26 server = Server((host, port), MyHandler) 27 print("开始监听状态...") 28 # 开始侦听并处理连接 29 server.serve_forever()
1 if __name__ == ‘__main__‘: 2 import socket 3 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 4 sock.connect((‘127.0.0.1‘, 18001)) 5 import time 6 time.sleep(2) 7 sock.send(‘ls -al /home/wxh‘.encode("utf-8")+"\n".encode("utf-8")) 8 print (sock.recv(1024).decode("utf-8")) 9 sock.close()
[gr@gloryroad juno]$ python3 server.py 开始监听状态... 接收到连接: (‘127.0.0.1‘, 42444) 从客户端接收到的请求: ls -al /home/wxh 接收到连接: (‘127.0.0.1‘, 42446) 从客户端接收到的请求: ls -al /home/wxh
线程是一种轻量级的进程,比 Fork 消耗的资源更少,而且主线程和子线程之间共享相同的内存空间,处理效率高。但大量的使用线程会带来线程之间的数据同步问题,处理不好可能使服务程序失去响应。
以下示例与 Fork 方式中代码基本相同,仅仅是采用的 ThreadingMixIn 类不同。
1 from socketserver import TCPServer, ThreadingMixIn, StreamRequestHandler 2 import time 3 4 5 # 自定义服务器类 6 class Server(ThreadingMixIn, TCPServer): 7 pass 8 9 # 处理请求的程序类 10 class MyHandler(StreamRequestHandler): 11 12 # 重写父类的handle函数 13 def handle(self): 14 addr = self.request.getpeername() # 获得客户端的地址 15 print(‘接收到连接:‘, addr) # 打印客户端地址 16 data = self.rfile.readline().strip() # 客户端发送的信息必须带有回车,否则会一直等待客户端继续发送数据 17 print("从客户端接收到的请求:", data.decode("utf-8")) 18 time.sleep(1) 19 if data: 20 self.wfile.write(‘这是从服务端线程中发出的消息‘.encode("utf-8")) # 给客户端发送信息 21 22 23 host = ‘‘ 24 port = 18001 25 # 实例化一个服务器类,传入服务器的地址和请求处理的程序类 26 server = Server((host, port), MyHandler) 27 print("开始监听状态...") 28 # 开始侦听并处理连接 29 server.serve_forever()
1 if __name__ == ‘__main__‘: 2 import socket 3 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 4 sock.connect((‘127.0.0.1‘, 18001)) 5 import time 6 time.sleep(2) 7 sock.send(‘ls -al /home/wxh‘.encode("utf-8")+"\n".encode("utf-8")) 8 print (sock.recv(1024).decode("utf-8")) 9 sock.close()
1 import socketserver 2 import threading 3 4 # 自定义任务线程类 5 class MyTCPHandler(socketserver.BaseRequestHandler): 6 # 重写 handle 方法 7 def handle(self): 8 while True: 9 print("接收到连接: ", self.client_address) 10 self.data = self.request.recv(1024).strip() 11 cur_thread = threading.current_thread() 12 print("当前线程: ", cur_thread) 13 if not self.data: 14 print("客户端[%s]退出!" % self.client_address[0]) 15 break 16 print("客户端[%s]请求数据: %s" % (self.client_address[0], self.data.decode("utf-8"))) 17 self.request.sendall(self.data.upper()) 18 19 if __name__ == "__main__": 20 HOST, PORT = "", 18001 21 server = socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler) 22 print("开始监听状态...") 23 server.serve_forever()
1 if __name__ == ‘__main__‘: 2 import socket 3 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 4 sock.connect((‘127.0.0.1‘, 18001)) 5 import time 6 time.sleep(2) 7 sock.send(‘ls -al /home/wxh‘.encode("utf-8")+"\n".encode("utf-8")) 8 print (sock.recv(1024).decode("utf-8")) 9 sock.close()
开始监听状态... 接收到连接: (‘127.0.0.1‘, 17215) 当前线程: <Thread(Thread-1, started 9620)> 客户端[127.0.0.1]请求数据: ls -al /home/wxh 接收到连接: (‘127.0.0.1‘, 17215) 当前线程: <Thread(Thread-1, started 9620)> 客户端[127.0.0.1]退出! 接收到连接: (‘127.0.0.1‘, 17216) 当前线程: <Thread(Thread-2, started 9496)> 客户端[127.0.0.1]请求数据: ls -al /home/wxh 接收到连接: (‘127.0.0.1‘, 17216) 当前线程: <Thread(Thread-2, started 9496)> 客户端[127.0.0.1]退出!
在 python 中,select 函数是一个对底层操作系统的直接访问的接口。它用来监控 sockets、files 和 pipes,等待IO完成(Waiting for I/O completion)。当有可读、可写或是异常事件产生时,select 可以很容易的监控到。
Select 模块在 windows、Unix 和 Linux 下均可使用,但在 windows 下,select 只能用于处理 socket。
Select 模块支持 C 中常用的 I/O 复用:
文件描述符:内核(kernel)利用文件描述符(file descriptor)来访问文件。文件描述符是非负整数。打开现存文件或新建文件时,内核会返回一个文件描述符。
I/O 复用是在单线程模式下实现多线程的效果,即实现一个多 I/O 并发的效果。
进程指定内核监听哪些文件描述符(最多监听1024个fd)的哪些事件,当没有文件描述符事件发生时,进程被阻塞;当一个或者多个文件描述符事件发生时,进程被唤醒。
当调用 select() 时:
select.select(rlist, wlist, xlist[, timeout]) 参数含义:
其返回1个 tuple,分别是3个准备好的对象列表,它和前边的参数是一样的顺序。
1 import socket 2 import select 3 4 s = socket.socket() 5 s.bind(("127.0.0.1", 8888)) 6 s.listen(5) 7 8 # 要监控的对象列表 9 r_list = [s, ] 10 num = 0 11 12 # server的死循环 13 while True: 14 print("监听中...") 15 16 # 第1个参数 r_list:可读的对象 17 # 第2个参数:可写的对象(基本不用) 18 # 第3个参数:出现异常的对象(基本不用) 19 # 这三个参数内容都是被操作系统监控的: 20 # 1)当有事件发生时,立马往下执行代码;否则阻塞监控10秒 21 # 2)若监控10秒了仍无事件发生,才往下执行 22 rl, wl, error = select.select(r_list, [], [], 10) 23 # rl 列表一开始为空,只有当s发生事件了(收到连接请求),才会将s加到rl中 24 25 num += 1 26 print("执行次数:%s" % num) 27 print("rl‘s length is: %s" % len(rl)) 28 print("r_list‘s length: %s" % len(r_list)) 29 print("r1中的对象:", [i for i in rl]) 30 31 # 只有发生两种事件(有新连接或收到数据)时,rl列表中才会有对象元素,for循环才会往下执行 32 for fd in rl: 33 # 如果发生事件的对象是服务器端对象(s),则代表有新客户端连接 34 if fd == s: 35 conn, addr = fd.accept() # 建立与客户端的连接 36 r_list.append(conn) # 将连接对象放到监听列表r_list中 37 # 只有当客户端断开连接(close)了,conn才会从r_list中剔除 38 msg = conn.recv(200) # 接收客户端的数据 39 # 把收到的数据变大写返回给客户端 40 conn.sendall(("first received data: %s" % msg.upper()).encode("utf-8")) 41 print("给客户端返回数据:", ("first responsed data: %s" % msg.upper().decode("utf-8"))) 42 # s处理完后,则从rl中剔除了 43 # 如果发生事件的对象是连接对象(conn),则代表收到客户端请求数据 44 else: 45 try: 46 msg = fd.recv(200) 47 fd.sendall(("else received data: %s" % msg.upper()).encode("utf-8")) 48 print("给客户端返回数据:", ("else responsed data: %s" % msg.upper().decode("utf-8"))) 49 except (ConnectionAbortedError, ConnectionResetError): 50 r_list.remove(fd) 51 conn.close() 52 # conn处理完后,则从rl中剔除了 53 s.close()
1 import socket 2 3 HOST = ‘127.0.0.1‘ 4 PORT = 8888 5 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 6 7 s.connect((HOST, PORT)) 8 for i in range(4): 9 cmd = input("向服务器发送数据:") 10 s.sendall(cmd.encode("utf-8")) 11 data = s.recv(1024) 12 print ("接收到服务器端的数据:", data.decode("utf-8")) 13 s.close() # 关闭连接
服务器端:
E:\>python server.txt 监听中... 执行次数:1 rl‘s length is: 0 r_list‘s length: 1 r1中的对象: [] 监听中... 执行次数:2 rl‘s length is: 0 r_list‘s length: 1 r1中的对象: [] 监听中... 执行次数:3 rl‘s length is: 0 r_list‘s length: 1 r1中的对象: [] 监听中... 执行次数:4 rl‘s length is: 0 r_list‘s length: 1 r1中的对象: [] 监听中... 执行次数:5 rl‘s length is: 0 r_list‘s length: 1 r1中的对象: [] 监听中... 执行次数:6 rl‘s length is: 1 r_list‘s length: 1 r1中的对象: [<socket.socket fd=504, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8888)>] 给客户端返回数据: first responsed data: HELLO 1 监听中... 执行次数:7 rl‘s length is: 0 r_list‘s length: 2 r1中的对象: [] 监听中... 执行次数:8 rl‘s length is: 1 r_list‘s length: 2 r1中的对象: [<socket.socket fd=544, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8888), raddr=(‘127.0.0.1‘, 18401)>] 给客户端返回数据: else responsed data: HELLO 2 监听中... 执行次数:9 rl‘s length is: 0 r_list‘s length: 2 r1中的对象: [] 监听中... 执行次数:10 rl‘s length is: 1 r_list‘s length: 2 r1中的对象: [<socket.socket fd=544, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8888), raddr=(‘127.0.0.1‘, 18401)>] 给客户端返回数据: else responsed data: HELLO 3 监听中... 执行次数:11 rl‘s length is: 1 r_list‘s length: 2 r1中的对象: [<socket.socket fd=544, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8888), raddr=(‘127.0.0.1‘, 18401)>] 给客户端返回数据: else responsed data: HELLO 4 监听中... 执行次数:12 rl‘s length is: 1 r_list‘s length: 2 r1中的对象: [<socket.socket fd=544, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8888), raddr=(‘127.0.0.1‘, 18401)>] 给客户端返回数据: else responsed data: 监听中... 执行次数:13 rl‘s length is: 1 r_list‘s length: 2 r1中的对象: [<socket.socket fd=544, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8888), raddr=(‘127.0.0.1‘, 18401)>] 监听中... 执行次数:14 rl‘s length is: 0 r_list‘s length: 1 r1中的对象: [] 监听中...
客户端:
E:\>python client.txt 向服务器发送数据:hello 1 接收到服务器端的数据: first received data: b‘HELLO 1‘ 向服务器发送数据:hello 2 接收到服务器端的数据: else received data: b‘HELLO 2‘ 向服务器发送数据:hello 3 接收到服务器端的数据: else received data: b‘HELLO 3‘ 向服务器发送数据:hello 4 接收到服务器端的数据: else received data: b‘HELLO 4‘
标签:xstream abort readline net 定义 模块 reads 特权 计算
原文地址:https://www.cnblogs.com/juno3550/p/14052221.html