标签:递归 check 访问 pen 效果 error def sel 示例
[TOC]
#管道
```
from multiprocessing import Pipe,Process
def func(conn2):
print(conn2.recv())
conn1,conn2 = Pipe()
conn1.send("Hello pipe")
p = Process(target=func, args=(conn2,))
p.start()
```
**多进程中管道异常EOFError**
```
from multiprocessing import Pipe, Process
import time
import random
def func_recv(conn1, conn2):
conn2.close()
while True:
try:
print(conn1.recv())
time.sleep(random.random())
except EOFError:
conn1.close()
print("recv done")
break
def func_send(conn1, conn2):
conn1.close()
for i in range(4):
conn2.send("msg %d" % i)
conn2.close()
conn1, conn2 = Pipe()
recv_p = Process(target=func_recv, args=(conn1, conn2))
send_p = Process(target=func_send, args=(conn1, conn2))
recv_p.start()
send_p.start()
conn1.close()
conn2.close()
```
注意:多进程使用管道可能会出现数据不安全,需要加锁操作
[返回顶部](#top)
#进程间的数据共享
```
from multiprocessing import Manager
```
数据不安全,内容略
#进程池
##1.为什么会有进程池的概念
效率;
每开启进程,开启属于这个进程的内存空间(如寄存器,堆栈,文件都会战用内存空间);
进程过多,操作系统调试较为耗时
##2.进程池原理:
python中先创建一个属于进程的池子,这个池子指定能存放多少进程,任务存放于队列,等待进程池中的进程处理,当一个进程处理完一个任务后,并不销毁,而是放回进程池,然后继续去任务队列中拿取下一个任务,这就节省了进程被销毁和再创建的时间,也节省了过多进程调度的时间;
信号量与之相比,只是节省了调度时间,因为信号里是控制进程执行的数量,并不能控制进程创建的数量。
##进程池示例
```
from multiprocessing import Pool
import time
import os
def func(n):
print("start func %s,pid:%s" %(n,os.getpid()))
time.sleep(1)
print("end func %s,pid:%s" %(n,os.getpid()))
p = Pool(5) #初始化进程池,设置可同时开5个进程
for i in range(10):
p.apply_async(func,args = (i,)) #异步调用
p.close() #进程池结束接收任务
p.join() #感知进程池中的任务执行结束
```
[返回顶部](#top)
##进程池版的socket连接效果
server.py
```
from socket import socket
from multiprocessing import Pool
def interactive(conn):
conn.send("hello".encode(‘utf-8‘))
data = conn.recv(1024)
print(data.decode(‘utf-8‘))
sk = socket()
sk.bind((‘127.0.0.1‘, 9090))
sk.listen()
p = Pool(5) #进程数一般设置为CPU核数+1
while True:
conn, add = sk.accept()
p.apply_async(interactive, args=(conn,))
conn.close()
sk.close()
```
client.py
```
from socket import socket
sk = socket()
sk.connect((‘127.0.0.1‘,9090))
data = sk.recv(1024)
print(data.decode(‘utf-8‘))
msg = input("client:")
sk.send(msg.encode("utf-8"))
sk.close()
```
[返回顶部](#top)
##获取进程池异步调用的返回值
同步调用的返回值
```
from multiprocessing import Pool
def func(i):
return i*i
p = Pool(5)
ret_list = []
for i in range(10):
ret = p.apply(func, args=(i,))
print(ret)
```
异步调用的返回值
```
from multiprocessing import Pool
def func(i):
return i*i
p = Pool(5)
ret_list = []
for i in range(10):
ret = p.apply_async(func,args=(i,))
print(ret)
ret_list.append(ret) #存储返回的对象
for ret in ret_list:
print(ret.get()) #使用get方法获取返回对象的值,
#get方法会阻塞,因此不需要进程池close和join
```
[返回顶部](#top)
#进程池回调函数
callback回调函数,回调函数在主进程中执行
```
from multiprocessing import Pool
import os
import time
def func(i):
print("in func,pid: %s" % (os.getpid()))
time.sleep(1)
return i * i
def func2(ret):
print("pid:%s, in func2: %s" %(os.getpid(),ret))
p = Pool(3)
for i in range(10):
p.apply_async(func, args=(i,), callback=func2)
p.close()
p.join()
```
#总结:

[返回顶部](#top)
#多线程
##函数式线程
进程是内存分配的最小单位
线程是操作系统调度的最小单位
线程是CPU执行的最小单位
进程内至少包含一个线程
进程中可以并行执行多个线程
开启一个线程的时间要远远小于开启一个进程;
多个线程内部有自己的数据栈,数据不共享;
全局变量在多个线程之间是共享的。
```
from threading import Thread
import time
def func(i):
print(i)
time.sleep(1)
for i in range(10):
t = Thread(target=func, args=(i,))
t.start()
```
[返回顶部](#top)
##线程类
```
class MyTread(Thread):
def __init__(self,arg):
super().__init__()
self.arg = arg
def run(self):
time.sleep(1)
print(self.arg)
for i in range(10):
t = MyTread(i)
t.start()
```
[返回顶部](#top)
##多线程socket
server.py
```
from socket import socket
from multiprocessing import Pool
def interactive(conn):
conn.send("hello".encode(‘utf-8‘))
data = conn.recv(1024)
print(data.decode(‘utf-8‘))
sk = socket()
sk.bind((‘127.0.0.1‘, 9090))
sk.listen()
while True:
conn, add = sk.accept()
t = Thread(target=interactive, args=(conn,))
t.start()
conn.close()
sk.close()
```
client.py
```
from socket import socket
sk = socket()
sk.connect((‘127.0.0.1‘,9090))
data = sk.recv(1024)
print(data.decode(‘utf-8‘))
msg = input("client:")
sk.send(msg.encode("utf-8"))
sk.close()
```
[返回顶部](#top)
##线程模块中的方法
1.threading.current_thread() #线程对象:线程名和线程号
2.threading.get_ident() #线程号
3.threading.active_count() #在运行的线程数
4.threading.enumerate() #线程对像的列表
```
print(threading.current_thread())
<_MainThread(MainThread, started 140380643608384)>
print(threading.get_ident())
print(threading.active_count())
print(threading.enumerate())
```
[返回顶部](#top)
##守护线程
无论是进程还是线程,都遵循:守护xx会等待主xx运行完毕后被销毁。需要强调的是:运行完毕并非终止运行
```
#1.对主进程来说,运行完毕指的是主进程代码运行完毕
#2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
```
```
#1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
#2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
```
示例:
```
from threading import Thread
import time
def func1():
print("in func1")
while True:
print("*" * 10)
time.sleep(0.5)
def func2():
print("in func2")
time.sleep(3)
print("func2 done")
t1 = Thread(target=func1,)
t1.daemon = True #设置为守护线程,必须在start方法之前设置,守护线程等待其它的子线程结束之后才结束
t1.start()
t2 = Thread(target=func2,)
t2.start()
# 主线程会等待其它非守护子线程执行结束后才能结束,
# 因为主线程结束就意味着进程就结束了,进程整体的资源会被回收,而进程必须保证非守护线程运行完毕后才能结束
```
[返回顶部](#top)
##线程join方法
```
from threading import Thread
import time
def func1():
print("in func1")
while True:
print("*" * 10)
time.sleep(0.5)
def func2():
print("in func2")
time.sleep(3)
print("func2 done")
t1 = Thread(target=func1,)
t1.daemon = True #设置为守护线程,守护线程等待其它的线程结束之后才结束
t1.start()
t2 = Thread(target=func2,)
t2.start()
t2.join() #代码执行到这里会阻塞,直到t2线程进程完成,再往下执行
print("t2执行完成")
time.sleep(2)
print("主线程执行完成")
```
[返回顶部](#top)
## 互斥锁
未加锁的情况,数据不安全
```
from threading import Lock,Thread
import time
def funn():
global n
tmp = n
time.sleep(0.1)
n = tmp - 1
n = 10
t_list = []
for i in range(10):
t = Thread(target=funn, )
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(n)
```
加锁后结果符合预期结果
```
from threading import Lock, Thread
import time
def funn(lock):
global n
lock.acquire()
tmp = n
time.sleep(0.2)
n = tmp - 1
lock.release()
n = 10
t_list = []
lock = Lock()
for i in range(10):
t = Thread(target=funn, args=(lock,))
t.start()
t_list.append(t)
for t in t_list: t.join()
print(n)
```
[返回顶部](#top)
##递归锁RLock--解决死锁问题
死锁:
在同一个线程或同一个进程里,使用两把锁及两把锁以上的锁时,都有可能产生死锁现象,为了避免,可以改为递归锁
死锁示例
```
from threading import Lock, Thread
import time
fork_lock = Lock()
noodle_lock = Lock()
def eat1(name):
noodle_lock.acquire()
print("%s got the noodle" % name)
fork_lock.acquire()
print("%s got the fork" % name)
print("%s eat noodle" % name)
fork_lock.release()
noodle_lock.release()
def eat2(name):
fork_lock.acquire()
print("%s got the fork" % name)
time.sleep(0.1)
noodle_lock.acquire()
print("%s got the noodle" % name)
print("%s eat noodle" % name)
noodle_lock.release()
fork_lock.release()
Thread(target=eat1, args=("alex",)).start()
Thread(target=eat2, args=("guest",)).start()
Thread(target=eat1, args=("boots",)).start()
Thread(target=eat2, args=("xiaoming",)).start()
```
解决死锁,采用递归锁RLock
递归锁在同一个线程里,可以多次acquire,
只要有一个线程有一次acquire,其它线程就无法acquire,也就无法访问acquire中锁住的数据了
```
from threading import RLock, Thread
import time
fork_lock = noodle_lock = RLock()
def eat1(name):
noodle_lock.acquire()
print("%s got the noodle" % name)
fork_lock.acquire()
print("%s got the fork" % name)
print("%s eat noodle" % name)
fork_lock.release()
noodle_lock.release()
def eat2(name):
fork_lock.acquire()
print("%s got the fork" % name)
time.sleep(0.1)
noodle_lock.acquire()
print("%s got the noodle" % name)
print("%s eat noodle" % name)
noodle_lock.release()
fork_lock.release()
Thread(target=eat1, args=("alex",)).start()
Thread(target=eat2, args=("guest",)).start()
Thread(target=eat1, args=("boots",)).start()
Thread(target=eat2, args=("xiaoming",)).start()
```
##线程信号量
```与进程同
```
[返回顶部](#top)
##线程事件Event
示例:起两个线程
1.第一个线程:尝试连接数据库:
1)等待一个信号,告诉我们之间的网络是通的
2)连接数据库
2.第二个线程:检测与数据库之间的网络是否连通
1)time.sleep(0,2)
2)将事件的状态设置为True
```
def connect_db(e):
pass
def check_network(e):
pass
```
[返回顶部](#top)
##线程条件Condition
Condition是一个更复杂点的锁
除了提供acquire,release方法之外,还提供wait,notify
wait:一个条件被创建之初,默认有一个False状态,该状态会使wait一直处于等待状态
notify: int数据类型,表示创建的一串钥匙有几把钥匙,钥匙是一次性的
```
```
##队列,栈,优先级队列
q = queue.Queue()
q = queue.LifoQueue()
q = queue.PriorityQueue()
q.put()
q.get()
q.put_nowait()
```
具体方法查询进程一节内容
```
[返回顶部](#top)
##线程池
方法参见:
http://www.cnblogs.com/Eva-J/articles/8306047.html#_label16
基本使用示例:
```
from concurrent.futures import ThreadPoolExecutor
import time
def func(n):
time.sleep(2)
print(n)
return n*n
tpool = ThreadPoolExecutor(max_workers=5) # 默认不要超过CPU个数*5
t_list = []
for i in range(10):
t = tpool.submit(func, i) #往线程池中提交任务
t_list.append(t)
tpool.shutdown() # 阻塞,等待任务执行完成再往下执行,相当于以前的close() + join()
print(‘this is in main thread‘)
for t in t_list:
print("result: %s " % t.result())
```
回调函数
```
from concurrent.futures import ThreadPoolExecutor
import time
def func(n):
time.sleep(2)
print(n)
return n*n
def call_back(m):
print("call back result:%s" % m.result())
tpool = ThreadPoolExecutor(max_workers=5) # 默认不要超过CPU个数*5
for i in range(10):
t = tpool.submit(func, i).add_done_callback(call_back) #往线程池中提交任务,后面调用回调函数
```
[返回顶部](#top)
标签:递归 check 访问 pen 效果 error def sel 示例
原文地址:https://www.cnblogs.com/rootid/p/9697227.html