标签:epo poetry init 模块 hat format nts div doc
5.由于线程拥有较少的资源,但又具有传统进程的许多特性,因此线程可被称为轻型进程(light weight process, LWP),传统进程相对称为重型进程(heavy weight process, HWP)。
线程模型:和进程一样,包括TCB(Thread Control Block,线程控制块)、程序和数据。Thread结构包括线程标识符、调度状态信息、核心栈指针、用户栈指针及私有存储区等。
- 内核级线程(Kernel Supported Threads,KST):内核控制线程的创建、撤销和切换,并为每个内核级线程创建TCB,从而感知其存在。内核级线程的优点是:1.在多处理器上,内核可以调度同一进程中的多个线程同时工作;2.如果一个进程中的某个线程阻塞,其他线程仍然可以继续运行。其缺点是:由于线程由内核调度和分派,而线程本身运行在用户态,因此即使是同一进程内的多个线程之间进行切换,也都要先从用户态进入内核再进行切换,切换代价较大。
- 用户级线程(User Level Threads,ULT):开放给程序员的、可以通过线程库(如Python的threading模块)创建的线程。用户级线程只存在于用户空间,内核并不能看到用户线程,并且内核资源的分配仍然是按照进程进行分配的;各个用户线程只能在进程内进行资源竞争。用户级线程的优点是:1.同进程内线程切换不需要转换到内核空间,节省了模式切换的开销;2.线程调度算法可以是进程内专用,由用户程序进行指定;3.用户级线程实现和操作系统无关。其缺点是:1.如果同一进程中某个线程执行系统调用时被阻塞,整个进程都会被阻塞;2.一个进程只能在一个CPU上获得执行。
- 用户级线程和内核级线程之间有多对一、一对一和多对多(混合型)三种映射关系,具体映射关系由操作系统来决定。
线程创建可以通过函数或者子类的方式实现。The Thread class represents an activity that is run in a separate thread of control. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass. No other methods (except for the constructor) should be overridden in a subclass. In other words, only override the __init__() and run() methods of this class.
from threading import Thread


def desc(step):
    """Decrement the module-level ``num`` ``step`` times, then print it.

    The read-modify-write on ``num`` is deliberately unsynchronised: the
    example shows that threads share their process's globals.
    """
    global num
    for _ in range(step):
        num -= 1
    print("----------num------------", num)


def add(step):
    """Increment the module-level ``num`` ``step`` times, then print it."""
    global num
    for _ in range(step):
        num += 1
    print("----------num------------", num)


if __name__ == "__main__":
    # Threads share the memory of their process, so both workers see ``num``.
    num = 0
    # Values can also be handed over as arguments; separate processes would
    # need a queue or a pipe to share data instead.
    step = 1000
    p1 = Thread(target=desc, args=(step,))
    p2 = Thread(target=add, args=(step,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num)
from threading import Thread


class Desc(Thread):
    """Thread that decrements the module-level ``num`` ``step`` times."""

    def __init__(self, step):
        super().__init__()
        self.step = step

    def run(self):
        # Unsynchronised on purpose: with a large ``step`` the final value of
        # ``num`` varies from run to run.
        global num
        for _ in range(self.step):
            print("desc-----: ", num)
            num -= 1


class Add(Thread):
    """Thread that increments the module-level ``num`` ``step`` times."""

    def __init__(self, step):
        super().__init__()
        self.step = step

    def run(self):
        global num
        for _ in range(self.step):
            print("add: ", num)
            num += 1


if __name__ == "__main__":
    num = 0
    step = 1000000
    p1 = Desc(step)
    p2 = Add(step)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num)
from threading import Thread


class Desc(Thread):
    """Thread that counts down on its own private copy of ``num``."""

    def __init__(self, step):
        super().__init__()
        self.step = step
        # Snapshot of the module-level ``num`` taken at construction time.
        # Rebinding ``self.num`` later never touches the global.
        self.num = num

    def run(self):
        for _ in range(self.step):
            print("desc-----: ", self.num)
            self.num -= 1
        print("----------num------------", self.num)


