标签:stat font 基于 ade argument ringbuf root dir byte
@Configuration public class RabbirnqConfig { private static final String RABBIT_HOST = "localhost"; private static final String RABBIT_USERNAME = "root"; private static final String RABBIT_PASSWORD = "root"; private static final int RABBIT_PORT = 5672; @Bean public static Connection getConnection(){ Connection connection = null; ConnectionFactory connectionFactory = new ConnectionFactory(); //连接rabbitMq,第一种方式,通过分配属性 connectionFactory.setHost(RABBIT_HOST);//主机名 connectionFactory.setUsername(RABBIT_USERNAME);//用户名 connectionFactory.setPassword(RABBIT_PASSWORD);//密码 connectionFactory.setPort(RABBIT_PORT);//端口 try { //连接rabbitMq,第二种方式,使用uri,如下会有默认值 //connectionFactory.setUri("amqp:// userName:password @ hostName:portNumber / virtualHost"); connection = connectionFactory.newConnection(); }catch (Exception e){ e.printStackTrace(); } return connection; } }
解释几个频繁使用的方法:
获取连接:Connection connection = RabbirnqConfig.getConnection();
创建通道:Channel channel = connection.createChannel();
声明交换机:channel.exchangeDeclare(EXCHANGE_NAME_FANOUT,"fanout",true,true,null);、
exchangeDeclare有很多的方法重载。以参数最多的为例。此处列出每个参数,方便具体使用哪个重载方法时,里面用到的参数都可以在这里找到代表的意思
DeclareOk exchangeDeclare(String var1, String var2, boolean var3, boolean var4, boolean var5, Map<String, Object> var6) throws IOException;
var1(exchange
):交换器名称
var2(BuiltinExchangeType type
):交换器类型,可选值:DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers")
var3(booleandurable):是否持久化,durable设置为true表示持久化,反之是非持久化,持久化的可以将交换器存盘,在服务器重启的时候不会丢失信息
var4(boolean autoDelete):是否自动删除,设置为TRUE则表是自动删除,自删除的前提是至少有一个队列或者交换器与这交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑,一般都设置为fase
var5(boolean internal):是否内置,如果设置 为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器的方式
var6(Map<String, Object> arguments):其它一些结构化参数比如:alternate-exchange
消息发布:channel.basicPublish(EXCHANGE_NAME_FANOUT,"",null,message.getBytes());
public void fanoutPpro() throws Exception {//生产消息 Connection connection = RabbirnqConfig.getConnection(); //创建通道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME_FANOUT,"fanout",true,true,null); String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME_FANOUT,"",null,message.getBytes()); System.out.println("生产者 send :"+message); channel.close(); connection.close(); }
几个方法解释:
创建队列:channel.queueDeclare(F_QUEUE_NAME,false,false,false,null); 此方法同一个队列名只能创建一次,如果再次接受消息是还是用该方法创建同一个队列名,会抛出异常(再次测试这个队列接受消息,把这个方法注释掉即可)
queueDeclare有多个重载方法,以参数最多的这个为例解释
com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;
如果queueDeclare()不带参数,该方法默认创建一个由RabbitMq命名的名称,这种队列也称之为匿名队列,排他 的,自动删除的,非持久化的队列queueDeclare()不带参数方法默认创建一个由RabbitMq命名的名称,这种队列也称之为匿名队列,排他的,自动删除的,非持久化的队列
var1(queue):队列名称
var2(durable):是否持久化, true ,表示持久化,会存盘,服务器重启仍然存在,false,非持久化
var3(exclusive):exclusive : 是否排他的,true,排他。如果一个队列声明为排他队列,该队列会对首次声明它的连接可见,并在连接断开时自动删除;排他是基于连接的Connection可见的,同一个连接的不同信道是可以同时访问同一个连接创建的排他队列
如果一个连接已经声明了一个排他队列,其它连接是不允许建立同名的排他队列,这个与普通队列不同,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除
var4(autoDelete);是否自动删除,true,自动删除,自动删除的前提:至少有一个消息者连接到这个队列,之后所有与这个队列连接的消息都断开时,才会自动删除;注意,生产者客户端创建这个队列,或者没有消息者客户端连接这个队列时,不会自动删除这个队列
var5(arguments):其它一些参数。如:x-message-ttl,之类
绑定到交换机:channel.queueBind(F_QUEUE_NAME,EXCHANGE_NAME_FANOUT,"");
public void fanoutCus1() throws Exception{ Connection connection = RabbirnqConfig.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(F_QUEUE_NAME,false,false,false,null); channel.queueBind(F_QUEUE_NAME,EXCHANGE_NAME_FANOUT,""); channel.basicQos(5);//限制此通道预取数为5,当该队列没对接受到的消息进行回复时,不会再给其新的消息 StringBuffer message = new StringBuffer(); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message = new String(body,"UTF-8"); System.out.println("接到消息1:"+message);
channel.basicAck(envelope.getDeliveryTag(),true);//手动回复确认
} };
channel.basicConsume(F_QUEUE_NAME,false,defaultConsumer);//对接收到的消息自动回复确认
}
重点解释:
消费者消息确认方式分为:
自动(交货无需确认,又称“开火即忘”)
手动(送货需要客户确认)
消息回复设置:channel.basicConsume(F_QUEUE_NAME,false,defaultConsumer);
basicConsume(String var1, boolean var2, Consumer var3) throws IOException;
当var2为true,表示自动回复。
为false表示手动回复,此时需要与channel.basicQos()方法配合使用,限制消息交付数量,避免消费者超负荷
channel.basicQos():限制此通道预取数,该值定义通道上允许的未确认交付的最大数量。通过使用basic.qos方法设置“预取计数”值。一旦数量达到配置的数量,RabbitMQ将停止在通道上传递更多消息,除非已确认至少一个未处理的消息。
举例:现在通道里有1、2、3、4四个未确认交付的消息,队列channel里目前也有四个未确认的消息,且队列channel设置的预取值为2,现在chennel确认处理了其内部3个消息,那么就会再从通道取两个未确认的消息,因为其设置的预取值为2,即便channel里面还有一个空位也只会取2个消息
手动回复确认:channel.basicAck();
void basicAck(long var1, boolean var3) throws IOException;
var1(deliveryTag):该消息的id,即RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID
var2(multiple):是否批量.true:将一次性处理所有小于等于传入值的所有消息
public void directPpro() throws Exception { Connection connection = RabbirnqConfig.getConnection(); //创建通道 Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME_DIRECT,"direct"); //创建队列 //channel.queueDeclare(QUEUE_NAME,false,false,false,null); // for (int i = 0; i < 20; i++) { // message = message + i; // channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes()); // Thread.sleep(1000); // } String message = "这是消息B"; channel.basicPublish(EXCHANGE_NAME_DIRECT, "B", null, message.getBytes()); String messageA = "这是消息A"; channel.basicPublish(EXCHANGE_NAME_DIRECT, "A", null, messageA.getBytes()); String messageC = "这是消息C"; channel.basicPublish(EXCHANGE_NAME_DIRECT, "C", null, messageC.getBytes()); System.out.println("生产者 send :"+message); System.out.println("生产者 send :"+messageA); System.out.println("生产者 send :"+messageC); channel.close(); connection.close(); }
public void directCus1() throws Exception{ Connection connection = RabbirnqConfig.getConnection(); Channel channel = connection.createChannel(); //channel.queueDeclare(QUEUE_NAME1,false,false,false,null); channel.exchangeDeclare(EXCHANGE_NAME_DIRECT,"direct"); channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME_DIRECT,"A"); channel.basicQos(1);//当该队列没对接受到的消息进行回复时,不会再给其新的消息 StringBuffer message = new StringBuffer(); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message = new String(body,"UTF-8"); System.out.println("接到消息A:"+message); } }; channel.basicConsume(QUEUE_NAME1,true,defaultConsumer);//对接收到的消息自动回复确认 }
public void directCus2() throws Exception{ Connection connection = RabbirnqConfig.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME3,false,false,false,null); channel.exchangeDeclare(EXCHANGE_NAME_DIRECT,"direct"); channel.queueBind(QUEUE_NAME3,EXCHANGE_NAME_DIRECT,"A"); channel.basicQos(1);//限制此通道预取数为1,当该队列没对接受到的消息进行回复时,不会再给其新的消息 StringBuffer message = new StringBuffer(); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message = new String(body,"UTF-8"); System.out.println("接到消息B:"+message); channel.basicAck(envelope.getDeliveryTag(),false);//手动回复确认 //deliveryTag:该消息的index multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。 channel.basicReject(envelope.getDeliveryTag(),true); } }; channel.basicConsume(QUEUE_NAME3,false,defaultConsumer);//对接收到的消息不自动回复确认,需手动回复 }
public void topicPro() throws Exception{ Connection connection = RabbirnqConfig.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME_TOPIC,"topic"); String message = "主题订阅"; channel.basicPublish(EXCHANGE_NAME_TOPIC,"a.b",false,false,null,message.getBytes()); System.out.println("已发送:"+message); channel.close(); connection.close(); }
public void topicCus1() throws Exception{ Connection connection = RabbirnqConfig.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("topic_cus1",false,false,false,null); channel.exchangeDeclare(EXCHANGE_NAME_TOPIC,"topic"); channel.queueBind("topic_cus1",EXCHANGE_NAME_TOPIC,"a.*"); channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println("t1: "+new String(body,"UTF-8")); } }; channel.basicConsume("topic_cus1",true,consumer); }
public void topicCus2() throws Exception{ Connection connection = RabbirnqConfig.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("topic_cus2",false,false,false,null); channel.exchangeDeclare(EXCHANGE_NAME_TOPIC,"topic"); channel.queueBind("topic_cus2",EXCHANGE_NAME_TOPIC,"a.*"); channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println("T2:"+new String(body,"UTF-8")); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("topic_cus2",false,consumer); }
标签:stat font 基于 ade argument ringbuf root dir byte
原文地址:https://www.cnblogs.com/Lk-skyhorse/p/13295261.html