From 2089abd3be5c4f34d2a981caf32747a1e624f2ed Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Thu, 26 Dec 2024 14:49:00 +0800 Subject: [PATCH] [ISSUE #9080] Fix tranfer logic when get large messages from cache in tiered storage (#9079) --- .../rocketmq/tieredstore/core/MessageStoreFetcherImpl.java | 5 +++++ .../tieredstore/core/MessageStoreDispatcherImplTest.java | 1 + 2 files changed, 6 insertions(+) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java index 7f79dbcd984..e94185626a7 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java @@ -56,6 +56,7 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher { private final String brokerName; private final MetadataStore metadataStore; private final MessageStoreConfig storeConfig; + private final org.apache.rocketmq.store.config.MessageStoreConfig messageStoreConfig; private final TieredMessageStore messageStore; private final IndexService indexService; private final FlatFileStore flatFileStore; @@ -71,6 +72,7 @@ public MessageStoreFetcherImpl(TieredMessageStore messageStore, MessageStoreConf FlatFileStore flatFileStore, IndexService indexService) { this.storeConfig = storeConfig; + this.messageStoreConfig = messageStore.getMessageStoreConfig(); this.brokerName = storeConfig.getBrokerName(); this.flatFileStore = flatFileStore; this.messageStore = messageStore; @@ -148,6 +150,9 @@ protected GetMessageResultExt getMessageFromCache( if (result.getMessageCount() == maxCount) { break; } + if (result.getBufferTotalSize() >= messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) { + break; + } } result.setStatus(result.getMessageCount() > 0 ? GetMessageStatus.FOUND : GetMessageStatus.NO_MATCHED_MESSAGE); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java index 6b960769489..7a43e1ede83 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java @@ -106,6 +106,7 @@ public void dispatchFromCommitLogTest() throws Exception { Mockito.when(messageStore.getStoreExecutor()).thenReturn(executor); Mockito.when(messageStore.getFlatFileStore()).thenReturn(fileStore); Mockito.when(messageStore.getIndexService()).thenReturn(indexService); + Mockito.when(messageStore.getMessageStoreConfig()).thenReturn(new org.apache.rocketmq.store.config.MessageStoreConfig()); // mock message ByteBuffer buffer = MessageFormatUtilTest.buildMockedMessageBuffer();