码迷,mamicode.com
首页 > Windows程序 > 详细

c#生产/消费RabbitMQ

时间:2017-02-05 20:34:16      阅读:824      评论:0      收藏:0      [点我收藏+]

标签:bsp   queue   private   for   通道   rgs   persist   var   测试   

public sealed class JsonSerializer  
    {
        public static byte[] Serialize(object message)
        {
            return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
        }

        public static object Deserialize<T>(byte[] bytes)
        {
            return JsonConvert.DeserializeObject<T>(Encoding.UTF8.GetString(bytes));
        }
    }
    public sealed class BinarySerializer  
    {
        public static byte[] SerializeToBytes(object obj)
        {
            var formatter = new BinaryFormatter();
            using (var stream = new MemoryStream())
            {
                formatter.Serialize(stream, obj);

                return StreamUtil.ReadAllBytes(stream);
            }
        }
       
        public static object DeserializeFromBytes(byte[] bytes)
        {
            var formatter = new BinaryFormatter();
            using (var stream = new MemoryStream(bytes))
            {
                return formatter.Deserialize(stream);
            }
        }
    }
private static bool RawPublishMessage()
        {
            var exchange = "TestExchangeRouting...";
            var routingKey = "rk";
            Uri uri = new Uri("amqp://192.168.1.1:1234/");
            ConnectionFactory factory = new ConnectionFactory();

            factory.UserName = "guest";  
            factory.Password = "guest";
            factory.VirtualHost = "/";  
            factory.RequestedHeartbeat = 0;
            factory.Endpoint = new AmqpTcpEndpoint(uri);

            //创建一个连接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建一个通道
                using (IModel channel = connection.CreateModel())
                {
                    //声明一个路由
                    channel.ExchangeDeclare(exchange, "direct");
                    var queueOk = channel.QueueDeclare("testQueue", true, false, false, null);
                    channel.QueueBind(queueOk.QueueName, exchange, routingKey);

                    var model = new Order
                    {
                        Id = 100021,
                        Title = "工一一个测试Test"
                    }; //这个才是具体的发送内容  

                    var body = JsonSerializer.Serialize(model);

                    var properties = channel.CreateBasicProperties();
                    properties.SetPersistent(true);
                    properties.ContentType = typeof(Order).AssemblyQualifiedName;
                    properties.ContentEncoding = "JSON";

                    //写入  
                    channel.BasicPublish(exchange, routingKey, properties, body);
                    Console.WriteLine("写入成功");
                }

            }
            return false;
        }

        private static bool RawGetMessage()
        {
            var exchange = "TestExchangeRouting...";
            var routingKey = "rk";
            Uri uri = new Uri("amqp://192.168.1.1:1234/");
            ConnectionFactory factory = new ConnectionFactory();

            factory.UserName = "guest";  
            factory.Password = "guest";
            factory.VirtualHost = "/";  
            factory.RequestedHeartbeat = 0;
            factory.Endpoint = new AmqpTcpEndpoint(uri);

            //创建一个连接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建一个通道
                using (IModel channel = connection.CreateModel())
                {
                    var basicConsumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume("testQueue", false, basicConsumer);

                    while (true)
                    {
                        try
                        {
                            BasicDeliverEventArgs basicDeliverEventArgs  ;
                            basicConsumer.Queue.Dequeue(1000, out basicDeliverEventArgs);
                            if (basicDeliverEventArgs == null)
                            {
                                break;
                            }

                            Task.Run(() =>
                            {
                                channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false);
                            }).Wait();
                            
                             
                            var body = JsonSerializer.Deserialize<Order>(basicDeliverEventArgs.Body);
                            Console.WriteLine(string.Format("RoutingKey:{0},Body:{1}", basicDeliverEventArgs.RoutingKey,
                                 JsonConvert.SerializeObject(body, Formatting.Indented)));
                        }
                        catch (Exception)
                        {
                            break;
                        }
                    }

                    channel.Close();
                }
                connection.Close();
            }

            return false;
        }

 

public static byte[] ReadAllBytes(Stream stream)
        {           

            var bytes = new byte[stream.Length];

            stream.Seek(0, SeekOrigin.Begin);

            for (var i = 0; i < stream.Length; i++)
            {
                bytes[i] = (byte)stream.ReadByte();
            }

            return bytes;
        }

 

c#生产/消费RabbitMQ

标签:bsp   queue   private   for   通道   rgs   persist   var   测试   

原文地址:http://www.cnblogs.com/zhshlimi/p/6368388.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!