This article analyzes how RocketMQ guarantees message ordering.
The RocketMQ version used is 4.2.0 release.
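I. Sequence Diagrams
As usual, here are the sequence diagrams for the flows analyzed:
1. Producer sending ordered messages
2. Consumer receiving ordered messages (part 1)
3. Consumer receiving ordered messages (part 2)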
II. Source Code Analysis – Producer Sending Ordered Messages
1 DefaultMQProducer#send: sends the message. The arguments include a custom message queue selector.
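Before the call chain, it may help to see the producer-side idea in isolation: messages that share a business key (an order id in the example used in step 1.3) are always routed to the same queue, so their relative order is preserved inside that queue. The following is a minimal sketch under stated assumptions, not RocketMQ code: plain strings stand in for MessageQueue objects, and the class name QueueSelectionSketch and method selectQueue are hypothetical. It uses Math.floorMod instead of the % operator, which is an added choice so that a negative id still maps to a valid index.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only: plain strings stand in for RocketMQ's MessageQueue objects. */
final class QueueSelectionSketch {

    /** Maps an order id to one queue; the same id always yields the same queue. */
    static String selectQueue(List<String> queues, int orderId) {
        // floorMod keeps the index non-negative even for a negative id,
        // which the plain % operator would not.
        int index = Math.floorMod(orderId, queues.size());
        return queues.get(index);
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("queue-0", "queue-1", "queue-2", "queue-3");
        for (int orderId : new int[] {7, 7, 11, -1}) {
            System.out.println("order " + orderId + " -> " + selectQueue(queues, orderId));
        }
    }
}
```

Because the mapping depends only on the id and the queue count, every message of one order lands in one queue as long as the queue list does not change.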
// DefaultMQProducer#send
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(msg, selector, arg);
}

1.1 DefaultMQProducerImpl#makeSureStateOK: ensures the Producer is in the running state, ServiceState.RUNNING.
// DefaultMQProducerImpl#makeSureStateOK
private void makeSureStateOK() throws MQClientException {
    if (this.serviceState != ServiceState.RUNNING) {
        throw new MQClientException("The producer service state not OK, "
            + this.serviceState
            + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
            null);
    }
}

1.2 DefaultMQProducerImpl#tryToFindTopicPublishInfo: looks up the routing information used to publish to the Topic.
// DefaultMQProducerImpl#tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // If empty, refresh from the NameServer (isDefault is false, defaultMQProducer is not passed in)
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        // Route info is present or the state is OK, so return it
        return topicPublishInfo;
    } else {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

1.3 Calls the select method of the custom message queue selector.
// DefaultMQProducerImpl#sendSelectImpl
MessageQueue mq = null;
try {
    mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
} catch (Throwable e) {
    throw new MQClientException("select message queue throwed exception.", e);
}

// Producer#main
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

1.4 DefaultMQProducerImpl#sendKernelImpl: the core method that actually sends the message.
// DefaultMQProducerImpl#sendKernelImpl
......
switch (communicationMode) {
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout - costTimeSync,
            communicationMode,
            context,
            this);
        break;
......

1.4.1 MQClientAPIImpl#sendMessage: sends the message.
// MQClientAPIImpl#sendMessage
......
// Choose the send path by communication mode (sync/async); the default is sync
switch (communicationMode) {
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendMessage call timeout");
        }
        return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
......

1.4.1.1 MQClientAPIImpl#sendMessageSync: sends the message synchronously.
// MQClientAPIImpl#sendMessageSync
private SendResult sendMessageSync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    return this.processSendResponse(brokerName, msg, response);
}

1.4.1.1.1 NettyRemotingClient#invokeSync: sends the request synchronously and returns the response RemotingCommand.
// NettyRemotingClient#invokeSync
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
if (this.rpcHook != null) {
    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
}
return response;

III. Source Code Analysis – Consumer Receiving Ordered Messages (Part 1)
1 DefaultMQPushConsumer#registerMessageListener: stores the message listener passed in by the Consumer in messageListener.
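The consumer walkthrough in this section shows that ordered consumption still runs on a multi-threaded pool and that queues are locked. One general way a thread pool can preserve per-queue order is to let each worker take the oldest pending message and process it while holding a lock that belongs to that queue. The sketch below illustrates that technique only; it is not code taken from RocketMQ, and the class OrderedConsumeSketch, its fields, and its methods are all hypothetical names. Integers stand in for messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustrative only: one in-memory queue consumed in order by several pool threads. */
final class OrderedConsumeSketch {
    private final Object queueLock = new Object();              // one lock per message queue
    private final Queue<Integer> pending = new ArrayDeque<>();  // messages in arrival order
    private final List<Integer> consumed = new ArrayList<>();   // order in which they were processed

    void put(int message) {
        synchronized (queueLock) {
            pending.add(message);
        }
    }

    /** Run by any pool thread: take the oldest pending message and process it under the lock. */
    void consumeOne() {
        synchronized (queueLock) {
            Integer message = pending.poll();
            if (message != null) {
                consumed.add(message); // stands in for the user's listener callback
            }
        }
    }

    /** Fills the queue with 0..messageCount-1, consumes with a pool, returns the processing order. */
    static List<Integer> run(int messageCount, int threads) {
        OrderedConsumeSketch queue = new OrderedConsumeSketch();
        for (int i = 0; i < messageCount; i++) {
            queue.put(i);
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < messageCount; i++) {
            pool.execute(queue::consumeOne);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        synchronized (queue.queueLock) {
            return new ArrayList<>(queue.consumed);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(10, 4));
    }
}
```

Which thread runs a given task does not matter here: taking and processing happen in the same critical section, so the processing order follows the queue order. The cost is that one queue is never consumed by two threads at once.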
// DefaultMQPushConsumer#registerMessageListener
public void registerMessageListener(MessageListenerOrderly messageListener) {
    this.messageListener = messageListener;
    this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}

1.1 DefaultMQPushConsumerImpl#registerMessageListener: stores the message listener passed in by the Consumer in messageListenerInner.
// DefaultMQPushConsumerImpl#registerMessageListener
public void registerMessageListener(MessageListener messageListener) {
    this.messageListenerInner = messageListener;
}

2 DefaultMQPushConsumer#start: starts the Consumer.
// DefaultMQPushConsumer#start
public void start() throws MQClientException {
    this.defaultMQPushConsumerImpl.start();
}

2.1 DefaultMQPushConsumerImpl#start: starts the ConsumerImpl.
// DefaultMQPushConsumerImpl#start
switch (this.serviceState) {
    case CREATE_JUST: // just created
        ......
        if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
            // Ordered message service
            this.consumeOrderly = true;
            this.consumeMessageService = new ConsumeMessageOrderlyService(this,
                (MessageListenerOrderly) this.getMessageListenerInner());
        } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
            // Concurrent, unordered message service
            this.consumeOrderly = false;
            this.consumeMessageService = new ConsumeMessageConcurrentlyService(this,
                (MessageListenerConcurrently) this.getMessageListenerInner());
        }
        ......
        this.consumeMessageService.start(); // start the message service
        ......
        mQClientFactory.start(); // start MQClientInstance
        ......

2.1.1 new ConsumeMessageOrderlyService(): constructs the ordered message service.
// ConsumeMessageOrderlyService#ConsumeMessageOrderlyService
public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
    MessageListenerOrderly messageListener) {
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    this.messageListener = messageListener;

    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

    // Main message consumption thread pool (multi-threaded); it executes the ConsumeRequests received
    this.consumeExecutor = new ThreadPoolExecutor(
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        new ThreadFactoryImpl("ConsumeMessageThread_"));

    this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
}

2.1.2 ConsumeMessageOrderlyService#start: starts the ordered message service, which in clustering mode schedules the periodic queue lock.
// DefaultMQPushConsumerImpl#start
this.consumeMessageService.start();

// ConsumeMessageOrderlyService#start
public void start() {
    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // Periodically ask the broker to batch-lock the queues currently being consumed
                ConsumeMessageOrderlyService.this.lockMQPeriodically();
            }
        }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    }
}

2.1.2.1 ConsumeMessageOrderlyService#lockMQPeriodically: periodically sends the broker a request to batch-lock the set of queues currently being consumed.
2.1.2.1.1 RebalanceImpl#lockAll: locks all queues currently being consumed.
// ConsumeMessageOrderlyService#lockMQPeriodically
if (!this.stopped) {
    this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
}

// RebalanceImpl#lockAll
// Group the queues currently being consumed (from processQueueTable) by brokerName
HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
......
// Send the broker the command to lock the message queues
Set<MessageQueue> lockOKMQSet =
    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);

for (MessageQueue mq : lockOKMQSet) {
    ProcessQueue processQueue = this.processQueueTable.get(mq);
    if (processQueue != null) {
        if (!processQueue.isLocked()) {
            log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
        }
        processQueue.setLocked(true);
        processQueue.setLastLockTimestamp(System.currentTimeMillis());
    }
}
......

2.1.3 MQClientInstance#start: starts MQClientInstance. This process is more involved, so it is covered in Section IV.
// DefaultMQPushConsumerImpl#start
mQClientFactory.start();

IV. Source Code Analysis – Consumer Receiving Ordered Messages (Part 2)
1 MQClientInstance#start: starts the client instance, MQClientInstance.
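The pull service examined in this section is built around a common pattern: a dedicated thread blocks on a queue of pending requests and handles them one at a time. The following is a minimal sketch of that pattern, not RocketMQ code. Strings stand in for PullRequest objects, the class PullLoopSketch and its methods are hypothetical, and the STOP sentinel is a simplification added here so that the example terminates deterministically; the source shown below stops via a flag instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Illustrative only: strings stand in for PullRequest objects. */
final class PullLoopSketch {
    private static final String STOP = "__stop__"; // hypothetical sentinel, not part of RocketMQ
    private final BlockingQueue<String> requests = new LinkedBlockingQueue<>();
    private final List<String> handled = new ArrayList<>();

    void submit(String request) {
        requests.add(request);
    }

    /** Blocks on take() until a request arrives, handles it, and repeats until STOP is seen. */
    void runLoop() throws InterruptedException {
        while (true) {
            String request = requests.take();
            if (STOP.equals(request)) {
                return;
            }
            handled.add(request); // a real service would trigger the pull here
        }
    }

    /** Starts the loop on its own thread, submits two requests, stops it, returns what was handled. */
    static List<String> demo() {
        PullLoopSketch service = new PullLoopSketch();
        Thread worker = new Thread(() -> {
            try {
                service.runLoop();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "pull-loop-sketch");
        worker.start();
        service.submit("pull queue-0");
        service.submit("pull queue-1");
        service.submit(STOP);
        try {
            worker.join(); // join makes the worker's writes to 'handled' visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return service.handled;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Since a single thread drains a FIFO queue, requests are handled in submission order.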
// MQClientInstance#start
synchronized (this) {
    switch (this.serviceState) {
        case CREATE_JUST:
            ......
            // Start pull service
            this.pullMessageService.start();
            // Start rebalance service (consumer-side load balancing)
            this.rebalanceService.start();
            ......

1.1 PullMessageService#run: runs the pull message service. What it actually calls is the pullMessage method of DefaultMQPushConsumerImpl.
// PullMessageService#run
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

// PullMessageService#pullMessage
private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest); // call DefaultMQPushConsumerImpl's pullMessage
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}

1.1.1.1 DefaultMQPushConsumerImpl#pullMessage: pulls messages. The pulled messages are submitted to the consumeExecutor thread pool of ConsumeMessageOrderlyService for execution.
// DefaultMQPushConsumerImpl#pullMessage
......
PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        switch (pullResult.getPullStatus()) {
            case FOUND:
                long prevRequestOffset = pullRequest.getNextOffset();
                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                long pullRT = System.currentTimeMillis() - beginTimestamp;
                ......
                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                    pullResult.getMsgFoundList(),
                    processQueue,
                    pullRequest.getMessageQueue(),
                    dispatchToConsume);
                ......

1.1.1.1.1.1.1