标签:使用 自己的 怎么 rpc odi 图片 工作流 client ISE
在上篇文章解决了 TCP 和 UDP 并发的问题,是通过一个 socketserver模块实现的,那么这个模块到底做了什么可以实现并发的效果呢?
进入 socketserver 中查看注释:
翻译如下:
通用套接字服务类
该模块试图捕获定义服务器的各个方面
基于套接字的服务器
基于请求的服务器
在进一步查看请求之前进行客户地址验证,(这实际上是一个钩子,用于任何需要在进行其他操作之前查看请求的处理)
如何处理多个请求:
? 同步(一次处理一个请求)
? 分叉(每个请求由一个新进程处理)
? 线程化(每个请求由一个新线程处理)
这个模块的类喜欢编写最简单的服务器类型:一个同步的 TCP/IP 服务器.这是糟糕的设计,但是节省了一些输入.(还有一个问题是,深度类层次结构会减慢方法查找的速度)
在继承关系图中有五个类,其中四个表示四种类型的同步服务器:
注意UnixDatagramServer 继承于 UDPServer, 而不是 UnixStreamServer--IP 和 Unix 服务器之间的唯一区别是地址族,这在两个 Unix 服务器类中都是重复的.
可以使用FrokingMixIn 和ThreadingMixIn 混合类创建每种类型服务器的进程和线程版本.例如,创建一个线程 UDP 服务器如下所示:
class ThreadingUDPServer(ThreadingMixIn, UDPServer):
pass
混合类必须首先出现,因为它覆盖了 UDPServer 中定义的方法,设置各种成员变量还会改变底层服务器机制的行为.
要实现服务,必须从 BaseRequestHandler 派生一个类,并重新定义它的 handle() 方法.然后,你可以通过将一个服务器类与请求处理程序类组合在一起来运行服务的不同版本.
对于数据报或流式服务器,请求处理程序类必须是不同的.这可以通过使用请求处理程序子类 StreamRequestHandler 或 DatagramRequestHandler 类来隐藏.
当然,你还得动动脑子.
例如,如果服务在内存中包含可以由请求修改的状态,那么使用进程服务器是没有意义的(因为子进程中的修改永远不会达到父进程中保留并传递给每个子进程的初始状态).在这种情况下,你可以使用线程化服务器,但是你可能必须使用锁来避免几乎同时出现的两个请求,以对服务器状态应用冲突更改.
另一方面,如果你正在构建一个 HTTP 服务器,其中所有数据都存储在外部(例如文件系统中),同步类本质上是在处理一个请求时使服务‘听不见‘,如果客户端读取其请求的所有数据很慢,那么这种情况可能会持续很长时间,在这里,线程或进程服务器是合适的.
在某些情况下,同步的处理请求的一部分可能是合适的,但根据请求数据在进程子节点中完成处理可能是合适的.这可以通过使用同步服务器和在请求处理程序类 handle() 方法中执行显式进城来实现.
处理多个同时请求的另一种方法既不支持线程也不支持进程的环境(这些环境太昂贵或不适合服务)是维持一个部分完成的请求的显式表和使用选择器决定下一个要处理的请求(或者是否处理新的请求)对对流服务尤为重要.每个客户端可能长时间连接的地方.(如果线程或子线程不能使用)
未来的工作:
? Sun RPC 的标准类(使用 TCP 或 UDP)
? 标准混合类实现各种身份验证
总结: 上面的翻译是 socketserver 的注释翻译,能够是我们大致了解该模块的工作流程.那么当我们自定义自己的类是如何进行实例化得呢?
该模块分为两大类: server 类(解决连接问题)和 request 类(解决通信问题)
server 类继承关系:
request 类继承关系:
总结继承关系:
以下述代码为例,分析 socketserver 源码:
服务端
import socketserver
import json
import struct
import os
class FtpServer(socketserver.BaseRequestHandler):
coding = ‘utf-8‘
server_dir = ‘file_upload‘
max_packet_size = 1024
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
def handle(self):
print(self.request)
while True:
data = self.request.recv(4)
data_len = struct.unpack(‘i‘, data)[0]
head_json = self.request.recv(data_len).decode(self.coding)
head_dic = json.loads(head_json)
cmd = head_dic[‘cmd‘]
if hasattr(self, cmd):
func = getattr(self, cmd)
func(head_dic)
def put(self, args):
file_path = os.path.normpath(os.paht.join(
self.BASE_DIR,
self.server_dir,
args[‘filename‘]))
filesize = args[‘filesize‘]
recv_size = 0
print(‘---->‘, file_path)
with open(file_path, ‘wb‘) as f:
while recv_size < filesize:
recv_data = self.request.recv(self.max_packet_size)
f.write(recv_data)
recv_size += len(recv_size)
print(‘recvsize: %s filessize: %s‘ % (recv_size, filesize))
ftpserver = socketserver.ThreadingTCPServer((‘‘, 8080), FtpServer)
ftpserver.serve_forever()
**客户端*8
import socket
import struct
import json
import os
class MyTCPClient:
address_family = socket.AF_INET
socket_type = socket.SOCK_STREAM
allow_reuse_address = False
max_packet_size = 8192
coding = ‘utf-8‘
request_queue_size = 5
def __init__(self, server_address, connect=True):
self.server_address = server_address
self.socket = socket.socket(self.address_family,
self.socket_type)
if connect:
try:
self.client_connect()
except:
self.client_close()
raise
def client_connect(self):
self.socket.connect(self.server_address)
def client_close(self):
self.socket.close()
def run(self):
while True:
inp = input(‘>>>:‘).strip()
if not inp:
continue
l = inp.split()
cmd = l[0]
if hasattr(self, cmd):
func = getattr(self, cmd)
func(l)
def put(self, args):
cmd = args[0]
filename = args[1]
if not os.path.isfile(filename):
print(‘file: %s is not exist‘ % filenam)
return
else:
filesize = os.path.getsize(filename)
head_dic = {‘cmd‘: cmd, ‘filename‘: os.path.basename(filename), ‘filesize‘: filesize}
print(head_dic)
head_json = json.dumps(head_dic)
head_json_bytes = bytes(head_json, encoding=self.coding)
head_struct = struct.pack(‘i‘, len(head_json_bytes))
self.scoket.send(head_struct)
self.socket.send(head_json_bytes)
send_size = 0
with open(filename, ‘rb‘) as f:
for line im f:
self.socket.send(line)
send_size += len(line)
print(send_size)
else:
print(‘upload successful‘)
client = MyTCPClient((‘‘, 8080))
client.run()
以上述代码为例,分析 socketserver源码:
ftpserver = socketserver.ThreadingTCPServer((‘‘, 8080))
ftpserver.serve_forever()
查找属性的顺序可以从上面第三个继承图得到答案:
ThreadingTCPServer->ThreadingMixIn->TCPServer->BaseServer
源码分析总结:
基于 TCP 的 socketserver 定义出的类:
基于 UDP 的 socketserver 定义出的类:
本文查看了一下 socketserver 模块的源码,大致的实例化对象的步骤和主要运行流程差不多搞清楚了,但是有些地方还是需要仔细琢磨的,比如第五步怎么一下就到开启多线程实现并发了.
总的来说,查看源码还是有点用的,而且该模块的继承也对之前学习的继承知识进行了补充,特别是根据继承解析出来的( MRO)列表,这个列表是类中属性的查找顺序表,和是否直接继承没有关系.
标签:使用 自己的 怎么 rpc odi 图片 工作流 client ISE
原文地址:https://www.cnblogs.com/zuanzuan/p/9943477.html