码迷,mamicode.com
首页 > 其他好文 > 详细

rocketMQ

时间:2019-07-11 15:47:20      阅读:172      评论:0      收藏:0      [点我收藏+]

标签:rollback   eve   消息队列   strong   开发   调用   本地   fse   ndt   

1、RocketMQ结构

技术图片

 

 

通过topic确定将消息发送到不同broker;broker中有多个messagequeue,消息放松到broker后经过轮询算法、hash算法等将消息发送到不同的messagequeue上

messagequeue本身不存放消息,真正的消息存放在commitlog中,messagequeue只存放消息在commitlog中的对应位置信息,通过messagequeue找到对应存储在messagelog中数据;

不同topic、messagequeue消息都写到相同commitlog文件,即commitlog顺序消费;

rocketmq异常关闭重启后,如何寻找上次消费位置:rocketmq每次消费都有对应一个offset值,没消费一个offset++,重启后可以根据offset位置开始消费,有可能存在重复消费问题,可以通过幂等业务逻辑在消费端去重

 

参考(https://www.jianshu.com/p/2838890f3284)

   

2、RocketMQ顺序消费(MessagequeueSelector)

  RocketMQ中同一个队列不能被并行消费,但可以并行消费多个队列。基于此,Rocket可以保证将需要排序的内容放在同一个队列中便可以保证消费的顺序进行。举个例子,对于同一订单必须按照:订单创建——》订单付款——》订单发货方式进行,但对于不同订单之间可以并行消费的。

  同一个队列只能被一个消费端消费

  rocketMQ发送端源码:

   技术图片
/**
     * 生产者1
     */
    @Test
    public void send1() {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("producerGroup1");
            producer.setNamesrvAddr("192.168.229.5:9876;192.168.229.6:9876");
            producer.setRetryTimesWhenSendFailed(3);
            producer.start();
            String[] tags = new String[]{"创建订单", "支付", "发货", "收货", "五星好评"};
            for (int i = 5; i < 25; i++) {
                int orderId = i / 5;
                Message msg = new Message("OrderTopic1", tags[i % tags.length], "uniqueId:" + i,
                        ("order_" + orderId + " " + tags[i % tags.length]).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        //此刻arg == orderId,可以保证是每个订单进入同一个队列
                        Integer id = (Integer) arg;  //arg就是orderid
                        int index = id % mqs.size(); //orderi取模消息队列个数
                        return mqs.get(index);   //返回哪一个消息队列
                    }
                }, orderId);
                System.out.printf("%s%n", sendResult);
            }
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

producer.send()代码:
    @Override
    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, selector, arg);
    }

代码解析:
    1、producer.send有三个参数producer(msg,MessageQueueSelector,arg1)
        ①msg:mq发送的内容;
        ③arg1 一个obj对象,作用就是messageQueueSelector中方法select(List<MessageQueue> mqs, Message msg, Object arg))第三个参数
        ②messageQueueSelector:消息队列选择器,这里是让用户决定消息msg发送到哪一个queue上。根据源码我们可以看出select方法返回的是哪一个具体的消息队列,如果同一个orderid则返回的是同一个消息队列,根据queue先进先出特性便可以保证发送消息的顺序性
2、这里是按照orderid排序的,如果业务开发中需要按照其他指标排序,只需修改send()方法第三个参数orderid为对应参数即可
View Code

 

     上边代码中比较绕一个地方值的注意:msg中tag是按照i取模(i%5),而orderid是i整除tag.len(i/5),这样5~9是同一orderid,这样确定同一个队列,而tag分别是按照tag[0]、tag[1]...tag[4]依次放入同一个队列;

使用hash取模法,让同一个订单发送到同一个queue中,再使用同步发送,只有消息A发送成功,再发送消息B,这样,我们保证了发送有序.
rocketmq的topic内的队列机制,可以保证存储满足FIFO,剩下的只需要消费者顺序消费即可

 

rocketMQ消费端源码

  

