标签:9.png utf-8 exception string height exchange mamicode throws ima
http://www.rabbitmq.com/tutorials/tutorial-three-java.html
生产者
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.util.ConnectionUtils; public class Send { private static final String EXCHANGE_NAME="test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection conn = ConnectionUtils.getConnection(); Channel channel = conn.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //发送信息 String msg = "hello"; channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); channel.close(); conn.close(); } }
消费者1
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.util.ConnectionUtils; public class Receive { private static final String QUEUE_NAME="test_queue1"; private static final String EXCHANGE_NAME="test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection conn = ConnectionUtils.getConnection(); Channel channel = conn.createChannel(); //队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //绑定队列到交换机转发器 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); //定义一个消费者 Consumer consumer = new DefaultConsumer(channel){ //收到消息就会触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("消费者1接收到的消息" + msg); try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); }finally{ System.out.println("消费者1处理完成!"); //手动回执 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //监听队列 //自动应答false boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
消费者2
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.util.ConnectionUtils; public class Receive2 { private static final String QUEUE_NAME="test_queue"; private static final String EXCHANGE_NAME="test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection conn = ConnectionUtils.getConnection(); Channel channel = conn.createChannel(); //队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //绑定队列到交换机转发器 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); //定义一个消费者 Consumer consumer = new DefaultConsumer(channel){ //收到消息就会触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("消费者2接收到的消息" + msg); try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); }finally{ System.out.println("消费者2处理完成!"); //手动回执 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //监听队列 //自动应答false boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
一个消息可以被多个消费者消费
后台进行查看:
5、RabbitMQ-订阅模式 Publish/Subscribe
标签:9.png utf-8 exception string height exchange mamicode throws ima
原文地址:https://www.cnblogs.com/Mrchengs/p/10531050.html