标签:
一:为什么要使用消息队列呢?
在开发上一个APP后台时候,其中很重要的一块就是消息,通讯模块,使用的是开源的Openfire。
架构:
两台API服务器
两台Openfire服务器
若干数据库服务器集群
其中业务的很大一部分都需要发送消息,用户下了订单,用户取消订单,等等都需要服务器给用户来发送消息。使用的解决方式就是在Openfire的基础上规定了自己的消息格式。用户去操作,然后API服务器通知Openfire服务器去发送消息。Openfire服务器去接受来自API服务器的请求,然后去给用户发送消息。
这个其中就有一个问题,无数的接口都会告诉Openfire服务器去发送消息,当请求多到一定程度的时候Openfire服务器肯定是处理不过来的。
解决办法:
先使用速度快的东西先把需要发送的消息首先存下来。然后测试出每秒Openfire服务器在可以接受的请求次数,然后每秒从存下来的消息中按顺序取出消息来发送。
在没有太多经验的情况下,选了个据说最简单的redis,测试了下勉强还可以。
这是我第一次使用消息队列的经历。
二:使用消息队列
在接手新项目时候,是电商类APP,其中有一个非常重要的功能就是,在不停的降价当中,在到一定的时候,公司内部的托一出手,后面会跟着很多人来出价。其中后台近期可能存在的压力很可能就出在这里。这里也是非常重要的一环。
为了保证这个环节的稳定性,决定采用activeMQ(为什么采用这个呢,多试点技术换换哈哈)
下面正式开始ActiveMQ的使用。
三:在测试服务器上面搭建ActiveMQ
首先下载ActiveMQ
下载网址:http://activemq.apache.org/activemq-5132-release.html
tar xzfv apache-activemq-5.13.2-bin.tar.gz 解压
cd cd apache-activemq-5.13.2/ 进入目录
./bin/activemq start 启动ActiveMQ
/sbin/iptables -I INPUT -p tcp --dport 8161 -j ACCEPT 打开ActiveMQ默认控制台端口
/sbin/iptables -I INPUT -p tcp --dport 61616 -j ACCEPT 打开ActiveMQ默认连接端口
/etc/rc.d/init.d/iptables save 保存
在测试服务器上安装ActiveMQ完毕
四:打开ActiveMQ控制台
然后点击queues 创建一个queues
然后我们就可以开始代码代码之路了!!!!!
五:使用java来测试ActiveMQ
创建一个生产者:
package com.nico.utils.ActiveMq; /** * Created by Administrator on 2016/4/27. */ import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender extends Thread { public static void sendMessage(Session session, MessageProducer producer, int i) throws Exception { TextMessage message = session .createTextMessage("ActiveMq 发送的消息" + i); // 发送消息到目的地方 System.out.println("发送消息:" + "ActiveMq 发送的消息" + i); producer.send(message); } @Override public void run() { int messageNo = 1; while (true) { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // MessageProducer:消息发送者 MessageProducer producer; // TextMessage message; // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://{ip}:61616"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue("FirstQueue"); // 得到消息生成者【发送者】 producer = session.createProducer(destination); // 设置不持久化,此处学习,实际根据项目决定 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 构造消息,此处写死,项目就是参数,或者方法获取 sendMessage(session, producer,messageNo); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } messageNo++; try { sleep(3000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
创建一个消费者:
package com.nico.utils.ActiveMq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Receiver extends Thread{ @Override public void run() { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // 消费者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://{ip}:61616"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue("FirstQueue"); consumer = session.createConsumer(destination); while (true) { //设置接收者接收消息的时间,为了便于测试,这里谁定为100s TextMessage message = (TextMessage) consumer.receive(100000); if (null != message) { System.out.println("收到消息" + message.getText()); } else { break; } sleep(3000); } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }
然后开启我们的测试:
package com.nico.utils.ActiveMq; /** * Created by Administrator on 2016/4/27. */ public class ActiveDemo { public static void main(String[] args){ Receiver receiverThread=new Receiver(); receiverThread.start(); Sender senderThread=new Sender(); senderThread.start(); } }
打印结果:
发送消息:ActiveMq 发送的消息1
收到消息ActiveMq 发送的消息1
发送消息:ActiveMq 发送的消息2
收到消息ActiveMq 发送的消息2
发送消息:ActiveMq 发送的消息3
收到消息ActiveMq 发送的消息3
发送消息:ActiveMq 发送的消息4
收到消息ActiveMq 发送的消息4
然后就可以简单看到测试结果。
然后将队列集成到系统中去就可以!
标签:
原文地址:http://www.cnblogs.com/wuwulalala/p/5439792.html