技术图片
 /**
     * 订阅
     */
    @Test
    public void consumer1() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
        consumer.setNamesrvAddr("192.168.229.5:9876;192.168.229.6:9876");
        try {
            //设置Consumer从哪开始消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe("OrderTopic1", "*");
            // 实现了MessageListenerOrderly表示一个队列只会被一个线程取到, 第二个线程无法访问这个队列,MessageListenerOrderly默认单线程
//            consumer.setConsumeThreadMin(3);
//            consumer.setConsumeThreadMax(6);
            consumer.registerMessageListener(new MessageListenerOrderly() {
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    try {
                        System.out.println("orderInfo: " + new String(msgs.get(0).getBody(), "utf-8"));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer1 Started.");
        while (true) {
        }

    }
View Code  

              技术图片

没有保证全局有序,但局部有序,每个订单的操作消息保证有序了,这种局部顺序满足高并发,符合预期.

 

3、rocketMQ事务问题(https://blog.csdn.net/weixin_40533111/article/details/84587967)  

技术图片            

先发送PREPARED消息,返回其CommitLog Offset
执行本地逻辑,得到处理结果是Commit还是Rollback
将处理结果和CommitLog Offset发送到Broker
Broker先根据Offset从CommitLog中提起PREPARED消息,然后克隆此消息生成新的消息,消息Body(内容)和PREPARED的一致。先设置处理结果标识,然后根据处理结果,如果是Rollback,则清空body,否则不清空,最后存储进CommitLog
Broker在存储PREPARED消息时,不会将其PositionInfo存入ConsumeQueue,也就是正常情况下此消息不会被消费;但会为其生成IndexInfo存入IndexFile,也就是能通过key查询此消息。
当事务处理结果是Rollback时,克隆消息不会生成PositionInfo和IndexInfo,所以此消息不会被消费,不能被查询,事务流程就此结束;当事务处理结果是Commit时,克隆消息会生成PositionInfo和IndexInfo,也就是能被正常消费,也能正常查询。

事务消息的确认(Commit/Rollback)的执行方式是Oneway形式,也就是单向执行,没有结果返回,这种形式执行效率很高,但是有个问题就是不确定确认操作是否执行成功,可能因为网络问题或者Broker问题造成发送失败,消息回查就是解决这个问题,但现在没有了,所以需要我们自己设计解决。

事务确认操作失败,有两种补偿操作,一种是重新发送一次事务消息,另一种是去查询PREPARED消息。

3.1 producer发送消息事务(发送消息到broker)

  producer采用TransactionMQProducer ;consumer采用普通consumer

  【理论】 

      ①producer发送Prepared消息

      ②发送Prepared成功前提下执行本地事务;如果Prepared失败则不执行本地事务,并且将本地事务状态(localtransactionstate)置为ROLLBACK_MESSAGE

      ③根据本地事务执行结果(localtransactionstate状态),决定向broker发送confirm还是cancel指令;如果是commit则broker决定将消息投递给consumer,如果是cancel则broker不将消息投递给comsumer

  【源码】

    技术图片

 

  具体代码如下:

技术图片
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter tranExecuter, final Object arg)
    throws MQClientException {
    if (null == tranExecuter) {
        throw new MQClientException("tranExecutor is null", null);
    }
    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // 标记消息是half消息
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        // 发送half消息,该方法是同步发送,事务消息也必须是同步发送
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            // 只有在half消息发送成功的时候才会执行事务
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                // 执行本地事务
                localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        // 根据事务commit的情况来判断下一步操作
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}

public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    // 从broker返回的信息中获取half消息的offset
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    // 需要把transactionId和offset发送给broker,便于broker查找half消息
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            // 表明本地址事务成功commit,告诉broker可以提交事务
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            // 说明事物需要回滚,有可能是half消息发送失败,也有可能是本地事务执行失败
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            // 如果状态是UNKNOW,broker还会反查producer,也就是接口:org.apache.rocketmq.example.transaction.TransactionCheckListenerImpl#checkLocalTransactionState的作用,但是目前rmq4.2.0并没有向producer查询,也就是源码中都没有调用这个接口
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // 这个发送消息是onway的,也就是不会等待返回
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}
View Code

 

  对比发送普通消息VS 事务(half)消息  

   普通消息发送:

      private DefaultMQProducer defaultMQProducer; //普通rocketmq生产者

      SendResult sendResult=defaultMQProducer.send(msg);

   half消息发送:

      private TransactionMQProducer transactionMQproducer ;

      SendResult sendResult = transactionMQproducer .sendMessageInTransaction(msg, transactionExecuter, "TopicTransaction");

   half消息和普通消息不一样,half消息执行send方法发送后会被消费,而half消息执行sendMessageInTransaction后并不会被consumer消费。原因是broker在将消息写入commitlog时候会判断消                        息类型,普通消息发送他的消息类型是Transaction_not_type;half消息发送他的消息类型可能有三种类型transaction_prepared_type、transaction_rollback_type、transaction_commit_type,如果                    是transaction_prepare_type和transaction_rollback_type类型时comsumeQueue的queueoffset(queueoffset对应消息在commitlog位置)不会增加,而comsumer在消费时会先读取                     comsuerQueue中queueoffset值,根据queueoffset值去commitlog中读取对应消息。所以comsumer在拉取消息时不会拉取到prepared和rollback的消息。

   相关代码如下:  

技术图片
/*
*第一步:TRANSACTION_PREPARED_TYPEconsumerqueue中queueoffset和TRANSACTION_ROLLBACK_TYPE不递增
*/
switch (tranType) {
    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
        break;
    case MessageSysFlag.TRANSACTION_NOT_TYPE:
    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        // The next update ConsumeQueue information
        CommitLog.this.topicQueueTable.put(key, ++queueOffset);
        break;
    default:
        break;
}

// org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue#dispatch
/*
*第二步:TRANSACTION_PREPARED_TYPEconsumerqueue中queueoffset和TRANSACTION_ROLLBACK_TYPE不读取
*/
public void dispatch(DispatchRequest request) {
    final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
    switch (tranType) {
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            DefaultMessageStore.this.putMessagePositionInfo(request);
            break;
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            break;
    }
}
View Code

 

  以上两点保证了prepare消息也就是half消息不会被消费。

 3.2 broker处理结束事务消息

  producer端执行endtransaction()方法后,会将请求发往broker

  broker端收到endtransaction()后,调用EndTransactionProcessor.java中EndTransactionProcessor.processRequest(ChannelHandlerContext ctx, RemotingCommand request)方法将事务消息写入commitlog中并生成consumequeue和index数据,供consumer消费

 

3.3 broker发起事务消息回查机制

   3.1节中,如果endTransaction()方法执行失败,导致数据没有发送到broker,broker会有回查线程定时扫描每个存储事务状态表,如果是commit或者cancel状态消息则直接跳过,如果是prepared状态则会向Producer发起CheckTransaction请求,producer会调用defaultMQProducerImpl.checkTransactionState()方法来处理broker定时回调请求,checkTransactionState会调用我们的事务设置的决断方法来决定是回滚事务还是继续执行,最后调用endTransactionOneway让broker来更新消息最终状态

   流程图:

    技术图片

  代码:

技术图片
 @Override
 10: public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) {
 11:     Runnable request = new Runnable() {
 12:         private final String brokerAddr = addr;
 13:         private final MessageExt message = msg;
 14:         private final CheckTransactionStateRequestHeader checkRequestHeader = header;
 15:         private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
 16: 
 17:         @Override
 18:         public void run() {
 19:             TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
 20:             if (transactionCheckListener != null) {
 21:                 // 获取事务执行状态
 22:                 LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
 23:                 Throwable exception = null;
 24:                 try {
 25:                     localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
 26:                 } catch (Throwable e) {
 27:                     log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
 28:                     exception = e;
 29:                 }
 30: 
 31:                 // 处理事务结果,提交消息 COMMIT / ROLLBACK
 32:                 this.processTransactionState(//
 33:                     localTransactionState, //
 34:                     group, //
 35:                     exception);
 36:             } else {
 37:                 log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
 38:             }
 39:         }
 40: 
 41:         /**
 42:          * 处理事务结果,提交消息 COMMIT / ROLLBACK
 43:          *
 44:          * @param localTransactionState 【本地事务】状态
 45:          * @param producerGroup producerGroup
 46:          * @param exception 检查【本地事务】状态发生的异常
 47:          */
 48:         private void processTransactionState(//
 49:             final LocalTransactionState localTransactionState, //
 50:             final String producerGroup, //
 51:             final Throwable exception) {
 52:             final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
 53:             thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
 54:             thisHeader.setProducerGroup(producerGroup);
 55:             thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
 56:             thisHeader.setFromTransactionCheck(true);
 57: 
 58:             // 设置消息编号
 59:             String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
 60:             if (uniqueKey == null) {
 61:                 uniqueKey = message.getMsgId();
 62:             }
 63:             thisHeader.setMsgId(uniqueKey);
 64: 
 65:             thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
 66:             switch (localTransactionState) {
 67:                 case COMMIT_MESSAGE:
 68:                     thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
 69:                     break;
 70:                 case ROLLBACK_MESSAGE:
 71:                     thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
 72:                     log.warn("when broker check, client rollback this transaction, {}", thisHeader);
 73:                     break;
 74:                 case UNKNOW:
 75:                     thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
 76:                     log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
 77:                     break;
 78:                 default:
 79:                     break;
 80:             }
 81: 
 82:             String remark = null;
 83:             if (exception != null) {
 84:                 remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
 85:             }
 86: 
 87:             try {
 88:                 // 提交消息 COMMIT / ROLLBACK
 89:                 DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
 90:                     3000);
 91:             } catch (Exception e) {
 92:                 log.error("endTransactionOneway exception", e);
 93:             }
 94:         }
 95:     };
 96: 
 97:     // 提交执行
 98:     this.checkExecutor.submit(request);
 99: }
100: 
101: // :arrow_down::arrow_down::arrow_down:【DefaultMQProducerImpl.java】
102: /**
103:  * 【事务消息回查】检查监听器
104:  */
105: public interface TransactionCheckListener {
106: 
107:     /**
108:      * 获取(检查)【本地事务】状态
109:      *
110:      * @param msg 消息
111:      * @return 事务状态
112:      */
113:     LocalTransactionState checkLocalTransactionState(final MessageExt msg);
114: 
115: }
View Code
 

3.4消息发送成功后消费端消费超时以及消费失败解决方案:

  消费超时:不断重试机制
  消费失败:人工解决,消息发送成功后会落入本地表中,表中可以设置一个字段用于标记该数据是否被成功消费,通过定时job扫描这个表将没有成功消费的数据进行人工解决,保证消息的最终一致性即可

rocketMQ

标签:rollback   eve   消息队列   strong   开发   调用   本地   fse   ndt   

原文地址:https://www.cnblogs.com/enhance/p/11073446.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!