class Add(Thread):
    """Thread that counts up on its own private copy of ``num``."""

    def __init__(self, step):
        super().__init__()
        self.step = step
        self.num = num  # private snapshot, see Desc.__init__

    def run(self):
        for _ in range(self.step):
            print("add: ", self.num)
            self.num += 1
        print("----------num------------", self.num)


if __name__ == "__main__":
    num = 0
    step = 1000
    p1 = Desc(step)
    p2 = Add(step)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    # Still the initial value: each thread only changed its own attribute.
    print(num)
import threading
import time
from threading import Thread


class Example(Thread):
    """Thread that prints its own identity and then sleeps for three seconds."""

    def run(self):
        # The Thread object that is executing this code.
        print("current_thread: ", threading.current_thread())
        # The integer identifier of the executing thread.
        print("get_ident: ", threading.get_ident())
        time.sleep(3)
        print("-------------------------------------------------")


if __name__ == "__main__":
    p1 = Example()
    # To make this a daemon thread (killed when the main thread exits), set
    # ``p1.daemon = True`` here, i.e. before start().
    p1.start()
    # Number of live threads: 2 (the main thread and p1).
    print("active_count: ", threading.active_count())
    # All Thread objects currently alive in this process. Called through the
    # module so the builtin ``enumerate`` is not shadowed.
    print("enumerate: ", threading.enumerate())
    # Block until p1 finishes. Without join() the main thread would run on
    # while p1 keeps executing.
    p1.join()
    # Number of live threads: 1 (only the main thread).
    print("active_count: ", threading.active_count())
    print("current_thread: ", threading.current_thread())  # current Thread object
    print("get_ident: ", threading.get_ident())  # current thread identifier
    print("main_thread: ", threading.main_thread())  # the main Thread object
如果将上面的step设置一个非常大的值,那么num值就有各种结果。这里(解释器CPython)就要说到全局解释器锁GIL(Global Interpreter Lock)。它有两个特点:
from queue import Queue
from threading import Thread


def desc(step):
    """Take the counter out of ``q``, decrement it and put it back, ``step`` times.

    ``q`` is the module-level queue holding the single counter value. While
    one thread holds the value the queue is empty, so the other thread
    blocks in ``get()`` until the value is put back.
    """
    for _ in range(step):
        num = q.get() - 1
        print("desc-----: ", num)
        q.put(num)


def add(step):
    """Take the counter out of ``q``, increment it and put it back, ``step`` times."""
    for _ in range(step):
        num = q.get() + 1
        print("add: ", num)
        q.put(num)


if __name__ == "__main__":
    q = Queue()  # queue.Queue does its own locking, so it is thread-safe
    q.put(0)
    step = 1000000
    p1 = Thread(target=desc, args=(step,))
    p2 = Thread(target=add, args=(step,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(q.get())
import dis


# Deliberately has no docstring: a docstring would be stored in the code
# object's constants and change the LOAD_CONST indices in the disassembly.
def add(num):
    num -= 1


if __name__ == "__main__":
    # dis.dis() prints the disassembly itself and returns None, so wrapping
    # it in print() would only append a stray "None" line.
    dis.dis(add)
  3           0 LOAD_FAST                0 (num)
              2 LOAD_CONST               1 (1)
              4 INPLACE_SUBTRACT
              6 STORE_FAST               0 (num)
              8 LOAD_CONST               0 (None)
             10 RETURN_VALUE
线程锁Lock是在保证原子操作的基础上,对共享变量进行同步限制。根据同步原语(获得锁 -- dosomething -- 释放锁),Lock有两个方法acquire和release:acquire获取锁,release释放锁,两者之间的部分则是不可分割的代码逻辑。线程锁是全局对象,所有需要同步的线程都使用同一把锁。
from threading import Thread, Lock


class Desc(Thread):
    """Thread that decrements the module-level ``num`` under the shared ``lock``."""

    def __init__(self, step):
        super().__init__()
        self.step = step

    def run(self):
        global num
        for _ in range(self.step):
            # ``with`` acquires the lock and always releases it, even if the
            # body raises.
            with lock:
                num -= 1


class Add(Thread):
    """Thread that increments the module-level ``num`` under the shared ``lock``."""

    def __init__(self, step):
        super().__init__()
        self.step = step

    def run(self):
        global num
        for _ in range(self.step):
            with lock:
                num += 1


if __name__ == "__main__":
    num = 0
    step = 1000000
    lock = Lock()  # one lock shared by every thread touching ``num``
    p1 = Desc(step)
    p2 = Add(step)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num)
from threading import Thread, Lock, RLock


class Desc(Thread):
    """Thread that decrements the module-level ``num`` under the shared ``lock``."""

    def __init__(self, step):
        super().__init__()
        self.step = step

    def run(self):
        global num
        for _ in range(self.step):
            with lock:
                num -= 1


class Add(Thread):
    """Thread that nets +1 on ``num`` per iteration using a re-entrant lock."""

    def __init__(self, step):
        super().__init__()
        self.step = step

    def run(self):
        global num
        for _ in range(self.step):
            with lock:
                num += 2
                # Second acquisition by the same thread: allowed with an
                # RLock, whereas a plain Lock would block forever here.
                with lock:
                    num -= 1


if __name__ == "__main__":
    num = 0
    step = 1000000
    lock = RLock()
    p1 = Desc(step)
    p2 = Add(step)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num)
