consumer consume是什么意思
RocketMQ客户端既支持MQPullConsumer,也支持MQPushConsumer。
DefaultMQPushConsumer是官方推荐使用的消费端,它不像MQPullConsumer需要用户主动调用pull方法来处理消息,而是要求用户订阅topic,并注册MessageListener,以一种貌似被动处理推送的消息形式呈现给用户。
先看下简单的DefaultLitePullConsumer:
rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.consumer.DefaultLitePullConsumer
public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
this.namespace = namespace; // 1. 某些构造函数不含namespace,则与producer相同,使用时从namesrvaddr推测
this.consumerGroup = consumerGroup; // 2. 某些构造函数不含consumerGroup,则使用默认的“DEFAULT_CONSUMER”
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
}
@Override
public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector); // 3. 订阅topic,给defaultLitePullConsumer注册MessageQueueListenerImpl,当MessageQueue变更时更新assignedMessageQueue(旧的ProcessQueue设为dropped,并移除,然后添加新的MessageQueue),最后启动PullTask
}
@Override
public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {
return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic));
}
@Override
public void assign(Collection<MessageQueue> messageQueues) {
defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues)); // 3. 启动后根据topic获取MessageQueue,再分配MessageQueue
}
@Override
public void start() throws MQClientException {
setTraceDispatcher(); // 4. 如果启用消息跟踪(默认不启用,可以在构造后设置enableMsgTrace),则创建AsyncTraceDispatcher,向defaultLitePullConsumerImpl注册ConsumeMessageTraceHookImpl这一ConsumeMessageHook
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); // 5. 与Producer类似,将consumerGroup带上可能的namespace
this.defaultLitePullConsumerImpl.start(); // 6. 启动defaultLitePullConsumerImpl
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); // 7. 启动traceDispatcher:创建并启动线程AsyncRunnable,负责向线程池提交发送trace消息任务(消息内容包含msgId等信息)
} catch (MQClientException e) {
log.warn(&34;trace dispatcher start failed &34;, e);
}
}
}
跟进DefaultLitePullConsumerImpl:
rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig(); // 1. 检查consumerGroup(不能为null,字符长度小于255,字符只能是“%|a-zA-Z0-9_-”,不能是“DEFAULT_CONSUMER”),messageModel不能为null(默认为集群模式),allocateMessageQueueStrategy不能为null(默认为AllocateMessageQueueAveragely),consumerTimeoutMillisWhenSuspend不能小于broker的设置
if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultLitePullConsumer.changeInstanceNameToPID(); // 2. (如果为设置系统属性“rocketmq.client.name”)重置实例名称
}
initMQClientFactory(); // 3. 创建MQClientInstance,然后往MQClientInstance.consumerTable注册consumerGroup对应的MQConsumerInner为当前对象
initRebalanceImpl(); // 4. 设置rebalanceImpl的consumerGroup、messageModel、allocateMessageQueueStrategy、mqClientFactory
initPullAPIWrapper(); // 5. 创建PullAPIWrapper,并给PullAPIWrapper注册FilterMessageHook
initOffsetStore(); // 6. (如果未设置offsetStore)根据messageModel创建OffsetStore(集群模式是LocalFileOffsetStore,广播模式是RemoteBrokerOffsetStore),然后加载当前的offset(LocalFileOffsetStore默认加载&34;${user.home}/.rocketmq_offsets&34;,RemoteBrokerOffsetStore加载为空方法)
mQClientFactory.start(); // 7. 参考Producer的启动:如果未设置namesrvAddr,则请求URL获取;启动netty;启动定时任务(定期获取namesrvAddr,定期从namesrvAddr更新topicRouteInfo,定期清理下线broker,定期给broker发送心跳包,定期持久化consumer的offset,定期调整消费的线程池核心线程数);创建并启动PullMessageService线程来拉取消息;创建并启动RebalanceService线程来根据topic里消费组下消费者数量和分配策略选择MessageQueue
startScheduleTask(); // 8. 线程池中提交任务,遍历topicMessageQueueChangeListenerMap,根据topic分组得到MessageQueue,如果从namesrv获取的MessageQueue发生变化,则调用TopicMessageQueueChangeListener.onChanged
this.serviceState = ServiceState.RUNNING;
log.info(&34;the consumer [{}] start OK&34;, this.defaultLitePullConsumer.getConsumerGroup());
operateAfterRunning(); // 9. 根据subscriptionType(调用subsrcibe时会设置为“SUBSCRIBE”,从namesrv获取TopicRouteData,更新topic对应的topicRouteTable、brokerAddrTable;调用assign会被设置为“ASSIGN”,根据分配的的MessageQueue启动PullTaskImpl);遍历topicMessageQueueChangeListenerMap,根据topic获取最新的MessageQuewue集合,更新messageQueuesForTopic
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException(&34;The PullConsumer service state not OK, maybe started once, &34;
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
}
DefaultLitePullConsumer.poll方法只是从consumeRequestCache这个队列取ConsumeRequest得到Message。
要么是因为调用DefaultLitePullConsumer.subscribe而启动了PullTaskImpl,要么是因为调用DefaultLitePullConsumer.assign而启动PullTaskImpl。
那我们看下PullTaskImpl做了些什么:
rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.PullTaskImpl
public void run() {
if (!this.isCancelled()) { // 1. 通过DefaultLitePullConsumer.unsubscribe可以取消拉取任务
if (assignedMessageQueue.isPaused(messageQueue)) { // 2. 通过DefaultLitePullConsumer.pause方法可以暂停MessageQueue
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
log.debug(&34;Message Queue: {} has been paused!&34;, messageQueue);
return;
}
ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
if (null == processQueue || processQueue.isDropped()) {
log.info(&34;The message queue not be able to poll, because it&39;s dropped. group={}, messageQueue={}&34;, defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
return;
}
// 3. 未处理消息较多(数量、消息大小或未消费消息新老时间超出),流控,退出拉取
if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
log.warn(&34;The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}&34;, consumeRequestCache.size(), consumeRequestFlowControlTimes);
}
return;
}
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
&34;The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}&34;,
defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
}
return;
}
if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
&34;The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}&34;,
defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes);
}
return;
}
if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
&34;The queue&39;s messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}&34;,
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes);
}
return;
}
long offset = 0L;
try {
offset = nextPullOffset(messageQueue);
} catch (Exception e) {
log.error(&34;Failed to get next pull offset&34;, e);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);
return;
}
if (this.isCancelled() || processQueue.isDropped()) { // 4. 再次判断
return;
}
long pullDelayTimeMills = 0;
try {
SubscriptionData subscriptionData;
String topic = this.messageQueue.getTopic();
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
} else {
subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL);
}
PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize()); // 5. 使用pullAPIWrapper同步拉取消息,拉取前执行RPCHook.doBeforeRequest,拉取后执行RPCHook.doAfterResponse,最后根据tag过滤,然后执行FilterMessageHook.filterMessage
if (this.isCancelled() || processQueue.isDropped()) {
return;
}
switch (pullResult.getPullStatus()) {
case FOUND:
final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
synchronized (objLock) {
if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
processQueue.putMessage(pullResult.getMsgFoundList()); // 6. 将消息的queueOffset为key映射保存到msgTreeMap
submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue)); // 7. 将consumeRequestCache放入consumeRequestCache
}
}
break;
case OFFSET_ILLEGAL:
log.warn(&34;The pull request offset illegal, {}&34;, pullResult.toString());
break;
default:
break;
}
updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue); // 8. (如果MessageQueue的seekOffset未设置)设置MessageQueue相关的pullOffset
} catch (Throwable e) {
pullDelayTimeMills = pullTimeDelayMillsWhenException;
log.error(&34;An error occurred in pull message process.&34;, e);
}
if (!this.isCancelled()) {
scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
} else {
log.warn(&34;The Pull Task is cancelled after doPullTask, {}&34;, messageQueue);
}
}
}
现在回到主要的&34;推模式&34;,对于DefaultMQPushConsumer大致与DefaultLitePullConsumer相同:
rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
this.consumerGroup = consumerGroup; // 1. 某些构造函数不含consumerGroup,则使用默认的“DEFAULT_CONSUMER”
this.namespace = namespace; // 2. 某些构造函数不含namespace,则与producer相同,使用时从namesrvaddr推测
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; // 3. 某些构造函数不含allocateMessageQueueStrategy,使用默认的AllocateMessageQueueAveragely
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) { // 4. 某些构造函数不含enableMsgTrace和customizedTraceTopic
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( // 5. 创建AsyncTraceDispatcher,并给defaultMQPushConsumerImpl注册ConsumeMessageHook
new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error(&34;system mqtrace hook init failed ,maybe can&39;t send msg trace data&34;);
}
}
}
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression); // 6. 保存订阅关系,给broker发送心跳
}
@Override
public void registerMessageListener(MessageListenerConcurrently messageListener) {
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); // 7. 设置MessageListener
}
@Override
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); //8 .重置consumerGroup带上可能的namespace
this.defaultMQPushConsumerImpl.start(); // 9. 启动defaultMQPushConsumerImpl
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); // 10. (如果开启消息跟踪)启动traceDispatcher,线程池中执行发送消息任务(消息内容为消费消息后通过ConsumeMessageHook留下的msgId等内容)
} catch (MQClientException e) {
log.warn(&34;trace dispatcher start failed &34;, e);
}
}
}
而DefaultMQPushConsumerImpl内容也与基本相同:
rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info(&34;the consumer [{}] start beginning. messageModel={}, isUnitMode={}&34;, this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
this.checkConfig(); // 1. 比DefaultMQPushConsumerImpl要多检查,比如subscription、messageListener、线程池参数、拉取间隔、每批次拉取的最大消息数量等
this.copySubscription(); // 2. 将订阅信息复制到rebalanceImpl,(如果是集群模式)设置重试消息主题和其订阅关系放入rebalanceImpl
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID(); // 3. 同DefaultMQPushConsumerImpl,设置实例名
}
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); // 4. 同DefaultMQPushConsumerImpl,创建MQClientInstance
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); // 5. 同DefaultMQPushConsumerImpl,设置rebalanceImpl的consumerGroup、messageModel、allocateMessageQueueStrategy、mqClientFactory
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper( // 6. 同DefaultMQPushConsumerImpl,创建PullAPIWrapper,注册FilterMessageHook
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load(); // 7. 同DefaultMQPushConsumerImpl,(如果未设置offsetStore)根据messageModel创建OffsetStore,然后可能加载MessageQueue对应的offset
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start(); // 8. 根据MessageListener的类型创建ConsumeMessageService,给定时线程池提交任务,清除失效的消息
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); // 9. 同DefaultMQPushConsumerImpl,向MQClientInstance.consumerTable注册consumerGroup对应的MQConsumerInner为当前对象
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException(&34;The consumer group[&34; + this.defaultMQPushConsumer.getConsumerGroup()
+ &34;] has been created before, specify another name please.&34; + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start(); // 10. 同DefaultMQPushConsumerImpl,
log.info(&34;the consumer [{}] start OK.&34;, this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException(&34;The PushConsumer service state not OK, maybe started once, &34;
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged(); // 11. 从namesrv更新topic相关的路由信息
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); // 12. 给broker发送心跳
this.mQClientFactory.rebalanceImmediately(); // 13. 唤醒RebalanceService,执行重平衡
}
DefaultMQPushConsumer.subscribe跟PullTaskImpl并没有直接关系,消息在哪个地方拉取呢?
回到MQClientInstance.start阶段,PullMessageService会由于pullRequestQueue为空而阻塞,不能执行pullMessage。
那也就是说谁给pullRequestQueue添加了PullRequest,才会让PullMessageService代码往下走去拉取消息。其实这段代码就隐藏在rebalanceImmediately里。
先看调用栈:
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImplstart
org.apache.rocketmq.client.impl.factory.MQClientInstancerebalanceImmediately
org.apache.rocketmq.common.ServiceThreadwakeup // 注意此处是唤醒RebalanceService,在MQClientInstance.start时,已经启动RebalanceService线程,但如果没被唤醒,默认会等待20秒,才能执行MQClientInstancedoRebalance
org.apache.rocketmq.client.impl.factory.MQClientInstancedoRebalance
org.apache.rocketmq.client.impl.consumer.MQConsumerInnerdoRebalance // 此处自然是DefaultMQPushConsumerImpl了(pull的不在此处介绍),rebalanceImpl默认是RebalancePushImpl
org.apache.rocketmq.client.impl.consumer.RebalanceImpldoRebalance
org.apache.rocketmq.client.impl.consumer.RebalanceImplrebalanceByTopic
org.apache.rocketmq.client.impl.consumer.RebalanceImplupdateProcessQueueTableInRebalance // 获取MessageQueue后,如果没有对应的ProcessQueue则创建,然后封装成PullRequest
org.apache.rocketmq.client.impl.consumer.RebalancePushImpldispatchPullRequest
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImplexecutePullRequestImmediately
org.apache.rocketmq.client.impl.consumer.PullMessageServiceexecutePullRequestImmediately // 便是此处往队列添加了PullRequest
org.apache.rocketmq.client.impl.consumer.RebalanceImpltruncateMessageQueueNotMyTopic // 根据订阅的topic和ProcessQueue里MessageQueue的topic,凡不是我们订阅的,将ProcessQueue置为dropped
最后看下如何拉取消息的:
rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.consumer.PullMessageService
@Override
public void run() {
log.info(this.getServiceName() + &34; service started&34;);
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error(&34;Pull Message Service Run Method exception&34;, e);
}
}
log.info(this.getServiceName() + &34; service end&34;);
}
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); // 1. 根据consumerGroup找到DefaultMQPushConsumerImpl(start时注册了consumerGroup和DefaultMQPushConsumerImpl到consumerTable)
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest); // 2. 拉取消息
} else {
log.warn(&34;No matched consumer for the PullRequest {}, drop it&34;, pullRequest);
}
}
rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) { // 1. 判断ProcessQueue
log.info(&34;the pull request[{}] is dropped.&34;, pullRequest.toString());
return;
}
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis()); // 2. 设置ProcessQueue最近拉取时间(流控用到)
try {
this.makeSureStateOK(); // 3. 确保当前是RUNNING状态
} catch (MQClientException e) {
log.warn(&34;pullMessage exception, consumer state not ok&34;, e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
return;
}
if (this.isPause()) { // 4. 调用DefaultMQPushConsumer.suspend会导致停止拉取(可通过调用DefaultMQPushConsumer.resume恢复)
log.warn(&34;consumer was paused, execute pull request later. instanceName={}, group={}&34;, this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
//5. ProcessQueue里消息数量、消息大小或消息新老时间超出时,做流控(延时将PullRequest重新放到pullRequestQueue)
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
&34;the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}&34;,
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
&34;the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}&34;,
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
&34;the queue&39;s messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}&34;,
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
} else {
if (processQueue.isLocked()) {
if (!pullRequest.isPreviouslyLocked()) {
long offset = -1L;
try {
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
} catch (Exception e) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.error(&34;Failed to compute pull offset, pullResult: {}&34;, pullRequest, e);
return;
}
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info(&34;the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}&34;,
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info(&34;[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}&34;,
pullRequest, offset);
}
pullRequest.setPreviouslyLocked(true);
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.info(&34;pull message later because not locked in broker, {}&34;, pullRequest);
return;
}
}
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) { // 6. topic相关的路由信息不存在,稍后再试
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn(&34;find the consumer&39;s subscription failed, {}&34;, pullRequest);
return;
}
final long beginTimestamp = System.currentTimeMillis();
PullCallback pullCallback = new PullCallback() { // 7. 创建PullCallback,当拉取成功但消息为空时立即创建PullRequest,否则将消息放入ProcessQueue.msgTreeMap,并创建ConsumeRequest提交到线程池
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
&34;[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}&34;,
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
case NO_NEW_MSG:
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn(&34;the pull request offset illegal, {} {}&34;,
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn(&34;fix the pull request offset, {}&34;, pullRequest);
} catch (Throwable e) {
log.error(&34;executeTaskLater Exception&34;, e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn(&34;execute the pull request exception&34;, e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
try {
this.pullAPIWrapper.pullKernelImpl( // 8. 使用netty异步拉取消息,拉去前执行RPCHook.doBeforeRequest,操作成功调用PullCallback
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error(&34;pullKernelImpl exception&34;, e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
然后是拉取消息完毕,导致PullCallback回调,提交了ConsumeRequest。以非顺序性消费为例:
rocketmq-client-4.9.1.jar!/org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest
public void run() {
if (this.processQueue.isDropped()) {
log.info(&34;the message queue not be able to consume, because it&39;s dropped. group={} {}&34;, ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup()); // 1. 从消息属性“RETRY_TOPIC”重置topic(仅当消息属性值为&34;%RETRY%{consumerGroup}&34;),再去除namespace
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); // 2. 执行ConsumeMessageHook.consumeMessageBefore,比如trace相关的ConsumeMessageTraceHookImpl
}
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); // 3. 回调用户注册的MessageListener
} catch (Throwable e) {
log.warn(String.format(&34;consumeMessage exception: %s Group: %s Msgs: %s MQ: %s&34;,
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue), e);
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
if (null == status) {
log.warn(&34;consumeMessage return null, Group: {} Msgs: {} MQ: {}&34;,
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); // 4. 执行ConsumeMessageHook.consumeMessageAfter,比如trace相关的ConsumeMessageTraceHookImpl
}
ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); // 5. 记录&34;{topic}@{consumerGroup}&34;的消费耗时
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); // 6. 记录&34;{topic}@{consumerGroup}&34;相关的成功/失败TPS,从ProcessQueue移除消息,最后调用OffsetStore更新MessageQueue的offset(仅更新内存,未持久化到本地文件或远程broker)
} else {
log.warn(&34;processQueue is dropped without process consume result. messageQueue={}, msgs={}&34;, messageQueue, msgs);
}
}
时序图如下: