码迷,mamicode.com
首页 > 其他好文 > 详细

rocketmq broker 第三章注册过程2

时间:2015-04-17 18:40:12      阅读:509      评论:0      收藏:0      [点我收藏+]

标签:java   分布式   rocketmq   

   大家好,接下上篇文章继续吧。

10,boolean initResult = controller.initialize();

    这才是controller初始化的地方

    boolean result = true;
    result = result && this.topicConfigManager.load();

    加载topic配置JSON串 如下:

      {
        "dataVersion":{
                "counter":4,
                "timestatmp":1429155762598
        },计数器,时间戳
        "topicConfigTable":{
                "DefaultCluster":{
                        "order":false,
                        "perm":7,
                        "readQueueNums":16,
                        "topicFilterType":"SINGLE_TAG",
                        "topicName":"DefaultCluster",
                        "topicSysFlag":0,
                        "writeQueueNums":16
                },默认集群topic
                "OFFSET_MOVED_EVENT":{
                        "order":false,
                        "perm":6,
                        "readQueueNums":1,
                        "topicFilterType":"SINGLE_TAG",
                        "topicName":"OFFSET_MOVED_EVENT",
                        "topicSysFlag":0,
                        "writeQueueNums":1
                },内存偏移量变动topic
                "BenchmarkTest":{
                        "order":false,
                        "perm":6,
                        "readQueueNums":1024,
                        "topicFilterType":"SINGLE_TAG",
                        "topicName":"BenchmarkTest",
                        "topicSysFlag":0,
                        "writeQueueNums":1024
                },标准的topic我猜是做线上测试用的
                "SELF_TEST_TOPIC":{
                        "order":false,
                        "perm":6,
                        "readQueueNums":1,
                        "topicFilterType":"SINGLE_TAG",
                        "topicName":"SELF_TEST_TOPIC",
                        "topicSysFlag":0,
                        "writeQueueNums":1
                },自己测试用的topic
                "%RETRY%QuickStartConsumer":{
                        "order":false,
                        "perm":6,
                        "readQueueNums":1,
                        "topicFilterType":"SINGLE_TAG",
                        "topicName":"%RETRY%QuickStartConsumer",
                        "topicSysFlag":0,
                        "writeQueueNums":1
                },重试队列
                "TBW102":{
                        "order":false,
                        "perm":7,
                        "readQueueNums":8,
                        "topicFilterType":"SINGLE_TAG",
                        "topicName":"TBW102",
                        "topicSysFlag":0,
                        "writeQueueNums":8
                },默认的
                "TopicTest":{
                        "order":false,
                        "perm":6,
                        "readQueueNums":4,
                        "topicFilterType":"SINGLE_TAG",
                        "topicName":"TopicTest",
                        "topicSysFlag":0,
                        "writeQueueNums":4
                }
        }

    result = result && this.consumerOffsetManager.load();

    加载topic消费进度 json 如下:

         {
           "offsetTable":{
                  "TopicTest@haqiaolong":{0:750,2:750,1:750,3:750

                   0:代码分区 750代表索引位置
                   },
                  "QuickStart@QuickStartConsumer":{0:250,2:250,1:250,3:250
                  }
             }
         }

    result = result && this.subscriptionGroupManager.load();

    加载订阅消息配置 json 如下:

     {
        "dataVersion":{
                "counter":1,
                "timestatmp":1429155593251
        },
        "subscriptionGroupTable":{
                "QuickStartConsumer":{
                        "brokerId":0,
                        "consumeBroadcastEnable":true,
                        "consumeEnable":true,
                        "consumeFromMinEnable":true,
                        "groupName":"QuickStartConsumer",
                        "retryMaxTimes":16,
                        "retryQueueNums":1,
                        "whichBrokerWhenConsumeSlowly":1
                }
        }
    }

    10.1,this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager);

        this.messageStoreConfig = messageStoreConfig;

        消息存储配置
        this.brokerStatsManager = brokerStatsManager;

        broker状态管理
        this.allocateMapedFileService = new AllocateMapedFileService();

        Pagecache文件封装 就是NIO的MappedByteBuffer内存映射

        this.commitLog = new CommitLog(this);

        落地所有的元数据信息,数据可靠性保护
        this.consumeQueueTable =
                new ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>>(
                    32);
        消费队列实现

        this.flushConsumeQueueService = new FlushConsumeQueueService();

        逻辑队列刷盘服务

        this.cleanCommitLogService = new CleanCommitLogService();

        清理物理文件服务

        this.cleanConsumeQueueService = new CleanConsumeQueueService();

        清理逻辑文件服务

        this.dispatchMessageService =
                new DispatchMessageService(this.messageStoreConfig.getPutMsgIndexHightWater());

        分发消息索引服务

        this.storeStatsService = new StoreStatsService();

        存储层内部统计服务

        this.indexService = new IndexService(this);

        消息索引服务

        this.haService = new HAService(this);
        HA服务,负责同步双写,异步复制功能

        switch (this.messageStoreConfig.getBrokerRole()) {
        case SLAVE:
            this.reputMessageService = new ReputMessageService();

            从物理队列Load消息,并分发到各个逻辑队列

            reputMessageService依赖scheduleMessageService做定时消息的恢复,确保储备数据一致
            this.scheduleMessageService = new ScheduleMessageService(this);

            定时消息服务

            break;
        case ASYNC_MASTER:
        case SYNC_MASTER:
            this.reputMessageService = null;
            this.scheduleMessageService = new ScheduleMessageService(this);
            break;
        default:
            this.reputMessageService = null;
            this.scheduleMessageService = null;
        }

        load过程依赖此服务,所以提前启动
        this.allocateMapedFileService.start();
        this.dispatchMessageService.start();
        因为下面的recover会分发请求到索引服务,如果不启动,分发过程会被流控
        this.indexService.start();

    10.2,result = result && this.messageStore.load();

         boolean lastExitOK = !this.isTempFileExist();

         是否是异常停机 根据store目录下 abort文件判断
         log.info("last shutdown {}", (lastExitOK ? "normally" : "abnormally"));
         load 定时进度
         这个步骤要放置到最前面,从CommitLog里Recover定时消息需要依赖加载的定时级别参数
         slave依赖scheduleMessageService做定时消息的恢复
         if (null != scheduleMessageService) {
              result = result && this.scheduleMessageService.load();

              加载延迟配置
         }

         // load Commit Log
         result = result && this.commitLog.load();
         加载消息物理文件 位置为commitlog

         // load Consume Queue
         result = result && this.loadConsumeQueue();
         if (result) {
                this.storeCheckpoint =
                        new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig
                            .getStorePathRootDir()));

                记录存储模型最终一致的时间点  

                this.indexService.load(lastExitOK);

                // 尝试恢复数据
                this.recover(lastExitOK);

                log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
          }


    10.2,this.remotingServer =
                    new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);

          这里是创建线程池及nettyNioEventLoopGroup设置,不知道可以看下第一章

          this.sendMessageExecutor = new ThreadPoolExecutor(//
                this.brokerConfig.getSendMessageThreadPoolNums(),//
                this.brokerConfig.getSendMessageThreadPoolNums(),//
                1000 * 60,//
                TimeUnit.MILLISECONDS,//
                this.sendThreadPoolQueue,//
                new ThreadFactoryImpl("SendMessageThread_"));
          发送消息线程池
          this.pullMessageExecutor = new ThreadPoolExecutor(//
                this.brokerConfig.getPullMessageThreadPoolNums(),//
                this.brokerConfig.getPullMessageThreadPoolNums(),//
                1000 * 60,//
                TimeUnit.MILLISECONDS,//
                this.pullThreadPoolQueue,//
                new ThreadFactoryImpl("PullMessageThread_"));
          订阅消息线程池

          this.adminBrokerExecutor =
                    Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(),
                        new ThreadFactoryImpl("AdminBrokerThread_"));
           admin broker线程池
           this.clientManageExecutor =
                    Executors.newFixedThreadPool(this.brokerConfig.getClientManageThreadPoolNums(),
                        new ThreadFactoryImpl("ClientManageThread_"));

            客户端管理线程池

    10.3,this.registerProcessor();

          /**
         * SendMessageProcessor
         */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor,
            this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor,
            this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor,
            this.sendMessageExecutor);

        /**
         * PullMessageProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor,
            this.pullMessageExecutor);
        this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

        /**
         * QueryMessageProcessor
         */
        NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor,
            this.pullMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor,
            this.pullMessageExecutor);

        /**
         * ClientManageProcessor
         */
        ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
        clientProcessor.registerConsumeMessageHook(this.consumeMessageHookList);
        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor,
            this.clientManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor,
            this.clientManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, clientProcessor,
            this.clientManageExecutor);

        /**
         * Offset存储更新转移到ClientProcessor处理
         */
        this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, clientProcessor,
            this.clientManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, clientProcessor,
            this.clientManageExecutor);

        /**
         * EndTransactionProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this),
            this.sendMessageExecutor);

        /**
         * Default
         */
        AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
        this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);

        根据RequestCode注册不同的处理场景 个人感觉这种模式很不错 扩展性比较强 入口统一易维护

       this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);

       初始化broker状态统计

    10.4,final long initialDelay = UtilAll.computNextMorningTimeMillis() -                   System.currentTimeMillis();
            final long period = 1000 * 60 * 60 * 24;
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.getBrokerStats().record();

                        设置每天的MSG put and get调用总数
                    }
                    catch (Exception e) {
                        log.error("schedule record error.", e);
                    }
                }
            }, initialDelay, period, TimeUnit.MILLISECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();

                        定时写入消费记录到文件
                    }
                    catch (Exception e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.scanUnsubscribedTopic();

                    扫描数据被删除了的topic,offset记录也对应删除                    }
                    catch (Exception e) {
                        log.error("schedule scanUnsubscribedTopic error.", e);
                    }
                }
            }, 10, 60, TimeUnit.MINUTES);

            if (this.brokerConfig.getNamesrvAddr() != null) {
                this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());

                     更新namesrv地址
            }
            else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {

                     是否从地址服务器查找服务

                     默认地址为:http://jmenv.tbsite.net:8080/rocketmq/nsaddr  

                     线上关闭

               this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                        }
                        catch (Exception e) {
                            log.error("ScheduledTask fetchNameServerAddr exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
            }

            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                if (this.messageStoreConfig.getHaMasterAddress() != null
                        && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                }
                else {
                    this.updateMasterHAServerAddrPeriodically = true;
                }

                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.slaveSynchronize.syncAll();

                            同步数据 包括 队列 消费进度 延迟消息 订阅

                            主从同步
                        }
                        catch (Exception e) {
                            log.error("ScheduledTask syncAll slave exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
            else {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printMasterAndSlaveDiff();

                            打印出 master 与 slave有什么不同
                        }
                        catch (Exception e) {
                            log.error("schedule printMasterAndSlaveDiff error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }

rocketmq broker 第三章注册过程2

标签:java   分布式   rocketmq   

原文地址:http://haqiaolong.blog.51cto.com/2834720/1633836

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!