import time
from threading import Thread, Semaphore, current_thread


class Fn(Thread):
    """Thread whose body runs only while it holds a slot of the semaphore ``sm``."""

    def __init__(self, sm):
        super().__init__()
        self.sm = sm

    def run(self):
        # ``with`` takes one slot and always gives it back, even on error.
        with self.sm:
            print("current_thread: {}, {}".format(current_thread().name, current_thread().ident))
            time.sleep(2)


if __name__ == "__main__":
    sm = Semaphore(3)  # at most three threads inside the guarded section
    t_list = [Fn(sm) for _ in range(10)]
    for t in t_list:
        t.start()
    for t in t_list:
        t.join()
A condition variable obeys the context management protocol: using the with statement acquires the associated lock for the duration of the enclosed block. The wait() method releases the lock, and then blocks until another thread awakens it by calling notify() or notify_all(). Once awakened, wait() re-acquires the lock and returns. It is also possible to specify a timeout.
from threading import Thread, Condition


class Poetry1(Thread):
    """Appends its lines to the shared list ``lis`` at positions 0, 2, 4, ...

    Takes turns with a ``Poetry2`` thread through the condition ``con``.
    ``lis`` must be empty when the threads start, and both threads need the
    same number of lines, otherwise the longer one waits forever.
    """

    def __init__(self, con, poetry):
        super().__init__()
        self.poetry = poetry
        self.con = con

    def run(self):
        with self.con:
            for line in self.poetry:
                # Wait on a predicate, not a bare wait(): a notify() sent
                # before the peer started waiting would otherwise be lost
                # and both threads would block forever.
                self.con.wait_for(lambda: len(lis) % 2 == 0)
                lis.append(line)
                self.con.notify()


class Poetry2(Thread):
    """Appends its lines to the shared list ``lis`` at positions 1, 3, 5, ..."""

    def __init__(self, con, poetry):
        super().__init__()
        self.poetry = poetry
        self.con = con

    def run(self):
        with self.con:
            for line in self.poetry:
                # An odd length means Poetry1 has just written its line.
                self.con.wait_for(lambda: len(lis) % 2 == 1)
                lis.append(line)
                self.con.notify()


if __name__ == "__main__":
    con = Condition()
    lis = []
    poy1 = ["楚国多豪俊,", "每与豺狼交,"]
    poy2 = ["相比得剑术。", "片血不沾衣。"]
    p1 = Poetry1(con, poy1)
    p2 = Poetry2(con, poy2)
    # The start order no longer matters: each thread checks whose turn it is
    # instead of relying on already being blocked in wait().
    p2.start()
    p1.start()
    p1.join()
    p2.join()
    print("\r\n".join(lis))
第二种写法: con.acquire()和con.release()。
from threading import Thread, Condition


