码迷,mamicode.com
首页 > 其他好文 > 详细

ActiveMQ集群搭建

时间:2018-05-10 19:28:58      阅读:206      评论:0      收藏:0      [点我收藏+]

标签:share   rod   ica   replicas   com   dap   教程   void   ini   

一、 Activemq主备搭建

  Shared Filesystem Master-Slave方式
  shared filesystem Master-Slave部署方式主要是通过共享存储目录来实现master和slave的热备,所有的ActiveMQ应用都在不断地获取共享目录的控制权,哪个应用抢到了控制权,它就成为master。
  多个共享存储目录的应用,谁先启动,谁就可以最早取得共享目录的控制权成为master,其他的应用就只能作为slave。

技术分享图片

技术分享图片

1  搭建配置步骤搭建 master-slave (一主 一备份)

准备mq的1节点 activemq-1
准备mq的2节点 activemq-2

  特点:
    只能本地不能分布式 和 集群。

针对每一个activemq的节点进行配置:

1.1 配置节点1:
首先创建共享目录,并创建两个节点
如图:

技术分享图片

  •  配置activemq-1,需要修改持久数据库位置,修改:activemq-1/conf/activemq.xml 

技术分享图片

  • 配置activemq-1,需要修改activemq-1/conf/activemq.xml

  如图:修改成61617

技术分享图片

  • 配置activemq-1 ,需要修改activemq-1/conf/jetty.xml

技术分享图片

2.2 配置节点2:

  • 配置activemq-2,需要修改 持久目录文件,修改:activemq-2/conf/activemq.xml 

技术分享图片

  • 配置activemq-2,需要修改activemq-2/conf/activemq.xml

技术分享图片

  如图:修改成61618

  • 配置activemq-1 ,需要修改activemq-2/conf/jetty.xml

技术分享图片

2 测试master-slave
2.1 生产者

 1 public class ProduceQueue {??
 2     @Test?
 3     public void sendMessage() throws Exception{?
 4         //1.创建一个连接工厂 connectionfactory? 参数:就是要连接的服务器的地址?
 5         ConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://192.168.25.130:61617,tcp://192.168.25.130:61618)");?
 6         //2.通过工厂获取连接对象 创建连接?
 7         Connection connection = factory.createConnection();?
 8         //3.开启连接?
 9         connection.start();?
10         //4.创建一个session对象  提供发送消息等方法?
11         // 第一个参数:表示是否开启分布式事务(JTA)  一般是false 不开启。?
12         // 第二个参数:就是设置消息的应答模式
13         // 如果 第一个参数为false时,第二个参数设置才有意义。用的是自动应答?
14         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
15         //5.创建目的地 (destination)  queue? 参数:目的地的名称?
16         Queue queue = session.createQueue("queue-test-cluster");?
17         //6.创建个生产者?
18         MessageProducer producer = session.createProducer(queue);?
19         //7.构建消息的内容?
20         TextMessage textMessage = session.createTextMessage("queue测试发送的消息");
21         // 8.发送消息?
22         producer.send(textMessage);?
23         //9.关闭资源?
24         producer.close();?
25         session.close();?
26         connection.close();?
27     }
28 ?}

2.2 消费者

 1 public class ConsumerQueue {??
 2     @Test?
 3     public void consumer() throws Exception{?
 4         //1.创建连接的工厂?
 5         ConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://192.168.25.130:61617,tcp://192.168.25.130:61618)");?
 6         //2.创建连接?
 7         Connection connection = factory.createConnection();?
 8         //3.开启连接?
 9         connection.start();?
10         //4.创建session?
11         // 第一个参数:表示是否开启分布式事务(JTA)  一般是false 不开启。?
12         // 第二个参数:就是设置消息的应答模式   如果 第一个参数为false时,第二个参数设置才有意义。用的是自动应答?
13         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);?
14         // 5.创建接收消息的一个目的地?
15         Queue queue = session.createQueue("queue-test-cluster");?
16         // 6.创建消费者?
17         MessageConsumer consumer = session.createConsumer(queue);?
18         // 7.接收消息 打印?
19         // 第一种?
20         /*while(true){?
21             Message message = consumer.receive(1000000);//设置接收消息的超时时间?
22             //没有接收到消息就跳出循环?
23             if(message==null){?
24                 break;?
25             }?
26             if(message instanceof TextMessage){?
27                 TextMessage message2 = (TextMessage) message;?
28                 System.out.println("接收的消息为"+message2.getText());?
29                 }?
30          }*/?
31         //第二种??
32         // 设置一个监听器?
33         // System.out.println("start");?
34         // 这里其实开辟了一个新的线程?
35         consumer.setMessageListener(new MessageListener() {??
36             //当有消息的时候会执行以下的逻辑?
37             @Override?
38             public void onMessage(Message message) {?
39                 if(message instanceof TextMessage){?
40                     TextMessage message2 = (TextMessage) message;?
41                     try {?
42                         System.out.println("接收的消息为"+message2.getText());?
43                     } catch (JMSException e) {?
44                         e.printStackTrace();?
45                     }?
46                 }?
47             }?
48         });?
49         //System.out.println("end");?
50         Thread.sleep(199999);?
51         // 8.关闭资源?
52         consumer.close();?
53         session.close();?
54         connection.close();?
55     }??
56 }

