这篇文章主要介绍了RocketMQ中如何实现push consumer顺序消费,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

黔西ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为成都创新互联公司的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:028-86922220(备注:SSL证书合作)期待与您的合作!
顺序消费的逻辑实现在类ConsumeMessageOrderlyService中,为了实现消费的有序性需要对queue进行加锁,包括:
在broker对message queue加锁,保证当前client占有该队列
consumer端对MessageQueue加锁,保证当前线程占有该队列
consumer端对ProcessQueue加锁,保证当前线程占有该队列
对broker上message queue加锁是在ConsumeMessageOrderlyService中周期性调度执行的:
// ConsumeMessageOrderlySerivce
public void start() {
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
}
public synchronized void lockMQPeriodically() {
if (!this.stopped) {
// 通过LOCK_BATCH_MQ请求在broker批量锁定mq
this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
}
}ConsumeMessageOrderlyService中的消费请求提交:
// ConsumeMessageOrderlySerivce public void submitConsumeRequest( final Listmsgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispathToConsume) { if (dispathToConsume) { // 提交ConsumeRequest,丢弃了入参的msgs,每次都从ProcessQueue中顺序获取 ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); this.consumeExecutor.submit(consumeRequest); } }
顺序处理了逻辑:
// ConsumeMessageOrderlyService.ConsumeRequest
public void run() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
// 1.获取MessageQueue上的锁
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) { // 循环处理
// ...
// 单个ConsumeRequest最长处理时间默认60s
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
// 2. 从ProcessQueue顺序获取batchSize个消息
List msgs = this.processQueue.takeMessags(consumeBatchSize);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
ConsumeMessageContext consumeMessageContext = null;
// ....
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
// 3. 获取ProcessQueue上的锁
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
// 4. 推给业务处理逻辑
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock(); // 解锁
}
// ...
// 5. 处理消费结果
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false; // ProcessQueue为空,停止本次推送
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
} 在processConsumeResult中主要会执行2步操作:
在ProcessQueue上执行commit(),将前一次takeMessages返回的msgs从缓存中删除
更新OffsetStore
感谢你能够认真阅读完这篇文章,希望小编分享的“RocketMQ中如何实现push consumer顺序消费”这篇文章对大家有帮助,同时也希望大家多多支持创新互联,关注创新互联行业资讯频道,更多相关知识等着你来学习!