码迷,mamicode.com
首页 > 编程语言 > 详细

Python 11st Day

时间:2016-07-23 01:56:44      阅读:310      评论:0      收藏:0      [点我收藏+]

标签:

Python 多线程

在Python中我们主要是通过thread和 threading这两个模块来实现的,其中Python的threading模块是对thread做了一些包装的,可以更加方便的被使用,所以我们使用 threading模块实现多线程编程。

threading模块

Thread 线程类,这是我们用的最多的一个类,你可以指定线程函数执行或者继承自它都可以实现子线程功能;

#!/usr/bin/env python

import threading

# 创建线程, 继承 threading.Thread 类
class MyThread(threading.Thread):
    def __init__(self, func, args,):
        self.func = func
        self.args = args
        # 执行父类的构造方法
        super(MyThread, self).__init__()

    # 线程开始执行时实际调用 run 方法
    def run(self):
        self.func(self.args)


def f2(arg):
    print(arg)


obj = MyThread(f2, 123)
obj.start()


# 常用方法
t = threading.Thread(target=f2, args=(123,))
t.start()

Lock 锁原语,这个我们可以对全局变量互斥时使用;

#!/usr/bin/env python

import threading
import time

NUM = 10

# lock = threading.RLock()
lock = threading.Lock()

‘‘‘
如果不加锁, 因为每个线程减完没有马上输出, sleep 之后再 print, NUM 已经等于了最后一个值, 所以输出都为 0
‘‘‘
def func(lock):
    global NUM
    lock.acquire()
    NUM = NUM - 1
    time.sleep(1)
    print(NUM)
    lock.release()


for i in range(10):
    t = threading.Thread(target=func, args=(lock,))
    t.start()

RLock 可重入锁,为了支持在同一线程中多次请求同一资源;
Event 通用的条件变量。多个线程可以等待某个事件发生,在事件发生后,所有的线程都被激活;

#!/usr/bin/env python

import threading

def func(i, e):
    print(i)
    e.wait() # 检测是什么灯
    print(i+100)


event = threading.Event()


for i in range(10):
    t = threading.Thread(target=func, args=(i, event, ))
    t.start()

###########


event.clear() # 设置红灯

inp = input(">>> ")
if inp == "1":
    event.set() # 设置绿灯

 

Condition 条件变量,能让一个线程停下来,等待其他线程满足某个"条件";

#!/usr/bin/env python

import threading


def condition():
    ret = False
    r = input(">>> ")
    if r == true:
        ret = True
    else:
        pass
    return ret


def func(i, con):
    print(i)
    con.acquire()
    con.wait_for(condition)
    print(i+100)
    con.release()

c = threading.Condition()

for i in range(10):
    t = threading.Thread(target=func, args=(i, c, ))
    t.start()


from threading import Timer


def hello():
    print(hello)

# 等一秒
t = Timer(1, hello)
t.start()

Semaphore为等待锁的线程提供一个类似"等候室"的结构;

queue 模块

#!/usr/bin/env python

import queue

# create q obj, maxsize = 10
q = queue.Queue(10)

# not block = do not wait
q.put(1, block=False)
q.put(2, block=False)


print(q.qsize())
# default block = True
print(q.get(block=False))
print(q.get(block=False))

# join, task_done # 阻塞进程, 当队列中任务执行完毕后, 不再阻塞


# 后进先出
q1 = queue.LifoQueue()


# 优先级队列
q2 = queue.PriorityQueue()
# 0 优先级最高
q2.put((2, data1))
q2.put((1, data2))
print(q2.get()[1])


# 双向队列
q3 = queue.deque()
# append, appendleft
# pop, popleft

示例一:

#!/usr/bin/env python

import threading
import time

# func take 2s
def func(num):
    for i in range(int(num)):
        time.sleep(2)
        # Return the current Thread object
        print("I come from %s, num: %s" %(threading.current_thread().getName(), i))


def main(thread_num):
    thread_list = []

    # 用 for 循环创建 thread_num 个 线程
    for i in range(thread_num):
        thread_name = "thread_%s" %i
        # append Thread obj into thread_list
        thread_list.append(threading.Thread(target=func, name=thread_name, args=(3,)))

    # start all thread
    for thread in thread_list:
        thread.start()
    # 主线程中等待所有子线程退出
    for thread in thread_list:
        thread.join()


if __name__ == __main__:
    # start 5 thread
    main(5)

输出:

I come from thread_1, num: 0
I come from thread_0, num: 0
I come from thread_3, num: 0
I come from thread_4, num: 0
I come from thread_2, num: 0
I come from thread_1, num: 1
I come from thread_3, num: 1
I come from thread_4, num: 1
I come from thread_2, num: 1
I come from thread_0, num: 1
I come from thread_3, num: 2
I come from thread_4, num: 2
I come from thread_2, num: 2
I come from thread_0, num: 2
I come from thread_1, num: 2

 这个例子中,每次打印耗时两秒,并且同时启动了五个线程进行打印。

Python 多进程

Unix/Linux 操作系统提供了一个 fork() 系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是 fork() 调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后分别在父进程和子进程内返回。

子进程永远返回 0,而父进程返回子进程的 ID。这样做的理由是,一个父进程可以 fork 出很多子进程,所以,父进程要记下每个子进程的 ID,而子进程只需要调用 getppid() 就可以拿到父进程的 ID。