先启动的成为主节点,平常主节点工作,slave不工作但是一直做监听。当主节点挂掉,slave接手工作。

二、 基于zookeeper的activemq集群搭建(推荐)

2.1 基于可复制的 LevelDB
  LevelDB 是 Google 开发的一套用于持久化数据的高性能类库。 LevelDB 并不是一种服务,用户需要自行实现 Server。 是单进程的服务,能够处理十亿级别规模 Key-Value 型数据,占用内存小。
  http://activemq.apache.org/replicated-leveldb-store.html

 技术分享图片

  高可用的原理:使用 ZooKeeper(集群)注册所有的 ActiveMQ Broker。只有其中的一个 Broker 可以提供服务,被视为 Master,其他的 Broker 处于待机状态,被视为 Slave。如果 Master 因故障而不能提供服务,ZooKeeper 会从 Slave 中选举出一个 Broker 充当 Master。Slave 连接 Master 并同步他们的存储状态, Slave 不接受客户端连接。所有的存储操作都将被复制到连接至 Master 的 Slaves。 如果 Master 宕了,得到了最新更新的 Slave 会成为 Master。 故障节点在恢复后会重新加入到集群中并连接 Master 进入 Slave 模式。所有需要同步的 disk 的消息操作都将等待存储状态被复制到其他法定节点的操作完成才能完成。所以,如果你配置了 replicas=3,那么法定大小是(3/2)+1=2。 Master 将会存储并更新然后等待 (2-1)=1 个Slave 存储和更新完成,才汇报 success。 至于为什么是 2-1,熟悉 Zookeeper 的应该知道,有一个 node要作为观擦者存在。当一个新的 Master 被选中,你需要至少保障一个法定 node 在线以能够找到拥有最新状态的 node。这个 node 可以成为新的 Master。因此,推荐运行至少 3 个 replica nodes,以防止一个 node失败了,服务中断。(原理与 ZooKeeper 集群的高可用实现方式类似)。

2.2 集群单机环境规划
  定义环境:3个activemq节点(node01 node02 node03)

Ip 集群通信端口 节点消息连接端口 Jetty后台运行端口
192.168.25.130 63631 51515 8361
192.168.25.130 63632 51516 8362
192.168.25.130 63633 51517 8363

2.3 配置zookeeper集群

见相关教程;

2.4 节点1的配置

  在activemq.xml中配置:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerCluster" dataDirectory="${activemq.data}">

  brokerName指定一个名字:任意即可。但是整个集群的所有的配置项名称都应该是一致的。

  broker标签下的配置:

<persistenceAdapter> 
  <!-- kahaDB directory="${activemq.data}/kahadb"/ --> 
  <replicatedLevelDB 
  directory="${activemq.data}/leveldb" 
  replicas="3" 
  bind="tcp://0.0.0.0:63631" 
  zkAddress="192.168.25.130:2181,192.168.25.130:2182,192.168.25.130:2183" 
  hostname="localhost" 
  zkPath="/activemq2/leveldb-stores"/> 
</persistenceAdapter>

  jetty.xml:配置项:

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
  <!-- the default port number for the web console -->
  <property name="host" value="0.0.0.0"/>
  <property name="port" value="8361"/>
</bean>

2.5 节点2的配置
  参考节点1搭建即可,注意:端口不要一样。

  搭建后的截图:?

技术分享图片

  node01 -node03 是activemq的3个节点。

ActiveMQ集群搭建

标签:share   rod   ica   replicas   com   dap   教程   void   ini   

原文地址:https://www.cnblogs.com/gdwkong/p/9021156.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!