Skip to content

Commit

Permalink
[FIX]: session processor
Browse files Browse the repository at this point in the history
  • Loading branch information
Drednote committed Oct 31, 2024
1 parent 8425f63 commit cbbcec4
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import io.github.drednote.telegram.core.TelegramBot;
import io.github.drednote.telegram.core.TelegramMessageSource;
import io.github.drednote.telegram.core.request.ParsedUpdateRequest;
import io.github.drednote.telegram.core.request.UpdateRequest;
import io.github.drednote.telegram.filter.FilterProperties;
Expand Down Expand Up @@ -49,6 +50,8 @@ public class DefaultTelegramUpdateProcessor extends AbstractTelegramUpdateProces
private final UserRateLimitRequestFilter userRateLimitRequestFilter;
private final TelegramClient telegramClient;
private final ReadWriteLock limitLock = new ReentrantReadWriteLock();
@Nullable
private final TelegramMessageSource messageSource;

/**
* Constructs a {@code DefaultTelegramUpdateProcessor} with specified properties and Telegram
Expand All @@ -64,13 +67,15 @@ public class DefaultTelegramUpdateProcessor extends AbstractTelegramUpdateProces
*/
public DefaultTelegramUpdateProcessor(
SessionProperties properties, FilterProperties filterProperties, TelegramBot telegramBot,
TelegramClient telegramClient, ThreadFactory threadFactory
TelegramClient telegramClient, ThreadFactory threadFactory,
@Nullable TelegramMessageSource messageSource
) {
super(properties, threadFactory);
Assert.required(telegramBot, "TelegramBot");
Assert.required(filterProperties, "FilterProperties");
Assert.required(telegramClient, "TelegramClient");

this.messageSource = messageSource;
this.telegramClient = telegramClient;
this.telegramBot = telegramBot;
this.maxThreadsPerUser = properties.getMaxThreadsPerUser();
Expand Down Expand Up @@ -107,10 +112,10 @@ public DefaultTelegramUpdateProcessor(
*/
public DefaultTelegramUpdateProcessor(
SessionProperties properties, FilterProperties filterProperties, TelegramBot telegramBot,
TelegramClient telegramClient
TelegramClient telegramClient, @Nullable TelegramMessageSource messageSource
) {
this(properties, filterProperties, telegramBot, telegramClient,
Executors.defaultThreadFactory());
Executors.defaultThreadFactory(), messageSource);
}

/**
Expand Down Expand Up @@ -216,6 +221,7 @@ private void sendRateLimitResponse() {
}
updateRequests.iterator().forEachRemaining(request -> {
try {
TooManyRequestsTelegramResponse.INSTANCE.setMessageSource(messageSource);
TooManyRequestsTelegramResponse.INSTANCE.process(request);
} catch (Exception e) {
log.error("Cannot process response to telegram for request {}", request, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.github.drednote.telegram.TelegramProperties;
import io.github.drednote.telegram.core.TelegramBot;
import io.github.drednote.telegram.core.TelegramMessageSource;
import io.github.drednote.telegram.filter.FilterProperties;
import io.github.drednote.telegram.session.SessionProperties.ProxyType;
import io.github.drednote.telegram.session.SessionProperties.ProxyUrl;
Expand Down Expand Up @@ -100,10 +101,11 @@ public TelegramBotSession webhooksTelegramBotSession() {
@ConditionalOnSingleCandidate(TelegramBot.class)
public TelegramUpdateProcessor telegramUpdateProcessor(
SessionProperties properties, FilterProperties filterProperties, TelegramBot telegramBot,
org.telegram.telegrambots.meta.generics.TelegramClient telegramClient
org.telegram.telegrambots.meta.generics.TelegramClient telegramClient,
TelegramMessageSource messageSource
) {
return new DefaultTelegramUpdateProcessor(
properties, filterProperties, telegramBot, telegramClient);
properties, filterProperties, telegramBot, telegramClient, messageSource);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void setUp() {
void shouldWaitIfMaxQueueSizeExceed() throws InterruptedException {
sessionProperties.setCacheLiveDuration(100);
session = new DefaultTelegramUpdateProcessor(sessionProperties, filterProperties,
telegramBot, telegramClient);
telegramBot, telegramClient, null);

List<Update> generate = generate(50);
doAnswer(answer -> {
Expand All @@ -79,7 +79,7 @@ void shouldWaitIfMaxQueueSizeExceed() throws InterruptedException {
void shouldExecuteUpdateOnlyOneOthersRejected() throws InterruptedException {
sessionProperties.setCacheLiveDuration(1500);
session = new DefaultTelegramUpdateProcessor(sessionProperties, filterProperties,
telegramBot, telegramClient);
telegramBot, telegramClient, null);
List<Update> generate = generate(5, 1L);
doAnswer(answer -> {
Thread.sleep(100);
Expand All @@ -99,7 +99,7 @@ void shouldExecuteUpdateOnlyOneOthersRejectedByRateLimit() throws InterruptedExc
filterProperties.setUserRateLimit(2000);
sessionProperties.setMaxThreadsPerUser(5);
session = new DefaultTelegramUpdateProcessor(sessionProperties, filterProperties,
telegramBot, telegramClient);
telegramBot, telegramClient, null);
List<Update> generate = generate(5, 1L);
doAnswer(answer -> {
Thread.sleep(20);
Expand Down

0 comments on commit cbbcec4

Please sign in to comment.