From cbbcec44bf9596448f57ea2552bddb8e6cb666f4 Mon Sep 17 00:00:00 2001 From: Ivan Galushko Date: Thu, 31 Oct 2024 11:43:07 +0300 Subject: [PATCH] [FIX]: session processor --- .../session/DefaultTelegramUpdateProcessor.java | 12 +++++++++--- .../telegram/session/SessionAutoConfiguration.java | 6 ++++-- .../session/DefaultTelegramUpdateProcessorTest.java | 6 +++--- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessor.java b/src/main/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessor.java index 86f7670..cd0a5cb 100644 --- a/src/main/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessor.java +++ b/src/main/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessor.java @@ -4,6 +4,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Expiry; import io.github.drednote.telegram.core.TelegramBot; +import io.github.drednote.telegram.core.TelegramMessageSource; import io.github.drednote.telegram.core.request.ParsedUpdateRequest; import io.github.drednote.telegram.core.request.UpdateRequest; import io.github.drednote.telegram.filter.FilterProperties; @@ -49,6 +50,8 @@ public class DefaultTelegramUpdateProcessor extends AbstractTelegramUpdateProces private final UserRateLimitRequestFilter userRateLimitRequestFilter; private final TelegramClient telegramClient; private final ReadWriteLock limitLock = new ReentrantReadWriteLock(); + @Nullable + private final TelegramMessageSource messageSource; /** * Constructs a {@code DefaultTelegramUpdateProcessor} with specified properties and Telegram @@ -64,13 +67,15 @@ public class DefaultTelegramUpdateProcessor extends AbstractTelegramUpdateProces */ public DefaultTelegramUpdateProcessor( SessionProperties properties, FilterProperties filterProperties, TelegramBot telegramBot, - TelegramClient telegramClient, ThreadFactory threadFactory + TelegramClient telegramClient, ThreadFactory threadFactory, + @Nullable TelegramMessageSource messageSource ) { super(properties, threadFactory); Assert.required(telegramBot, "TelegramBot"); Assert.required(filterProperties, "FilterProperties"); Assert.required(telegramClient, "TelegramClient"); + this.messageSource = messageSource; this.telegramClient = telegramClient; this.telegramBot = telegramBot; this.maxThreadsPerUser = properties.getMaxThreadsPerUser(); @@ -107,10 +112,10 @@ public DefaultTelegramUpdateProcessor( */ public DefaultTelegramUpdateProcessor( SessionProperties properties, FilterProperties filterProperties, TelegramBot telegramBot, - TelegramClient telegramClient + TelegramClient telegramClient, @Nullable TelegramMessageSource messageSource ) { this(properties, filterProperties, telegramBot, telegramClient, - Executors.defaultThreadFactory()); + Executors.defaultThreadFactory(), messageSource); } /** @@ -216,6 +221,7 @@ private void sendRateLimitResponse() { } updateRequests.iterator().forEachRemaining(request -> { try { + TooManyRequestsTelegramResponse.INSTANCE.setMessageSource(messageSource); TooManyRequestsTelegramResponse.INSTANCE.process(request); } catch (Exception e) { log.error("Cannot process response to telegram for request {}", request, e); diff --git a/src/main/java/io/github/drednote/telegram/session/SessionAutoConfiguration.java b/src/main/java/io/github/drednote/telegram/session/SessionAutoConfiguration.java index 9891990..f75ff85 100644 --- a/src/main/java/io/github/drednote/telegram/session/SessionAutoConfiguration.java +++ b/src/main/java/io/github/drednote/telegram/session/SessionAutoConfiguration.java @@ -2,6 +2,7 @@ import io.github.drednote.telegram.TelegramProperties; import io.github.drednote.telegram.core.TelegramBot; +import io.github.drednote.telegram.core.TelegramMessageSource; import io.github.drednote.telegram.filter.FilterProperties; import io.github.drednote.telegram.session.SessionProperties.ProxyType; import io.github.drednote.telegram.session.SessionProperties.ProxyUrl; @@ -100,10 +101,11 @@ public TelegramBotSession webhooksTelegramBotSession() { @ConditionalOnSingleCandidate(TelegramBot.class) public TelegramUpdateProcessor telegramUpdateProcessor( SessionProperties properties, FilterProperties filterProperties, TelegramBot telegramBot, - org.telegram.telegrambots.meta.generics.TelegramClient telegramClient + org.telegram.telegrambots.meta.generics.TelegramClient telegramClient, + TelegramMessageSource messageSource ) { return new DefaultTelegramUpdateProcessor( - properties, filterProperties, telegramBot, telegramClient); + properties, filterProperties, telegramBot, telegramClient, messageSource); } /** diff --git a/src/test/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessorTest.java b/src/test/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessorTest.java index 48c1842..d9834f7 100644 --- a/src/test/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessorTest.java +++ b/src/test/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessorTest.java @@ -60,7 +60,7 @@ void setUp() { void shouldWaitIfMaxQueueSizeExceed() throws InterruptedException { sessionProperties.setCacheLiveDuration(100); session = new DefaultTelegramUpdateProcessor(sessionProperties, filterProperties, - telegramBot, telegramClient); + telegramBot, telegramClient, null); List generate = generate(50); doAnswer(answer -> { @@ -79,7 +79,7 @@ void shouldWaitIfMaxQueueSizeExceed() throws InterruptedException { void shouldExecuteUpdateOnlyOneOthersRejected() throws InterruptedException { sessionProperties.setCacheLiveDuration(1500); session = new DefaultTelegramUpdateProcessor(sessionProperties, filterProperties, - telegramBot, telegramClient); + telegramBot, telegramClient, null); List generate = generate(5, 1L); doAnswer(answer -> { Thread.sleep(100); @@ -99,7 +99,7 @@ void shouldExecuteUpdateOnlyOneOthersRejectedByRateLimit() throws InterruptedExc filterProperties.setUserRateLimit(2000); sessionProperties.setMaxThreadsPerUser(5); session = new DefaultTelegramUpdateProcessor(sessionProperties, filterProperties, - telegramBot, telegramClient); + telegramBot, telegramClient, null); List generate = generate(5, 1L); doAnswer(answer -> { Thread.sleep(20);