码迷,mamicode.com
首页 > 其他好文 > 详细

rabbitmq在艺龙业务系统中的实践

时间:2015-08-27 22:46:48      阅读:364      评论:0      收藏:0      [点我收藏+]

标签:

rabbitmq作为成熟的企业消息中间件,实现了应用程序间接口调用的解耦,提高系统的吞吐量。

下面介绍下rabbitmq的一些基本概念:

  • message acknowledgment: 消息确认,解决消息确认问题,只有收到ack之后才能从消息系统中删除。
  • message durability: 消 息持久化,当rabbitmq退出或崩溃后,会把queue中的消息持久化。但注意,RabbitMQ并不能百分之百保证消息一定不会丢失,因为为了提 升性能,RabbitMQ会把消息暂存在内存缓存中,直到达到阀值才会批量持久化到磁盘,也就是说如果在持久化到磁盘之前RabbitMQ崩溃了,那么就 会丢失一小部分数据,这对于大多数场景来说并不是不可接受的,如果确实需要保证任务绝对不丢失,那么应该使用事务机制
  • exchange: 映射关系,实现消息名和队列之间的映射,根据消息名将消息发送到相应的队列中。
  • 常见的映射模式:
  • direct:转发消息到routigKey指定的队列
  • topic:按规则转发消息(最灵活)
  • headers:
  • fanout:转发消息到所有绑定队列
  •  
  • routing:exchange和queue之间绑定的媒介,成为routing key

在elong,我们开发了一套基于rabbitmq的消息系统,可以实现消息的可靠传输,提供了简单的restful api, 减少业务使用rabbitmq的学习成本。

 
下面说下这套系统jmsg的主要组成部分,在说之前,需要首先连接数据库结构:
 
 
1.MessageConfig 发送端配置,消息->Queue映射关系
 
CREATE TABLE `MessageConfig` (
  `ID` int(11) NOT NULL AUTO_INCREMENT,
  `MessageName` varchar(200) NOT NULL,  --消息名称
  `ExchangeName` varchar(200) NOT NULL, --消息名和队列的映射关系
  `Priority` varchar(50) DEFAULT NULL,  -- exchange与queue之前绑定的媒介
  `UseDelayRetry` bit(1) DEFAULT NULL, — 是否使用重试
  `DelayTime` int(11) DEFAULT NULL,   —延迟多长时间重试 
  `MaxRetryCount` int(11) DEFAULT ‘3’,  —最大重试次数
  `_timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`),
  UNIQUE KEY `IX_MessageName` (`MessageName`)
) ENGINE=InnoDB AUTO_INCREMENT=409 DEFAULT CHARSET=utf8;
 
字段解释:
MessageName: 消息名称
ExchangeName: exchange名称
Priority: 优先级,一个业务线可以根据不同优先级有多个队列
UseDelayRetry:是否使用重试
DelayTime: 延迟多长时间重试
MaxRetryCount: 最大重试次数
 
表数据
 
技术分享
 
 
 
2.MessageConsumersConfig 表: 消费端配置,消息->接收方配置
 
