观看麻豆影视文化有限公司-国产 高清 在线-国产 日韩 欧美 亚洲-国产 日韩 欧美 综合-日日夜夜免费精品视频-日日夜夜噜

rocketmq源碼解析(rocketmq源碼部署)

  • 生活
  • 2023-04-25 12:29

多條告白如次劇本只需引入一次

正文重要領會RocketMQ中怎樣保護動靜無序的。

RocketMQ的本子為:4.2.0release。

一.時序圖

仍舊老規則,先把領會進程的時序圖擺出來:

1.Producer發送程序動靜

2.Consumer接受程序動靜(一)

3.Consumer接受程序動靜(二)

二.源碼領會–Producer發送程序動靜

1DefaultMQProducer#send:發送動靜,入參中有自設置的動靜部隊采用器。

//DefaultMQProducer#sendpublicSendResultsend(Messagemsg,MessageQueueSelectorselector,Objectarg)throwsMQClientException,RemotingException,MQBrokerException,InterruptedException{returnthis.defaultMQProducerImpl.send(msg,selector,arg);}1.1DefaultMQProducerImpl#makeSureStateOK:保證Producer的狀況是運奇跡態-ServiceState.RUNNING。

//DefaultMQProducerImpl#makeSureStateOKprivatevoidmakeSureStateOK()throwsMQClientException{if(this.serviceState!=ServiceState.RUNNING){thrownewMQClientException("TheproducerservicestatenotOK,"+this.serviceState+FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);}}1.2DefaultMQProducerImpl#tryToFindTopicPublishInfo:按照Topic獲得頒布Topic用到的路由消息。

//DefaultMQProducerImpl#tryToFindTopicPublishInfoprivateTopicPublishInfotryToFindTopicPublishInfo(finalStringtopic){TopicPublishInfotopicPublishInfo=this.topicPublishInfoTable.get(topic);if(null==topicPublishInfo||!topicPublishInfo.ok()){this.topicPublishInfoTable.putIfAbsent(topic,newTopicPublishInfo());this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);//為空則從NameServer革新獲得,false,不傳入defaultMQProducertopicPublishInfo=this.topicPublishInfoTable.get(topic);}if(topicPublishInfo.isHaveTopicRouterInfo()||topicPublishInfo.ok()){//有了路由消息并且狀況OK,則歸來returntopicPublishInfo;}else{this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic,true,this.defaultMQProducer);topicPublishInfo=this.topicPublishInfoTable.get(topic);returntopicPublishInfo;}}1.3挪用自設置動靜部隊采用器的select本領。

//DefaultMQProducerImpl#sendSelectImplMessageQueuemq=null;try{mq=selector.select(topicPublishInfo.getMessageQueueList(),msg,arg);}catch(Throwablee){thrownewMQClientException("selectmessagequeuethrowedexception.",e);}//Producer#mainSendResultsendResult=producer.send(msg,newMessageQueueSelector(){@OverridepublicMessageQueueselect(List<MessageQueue>mqs,Messagemsg,Objectarg){Integerid=(Integer)arg;intindex=id%mqs.size();returnmqs.get(index);}},orderId);1.4DefaultMQProducerImpl#sendKernelImpl:發送動靜的中心實行本領。

//DefaultMQProducerImpl#sendKernelImpl......switch(communicationMode){caseSYNC:longcostTimeSync=System.currentTimeMillis()-beginStartTime;if(timeout<costTimeSync){thrownewRemotingTooMuchRequestException("sendKernelImplcalltimeout");}sendResult=this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout-costTimeSync,communicationMode,context,this);break;......1.4.1MQClientAPIImpl#sendMessage:發送動靜。

//MQClientAPIImpl#sendMessage......switch(communicationMode){//按照發送動靜的形式(同步/異步)采用各別的辦法,默許是同步caseSYNC:longcostTimeSync=System.currentTimeMillis()-beginStartTime;if(timeoutMillis<costTimeSync){thrownewRemotingTooMuchRequestException("sendMessagecalltimeout");}returnthis.sendMessageSync(addr,brokerName,msg,timeoutMillis-costTimeSync,request);......1.4.1.1MQClientAPIImpl#sendMessageSync:發送同步動靜。

