标签:
在Python中我们主要是通过thread和 threading这两个模块来实现的,其中Python的threading模块是对thread做了一些包装的,可以更加方便的被使用,所以我们使用 threading模块实现多线程编程。
Thread 线程类,这是我们用的最多的一个类,你可以指定线程函数执行或者继承自它都可以实现子线程功能;
#!/usr/bin/env python import threading # 创建线程, 继承 threading.Thread 类 class MyThread(threading.Thread): def __init__(self, func, args,): self.func = func self.args = args # 执行父类的构造方法 super(MyThread, self).__init__() # 线程开始执行时实际调用 run 方法 def run(self): self.func(self.args) def f2(arg): print(arg) obj = MyThread(f2, 123) obj.start() # 常用方法 t = threading.Thread(target=f2, args=(123,)) t.start()
Lock 锁原语,这个我们可以对全局变量互斥时使用;
#!/usr/bin/env python import threading import time NUM = 10 # lock = threading.RLock() lock = threading.Lock() ‘‘‘ 如果不加锁, 因为每个线程减完没有马上输出, sleep 之后再 print, NUM 已经等于了最后一个值, 所以输出都为 0 ‘‘‘ def func(lock): global NUM lock.acquire() NUM = NUM - 1 time.sleep(1) print(NUM) lock.release() for i in range(10): t = threading.Thread(target=func, args=(lock,)) t.start()
RLock 可重入锁,为了支持在同一线程中多次请求同一资源;
Event 通用的条件变量。多个线程可以等待某个事件发生,在事件发生后,所有的线程都被激活;
#!/usr/bin/env python import threading def func(i, e): print(i) e.wait() # 检测是什么灯 print(i+100) event = threading.Event() for i in range(10): t = threading.Thread(target=func, args=(i, event, )) t.start() ########### event.clear() # 设置红灯 inp = input(">>> ") if inp == "1": event.set() # 设置绿灯
Condition 条件变量,能让一个线程停下来,等待其他线程满足某个"条件";
#!/usr/bin/env python import threading def condition(): ret = False r = input(">>> ") if r == ‘true‘: ret = True else: pass return ret def func(i, con): print(i) con.acquire() con.wait_for(condition) print(i+100) con.release() c = threading.Condition() for i in range(10): t = threading.Thread(target=func, args=(i, c, )) t.start() from threading import Timer def hello(): print(‘hello‘) # 等一秒 t = Timer(1, hello) t.start()
Semaphore为等待锁的线程提供一个类似"等候室"的结构;
#!/usr/bin/env python import queue # create q obj, maxsize = 10 q = queue.Queue(10) # not block = do not wait q.put(1, block=False) q.put(2, block=False) print(q.qsize()) # default block = True print(q.get(block=False)) print(q.get(block=False)) # join, task_done # 阻塞进程, 当队列中任务执行完毕后, 不再阻塞 # 后进先出 q1 = queue.LifoQueue() # 优先级队列 q2 = queue.PriorityQueue() # 0 优先级最高 q2.put((2, ‘data1‘)) q2.put((1, ‘data2‘)) print(q2.get()[1]) # 双向队列 q3 = queue.deque() # append, appendleft # pop, popleft
示例一:
#!/usr/bin/env python import threading import time # func take 2s def func(num): for i in range(int(num)): time.sleep(2) # Return the current Thread object print("I come from %s, num: %s" %(threading.current_thread().getName(), i)) def main(thread_num): thread_list = [] # 用 for 循环创建 thread_num 个 线程 for i in range(thread_num): thread_name = "thread_%s" %i # append Thread obj into thread_list thread_list.append(threading.Thread(target=func, name=thread_name, args=(3,))) # start all thread for thread in thread_list: thread.start() # 主线程中等待所有子线程退出 for thread in thread_list: thread.join() if __name__ == ‘__main__‘: # start 5 thread main(5)
输出:
I come from thread_1, num: 0 I come from thread_0, num: 0 I come from thread_3, num: 0 I come from thread_4, num: 0 I come from thread_2, num: 0 I come from thread_1, num: 1 I come from thread_3, num: 1 I come from thread_4, num: 1 I come from thread_2, num: 1 I come from thread_0, num: 1 I come from thread_3, num: 2 I come from thread_4, num: 2 I come from thread_2, num: 2 I come from thread_0, num: 2 I come from thread_1, num: 2
这个例子中,每次打印耗时两秒,并且同时启动了五个线程进行打印。
Unix/Linux 操作系统提供了一个 fork() 系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是 fork() 调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后分别在父进程和子进程内返回。
子进程永远返回 0
,而父进程返回子进程的 ID。这样做的理由是,一个父进程可以 fork 出很多子进程,所以,父进程要记下每个子进程的 ID,而子进程只需要调用 getppid()
就可以拿到父进程的 ID。
multiprocessing 模块是跨平台版本的多进程模块,multiprocessing 模块提供了一个 Process 类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:
from multiprocessing import Process import os # 子进程要执行的代码 def run_proc(name): print(‘Run child process %s (%s)...‘ % (name, os.getpid())) if __name__==‘__main__‘: print(‘Parent process %s.‘ % os.getpid()) p = Process(target=run_proc, args=(‘test‘,)) print(‘Child process will start.‘) p.start() p.join() print(‘Child process end.‘)
join() 方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。
如果要启动大量的子进程,可以用进程池的方式批量创建子进程:
from multiprocessing import Pool import os, time, random def long_time_task(name): print(‘Run task %s (%s)...‘ % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print(‘Task %s runs %0.2f seconds.‘ % (name, (end - start))) if __name__==‘__main__‘: print(‘Parent process %s.‘ % os.getpid()) p = Pool(4) for i in range(5): p.apply_async(long_time_task, args=(i,)) print(‘Waiting for all subprocesses done...‘) p.close() p.join() print(‘All subprocesses done.‘)
对 Pool 对象调用 join() 方法会等待所有子进程执行完毕,调用 join() 之前必须先调用 close(),调用 close() 之后就不能继续添加新的 Process 了。
Python的 multiprocessing 模块包装了底层的机制,提供了 Queue、Pipes 等多种方式来交换数据。
from multiprocessing import Process, Queue import os, time, random # 写数据进程执行的代码: def write(q): print(‘Process to write: %s‘ % os.getpid()) for value in [‘A‘, ‘B‘, ‘C‘]: print(‘Put %s to queue...‘ % value) q.put(value) time.sleep(random.random()) # 读数据进程执行的代码: def read(q): print(‘Process to read: %s‘ % os.getpid()) while True: value = q.get(True) print(‘Get %s from queue.‘ % value) if __name__==‘__main__‘: # 父进程创建 Queue,并传给各个子进程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 启动子进程 pw,写入: pw.start() # 启动子进程 pr,读取: pr.start() # 等待 pw 结束: pw.join() # pr 进程里是死循环,无法等待其结束,只能强行终止: pr.terminate()
程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同,协程在执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。
Python 对协程的支持是通过 generator 实现的,在 generator 中,我们不但可以通过 for 循环来迭代,还可以不断调用 next() 函数获取由 yield 语句返回的下一个值。
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
如果改用协程,生产者生产消息后,直接通过 yield
跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高:
# consumer 是一个 generator
def consumer(): r = ‘‘ while True: n = yield r if not n: return print(‘[CONSUMER] Consuming %s...‘ % n) r = ‘200 OK‘
# 传入 consumer def produce(c): c.send(None) n = 0 while n < 5: n = n + 1 print(‘[PRODUCER] Producing %s...‘ % n) r = c.send(n) print(‘[PRODUCER] Consumer return: %s‘ % r) c.close() c = consumer() produce(c)
把一个 consumer (生成器) 传入 produce 后:
c.send(None)
启动生成器启动进程:
memcached -d -m 10 -u root -l 127.0.0.1 -p 11211 -c 256 -P /tmp/memcached.pid
memcache hello world
#!/usr/bin/env python # memcached -d -m 10 -u root -l 127.0.0.1 -p 11211 -c 256 -P /tmp/memcached.pid import memcache mc = memcache.Client([‘127.0.0.1:11211‘], debug=True) mc.set("foo", "bar") ret = mc.get(‘foo‘) print(ret)
输出:
bar
基本操作:
#!/usr/bin/env python # memcached -d -m 10 -u root -l 127.0.0.1 -p 11211 -c 256 -P /tmp/memcached.pid import memcache mc = memcache.Client([‘127.0.0.1:11211‘], debug=True) # mc.set("foo", "bar") # ret = mc.get(‘foo‘) # print(ret) # 添加一条键值对, 如果 key 已经存在操作异常 mc.add(‘k3‘, ‘v3‘) ret = mc.get(‘k3‘) print(ret) # replace 某个 key 的值 mc.replace(‘k3‘, ‘111‘) ret = mc.get(‘k3‘) print(ret) # 删除 k2 键值对 mc.delete(‘k2‘)
#!/usr/bin/env python import redis # 创建对象 r = redis.Redis(host=‘127.0.0.1‘, port=6379) # 在 Redis 中设置值,默认不存在则创建,存在则修改 r.set(‘foo‘, ‘bar‘) print(r.get(‘foo‘)) # 连接池 pool = redis.ConnectionPool(host=‘127.0.0.1‘, port=6379) rp = redis.Redis(connection_pool=pool) rp.set(‘k1‘, ‘v1‘) print(r.get(‘k1‘)) # 批量赋值, 取值 rp.mset(k2=‘v2‘, k3=‘v3‘) print(rp.mget(‘k2‘, ‘k3‘)) # hash 操作 rp.hset(‘h1‘,‘k4‘,‘v4‘) print(rp.hget(‘h1‘,‘k4‘)) m = { ‘name‘: ‘gary‘, ‘age‘: 31 } rp.hmset(‘info‘, m) # 获取 mapping 中某个 key 的值 print(rp.hmget(‘info‘, ‘name‘)) # 获取 hash 中的所有键值 print(rp.hgetall(‘info‘)) # {b‘name‘: b‘gary‘, b‘age‘: b‘31‘} # 获取 hash 中的所有 keys print(rp.hkeys(‘info‘)) # [b‘name‘, b‘age‘] # list 操作 rp.lpush(‘l1‘, 11,22,33) rp.rpush(‘l2‘, 11,22,33) # 创建一个迭代器 def list_iter(name, redis_obj): list_count = redis_obj.llen(name) for i in range(list_count): yield redis_obj.lindex(name, i) for item in list_iter(‘l1‘, rp): print(item)
标签:
原文地址:http://www.cnblogs.com/garyang/p/5693025.html