标签:any 固定 地方法 inf def 优点 进一步 老版本 break
上节回顾:5种IO模型 | IO多路复用 and 万物互联之~网络编程加强篇
官方文档:https://docs.python.org/3/library/internet.html
画一张图来通俗化讲讲TCP三次握手:
用代码来说,大概过程就是:
画图通俗讲下TCP四次挥手:
用代码来说,大概过程就是:
其实这个也很好的解释了之前的端口占用问题,如果是服务端先断开连接,那么服务器就是四次挥手的发送方,
最后一次消息是得不到回复的,端口就会保留一段时间
(服务端的端口固定)也就会出现端口占用的情况。如果是客户端先断开,那下次连接会自动换个端口,不影响(客户端的端口是随机分配的)
PS:之前我们讲端口就是send
一个空消息,很多人不是很清楚,这边简单验证下就懂了:
之前其实已经写了个简版的Web服务器了,简单回顾下流程:
TCP三次握手
send
一个http的请求报文,服务器接recv
之后进行相应的处理并返回对应的页面client close
),进行了TCP四次挥手
然后简单说下HTTP状态码:
baidu.com
=302=> www.baidu.com
404 not found
500 Server Error
WSGI
)¶我们先自己定义一个动态服务器:
import re
import socket
class HTTPServer(object):
def __init__(self):
with socket.socket() as tcp_server:
tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tcp_server.bind((‘‘, 8080))
tcp_server.listen()
while True:
self.client_socket, self.client_address = tcp_server.accept()
self.handle()
def response(self, status, body=None):
print(status)
header = f"HTTP/1.1 {status}\r\n\r\n"
with self.client_socket:
self.client_socket.send(header.encode("utf-8"))
if body:
self.client_socket.send(body)
def __static_handler(self, name):
try:
with open(f"./www{name}", "rb") as fs:
return fs.read()
except Exception as ex:
print(ex)
return None
def __dynamic_handler(self, name):
try:
m = __import__(name)
return m.application().encode("utf-8")
except Exception as ex:
print(ex)
return None
def handle(self):
with self.client_socket:
print(f"[来自{self.client_address}的消息:]\n")
data = self.client_socket.recv(2048)
if data:
header, _ = data.decode("utf-8").split("\r\n", 1)
print(header)
# GET /xxx HTTP/1.1
ret = re.match("^\w+? (/[^ ]*) .+$", header)
if ret:
url = ret.groups(1)[0]
# Python三元表达式(之前好像忘说了)
url = "/index.html" if url == "/" else url
print("请求url:", url)
body = str()
# 动态页面
if ".py" in url:
# 提取模块名(把开头的/和.py排除)
body = self.__dynamic_handler(url[1:-3])
else: # 静态服务器
body = self.__static_handler(url)
# 根据返回的body内容,返回对应的响应码
if body:
self.response("200 ok", body)
else:
self.response("404 Not Found")
else: # 匹配不到url(基本上不会发生,不排除恶意修改)
self.response((404, "404 Not Found"))
if __name__ == "__main__":
import sys
# 防止 __import__ 导入模块的时候找不到,忘了可以查看:
# https://www.cnblogs.com/dotnetcrazy/p/9253087.html#5.自己添加模块路径
sys.path.insert(1, "./www/bin")
HTTPServer()
效果:
代码不难其中有个技术点说下:模块名为字符串怎么导入
?
# test.py
# 如果模块名是字符串,需要使用__import__
s = "time"
time = __import__(s)
def application():
return time.ctime() # 返回字符串
if __name__ == "__main__":
time_str = application()
print(type(time_str))
print(time_str)
输出:
<class ‘str‘>
Thu Dec 20 22:48:07 2018
和上面基本一样,多了个路由表(self.router_urls
)而已
import re
import socket
class HttpServer(object):
def __init__(self):
# 路由表
self.router_urls = {"/test": "/test.py", "/user": "/test2.py"}
def run(self):
with socket.socket() as server:
# 端口复用
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("", 8080))
server.listen()
while True:
self.client_socket, self.client_address = server.accept()
print(f"[{self.client_address}已上线]")
self.handler()
def response(self, status, body=None):
with self.client_socket as socket:
header = f"HTTP/1.1 {status}\r\n\r\n"
socket.send(header.encode("utf-8"))
if body:
socket.send(body)
def __static_handler(self, name):
try:
with open(f"./www{name}", "rb") as fs:
return fs.read()
except Exception as ex:
print(ex)
return None
def __dynamic_handler(self, name):
try:
m = __import__(name)
return m.application().encode("utf-8")
except Exception as ex:
print(ex)
return None
def handler(self):
data = self.client_socket.recv(2048)
if data:
header, _ = data.decode("utf-8").split("\r\n", 1)
# GET /xxx HTTP/1.1
ret = re.match("^\w+? (/[^ ]*) .+$", header)
if ret:
url = ret.group(1)
print(url) # print url log
body = None
# 路由有记录:动态页面
if url in self.router_urls.keys():
url = self.router_urls[url]
# 切片提取模块名
body = self.__dynamic_handler(url[1:-3])
else: # 静态服务器
if url == "/":
url = "/index.html"
body = self.__static_handler(url)
# 没有这个页面或者出错
if body:
self.response("200 ok", body)
else:
self.response("404 Not Found")
else:
# 404
self.response("404 Not Found")
else:
print(f"{self.client_address}已下线")
self.client_socket.close()
if __name__ == "__main__":
import sys
# 临时添加模块所在路径
sys.path.insert(1, "./www/bin")
HttpServer().run()
输出:
看一眼test2.py
:
# test2.py
def application():
return "My Name Is XiaoMing"
if __name__ == "__main__":
print(application())
官方文档:https://docs.python.org/3/library/wsgiref.html
其实Python官方提供了一个WSGI:Web Server Gateway Interface
的约定:
它只要求Web开发者实现一个
application
函数,就可以响应HTTP请求
eg:(只要对应的python文件提供了 application(env,start_response)
方法就行了)
# hello.py
# env 是一个字典类型
def application(env, start_response):
# 设置动态页面的响应头(回头服务器会再加上自己的响应头)
# 列表里面的 item 是 tuple
start_response("200 OK", [("Content-Type", "text/html")])
# 返回一个列表
return ["<h1>This is Test!</h1>".encode("utf-8")]
先使用官方的简单服务器看看:
from wsgiref.simple_server import make_server
# 导入我们自己编写的application函数:
from hello import application
# 创建一个服务器,端口是8080,处理函数是application:
httpd = make_server(‘‘, 8080, application)
print(‘Serving HTTP on port 8080...‘)
# 开始监听HTTP请求:
httpd.serve_forever()
运行后效果:127.0.0.1:8080
This is Test!
如果把hello.py
改成下面代码(服务端不变),那么就可以获取一些请求信息了:
def application(env, start_response):
print(env["PATH_INFO"])
start_response("200 OK", [("Content-Type", "text/html")])
return [f‘<h1>Hello, {env["PATH_INFO"][1:] or "web"}!</h1>‘.encode("utf-8")]
输出:
上面的application()
函数就是符合WSGI
标准的一个HTTP处理函数
,它接收两个参数:
environ
:一个包含所有HTTP请求信息的dict
对象;start_response
:一个发送HTTP响应的函数(调用服务器定义的方法)
Header
只能发送一次 ==> 只能调用一次start_response()
函数有了WSGI
,我们关心的就是如何从env
这个dict
对象拿到HTTP请求信息
,然后构造HTML
,通过start_response()
发送Header
,最后返回Body内容
Python内置了一个WSGI
服务器,这个模块叫wsgiref
,它是用纯Python
编写的WSGI
服务器的参考实现(完全符合WSGI
标准,但是不考虑任何运行效率,仅供开发和测试使用)
PS:这样的好处就是,只要符合WSGI
规范的服务器,我们都可以直接使用了
其实通过源码就可以知道这个WSGIServer
到底是何方神圣了:
class WSGIServer(HTTPServer):
pass
# HTTPServer其实就是基于TCPServer
class HTTPServer(socketserver.TCPServer):
pass
# 这个就是我们开头说的Python封装的简单WebServer了
class TCPServer(BaseServer):
pass
如果还是记不得可以回顾下上次说的内容,提示:
__all__ = ["BaseServer", "TCPServer", "UDPServer",
"ThreadingUDPServer", "ThreadingTCPServer",
"BaseRequestHandler", "StreamRequestHandler",
"DatagramRequestHandler", "ThreadingMixIn"]
如果你想要在这个基础上进行处理,可以和上面说的一样,定义一个继承class WSGIRequestHandler(BaseHTTPRequestHandler)
的类,然后再处理
在本小节结束前我们模仿一下示例,定义一个符合WSGI
规范的简单服务器:
import re
import socket
from index import WebFrame
class WSGIServer(object):
def __init__(self):
# 请求头
self.env = dict()
# 存放处理后的响应头
self.response_headers = str()
def run(self):
with socket.socket() as server:
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("", 8080))
server.listen()
while True:
self.client_socket, self.client_address = server.accept()
self.handler()
# 转换浏览器请求头格式
def request_headers_handler(self, headers):
# 过滤一下空字符串(不能过滤空列表)
headers = list(filter(None, headers.split("\r\n")))
# 提取 Method 和 Url
ret = re.match("^([\w]+?) (/[^ ]*?) .+$", headers[0])
if ret:
self.env["method"] = ret.group(1)
url = ret.group(2)
print(url)
self.env["path"] = "/index.html" if url == "/" else url
else:
return None
# [[‘Host‘, ‘ localhost:8080‘], [‘Connection‘, ‘ keep-alive‘]...]
array = map(lambda item: item.split(":", 1), headers[1:])
for item in array:
self.env[item[0].lower()] = item[1]
# print(self.env)
return "ok"
# 响应客户端(吐槽一下,request和response的headers为毛格式不一样,这设计真不合理!)
def start_response(self, status, header_list=[]):
# 响应头
self.response_headers = f"HTTP/1.1 {status}\r\n"
for item in header_list:
self.response_headers += f"{item[0]}:{item[1]}\r\n"
# print(self.response_headers)
# 响应浏览器
def response(self, body):
with self.client_socket as client:
# 省略一系列服务器响应的headers
self.response_headers += "server:WSGIServer\r\n\r\n"
client.send(self.response_headers.encode("utf-8"))
if body:
client.send(body)
def handler(self):
with self.client_socket as client:
data = client.recv(2048)
if data:
# 浏览器请求头
headers = data.decode("utf-8")
if self.request_headers_handler(headers):
# 模仿php所有请求都一个文件处理
body = WebFrame().application(self.env,
self.start_response)
# 响应浏览器
self.response(body)
else:
self.start_response("404 Not Found")
else:
client.close()
if __name__ == "__main__":
WSGIServer().run()
自己定义的框架:
class WebFrame(object):
def __init__(self):
# 路由表
self.router_urls = {"/time": "get_time", "/user": "get_name"}
def get_time(self):
import time
return time.ctime().encode("utf-8")
def get_name(self):
return "<h2>My Name Is XiaoMing</h2>".encode("utf-8")
def application(self, env, start_response):
body = b""
url = env["path"]
# 请求的页面都映射到路由对应的方法中
if url in self.router_urls.keys():
func = self.router_urls[url]
body = getattr(self, func)()
else:
# 否则就请求对应的静态资源
try:
with open(f"./www{url}", "rb") as fs:
body = fs.read()
except Exception as ex:
start_response("404 Not Found")
print(ex)
return b"404 Not Found" # 出错就直接返回了
# 返回对应的页面响应头
start_response("200 ok", [("Content-Type", "text/html"),
("Scripts", "Python")])
return body
输出:
知识扩展:
从wsgiref模块导入
https://docs.python.org/3/library/wsgiref.html
Python服务器网关接口
https://www.python.org/dev/peps/pep-3333/
Python原始套接字和流量嗅探
https://blog.csdn.net/cj1112/article/details/51303021
https://blog.csdn.net/peng314899581/article/details/78082244
【源码阅读】轻量级Web框架:bottle
https://github.com/bottlepy/bottle
上篇回顾:万物互联之~深入篇
其他专栏最新篇:协程加强之~兼容答疑篇 | 聊聊数据库~SQL环境篇
Code:https://github.com/lotapp/BaseCode/tree/master/python/6.net/6.rpc/
RPC
(Remote Procedure Call
):分布式系统常见的一种通信方法(远程过程调用),通俗讲:可以一台计算机的程序调用另一台计算机的子程序(可以把它看成之前我们说的进程间通信,只不过这一次的进程不在同一台PC上了)
PS:RPC
的设计思想是力图使远程调用中的通讯细节对于使用者透明,调用双方无需关心网络通讯的具体实现
引用一张网上的图:
和HTTP
有点相似,你可以这样理解:
HTTP/1.0
是短链接,而RPC
是长连接进行通信
HTTP/1.1
支持了长连接(Connection:keep-alive
),基本上和RPC
差不多了
keep-alive
一般都限制有最长时间,或者最多处理的请求数,而RPC
是基于长连接的,基本上没有这个限制HTTP/2.0
建立了gRPC
,它们之间的基本上也就差不多了
HTTP-普通话
和RPC-方言
的区别了RPC
和HTTP
调用不用经过中间件,而是端到端的直接数据交互
Socket
实现的(RPC
、HTTP
都是Socket
的读写操作)简单概括一下RPC
的优缺点就是:
PS:HTTP更多是Client
与Server
的通讯;RPC
更多是内部服务器间的通讯
上面说这么多,可能还没有来个案例实在,我们看个案例:
本地调用sum()
:
def sum(a, b):
"""return a+b"""
return a + b
def main():
result = sum(1, 2)
print(f"1+2={result}")
if __name__ == "__main__":
main()
输出:(这个大家都知道)
1+2=3
官方文档:
https://docs.python.org/3/library/xmlrpc.client.html
https://docs.python.org/3/library/xmlrpc.server.html
都说RPC
用起来就像本地调用一样,那么用起来啥样呢?看个案例:
服务端:(CentOS7:192.168.36.123:50051
)
from xmlrpc.server import SimpleXMLRPCServer
def sum(a, b):
"""return a+b"""
return a + b
# PS:50051是gRPC默认端口
server = SimpleXMLRPCServer((‘‘, 50051))
# 把函数注册到RPC服务器中
server.register_function(sum)
print("Server启动ing,Port:50051")
server.serve_forever()
客户端:(Win10:192.168.36.144
)
from xmlrpc.client import ServerProxy
stub = ServerProxy("http://192.168.36.123:50051")
result = stub.sum(1, 2)
print(f"1+2={result}")
输出:(Client
用起来是不是和本地差不多?就是通过代理访问了下RPCServer
而已)
1+2=3
PS:CentOS
服务器不是你绑定个端口就一定能访问的,如果不能记让防火墙开放对应的端口
这个之前在说MariaDB
环境的时候有详细说:https://www.cnblogs.com/dotnetcrazy/p/9887708.html#_map4
# 添加 --permanent永久生效(没有此参数重启后失效)
firewall-cmd --zone=public --add-port=80/tcp --permanent
zeroRPC用起来和这个差不多,也简单举个例子吧:
把服务的某个方法注册到RPCServer
中,供外部服务调用
import zerorpc
class Test(object):
def say_hi(self, name):
return f"Hi,My Name is{name}"
# 注册一个Test的实例
server = zerorpc.Server(Test())
server.bind("tcp://0.0.0.0:50051")
server.run()
调用服务端代码:
import zerorpc
client = zerorpc.Client("tcp://192.168.36.123:50051")
result = client.say_hi("RPC")
print(result)
看了上面的引入案例,是不是感觉RPC
不过如此?NoNoNo,要是真这么简单也就谈不上RPC架构
了,上面两个是最简单的RPC服务了,可以这么说:生产环境基本上用不到,只能当案例练习罢了,对Python来说,最常用的RPC就两个gRPC
and Thrift
PS:国产最出名的是Dubbo
and Tars
,Net最常用的是gRPC
、Thrift
、Surging
要自己实现一个RPC Server
那么就得了解整个流程了:
Client
(调用者)以本地调用的方式发起调用RPC
服务进行远程过程调用(RPC的目标就是要把这些步骤都封装起来,让使用者感觉不到这个过程)RPC Proxy
组件收到调用后,负责将被调用的方法名、参数
等打包编码成自定义的协议RPC Proxy
组件在打包完成后通过网络把数据包发送给RPC Server
RPC Proxy
组件把通过网络接收到的数据包按照相应格式进行拆包解码
,获取方法名和参数RPC Proxy
组件根据方法名和参数进行本地调用RPC Server
(被调用者)本地执行后将结果返回给服务端的RPC Proxy
RPC Proxy
组件将返回值打包编码成自定义的协议数据包,并通过网络发送给客户端的RPC Proxy
组件RPC Proxy
组件收到数据包后,进行拆包解码,把数据返回给Client
Client
(调用者)得到本次RPC
调用的返回结果用一张时序图来描述下整个过程:
PS:RPC Proxy
有时候也叫Stub
(存根):(Client Stub,Server Stub)
为屏蔽客户调用远程主机上的对象,必须提供某种方式来模拟本地对象,这种本地对象称为存根(stub),存根负责接收本地方法调用,并将它们委派给各自的具体实现对象
PRC服务实现的过程中其实就两核心点:
Protocol Buffers
TCP/UDP/HTTP
)下面我们就根据上面的流程来手写一个简单的RPC:
1.Client调用:
# client.py
from client_stub import ClientStub
def main():
stub = ClientStub(("192.168.36.144", 50051))
result = stub.get("sum", (1, 2))
print(f"1+2={result}")
result = stub.get("sum", (1.1, 2))
print(f"1.1+2={result}")
time_str = stub.get("get_time")
print(time_str)
if __name__ == "__main__":
main()
输出:
1+2=3
1.1+2=3.1
Wed Jan 16 22
2.Client Stub,客户端存根:(主要有打包
、解包
、和RPC服务器通信
的方法)
# client_stub.py
import socket
class ClientStub(object):
def __init__(self, address):
"""address ==> (ip,port)"""
self.socket = socket.socket()
self.socket.connect(address)
def convert(self, obj):
"""根据类型转换成对应的类型编号"""
if isinstance(obj, int):
return 1
if isinstance(obj, float):
return 2
if isinstance(obj, str):
return 3
def pack(self, func, args):
"""打包:把方法和参数拼接成自定义的协议
格式:func:函数名@params:类型-参数,类型2-参数2...
"""
result = f"func:{func}"
if args:
params = ""
# params:类型-参数,类型2-参数2...
for item in args:
params += f"{self.convert(item)}-{item},"
# 去除最后一个,
result += f"@params:{params[:-1]}"
# print(result) # log 输出
return result.encode("utf-8")
def unpack(self, data):
"""解包:获取返回结果"""
msg = data.decode("utf-8")
# 格式应该是"data:xxxx"
params = msg.split(":")
if len(params) > 1:
return params[1]
return None
def get(self, func, args=None):
"""1.客户端的RPC Proxy组件收到调用后,负责将被调用的方法名、参数等打包编码成自定义的协议"""
data = self.pack(func, args)
# 2.客户端的RPC Proxy组件在打包完成后通过网络把数据包发送给RPC Server
self.socket.send(data)
# 等待服务端返回结果
data = self.socket.recv(2048)
if data:
return self.unpack(data)
return None
简要说明下:(我根据流程在Code里面标注了,看起来应该很轻松)
之前有说到核心其实就是消息协议
and传输控制
,我客户端存根
的消息协议是自定义的格式(后面会说简化方案):func:函数名@params:类型-参数,类型2-参数2...
,传输我是基于TCP进行了简单的封装
3.Server端:(实现很简单)
# server.py
import socket
from server_stub import ServerStub
class RPCServer(object):
def __init__(self, address, mycode):
self.mycode = mycode
# 服务端存根(RPC Proxy)
self.server_stub = ServerStub(mycode)
# TCP Socket
self.socket = socket.socket()
# 端口复用
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定端口
self.socket.bind(address)
def run(self):
self.socket.listen()
while True:
# 等待客户端连接
client_socket, client_addr = self.socket.accept()
print(f"来自{client_addr}的请求:\n")
try:
# 交给服务端存根(Server Proxy)处理
self.server_stub.handle(client_socket, client_addr)
except Exception as ex:
print(ex)
if __name__ == "__main__":
from server_code import MyCode
server = RPCServer((‘‘, 50051), MyCode())
print("Server启动ing,Port:50051")
server.run()
为了简洁,服务端代码我单独放在了server_code.py
中:
# 5.RPC Server(被调用者)本地执行后将结果返回给服务端的RPC Proxy
class MyCode(object):
def sum(self, a, b):
return a + b
def get_time(self):
import time
return time.ctime()
4.然后再看看重头戏Server Stub
:
# server_stub.py
import socket
class ServerStub(object):
def __init__(self, mycode):
self.mycode = mycode
def convert(self, num, obj):
"""根据类型编号转换类型"""
if num == "1":
obj = int(obj)
if num == "2":
obj = float(obj)
if num == "3":
obj = str(obj)
return obj
def unpack(self, data):
"""3.服务端的RPC Proxy组件把通过网络接收到的数据包按照相应格式进行拆包解码,获取方法名和参数"""
msg = data.decode("utf-8")
# 格式应该是"格式:func:函数名@params:类型编号-参数,类型编号2-参数2..."
array = msg.split("@")
func = array[0].split(":")[1]
if len(array) > 1:
args = list()
for item in array[1].split(":")[1].split(","):
temps = item.split("-")
# 类型转换
args.append(self.convert(temps[0], temps[1]))
return (func, tuple(args)) # (func,args)
return (func, )
def pack(self, result):
"""打包:把方法和参数拼接成自定义的协议"""
# 格式:"data:返回值"
return f"data:{result}".encode("utf-8")
def exec(self, func, args=None):
"""4.服务端的RPC Proxy组件根据方法名和参数进行本地调用"""
# 如果没有这个方法则返回None
func = getattr(self.mycode, func, None)
if args:
return func(*args) # 解包
else:
return func() # 无参函数
def handle(self, client_socket, client_addr):
while True:
# 获取客户端发送的数据包
data = client_socket.recv(2048)
if data:
try:
data = self.unpack(data) # 解包
if len(data) == 1:
data = self.exec(data[0]) # 执行无参函数
elif len(data) > 1:
data = self.exec(data[0], data[1]) # 执行带参函数
else:
data = "RPC Server Error Code:500"
except Exception as ex:
data = "RPC Server Function Error"
print(ex)
# 6.服务端的RPC Proxy组件将返回值打包编码成自定义的协议数据包,并通过网络发送给客户端的RPC Proxy组件
data = self.pack(data) # 把函数执行结果按指定协议打包
# 把处理过的数据发送给客户端
client_socket.send(data)
else:
print(f"客户端:{client_addr}已断开\n")
break
再简要说明一下:里面方法其实主要就是解包
、执行函数
、返回值打包
输出图示:
再贴一下上面的时序图:
课外拓展:
HTTP1.0、HTTP1.1 和 HTTP2.0 的区别
https://www.cnblogs.com/heluan/p/8620312.html
简述分布式RPC框架
https://blog.csdn.net/jamebing/article/details/79610994
分布式基础—RPC
http://www.dataguru.cn/article-14244-1.html
下节预估:RPC服务进一步简化与演变、手写一个简单的REST接口
上篇回顾:万物互联之~RPC专栏 https://www.cnblogs.com/dunitian/p/10279946.html
之前有网友问,很多开源的RPC中都是使用路由表,这个怎么实现?
其实路由表实现起来也简单,代码基本上不变化,就修改一下server_stub.py
的__init__
和exe
两个方法就可以了:
class ServerStub(object):
def __init__(self, mycode):
self.func_dict = dict()
# 初始化一个方法名和方法的字典({func_name:func})
for item in mycode.__dir__():
if not item.startswith("_"):
self.func_dict[item] = getattr(mycode, item)
def exec(self, func, args=None):
"""4.服务端的RPC Proxy组件根据方法名和参数进行本地调用"""
# 如果没有这个方法则返回None
# func = getattr(self.mycode, func, None)
func = self.func_dict[func]
if args:
return func(*args) # 解包
else:
return func() # 无参函数
Python比较6的同志对上节课的Code肯定嗤之以鼻,上次自定义协议是同的通用方法,这节课我们先来简化下代码:
再贴一下上节课的时序图:
官方文档:https://docs.python.org/3/library/json.html
# 把字典对象转换为Json字符串
json_str = json.dumps({"func": func, "args": args})
# 把Json字符串重新变成字典对象
data = json.loads(data)
func, args = data["func"], data["args"]
需要注意的就是类型转换了(eg:python tuple
==> json array
)
Python | JSON |
---|---|
dict | object |
list, tuple | array |
str | string |
int, float | number |
True | true |
False | false |
None | null |
PS:序列化:json.dumps(obj)
,反序列化:json.loads(json_str)
在原有基础上只需要修改下Stub
的pack
和unpack
方法即可
Client_Stub(类型转换都省掉了)
import json
import socket
class ClientStub(object):
def pack(self, func, args):
"""打包:把方法和参数拼接成自定义的协议
格式:{"func": "sum", "args": [1, 2]}
"""
json_str = json.dumps({"func": func, "args": args})
# print(json_str) # log 输出
return json_str.encode("utf-8")
def unpack(self, data):
"""解包:获取返回结果"""
data = data.decode("utf-8")
# 格式应该是"{data:xxxx}"
data = json.loads(data)
# 获取不到就返回None
return data.get("data", None)
# 其他Code我没有改变
Server Stub()
import json
import socket
class ServerStub(object):
def unpack(self, data):
"""3.服务端的RPC Proxy组件把通过网络接收到的数据包按照相应格式进行拆包解码,获取方法名和参数"""
data = data.decode("utf-8")
# 格式应该是"格式:{"func": "sum", "args": [1, 2]}"
data = json.loads(data)
func, args = data["func"], data["args"]
if args:
return (func, tuple(args)) # (func,args)
return (func, )
def pack(self, result):
"""打包:把方法和参数拼接成自定义的协议"""
# 格式:"data:返回值"
json_str = json.dumps({"data": result})
return json_str.encode("utf-8")
# 其他Code我没有改变
输出图示:
RPC其实更多的是二进制的序列化方式,这边简单介绍下
官方文档:https://docs.python.org/3/library/pickle.html
用法和Json
类似,PS:序列化:pickle.dumps(obj)
,反序列化:pickle.loads(buffer)
和Json案例类似,也只是改了pack
和unpack
,我这边就贴一下完整代码(防止被吐槽)
1.Client
# 和上一节一样
from client_stub import ClientStub
def main():
stub = ClientStub(("192.168.36.144", 50051))
result = stub.get("sum", (1, 2))
print(f"1+2={result}")
result = stub.get("sum", (1.1, 2))
print(f"1.1+2={result}")
time_str = stub.get("get_time")
print(time_str)
if __name__ == "__main__":
main()
2.ClientStub
import socket
import pickle
class ClientStub(object):
def __init__(self, address):
"""address ==> (ip,port)"""
self.socket = socket.socket()
self.socket.connect(address)
def pack(self, func, args):
"""打包:把方法和参数拼接成自定义的协议"""
return pickle.dumps((func, args))
def unpack(self, data):
"""解包:获取返回结果"""
return pickle.loads(data)
def get(self, func, args=None):
"""1.客户端的RPC Proxy组件收到调用后,负责将被调用的方法名、参数等打包编码成自定义的协议"""
data = self.pack(func, args)
# 2.客户端的RPC Proxy组件在打包完成后通过网络把数据包发送给RPC Server
self.socket.send(data)
# 等待服务端返回结果
data = self.socket.recv(2048)
if data:
return self.unpack(data)
return None
3.Server
# 和上一节一样
import socket
from server_stub import ServerStub
class RPCServer(object):
def __init__(self, address, mycode):
self.mycode = mycode
# 服务端存根(RPC Proxy)
self.server_stub = ServerStub(mycode)
# TCP Socket
self.socket = socket.socket()
# 端口复用
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定端口
self.socket.bind(address)
def run(self):
self.socket.listen()
while True:
# 等待客户端连接
client_socket, client_addr = self.socket.accept()
print(f"来自{client_addr}的请求:\n")
try:
# 交给服务端存根(Server Proxy)处理
self.server_stub.handle(client_socket, client_addr)
except Exception as ex:
print(ex)
if __name__ == "__main__":
from server_code import MyCode
server = RPCServer((‘‘, 50051), MyCode())
print("Server启动ing,Port:50051")
server.run()
4.ServerCode
# 和上一节一样
# 5.RPC Server(被调用者)本地执行后将结果返回给服务端的RPC Proxy
class MyCode(object):
def sum(self, a, b):
return a + b
def get_time(self):
import time
return time.ctime()
5.ServerStub
import socket
import pickle
class ServerStub(object):
def __init__(self, mycode):
self.mycode = mycode
def unpack(self, data):
"""3.服务端的RPC Proxy组件把通过网络接收到的数据包按照相应格式进行拆包解码,获取方法名和参数"""
func, args = pickle.loads(data)
if args:
return (func, args) # (func,args)
return (func, )
def pack(self, result):
"""打包:把方法和参数拼接成自定义的协议"""
return pickle.dumps(result)
def exec(self, func, args=None):
"""4.服务端的RPC Proxy组件根据方法名和参数进行本地调用"""
# 如果没有这个方法则返回None
func = getattr(self.mycode, func)
if args:
return func(*args) # 解包
else:
return func() # 无参函数
def handle(self, client_socket, client_addr):
while True:
# 获取客户端发送的数据包
data = client_socket.recv(2048)
if data:
try:
data = self.unpack(data) # 解包
if len(data) == 1:
data = self.exec(data[0]) # 执行无参函数
elif len(data) > 1:
data = self.exec(data[0], data[1]) # 执行带参函数
else:
data = "RPC Server Error Code:500"
except Exception as ex:
data = "RPC Server Function Error"
print(ex)
# 6.服务端的RPC Proxy组件将返回值打包编码成自定义的协议数据包,并通过网络发送给客户端的RPC Proxy组件
data = self.pack(data) # 把函数执行结果按指定协议打包
# 把处理过的数据发送给客户端
client_socket.send(data)
else:
print(f"客户端:{client_addr}已断开\n")
break
输出图示:
然后关于RPC高级的内容(会涉及到注册中心
),咱们后面说架构的时候继续,网络这边就说到这
RESTful只是接口协议规范,它是建立在http基础上的,我们在网络加强篇的末尾简单带一下,后面讲爬虫应该会再给大家说的
在编写REST接口时,一般都是为HTTP服务的。为了实现一个简单的REST接口,你只需让代码满足Python的WSGI
标准即可
这边我就不自己实现了(上面手写服务器的时候其实已经展示了Restful接口是啥样),用Flask
快速过一遍:
看个引入案例:
import flask
app = flask.Flask(__name__)
@app.route("/")
def index():
return "This is Restful API Test"
if __name__ == "__main__":
app.run()
图示输出:
Server Log:
* Serving Flask app "1.test" (lazy loading)
* Environment: production
WARNING: Do not use the development server in a production environment.
Use a production WSGI server instead.
* Debug mode: off
* Running on http://127.0.0.1:8080/ (Press CTRL+C to quit)
127.0.0.1 - - [17/Jan/2019 17:24:02] "GET / HTTP/1.1" 200 -
举个查询服务器节点
信息的例子:/api/servers/
import flask
from infos import info_list
app = flask.Flask(__name__)
# Json的404自定义处理(不加自定义处理会返回默认404页面)
@app.errorhandler(404)
def not_found(error):
return flask.make_response(
flask.jsonify({
"data": "Not Found",
"status": 404
}), 404)
# 运行Get和Post请求
@app.route("/api/v1.0/servers/<name>", methods=["GET", "POST"])
def get_info(name):
infos = list(filter(lambda item: item["name"] == name, info_list))
if len(infos) == 0:
flask.abort(404) # 404
# 基于json.dumps的封装版
return flask.jsonify({"infos": infos}) # 返回Json字符串
if __name__ == "__main__":
app.run(port=8080)
图示输出:(不深入说,后面爬虫会再提的)
课后拓展:
RESTful API 设计指南
http://www.ruanyifeng.com/blog/2014/05/restful_api.html
RESTful API 最佳实践
http://www.ruanyifeng.com/blog/2018/10/restful-api-best-practices.html
异步 API 的设计
http://www.ruanyifeng.com/blog/2018/12/async-api-design.html
使用python的Flask实现一个RESTful API服务器端[翻译]
https://www.cnblogs.com/vovlie/p/4178077.html
标签:any 固定 地方法 inf def 优点 进一步 老版本 break
原文地址:https://www.cnblogs.com/dotnetcrazy/p/9612569.html