//MQClientAPIImpl#sendMessageSyncprivateSendResultsendMessageSync(finalStringaddr,finalStringbrokerName,finalMessagemsg,finallongtimeoutMillis,finalRemotingCommandrequest)throwsRemotingException,MQBrokerException,InterruptedException{RemotingCommandresponse=this.remotingClient.invokeSync(addr,request,timeoutMillis);assertresponse!=null;returnthis.processSendResponse(brokerName,msg,response);}1.4.1.1.1NettyRemotingClient#invokeSync:結構RemotingCommand,挪用的辦法是同步。

//NettyRemotingClient#invokeSyncRemotingCommandresponse=this.invokeSyncImpl(channel,request,timeoutMillis-costTime);if(this.rpcHook!=null){this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel),request,response);}returnresponse;三.源碼領會–Consumer接受程序動靜(一)

1DefaultMQPushConsumer#registerMessageListener:把Consumer傳入的動靜監聽器介入到messageListener中。

//DefaultMQPushConsumer#registerMessageListenerpublicvoidregisterMessageListener(MessageListenerOrderlymessageListener){this.messageListener=messageListener;this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);}1.1DefaultMQPushConsumerImpl#registerMessageListener:把Consumer傳入的動靜監聽器介入到messageListenerInner中。

//DefaultMQPushConsumerImpl#registerMessageListenerpublicvoidregisterMessageListener(MessageListenermessageListener){this.messageListenerInner=messageListener;}2DefaultMQPushConsumer#start:啟用Consumer。

//DefaultMQPushConsumer#startpublicvoidstart()throwsMQClientException{this.defaultMQPushConsumerImpl.start();}2.1DefaultMQPushConsumerImpl#start:啟用ConsumerImpl。

//DefaultMQPushConsumerImpl#startswitch(this.serviceState){caseCREATE_JUST://方才創造......if(this.getMessageListenerInner()instanceofMessageListenerOrderly){//無序動靜效勞this.consumeOrderly=true;this.consumeMessageService=newConsumeMessageOrderlyService(this,(MessageListenerOrderly)this.getMessageListenerInner());}elseif(this.getMessageListenerInner()instanceofMessageListenerConcurrently){//并發無序動靜效勞this.consumeOrderly=false;this.consumeMessageService=newConsumeMessageConcurrentlyService(this,(MessageListenerConcurrently)this.getMessageListenerInner());}......this.consumeMessageService.start();//啟用動靜效勞......mQClientFactory.start();//啟用MQClientInstance......2.1.1newConsumeMessageOrderlyService():結構程序動靜效勞。

//ConsumeMessageOrderlyService#ConsumeMessageOrderlyServicepublicConsumeMessageOrderlyService(DefaultMQPushConsumerImpldefaultMQPushConsumerImpl,MessageListenerOrderlymessageListener){this.defaultMQPushConsumerImpl=defaultMQPushConsumerImpl;this.messageListener=messageListener;this.defaultMQPushConsumer=this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();this.consumerGroup=this.defaultMQPushConsumer.getConsumerGroup();this.consumeRequestQueue=newLinkedBlockingQueue<Runnable>();this.consumeExecutor=newThreadPoolExecutor(//主動靜耗費線程池,平常實行收到的ConsumeRequest。多線程this.defaultMQPushConsumer.getConsumeThreadMin(),this.defaultMQPushConsumer.getConsumeThreadMax(),1000*60,TimeUnit.MILLISECONDS,this.consumeRequestQueue,newThreadFactoryImpl("ConsumeMessageThread_"));this.scheduledExecutorService=Executors.newSingleThreadScheduledExecutor(newThreadFactoryImpl("ConsumeMessageScheduledThread_"));}2.1.2ConsumeMessageOrderlyService#start:啟用動靜部隊存戶端范例。

//DefaultMQPushConsumerImpl#startthis.consumeMessageService.start();//ConsumeMessageOrderlyService#startpublicvoidstart(){if(MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())){this.scheduledExecutorService.scheduleAtFixedRate(newRunnable(){@Overridepublicvoidrun(){ConsumeMessageOrderlyService.this.lockMQPeriodically();//準時向broker發送批量鎖住暫時正在耗費的部隊匯合的動靜}},1000*1,ProcessQueue.REBALANCE_LOCK_INTERVAL,TimeUnit.MILLISECONDS);}}2.1.2.1ConsumeMessageOrderlyService#lockMQPeriodically:準時向broker發送批量鎖住暫時正在耗費的部隊匯合的動靜。

2.1.2.1.1RebalanceImpl#lockAll:鎖住一切正在動靜的部隊。