CREATE TABLE `MessageConsumersConfig` (
  `ID` int(11) NOT NULL AUTO_INCREMENT, 
  `MessageName` varchar(200) NOT NULL, — 消息名
  `Url` varchar(400) NOT NULL,  — 消息消费的url
  `TimeOut` int(11) DEFAULT ‘10’,  
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=266 DEFAULT CHARSET=utf8;
 
MessageName: 消息名
Url: 消息消费url
Timeout: 消费超时时间
 
技术分享
 
 
CREATE TABLE `QueueSetting` (
  `ID` bigint(20) NOT NULL AUTO_INCREMENT,
  `QueueName` varchar(50) DEFAULT NULL,
  `QOS` int(11) DEFAULT NULL,
  `ParallelCount` int(11) DEFAULT NULL,
  `LastUpdateTime` datetime DEFAULT NULL,
  `LastUpdateUserName` varchar(50) DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=51 DEFAULT CHARSET=utf8;
 
技术分享
 
 
rabbimq 配置

serverIP : 服务器ip

Port:服务端口号

UserName: 用户名

PassWord: 密码

MaxPoolSize: 最大连接池大小

RequestedHeartbeat: 请求心跳检查时间(s)

RequestedConnectionTimeout: 请求连接超时时间

FailedLogBaseDir: 失败日志存储目录

ConnectionTimeOut: 连接保持时间(ms)

SendTimeOut: 发送超时(s)

ReceiveTimeOut: 接收超时时间(s)

SendLogBaseDir(发送日志目录)

1  jmsg-client

消息发送客户端,提供发送消息的接口

流程图:

技术分享

其中比较重要的是RabbitConnectPool(单例创建连接池),该类中比较重要的属性和方法

_max: //可以创建的最大连接数

_created: 已经使用的连接数

_used: 已经使用的连接数

_sendTimeOut: 发送连接请求超时时间

_receiveTimeOut: 接收连接成功的超时时间

_clientExpires: 连接到期时间

_connectionTimeOut: 连接超时

_qos:

 

重要的方法:

getSendingConnection(): 获取一个发送端的连接, 如果不是强制,就从连接池中获取连接,否则强制创建一个连接

getNextProxy:() 从连接池中过去连接(返回RabbitSendProxy), 如果超过最大连接数,则创建新连接, 否则加锁获取 proxy(pollProxy),如果返回为空,这等待,直到获取连接为止

pollProxy(): 获取连接, 从proxyqueue中poll,如果连接不可用,这_created–, 然后_used++, 如果创建条数 < 最大数, 这获取新连接(newProxy(), create++, _used++

returnToPool(): 返回到连接池,

getNewProxy(): 三次重试, getProxy(), 重试间隔0.1s

getProxy(): 通过工厂模式生成连接

 

 
 
RabbitProxy: 客户端连接rabbitmq 代理接口,做为连接池中的单元连接代理,可以由发送端和接收端继承

主要属性:

isAvailable: 是否可用,默认true

createTime: 创建时间,

DisposeListener: 连接池关闭需要执行的接口

connectionTimeout: 连接超时时间, 当前时间-createTime <= connectionTimeout可用

receiveTimeOut: 接收超时时间

Connection: 最主要的类,com.rabbitmq.client.Connection 连接

qos: 服务器一次可以传输的消息条数

Channel : 管道,连接创建管道,进行数据传输

ConnectionFactory: 连接工厂,创建rabbitmq连接

主要操作:

isAvailable(): 连接是否可用

 
dispose(): 关闭连接 需要关闭channel和connection
 
RabbitSendProxy 发送端代理, 默认开通channel confirsSelect,即确认机制

send(): 发送方法

流程:转换成byte数组->检查消息长度(小于64K) -> 缓存数据,等待确认->发送(basicPublish) -> 在接收到后删除缓存数据

 

下面说下如何保证数据一定能发送到rabbit queue中:

为了解决发送失败的问题,解决的思路无非是消息持久化,采用文件做持久化是比较好的选择。

具体的实现是消息失败后,放入blockingqueue作为数据换出的地方,定期从queue中读取数据存储文件,开启定时任务读取数据,重新send到queue中。

2 jmsg-server

作用: 从rabbitmq中读取消息,通过http接口调用消费者

数据库如图:

技术分享

jmg-server的流程:

  • 从数据库拿到该机器需要处理的queue,初始化rabbitmq连接池
  •   遍历queuelist,注册监听器,对每个queue获取的消息处理
  •  对每个queue开启MessageReceiver线程,监听该queue数据
  •  messageReceiver 开启线程池,qos是线程池大小
  •  messageReceiver是一个循环,不断获取rabbitmq server连接
  •  获取到数据后,开启线程进行处理MessageProcessTask, 该任务主要是查找ImessageListener的实现类,调用receive方法
  • 接收到消息处理:

消 息校验 -> 获取消息配置,找到消费者-> 判断没有正在处理 -> 消息还没有处理成功or 没有达到最大处理失败次数 – > 首次接收的消息入库- > 广播消息到接收方 -> 处理成功,记录messageLog,修改状态; 处理失败,发送到rabbitmq-server,等待下次处理.

 

 

 

3  rabbitmq-server

采用集群的方式搭建, 通过nginx对外提供统一的url

集群中一些重要的概念:

network partition: 网络中断,一般是子网之间的设备中断,这样在不同子网的设备通信会出现问题

搭建集群:

abbitmq的集群是依附于erlang的集群来工作的,所以必须先构建起erlang的集群景象。Erlang的集群中各节点是经由过程一个magic cookie来实现的,这个cookie存放在 $home/.erlang.cookie 中(像我的root用户安装的就是放在我的root/.erlang.cookie中),文件是400的权限。所以必须包管各节点cookie对峙一致,不然节点之间就无法通信。

方案1: 普通集群

erlang 通过cookie来决定是否能和另外一个节点通信,通常的做法是在一个机器上生成cookie文件,拷贝到集群中的其他机器。

集群可以通过单逻辑broker的方式来连接多个机器。各机器间通过Erlang消息传递来通信,因此,集群内所有节点都必须有相同的Erlang cookie。集群内机器间的网络连接必须是可信的,且所有机器必须运行相同版本的Erlang和RabbitMQ。

虚拟机、交换机、用户和权限会自动镜像到集群内所有节点。队列可能位于单节点上,或者镜像到多个节点上。客户端连接到集群内任何节点都能看到集群内所有队列。

步骤

1

rabbit1$ rabbitmq-server -detached
rabbit2$ rabbitmq-server -detached
rabbit3$ rabbitmq-server -detached

2 加入以rabbit3为集群,集群名为rabbit@rabbit1,则需要在rabbit1和rabbit2上执行下面操作,加入rabbit@rabbit1,

rabbit2$ rabbitmqctl stop_app
Stopping node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl join_cluster rabbit@rabbit1
Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done.
rabbit2$ rabbitmqctl start_app
Starting node rabbit@rabbit2 ...done.

3 同样在rabbit3,上操作,加入rabbit@rabbit2

rabbit3$ rabbitmqctl stop_app
Stopping node rabbit@rabbit3 ...done.
rabbit3$ rabbitmqctl join_cluster rabbit@rabbit2
Clustering node rabbit@rabbit3 with rabbit@rabbit2 ...done.
rabbit3$ rabbitmqctl start_app
Starting node rabbit@rabbit3 ...done.

方案2:镜像队列

上述配置的RabbitMQ默认集群模式,但并不包管队列的高可用性,尽管互换机、绑定这些可以复制到 集群里的任何一个节点,然则队列内容不会复 制,固然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能守候重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内 容到集群里的每个节点,须要创建镜像队列

Federation允许一个broker上的交换机接收发布到另一个broker(这个broker可能是单独的机器或者集群)上的交换机的消息。为了节点间能够通过AMQP(带上SSL选项)通信,组成federation的两个交换机之间必须授予适当的用户和权限。

组成federation的交换机之间通过单向点对点连接。缺省情况下,在federation连接上,消息仅仅被转发一次,但是这样可增加更多、更复杂的路由拓扑。

在federation连接上,有些消息可能不会被转发;如果一条消息到达federated交换机后不能被路由到某个队列,则它不会被转发。

你可以在Internet上通过federation连接各个broker来pub/sub消息。

 

方案3: shovel

相比federation,工作在更低一层,shovel简单从一个broker的一个queue中消费消息,并传递到下一个broker的exchange上

the shovel simply consumes messages from a queue on one broker, and forwards them to an exchange on another.

参考资料:

 

http://www.rabbitmq.com/documentation.html

 
 
 

rabbitmq在艺龙业务系统中的实践

标签:

原文地址:http://www.cnblogs.com/200911/p/4764779.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!