标签:options ring via system receives worker define foreach other
/* Asynchronous request-reply single-threaded server in Python that spawns a request handler each time a request is received This is different from other examples because the number of request handler threads is not defined ahead of time. Request: Client DEALER --> Server ROUTER --> Request handler (spawned) 1. Clients send requests via a DEALER socket on port 5570 2. Server receives requests via a ROUTER socket on port 5570 3. Server passes both the request and the client identity directly to request handlers when they are spawned Reply: Client DEALER <-- Server ROUTER <-- Server DEALER <-- Request handler DEALER 1. Request handler returns the reply to the Server via a DEALER socket on inproc 2. Server receives the reply from the request handler via a DEALER socket on inproc 3. Server sends the reply to the client via a ROUTER socket on port 5570 4. Client receives the reply via a DEALER socket on port 5570 */ using System; using System.Text; using System.Threading; using System.Threading.Tasks; using NetMQ; using NetMQ.Sockets; namespace NetmqSample { public class ZmqClient { public void Request(string input) { var socket = new DealerSocket(); socket.Options.Identity = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString()); socket.Connect("tcp://127.0.0.1:5570"); socket.SendFrame(input); Console.WriteLine($"client send: {input} : {DateTime.Now:T}"); var answer = socket.ReceiveFrameString(); Console.WriteLine($"client received: {answer} : {DateTime.Now:T}"); socket.Dispose(); } } public class ZmqServer { private DealerSocket _backend; private RouterSocket _frontend; public void Run() { _frontend = new RouterSocket(); _frontend.Bind("tcp://*:5570"); _frontend.ReceiveReady += Frontend_ReceiveReady; _backend = new DealerSocket(); _backend.Bind("inproc://backend"); _backend.ReceiveReady += Backend_ReceiveReady; var poller = new NetMQPoller { _frontend, _backend }; poller.RunAsync(); Console.WriteLine("server started"); } private void Backend_ReceiveReady(object sender, NetMQSocketEventArgs e) { var id = e.Socket.ReceiveFrameString(); var msg = e.Socket.ReceiveFrameString(); Console.WriteLine($"server backend response: {id} : {msg}"); _frontend.SendFrame(id, true); _frontend.SendFrame(msg); } private void Frontend_ReceiveReady(object sender, NetMQSocketEventArgs e) { var id = e.Socket.ReceiveFrameString(); var msg = e.Socket.ReceiveFrameString(); //Console.WriteLine($"server frontend received: {id} : {msg} : {DateTime.Now:T}"); var task = new Task(() => new RequestHandler().Run(id, msg), TaskCreationOptions.LongRunning); task.Start(); } } public class RequestHandler { public void Run(string id, string msg) { var worker = new DealerSocket("inproc://backend"); // Simulate a long-running operation Thread.Sleep(2000); worker.SendFrame(id, true); worker.SendFrame(msg + " : " + DateTime.Now.ToLongTimeString()); worker.Dispose(); } } }
class Program { static void Main(string[] args) { var server = new ZmqServer(); server.Run(); Enumerable.Range(0, 2000).ToList().ForEach(x => { Task.Factory.StartNew(() => new ZmqClient().Request(x.ToString("0000")), TaskCreationOptions.LongRunning); }); Console.ReadLine(); } }
标签:options ring via system receives worker define foreach other
原文地址:http://www.cnblogs.com/zhahost/p/6013550.html