class Poetry1(Thread):
    """Appends its lines to the shared list ``lis`` at positions 0, 2, 4, ...

    Same alternation as the ``with`` form, written with explicit
    ``acquire()`` / ``release()`` calls. ``lis`` must be empty when the
    threads start, and both threads need the same number of lines.
    """

    def __init__(self, con, poetry):
        super().__init__()
        self.poetry = poetry
        self.con = con

    def run(self):
        self.con.acquire()
        try:
            for line in self.poetry:
                # Predicate-based wait: a notify() that arrives before the
                # peer is waiting cannot be lost, so no start order is needed.
                self.con.wait_for(lambda: len(lis) % 2 == 0)
                lis.append(line)
                self.con.notify()
        finally:
            # Always hand the lock back, even if the loop raises.
            self.con.release()


class Poetry2(Thread):
    """Appends its lines to the shared list ``lis`` at positions 1, 3, 5, ..."""

    def __init__(self, con, poetry):
        super().__init__()
        self.poetry = poetry
        self.con = con

    def run(self):
        self.con.acquire()
        try:
            for line in self.poetry:
                # An odd length means Poetry1 has just written its line.
                self.con.wait_for(lambda: len(lis) % 2 == 1)
                lis.append(line)
                self.con.notify()
        finally:
            self.con.release()


if __name__ == "__main__":
    con = Condition()
    lis = []
    poy1 = ["楚国多豪俊,", "每与豺狼交,"]
    poy2 = ["相比得剑术。", "片血不沾衣。"]
    p1 = Poetry1(con, poy1)
    p2 = Poetry2(con, poy2)
    p2.start()
    p1.start()
    p1.join()
    p2.join()
    print("\r\n".join(lis))
# Illustrative pattern only: ``cv`` is a threading.Condition and the helper
# functions stand for application code that is not defined here.

# Consume one item
with cv:
    # Re-check the predicate in a loop: wait() may return even though the
    # item was already taken by another consumer.
    while not an_item_is_available():
        cv.wait()
    get_an_available_item()

# Produce one item
with cv:
    make_an_item_available()
    cv.notify()
生产者(Producer): 如果队列中的包子数量不超过20,立刻生产10个包子;消费者(Consumer):如果队列中的包子数量不少于20,立刻消费3个包子。
import time
from queue import Queue
from threading import Thread, Condition, current_thread


class Producer(Thread):
    """Adds ten items to ``q`` whenever it holds no more than 20 of them.

    Runs forever. All producers and consumers coordinate through the single
    shared condition ``con``.
    """

    def __init__(self, con, q):
        super().__init__()
        self.con = con
        self.q = q

    def run(self):
        while True:
            with self.con:
                # Public qsize() instead of the private _qsize() hook.
                while self.q.qsize() > 20:
                    self.con.wait()
                for _ in range(10):
                    self.q.put("包子")
                print("{}: 生产了10个包子.".format(current_thread().name))
                # Producers and consumers wait on the same condition, so a
                # single notify() may wake a thread of the same role, which
                # just goes back to sleep. Waking everyone avoids that stall.
                self.con.notify_all()


class Consumer(Thread):
    """Removes three items from ``q`` whenever it holds at least 20 of them.

    Runs forever, pausing two seconds before each batch.
    """

    def __init__(self, con, q):
        super().__init__()
        self.con = con
        self.q = q

    def run(self):
        while True:
            with self.con:
                while self.q.qsize() < 20:
                    self.con.wait()
                # Paces the demo output. Note the condition's lock is held
                # while sleeping, so other threads are paused as well.
                time.sleep(2)
                for _ in range(3):
                    self.q.get()
                print("{}: 消费了3个包子。".format(current_thread().name))
                self.con.notify_all()


if __name__ == "__main__":
    q = Queue()
    con = Condition()
    t_list = [Producer(con, q) for _ in range(4)]
    t_list.extend(Consumer(con, q) for _ in range(10))
    for t in t_list:
        t.start()
    for t in t_list:
        t.join()
import time
from concurrent.futures import ThreadPoolExecutor


def fn(num):
    """Print ``num`` and then sleep for two seconds."""
    print(num)
    time.sleep(2)