//ConsumeMessageOrderlyService#lockMQPeriodicallyif(!this.stopped){this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();}//RebalanceImpl#lockAllHashMap<String,Set<MessageQueue>>brokerMqs=this.buildProcessQueueTableByBrokerName();//按照brokerName從processQueueTable獲得正在耗費的部隊匯合......Set<MessageQueue>lockOKMQSet=this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(),requestBody,1000);//向Broker發送鎖住動靜部隊的訓令for(MessageQueuemq:lockOKMQSet){ProcessQueueprocessQueue=this.processQueueTable.get(mq);if(processQueue!=null){if(!processQueue.isLocked()){log.info("themessagequeuelockedOK,Group:{}{}",this.consumerGroup,mq);}processQueue.setLocked(true);processQueue.setLastLockTimestamp(System.currentTimeMillis());}}......2.1.3MQClientInstance#start:啟用MQClientInstance。進程較攙雜,放到大題目四中領會。

//DefaultMQPushConsumerImpl#startmQClientFactory.start();四.源碼領會–Consumer接受程序動靜(二)

1MQClientInstance#start:啟用存戶端范例MQClientInstance。

//MQClientInstance#startsynchronized(this){switch(this.serviceState){caseCREATE_JUST:......//Startpullservice啟用拉廢除息效勞this.pullMessageService.start();//Startrebalanceservice啟用耗費端負載平衡效勞this.rebalanceService.start();......1.1PullMessageService#run:啟用拉廢除息效勞。本質挪用的是DefaultMQPushConsumerImpl的pullMessage本領。

//PullMessageService#runpublicvoidrun(){log.info(this.getServiceName()+"servicestarted");while(!this.isStopped()){try{PullRequestpullRequest=this.pullRequestQueue.take();this.pullMessage(pullRequest);}catch(InterruptedExceptionignored){}catch(Exceptione){log.error("PullMessageServiceRunMethodexception",e);}}log.info(this.getServiceName()+"serviceend");}//PullMessageService#pullMessageprivatevoidpullMessage(finalPullRequestpullRequest){finalMQConsumerInnerconsumer=this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if(consumer!=null){DefaultMQPushConsumerImplimpl=(DefaultMQPushConsumerImpl)consumer;impl.pullMessage(pullRequest);//挪用DefaultMQPushConsumerImpl的pullMessage}else{log.warn("NomatchedconsumerforthePullRequest{},dropit",pullRequest);}}1.1.1.1DefaultMQPushConsumerImpl#pullMessage:拉廢除息。提交到ConsumeMessageOrderlyService的線程池consumeExecutor中實行。

//DefaultMQPushConsumerImpl#pullMessage......PullCallbackpullCallback=newPullCallback(){@OverridepublicvoidonSuccess(PullResultpullResult){switch(pullResult.getPullStatus()){caseFOUND:longprevRequestOffset=pullRequest.getNextOffset();pullRequest.setNextOffset(pullResult.getNextBeginOffset());longpullRT=System.currentTimeMillis()-beginTimestamp;......DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);......1.1.1.1.1.1.1

猜你喜歡

主站蜘蛛池模板: 亚洲欧美日本综合 | 日本三级一区二区三区 | 国产亚洲欧美成人久久片 | 日韩大片高清播放器大全 | 欧美在线视频二区 | 欧洲一级鲁丝片免费 | 亚洲欧美日韩久久一区 | 日本乱理伦片在线观看网址 | 亚洲精品国产国语 | 亚洲高清在线观看视频 | 中文字幕波多野不卡一区 | 欧美日韩在线观看精品 | 草草影院ccyy免费看片 | 在线播放人成午夜免费视频 | 黄视频欧美 | 国产亚洲美女精品久久 | 一级毛片黄片 | 国产日本在线视频 | 久久在线免费观看视频 | 欧美日产国产亚洲综合图区一 | 三级国产三级在线 | 久草精品在线 | 国产夫妇肉麻对白 | 成人网18免费软件 | 国产xvideos国产在线 | 99热精品在线免费观看 | 亚洲欧美韩国 | 制服丝袜在线视频香蕉 | 国产精品免费大片 | 久久久久久久免费视频 | 久久精品国产一区二区三区日韩 | 夜色精品国产一区二区 | 国产精品一区高清在线观看 | 色噜噜狠狠大色综合 | 99视频有精品视频免费观看 | 国产午夜精品久久久久九九 | 久久曰视频 | 亚洲精品久久九九精品 | 男人使劲躁女人视频小v | 国产亚洲一区呦系列 | 日韩特级黄色片 |