标签:wrapper patch 用户 cti sel 字符 buffer Owner 回调
def wrapper(func):
def inner(*args, **kwargs):
# args是元组, kwargs是map
start_time = time.time()
# 保持返回值不变
res = func(*args, **kwargs)
end_time = time.time()
print("耗时%ds" % (end_time - start_time))
return res
return inner
def cal(l):
res = 0
for i in l:
res += i
return res
# 关键点
cal = wrapper(cal)
def wrapper(func):
def inner(*args, **kwargs):
# args是元组, kwargs是map
start_time = time.time()
res = func(*args, **kwargs)
end_time = time.time()
print("耗时 %d s" % (end_time - start_time))
return res
return inner
# 使用Python的装饰器
@wrapper
# cal = wrapper(cal)
def cal(l):
res = 0
for i in l:
res += i
return res
# 接受函数
def my_func(func):
# 使用*args和**kwargs
def wrapper(*args, **kwargs):
# 接受被修饰函数的返回值
# 注意这个时候会发生闭包
ret = func()
# 返回被修饰函数的返回值
return ret
# 返回修饰的函数, 以后执行的就是wrapper的函数
return wrapper
@my_func
# @my_func <-> test = my_func(test) !!
def test(name, age):
print(name)
print(age)
def my_func(msg):
def my_wrapper(func):
def wrapper(*args, **kwargs):
print(msg)
ret = func()
return ret
return wrapper
return my_wrapper
@my_func(‘hello‘) # -> func = my_func(‘hello‘) -> @func
def test():
print(‘test‘)
def Person(name, age):
def __init__(name, age):
person = {
‘name‘: name,
‘age‘: age
}
return person
def say(person, words):
print(person[‘name‘] + ‘say: ‘ + words)
return __init__(name, age)
# 调用
p = Person(‘Main‘, 18)
p[say](p, ‘Hello, world!‘)
1. __dict__: # 查看属性字典, 调用类的__dict__时显示出类的数据属性与函数属性, 调用对象的__dict__时显示的是该对象的数据属性, 没有函数属性, 因为对象没有保存着函数属性, 函数属性保存在类中
2. __name__: # 返回类名
3. __doc__: # 返回文档字符串, 不能被继承
4. __module__: # 返回所在模块
5. __class__: # 返回类型
6. __base__: # 基类
7. __bases__: # 基类元组
8. __init__: # 对象初始化属性, 系统在调用了__init__之后就会将self返回
9. __call__: # object()
10. __getattr__(self, item): # 在调用或者访问一个对象不存在的属性的时候调用
11. __getattribute__: # 只要通过object.property访问或者调用都会调用__getattribute__魔法方法, 在重写的情况下, 如果属性不存在则__getattribute__方法就会跑出AttributeError, 只要抛出AttributeError解析器内部就是紧接着调用__getattr__魔法方法, 默认__getattr__魔法方法就是抛异常; 如果重写__getattribute__方法的时候没有抛出异常则一定不会执行__getattr__, 但是只要抛出了AttributeError异常一定会调用__getattr__方法, 这才是第9点提到的__getattr__调用的实质
12. __setattr__(self, key, value): # 为对象添加属性, foo.x = x 底层调用, 重写时需要防止递归, 要操作dict
13. __delattr__(self, item): # 删除对象的属性, del foo.x 底层调用, 重写时需要防止递归, 要操作dict
14. __getitem__
15. __setitem__
16. __delitem__
17. __str__与__repr: # str()工厂类默认调用`__str__`方法, 但是如果该对象的`__str__`方法没有被重写, 则调用`__repr__`方法
18. __format__(self, format\_spec): # 调用format的实质就是调用该方法
19. __slots__: # 不是从object继承过来的, 需要我们添加定义为类变量, 定义了__slots__会消掉__dict__, 我们知道在Python中可以通过反射的方式添加新的属性, 其实质就是操作dict, 如果定义了__slots__的时候则没有了dict, 那么就可以限制用户定义其他属性, 但是使用slots的功能是为了节省内存, 不要用它来显示属性定义
20. __del__: # 析构方法
21. __iter__: # 返回迭代器, 使用for i in obj时调用obj.__iter__()
22. __next__: # 返回迭代器下一个值, 抛出StopIteration时for循环停止迭代
1. hasattr(object, attr): # 判断一个对象是否可以调用attr属性, 并不是查找dict字典
2. getattr(object, attr): # foo.x
3. setattr(object, key, value): # foo.x = x
4. delattr(object, item): # del foo.x
5. str(object): # object.__str__() -> object.__repr__() if __str__ 没有被重写
6. repr(object): # object.__repr()
7. isinstance(object, cls): # 查找object.__mro__如果里面有一个等于cls则返回True
8. issubclass(cls1, cls2): # 查找cls1.__bases__()中是否有cls2
9. format(object, format\_spec): # 调用__format__, 如果没有重写, 则调用__str__, 如果没有重写__str__, 则调用__repr__
10. iter(): # 调用__iter__()
11. next(): # 调用__next__(), 一个类只有同时定义了__iter__和__next__才算接受了迭代器协议, 这样在调用iter, for i in some的时候才不会报错
1. __import__(modulename): 导入模块, __import__(‘a.b‘), 导入a.b模块, 但是返回的是最高层的模块a, 此时要访问b的内容需要a.b.something; 类似的功能为importlib模块, importlib.import_module(modulename), 与__import__区别就是返回的模块是最内层模块, 而不是最高层模块
1. BaseClass.__init__(self, ...)
2. super().__init__(...) == super(CurrentClass, self).__init__(...), 注意带有参数的super中地址参数是类对象, 并且该类对象是当前正在编辑的类, 不是基类
# 导入Abstract Class模块
import abc
class Animal(abc.ABCMeta):
@abs.abstractmethod
def run(self):
pass
class Dog(Animal):
# 如果没有实现run方法则在创建Dog对象的时候会报错
def run(self):
print(‘dog is running!‘)
# 采用继承的方式继承基类公开的属性, 采用重写方式修改函数属性逻辑, 采用派生方式添加新功能
class String(str):
def show(self):
print(self)
class String(object):
def __init__(self, string):
self.string = str(string)
def show(self):
print(self.string)
# 在item不是String中的属性的时候调用
def __getattr(self, item):
return getattr(self.string, item)
__get__
, __set__
, __delete__
其中一个@staticmethod, @classmethod, __slots__
, 只要描述符+装饰器, 因为他们底层就是这样实现的
class TypeChecker(object):
def __init__(self, name, type):
self.name = name
self.type = type
def __set__(self, instance, value):
if isinstance(value, self.type):
instance.__dict__[self.name] = value
return
raise TypeError
def __get__(self, instance, owner):
if self.name in instance.__dict__:
return instance.__dict__[self.name]
raise AttributeError
def __delete__(self, instance):
if self.name in instance.__dict__:
del instance.__dict__[self.name]
raise AttributeError
def type_check(**kwargs):
def wrapper(cls):
for key in kwargs:
setattr(cls, key, TypeChecker(key, kwargs[key]))
return cls
return wrapper
@type_check(name=str, age=int)
class Person(object):
def __init__(self, name, age):
self.name = name
self.age = age
def bind_cls(func, cls):
def wrapper(*args, **kwargs):
ret = func(cls, *args, **kwargs)
return ret
return wrapper
class ClassMethod(object):
def __init__(self, func):
self.func = func
def __get__(self, instance, owner):
return bind_cls(self.func, owner)
# server.py
tcp_server = socket(AF_INET, SOCK_STREAM)
tcp_server.bind((‘127.0.0.1‘, 8080))
tcp_server.listen(5)
while True:
conn, addr = tcp_accept()
while True:
data = conn.recv(1024)
# 客户端关闭了, 在Windows与Linux会抛出异常, 但是在Mac上返回b‘‘
if not data:
break
print(data.decoding(‘utf-8‘))
conn.send(data.upper())
conn.close()
tcp_server.close()
# client.py
tcp_client = socket(AF_INET, SOCK_STREAM)
tcp_client.connect((‘127.0.0.1‘, 8080))
while True:
msg = input(‘>> ‘)
tcp_client.send(msg.encoding(‘utf-8‘))
print(tcp_client.recv(1024).decoding(‘utf-8‘))
tcp_client.close()
# ntp_server.py
ntp_server = socket(AF_INET, SOCK_DGRAM)
ntp_server.bind((‘127.0.0.1‘, 8080))
while True:
data, addr = ntp_server.recvfrom(1024)
# ret[0]为内容, ret[1]为ip_port
npt_server.sendto(time.strftime(‘%Y-%m-%d %H:%M:%S‘).encode(‘utf-8‘), addr)
ntp_server.close()
# ntp_client.py
ntp_client = socket(AF_INET, SOCK_DGRAM)
while True:
msg = input(‘>> ‘).strip()
ntp_client.sendto(msg.encode(‘utf-8‘), (‘127.0.0.1‘, 8080))
data, addr = ntp_client.recvfrom(1024)
print(data.decode(‘utf-8‘))
ntp_client.close()
# server.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from socket import *
import struct
import subprocess
ip_port = (‘127.0.0.1‘, 8080)
buffer_size = 1024
backlog = 5
tcp_server = socket(AF_INET, SOCK_STREAM)
tcp_server.bind(ip_port)
tcp_server.listen(backlog)
while True:
conn, addr = tcp_server.accept()
while True:
cmd = conn.recv(buffer_size).decode(‘utf-8‘)
pipe = subprocess.Popen(cmd,
shell=True,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE)
err = pipe.stderr.read()
data = None
if err:
data = err
else:
out = pipe.stdout.read()
if out:
data = out
data_size = struct.pack(‘i‘, len(data))
conn.send(data_size)
conn.send(data)
conn.close()
tcp_server.close()
# client.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from socket import *
import struct
ip_port = (‘127.0.0.1‘, 8080)
buffer_size = 1024
tcp_client = socket(AF_INET, SOCK_STREAM)
tcp_client.connect(ip_port)
while True:
cmd = input(‘>> ‘).strip()
if cmd == ‘exit‘:
break
cmd = cmd.encode(‘utf-8‘)
if not cmd:
continue
tcp_client.send(cmd)
data_size = struct.unpack(‘i‘, tcp_client.recv(4))[0]
output = ‘‘
while data_size > 0:
data = tcp_client.recv(buffer_size)
data_size -= len(data)
output += data.decode(‘utf-8‘)
print(output)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from socket import *
ip_port = (‘127.0.0.1‘, 8080)
buffer_size = 1024
udp_server = socket(AF_INET, SOCK_DGRAM)
udp_server.bind(ip_port)
while True:
data, addr = udp_server.recvfrom(buffer_size)
print(data.upper())
udp_server.sendto(data.upper(), addr)
udp_server.close()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from socket import *
ip_port = (‘127.0.0.1‘, 8080)
buffer_size = 1024
udp_client = socket(AF_INET, SOCK_DGRAM)
while True:
msg = input(‘>> ‘).strip()
udp_client.sendto(msg.encode(‘utf-8‘), ip_port)
data, addr = udp_client.recvfrom(buffer_size)
print(data.decode(‘utf-8‘))
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socketserver
class MyServer(socketserver.BaseRequestHandler):
buffer_size = 1024
# 实现通讯循环
def handle(self):
while True:
data = self.request.recv(self.buffer_size)
if not data:
break
print(data.upper())
self.request.sendall(data.upper())
self.request.close()
if __name__ == ‘__main__‘:
# 创建ThreadingTCPServer对象
server = socketserver.ThreadingTCPServer((‘127.0.0.1‘, 8080), MyServer)
# 调用server_forever()产生连接循环
server.serve_forever()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socketserver
class MyServer(socketserver.BaseRequestHandler):
buffer_size = 1024
# 实现通讯循环
def handle(self):
while True:
data = self.request.recv(self.buffer_size)
if not data:
break
print(data.upper())
self.request.sendall(data.upper())
self.request.close()
if __name__ == ‘__main__‘:
# 创建ThreadingTCPServer对象
server = socketserver.ForkingTCPServer((‘127.0.0.1‘, 8080), MyServer)
# 调用server_forever()产生连接循环
server.serve_forever()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from socket import *
import select
server = socket(AF_INET, SOCK_STREAM)
buffer_size = 1024
backlog = 5
address = (‘127.0.0.1‘, 8080)
server.bind(address)
server.listen(backlog)
rlist = [server,]
wlist = []
xlist = []
def main():
while True:
# select 为水平触发的, 接收到了连接, 必须调用recv系统调用拿走数据才会停止此次触发
r_list, w_list, x_list = select.select(rlist, wlist, xlist)
# 接收到了新的连接
for sock in r_list:
# 处理连接
if sock == server:
conn, addr = sock.accept()
print(‘接受到%s:%s的连接‘ % (addr[0], addr[1]))
# 添加到rlist中是关键
rlist.append(conn)
else:
# 处理数据的接受与发送
data = sock.recv(buffer_size)
if not data:
sock.close()
rlist.remove(sock)
continue
print(data.upper())
server.close()
if __name__ == ‘__main__‘:
main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import selectors
from socket import *
address = (‘127.0.0.1‘, 8080)
buffer_size = 1024
backlog = 5
server = socket(AF_INET, SOCK_STREAM)
server.bind(address)
server.listen(backlog)
# !!!
server.setblocking(False)
def read(conn, mask):
while True:
data = None
try:
data = conn.recv(buffer_size)
if not data:
conn.close()
# 相当于在使用select方法中使用remove删除list中的一个socket
sel.unregister(conn)
break
print(data.upper())
except Exception as e:
continue
def accept(server, mask):
conn, addr = server.accept()
# 相当于在select方法中append一个socket到list中
sel.register(conn, selectors.EVENT_READ, read)
print(‘this is accept function‘)
def main():
while True:
events = sel.select()
for key, value in events:
conn = key.fileobj
callback = key.data
callback(conn, value)
if __name__ == ‘__main__‘:
sel = selectors.DefaultSelector()
sel.register(server, selectors.EVENT_READ, accept)
main()
标签:wrapper patch 用户 cti sel 字符 buffer Owner 回调
原文地址:https://www.cnblogs.com/megachen/p/10360026.html