What does "consumer consume" mean?

Date: 2023-04-22 03:06 Author: tiger Category: Experience

The RocketMQ client supports both MQPullConsumer and MQPushConsumer.

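DefaultMQPushConsumer is the officially recommended consumer. Unlike MQPullConsumer, where the user has to call a pull method to fetch messages, it asks the user to subscribe to a topic and register a MessageListener, so that messages appear to be pushed to the user and handled passively.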
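
To make the contrast concrete, here is a minimal sketch of how a push-style API can sit on top of pulling. It is a toy in-memory model and not RocketMQ code: the class `PushOverPullSketch` and its methods `send`, `pull`, `registerListener` and `pullAndDispatchOnce` are all made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Toy in-memory model; every name here is hypothetical and none is a RocketMQ API.
final class PushOverPullSketch {
    // Stands in for a broker queue that only answers explicit pull calls.
    private final Queue<String> broker = new ArrayDeque<>();
    private Consumer<List<String>> listener;

    void send(String message) {
        broker.add(message);
    }

    // Pull style: the caller decides when to fetch and how many messages to take.
    List<String> pull(int maxNums) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxNums && !broker.isEmpty()) {
            batch.add(broker.poll());
        }
        return batch;
    }

    // Push style, part 1: the user only registers a callback.
    void registerListener(Consumer<List<String>> listener) {
        this.listener = listener;
    }

    // Push style, part 2: the client pulls on the user's behalf and hands the batch
    // to the callback. A real client would run this in a loop on its own thread.
    int pullAndDispatchOnce(int maxNums) {
        if (listener == null) {
            throw new IllegalStateException("no listener registered");
        }
        List<String> batch = pull(maxNums);
        if (!batch.isEmpty()) {
            listener.accept(batch);
        }
        return batch.size();
    }

    public static void main(String[] args) {
        PushOverPullSketch client = new PushOverPullSketch();
        client.send("a");
        client.send("b");
        client.send("c");
        client.registerListener(batch -> System.out.println("listener got " + batch));
        while (client.pullAndDispatchOnce(2) > 0) {
            // keep pulling until the toy broker is drained
        }
    }
}
```

From the user's side only `registerListener` is visible, which is why the model feels like push even though every message arrives through `pull`.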

Let's first look at the simpler DefaultLitePullConsumer:

 rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.consumer.DefaultLitePullConsumer
    public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
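        this.namespace = namespace; // 1. Some constructors take no namespace; as with the producer, it is then inferred from namesrvAddr when it is used
        this.consumerGroup = consumerGroup; // 2. Some constructors take no consumerGroup; the default "DEFAULT_CONSUMER" is used instead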
        defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
    }
	
    @Override
    public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
        this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector); // 3. Subscribe to the topic and register a MessageQueueListenerImpl on defaultLitePullConsumer; when the MessageQueue set changes it updates assignedMessageQueue (each old ProcessQueue is marked dropped and removed, then the new MessageQueues are added) and finally starts the PullTask
    }
	
    @Override
    public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {
        return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic));
    }

    @Override
    public void assign(Collection<MessageQueue> messageQueues) {
        defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues)); // 3. Alternative to subscribe: after start, fetch the MessageQueues of a topic, then assign them explicitly
    }
    @Override
    public void start() throws MQClientException {
        setTraceDispatcher(); // 4. If message tracing is enabled (off by default; enableMsgTrace can be set after construction), create an AsyncTraceDispatcher and register a ConsumeMessageTraceHookImpl as a ConsumeMessageHook on defaultLitePullConsumerImpl
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); // 5. As with the producer, prefix consumerGroup with the namespace, if any
        this.defaultLitePullConsumerImpl.start(); // 6. Start defaultLitePullConsumerImpl
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); // 7. Start traceDispatcher: it creates and starts an AsyncRunnable thread that submits trace-message send tasks to a thread pool (the trace message carries msgId and similar fields)
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

Next, step into DefaultLitePullConsumerImpl:

 rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl
    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig(); // 1. Validate consumerGroup (not null, no longer than 255 characters, only characters from "%|a-zA-Z0-9_-", and not "DEFAULT_CONSUMER"); messageModel must not be null (defaults to clustering); allocateMessageQueueStrategy must not be null (defaults to AllocateMessageQueueAveragely); consumerTimeoutMillisWhenSuspend must not be less than the broker suspend time
                if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultLitePullConsumer.changeInstanceNameToPID(); // 2. If the system property "rocketmq.client.name" is not set, reset the instance name
                }
                initMQClientFactory(); // 3. Create the MQClientInstance, then register this object in MQClientInstance.consumerTable as the MQConsumerInner for consumerGroup
                initRebalanceImpl(); // 4. Set consumerGroup, messageModel, allocateMessageQueueStrategy and mQClientFactory on rebalanceImpl
                initPullAPIWrapper(); // 5. Create the PullAPIWrapper and register the FilterMessageHooks on it
                initOffsetStore(); // 6. If no offsetStore was set, create one from messageModel (RemoteBrokerOffsetStore for clustering, LocalFileOffsetStore for broadcasting), then load the current offsets (LocalFileOffsetStore reads "${user.home}/.rocketmq_offsets" by default; RemoteBrokerOffsetStore.load is an empty method)
                mQClientFactory.start(); // 7. Same as the producer startup: fetch namesrvAddr from a URL if it is not set; start netty; start the scheduled tasks (refresh namesrvAddr, refresh topicRouteInfo from the name server, remove offline brokers, send heartbeats to brokers, persist consumer offsets, adjust the core size of the consume thread pool); create and start the PullMessageService thread that pulls messages; create and start the RebalanceService thread that picks MessageQueues from the number of consumers in the group for each topic and the allocation strategy
                startScheduleTask(); // 8. Submit a task to the thread pool that walks topicMessageQueueChangeListenerMap, fetches the MessageQueues of each topic, and calls TopicMessageQueueChangeListener.onChanged when the set obtained from the name server has changed
                this.serviceState = ServiceState.RUNNING;
                log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup());
                operateAfterRunning(); // 9. Act on subscriptionType: calling subscribe sets it to "SUBSCRIBE", in which case TopicRouteData is fetched from the name server and the topic's topicRouteTable and brokerAddrTable are updated; calling assign sets it to "ASSIGN", in which case a PullTaskImpl is started for each assigned MessageQueue. Then walk topicMessageQueueChangeListenerMap, fetch the latest MessageQueue set for each topic, and update messageQueuesForTopic
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
    }
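
The state guard in start() can be studied in isolation. The sketch below is a simplified stand-in, assuming a hypothetical `StartGuardSketch` class with an `initSteps` callback in place of the real initialization, and an `IllegalStateException` in place of `MQClientException`. It only shows why the state is set to START_FAILED before initialization and why a second start() is rejected.

```java
// Simplified model of a start-once guard; names are hypothetical, not RocketMQ classes.
final class StartGuardSketch {
    enum State { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    private State state = State.CREATE_JUST;
    private final Runnable initSteps;

    StartGuardSketch(Runnable initSteps) {
        this.initSteps = initSteps;
    }

    synchronized void start() {
        switch (state) {
            case CREATE_JUST:
                // Assume failure up front; only a complete init flips the state to RUNNING.
                state = State.START_FAILED;
                initSteps.run(); // if this throws, the state stays START_FAILED
                state = State.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new IllegalStateException("service state not OK, maybe started once: " + state);
            default:
                break;
        }
    }

    synchronized State state() {
        return state;
    }

    public static void main(String[] args) {
        StartGuardSketch ok = new StartGuardSketch(() -> { });
        ok.start();
        System.out.println(ok.state());
        try {
            ok.start();
        } catch (IllegalStateException e) {
            System.out.println("second start rejected: " + e.getMessage());
        }
    }
}
```

Setting the failed state first means that an exception anywhere in the init steps leaves the object permanently unusable, so a half-initialized consumer cannot be started again by accident.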

DefaultLitePullConsumer.poll merely takes a ConsumeRequest from the consumeRequestCache queue and returns its messages.

What fills that queue is a PullTaskImpl, which is started either by calling DefaultLitePullConsumer.subscribe or by calling DefaultLitePullConsumer.assign.

So let's see what PullTaskImpl does:

 rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.PullTaskImpl
        public void run() {
            if (!this.isCancelled()) { // 1. A pull task can be cancelled through DefaultLitePullConsumer.unsubscribe
                if (assignedMessageQueue.isPaused(messageQueue)) { // 2. A MessageQueue can be paused through DefaultLitePullConsumer.pause
                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
                    log.debug("Message Queue: {} has been paused!", messageQueue);
                    return;
                }
                ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
                if (null == processQueue || processQueue.isDropped()) {
                    log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
                    return;
                }
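                // 3. Flow control: if too much is still unconsumed (pending request count, message count, message size, or the offset span between the oldest and newest cached message), reschedule the task and skip this pull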
                if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                    if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
                        log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
                    }
                    return;
                }
                long cachedMessageCount = processQueue.getMsgCount().get();
                long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
                if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) {
                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                    if ((queueFlowControlTimes++ % 1000) == 0) {
                        log.warn(
                            "The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
                            defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
                    }
                    return;
                }
                if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                    if ((queueFlowControlTimes++ % 1000) == 0) {
                        log.warn(
                            "The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
                            defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
                    }
                    return;
                }
                if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) {
                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                    if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                        log.warn(
                            "The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
                            processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes);
                    }
                    return;
                }
                long offset = 0L;
                try {
                    offset = nextPullOffset(messageQueue);
                } catch (Exception e) {
                    log.error("Failed to get next pull offset", e);
                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);
                    return;
                }
                if (this.isCancelled() || processQueue.isDropped()) { // 4. Check again, since the state may have changed in the meantime
                    return;
                }
                long pullDelayTimeMills = 0;
                try {
                    SubscriptionData subscriptionData;
                    String topic = this.messageQueue.getTopic();
                    if (subscriptionType == SubscriptionType.SUBSCRIBE) {
                        subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
                    } else {
                        subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL);
                    }
                    
                    PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize()); // 5. Pull synchronously through pullAPIWrapper: RPCHook.doBeforeRequest runs before the request and RPCHook.doAfterResponse after it; the result is then filtered by tag and passed to FilterMessageHook.filterMessage
                    if (this.isCancelled() || processQueue.isDropped()) {
                        return;
                    }
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
                            synchronized (objLock) {
                                if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
                                    processQueue.putMessage(pullResult.getMsgFoundList()); // 6. Store the messages in msgTreeMap, keyed by queueOffset
                                    submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue)); // 7. Put the ConsumeRequest into consumeRequestCache
                                }
                            }
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("The pull request offset illegal, {}", pullResult.toString());
                            break;
                        default:
                            break;
                    }
                    updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue); // 8. Update the pullOffset of the MessageQueue (only if no seekOffset is set for it)
                } catch (Throwable e) {
                    pullDelayTimeMills = pullTimeDelayMillsWhenException;
                    log.error("An error occurred in pull message process.", e);
                }
                if (!this.isCancelled()) {
                    scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
                } else {
                    log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
                }
            }
        }
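
The per-queue flow-control checks in step 3 boil down to three comparisons made in a fixed order. The sketch below restates them as a pure function so the order is easy to see. It is an illustration under assumptions: `FlowControlSketch`, `Decision` and `decide` are hypothetical names, the thresholds are parameters rather than RocketMQ defaults, and the global check on consumeRequestCache is left out.

```java
// Pure-function model of per-queue flow control; names and thresholds are hypothetical.
final class FlowControlSketch {
    enum Decision { PULL, DELAY_COUNT, DELAY_SIZE, DELAY_SPAN }

    static Decision decide(long cachedCount, long cachedBytes, long minOffset, long maxOffset,
                           long maxCount, long maxSizeMiB, long maxSpan) {
        long cachedSizeMiB = cachedBytes / (1024 * 1024); // integer MiB, as in the excerpt
        if (cachedCount > maxCount) {
            return Decision.DELAY_COUNT;
        }
        if (cachedSizeMiB > maxSizeMiB) {
            return Decision.DELAY_SIZE;
        }
        // The span is a distance between offsets, not a time interval.
        if (maxOffset - minOffset > maxSpan) {
            return Decision.DELAY_SPAN;
        }
        return Decision.PULL;
    }

    public static void main(String[] args) {
        // Demo thresholds only: 1000 messages, 100 MiB, a span of 2000 offsets.
        System.out.println(decide(10, 4096, 100, 109, 1000, 100, 2000));
        System.out.println(decide(1001, 4096, 100, 1100, 1000, 100, 2000));
        System.out.println(decide(10, 4096, 100, 5000, 1000, 100, 2000));
    }
}
```

In the excerpt every non-PULL outcome has the same effect: the task reschedules itself after the flow-control delay and returns without pulling.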

Now back to the main "push mode". DefaultMQPushConsumer is broadly similar to DefaultLitePullConsumer:

 rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
	
    public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
        this.consumerGroup = consumerGroup; // 1. Some constructors take no consumerGroup; the default "DEFAULT_CONSUMER" is used instead
        this.namespace = namespace; // 2. Some constructors take no namespace; as with the producer, it is then inferred from namesrvAddr when it is used
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; // 3. Some constructors take no allocateMessageQueueStrategy; the default AllocateMessageQueueAveragely is used instead
        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
        if (enableMsgTrace) { // 4. Some constructors take neither enableMsgTrace nor customizedTraceTopic
            try {
                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
                dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
                traceDispatcher = dispatcher;
                this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( // 5. Create the AsyncTraceDispatcher and register a ConsumeMessageHook on defaultMQPushConsumerImpl
                    new ConsumeMessageTraceHookImpl(traceDispatcher));
            } catch (Throwable e) {
                log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
            }
        }
    }	
	
    @Override
    public void subscribe(String topic, String subExpression) throws MQClientException {
        this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression); // 6. Save the subscription and send a heartbeat to the brokers
    }	
	
    @Override
    public void registerMessageListener(MessageListenerConcurrently messageListener) {
        this.messageListener = messageListener;
        this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); // 7. Set the MessageListener
    }	
	
    @Override
    public void start() throws MQClientException {
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); // 8. Re-set consumerGroup, prefixed with the namespace if any
        this.defaultMQPushConsumerImpl.start(); // 9. Start defaultMQPushConsumerImpl
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); // 10. If tracing is enabled, start traceDispatcher, which sends trace messages from a thread pool (their content is the msgId and related data recorded by the ConsumeMessageHook after consumption)
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }	

DefaultMQPushConsumerImpl is likewise largely the same as DefaultLitePullConsumerImpl:

 rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig(); // 1. Checks more than DefaultLitePullConsumerImpl does, e.g. subscription, messageListener, thread pool parameters, pull interval, and the maximum number of messages per pull batch
                this.copySubscription(); // 2. Copy the subscriptions into rebalanceImpl; in clustering mode, also add the retry topic and its subscription to rebalanceImpl
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID(); // 3. Same as DefaultLitePullConsumerImpl: set the instance name
                }
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); // 4. Same as DefaultLitePullConsumerImpl: create the MQClientInstance
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); // 5. Same as DefaultLitePullConsumerImpl: set consumerGroup, messageModel, allocateMessageQueueStrategy and mQClientFactory on rebalanceImpl
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
                this.pullAPIWrapper = new PullAPIWrapper( // 6. Same as DefaultLitePullConsumerImpl: create the PullAPIWrapper and register the FilterMessageHooks
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load(); // 7. Same as DefaultLitePullConsumerImpl: if no offsetStore was set, create one from messageModel, then load the offsets of the MessageQueues where applicable
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }
                this.consumeMessageService.start(); // 8. Create the ConsumeMessageService that matches the MessageListener type, then submit a task to the scheduled thread pool that cleans up expired messages
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); // 9. Same as DefaultLitePullConsumerImpl: register this object in MQClientInstance.consumerTable as the MQConsumerInner for consumerGroup
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                mQClientFactory.start(); // 10. Same as DefaultLitePullConsumerImpl: start the MQClientInstance
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
        this.updateTopicSubscribeInfoWhenSubscriptionChanged(); // 11. Refresh the topic route information from the name server
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); // 12. Send a heartbeat to the brokers
        this.mQClientFactory.rebalanceImmediately(); // 13. Wake up RebalanceService to run a rebalance
    }
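
The offset store selection in the switch above is easy to get backwards, so here it is as a small standalone sketch. The types `MessageModel` and `OffsetStore` below are hypothetical stand-ins that only mirror the names in the excerpt, and the returned strings are placeholders for the two store implementations.

```java
// Hypothetical stand-ins; these types only mirror the names used in the excerpt.
final class OffsetStoreChoiceSketch {
    enum MessageModel { BROADCASTING, CLUSTERING }

    interface OffsetStore {
        String location();
    }

    static OffsetStore choose(MessageModel model, OffsetStore userProvided) {
        if (userProvided != null) {
            return userProvided; // an explicitly configured store always wins
        }
        switch (model) {
            case BROADCASTING:
                // Every instance consumes every message, so progress is private to the instance.
                return () -> "local file";
            case CLUSTERING:
                // The group shares one progress per queue, so it is kept on the broker.
                return () -> "remote broker";
            default:
                throw new IllegalArgumentException("unknown model: " + model);
        }
    }

    public static void main(String[] args) {
        System.out.println(choose(MessageModel.CLUSTERING, null).location());
        System.out.println(choose(MessageModel.BROADCASTING, null).location());
    }
}
```

The reasoning behind the mapping: in clustering mode any member of the group may take over a queue after a rebalance, so the offset has to live somewhere all members can reach, while in broadcasting mode no other instance ever needs it.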

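DefaultMQPushConsumer.subscribe has no direct connection to PullTaskImpl, so where are the messages pulled?

Go back to the MQClientInstance.start phase: PullMessageService blocks because pullRequestQueue is empty, so it cannot run pullMessage yet.

In other words, PullMessageService can only move on to pulling messages once something adds a PullRequest to pullRequestQueue. That code is hidden inside rebalanceImmediately.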
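
The blocking hand-off described here is the standard producer/consumer pattern on a `BlockingQueue`. The sketch below is a toy model, assuming hypothetical names (`PullHandOffSketch`, `dispatch`, `awaitPulled`) and plain strings in place of PullRequest objects; it is not how the RocketMQ classes are written, only the same idea in miniature.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model of the hand-off; class and method names are hypothetical, not RocketMQ code.
final class PullHandOffSketch {
    private final BlockingQueue<String> pullRequestQueue = new LinkedBlockingQueue<>();
    private final List<String> pulled = Collections.synchronizedList(new ArrayList<>());
    private final CountDownLatch done;
    private final Thread service;

    PullHandOffSketch(int expectedRequests) {
        this.done = new CountDownLatch(expectedRequests);
        this.service = new Thread(() -> {
            try {
                while (true) {
                    String request = pullRequestQueue.take(); // parks while the queue is empty
                    pulled.add(request);                      // stands in for pullMessage(request)
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted: leave the loop
            }
        }, "pull-service-sketch");
        this.service.setDaemon(true);
    }

    void start() {
        service.start();
    }

    // The role played by the rebalance step: it is the only producer for the queue.
    void dispatch(String request) {
        pullRequestQueue.add(request);
    }

    boolean awaitPulled(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    List<String> pulled() {
        return new ArrayList<>(pulled);
    }

    void stop() {
        service.interrupt();
    }

    public static void main(String[] args) {
        PullHandOffSketch sketch = new PullHandOffSketch(2);
        sketch.start();               // the service thread is now parked in take()
        sketch.dispatch("queue-0");
        sketch.dispatch("queue-1");
        System.out.println(sketch.awaitPulled(2000) + " " + sketch.pulled());
        sketch.stop();
    }
}
```

Starting the service thread before anything is dispatched is harmless, which matches the order seen in the article: the service is started first and only does work once a request shows up.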

First, the call stack:

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
  org.apache.rocketmq.client.impl.factory.MQClientInstance#rebalanceImmediately
    org.apache.rocketmq.common.ServiceThread#wakeup // This wakes up RebalanceService. Its thread was already started in MQClientInstance.start, but unless it is woken it waits 20 seconds by default before running MQClientInstance#doRebalance
      org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance
        org.apache.rocketmq.client.impl.consumer.MQConsumerInner#doRebalance // Here this is DefaultMQPushConsumerImpl (the pull variant is not covered here); its rebalanceImpl is RebalancePushImpl by default
          org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance
            org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic
              org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance // For each MessageQueue obtained, create a ProcessQueue if none exists, then wrap it in a PullRequest
                org.apache.rocketmq.client.impl.consumer.RebalancePushImpl#dispatchPullRequest
                  org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#executePullRequestImmediately
                    org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately // This is where the PullRequest is added to the queue
            org.apache.rocketmq.client.impl.consumer.RebalanceImpl#truncateMessageQueueNotMyTopic // Compare the subscribed topics with the topics of the MessageQueues that have a ProcessQueue; any ProcessQueue whose topic is not subscribed is marked dropped
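
Inside rebalanceByTopic the allocation strategy decides which MessageQueues this consumer owns. The sketch below shows one way an even, contiguous split can be computed. It is written in the spirit of an averaging strategy and is not claimed to be the code of AllocateMessageQueueAveragely: `AverageAllocationSketch` and `allocate` are hypothetical, and queues and consumers are reduced to integer indexes.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative even split; hypothetical names, not the RocketMQ implementation.
final class AverageAllocationSketch {
    // Returns the queue indexes owned by the consumer at consumerIndex, assuming all
    // consumers in the group sort the queue list and the consumer list the same way.
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        if (queueCount < 0 || consumerCount <= 0 || consumerIndex < 0 || consumerIndex >= consumerCount) {
            throw new IllegalArgumentException("bad arguments");
        }
        int base = queueCount / consumerCount;      // every consumer gets at least this many
        int remainder = queueCount % consumerCount; // the first `remainder` consumers get one more
        int size = base + (consumerIndex < remainder ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, remainder);
        List<Integer> mine = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            mine.add(start + i);
        }
        return mine;
    }

    public static void main(String[] args) {
        for (int consumer = 0; consumer < 3; consumer++) {
            System.out.println("consumer " + consumer + " -> " + allocate(8, 3, consumer));
        }
    }
}
```

With 8 queues and 3 consumers this yields [0, 1, 2], [3, 4, 5] and [6, 7]. When there are more consumers than queues, the surplus consumers get an empty list, which means nothing is ever dispatched to their pull queue.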

Finally, let's see how the messages are pulled:

 rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.consumer.PullMessageService
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }
        log.info(this.getServiceName() + " service end");
    }
	
    private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); // 1. Look up the DefaultMQPushConsumerImpl by consumerGroup (start registered the consumerGroup and the DefaultMQPushConsumerImpl in consumerTable)
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest); // 2. Pull messages
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }

 rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
    public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDropped()) { // 1. Skip the request if its ProcessQueue has been dropped
            log.info("the pull request[{}] is dropped.", pullRequest.toString());
            return;
        }
        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis()); // 2. Record the latest pull time on the ProcessQueue (ProcessQueue.isPullExpired uses it to detect a stalled pull)
        try {
            this.makeSureStateOK(); // 3. Make sure the consumer is in the RUNNING state
        } catch (MQClientException e) {
            log.warn("pullMessage exception, consumer state not ok", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            return;
        }
        if (this.isPause()) { // 4. Calling DefaultMQPushConsumer.suspend stops pulling (DefaultMQPushConsumer.resume restores it)
            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
            return;
        }
        long cachedMessageCount = processQueue.getMsgCount().get();
        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
		
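        // 5. Flow control: when the message count, the message size, or the offset span between the oldest and newest message cached in the ProcessQueue exceeds its limit, put the PullRequest back into pullRequestQueue after a delay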
        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }
        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }
        if (!this.consumeOrderly) {
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                }
                return;
            }
        } else {
            if (processQueue.isLocked()) {
                if (!pullRequest.isPreviouslyLocked()) {
                    long offset = -1L;
                    try {
                        offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
                    } catch (Exception e) {
                        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                        log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
                        return;
                    }
                    boolean brokerBusy = offset < pullRequest.getNextOffset();
                    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                        pullRequest, offset, brokerBusy);
                    if (brokerBusy) {
                        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                            pullRequest, offset);
                    }
                    pullRequest.setPreviouslyLocked(true);
                    pullRequest.setNextOffset(offset);
                }
            } else {
                this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                log.info("pull message later because not locked in broker, {}", pullRequest);
                return;
            }
        }
        final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (null == subscriptionData) { // 6. No subscription data found for the topic; try again later
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.warn("find the consumer's subscription failed, {}", pullRequest);
            return;
        }
        final long beginTimestamp = System.currentTimeMillis();
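        PullCallback pullCallback = new PullCallback() { // 7. Create the PullCallback: on a successful pull with no messages, re-submit the PullRequest immediately; otherwise put the messages into ProcessQueue.msgTreeMap and submit a ConsumeRequest to the thread pool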
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);
                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }
                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                    pullResult.getNextBeginOffset(),
                                    firstMsgOffset,
                                    prevRequestOffset);
                            }
                            break;
                        case NO_NEW_MSG:
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal, {} {}",
                                pullRequest.toString(), pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);
                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                    }
                }
            }
            @Override
            public void onException(Throwable e) {
                if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("execute the pull request exception", e);
                }
                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            }
        };
        boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {
                commitOffsetEnable = true;
            }
        }
        String subExpression = null;
        boolean classFilter = false;
        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString();
            }
            classFilter = sd.isClassFilterMode();
        }
        int sysFlag = PullSysFlag.buildSysFlag(
            commitOffsetEnable, // commitOffset
            true, // suspend
            subExpression != null, // subscription
            classFilter // class filter
        );
        try {
            this.pullAPIWrapper.pullKernelImpl( // 8. Pull messages asynchronously over Netty; RPCHook.doBeforeRequest runs before the request is sent, and the PullCallback is invoked once the pull succeeds
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    }
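
The branching inside the PullCallback above boils down to one decision per pull status: whether to hand messages to the consume thread pool, whether to drop the queue, and when to pull again. The following is a minimal sketch of that decision only, not the RocketMQ implementation; `PullDispatchSketch`, `NextStep` and `decide` are hypothetical names introduced for illustration, and the real code also updates offsets and statistics, which this sketch leaves out.

```java
// Illustrative only: PullDispatchSketch, NextStep and decide are hypothetical names,
// not part of rocketmq-client.
class PullDispatchSketch {
    enum PullStatus { FOUND, NO_NEW_MSG, NO_MATCHED_MSG, OFFSET_ILLEGAL }

    /** Outcome of one callback: submit messages? drop the queue? when to pull again? */
    static final class NextStep {
        final boolean submitToConsumers;
        final boolean dropQueue;
        final long rePullDelayMillis; // 0 = pull again immediately, -1 = do not pull again

        NextStep(boolean submitToConsumers, boolean dropQueue, long rePullDelayMillis) {
            this.submitToConsumers = submitToConsumers;
            this.dropQueue = dropQueue;
            this.rePullDelayMillis = rePullDelayMillis;
        }
    }

    static NextStep decide(PullStatus status, int msgCount, long pullIntervalMillis) {
        switch (status) {
            case FOUND:
                if (msgCount == 0) {
                    // Pull succeeded but the message list is empty: just pull again.
                    return new NextStep(false, false, 0);
                }
                // Submit the batch, then wait pullInterval before the next pull if it is positive.
                return new NextStep(true, false, Math.max(0L, pullIntervalMillis));
            case NO_NEW_MSG:
            case NO_MATCHED_MSG:
                // Nothing to consume: advance the offset (not modelled here) and pull again.
                return new NextStep(false, false, 0);
            case OFFSET_ILLEGAL:
                // The queue is dropped; the offset fix runs later and no pull is scheduled here.
                return new NextStep(false, true, -1);
            default:
                throw new IllegalArgumentException("unknown status: " + status);
        }
    }

    public static void main(String[] args) {
        NextStep step = decide(PullStatus.FOUND, 32, 0);
        System.out.println("submit=" + step.submitToConsumers + ", delay=" + step.rePullDelayMillis);
    }
}
```

Keeping the decision as a pure function makes it easy to see that OFFSET_ILLEGAL is the only branch that does not schedule another pull for the same queue.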

Once the pull completes, the PullCallback fires and submits a ConsumeRequest. Taking concurrent (non-ordered) consumption as an example:

 rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest
        public void run() {
            if (this.processQueue.isDropped()) {
                log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
                return;
            }
            MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
            ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
            ConsumeConcurrentlyStatus status = null;
            defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup()); // 1. Restore the original topic from the "RETRY_TOPIC" message property (only when the message's topic is "%RETRY%{consumerGroup}"), then strip the namespace
            ConsumeMessageContext consumeMessageContext = null;
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext = new ConsumeMessageContext();
                consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
                consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
                consumeMessageContext.setProps(new HashMap<String, String>());
                consumeMessageContext.setMq(messageQueue);
                consumeMessageContext.setMsgList(msgs);
                consumeMessageContext.setSuccess(false);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); // 2. Run ConsumeMessageHook.consumeMessageBefore, e.g. the trace-related ConsumeMessageTraceHookImpl
            }
            long beginTimestamp = System.currentTimeMillis();
            boolean hasException = false;
            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
            try {
                if (msgs != null && !msgs.isEmpty()) {
                    for (MessageExt msg : msgs) {
                        MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                    }
                }
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); // 3. Call back the MessageListener registered by the user
            } catch (Throwable e) {
                log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
                    RemotingHelper.exceptionSimpleDesc(e),
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue), e);
                hasException = true;
            }
            long consumeRT = System.currentTimeMillis() - beginTimestamp;
            if (null == status) {
                if (hasException) {
                    returnType = ConsumeReturnType.EXCEPTION;
                } else {
                    returnType = ConsumeReturnType.RETURNNULL;
                }
            } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                returnType = ConsumeReturnType.TIME_OUT;
            } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
                returnType = ConsumeReturnType.FAILED;
            } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
                returnType = ConsumeReturnType.SUCCESS;
            }
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
            }
            if (null == status) {
                log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
                status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                consumeMessageContext.setStatus(status.toString());
                consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); // 4. Run ConsumeMessageHook.consumeMessageAfter, e.g. the trace-related ConsumeMessageTraceHookImpl
            }
            ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
                .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); // 5. Record the consume time for "{topic}@{consumerGroup}"
            if (!processQueue.isDropped()) {
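                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); // 6. Record the success/failure TPS for "{topic}@{consumerGroup}", remove the messages from the ProcessQueue, and finally update the MessageQueue offset through the OffsetStore (in memory only; not yet persisted to the local file or the remote broker)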
            } else {
                log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
            }
        }
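
The way `run()` above turns the listener's result into a `ConsumeReturnType` can be read as a small pure function. The sketch below mirrors that ordering under stated assumptions: `ConsumeOutcomeSketch`, `classify` and `effectiveStatus` are hypothetical names, the two enums are local stand-ins for the RocketMQ types, and the timeout is taken in minutes because the listing multiplies `getConsumeTimeout()` by `60 * 1000`.

```java
// Illustrative only: local stand-ins for the RocketMQ enums, hypothetical method names.
class ConsumeOutcomeSketch {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }
    enum ReturnType { SUCCESS, TIME_OUT, EXCEPTION, RETURNNULL, FAILED }

    /** Same precedence as the listing: null status first, then timeout, then the status itself. */
    static ReturnType classify(Status status, boolean hasException,
                               long consumeRtMillis, long consumeTimeoutMinutes) {
        if (status == null) {
            // The listener either threw or returned null.
            return hasException ? ReturnType.EXCEPTION : ReturnType.RETURNNULL;
        }
        if (consumeRtMillis >= consumeTimeoutMinutes * 60 * 1000) {
            return ReturnType.TIME_OUT;
        }
        if (status == Status.RECONSUME_LATER) {
            return ReturnType.FAILED;
        }
        return ReturnType.SUCCESS;
    }

    /** A null status is treated as RECONSUME_LATER before the result is processed. */
    static Status effectiveStatus(Status status) {
        return status == null ? Status.RECONSUME_LATER : status;
    }

    public static void main(String[] args) {
        System.out.println(classify(null, true, 5, 15));
        System.out.println(classify(Status.CONSUME_SUCCESS, false, 5, 15));
        System.out.println(effectiveStatus(null));
    }
}
```

Note that the return type is only reported to the hooks through the context properties; what drives retry is the effective status, so a listener that throws behaves like one that returns RECONSUME_LATER.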

The sequence diagram is as follows:
