标签:线程安全 except pass run 编写 生产者消费者 ddr thread 消费者
函数:生产者和消费者
import random from queue import Queue from threading import Thread, current_thread import time # 实例化一个队列 myq = Queue() # 定义生产者 def producer(): while True: tmp = random.randint(1,100) myq.put(tmp) print("%s生产了%s,生产后,现在产品总量:%s" % (current_thread().name, tmp, myq.qsize())) time.sleep(0.5) # 定义消费者 def consumer(): while True: print("%s消费了%s,剩余产品%s" % (current_thread().name, myq.get(), myq.qsize())) time.sleep(1.1) # 启动生产者和消费者 # 启动生产者 tp = Thread(target=producer) tp.start() # 启动消费者 for i in range(2): tc = Thread(target=consumer) tc.start()
函数2:
# coding:utf-8 from queue import Queue from threading import Thread,current_thread import random import time # 实例化一个队列,线程安全 myq = Queue() # 定义生产者 def produce(): while True: tmp = random.randint(1,100) myq.put(tmp) print(‘%s生产了%s‘ % (current_thread().name,tmp)) time.sleep(0.5) # 消费者 def consumer(): while True: print(‘%s消费了%s‘ % (current_thread().name, myq.get())) time.sleep(1) # 启动生产者和消费者 t_p = Thread(target=produce) t_p.start() # 启动消费者 for i in range(2): t_cs = Thread(target=consumer) t_cs.start()
函数3:
# 编写一个基于tcp的echo服务器(回响服务器,即将客户端发送的信息返回给客户端), # 要求使用线程和生产者消费者模型(提示:一个线程accept--生产者;两个线程用于接收和发送--消费者)。 import socket from threading import Thread, current_thread from queue import Queue # 生产者 def accept_t(queue): print("当前线程",current_thread().name) # client_info = server.accept() # queue.put(client_info) # 消费者recv def recv_t(queue, queue_data): client_info = queue.get() client_sock = client_info[0] data = client_sock.recv(1024) queue_data.put(data) pass try: print(data.decode()) except: print(data.decode(‘gbk‘)) # 消费者send def send_t(queue_data): data = queue_data.get() client_sock = client_info[0] client_sock.send(data) client_sock.close() pass if __name__ == "__main__": client_info = None server = None # 创建服务器的套接字(监听套接字) server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 设置地址复用属性 server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 绑定IP和端口 server_address = ("", 7972) server.bind(server_address) # 监听 server.listen(128) queue = Queue() queue_data = Queue() t1 = Thread(target=accept_t, args=(queue)) t1.start() t2 = Thread(target=recv_t, args=(queue, queue_data)) t2.start() t3 = Thread(target=send_t, args=(queue_data,)) t3.start() t1.join() t2.join() t3.join()
类:生产者和消费者
import socket from queue import Queue from threading import Thread import time import chardet client_queue = Queue() # 生产者 class Producer(Thread): def __init__(self, tcp_server): super().__init__() self.tcp_server = tcp_server def run(self): client_info = self.tcp_server.accept() client_queue.put(client_info) # 消费者 class Consumer(Thread): def __init__(self): super().__init__() def run(self): client_info = client_queue.get() client_sock = client_info[0] client_addr = client_info[1] msg = client_sock.recv(1024) print("原始字节流:",msg) a = ‘abcd‘.encode("UTF-8") print(‘a:‘, a) # a = msg.decode() code = chardet.detect(a) print(‘获取到a的编码是‘,code[‘encoding‘]) print("%s说:%s" % (client_addr, msg.decode())) client_sock.send(msg.decode().encode(‘gbk‘)) client_sock.close() print(‘consumer is over‘) # 主函数 def main(): tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) tcp_server.bind(("", 7892)) tcp_server.listen(128) p = Producer(tcp_server) c1 = Consumer() # c2 = Consumer() p.start() c1.start() # c2.start() # time.sleep(2) p.join() c1.join() # c2.join() tcp_server.close() if __name__ == ‘__main__‘: main()
标签:线程安全 except pass run 编写 生产者消费者 ddr thread 消费者
原文地址:https://www.cnblogs.com/andy9468/p/8988326.html