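A static networkConnector creates a fixed configuration that clusters several brokers in a network. The static protocol takes a composite URL, that is, a single URL that contains several broker URLs.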
<networkConnectors>
    <networkConnector name="local network"
                      duplex="true"
                      uri="static://(tcp://192.168.174.104:61616,tcp://192.168.174.104:61676)"/>
</networkConnectors>
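To make the "composite URL" idea concrete, here is a minimal sketch in plain Java that splits the static URI from the configuration above into its member broker addresses. The class CompositeUriSketch and the method memberUris are hypothetical names invented for this illustration; they are not part of ActiveMQ, which does its own parsing internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of ActiveMQ: splits a composite URI of the
// form scheme://(uri1,uri2,...) into its member URIs.
final class CompositeUriSketch {

    static List<String> memberUris(String compositeUri) {
        int open = compositeUri.indexOf('(');
        int close = compositeUri.lastIndexOf(')');
        if (open < 0 || close < open) {
            throw new IllegalArgumentException("not a composite URI: " + compositeUri);
        }
        List<String> members = new ArrayList<>();
        // The member URIs are the comma-separated entries between the parentheses.
        for (String part : compositeUri.substring(open + 1, close).split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                members.add(trimmed);
            }
        }
        return members;
    }

    public static void main(String[] args) {
        String uri = "static://(tcp://192.168.174.104:61616,tcp://192.168.174.104:61676)";
        for (String member : memberUris(uri)) {
            System.out.println(member);
        }
    }
}
```

Run against the URI used in this post, the sketch lists the two broker addresses, one per line.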
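Commonly used networkConnector attributes:
conduitSubscriptions: default true; whether multiple consumers on the same broker are treated as a single consumer
duplex: default false; whether the connection can carry messages in both directions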
Message sending code
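import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsSend {
    public static void main(String[] args) throws JMSException {
        // Send to broker1
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // Transacted session: the acknowledge mode argument is ignored,
        // so SESSION_TRANSACTED states the intent explicitly
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Destination queue = session.createQueue("my-queue4");
        MessageProducer producer = session.createProducer(queue);
        for (int i = 0; i < 20; i++) {
            TextMessage message = session.createTextMessage("message" + i);
            //message.setStringProperty("queue", "queue"+i);
            //message.setJMSType("1");
            producer.send(message);
        }
        // Nothing reaches the broker until the transaction is committed
        session.commit();
        session.close();
        connection.close();
    }
}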
Receiver test code for broker1 (192.168.174.104:61616)
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsReceiver1 {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
        // Start 10 consumers on broker1, one per second
        for (int i = 0; i < 10; i++) {
            new Myhread1(connectionFactory).start();
            Thread.sleep(1000);
        }
    }
}

class Myhread1 extends Thread {
    private ConnectionFactory connectionFactory;

    public Myhread1(ConnectionFactory connectionFactory) {
        super();
        this.connectionFactory = connectionFactory;
    }

    public void run() {
        try {
            final Connection connection = connectionFactory.createConnection();
            connection.start();
            // Transacted session: the acknowledge mode argument is ignored
            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination queue = session.createQueue("my-queue4");
            MessageConsumer consumer = session.createConsumer(queue);
            // Each consumer handles one message, commits, then closes its
            // session and connection
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage msg = (TextMessage) message;
                    try {
                        System.out.println("1======" + msg.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    try {
                        session.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    try {
                        session.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
Receiver test code for broker2 (192.168.174.104:61676)
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsReceiver2 {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("tcp://192.168.174.104:61676");
        // Start 10 consumers on broker2, one per second
        for (int i = 0; i < 10; i++) {
            new Myhread2(connectionFactory).start();
            Thread.sleep(1000);
        }
    }
}

class Myhread2 extends Thread {
    private ConnectionFactory connectionFactory;

    public Myhread2(ConnectionFactory connectionFactory) {
        super();
        this.connectionFactory = connectionFactory;
    }

    public void run() {
        try {
            final Connection connection = connectionFactory.createConnection();
            connection.start();
            // Transacted session: the acknowledge mode argument is ignored
            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination queue = session.createQueue("my-queue4");
            MessageConsumer consumer = session.createConsumer(queue);
            // Each consumer handles one message, commits, then closes its
            // session and connection
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage msg = (TextMessage) message;
                    try {
                        System.out.println("2======" + msg.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    try {
                        session.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    try {
                        session.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
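"Lost" messages
broker1 and broker2 are connected through a networkConnector. Some consumers connect to broker2 and consume messages that were sent to broker1. Those messages are first consumed from broker1 by broker2 and then forwarded to the consumers. Unfortunately, broker2 restarts while it is still forwarding part of the messages. The consumers notice that their connection to broker2 has failed and fail over to broker1, but some of the messages they have not yet consumed were already dispatched from broker1 to broker2. Those messages appear to have vanished.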
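my-queue4 on broker1 receives 20 messages.
broker1 is connected to broker2 through the static network. Once the consumers attached to broker2 have consumed, Number of Pending Messages on broker1 is 0; in other words, broker2 has already consumed the messages from broker1.
Consumers that then connect to broker1 cannot get any messages from it.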
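The sequence above can be sketched with a toy in-memory model. This is not ActiveMQ code: each "broker" is just a java.util.Deque, and the class StrandedMessagesSketch, the method forwardAll, and the figure of 10 messages delivered before the failover are assumptions made for illustration (the receiver test code starts 10 consumers that each handle one message).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy in-memory model, NOT ActiveMQ code: each "broker" is just a queue.
final class StrandedMessagesSketch {

    // Moves every pending message from one queue to the other, the way a
    // network bridge forwards messages to a broker that has consumers.
    static void forwardAll(Deque<String> from, Deque<String> to) {
        while (!from.isEmpty()) {
            to.addLast(from.pollFirst());
        }
    }

    // Returns {pending on broker1, pending on broker2} after the scenario.
    static int[] run() {
        Deque<String> broker1 = new ArrayDeque<>();
        Deque<String> broker2 = new ArrayDeque<>();
        for (int i = 0; i < 20; i++) {
            broker1.addLast("message" + i);
        }

        // Consumers are attached to broker2, so the bridge drains broker1.
        forwardAll(broker1, broker2);

        // Assumption: broker2 delivers 10 messages before its consumers
        // fail over to broker1.
        for (int i = 0; i < 10; i++) {
            broker2.pollFirst();
        }

        // The consumers now poll broker1, which has nothing left for them.
        return new int[] { broker1.size(), broker2.size() };
    }

    public static void main(String[] args) {
        int[] pending = run();
        System.out.println("broker1 pending: " + pending[0]);
        System.out.println("broker2 pending: " + pending[1]);
    }
}
```

In this model broker1 ends with 0 pending messages and broker2 with 10. The remaining messages are not deleted; they are stranded on a broker that no longer has consumers.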
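Handling "lost" messages: the replayWhenNoConsumers option
With this option, a broker that holds messages to forward but has no consumers of its own (broker2 in the scenario above) replays those messages back to the broker they originally came from. Set enableAudit to false at the same time, so that the replayed messages are not treated as duplicates and left undispatched.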
<policyEntries>
    <policyEntry queue=">" enableAudit="false">
        <networkBridgeFilterFactory>
            <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
        </networkBridgeFilterFactory>
    </policyEntry>
</policyEntries>
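Why enableAudit matters can be shown with another toy model. It is a deliberate simplification, not ActiveMQ's actual audit implementation: the originating broker is assumed to remember the IDs of messages it has already seen, and the class ReplaySketch and the method replayToOrigin are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model, NOT ActiveMQ code: shows how a duplicate check on the
// originating broker interacts with replayed messages.
final class ReplaySketch {

    // Returns the replayed messages that the originating broker accepts.
    // With the audit enabled, IDs it has already seen count as duplicates.
    static List<String> replayToOrigin(List<String> stranded,
                                       Set<String> seenByOrigin,
                                       boolean enableAudit) {
        List<String> accepted = new ArrayList<>();
        for (String id : stranded) {
            boolean treatedAsDuplicate = enableAudit && seenByOrigin.contains(id);
            if (!treatedAsDuplicate) {
                accepted.add(id);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // broker1 originally saw all 20 messages; assume 10 are stranded on broker2.
        Set<String> seen = new HashSet<>();
        List<String> stranded = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            seen.add("message" + i);
        }
        for (int i = 10; i < 20; i++) {
            stranded.add("message" + i);
        }
        System.out.println("audit on:  " + replayToOrigin(stranded, seen, true).size());
        System.out.println("audit off: " + replayToOrigin(stranded, seen, false).size());
    }
}
```

With the audit on, the model accepts none of the replayed messages because the origin has seen every ID before; with it off, all 10 come back and can be dispatched to the consumers that failed over.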
Original article: https://www.cnblogs.com/xiaoliangup/p/9351461.html