标签:
ActiveMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。ActiveMQ使用Apache提供的授权,任何人都可以对其实现代码进行修改。
ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。ActiveMQ实现了JMS标准并提供了很多附加的特性。这些附加的特性包括,JMX管理(java Management Extensions,即java管理扩展),主从管理(master/salve,这是集群模式的一种,主要体现在可靠性方面,当主中介(代理)出现故障,那么从代理会替代主代理的位置,不至于使消息系统瘫痪)、消息组通信(同一组的消息,仅会提交给一个客户进行处理)、有序消息管理(确保消息能够按照发送的次序被接受者接收)。
ActiveMQ 支持JMS规范,ActiveMQ完全实现了JMS1.1规范。
JMS规范提供了同步消息和异步消息投递方式、有且仅有一次投递语义(指消息的接收者对一条消息必须接收到一次,并且仅有一次)、订阅消息持久接收等。如果仅使用JMS规范,表明无论您使用的是哪家厂商的消息代理,都不会影响到您的程序。
ActiveMQ主要涉及到5个方面:
<destinationPolicy> <policyMap> <policyEntries>
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" > <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="50000"/> </pendingMessageLimitStrategy> </policyEntry> <policyEntry queue=">" producerFlowControl="false" optimizedDispatch="true" memoryLimit=“2mb"> </policyEntry> </policyEntries> </policyMap> </destinationPolicy>
找到下面这一段
<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>
<systemUsage> <systemUsage> <memoryUsage> <memoryUsage percentOfJvmHeap="90" /> </memoryUsage> <storeUsage> <storeUsage limit="100 gb"/> </storeUsage> <tempUsage> <tempUsage limit="50 gb"/> </tempUsage> </systemUsage> </systemUsage>
# Set jvm memory configuration (minimal/maximum amount of memory) ACTIVEMQ_OPTS_MEMORY="-Xms2048M -Xmx2048M"
$cd $ACTIVEMQ_HOME $ ./activemq console
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.0.101:61616" /> <property name="useAsyncSend" value="true" /> <property name="alwaysSessionAsync" value="true" /> <property name="useDedicatedTaskRunner" value="true" /> </bean> <!-- 发送消息的目的地(一个队列) --> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg value="ymk.queue?consumer.prefetchSize=100" /> </bean> </beans>
。。。。。。 <properties> <activemq_version>5.13.3</activemq_version> </properties> 。。。。。。 <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>${activemq_version}</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>${activemq_version}</version> </dependency>
package webpoc; public class AMQSender { public static void sendWithAuto(ApplicationContext context) { ActiveMQConnectionFactory factory = null; Connection conn = null; Destination destination = null; Session session = null; MessageProducer producer = null; try { destination = (Destination) context.getBean("destination"); factory = (ActiveMQConnectionFactory) context.getBean("targetConnectionFactory"); conn = factory.createConnection(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(destination); Message message = session.createTextMessage("...Hello JMS!"); producer.send(message); } catch (Exception e) { e.printStackTrace(); } finally { try { producer.close(); producer = null; } catch (Exception e) { } try { session.close(); session = null; } } catch (Exception e) { } try { conn.stop(); } catch (Exception e) { } try { conn.close(); } catch (Exception e) { } } } public static void main(String[] args) { final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/activemq.xml"); sendWithAuto(context); } }
package webpoc; public class TranQConsumer extends Thread implements MessageListener { private Connection conn = null; private Destination destination = null; private Session session = null; public void run() { receive(); } public void receive() { ConnectionFactory factory = null; Connection conn = null; try { final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/activemq.xml"); factory = (ActiveMQConnectionFactory) context.getBean("targetConnectionFactory"); conn = factory.createConnection(); conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = (Destination) context.getBean("destination"); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(this); } catch (Exception e) { e.printStackTrace(); } } public void onMessage(Message message) { try { TextMessage tm = (TextMessage) message; System.out.println("TranQConsumer receive message: " + tm.getText()); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { TranQConsumer tranConsumer = new TranQConsumer(); tranConsumer.start(); } }
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
producer.send(message); System.out.println("send......" + Thread.currentThread().getId()); session.commit();
catch (Exception e) { e.printStackTrace(); try { session.rollback(); } catch (Exception ex) { } }
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
try { TextMessage tm = (TextMessage) message; System.out.println("TranQConsumer receive message: " + tm.getText()); session.commit(); } catch (Exception e) { e.printStackTrace(); try { session.rollback(); } catch (Exception ex) { } }
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.0.101:61616" /> <property name="useAsyncSend" value="true" /> <property name="alwaysSessionAsync" value="true" /> <property name="useDedicatedTaskRunner" value="true" /> <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" /> </bean> <!-- 发送消息的目的地(一个队列) --> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg value="ymk.queue?consumer.prefetchSize=100" /> </bean> <amq:redeliveryPolicy id="activeMQRedeliveryPolicy" destination="#destination" redeliveryDelay="100"maximumRedeliveries="1" />
<!-- 发送消息的目的地(一个队列) --> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg value="ymk.queue?consumer.prefetchSize=100" /> </bean> <bean id="replyDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg value="ymk.reply.queue" /> </bean>
String correlationId = RandomStringUtils.randomNumeric(5); consumer = session.createConsumer(replyDest); message.setJMSReplyTo(replyDest); message.setJMSCorrelationID(correlationId); consumer.setMessageListener(this);
public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Client接收Server端消息:" + tm.getText()); } catch (Exception e) { e.printStackTrace(); } }
producer.send(message);
package webpoc.mq.dual; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang.RandomStringUtils; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Client implements MessageListener { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Client接收Server端消息:" + tm.getText()); } catch (Exception e) { e.printStackTrace(); } } public void start(ApplicationContext context) { ConnectionFactory factory = null; Connection conn = null; Destination destination = null; Destination replyDest = null; Session session = null; MessageProducer producer = null; MessageConsumer consumer = null; try { destination = (Destination) context.getBean("destination"); replyDest = (Destination) context.getBean("replyDestination"); factory = (ActiveMQConnectionFactory) context.getBean("connectionFactory"); conn = factory.createConnection(); conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(destination); TextMessage message = session.createTextMessage("...Hello JMS!"); String correlationId = RandomStringUtils.randomNumeric(5); consumer = session.createConsumer(replyDest); message.setJMSReplyTo(replyDest); message.setJMSCorrelationID(correlationId); consumer.setMessageListener(this); } catch (Exception e) { String errorMessage = "JMSException while queueing HTTP JMS Message"; e.printStackTrace(); } } public void send(ApplicationContext context) { ConnectionFactory factory = null; Connection conn = null; Destination destination = null; Destination replyDest = null; Session session = null; MessageProducer producer = null; try { destination = (Destination) context.getBean("destination"); replyDest = (Destination) context.getBean("replyDestination"); factory = (ActiveMQConnectionFactory) context.getBean("connectionFactory"); conn = factory.createConnection(); conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(destination); TextMessage message = session.createTextMessage("...Hello JMS!"); String correlationId = RandomStringUtils.randomNumeric(5); message.setJMSReplyTo(replyDest); message.setJMSCorrelationID(correlationId); producer.send(message); System.out.println("send 1 message"); } catch (Exception e) { String errorMessage = "JMSException while queueing HTTP JMS Message"; e.printStackTrace(); } } public static void main(String[] args) { final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/activemq_dual.xml"); //sendWithAuto(context); Client c = new Client(); c.start(context); c.send(context); }
public void onMessage(Message message) { System.out.println("on message"); try { TextMessage response = this.session.createTextMessage(); if (message instanceof TextMessage) { TextMessage txtMsg = (TextMessage) message; String messageText = txtMsg.getText(); response.setText("服务器收到消息:" + messageText); System.out.println(response.getText()); } response.setJMSCorrelationID(message.getJMSCorrelationID()); producer.send(message.getJMSReplyTo(), response); } catch (Exception e) { e.printStackTrace(); } } }
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(replyDest); consumer = session.createConsumer(destination); consumer.setMessageListener(this);
package webpoc.mq.dual; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang.RandomStringUtils; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Server implements MessageListener { private ConnectionFactory factory = null; private Connection conn = null; private Destination destination = null; Destination replyDest = null; private Session session = null; private MessageProducer producer = null; private MessageConsumer consumer = null; @Override public void onMessage(Message message) { System.out.println("on message"); try { // 若有消息传送到服务时,先创建一个文本消息 TextMessage response = this.session.createTextMessage(); // 若从客户端传送到服务端的消息为文本消息 if (message instanceof TextMessage) { // 先将传送到服务端的消息转化为文本消息 TextMessage txtMsg = (TextMessage) message; // 取得文本消息的内容 String messageText = txtMsg.getText(); // 将客户端传送过来的文本消息进行处理后,设置到回应消息里面 response.setText("服务器收到消息:" + messageText); System.out.println(response.getText()); } // 设置回应消息的关联ID,关联ID来自于客户端传送过来的关联ID response.setJMSCorrelationID(message.getJMSCorrelationID()); System.out.println("replyto===" + message.getJMSReplyTo()); // 生产者发送回应消息,目的由客户端的JMSReplyTo定义,内容即刚刚定义的回应消息 producer.send(message.getJMSReplyTo(), response); } catch (Exception e) { e.printStackTrace(); } } public void receive(ApplicationContext context) { try { destination = (Destination) context.getBean("destination"); replyDest = (Destination) context.getBean("replyDestination"); factory = (ActiveMQConnectionFactory) context.getBean("connectionFactory"); conn = factory.createConnection(); conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(replyDest); consumer = session.createConsumer(destination); consumer.setMessageListener(this); } catch (Exception e) { String errorMessage = "JMSException while queueing HTTP JMS Message"; e.printStackTrace(); } } public static void main(String[] args) { final ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/activemq_dual.xml"); Server s = new Server(); s.receive(context); } }
Apache ActiveMQ实战(1)-基本安装配置与消息类型
标签:
原文地址:http://blog.csdn.net/lifetragedy/article/details/51836557