标签:fetch host adl class 常用 blog cli message rate
吃多了拉就是队列,吃饱了吐就是栈
Setx ERLANG_HOME “D:\Program Files\erl8.2″
rabbitmqctl status
确定rabbitmq状态Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
consumer:消息消费者,就是接收消息的程序。
rabbitmq-plugins enable rabbitmq_management
// 启用rabbitmq-plugins disable rabbitmq_management
// 禁用rabbitmqctl list_queues
// 查看队列rabbitmqctl list_users
// 查看所有用户rabbitmqctl add_user user_admin passwd_admin
// 添加用户rabbitmqctl set_user_tags user_admin administrator
// 添加权限rabbitmqctl delete_user guest
// 删除用户rabbitmqctl change_password {username} {newpassowrd}
// 修改密码rabbitmqctl add_vhost vhostpath
// 创建虚拟主机rabbitmqctl delete_vhost vhostpath
// 删除虚拟主机rabbitmqctl list_vhosts
// 列出所有虚拟主机var factory = new ConnectionFactory
{
HostName = hostName, // rabbit server
UserName = "admin",
Password = "admin",
Port = 5672, // Broker端口
VirtualHost = "/" // 虚拟Host,需提前配置
};
using (var connection = factory.CreateConnection()) // 创建与RabbitMQ服务器的连接
{
using (var channel = connection.CreateModel()) // 创建1个Channel(大部分API在该Channel中)
{
// 定义1个队列,自动会和默认的exchange 做direct类型绑定
channel.QueueDeclare(
queue: "hello", // 队列名称
durable: true, // 队列是否持久化
exclusive: false, // 排他队列:如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。(活动在一次连接内)
autoDelete: false, // 自动删除:当最后一个消费者取消订阅时,队列自动删除。如果您需要仅由一个使用者使用的临时队列,请将自动删除与排除。当消费者断??开连接时,队列将被删除。(至少消费者能连一次)
arguments: null); // 配置参数
var randomQueue = channel.QueueDeclare(); // 定义随机的队列 该队列为临时队列(排他队列 + 自动删除)
// 定义Exchange(一般而言,不需要定义exchange,rabbitmq默认创建了所有类型的exchange)
//channel.ExchangeDeclare("direct-demo", ExchangeType.Direct); // 定义direct exchange
//channel.ExchangeDeclare("fannout-demo", ExchangeType.Fanout); // 定义fanout exchange
//channel.ExchangeDeclare("topic-demo", ExchangeType.Topic); // 定义fanout exchange
// 定义queue exchange key 关系(在某些业务场景下,会使用该关系做路由功能)
//channel.QueueBind(queue: "hello", exchange: "amq.direct", routingKey: "hello"); // 默认绑定的关系和该行代码效果一样
//channel.QueueBind("hello", "amq.fanout", "hello"); // 该类型下的routingKey 实际不需要
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
while (true)
{
string message = "Hello World!" + DateTime.Now;
var body = Encoding.UTF8.GetBytes(message);
// 发送消息到队列中
channel.BasicPublish(
exchange: string.Empty, // 传递为Empty的时候,通过 `(AMQP default)`传递
routingKey: "hello", // routing key 与 queuebind中的binding key对应
basicProperties: properties, // 消息header
body: body); // 消息body:发送的是bytes 可以任意编码
Console.WriteLine(" [x] Sent {0}", message);
}
}
}
接收消息(以消息响应为例)
var factory = new ConnectionFactory
{
HostName = hostName, // rabbit server
UserName = "admin",
Password = "admin",
Port = 5672, // Broker端口
VirtualHost = "/" // 虚拟Host,需提前配置
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
var consumer = new EventingBasicConsumer(channel); // 创建Consumer
consumer.Received += (model, ea) => // 通过回调函数异步推送我们的消息
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Thread.Sleep(1000);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); // 消息响应
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicQos(0, 1, false); // 设置perfetchCount=1 。这样就告诉RabbitMQ 不要在同一时间给一个工作者发送多于1个的消息
channel.BasicConsume(queue: "hello",
noAck: false, // 需要消息响应(Acknowledgments)机制
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
标签:fetch host adl class 常用 blog cli message rate
原文地址:http://www.cnblogs.com/Leo_wl/p/6582811.html