标签:byte sicp bool factory 设置 author work persist rod
简单使用:
package com.imooc.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* Producer代码 服务端-生产者
* @author cxsz-hp16
* @Title: Sender
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2315:02
*/
public class Sender {
//消息名
private final static String QUEUE_NAME = "MyQueue";
public static void main(String[] args) {
send();
}
//发送消息
public static void send(){
//创建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
//设置参数
factory.setHost("localhost");
//创建连接
connection = factory.newConnection();
//创建管道
channel = connection.createChannel();
/**
* 队列声明
* 参数:queue:队列名、durable:是否持久化、exclusive:是否排外、arguments:设置队列消息什么时候被删除
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//设置消息内容
String message = "my first message";
/**
*
* 参数:
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("已发消息:"+message);
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //关闭资源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}
}
package com.imooc.cusumer;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 客户端-消费者
* @author cxsz-hp16
* @Title: Receiver
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2315:31
*/
public class Receiver {
private final static String QUEUE_NAME = "MyQueue";
public static void main(String[] args) {
receiver();
}
public static void receiver(){
//创建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//回调消费消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received ‘" + message + "‘");
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //关闭资源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}
}
package com.imooc.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
/**
* @author cxsz-hp16
* @Title: NewTask
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2316:30
*/
public class NewTask {
//消息名
private final static String QUEUE_NAME = "newTask";
public static void main(String[] args) {
send();
}
//发送消息
public static void send(){
//创建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
//设置参数
factory.setHost("localhost");
//创建连接
connection = factory.newConnection();
//创建管道
channel = connection.createChannel();
boolean durable = true;
channel.queueDeclare(QUEUE_NAME,durable,true,false,null);
//设置消息内容
String message = "2.";
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
System.out.println(" [x] Sent ‘" + message + "‘");
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //关闭资源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}
}
package com.imooc.cusumer;
import com.rabbitmq.client.*;
import org.junit.jupiter.api.Test;
import java.io.IOException;
/**
* @author cxsz-hp16
* @Title: Worker
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2316:31
*/
public class Worker {
private final static String QUEUE_NAME = "newTask";
public static void main(String[] args) {
receiver();
}
public static void receiver(){
//创建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//回调消费消息
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received ‘" + message + "‘");
try {
doWork(message);//设置一个任务
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[x] Done");
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,false,consumer);
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //关闭资源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}
/**
* 任务
* @param task
* @throws InterruptedException
*/
private static void doWork(String task) throws InterruptedException {
//将字符串转换为字符数组
for (char ch: task.toCharArray()) {
//当值为.时,阻塞线程来达到耗时的目的
if (ch == ‘.‘){
Thread.sleep(1000);
}
}
}
}
package com.imooc.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
/**
* @author cxsz-hp16
* @Title: NewTask
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2316:30
*/
public class NewTask23 {
//消息名
private final static String QUEUE_NAME = "task_queue";
public static void main(String[] args) {
send();
}
//发送消息
public static void send(){
//创建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
//设置参数
factory.setHost("localhost");
//创建连接
connection = factory.newConnection();
//创建管道
channel = connection.createChannel();
boolean durable = true;
channel.queueDeclare(QUEUE_NAME,true,true,false,null);
//设置消息内容
String message = "2.";
channel.basicPublish("",QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN
,message.getBytes("UTF-8"));
System.out.println(" [x] Sent ‘" + message + "‘");
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //关闭资源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}
}
package com.imooc.cusumer;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author cxsz-hp16
* @Title: Worker
* @ProjectName demotest
* @Description: TODO
* @date 2018/8/2316:31
*/
public class Worker3 {
private final static String QUEUE_NAME = "task_queue";
public static void main(String[] args) {
receiver();
}
public static void receiver(){
//创建引用
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
channel.basicQos(1);
//回调消费消息
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received ‘" + message + "‘");
try {
doWork(message);//设置一个任务
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[x] Done");
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,false,consumer);
}catch (Exception e){
e.printStackTrace();
}finally {
// try {
// //关闭资源
// channel.close();
// connection.close();
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}
/**
* 任务
* @param task
* @throws InterruptedException
*/
private static void doWork(String task) throws InterruptedException {
//将字符串转换为字符数组
for (char ch: task.toCharArray()) {
//当值为.时,阻塞线程来达到耗时的目的
if (ch == ‘.‘){
Thread.sleep(1000);
}
}
}
}
标签:byte sicp bool factory 设置 author work persist rod
原文地址:https://www.cnblogs.com/zhangbLearn/p/9525201.html