标签:
一、基本概念:
二者都是用来区别不同业务发送方发送的消息:
又称组名, notify server根据groupId来识别客户端机器, 配置为同一groupId视为同一集群的机器.因此, 各个业务之间,发送方和订阅方之间的GroupId不能重复.MessageType和GroupId命名长度最大为64个字符, 且不能包含空格.
消息ID是消息唯一标识.发送方和订阅最好都在日志中记录消息ID,便于排查问题,获取消息ID方法。
UniqId.getInstance().bytes2string(stringMessage.getMessageId())
如果可以建议同时记录下TOPIC、MessageType。
订阅方的GroupId和Topic的映射关系.
一个GroupId可以对应多个Topic.客户端通过spring配置文件配置订阅方需要订阅的消息,在应用启动时,订阅关系会推送到Notify Server.
Notify Server端将这份订阅关系保存,并根据保存在Server上的订阅关系将不断到达消息投给订阅方.
二、使用示例:
发送消息的spring配置:
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd"> <beans default-autowire="byName"> <!-- 应用发送接入请先到申请平台申请 --> <!-- 日常:http://ops.jm.taobao.net/notify-side/index --> <!-- 线上(配置自动同步到预发):http://ops.jm.taobao.org:9999/notify-side/index --> <!-- notify 发送方配置 --> <bean id="notifyManager" class="com.taobao.notify.remotingclient.NotifyManagerBean" init-method="init"> <property name="publishTopics"> <!-- 配置所需发布的topics,可以设置多个 --> <list> <value>SHENXUN_TEST</value> <!-- 日常测试公用的Topic,可发任意的MessageType --> </list> </property> <property name="groupId" value="P-appname-detail" /> <!-- 发送方的GroupId --> </bean>
发送消息的部分Java代码片段:
package com.taobao.notify.example.publish; import java.io.UnsupportedEncodingException; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.taobao.notify.message.BytesMessage; import com.taobao.notify.remotingclient.NotifyManager; import com.taobao.notify.remotingclient.SendResult; import com.taobao.notify.utils.UniqId; public class SimplePublisher { private static final String TOPIC = "SHENXUN_TEST"; private static final String MESSAGE_TYPE = "test-messagType"; public static void main(String[] args) throws Exception { ApplicationContext ctx = new ClassPathXmlApplicationContext("publish/publisher-bean.xml"); NotifyManager publisher = (NotifyManager) ctx.getBean("notifyManager"); // 因为连接是异步的。第一次发送需要等待连接建立成功。 try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } sendMessage(publisher); } /** * 同步发送消息 * * @param notifyManager */ private static void sendMessage(NotifyManager notifyManager) throws UnsupportedEncodingException { BytesMessage bytesMessage = new BytesMessage(); bytesMessage.setBody("NoTranscationTestStringMessage 中文".getBytes("UTF-8")); /** * 必填属性 */ bytesMessage.setTopic(TOPIC); bytesMessage.setMessageType(MESSAGE_TYPE); bytesMessage.setStringProperty("customHeader", "12345"); // 用户自定义属性,可用户订阅方过滤 SendResult result = notifyManager.sendMessage(bytesMessage); if (result.isSuccess()) { // 发送成功后处理 System.out.println("success!消息发送成功"); System.out.println("MessageId is: " + UniqId.getInstance().bytes2string(bytesMessage.getMessageId())); } else { System.out.println("failure!消息发送失败,原因:" + result.getErrorMessage()); System.out.println("MessageId is: " + UniqId.getInstance().bytes2string(bytesMessage.getMessageId())); } } }
订阅消息的Spring配置片段:
保证消息不丢 就必然可能出现重投的情况,需要订阅者在消息监听器里做好 幂等 ,保证消息收到一次和收到多次的结果是一样的,需要对消息id去重 spring代码片段 <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd"> <beans default-autowire="byName"> <!-- 应用订阅接入请先到申请平台申请 --> <!-- 日常:http://ops.jm.taobao.net/notify-side/index --> <!-- 线上(配置自动同步到预发):http://ops.jm.taobao.org:9999/notify-side/index --> <bean id="notifySubscribeManager" class="com.taobao.notify.remotingclient.NotifyManagerBean" init-method="init"> <property name="subscribeMessages"> <map> <entry key="SHENXUN_TEST"> <!-- topic --> <map> <entry key="test-messagType"> <!-- 已存在的 messageType--> <bean class="com.taobao.notify.config.SubscriptMsgDetailInfo" /> </entry> </map> </entry> </map> </property> <property name="groupId" value="S-appanme-test"/> <!-- 订阅方的GroupId --> <property name="messageListener" ref="messageListener" /> <!-- 消息监听处理器 --> </bean> <bean id="messageListener" class="com.taobao.notify.example.subscribe.SimpleMessageListener" /> </beans>
订阅消息的部分Java代码片段:
package com.taobao.notify.example.subscribe; import com.taobao.notify.message.BytesMessage; import com.taobao.notify.message.Message; import com.taobao.notify.message.StringMessage; import com.taobao.notify.remotingclient.MessageListener; import com.taobao.notify.remotingclient.MessageStatus; import com.taobao.notify.utils.UniqId; public class SimpleMessageListener implements MessageListener{ public void receiveMessage(Message message, MessageStatus status){ StringBuilder generalInfo = new StringBuilder(); generalInfo.append("收到消息"); generalInfo.append("messageId is:").append(UniqId.getInstance().bytes2string(message.getMessageId())); generalInfo.append("topic is:").append(message.getTopic()); generalInfo.append("messageType is:").append(message.getMessageType()); /** * 只读属性 */ generalInfo.append("消息在客户端产生时间:").append(message.getBornTime()); generalInfo.append("消息到达notify server时间:").append(message.getGMTCreate()); generalInfo.append("消息最后一次投递时间:").append(message.getGMTLastDelivery()); //消息 body 处理 if (message instanceof StringMessage) { generalInfo.append("String消息body内容:"); StringMessage stringMessage = (StringMessage) message; generalInfo.append(stringMessage.getBody()); } else if (message instanceof BytesMessage) { generalInfo.append("Byte消息body内容:"); BytesMessage bytesMessage = (BytesMessage) message; generalInfo.append(new String(bytesMessage.getBody())); } System.out.println(generalInfo.toString()); //业务处理失败,回滚继续重投 boolean isSuccess = true; if(!isSuccess){ status.setRollbackOnly(); status.setReason("失败原因"); } } }
<property name="maxRetry" value="3"></property>
默认是重试三次,在发送配置里可以配置重试次数。超时时间是3秒,可以在发送消息手动设置超时时间:
message.setClientPostTimeout(MILLISECONDS);
三、notify使用场景
Notify是一款高可靠、高实时的消息中间件,是交易链路的核心节点。具备了以下的特性:
把应用的业务操作和消息发送组成一个分布式事务,保证业务操作和消息发送是个原子操作。 案例:交易平台本地操作创建订单,然后发送订单创建消息,需要让处理订单的系统最终看到的订单状态是一致的。如果订单创建成功,但是消息发送失败;或者消息发送成功,订单创建失败。就会导致处理订单消息的相关系统所看到的订单状态是不一致的,业务会出大问题。基于notify分布式事务的特性就可以保证订单创建和消息发送同时成功或者同时失败,而notify本身提供了可靠的消息服务,这样就保证相关系统所看到的订单状态保持最终一致性。
许多场景都需要实时性的保证,而推送和无序的模型保证了高实时性,ms级别。无序模型,消息投递的并发度最高,而且不会因为一个消息消费失败,导致后面的消息消费不了;推送模型使得消息一旦到达服务器会立马推送给客户端,无间歇。 案例:淘金币兑换商品,收到交易成功消息后,迅速扣除金币。如果实时性太差就有可能出现业务漏洞。比如某个用户总共只有10个金币,用10个金币兑换商品后,剩余0个金币。如果消息推送不实时,那么用户的金币在一定的时间窗口之内没有被扣除,这样他就可以继续兑换其他商品。
数据如果只写一份的话,那么当磁盘出现问题的时候,数据就会丢失。为了解决单点故障问题,notify提供双写机制,大大提高了消息数据的可靠性。 案例:交易需要保证数据的绝对安全可靠,交易集群开启了双写机制。交易每发送一条消息都会同时写入到两个mysql实例,双写成功才代表消息发送成功。
有些业务场景比较复杂,纯粹的主题+消息类型二元组订阅已经无法满足需求。header订阅支持消息属性表达式过滤,提供更加动态灵活的订阅机制。 案例:交易消息,某个业务订阅主题为交易,消息类型为订单创建的消息,但是他只关注类目A。按照传统的订阅方式,主题+消息类型二元组,notify会把这个主题+消息类型的消息全部投递给订阅者,订阅者收到消息后只处理类目A的消息,其他忽略。而采用header订阅,他可以在主题+消息类型之上,在加一个属性过滤条件property.rootcat == A,这样一来notify只会把类目A的消息投给订阅者,订阅者的处理逻辑简化了。另外,当消息量大的时候,header订阅可以节约了服务器的带宽成本,同时节约了订阅者的资源消耗。
四、Notify源码包和Notify的Demo示例下载
五、参考资料
1.阿里中间件平台:http://middleware.alibaba-inc.com/
2.Notify中间件首页:http://ops.jm.taobao.net/notify-side/index
3.淘宝百科:http://baike.corp.taobao.com/index.php
4.阿里中间件团队博客:http://jm.taobao.org/
5.阿里巴巴wiki文档空间:http://docs.alibaba-inc.com/
标签:
原文地址:http://www.cnblogs.com/RunForLove/p/5439002.html