RocketMQ中有两类消费者——Push和Pull。Push指的是服务端主动推送消息给客户端,及时性好。RocketMQ的Push消费者类是DefaultMQPushConsumer。但在RocketMQ中,其实际是一种伪Push的方式,因为客户端底层实现仍然是使用Pull的方式将消息从服务器拉取到客户端,然后由消费线程进行消费。Push模式是对Pull模式的封装,类似于一个高级API。
本文将以MQ的标准Push Consumer——DefaultMQPushConsumer为例,进行源码分析。
1. 整体结构
1.1 DefaultMQPushConsumer核心参数
DefaultMQPushConsumer的整体框架如上图所示,其中DefaultMQPushConsumer是门面,它提供了可以操作的API接口以及可以修改的属性。具体逻辑都写在实现类DefaultMQPushConsumerImpl。DefaultMQPushConsumer的核心参数如下:
// 这个是消费者一个 final 的属性,用来记录 RocketMQ Consumer 在运作过程中的一些日志,其日志文件默认路径为 `${user.home}/logs/rocketmqlogs/rocketmq_cliente.log`。
InternalLogger log
// 消费组的名称,在 RocketMQ 中,对于消费中来说,一个消费组就是一个独立的隔离单位,例如多个消费组订阅同一个主题,其消息进度(消息处理的进展)是相互独立的,两者不会有任何的干扰。
String consumerGroup
// 消息组消息消费模式,在 RocketMQ 中支持集群模式、广播模式。集群模式值得是一个消费组内多个消费者共同消费一个 Topic 中的消息,即一条消息只会被集群内的某一个消费者处理;而广播模式是指一个消费组内的每一个消费者负责 Topic 中的所有消息。
MessageModel messageModel
// 一个消费者初次启动时(即消费进度管理器中无法查询到该消费组的进度)时从哪个位置开始消费的策略
ConsumeFromWhere consumeFromWhere
// 指定从什么时间戳开始消费,其格式为 yyyyMMddHHmmss,默认值为 30 分钟之前,该参数只在 consumeFromWhere 为 CONSUME_FROM_TIMESTAMP 时生效。
String consumeTimestamp
// 消息队列负载策略
AllocateMessageQueueStrategy allocateMessageQueueStrategy
// 一致性 Hash 算法。
AllocateMessageQueueConsistentHash
// 消息进度存储管理器,该属性为私有属性,不能通过 API 进行修改,
OffsetStore offsetStore
// 消费者每一个消费组线程池中最小的线程数量,默认为 20。在 RocketMQ 消费者中,会为每一个消费者创建一个独立的线程池。
int consumeThreadMin
// 消费者最大线程数量,在当前的 RocketMQ 版本中,该参数通常与 consumeThreadMin 保持一致,大于没有意义,因为 RocketMQ 创建的线程池内部创建的队列为一个无界队列。
int consumeThreadMax
// 并发消息消费时处理队列中最大偏移量与最小偏移量的差值的阔值,如差值超过该值,触发消费端限流。限流的具体做法是不再向 Broker 拉取该消息队列中的消息,默认值为 2000
int consumeConcurrentlyMaxSpan
// 消费端允许消费端端单队列积压的消息数量,如果处理队列中超过该值,会触发消息消费端的限流。默认值为 1000,不建议修改该值
int pullThresholdForQueue
// 消费端允许消费端但队列中挤压的消息体大小,默认为 100MB
pullThresholdSizeForQueue
// 按 Topic 级别进行消息数量限流,默认不开启,为 -1,如果设置该值,会使用该值除以分配给当前消费者的队列数,得到每个消息消费队列的消息阔值,从而改变 pullThresholdForQueue
pullThresholdForTopic
// 按 Topic 级别进行消息消息体大小进行限流,默认不开启,其最终通过改变 pullThresholdSizeForQueue 达到限流效果
pullThresholdSizeForTopic
// 消息拉取的间隔,默认 0 表示,消息客户端在拉取一批消息提交到线程池后立即向服务端拉取下一批,PUSH 模式不建议修改该值。
long pullInterval = 0
// 一次消息拉取请求最多从 Broker 返回的消息条数,默认为 32。
int pullBatchSize = 32
// 消息消费一次最大消费的消息条数,这个值得是下图中参数 `ist<MessageExt> msgs` 中消息的最大条数。
int consumeMessageBatchMaxSize
// 消息消费重试次数,并发消费模式下默认重试 16 次后进入到死信队列,如果是顺序消费,重试次数为 Integer.MAX_VALUE。
int maxReconsumeTimes
// 消费模式为顺序消费时设置每一次重试的间隔时间,提高重试成功率。
long suspendCurrentQueueTimeMillis
// 消息消费超时时间,默认为 15 分钟。
long consumeTimeout = 15
1.2 MQClientInstance
DefaultMQPushConsumerImpl中持有MQClientInstance,该类实现了客户端一些通用逻辑,不管是producer还是consumer都内置有这个类,并且一个JVM中所有的消费者、生产者持有同一个MQClientInstance,且MQClientInstance只启动一次。其中有两个Map,producerTable和consumerTable分别用于记录【生产者group:生产者MQProducerInner】和【消费者group:消费者MQConsumerInner】。
1.3 RebalanceImpl
RebalanceImpl会给当前消费者分配对应的MessageQueue。
1.4 OffsetStore
OffsetStore是用来管理Consumer消费位点的接口,该参数主要是根据消费模式在内部自动创建,RocketMQ 在广播消息、集群消费两种模式下分别对应两种OffsetStore——LocalFileOffsetStore和RemoteBrokerOffsetStore,其消息消费进度的存储策略会有所不同:
- 集群模式:RocketMQ 会将消息消费进度存储在 Broker 服务器,存储路径为
${ROCKET_HOME}/store/config/ consumerOffset.json
文件中。 - 广播模式:RocketMQ 会将消息消费进存在在消费端所在的机器上,存储路径为
${user.home}/.rocketmq_offsets
中。
consumerOffset.json文件如下所示,使用 topic@consumerGroup 为键,其值是一个 Map,键为 Topic 的队列序列,值为当前的消息消费位点。
举例,订阅dw_test这个topic的order_topic_activity_consumer消费组的消费情况如下:该topic有四个队列,其中0号队列消费到了2,1号队列消费到了0,2号队列消费到了0,3号队列消费到了3。
可见,topic和消费组唯一的确定一个消费进度。
1.5 ConsumeMessageService
ConsumeMessageService维护有多个线程池,与具体的消息消费有关。
1.6 ServiceThread
PullMessageService和RebalanceService都继承了ServiceThread这个抽象类,而ServiceThread又实现了Runnable方法,因此这两个Service可以理解为都是后台线程方法。
2. 源码分析
2.1 使用样例
public static void main(String[] args) {
// 设置消费组
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("ORDER_RESULT_NOTIFY_GROUP");
// 设置名字发现服务地址
defaultMQPushConsumer.setNamesrvAddr(nameSrvAddr);
// 从头开始消费
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 消费模式:集群模式
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
// 注册监听器
defaultMQPushConsumer.registerMessageListener(messageListener);
// 订阅所有消息
try {
defaultMQPushConsumer.subscribe("ORDER_RESULT_NOTIFY_TOPIC", "*");
defaultMQPushConsumer.start();
} catch (MQClientException e) {
throw new RuntimeException("[订单结果通知消息消费者]--NotifySendConsumer加载异常!", e);
}
LOGGER.info("[订单结果通知消息消费者]--NotifySendConsumer加载完成!");
}
2.2 订阅Topic
topic订阅主要通过subscribe实现,首先看下DefaultMQPushConsumer的subscribe实现
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression);
}
可见其内部是调用了DefaultMQPushConsumerImpl的subscribe方法。
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
// 发送心跳信息
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
可见其首先生成SubscriptionData,这是一个订阅信息类。然后将该类放入到RebalanceImpl类中的Map——subscrpitionInner。该Map的key是topic name,value是SubscriptionData对象。
然后向集群内所有的Broker发送心跳信息,继续跟踪sendHeartbeatToAllBrokerWithLock方法可见其发送的心跳信息HeartbeatData为:
// 客户端ID
private String clientID;
// 生产者信息
private Set<ProducerData> producerDataSet = new HashSet<ProducerData>();
// 消费者信息, 包含订阅信息
private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>();
其中ConsumerData即为消费者的信息
private String groupName;
private ConsumeType consumeType;
private MessageModel messageModel;
private ConsumeFromWhere consumeFromWhere;
// 订阅信息
private Set<SubscriptionData> subscriptionDataSet = new HashSet<SubscriptionData>();
private boolean unitMode;
在MQClientInstance类中,HeatbeatData信息的生成方法prepareHeartbeatData
private HeartbeatData prepareHeartbeatData() {
HeartbeatData heartbeatData = new HeartbeatData();
// clientID
heartbeatData.setClientID(this.clientId);
// Consumer
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
ConsumerData consumerData = new ConsumerData();
consumerData.setGroupName(impl.groupName());
consumerData.setConsumeType(impl.consumeType());
consumerData.setMessageModel(impl.messageModel());
consumerData.setConsumeFromWhere(impl.consumeFromWhere());
consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
consumerData.setUnitMode(impl.isUnitMode());
heartbeatData.getConsumerDataSet().add(consumerData);
}
}
// Producer
for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) {
ProducerData producerData = new ProducerData();
producerData.setGroupName(entry.getKey());
heartbeatData.getProducerDataSet().add(producerData);
}
}
return heartbeatData;
}
可见其是通过consumerTable和producerTable分别映射到MQConsumerInner以及MQProducerInner,然后拿到相关的配置,生成heartbeatData。
我们继续看看 broker 如何处理 HeartbeatData 数据,客户端发送 HeartbeatData 时的请求类型为 HEART_BEAT,我们直接找到 broker 处理 HEART_BEAT 请求类型的逻辑:
org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat:
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
// 解码获得心跳信息
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(ctx.channel(),heartbeatData.getClientID(),
request.getLanguage(),
request.getVersion());
// 循环注册消费者信息
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
// 按消费组得到订阅信息
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(data.getGroupName());
boolean isNotifyConsumerIdsChangedEnable = true;
if (null != subscriptionGroupConfig) {
isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
int topicSysFlag = 0;
if (data.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
String newTopic = MixAll.getRetryTopic(data.getGroupName());
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
}
// 注册消费者订阅信息
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
data.getGroupName(),
clientChannelInfo,
data.getConsumeType(),
data.getMessageModel(),
data.getConsumeFromWhere(),
data.getSubscriptionDataSet(),
isNotifyConsumerIdsChangedEnable
);
}
// 注册生产者订阅信息
...
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer:
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// 得到消费组内的消费者信息
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
// 如果消费组的消费者信息为空,则新建一个
if (null == consumerGroupInfo) {
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,consumeFromWhere);
// 更新订阅信息,订阅信息是按照消费组存放的,因此这步骤就会导致同一个消费组内的各个消费者客户端的订阅信息相互被覆盖
boolean r2 = consumerGroupInfo.updateSubscription(subList);
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group,
consumerGroupInfo.getAllChannel());
}
}
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
}
这步骤是 Broker 更新消费者订阅信息的核心方法,如果消费组的消费者信息 ConsumerGroupInfo 为空,则新建一个,从名字可知道,订阅信息是按照消费组进行存放的,因此在更新订阅信息时,订阅信息是按照消费组存放的,这步骤就会导致同一个消费组内的各个消费者客户端的订阅信息相互被覆盖。
2.3 启动消费客户端
在DefaultMQPushConsumer初始化完成后,通过start()方法启动客户端,追到DefaultMQPushConsumerImpl.start()方法
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
...
// 初始化MQClientInstance,如果已经存在直接使用存在的MQClientInstance
this.mQClientFactory = MQClientManager.getInstance()
.getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 为消费者负载均衡rebalanceImpl设置属性
// 1. 设置消费组
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
// 2. 设置消费模式
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
// 3. 设置rebalance策略
this.rebalanceImpl.setAllocateMessageQueueStrategy(
this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
// 4. 设置当前MQClientInstance实例
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
// 不同消息模式OffsetStore不一样
switch (this.defaultMQPushConsumer.getMessageModel()) {
// 广播模式,使用本地存储方式
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup());
break;
// 集群模式,使用远端Broker存储方式存储offset
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// 加载当前的offset
this.offsetStore.load();
// 根据Listener的不同选择不同的ConsumeMessageService,分为顺序消息消费服务和并行消息消费服务
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this,
(MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService = new ConsumeMessageConcurrentlyService(this,
(MessageListenerConcurrently) this.getMessageListenerInner());
}
// 启动ConsumeMessageService
this.consumeMessageService.start();
// 向MQClientInstance中注册消费者,并启动MQClientInstance
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(),
this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(
FAQUrl.GROUP_NAME_DUPLICATE_URL),null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
...
}
}
整个DefaultMQPushConsumer的启动以及消费流程课件下图所示,
- 经过队列负载机制后,会分配给当前消费者一些队列,注意一个消费组可以订阅多个主题,正如上面 pullRequestQueue 中所示,topic_test、topic_test1 这两个主题都分配了一个队列。
- 轮流从 pullRequestQueue 中取出一个 PullRequest 对象,根据该对象中的拉取偏移量向 Broker 发起拉取请求,默认拉取 32 条,可通过上文中提到的DefaultMQPushConsumer中的pullBatchSize 参数进行改变,该方法不仅会返回消息列表,还会返更改 PullRequest 对象中的下一次拉取的偏移量。
- 接收到 Broker 返回的消息后,会首先放入 ProccessQueue,该对象内部持有一个 TreeMap,key 存放的是消息在消息消费队列(consume queue)中的偏移量(offset),而 value 为具体的消息对象(MessageExt)。
- 然后将拉取到的消息提交到消费组内部的线程池,并立即返回,并将 PullRequest 对象放入到 pullRequestQueue 中,然后取出下一个 PullRequest 对象继续重复消息拉取的流程,从这里可以看出,消息拉取与消息消费是不同的线程。
- 消息消费组线程池处理完一条消息后,会将消息从 ProccessQueue 中,然后会向 Broker 汇报消息消费进度,以便下次重启时能从上一次消费的位置开始消费。
在了解完DefaultMQPushConsumer整体的消费流程后,下面分析各个类的源码。
2.4 MQClientInstance类
在初始化MQClientInstance对象的时候,在其构造方法中会构建PullMessageService类和RebalanceService类。
PullMessageService类和RebalanceService类都可以视为后台线程类,在MQClientInstance对象的start方法中也都会分别新建线程去运行这两个类的run方法。RebalanceService暂且不表,下面先介绍下跟消费者主流程有关的PullMessageService。
2.5 PullMessageService
PullMessageService的run方法如下:
public void run() {
while (!this.isStopped()) {
try {
// pullRequest在RebalanceImpl中创建
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
}
可见在该方法中会不断从pullRequestQueue中拿PullRequest,如果拿不到会阻塞;如果拿到,则调用pullMessage方法。
private void pullMessage(final PullRequest pullRequest) {
// 从MQClientInstance中根据消费组得到该消费组的ConsumerImpl
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
// 执行ConsumerImpl的pullMessage方法
impl.pullMessage(pullRequest);
}
}
其中PullRequest有如下所示属性:
// 消费组
private String consumerGroup;
// 消费队列相关信息
private MessageQueue messageQueue;
// 消费队列的本地缓存,其中包含msgTreeMap:TreeMap<Long,MessageExt>, key为offset
private ProcessQueue processQueue;
private long nextOffset;
private boolean previouslyLocked = false;
接着我们看DefaultMQPushConsumerImpl中的pullMessage方法,该方法从Broker中拉取一定数量的消息到本地的ProcessQueue中,我将该方法分为三部分(超长代码预警)
第一部分是限流逻辑,即代码中的QueueFlowControl,在触发限流后会将pullRequest延缓一定时间,然后放回到PullMessageService的pullRequestQueue中
final ProcessQueue processQueue = pullRequest.getProcessQueue();
...
// 设置当前拉取消息的时间
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
...
// 以下代码都触发了限流逻辑, 我们称之为Queue flow control
// 1. 如果暂停了,则将pullRequest延迟一定时间后再次放回到PullMessageService的pullRequestQueue中
if (this.isPause()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
// 缓存的消息数
long cachedMessageCount = processQueue.getMsgCount().get();
// 缓存的容量
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// 2. 如果缓存的消息数已经大于threshold,
// 则将pullRequest延迟一定时间后再次返回到PullMessageService的pullRequestQueue中
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
...
return;
}
// 3. 如果缓存的消息容量大于threshold,
// 则将pullRequest延迟一定时间后再次返回到PullMessageService的pullRequestQueue中
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
...
return;
}
if (!this.consumeOrderly) {
// processQueue.getMaxSpan()指的是缓存的消息最大偏移量和最小偏移量之间的差值
// defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()为这个差值的最大阈值
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
...
return;
}
} else {
// 顺序消费逻辑暂时不看
...
}
接下来是与Broker通讯之前各种准备材料的构造,其中PullCallback方法是在与Broker通讯后需要执行的回调方法。
// 拿到Topic对应的订阅关系
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner()
.get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
// 设置开始拉取的时间
final long beginTimestamp = System.currentTimeMillis();
// 处理拉取到消息后的回调逻辑
PullCallback pullCallback = new PullCallback() {
// 请求正常完成
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
pullRequest.getMessageQueue(), pullResult,
subscriptionData);
// 拉取结果
switch (pullResult.getPullStatus()) {
// 拉取到了新消息
case FOUND:
// 从pullRequest中拿到当前拉取消息时候的offset
long prevRequestOffset = pullRequest.getNextOffset();
// 从pullResult中拿到下一次开始拉取消息的offset,并更新到pullRequest中
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager()
.incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
// 如果pullResult中拉取到的消息为null,则立即再拉取一次(容错措施,不应该出现)
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager()
.incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(),
pullResult.getMsgFoundList().size());
// 将拉取到的消息放入到PullRequest的processQueue中的treeMap中
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// * 提交消费请求
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// 如果DefaultMQPushConsumer设置的PullInterval为0,则立即再从Broker中拉一次消息
// 否则,则在制定时间后再次拉消息
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
// 没有新消息 或 过滤结果后没有匹配的消息
case NO_NEW_MSG:
case NO_MATCHED_MSG:
...
break;
// 非法的offset
case OFFSET_ILLEGAL:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
// 更新本地的offset
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(
pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
// 持久化offset,在本地或者到Broker
DefaultMQPushConsumerImpl.this.offsetStore.persist(
pullRequest.getMessageQueue());
// 将消息队列的缓存移除
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(
pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
// 请求出现异常
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
// 集群模式下从本地读取消息的offset
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(),
ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
最后调用API,从Broker拉取消息
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
// 下一个offset
pullRequest.getNextOffset(),
// 拉取的消息数
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
// 已提交的offset
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
// 拉取消息后,回调逻辑
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
2.6 ConsumeMessageService
在客户端启动过程中,会根据注册的Listener的类型,选择初始化ConsumeMessageConcurrentlyService或者ConsumeMessageOrderlyService,然后执行其start方法。
以ConsumeMessageConcurrentlyService为例,其在start方法只是定期清理过期消息,不是很重要。核心逻辑在其构造方法中,可见初始化了三个线程池。
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,MessageListenerConcurrently messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
// 初始化消费请求队列为LinkedBlockingQueue无界队列
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
// 线程名
String consumeThreadPrefix = null;
if (consumerGroup.length() > 100) {
consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup, 0,100).append("_").toString();
} else {
consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup).append("_")
.toString();
}
// 初始化线程池,指向消费调度线程池
// 最小线程数为consumeThreadMin
// 最大线程数为consumeThreadMax
// 队列为this.consumeRequestQueue, 即一个无界阻塞队列,因此线程数取决于consumeThreadMin
// 线程名为上段设置的consumeThreadPrefix
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl(consumeThreadPrefix));
// 初始化消费定时任务线程池,线程数=1
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
// 初始化清除过期消息线程池,线程数=1
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}
在上文介绍的pullMessage方法中,当客户端从Broker拉取消息到本地后,会调用ConsumeMessageService#submitConsumeRequest方法去提交消费请求,接下来我们会看下这个方法
public void submitConsumeRequest(
// PullResult中从Broker拉取到的消息
final List<MessageExt> msgs,
// 消息队列副本
final ProcessQueue processQueue,
// 消息队列
final MessageQueue messageQueue,
// 是否转发到消费线程池 并发消费时忽略该参数
final boolean dispatchToConsume) {
// 得到批量消费的数量,由ConsumeMessageBatchMaxSize决定
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
// 如果拉取的消息小于等于consumeBatchSize, 将消费请求提交到消费线程池consumeExecutor中执行
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
// 否则,分页处理,每页大小为 consumeBatchSize,分批提交到consumeExecutor中执行
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
通过上段代码可以看到,把ConsumeRequest对象放到线程池consumeExecutor中去执行,那消费逻辑就要看下ConsumeRequest#run方法,其中ConsumeRequest是ConsumeMessageConcurrentlyService的保留类,继承了Runnable接口,其run方法如下所示
public void run() {
// 当发生消息rebalance的时候,会设置dropped为true,防止消费者消费不属于自己的消息队列
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}",
ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
ConsumeMessageContext consumeMessageContext = null;
// 消费前钩子
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(
consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
// 重点!
// 判断msg是否为空,如果不为空,则迭代msgs,设置消费开始时间戳
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
// 让Listener消费消息,返回消费结果
// 通过Collections.unmodifiableList将msgs包装成不可修改的视图
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
// 如果报错,这里置为true
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
// 消费超时
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
// 业务侧返回CONSUME_LATER, 消费失败
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
// 业务侧返回CONSUME_SUCCESS, 消费成功
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
// 钩子
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
// 如果客户端返回的status为null, 则赋值为RECONSUME_LATER
if (null == status) {
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 消费后钩子
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(
consumeMessageContext);
}
ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(),
consumeRT);
if (!processQueue.isDropped()) {
// 处理消息消费结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}",
messageQueue, msgs);
}
}
之所以当processQueue的dropped状态为true时不做任何处理,是因为当processQueue.dropped==true时,说明此时可能出现了新消费者的加入/原消费者down机等情况,导致原先消费者的队列在rebalance之后分配给了新的消费者。那么,这部分消息会被重新消费,因此此处就不需要做多余的处理,等待重新消费就可以了。
接下来看ConsumeMessageConcurrentlyService#processConsumeResult方法
public void processConsumeResult(
// 并行消费结果,RECONSUME_LATER或CONSUME_SUCCESS
final ConsumeConcurrentlyStatus status,
// 并行消费的上下文,其中有MessageQueue, ackIndex和delayLevelWhenNextConsume
// ackIndex是consumeRequest.getMsgs()这个list中消费成功的idx, 初始化为Integer.MAX_VALUE
// delayLevleWhenNextConsume用于表示该消息已经被重试了多少次
final ConsumeConcurrentlyContext context,
// 消费请求
final ConsumeRequest consumeRequest) {
// consumeRequest.getMsgs()中消费成功的idx
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty()) {
return;
}
// 判断消费结果,如果是CONSUMESUCCESS则设置ackIndex=msgs.size()-1;
// 如果是RECONSUMELATER则设置ackIndex=-1。为发送消息确认ACK做准备。
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager()
.incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager()
.incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
// 消费失败,重置ack
ackIndex = -1;
this.getConsumerStatsManager()
.incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// 广播模式下,失败的消息不做任何处理
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
// 集群模式下,失败的消息重发
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
// 将 ackIndex+1 -> 队尾 的消息重新消费
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
// 会根据context中的delayLevleWhenNextConsume决定重试间隔
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue());
}
break;
default:
break;
}
// 从ProcessQueue中将成功消费的消息移出,并同步offset进度
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore()
.updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}