if __name__ == "__main__":
    # Create a pool of two worker threads. Leaving the ``with`` block waits
    # for the submitted work and shuts the pool down.
    with ThreadPoolExecutor(max_workers=2) as executor:
        # submit(callable, *args): the first argument is the function, the
        # remaining ones are passed to it.
        executor.submit(fn, 100)
        executor.submit(fn, 200)
import concurrent.futures
import urllib.request

URLS = [
    "http://www.foxnews.com/",
    "http://www.cnn.com/",
    "http://europe.wsj.com/",
    "http://www.bbc.co.uk/",
    "http://some-made-up-domain.com/",
]


def load_url(url, timeout):
    """Retrieve a single page and return its contents as bytes.

    Args:
        url: Address to open with ``urllib.request.urlopen``.
        timeout: Timeout in seconds passed to ``urlopen``.

    Whatever ``urlopen`` or ``read`` raises is propagated to the caller.
    """
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


if __name__ == "__main__":
    # The with statement ensures the worker threads are cleaned up promptly.
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Start the load operations and mark each future with its URL.
        future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                # Report the failure and carry on with the remaining URLs.
                print("%r generated an exception: %s" % (url, exc))
            else:
                print("%r page is %d bytes" % (url, len(data)))
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
from threading import Thread

from bs4 import BeautifulSoup


class UrlMaker(Thread):
    """Puts ``number`` consecutive chapter URLs on the module-level queue ``q``.

    The string ``"over"`` is queued last to mark the end of the URLs.
    """

    def __init__(self, number):
        super().__init__()
        self.number = number

    def run(self):
        num = 2024354
        for _ in range(self.number):
            url = "https://www.ybdu.com/xiaoshuo/10/10237/{}.html".format(num)
            q.put(url)
            print(url)
            num += 1
        q.put("over")


def urlParser(file):
    """Take one URL from ``q``, download the page and append its text to ``file``.

    Args:
        file: Path of the text file the title and content lines are appended to.

    Returns:
        ``{"code": True, "url": ..., "title": ...}`` after a page was written,
        or ``{"code": False, "url": False}`` once the ``"over"`` marker is seen.
    """
    url = q.get()  # next URL, or the end marker
    if url == "over":
        # Put the marker back so every later task sees it too. Otherwise any
        # task beyond the number of URLs would block in q.get() forever.
        q.put(url)
        return {
            "code": False,
            "url": False,
        }

    # ``import urllib`` alone does not load the ``request`` submodule, and the
    # response has to be closed once it has been read.
    with urllib.request.urlopen(url) as html:
        html_bytes = html.read()
    soup = BeautifulSoup(html_bytes, "html.parser")
    title = soup.find("div", attrs={"class": "h1title"}).h1.get_text()
    # Body text of the chapter.
    string = soup.find("div", attrs={"class": "contentbox", "id": "htmlContent"}).get_text()
    lines = string.split()
    with open(file, mode="a", encoding="utf-8") as f:
        f.write(title + "\r\n")
        # The last six tokens are dropped, presumably trailing page
        # boilerplate. TODO confirm against the site's markup.
        for line in lines[:-6]:
            f.write(" " + line + "\r\n")
    return {
        "code": True,
        "url": url,
        "title": title,
    }


def callback(msg):
    """Print a summary line for a result dict returned by ``urlParser``."""
    if msg["code"]:
        print("Process handled url: {}, title: {}.".format(msg["url"], msg["title"]))
    else:
        print("All urls had parsed.")


if __name__ == "__main__":
    q = Queue()
    p1 = UrlMaker(10)
    p1.start()
    p1.join()
    # The with block shuts the pool down once all tasks have finished.
    with ThreadPoolExecutor(max_workers=5) as executor:
        task_list = [executor.submit(urlParser, "天龙八部1.txt") for _ in range(20)]
        # as_completed yields each future as soon as it has finished.
        for task in as_completed(task_list):
            data = task.result()
            print("Task: {} , data: {}.".format(task, data["url"]))
标签:epo poetry init 模块 hat format nts div doc