标签:异步io __name__ val lock 保存 == indexer orm future
import time def profile(func): def wrapper(*args, **kwargs): import time start = time.time() func(*args, **kwargs) end = time.time() print ‘COST: {}‘.format(end - start) return wrapper @profile def fib(n): if n<= 2: return 1 return fib(n-1) + fib(n-2) fib(35)
import threading for i in range(2): t = threading.Thread(target=fib, args=(35,)) t.start() main_thread = threading.currentThread() for t in threading.enumerate(): if t is main_thread: continue t.join()
threads = [] for i in range(5): t = Thread(target=foo, args=(i,)) threads.append(t) t.start() for t in threads: t.join()
from threading import Semaphore import time sema = Semaphore(3) def foo(tid): with sema: print(‘{} acquire sema‘.format(tid)) wt = random() * 2 time.sleep(wt) print(‘{} release sema‘.format(tid))
from threading import Thread Lock value = 0 lock = Lock() def getlock(): global lock with lock: new = value + 1 time.sleep(0.001) value = new
import time import threading def consumer(cond): t = threading.currentThread() with cond: cond.wait() print("{}: Resource is available to sonsumer".format(t.name)) def producer(cond): t = threading.currentThread() with cond: print("{}: Making resource available".format(t.name)) cond.notifyAll() condition = threading.Condition() c1 = threading.Thread(name=‘c1‘, target=consumer, args=(condition,)) c2 = threading.Thread(name=‘c2‘, target=consumer, args=(condition,)) p = threading.Thread(name=‘p‘, target=producer, args=(condition,)) c1.start() c2.start() p.start()
import time import threading from random import randint TIMEOUT = 2 def consumer(event, l): t = threading.currentThread() while 1: event_is_set = event.wait(TIMEOUT) if event_is_set: try: integer = l.pop() print ‘{} popped from list by {}‘.format(integer, t.name) event.clear() # 重置事件状态 except IndexError: # 为了让刚启动时容错 pass def producer(event, l): t = threading.currentThread() while 1: integer = randint(10, 100) l.append(integer) print ‘{} appended to list by {}‘.format(integer, t.name) event.set() # 设置事件 time.sleep(1) event = threading.Event() l = [] threads = [] for name in (‘consumer1‘, ‘consumer2‘): t = threading.Thread(name=name, target=consumer, args=(event, l)) t.start() threads.append(t) p = threading.Thread(name=‘producer1‘, target=producer, args=(event, l)) p.start() threads.append(p) for t in threads: t.join()
import queue def worker(): while True: item = q.get() if item is None: break do_work(item) q.task_done() q = queue.Queue() threads = [] for i in range(num_worker_threads): t = threading.Thread(target=worker) t.start() threads.append(t) for item in source(): q.put(item) q.join() for i in range(num_worker_threads): q.put(None) for t in threads: t.join()
import threading from random import randint from queue import PriorityQueue q = PriorityQueue() def double(n): return n * 2 def producer(): count = 0 while 1: if count > 5: break pri = randint(0, 100) print(‘put :{}‘.format(pri)) q.put((pri, double, pri)) # (priority, func, args) count += 1 def consumer(): while 1: if q.empty(): break pri, task, arg = q.get() print(‘[PRI:{}] {} * 2 = {}‘.format(pri, arg, task(arg))) q.task_done() time.sleep(0.1) t = threading.Thread(target=producer) t.start() time.sleep(1) t = threading.Thread(target=consumer) t.start()
from multiprocessing.pool import ThreadPool pool = ThreadPool(5) pool.map(lambda x: x**2, range(5))
from multiprocessing.dummy import Pool
from concurrent.futures improt ThreadPoolExecutor from concurrent.futures import as_completed import urllib.request URLS = [‘http://www.baidu.com‘, ‘http://www.hao123.com‘] def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() with ThreadPoolExecutor(max_workers=5) as executor: future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in as_completed(future_to_url): url = future_to_url[future] try: data = future.result() execpt Exception as exc: print("%r generated an exception: %s" % (url, exc)) else: print("%r page is %d bytes" % (url, len(data)))
import multiprocessing jobs = [] for i in range(2): p = multiprocessing.Process(target=fib, args=(12,)) p.start() jobs.append(p) for p in jobs: p.join()
from multiprocessing import Pool pool = Pool(2) pool.map(fib, [36] * 2)
from concurrent.futures import ProcessPoolExecutor import math PRIMES = [ 112272535095293, 112582705942171] def is_prime(n): if n < 2: return False if n == 2: return True if n % 2 == 0: return False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: return False return True if __name__ == "__main__": with ProcessPoolExecutor() as executor: for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): print("%d is prime: %s" % (number, prime))
import asyncio async def hello(): print("Hello world!") await asyncio.sleep(1) print("Hello again") loop = asyncio.get_event_loop() loop.run_until_complete(hello()) loop.close()
import asyncio async def hello(): print("Hello world!") await asyncio.sleep(1) print("Hello again") loop = asyncio.get_event_loop() tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) loop.close()
import asyncio import httpx async def get_url(): r = await httpx.get("http://www.baidu.com") return r.status_code loop = asyncio.get_event_loop() tasks = [get_url() for i in range(10)] results = loop.run_until_complete(asyncio.gather(*tasks)) loop.close() for num, result in zip(range(10), results): print(num, result)
标签:异步io __name__ val lock 保存 == indexer orm future
原文地址:https://www.cnblogs.com/junmoxiao/p/11948993.html