标签:cli coding 消息队列 down 操作 world lob str color
rabbitmq-plugins enable rabbitmq_management
nuget 程序 RabbitMQ 安装完成后开始测试代码。
服务端:
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace Publish { class Program { static void Main(string[] args) { var factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.UserName = "dp"; factory.Password = "as1234"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { bool durable = true;//设置队列持久化 channel.QueueDeclare("hello", durable, false, false, null); string message = GetMessage(args); var properties = channel.CreateBasicProperties(); properties.Persistent = true;//设置消息持久化 properties.DeliveryMode = 2;//或者可以使用 properties.SetPersistent(true); //需要注意的是,将消息设置为持久化并不能完全保证消息不丢失。虽然他告诉RabbitMQ将消息保存到磁盘上,但是在RabbitMQ接收到消息和将其保存到磁盘上这之间仍然有一个小的时间窗口。 RabbitMQ 可能只是将消息保存到了缓存中,并没有将其写入到磁盘上。持久化是不能够一定保证的,但是对于一个简单任务队列来说已经足够。如果需要消息队列持久化的强保证,可以使用publisher confirms var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("", "hello", properties, body); Console.WriteLine(" send {0}", message); } } Console.ReadKey(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); } } }
客户端:
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Receive { class Program { static void Main(string[] args) { var factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.UserName = "dp"; factory.Password = "as1234"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare("hello", false, false, false, null); channel.BasicQos(0, prefetchCount:1, global:false);//配置公平分发机制 设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时 var consumer = new QueueingBasicConsumer(channel); bool autoAck = false;//设置是否自动ack 默认是true channel.BasicConsume("hello", autoAck, consumer); Console.WriteLine(" waiting for message."); while (true) { var ea = consumer.Queue.Dequeue(); var body = ea.Body; var message = Encoding.UTF8.GetString(body); int dots = message.Split(‘.‘).Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine("Received {0}", message); channel.BasicAck(ea.DeliveryTag, false);//ack } } } } } }
以上没有考虑 Exchange和routingKey绑定的模式。只是针对某个队列的操作。其他相关信息参考地址:http://www.cnblogs.com/sheng-jie/p/7192690.html
标签:cli coding 消息队列 down 操作 world lob str color
原文地址:http://www.cnblogs.com/q975261413/p/7631522.html