大家好,接着上篇文章继续吧。
10,boolean initResult = controller.initialize();
这才是controller初始化的地方
boolean result = true;
result = result && this.topicConfigManager.load();
加载topic配置JSON串 如下:
{
"dataVersion":{
"counter":4,
"timestatmp":1429155762598
},计数器,时间戳
"topicConfigTable":{
"DefaultCluster":{
"order":false,
"perm":7,
"readQueueNums":16,
"topicFilterType":"SINGLE_TAG",
"topicName":"DefaultCluster",
"topicSysFlag":0,
"writeQueueNums":16
},默认集群topic
"OFFSET_MOVED_EVENT":{
"order":false,
"perm":6,
"readQueueNums":1,
"topicFilterType":"SINGLE_TAG",
"topicName":"OFFSET_MOVED_EVENT",
"topicSysFlag":0,
"writeQueueNums":1
},内存偏移量变动topic
"BenchmarkTest":{
"order":false,
"perm":6,
"readQueueNums":1024,
"topicFilterType":"SINGLE_TAG",
"topicName":"BenchmarkTest",
"topicSysFlag":0,
"writeQueueNums":1024
},基准测试(Benchmark)用的topic,我猜是做线上测试用的
"SELF_TEST_TOPIC":{
"order":false,
"perm":6,
"readQueueNums":1,
"topicFilterType":"SINGLE_TAG",
"topicName":"SELF_TEST_TOPIC",
"topicSysFlag":0,
"writeQueueNums":1
},自己测试用的topic
"%RETRY%QuickStartConsumer":{
"order":false,
"perm":6,
"readQueueNums":1,
"topicFilterType":"SINGLE_TAG",
"topicName":"%RETRY%QuickStartConsumer",
"topicSysFlag":0,
"writeQueueNums":1
},重试队列
"TBW102":{
"order":false,
"perm":7,
"readQueueNums":8,
"topicFilterType":"SINGLE_TAG",
"topicName":"TBW102",
"topicSysFlag":0,
"writeQueueNums":8
},默认topic(自动创建topic时作为模板使用)
"TopicTest":{
"order":false,
"perm":6,
"readQueueNums":4,
"topicFilterType":"SINGLE_TAG",
"topicName":"TopicTest",
"topicSysFlag":0,
"writeQueueNums":4
}
}
}
result = result && this.consumerOffsetManager.load();
加载topic消费进度 json 如下:
{
"offsetTable":{
"TopicTest@haqiaolong":{0:750,2:750,1:750,3:750
0代表分区,750代表索引位置
},
"QuickStart@QuickStartConsumer":{0:250,2:250,1:250,3:250
}
}
}
result = result && this.subscriptionGroupManager.load();
加载订阅消息配置 json 如下:
{
"dataVersion":{
"counter":1,
"timestatmp":1429155593251
},
"subscriptionGroupTable":{
"QuickStartConsumer":{
"brokerId":0,
"consumeBroadcastEnable":true,
"consumeEnable":true,
"consumeFromMinEnable":true,
"groupName":"QuickStartConsumer",
"retryMaxTimes":16,
"retryQueueNums":1,
"whichBrokerWhenConsumeSlowly":1
}
}
}
10.1,this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager);
this.messageStoreConfig = messageStoreConfig;
消息存储配置
this.brokerStatsManager = brokerStatsManager;
broker状态管理
this.allocateMapedFileService = new AllocateMapedFileService();
Pagecache文件封装 就是NIO的MappedByteBuffer内存映射
this.commitLog = new CommitLog(this);
落地所有的元数据信息,数据可靠性保护
this.consumeQueueTable =
new ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>>(
32);
消费队列实现
this.flushConsumeQueueService = new FlushConsumeQueueService();
逻辑队列刷盘服务
this.cleanCommitLogService = new CleanCommitLogService();
清理物理文件服务
this.cleanConsumeQueueService = new CleanConsumeQueueService();
清理逻辑文件服务
this.dispatchMessageService =
new DispatchMessageService(this.messageStoreConfig.getPutMsgIndexHightWater());
分发消息索引服务
this.storeStatsService = new StoreStatsService();
存储层内部统计服务
this.indexService = new IndexService(this);
消息索引服务
this.haService = new HAService(this);
HA服务,负责同步双写,异步复制功能
switch (this.messageStoreConfig.getBrokerRole()) {
case SLAVE:
this.reputMessageService = new ReputMessageService();
从物理队列Load消息,并分发到各个逻辑队列
reputMessageService依赖scheduleMessageService做定时消息的恢复,确保主备数据一致
this.scheduleMessageService = new ScheduleMessageService(this);
定时消息服务
break;
case ASYNC_MASTER:
case SYNC_MASTER:
this.reputMessageService = null;
this.scheduleMessageService = new ScheduleMessageService(this);
break;
default:
this.reputMessageService = null;
this.scheduleMessageService = null;
}
load过程依赖此服务,所以提前启动
this.allocateMapedFileService.start();
this.dispatchMessageService.start();
因为下面的recover会分发请求到索引服务,如果不启动,分发过程会被流控
this.indexService.start();
10.2,result = result && this.messageStore.load();
boolean lastExitOK = !this.isTempFileExist();
是否是异常停机 根据store目录下 abort文件判断
log.info("last shutdown {}", (lastExitOK ? "normally" : "abnormally"));
load 定时进度
这个步骤要放置到最前面,从CommitLog里Recover定时消息需要依赖加载的定时级别参数
slave依赖scheduleMessageService做定时消息的恢复
if (null != scheduleMessageService) {
result = result && this.scheduleMessageService.load();
加载延迟配置
}
// load Commit Log
result = result && this.commitLog.load();
加载消息物理文件 位置为commitlog
// load Consume Queue
result = result && this.loadConsumeQueue();
if (result) {
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig
.getStorePathRootDir()));
记录存储模型最终一致的时间点
this.indexService.load(lastExitOK);
// 尝试恢复数据
this.recover(lastExitOK);
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}
10.2,this.remotingServer =
new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
这里是创建线程池及netty NioEventLoopGroup的设置,不清楚的可以看下第一章
this.sendMessageExecutor = new ThreadPoolExecutor(//
this.brokerConfig.getSendMessageThreadPoolNums(),//
this.brokerConfig.getSendMessageThreadPoolNums(),//
1000 * 60,//
TimeUnit.MILLISECONDS,//
this.sendThreadPoolQueue,//
new ThreadFactoryImpl("SendMessageThread_"));
发送消息线程池
this.pullMessageExecutor = new ThreadPoolExecutor(//
this.brokerConfig.getPullMessageThreadPoolNums(),//
this.brokerConfig.getPullMessageThreadPoolNums(),//
1000 * 60,//
TimeUnit.MILLISECONDS,//
this.pullThreadPoolQueue,//
new ThreadFactoryImpl("PullMessageThread_"));
订阅消息线程池
this.adminBrokerExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(),
new ThreadFactoryImpl("AdminBrokerThread_"));
admin broker线程池
this.clientManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getClientManageThreadPoolNums(),
new ThreadFactoryImpl("ClientManageThread_"));
客户端管理线程池
10.3,this.registerProcessor();
/**
* SendMessageProcessor
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor,
this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor,
this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor,
this.sendMessageExecutor);
/**
* PullMessageProcessor
*/
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor,
this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
/**
* QueryMessageProcessor
*/
NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor,
this.pullMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor,
this.pullMessageExecutor);
/**
* ClientManageProcessor
*/
ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
clientProcessor.registerConsumeMessageHook(this.consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor,
this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor,
this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, clientProcessor,
this.clientManageExecutor);
/**
* Offset存储更新转移到ClientProcessor处理
*/
this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, clientProcessor,
this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, clientProcessor,
this.clientManageExecutor);
/**
* EndTransactionProcessor
*/
this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this),
this.sendMessageExecutor);
/**
* Default
*/
AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
根据RequestCode注册不同的处理场景 个人感觉这种模式很不错 扩展性比较强 入口统一易维护
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
初始化broker状态统计
10.4,final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.getBrokerStats().record();
设置每天的MSG put and get调用总数
}
catch (Exception e) {
log.error("schedule record error.", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
定时写入消费记录到文件
}
catch (Exception e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.scanUnsubscribedTopic();
扫描数据被删除了的topic,offset记录也对应删除
}
catch (Exception e) {
log.error("schedule scanUnsubscribedTopic error.", e);
}
}
}, 10, 60, TimeUnit.MINUTES);
if (this.brokerConfig.getNamesrvAddr() != null) {
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
更新namesrv地址
}
else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
是否从地址服务器查找服务
默认地址为:http://jmenv.tbsite.net:8080/rocketmq/nsaddr
线上关闭
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
}
catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null
&& this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
}
else {
this.updateMasterHAServerAddrPeriodically = true;
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
同步数据 包括 队列 消费进度 延迟消息 订阅
主从同步
}
catch (Exception e) {
log.error("ScheduledTask syncAll slave exception", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
else {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
打印出 master 与 slave有什么不同
}
catch (Exception e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
原文地址:http://haqiaolong.blog.51cto.com/2834720/1633836