码迷,mamicode.com
首页 > 其他好文 > 详细

【rocketMQ】1、搭建MQ服务器,生产一个订单与消费一个订单

时间:2018-01-21 23:52:38      阅读:507      评论:0      收藏:0      [点我收藏+]

标签:配置   maven   order   ring   term   bubuko   访问   long   ide   

1、 先解压

 

 技术分享图片 

2、 maven编译安装、(注意虚拟机采用nat网络模式,需要联网)

 

mvn -Prelease-all -DskipTests clean install -U

 

 技术分享图片

技术分享图片

技术分享图片

 

 技术分享图片

 

 

 

启动nameser节点

 技术分享图片

 

 

启动broker

 技术分享图片

 

 

 

 

nohup sh bin/mqbroker -n localhost:9876 & tail -f namesrv.log

 

 

出错,

 技术分享图片

 

 

 

 

修改内存配置

 

 技术分享图片

 

 技术分享图片

 

 

修改为

 

 技术分享图片

 

 

 

修改broken

 

 技术分享图片

 

 

 技术分享图片

 

 

 

 这里我吃了大亏,主机对虚拟机中的端口访问不通!!!

注意一定要关闭防火墙,或者开启9876等需要使用的端口,不然无法远程调用!

再次启动

 

nohup bin/mqnamesrv > namesrv.log 2>&1 & tail -f namesrv.log

 

 技术分享图片

 

 

 

nohup bin/mqbroker -n 127.0.0.1:9876 > broker.log 2>&1 & tail -f broker.log

 技术分享图片

 

 

 

 

、、测试案例

 这个是官网的,其实这个无所谓,等会使用代码远程发送订单

> export NAMESRV_ADDR=localhost:9876

 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

 

 

停止服务,这个也无所谓,实在不行直接 kill -9 pid  吧进程杀死也是可以的

 

 技术分享图片

 

来,开始发送第一单!!!

 

package tttt.mq;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.junit.Test;

public class MqProductTest {

	@Test
	public void test1() {
		DefaultMQProducer producer = new DefaultMQProducer("xiaof_test");
		producer.setNamesrvAddr("192.168.0.128:9876");
		try {
			producer.start();
			for (int i = 0; i < 2; i++)
				try {
					{
						Message msg = new Message("Topic1", "TagA", "OrderID188",
								"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//						SendResult sendResult = producer.send(msg);
//						System.out.printf("%s%n", sendResult);
						
						producer.sendOneway(msg);
						
					}
				} catch (Exception e) {
					e.printStackTrace();
				}

		} catch (MQClientException e) {
			e.printStackTrace();
		} finally {
			producer.shutdown();
		}
	}

}

  

解压来,我们消费掉这个

package tttt.mq;

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Test;

public class MqConsumeTest {
	
	@Test
	public void test1() {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("xiaof_test");
		consumer.setNamesrvAddr("192.168.0.128:9876");
		
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
		
		try {
			consumer.subscribe("Topic1", "TagA");
			
			consumer.registerMessageListener(new MessageListenerOrderly() {
				
				AtomicLong consumeTimes = new AtomicLong(0);
				
				@Override
				public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
					
					//这个要是false,服务器就会不断重复发送消息
					context.setAutoCommit(true);
					MessageExt msg = msgs.get(0);
					String data = new String(msg.getBody());

					System.out.printf("%s 消费信息线程与数据: %s %n", Thread.currentThread().getName(), data);
					
					this.consumeTimes.incrementAndGet();
					if ((this.consumeTimes.get() % 2) == 0) {
						return ConsumeOrderlyStatus.SUCCESS;
					} else if ((this.consumeTimes.get() % 3) == 0) {
						return ConsumeOrderlyStatus.ROLLBACK;
					} else if ((this.consumeTimes.get() % 4) == 0) {
						return ConsumeOrderlyStatus.COMMIT;
					} else if ((this.consumeTimes.get() % 5) == 0) {
						context.setSuspendCurrentQueueTimeMillis(3000);
						return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
					}
					return ConsumeOrderlyStatus.SUCCESS;

				}
			});
			
			consumer.start();

	        System.out.printf("Consumer Started.%n");
			
		} catch (MQClientException e) {
			e.printStackTrace();
		}
	}
}

  

来一发效果:

 

技术分享图片

 

 

 技术分享图片

 

 

 技术分享图片

 

 技术分享图片

 

技术分享图片

 

 

 

 

 技术分享图片

 

 

 这个是消费msg中的全部信息:

 

技术分享图片

 

【rocketMQ】1、搭建MQ服务器,生产一个订单与消费一个订单

标签:配置   maven   order   ring   term   bubuko   访问   long   ide   

原文地址:https://www.cnblogs.com/cutter-point/p/8325959.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!