Skip to content

Commit

Permalink
[ISSUE #9080] Fix tranfer logic when get large messages from cache in…
Browse files Browse the repository at this point in the history
… tiered storage (#9079)
  • Loading branch information
yuz10 authored Dec 26, 2024
1 parent 1c35adb commit 2089abd
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
private final String brokerName;
private final MetadataStore metadataStore;
private final MessageStoreConfig storeConfig;
private final org.apache.rocketmq.store.config.MessageStoreConfig messageStoreConfig;
private final TieredMessageStore messageStore;
private final IndexService indexService;
private final FlatFileStore flatFileStore;
Expand All @@ -71,6 +72,7 @@ public MessageStoreFetcherImpl(TieredMessageStore messageStore, MessageStoreConf
FlatFileStore flatFileStore, IndexService indexService) {

this.storeConfig = storeConfig;
this.messageStoreConfig = messageStore.getMessageStoreConfig();
this.brokerName = storeConfig.getBrokerName();
this.flatFileStore = flatFileStore;
this.messageStore = messageStore;
Expand Down Expand Up @@ -148,6 +150,9 @@ protected GetMessageResultExt getMessageFromCache(
if (result.getMessageCount() == maxCount) {
break;
}
if (result.getBufferTotalSize() >= messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
break;
}
}
result.setStatus(result.getMessageCount() > 0 ?
GetMessageStatus.FOUND : GetMessageStatus.NO_MATCHED_MESSAGE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public void dispatchFromCommitLogTest() throws Exception {
Mockito.when(messageStore.getStoreExecutor()).thenReturn(executor);
Mockito.when(messageStore.getFlatFileStore()).thenReturn(fileStore);
Mockito.when(messageStore.getIndexService()).thenReturn(indexService);
Mockito.when(messageStore.getMessageStoreConfig()).thenReturn(new org.apache.rocketmq.store.config.MessageStoreConfig());

// mock message
ByteBuffer buffer = MessageFormatUtilTest.buildMockedMessageBuffer();
Expand Down

0 comments on commit 2089abd

Please sign in to comment.