multiprocessing 模块

multiprocessing 模块是跨平台版本的多进程模块,multiprocessing 模块提供了一个 Process 类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:

from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print(Run child process %s (%s)... % (name, os.getpid()))

if __name__==__main__:
    print(Parent process %s. % os.getpid())
    p = Process(target=run_proc, args=(test,))
    print(Child process will start.)
    p.start()
    p.join()
    print(Child process end.)

join() 方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

Pool 进程池

如果要启动大量的子进程,可以用进程池的方式批量创建子进程:

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print(Run task %s (%s)... % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print(Task %s runs %0.2f seconds. % (name, (end - start)))

if __name__==__main__:
    print(Parent process %s. % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print(Waiting for all subprocesses done...)
    p.close()
    p.join()
    print(All subprocesses done.)

对 Pool 对象调用 join() 方法会等待所有子进程执行完毕,调用 join() 之前必须先调用 close(),调用 close() 之后就不能继续添加新的 Process 了。

进程间通信

Python的 multiprocessing 模块包装了底层的机制,提供了 Queue、Pipes 等多种方式来交换数据。

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    print(Process to write: %s % os.getpid())
    for value in [A, B, C]:
        print(Put %s to queue... % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    print(Process to read: %s % os.getpid())
    while True:
        value = q.get(True)
        print(Get %s from queue. % value)

if __name__==__main__:
    # 父进程创建 Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程 pw,写入:
    pw.start()
    # 启动子进程 pr,读取:
    pr.start()
    # 等待 pw 结束:
    pw.join()
    # pr 进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

协程(Coroutine)

程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同,协程在执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。

Python 对协程的支持是通过 generator 实现的,在 generator 中,我们不但可以通过 for 循环来迭代,还可以不断调用 next() 函数获取由 yield 语句返回的下一个值。

传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。

如果改用协程,生产者生产消息后,直接通过 yield 跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高:

# consumer 是一个 generator
def
consumer(): r = ‘‘ while True: n = yield r if not n: return print([CONSUMER] Consuming %s... % n) r = 200 OK
# 传入 consumer def produce(c): c.send(None) n = 0 while n < 5: n = n + 1 print([PRODUCER] Producing %s... % n) r = c.send(n) print([PRODUCER] Consumer return: %s % r) c.close() c = consumer() produce(c)

把一个 consumer (生成器) 传入 produce 后:

  1. 首先调用 c.send(None) 启动生成器
  2. 一旦生产了东西,通过 c.send(n) 切换到 consumer 执行
  3. consumer 通过 yield 拿到消息,处理,又通过 yield 把结果传回
  4. consumer通过yield拿到消息,处理,又通过yield把结果传回

Mecached

启动进程:

memcached -d -m 10 -u root -l 127.0.0.1 -p 11211 -c 256 -P /tmp/memcached.pid

memcache hello world

#!/usr/bin/env python

# memcached -d -m 10 -u root -l 127.0.0.1 -p 11211 -c 256 -P /tmp/memcached.pid

import memcache

mc = memcache.Client([127.0.0.1:11211], debug=True)

mc.set("foo", "bar")
ret = mc.get(foo)
print(ret)

输出:

bar

基本操作:

#!/usr/bin/env python

# memcached -d -m 10 -u root -l 127.0.0.1 -p 11211 -c 256 -P /tmp/memcached.pid

import memcache

mc = memcache.Client([127.0.0.1:11211], debug=True)

# mc.set("foo", "bar")
# ret = mc.get(‘foo‘)
# print(ret)

# 添加一条键值对, 如果 key 已经存在操作异常
mc.add(k3, v3)
ret = mc.get(k3)
print(ret)

# replace 某个 key 的值
mc.replace(k3, 111)
ret = mc.get(k3)
print(ret)

# 删除 k2 键值对
mc.delete(k2)

Redis

#!/usr/bin/env python

import redis

# 创建对象
r = redis.Redis(host=127.0.0.1, port=6379)

# 在 Redis 中设置值,默认不存在则创建,存在则修改
r.set(foo, bar)
print(r.get(foo))

# 连接池
pool = redis.ConnectionPool(host=127.0.0.1, port=6379)

rp = redis.Redis(connection_pool=pool)
rp.set(k1, v1)
print(r.get(k1))

# 批量赋值, 取值
rp.mset(k2=v2, k3=v3)
print(rp.mget(k2, k3))

# hash 操作
rp.hset(h1,k4,v4)
print(rp.hget(h1,k4))

m = {
    name: gary,
    age: 31
}

rp.hmset(info, m)
# 获取 mapping 中某个 key 的值
print(rp.hmget(info, name))
# 获取 hash 中的所有键值
print(rp.hgetall(info)) # {b‘name‘: b‘gary‘, b‘age‘: b‘31‘}
# 获取 hash 中的所有 keys
print(rp.hkeys(info)) # [b‘name‘, b‘age‘]

# list 操作
rp.lpush(l1, 11,22,33)
rp.rpush(l2, 11,22,33)


# 创建一个迭代器
def list_iter(name, redis_obj):
    list_count = redis_obj.llen(name)
    for i in range(list_count):
        yield redis_obj.lindex(name, i)

for item in list_iter(l1, rp):
    print(item)

Python 11st Day

标签:

原文地址:http://www.cnblogs.com/garyang/p/5693025.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!