From cc48af6649c3983cd79148ccbc517d7832a388c9 Mon Sep 17 00:00:00 2001 From: Ivan Galushko Date: Thu, 10 Oct 2024 17:59:17 +0300 Subject: [PATCH 1/2] [FIX]: default session and jar name --- README.md | 23 +- build.gradle | 5 + .../telegram/core/DefaultTelegramBot.java | 21 +- .../core/request/ParsedUpdateRequest.java | 7 +- .../drednote/telegram/filter/FilterOrder.java | 3 +- .../telegram/filter/FilterProperties.java | 2 +- .../filter/FiltersAutoConfiguration.java | 7 - .../pre/UserRateLimitRequestFilter.java | 126 -------- .../handler/UpdateHandlerProperties.java | 18 ++ .../response/AbstractTelegramResponse.java | 11 +- .../response/GenericTelegramResponse.java | 22 +- .../AbstractTelegramUpdateProcessor.java | 158 ++++++++++ .../session/DefaultTelegramBotSession.java | 246 --------------- .../DefaultTelegramUpdateProcessor.java | 283 ++++++++++++++++++ .../telegram/session/LongPollingSession.java | 15 +- .../session/SessionAutoConfiguration.java | 23 +- .../telegram/session/SessionProperties.java | 17 +- .../session/TelegramUpdateProcessor.java | 9 + .../session/UserRateLimitRequestFilter.java | 114 +++++++ .../pre/UserRateLimitRequestFilterTest.java | 25 +- .../response/GenericTelegramResponseTest.java | 8 +- .../DefaultTelegramBotSessionTest.java | 107 ------- .../DefaultTelegramUpdateProcessorTest.java | 120 ++++++++ 23 files changed, 796 insertions(+), 574 deletions(-) delete mode 100644 src/main/java/io/github/drednote/telegram/filter/pre/UserRateLimitRequestFilter.java create mode 100644 src/main/java/io/github/drednote/telegram/session/AbstractTelegramUpdateProcessor.java delete mode 100644 src/main/java/io/github/drednote/telegram/session/DefaultTelegramBotSession.java create mode 100644 src/main/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessor.java create mode 100644 src/main/java/io/github/drednote/telegram/session/TelegramUpdateProcessor.java create mode 100644 src/main/java/io/github/drednote/telegram/session/UserRateLimitRequestFilter.java delete mode 100644 src/test/java/io/github/drednote/telegram/session/DefaultTelegramBotSessionTest.java create mode 100644 src/test/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessorTest.java diff --git a/README.md b/README.md index 6e58ca3..8be04eb 100644 --- a/README.md +++ b/README.md @@ -669,17 +669,18 @@ All settings tables contain 4 columns: ### Session properties -| Name | Description | Default Value | Required | -|-------------------------|-----------------------------------------------------------------------------------------------------------------------|---------------------------------------------------|----------| -| maxUserParallelRequests | Max number of threads used for consumption messages from a telegram for concrete user. 0 - no restrictions | 1 | true | -| consumeMaxThreads | Max number of threads used for consumption messages from a telegram | 1 | true | -| updateStrategy | The strategy to receive updates from Telegram API. Long polling or webhooks. | LONG_POLLING | true | -| backOffStrategy | Backoff strategy for failed requests to Telegram API. Impl of BackOff interface must be with public empty constructor | ExponentialBackOff | true | -| proxyType | The proxy type for executing requests to Telegram API. | NO_PROXY | true | -| proxyUrl | The proxy url in format `host:port` or if auth needed `host:port:username:password`. | - | false | -| cacheLiveDuration | Cache lifetime used in `DefaultTelegramBotSession` | 1 | true | -| cacheLiveDurationUnit | The `TimeUnit` which will be applied to `cacheLiveDuration` | hours | true | -| longPolling | LongPolling properties. | [LongPolling properties](#Longpolling-properties) | false | +| Name | Description | Default Value | Required | +|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------|----------| +| maxUserParallelRequests | Max number of threads used for consumption messages from a telegram for concrete user. 0 - no restrictions | 1 | true | +| consumeMaxThreads | Max number of threads used for consumption messages from a telegram | 10 | true | +| maxMessagesInQueue | Limits the number of updates to be store in memory queue for update processing. 0 - no limit. Defaults to (consumeMaxThreads * 1.5). | 15 | true | +| updateStrategy | The strategy to receive updates from Telegram API. Long polling or webhooks. | LONG_POLLING | true | +| backOffStrategy | Backoff strategy for failed requests to Telegram API. Impl of BackOff interface must be with public empty constructor | ExponentialBackOff | true | +| proxyType | The proxy type for executing requests to Telegram API. | NO_PROXY | true | +| proxyUrl | The proxy url in format `host:port` or if auth needed `host:port:username:password`. | - | false | +| cacheLiveDuration | Cache lifetime used in `DefaultTelegramBotSession` | 1 | true | +| cacheLiveDurationUnit | The `TimeUnit` which will be applied to `cacheLiveDuration` | hours | true | +| longPolling | LongPolling properties. | [LongPolling properties](#Longpolling-properties) | false | #### LongPolling properties diff --git a/build.gradle b/build.gradle index b3c6dde..3247f6d 100644 --- a/build.gradle +++ b/build.gradle @@ -14,6 +14,11 @@ sourceCompatibility = '17' bootJar.enabled = false +jar { + enabled = true + archiveClassifier = '' +} + configurations { compileOnly { extendsFrom annotationProcessor diff --git a/src/main/java/io/github/drednote/telegram/core/DefaultTelegramBot.java b/src/main/java/io/github/drednote/telegram/core/DefaultTelegramBot.java index 617645e..6aa68c3 100644 --- a/src/main/java/io/github/drednote/telegram/core/DefaultTelegramBot.java +++ b/src/main/java/io/github/drednote/telegram/core/DefaultTelegramBot.java @@ -9,6 +9,7 @@ import io.github.drednote.telegram.filter.post.PostUpdateFilter; import io.github.drednote.telegram.filter.pre.PreUpdateFilter; import io.github.drednote.telegram.handler.UpdateHandler; +import io.github.drednote.telegram.response.AbstractTelegramResponse; import io.github.drednote.telegram.response.SimpleMessageTelegramResponse; import io.github.drednote.telegram.response.TelegramResponse; import io.github.drednote.telegram.utils.Assert; @@ -23,12 +24,12 @@ import org.telegram.telegrambots.meta.generics.TelegramClient; /** - * The {@code DefaultTelegramBot} class extends the {@code TelegramLongPollingBot} class and - * serves as the main bot implementation for handling updates. The bot overrides the - * `onUpdateReceived()` method to handle incoming updates. Within the {@link #onUpdateReceived} - * method, a {@link DefaultUpdateRequest} is created to encapsulate the {@link Update}. The request - * is then processed through a series of steps: pre-filtering, handling, post-filtering, and - * answering. Any exceptions thrown during processing are handled by the exception handler + * The {@code DefaultTelegramBot} class extends the {@code TelegramBot} class and serves as the main + * bot implementation for handling updates. The bot overrides the `onUpdateReceived()` method to + * handle incoming updates. Within the {@link #onUpdateReceived} method, a + * {@link DefaultUpdateRequest} is created to encapsulate the {@link Update}. The request is then + * processed through a series of steps: pre-filtering, handling, post-filtering, and answering. Any + * exceptions thrown during processing are handled by the exception handler * * @author Ivan Galushko * @see UpdateHandler @@ -69,8 +70,8 @@ public class DefaultTelegramBot implements TelegramBot { private final TelegramClient telegramClient; /** - * Creates a new instance of the {@code DefaultTelegramBot} class with the provided - * properties and dependencies + * Creates a new instance of the {@code DefaultTelegramBot} class with the provided properties + * and dependencies * * @param properties the Telegram properties, not null * @param updateHandlers the collection of update handlers, not null @@ -222,6 +223,10 @@ private void doAnswer(DefaultUpdateRequest request) throws TelegramApiException if (response instanceof SimpleMessageTelegramResponse simpleMessageTelegramResponse) { simpleMessageTelegramResponse.setMessageSource(messageSource); } + if (response instanceof AbstractTelegramResponse abstractTelegramResponse) { + abstractTelegramResponse.setParseMode( + telegramProperties.getUpdateHandler().getParseMode()); + } response.process(request); } } diff --git a/src/main/java/io/github/drednote/telegram/core/request/ParsedUpdateRequest.java b/src/main/java/io/github/drednote/telegram/core/request/ParsedUpdateRequest.java index 1652029..582788b 100644 --- a/src/main/java/io/github/drednote/telegram/core/request/ParsedUpdateRequest.java +++ b/src/main/java/io/github/drednote/telegram/core/request/ParsedUpdateRequest.java @@ -19,13 +19,16 @@ */ public class ParsedUpdateRequest extends AbstractUpdateRequest { - public ParsedUpdateRequest(Update update) { + private final TelegramClient telegramClient; + + public ParsedUpdateRequest(Update update, TelegramClient telegramClient) { super(update); + this.telegramClient = telegramClient; } @Override public TelegramClient getAbsSender() { - throw new UnsupportedOperationException("Not supported in this implementation"); + return telegramClient; } @Override diff --git a/src/main/java/io/github/drednote/telegram/filter/FilterOrder.java b/src/main/java/io/github/drednote/telegram/filter/FilterOrder.java index 4c26779..0c84fc7 100644 --- a/src/main/java/io/github/drednote/telegram/filter/FilterOrder.java +++ b/src/main/java/io/github/drednote/telegram/filter/FilterOrder.java @@ -8,7 +8,6 @@ import io.github.drednote.telegram.filter.pre.PriorityPreUpdateFilter; import io.github.drednote.telegram.filter.pre.RoleFilter; import io.github.drednote.telegram.filter.pre.ScenarioUpdateHandlerPopular; -import io.github.drednote.telegram.filter.pre.UserRateLimitRequestFilter; import java.util.Map; import org.springframework.core.Ordered; @@ -24,7 +23,7 @@ public final class FilterOrder { // ------- Orders for priority pre filters ------- // public static final Map, Integer> PRIORITY_PRE_FILTERS = Map.of( - UserRateLimitRequestFilter.class, FilterOrder.HIGHEST_PRECEDENCE, +// UserRateLimitRequestFilter.class, FilterOrder.HIGHEST_PRECEDENCE, RoleFilter.class, FilterOrder.HIGHEST_PRECEDENCE + 100, ScenarioUpdateHandlerPopular.class, FilterOrder.HIGHEST_PRECEDENCE + 200, ControllerUpdateHandlerPopular.class, FilterOrder.HIGHEST_PRECEDENCE + 300 diff --git a/src/main/java/io/github/drednote/telegram/filter/FilterProperties.java b/src/main/java/io/github/drednote/telegram/filter/FilterProperties.java index b04b578..caa66d0 100644 --- a/src/main/java/io/github/drednote/telegram/filter/FilterProperties.java +++ b/src/main/java/io/github/drednote/telegram/filter/FilterProperties.java @@ -2,7 +2,7 @@ import io.github.drednote.telegram.filter.post.NotHandledUpdateFilter; import io.github.drednote.telegram.filter.pre.AccessPermissionFilter; -import io.github.drednote.telegram.filter.pre.UserRateLimitRequestFilter; +import io.github.drednote.telegram.session.UserRateLimitRequestFilter; import io.github.drednote.telegram.response.NotHandledTelegramResponse; import java.time.temporal.ChronoUnit; import lombok.Getter; diff --git a/src/main/java/io/github/drednote/telegram/filter/FiltersAutoConfiguration.java b/src/main/java/io/github/drednote/telegram/filter/FiltersAutoConfiguration.java index 24aa896..7e49cf2 100644 --- a/src/main/java/io/github/drednote/telegram/filter/FiltersAutoConfiguration.java +++ b/src/main/java/io/github/drednote/telegram/filter/FiltersAutoConfiguration.java @@ -9,7 +9,6 @@ import io.github.drednote.telegram.filter.pre.HasRoleRequestFilter; import io.github.drednote.telegram.filter.pre.PreUpdateFilter; import io.github.drednote.telegram.filter.pre.RoleFilter; -import io.github.drednote.telegram.filter.pre.UserRateLimitRequestFilter; import io.github.drednote.telegram.handler.UpdateHandlerAutoConfiguration; import io.github.drednote.telegram.utils.FieldProvider; import org.springframework.beans.factory.ObjectProvider; @@ -56,12 +55,6 @@ public AccessPermissionFilter accessPermissionFilter(PermissionProperties permis return new AccessPermissionFilter(permissionProperties); } - @Bean - @ConditionalOnMissingBean - public UserRateLimitRequestFilter concurrentUserRequestFilter(FilterProperties properties) { - return new UserRateLimitRequestFilter(properties); - } - @Bean @ConditionalOnMissingBean public NotHandledUpdateFilter notHandledUpdateFilter() { diff --git a/src/main/java/io/github/drednote/telegram/filter/pre/UserRateLimitRequestFilter.java b/src/main/java/io/github/drednote/telegram/filter/pre/UserRateLimitRequestFilter.java deleted file mode 100644 index 0024506..0000000 --- a/src/main/java/io/github/drednote/telegram/filter/pre/UserRateLimitRequestFilter.java +++ /dev/null @@ -1,126 +0,0 @@ -package io.github.drednote.telegram.filter.pre; - -import static org.apache.commons.lang3.ObjectUtils.max; - -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalCause; -import com.github.benmanes.caffeine.cache.RemovalListener; -import io.github.bucket4j.Bandwidth; -import io.github.bucket4j.Bucket; -import io.github.bucket4j.Refill; -import io.github.bucket4j.local.LocalBucket; -import io.github.bucket4j.local.SynchronizationStrategy; -import io.github.drednote.telegram.core.request.UpdateRequest; -import io.github.drednote.telegram.filter.FilterOrder; -import io.github.drednote.telegram.filter.FilterProperties; -import io.github.drednote.telegram.response.TooManyRequestsTelegramResponse; -import io.github.drednote.telegram.utils.Assert; -import java.time.Duration; -import org.springframework.lang.NonNull; -import org.springframework.lang.Nullable; - -/** - * Implementation of a priority pre-update filter for rate-limiting user requests. - * - *

This filter uses a caching mechanism to manage rate limits for individual users. It tracks - * the rate of incoming requests per user and responds with a - * {@link TooManyRequestsTelegramResponse} if the rate limit is exceeded. - * - * @author Ivan Galushko - * @see TooManyRequestsTelegramResponse - */ -public class UserRateLimitRequestFilter implements PriorityPreUpdateFilter { - - private final FilterProperties filterProperties; - private final Cache cache; - private final Duration duration; - /** - * Callback that called on remove cached LocalBucket. Using in testing, but you can use this if - * you needed - */ - @Nullable - private RemovalListener cacheEvictionCallback; - - /** - * Constructs a {@code UserRateLimitRequestFilter} with the specified {@link FilterProperties} - * - * @param filterProperties The filter properties for rate-limiting user requests. - * @throws IllegalArgumentException if filterProperties is null. - */ - public UserRateLimitRequestFilter(FilterProperties filterProperties) { - Assert.required(filterProperties, "FilterProperties"); - - this.filterProperties = filterProperties; - this.duration = Duration.of(filterProperties.getUserRateLimit(), - filterProperties.getUserRateLimitUnit()); - this.cache = Caffeine.newBuilder() - .expireAfterAccess(getCacheLiveDuration()) - .evictionListener(this::onRemoval) - .build(); - } - - private Duration getCacheLiveDuration() { - Duration cacheExpireDuration = Duration.of(filterProperties.getUserRateLimitCacheExpire(), - filterProperties.getUserRateLimitCacheExpireUnit()); - return max(cacheExpireDuration, duration); - } - - /** - * Pre-filters the incoming Telegram update request to rate-limit user requests. - * - *

This method checks the rate of incoming requests per user using a caching mechanism. - * If the rate limit is exceeded, it sets the {@link TooManyRequestsTelegramResponse} as the - * response for the update request. - * - * @param request The incoming Telegram update request to be pre-filtered, not null - */ - @Override - public void preFilter(@NonNull UpdateRequest request) { - Assert.notNull(request, "UpdateRequest"); - - Long chatId = request.getChatId(); - if (filterProperties.getUserRateLimit() > 0) { - LocalBucket bucket = getBucket(chatId); - if (!bucket.tryConsume(1)) { - request.getAccessor().setResponse(TooManyRequestsTelegramResponse.INSTANCE); - } - } - } - - private LocalBucket getBucket(Long chatId) { - return cache.get(chatId, key -> - Bucket.builder() - .addLimit(Bandwidth.classic(1, Refill.intervally(1, this.duration))) - .withSynchronizationStrategy(SynchronizationStrategy.SYNCHRONIZED) - .withMillisecondPrecision() - .build() - ); - } - - private void onRemoval(@Nullable Long key, @Nullable LocalBucket value, RemovalCause cause) { - if (cacheEvictionCallback != null) { - cacheEvictionCallback.onRemoval(key, value, cause); - } - } - - @Override - public int getPreOrder() { - return FilterOrder.PRIORITY_PRE_FILTERS.get(this.getClass()); - } - - /** - * Sets the cache eviction callback for this filter. - * - *

Using in testing, but you can use this if you needed - * - * @param cacheEvictionCallback The cache eviction callback to be set, not null - * @throws IllegalArgumentException if cacheEvictionCallback is null - */ - public void setCacheEvictionCallback( - RemovalListener cacheEvictionCallback - ) { - Assert.notNull(cacheEvictionCallback, "RemovalListener"); - this.cacheEvictionCallback = cacheEvictionCallback; - } -} diff --git a/src/main/java/io/github/drednote/telegram/handler/UpdateHandlerProperties.java b/src/main/java/io/github/drednote/telegram/handler/UpdateHandlerProperties.java index d35aef8..c7d162f 100644 --- a/src/main/java/io/github/drednote/telegram/handler/UpdateHandlerProperties.java +++ b/src/main/java/io/github/drednote/telegram/handler/UpdateHandlerProperties.java @@ -10,6 +10,7 @@ import io.github.drednote.telegram.response.InternalErrorTelegramResponse; import io.github.drednote.telegram.session.SessionProperties; import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @@ -55,6 +56,12 @@ public class UpdateHandlerProperties { */ @NonNull private boolean serializeJavaObjectWithJackson = true; + /** + * Default parse mode of a text message sent to telegram. Applies only if you return raw string + * from update processing ({@link UpdateHandler}) + */ + @NonNull + private ParseMode parseMode = ParseMode.NO; /** * If scenario is enabled and {@link SessionProperties#getMaxThreadsPerUser} is set value other * than 1, throws an error with a warning about using scenario safe only when @@ -62,4 +69,15 @@ public class UpdateHandlerProperties { */ @NonNull private boolean enabledWarningForScenario = true; + + @RequiredArgsConstructor + @Getter + public enum ParseMode { + NO(null), + MARKDOWN("Markdown"), + MARKDOWN_V2("MarkdownV2"), + HTML("html"); + + private final String value; + } } diff --git a/src/main/java/io/github/drednote/telegram/response/AbstractTelegramResponse.java b/src/main/java/io/github/drednote/telegram/response/AbstractTelegramResponse.java index 99f3f41..0dc74db 100644 --- a/src/main/java/io/github/drednote/telegram/response/AbstractTelegramResponse.java +++ b/src/main/java/io/github/drednote/telegram/response/AbstractTelegramResponse.java @@ -1,8 +1,7 @@ package io.github.drednote.telegram.response; -import static org.telegram.telegrambots.meta.api.methods.ParseMode.MARKDOWN; - import io.github.drednote.telegram.core.request.UpdateRequest; +import io.github.drednote.telegram.handler.UpdateHandlerProperties.ParseMode; import org.telegram.telegrambots.meta.api.methods.send.SendMessage; import org.telegram.telegrambots.meta.api.objects.message.Message; import org.telegram.telegrambots.meta.exceptions.TelegramApiException; @@ -15,6 +14,8 @@ */ public abstract class AbstractTelegramResponse implements TelegramResponse { + protected ParseMode parseMode = ParseMode.NO; + /** * Sends a text message to the specified chat using the provided string * @@ -28,7 +29,11 @@ protected Message sendString(String string, UpdateRequest request) TelegramClient absSender = request.getAbsSender(); Long chatId = request.getChatId(); SendMessage sendMessage = new SendMessage(chatId.toString(), string); - sendMessage.setParseMode(MARKDOWN); + sendMessage.setParseMode(parseMode.getValue()); return absSender.execute(sendMessage); } + + public void setParseMode(ParseMode parseMode) { + this.parseMode = parseMode; + } } diff --git a/src/main/java/io/github/drednote/telegram/response/GenericTelegramResponse.java b/src/main/java/io/github/drednote/telegram/response/GenericTelegramResponse.java index e9275a3..146edee 100644 --- a/src/main/java/io/github/drednote/telegram/response/GenericTelegramResponse.java +++ b/src/main/java/io/github/drednote/telegram/response/GenericTelegramResponse.java @@ -1,7 +1,5 @@ package io.github.drednote.telegram.response; -import static org.telegram.telegrambots.meta.api.methods.ParseMode.MARKDOWN; - import com.fasterxml.jackson.core.JsonProcessingException; import io.github.drednote.telegram.core.request.UpdateRequest; import io.github.drednote.telegram.utils.Assert; @@ -10,9 +8,6 @@ import java.util.ArrayList; import java.util.Collection; import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.InvalidPropertyException; -import org.springframework.beans.PropertyAccessException; -import org.springframework.beans.PropertyAccessorFactory; import org.springframework.lang.NonNull; import org.springframework.lang.Nullable; import org.telegram.telegrambots.meta.api.methods.botapimethods.BotApiMethod; @@ -77,10 +72,8 @@ public void process(UpdateRequest request) throws TelegramApiException { } else if (response instanceof byte[] bytes) { responseMessage = sendString(new String(bytes, StandardCharsets.UTF_8), request); } else if (response instanceof BotApiMethod botApiMethod) { - postProcessApiMethod(botApiMethod, request); responseMessage = request.getAbsSender().execute(botApiMethod); - } else if (response instanceof SendMediaBotMethod sendMediaBotMethod) { - postProcessApiMethod(sendMediaBotMethod, request); + } else if (response instanceof SendMediaBotMethod) { responseMessage = tryToSendMedia(request); } else if (response instanceof TelegramResponse telegramResponse) { telegramResponse.process(request); @@ -104,19 +97,6 @@ public void process(UpdateRequest request) throws TelegramApiException { } } - private void postProcessApiMethod(Object botApiMethod, UpdateRequest request) { - try { - var propertyAccessor = PropertyAccessorFactory.forDirectFieldAccess(botApiMethod); - if (propertyAccessor.getPropertyValue(PARSE_MODE) == null) { - Class type = propertyAccessor.getPropertyType(PARSE_MODE); - if (type != null && String.class.isAssignableFrom(type)) { - propertyAccessor.setPropertyValue(PARSE_MODE, MARKDOWN); - } - } - } catch (InvalidPropertyException | PropertyAccessException ignored) { - } - } - private String truncateQuotes(String stringResponse) { if (!StringUtils.isBlank(stringResponse) && (stringResponse.startsWith("\"") && stringResponse.endsWith("\""))) { diff --git a/src/main/java/io/github/drednote/telegram/session/AbstractTelegramUpdateProcessor.java b/src/main/java/io/github/drednote/telegram/session/AbstractTelegramUpdateProcessor.java new file mode 100644 index 0000000..c6b28a8 --- /dev/null +++ b/src/main/java/io/github/drednote/telegram/session/AbstractTelegramUpdateProcessor.java @@ -0,0 +1,158 @@ +package io.github.drednote.telegram.session; + +import io.github.drednote.telegram.utils.Assert; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.telegram.telegrambots.meta.api.objects.Update; + +/** + * {@code AbstractTelegramUpdateProcessor} is an abstract class that implements the + * {@code TelegramUpdateProcessor} interface for processing Telegram updates. It provides the + * foundational functionalities for managing the processing of updates with a thread pool and limits + * on the number of messages in queue. + * + *

This class handles the configuration of thread pool settings and ensures proper + * synchronization when processing updates.

+ * + * @author Ivan Galushko + */ +public abstract class AbstractTelegramUpdateProcessor implements TelegramUpdateProcessor { + + private static final Logger log = LoggerFactory.getLogger( + AbstractTelegramUpdateProcessor.class); + final ThreadPoolExecutor executorService; + private final int maxMessageInQueue; + + private final ReadWriteLock maxMessagesLock = new ReentrantReadWriteLock(); + private final Condition maxMessagesLimit = maxMessagesLock.writeLock().newCondition(); + + /** + * Constructs an {@code AbstractTelegramUpdateProcessor} with specified properties and thread + * factory. + * + * @param properties configuration settings for the session. + * @param threadFactory the factory to create threads for the executor service. + * @throws IllegalArgumentException if {@code maxMessagesInQueue} is negative or + * {@code consumeMaxThreads} is not positive. + */ + protected AbstractTelegramUpdateProcessor( + SessionProperties properties, ThreadFactory threadFactory + ) { + Assert.required(properties, "SessionProperties"); + Assert.required(threadFactory, "ThreadFactory"); + + int consumeMaxThreads = properties.getConsumeMaxThreads(); + int maxMessagesInQueue = properties.getMaxMessagesInQueue(); + if (maxMessagesInQueue < 0) { + throw new IllegalArgumentException( + "maxMessageInQueue must be greater than or equal to 0"); + } + if (consumeMaxThreads <= 0) { + throw new IllegalArgumentException("maxThreads must be greater than 0"); + } + + this.maxMessageInQueue = maxMessagesInQueue == 0 ? Integer.MAX_VALUE : maxMessagesInQueue; + this.executorService = new ThreadPoolExecutor( + consumeMaxThreads, consumeMaxThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(this.maxMessageInQueue), + threadFactory, + new WaitUntilAvailablePolicy() + ); + } + + /** + * Constructs an {@code AbstractTelegramUpdateProcessor} with default thread factory. + * + * @param properties configuration settings for the session. + */ + protected AbstractTelegramUpdateProcessor(SessionProperties properties) { + this(properties, Executors.defaultThreadFactory()); + } + + /** + * Processes a list of updates by submitting them to the executor service for handling. + * + * @param updates The list of updates to be processed + */ + public synchronized void process(List updates) { + for (Update update : updates) { + getRunnable(update).ifPresent(runnable -> { + executorService.submit(() -> { + try { + runnable.run(); + } finally { + releaseSpace(); + } + }); + }); + } + } + + /** + * Returns an optional runnable task for the given update. + * + * @param update the Telegram update to be processed. + * @return an {@code Optional} containing the task to execute or empty if not + * applicable. + */ + protected abstract Optional getRunnable(Update update); + + private void releaseSpace() { + maxMessagesLock.readLock().lock(); + try { + if (executorService.getQueue().size() >= maxMessageInQueue - 1) { + maxMessagesLock.readLock().unlock(); + maxMessagesLock.writeLock().lock(); + try { + log.trace("Releasing waiting thread pool"); + maxMessagesLimit.signal(); + maxMessagesLock.readLock().lock(); + } finally { + maxMessagesLock.writeLock().unlock(); + } + } + } finally { + maxMessagesLock.readLock().unlock(); + } + } + + /** + * {@code WaitUntilAvailablePolicy} is a custom rejection policy that handles rejected tasks by + * waiting until there is space available in the executor's queue. + */ + private class WaitUntilAvailablePolicy implements RejectedExecutionHandler { + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + log.trace("Waiting until thread pool will be available"); + maxMessagesLock.writeLock().lock(); + try { + while (!executor.getQueue().offer(r)) { + if (executor.isShutdown()) { + break; + } + maxMessagesLimit.await(); + } + log.trace("Put task into queue"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RejectedExecutionException("Interrupted", e); + } finally { + maxMessagesLock.writeLock().unlock(); + } + } + } +} diff --git a/src/main/java/io/github/drednote/telegram/session/DefaultTelegramBotSession.java b/src/main/java/io/github/drednote/telegram/session/DefaultTelegramBotSession.java deleted file mode 100644 index 68804de..0000000 --- a/src/main/java/io/github/drednote/telegram/session/DefaultTelegramBotSession.java +++ /dev/null @@ -1,246 +0,0 @@ -package io.github.drednote.telegram.session; - -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.Expiry; -import io.github.drednote.telegram.core.TelegramBot; -import io.github.drednote.telegram.core.request.ParsedUpdateRequest; -import io.github.drednote.telegram.utils.Assert; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.checkerframework.checker.index.qual.NonNegative; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.telegram.telegrambots.meta.api.objects.Update; -import org.telegram.telegrambots.meta.api.objects.User; - -/** - * Default implementation of the {@link TelegramBotSession} interface for managing consumption - * updates. - * - *

Implementations should call {@link DefaultTelegramBotSession#processUpdates(List)} for sending - * updates to {@link TelegramBot} - * - * @author Ivan Galushko - * @see TelegramBot - * @see SessionProperties - */ -public abstract class DefaultTelegramBotSession implements TelegramBotSession { - - private static final Logger log = LoggerFactory.getLogger(DefaultTelegramBotSession.class); - private final TelegramBot telegramBot; - private final ExecutorService executorService; - private final int maxThreadsPerUser; - private final int maxMessageInQueue; - final AtomicInteger updatesCount = new AtomicInteger(0); - private final Cache> updates; - private final Cache userProcessing; - - private final ReadWriteLock maxMessagesLock = new ReentrantReadWriteLock(); - private final Condition maxMessagesLimit = maxMessagesLock.writeLock().newCondition(); - - protected DefaultTelegramBotSession( - SessionProperties properties, TelegramBot telegramBot - ) { - Assert.required(telegramBot, "TelegramBot"); - Assert.required(properties, "SessionProperties"); - - this.telegramBot = telegramBot; - this.maxThreadsPerUser = properties.getMaxThreadsPerUser(); - this.maxMessageInQueue = properties.getLongPolling().getMaxMessagesInQueue(); - - if (properties.getConsumeMaxThreads() <= 0) { - throw new IllegalArgumentException("maxThreads must be greater than 0"); - } - if (maxThreadsPerUser < 0) { - throw new IllegalArgumentException("maxThreadsPerUser must be greater than 0"); - } - if (maxMessageInQueue < 0) { - throw new IllegalArgumentException( - "maxMessageInQueue must be greater or equals than 0"); - } - if (properties.getCacheLiveDuration() < 0) { - throw new IllegalArgumentException( - "cacheLiveDuration must be greater or equals than 0"); - } - - this.executorService = Executors.newFixedThreadPool(properties.getConsumeMaxThreads()); - this.updates = Caffeine.newBuilder() - .expireAfter(new UpdatesExpiry( - properties.getCacheLiveDuration(), - properties.getCacheLiveDurationUnit()) - ) - .build(); - this.userProcessing = Caffeine.newBuilder() - .expireAfter(new UserProcessingExpiry( - properties.getCacheLiveDuration(), - properties.getCacheLiveDurationUnit(), maxThreadsPerUser) - ) - .build(); - } - - /** - * Processes a list of updates by submitting them to the executor service for handling. - * - * @param updates The list of updates to be processed - */ - protected synchronized void processUpdates(List updates) { - for (Update update : updates) { - waitFreeSpaceIfNeeded(); - if (maxThreadsPerUser == 0) { - executorService.submit(() -> telegramBot.onUpdateReceived(update)); - } else { - ParsedUpdateRequest request = new ParsedUpdateRequest(update); - User user = request.getUser(); - if (user != null) { - processWithLimitByUser(update, user.getId()); - } else { - executorService.submit(() -> telegramBot.onUpdateReceived(update)); - } - } - } - } - - private void waitFreeSpaceIfNeeded() { - maxMessagesLock.readLock().lock(); - try { - if (updatesCount.get() >= maxMessageInQueue) { - maxMessagesLock.readLock().unlock(); - maxMessagesLock.writeLock().lock(); - try { - while (updatesCount.get() >= maxMessageInQueue) { - maxMessagesLimit.await(); - } - maxMessagesLock.readLock().lock(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - maxMessagesLock.writeLock().unlock(); - } - } - } finally { - maxMessagesLock.readLock().unlock(); - } - } - - private void processWithLimitByUser(Update update, Long userId) { - log.trace("Saving to queue update {} with user {}", update.getUpdateId(), userId); - updates.get(userId, k -> new ConcurrentLinkedQueue<>()).add(update); - updatesCount.incrementAndGet(); - userProcessing.get(userId, k -> new Semaphore(maxThreadsPerUser)); - processNext(userId); - } - - private void processNext(Long userId) { - Semaphore semaphore = userProcessing.getIfPresent(userId); - - if (semaphore.tryAcquire()) { - log.trace("Lock user {}", userId); - Queue queue = updates.getIfPresent(userId); - Update update = queue.poll(); - - if (update != null) { - executorService.submit(() -> { - try { - log.debug("Executing update {} with user {}", update.getUpdateId(), userId); - telegramBot.onUpdateReceived(update); - } finally { - releaseSpace(); - semaphore.release(); - log.trace("Release user {}", userId); - processNext(userId); - } - }); - } else { - semaphore.release(); - } - } - } - - private void releaseSpace() { - maxMessagesLock.readLock().lock(); - try { - if (updatesCount.get() >= maxMessageInQueue) { - maxMessagesLock.readLock().unlock(); - maxMessagesLock.writeLock().lock(); - try { - if (updatesCount.getAndDecrement() >= maxMessageInQueue) { - maxMessagesLimit.signalAll(); - } - maxMessagesLock.readLock().lock(); - } finally { - maxMessagesLock.writeLock().unlock(); - } - } else { - updatesCount.decrementAndGet(); - } - } finally { - maxMessagesLock.readLock().unlock(); - } - } - - private static class UpdatesExpiry extends AbstractExpiry> { - - private final long duration; - private final TimeUnit unit; - - private UpdatesExpiry(long duration, TimeUnit unit) { - this.duration = duration; - this.unit = unit; - } - - protected long getDuration(Queue updates) { - return updates.isEmpty() ? unit.toNanos(duration) : Long.MAX_VALUE; - } - } - - private static class UserProcessingExpiry extends AbstractExpiry { - - private final long duration; - private final TimeUnit unit; - private final int maxThreadsPerUser; - - private UserProcessingExpiry(long duration, TimeUnit unit, int maxThreadsPerUser) { - this.duration = duration; - this.unit = unit; - this.maxThreadsPerUser = maxThreadsPerUser; - } - - protected long getDuration(Semaphore semaphore) { - return semaphore.availablePermits() == maxThreadsPerUser - ? unit.toNanos(duration) - : Long.MAX_VALUE; - } - } - - private static abstract class AbstractExpiry implements Expiry { - - @Override - public long expireAfterCreate(Long key, T value, long currentTime) { - return getDuration(value); - } - - @Override - public long expireAfterUpdate(Long key, T value, long currentTime, - @NonNegative long currentDuration) { - return getDuration(value); - } - - @Override - public long expireAfterRead(Long key, T value, long currentTime, - @NonNegative long currentDuration) { - return getDuration(value); - } - - protected abstract long getDuration(T updates); - } -} diff --git a/src/main/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessor.java b/src/main/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessor.java new file mode 100644 index 0000000..86f7670 --- /dev/null +++ b/src/main/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessor.java @@ -0,0 +1,283 @@ +package io.github.drednote.telegram.session; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Expiry; +import io.github.drednote.telegram.core.TelegramBot; +import io.github.drednote.telegram.core.request.ParsedUpdateRequest; +import io.github.drednote.telegram.core.request.UpdateRequest; +import io.github.drednote.telegram.filter.FilterProperties; +import io.github.drednote.telegram.response.TooManyRequestsTelegramResponse; +import io.github.drednote.telegram.utils.Assert; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.checkerframework.checker.index.qual.NonNegative; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.lang.Nullable; +import org.telegram.telegrambots.meta.api.objects.Update; +import org.telegram.telegrambots.meta.api.objects.User; +import org.telegram.telegrambots.meta.generics.TelegramClient; + +/** + * Default implementation of the {@link TelegramUpdateProcessor} interface for managing consumption + * updates. + *

+ * {@code DefaultTelegramUpdateProcessor} processes updates from Telegram. It handles user requests + * and applies rate limiting based on the configuration provided. + * + * @author Ivan Galushko + * @see TelegramBot + * @see SessionProperties + */ +public class DefaultTelegramUpdateProcessor extends AbstractTelegramUpdateProcessor { + + private static final Logger log = LoggerFactory.getLogger(DefaultTelegramUpdateProcessor.class); + private final TelegramBot telegramBot; + private final int maxThreadsPerUser; + private final Cache userProcessing; + private final Set limitProcessing; + private final UserRateLimitRequestFilter userRateLimitRequestFilter; + private final TelegramClient telegramClient; + private final ReadWriteLock limitLock = new ReentrantReadWriteLock(); + + /** + * Constructs a {@code DefaultTelegramUpdateProcessor} with specified properties and Telegram + * client. + * + * @param properties Configuration settings for the session. + * @param filterProperties Filter properties for controlling request processing. + * @param telegramBot The telegram bot instance for sending messages. + * @param telegramClient The Telegram client instance used to interact with Telegram API. + * @param threadFactory The thread factory to create threads for processing requests. + * @throws IllegalArgumentException if {@code maxThreadsPerUser} or {@code cacheLiveDuration} is + * invalid. + */ + public DefaultTelegramUpdateProcessor( + SessionProperties properties, FilterProperties filterProperties, TelegramBot telegramBot, + TelegramClient telegramClient, ThreadFactory threadFactory + ) { + super(properties, threadFactory); + Assert.required(telegramBot, "TelegramBot"); + Assert.required(filterProperties, "FilterProperties"); + Assert.required(telegramClient, "TelegramClient"); + + this.telegramClient = telegramClient; + this.telegramBot = telegramBot; + this.maxThreadsPerUser = properties.getMaxThreadsPerUser(); + + if (maxThreadsPerUser < 0) { + throw new IllegalArgumentException("maxThreadsPerUser must be greater than 0"); + } + if (properties.getCacheLiveDuration() < 0) { + throw new IllegalArgumentException( + "cacheLiveDuration must be greater or equals than 0"); + } + + this.userProcessing = Caffeine.newBuilder() + .expireAfter(new UserProcessingExpiry( + properties.getCacheLiveDuration(), + properties.getCacheLiveDurationUnit(), + maxThreadsPerUser) + ) + .build(); + this.limitProcessing = ConcurrentHashMap.newKeySet(); + this.userRateLimitRequestFilter = new UserRateLimitRequestFilter(filterProperties); + ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(1); + executorService.scheduleWithFixedDelay( + this::sendRateLimitResponse, 0, 1, TimeUnit.SECONDS); + } + + /** + * Constructs a {@code DefaultTelegramUpdateProcessor} with default thread factory. + * + * @param properties Configuration settings for the session. + * @param filterProperties Filter properties for controlling request processing. + * @param telegramBot The telegram bot instance for sending messages. + * @param telegramClient The Telegram client instance used to interact with Telegram API. + */ + public DefaultTelegramUpdateProcessor( + SessionProperties properties, FilterProperties filterProperties, TelegramBot telegramBot, + TelegramClient telegramClient + ) { + this(properties, filterProperties, telegramBot, telegramClient, + Executors.defaultThreadFactory()); + } + + /** + * Retrieves a runnable task for processing the given update. + * + * @param update the Telegram update to be processed. + * @return an {@code Optional} containing the task to execute or empty if rate limit + * exceeds. + */ + @Override + protected Optional getRunnable(Update update) { + ParsedUpdateRequest request = new ParsedUpdateRequest(update, telegramClient); + User user = request.getUser(); + if (user == null) { + return doProcessUpdate(update, null); + } else if (maxThreadsPerUser == 0) { + return processWithoutLimit(user.getId(), request); + } else { + return processWithLimitByUser(user.getId(), request); + } + } + + /** + * Processes the update without any thread limit for the user. + * + * @param userId the ID of the user. + * @param request the UpdateRequest containing the update details. + * @return an {@code Optional} containing the task to execute or empty if not + * applicable. + */ + private Optional processWithoutLimit(Long userId, UpdateRequest request) { + Update update = request.getOrigin(); + boolean limitNotExceeded = userRateLimitRequestFilter.filter(userId); + return limitNotExceeded + ? doProcessUpdate(update, null) + : addRateLimitResponseToProcess(request, userId); + } + + /** + * Processes the update applying user-specific thread limits. + * + * @param userId the ID of the user. + * @param request the UpdateRequest containing the update details. + * @return an {@code Optional} containing the task to execute or empty if not + * applicable. + */ + private Optional processWithLimitByUser(Long userId, UpdateRequest request) { + Semaphore semaphore = userProcessing.get(userId, k -> new Semaphore(maxThreadsPerUser)); + Update update = request.getOrigin(); + + if (userRateLimitRequestFilter.filter(userId) && semaphore.tryAcquire()) { + log.trace("Lock user {}", userId); + return doProcessUpdate(update, semaphore::release); + } else { + return addRateLimitResponseToProcess(request, userId); + } + } + + /** + * Processes the given update and defines an action to perform once processing is finished. + * + * @param update the Telegram update to be processed. + * @param actionOnFinish an optional runnable action to execute after processing is complete. + * @return an {@code Optional} containing the task to execute for processing. + */ + private Optional doProcessUpdate(Update update, @Nullable Runnable actionOnFinish) { + return Optional.of(() -> { + try { + log.debug("Executing update {}", update.getUpdateId()); + telegramBot.onUpdateReceived(update); + } finally { + log.trace("Release update {}", update.getUpdateId()); + if (actionOnFinish != null) { + actionOnFinish.run(); + } + } + }); + } + + private Optional addRateLimitResponseToProcess(UpdateRequest request, Long userId) { + limitLock.readLock().lock(); + try { + log.trace("Limit exceeded for user {}", userId); + limitProcessing.add(request); + } finally { + limitLock.readLock().unlock(); + } + return Optional.empty(); + } + + /** + * Processes and sends responses for requests that exceeded rate limits, clearing the limit + * processing set. + */ + private void sendRateLimitResponse() { + limitLock.writeLock().lock(); + Set updateRequests; + try { + updateRequests = new HashSet<>(limitProcessing); + limitProcessing.clear(); + } finally { + limitLock.writeLock().unlock(); + } + updateRequests.iterator().forEachRemaining(request -> { + try { + TooManyRequestsTelegramResponse.INSTANCE.process(request); + } catch (Exception e) { + log.error("Cannot process response to telegram for request {}", request, e); + } + }); + } + + /** + * {@code UserProcessingExpiry} is responsible for determining the expiration time for user + * processing semaphores. + */ + private static class UserProcessingExpiry extends AbstractExpiry { + + private final long duration; + private final TimeUnit unit; + private final int maxThreadsPerUser; + + /** + * Constructs a {@code UserProcessingExpiry} with specified parameters. + * + * @param duration the duration for which the semaphore is valid. + * @param unit the time unit of the duration. + * @param maxThreadsPerUser the maximum number of threads allowed per user. + */ + private UserProcessingExpiry(long duration, TimeUnit unit, int maxThreadsPerUser) { + this.duration = duration; + this.unit = unit; + this.maxThreadsPerUser = maxThreadsPerUser; + } + + protected long getDuration(Semaphore semaphore) { + return semaphore.availablePermits() == maxThreadsPerUser + ? unit.toNanos(duration) + : Long.MAX_VALUE; + } + } + + /** + * {@code AbstractExpiry} provides a base implementation for expiry policies for caching + * mechanisms. + * + * @param the type of value associated with the key. + */ + private static abstract class AbstractExpiry implements Expiry { + + @Override + public long expireAfterCreate(Long key, T value, long currentTime) { + return getDuration(value); + } + + @Override + public long expireAfterUpdate(Long key, T value, long currentTime, + @NonNegative long currentDuration) { + return getDuration(value); + } + + @Override + public long expireAfterRead(Long key, T value, long currentTime, + @NonNegative long currentDuration) { + return getDuration(value); + } + + protected abstract long getDuration(T updates); + } +} diff --git a/src/main/java/io/github/drednote/telegram/session/LongPollingSession.java b/src/main/java/io/github/drednote/telegram/session/LongPollingSession.java index 6f7f93b..6ed0fdd 100644 --- a/src/main/java/io/github/drednote/telegram/session/LongPollingSession.java +++ b/src/main/java/io/github/drednote/telegram/session/LongPollingSession.java @@ -1,7 +1,6 @@ package io.github.drednote.telegram.session; import io.github.drednote.telegram.TelegramProperties; -import io.github.drednote.telegram.core.TelegramBot; import io.github.drednote.telegram.utils.Assert; import java.util.Collections; import java.util.List; @@ -22,7 +21,7 @@ *

This class implements the {@link TelegramBotSession} interface to provide methods for * starting and stopping a long polling session with the Telegram Bot API. It utilizes a * {@link TelegramClient} to fetch updates from the Telegram server and processes them using a - * {@link TelegramBot}. + * {@link TelegramUpdateProcessor}. * *

The class is responsible for scheduling and executing the polling loop, processing updates, * and handling exceptions that may occur during the session and not caught with exception handling @@ -30,10 +29,10 @@ * * @author Ivan Galushko * @see TelegramClient - * @see TelegramBot + * @see TelegramUpdateProcessor * @see SessionProperties */ -public class LongPollingSession extends DefaultTelegramBotSession implements Runnable { +public class LongPollingSession implements TelegramBotSession, Runnable { private static final Logger log = LoggerFactory.getLogger(LongPollingSession.class); @@ -43,6 +42,7 @@ public class LongPollingSession extends DefaultTelegramBotSession implements Run protected final SessionProperties sessionProperties; protected final TelegramProperties telegramProperties; private final BackOff backOff; + private final TelegramUpdateProcessor processor; private int lastReceivedUpdate = 0; @@ -63,14 +63,15 @@ public class LongPollingSession extends DefaultTelegramBotSession implements Run public LongPollingSession( TelegramClient telegramClient, SessionProperties properties, TelegramProperties telegramProperties, BackOff backOff, - TelegramBot bot + TelegramUpdateProcessor processor ) { - super(properties, bot); Assert.required(telegramClient, "TelegramClient"); Assert.required(properties, "SessionProperties"); Assert.required(telegramProperties, "TelegramProperties"); Assert.required(backOff, "BackOff"); + Assert.required(processor, "TelegramUpdateProcessor"); + this.processor = processor; this.backOff = backOff; this.telegramProperties = telegramProperties; this.sessionProperties = properties; @@ -128,7 +129,7 @@ public void run() { .map(Update::getUpdateId) .max(Integer::compareTo) .orElse(0); - processUpdates(updates); + processor.process(updates); } } catch (Exception global) { log.error(global.getLocalizedMessage(), global); diff --git a/src/main/java/io/github/drednote/telegram/session/SessionAutoConfiguration.java b/src/main/java/io/github/drednote/telegram/session/SessionAutoConfiguration.java index 0bc54e8..9891990 100644 --- a/src/main/java/io/github/drednote/telegram/session/SessionAutoConfiguration.java +++ b/src/main/java/io/github/drednote/telegram/session/SessionAutoConfiguration.java @@ -2,6 +2,7 @@ import io.github.drednote.telegram.TelegramProperties; import io.github.drednote.telegram.core.TelegramBot; +import io.github.drednote.telegram.filter.FilterProperties; import io.github.drednote.telegram.session.SessionProperties.ProxyType; import io.github.drednote.telegram.session.SessionProperties.ProxyUrl; import java.lang.reflect.InvocationTargetException; @@ -47,7 +48,6 @@ public class SessionAutoConfiguration { * Configures a bean for the Telegram bot session using long polling. And starts session * * @param telegramClient The Telegram client used to interact with the Telegram API - * @param bot The Telegram bot instance * @param properties Configuration properties for the session * @return The configured Telegram bot session */ @@ -59,16 +59,15 @@ public class SessionAutoConfiguration { matchIfMissing = true ) @ConditionalOnMissingBean - @ConditionalOnSingleCandidate(TelegramBot.class) public TelegramBotSession longPollingTelegramBotSession( - TelegramClient telegramClient, TelegramBot bot, SessionProperties properties, - TelegramProperties telegramProperties + TelegramClient telegramClient, SessionProperties properties, + TelegramProperties telegramProperties, TelegramUpdateProcessor processor ) { try { Class backOffClazz = properties.getBackOffStrategy(); BackOff backOff = backOffClazz.getDeclaredConstructor().newInstance(); LongPollingSession session = new LongPollingSession( - telegramClient, properties, telegramProperties, backOff, bot); + telegramClient, properties, telegramProperties, backOff, processor); session.start(); return session; } catch (InstantiationException | IllegalAccessException | InvocationTargetException | @@ -96,6 +95,17 @@ public TelegramBotSession webhooksTelegramBotSession() { throw new UnsupportedOperationException("Webhooks not implemented yet"); } + @Bean + @ConditionalOnMissingBean + @ConditionalOnSingleCandidate(TelegramBot.class) + public TelegramUpdateProcessor telegramUpdateProcessor( + SessionProperties properties, FilterProperties filterProperties, TelegramBot telegramBot, + org.telegram.telegrambots.meta.generics.TelegramClient telegramClient + ) { + return new DefaultTelegramUpdateProcessor( + properties, filterProperties, telegramBot, telegramClient); + } + /** * Configures a bean for the Telegram client to interact with the Telegram API. * @@ -118,7 +128,8 @@ private static HttpServiceProxyFactory getFactory(Consumer additionalSe additionalSettings.accept(builder); RestClient restClient = builder - .baseUrl(TelegramUrl.DEFAULT_URL.getSchema() + "://" + TelegramUrl.DEFAULT_URL.getHost()) + .baseUrl( + TelegramUrl.DEFAULT_URL.getSchema() + "://" + TelegramUrl.DEFAULT_URL.getHost()) .build(); RestClientAdapter adapter = RestClientAdapter.create(restClient); diff --git a/src/main/java/io/github/drednote/telegram/session/SessionProperties.java b/src/main/java/io/github/drednote/telegram/session/SessionProperties.java index cd48c57..35bbb1a 100644 --- a/src/main/java/io/github/drednote/telegram/session/SessionProperties.java +++ b/src/main/java/io/github/drednote/telegram/session/SessionProperties.java @@ -35,6 +35,12 @@ public class SessionProperties { */ @NonNull private int consumeMaxThreads = 10; + /** + * Limits the number of updates to be store in memory queue for update processing. 0 - no limit. + * Defaults to (consumeMaxThreads * 1.5). + */ + @NonNull + private int maxMessagesInQueue = 15; /** * Max number of threads used for consumption messages from a telegram for concrete user. 0 - no * restrictions. @@ -42,17 +48,17 @@ public class SessionProperties { @NonNull private int maxThreadsPerUser = 1; /** - * Cache lifetime used in {@link DefaultTelegramBotSession}. This parameter needed just for + * Cache lifetime used in {@link DefaultTelegramUpdateProcessor}. This parameter needed just to * delete staled buckets to free up memory * - * @see DefaultTelegramBotSession + * @see DefaultTelegramUpdateProcessor */ @NonNull private int cacheLiveDuration = 1; /** * The {@link TimeUnit} which will be applied to {@link #cacheLiveDuration} * - * @see DefaultTelegramBotSession + * @see DefaultTelegramUpdateProcessor */ @NonNull private TimeUnit cacheLiveDurationUnit = TimeUnit.HOURS; @@ -123,11 +129,6 @@ public static class LongPollingSessionProperties { */ @NonNull private int updateTimeout = 50; - /** - * Limits the number of updates to be store in memory queue for update processing. - */ - @NonNull - private int maxMessagesInQueue = 50; /** * A JSON-serialized list of the update types you want your bot to receive. For example, * specify [“message”, “edited_channel_post”, “callback_query”] to only receive updates of diff --git a/src/main/java/io/github/drednote/telegram/session/TelegramUpdateProcessor.java b/src/main/java/io/github/drednote/telegram/session/TelegramUpdateProcessor.java new file mode 100644 index 0000000..6526c4e --- /dev/null +++ b/src/main/java/io/github/drednote/telegram/session/TelegramUpdateProcessor.java @@ -0,0 +1,9 @@ +package io.github.drednote.telegram.session; + +import java.util.List; +import org.telegram.telegrambots.meta.api.objects.Update; + +public interface TelegramUpdateProcessor { + + void process(List update); +} diff --git a/src/main/java/io/github/drednote/telegram/session/UserRateLimitRequestFilter.java b/src/main/java/io/github/drednote/telegram/session/UserRateLimitRequestFilter.java new file mode 100644 index 0000000..05393ee --- /dev/null +++ b/src/main/java/io/github/drednote/telegram/session/UserRateLimitRequestFilter.java @@ -0,0 +1,114 @@ +package io.github.drednote.telegram.session; + +import static org.apache.commons.lang3.ObjectUtils.max; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import io.github.bucket4j.Bandwidth; +import io.github.bucket4j.Bucket; +import io.github.bucket4j.Refill; +import io.github.bucket4j.local.LocalBucket; +import io.github.bucket4j.local.SynchronizationStrategy; +import io.github.drednote.telegram.filter.FilterProperties; +import io.github.drednote.telegram.utils.Assert; +import java.time.Duration; +import org.springframework.lang.NonNull; +import org.springframework.lang.Nullable; + +/** + * Implementation of a priority pre-update filter for rate-limiting user requests. + * + *

This filter uses a caching mechanism to manage rate limits for individual users. It tracks + * the rate of incoming requests per user, and if the rate limit is exceeded, it returns false, + * otherwise true. + * + * @author Ivan Galushko + */ +public class UserRateLimitRequestFilter { + + private final FilterProperties filterProperties; + private final Cache cache; + private final Duration duration; + /** + * Callback that called on remove cached LocalBucket. Using in testing, but you can use this if + * you needed + */ + @Nullable + private RemovalListener cacheEvictionCallback; + + /** + * Constructs a {@code UserRateLimitRequestFilter} with the specified {@link FilterProperties} + * + * @param filterProperties The filter properties for rate-limiting user requests. + * @throws IllegalArgumentException if filterProperties is null. + */ + public UserRateLimitRequestFilter(FilterProperties filterProperties) { + Assert.required(filterProperties, "FilterProperties"); + + this.filterProperties = filterProperties; + this.duration = Duration.of(filterProperties.getUserRateLimit(), + filterProperties.getUserRateLimitUnit()); + this.cache = Caffeine.newBuilder() + .expireAfterAccess(getCacheLiveDuration()) + .evictionListener(this::onRemoval) + .build(); + } + + private Duration getCacheLiveDuration() { + Duration cacheExpireDuration = Duration.of(filterProperties.getUserRateLimitCacheExpire(), + filterProperties.getUserRateLimitCacheExpireUnit()); + return max(cacheExpireDuration, duration); + } + + /** + * Pre-filters the incoming Telegram update userId to rate-limit user requests. + * + *

This method checks the rate of incoming requests per user using a caching mechanism. + * If the rate limit is exceeded, it returns false, otherwise true. + * + * @param userId The incoming Telegram update userId to be pre-filtered, not null + * @return if the rate limit is exceeded, it returns false, otherwise true. + */ + public boolean filter(@NonNull Long userId) { + Assert.notNull(userId, "userId"); + + if (filterProperties.getUserRateLimit() > 0) { + LocalBucket bucket = getBucket(userId); + return bucket.tryConsume(1); + } + return true; + } + + private LocalBucket getBucket(Long chatId) { + return cache.get(chatId, key -> + Bucket.builder() + .addLimit(Bandwidth.classic(1, Refill.intervally(1, this.duration))) + .withSynchronizationStrategy(SynchronizationStrategy.SYNCHRONIZED) + .withMillisecondPrecision() + .build() + ); + } + + private void onRemoval(@Nullable Long key, @Nullable LocalBucket value, RemovalCause cause) { + if (cacheEvictionCallback != null) { + cacheEvictionCallback.onRemoval(key, value, cause); + } + } + + /** + * Sets the cache eviction callback for this filter. + * + *

Using in testing, but you can use this if you needed + * + * @param cacheEvictionCallback The cache eviction callback to be set, not null + * @throws IllegalArgumentException if cacheEvictionCallback is null + */ + public void setCacheEvictionCallback( + RemovalListener cacheEvictionCallback + ) { + Assert.notNull(cacheEvictionCallback, "RemovalListener"); + this.cacheEvictionCallback = cacheEvictionCallback; + } +} diff --git a/src/test/java/io/github/drednote/telegram/filter/pre/UserRateLimitRequestFilterTest.java b/src/test/java/io/github/drednote/telegram/filter/pre/UserRateLimitRequestFilterTest.java index 45cd65e..43725dc 100644 --- a/src/test/java/io/github/drednote/telegram/filter/pre/UserRateLimitRequestFilterTest.java +++ b/src/test/java/io/github/drednote/telegram/filter/pre/UserRateLimitRequestFilterTest.java @@ -2,13 +2,12 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertSame; import io.github.drednote.telegram.core.request.DefaultUpdateRequest; import io.github.drednote.telegram.filter.FilterProperties; +import io.github.drednote.telegram.session.UserRateLimitRequestFilter; import io.github.drednote.telegram.support.UpdateRequestUtils; import io.github.drednote.telegram.support.UpdateUtils; -import io.github.drednote.telegram.response.TooManyRequestsTelegramResponse; import java.time.temporal.ChronoUnit; import java.util.concurrent.atomic.AtomicLong; import org.junit.jupiter.api.Test; @@ -16,7 +15,7 @@ class UserRateLimitRequestFilterTest { @Test - void testPreFilterWithOnceAccess() { + void testFilterWithOnceAccess() { FilterProperties filterProperties = new FilterProperties(); filterProperties.setUserRateLimitUnit(ChronoUnit.MINUTES); filterProperties.setUserRateLimit(10L); @@ -26,13 +25,13 @@ void testPreFilterWithOnceAccess() { DefaultUpdateRequest request = UpdateRequestUtils.createMockRequest( UpdateUtils.createEmpty()); - filter.preFilter(request); + filter.filter(request.getUser().getId()); assertNull(request.getResponse()); } @Test - void testPreFilterWithInvalidDuration() throws InterruptedException { + void testFilterWithInvalidDuration() throws InterruptedException { FilterProperties filterProperties = new FilterProperties(); filterProperties.setUserRateLimitUnit(ChronoUnit.SECONDS); filterProperties.setUserRateLimit(100L); @@ -42,15 +41,13 @@ void testPreFilterWithInvalidDuration() throws InterruptedException { DefaultUpdateRequest request = UpdateRequestUtils.createMockRequest( UpdateUtils.createEmpty()); - filter.preFilter(request); + assertThat(filter.filter(request.getUser().getId())).isTrue(); Thread.sleep(50); - filter.preFilter(request); - - assertSame(TooManyRequestsTelegramResponse.INSTANCE, request.getResponse()); + assertThat(filter.filter(request.getUser().getId())).isFalse(); } @Test - void testPreFilterWithValidDuration() throws InterruptedException { + void testFilterWithValidDuration() throws InterruptedException { FilterProperties filterProperties = new FilterProperties(); filterProperties.setUserRateLimitUnit(ChronoUnit.MILLIS); filterProperties.setUserRateLimit(10L); @@ -60,9 +57,9 @@ void testPreFilterWithValidDuration() throws InterruptedException { DefaultUpdateRequest request = UpdateRequestUtils.createMockRequest( UpdateUtils.createEmpty()); - filter.preFilter(request); + filter.filter(request.getUser().getId()); Thread.sleep(50); - filter.preFilter(request); + filter.filter(request.getUser().getId()); assertNull(request.getResponse()); } @@ -82,9 +79,9 @@ void shouldCleanStaledObjectsCorrect() throws InterruptedException { DefaultUpdateRequest request = UpdateRequestUtils.createMockRequest( UpdateUtils.createEmpty()); - filter.preFilter(request); + filter.filter(request.getUser().getId()); Thread.sleep(50); - filter.preFilter(request); + filter.filter(request.getUser().getId()); assertThat(key.get()).isEqualTo(2L); } } \ No newline at end of file diff --git a/src/test/java/io/github/drednote/telegram/response/GenericTelegramResponseTest.java b/src/test/java/io/github/drednote/telegram/response/GenericTelegramResponseTest.java index 1484b5e..e77d891 100644 --- a/src/test/java/io/github/drednote/telegram/response/GenericTelegramResponseTest.java +++ b/src/test/java/io/github/drednote/telegram/response/GenericTelegramResponseTest.java @@ -1,7 +1,5 @@ package io.github.drednote.telegram.response; -import static org.telegram.telegrambots.meta.api.methods.ParseMode.MARKDOWN; - import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.ser.std.ToStringSerializer; import io.github.drednote.telegram.support.UpdateRequestUtils; @@ -30,7 +28,7 @@ void shouldCorrectSendString() throws TelegramApiException { response.process(request); SendMessage sendMessage = new SendMessage(update.getMessage().getChatId().toString(), text); - sendMessage.setParseMode(MARKDOWN); + sendMessage.setParseMode(null); Mockito.verify(absSender).execute(sendMessage); } @@ -46,7 +44,7 @@ void shouldCorrectSendBytes() throws TelegramApiException { response.process(request); SendMessage sendMessage = new SendMessage(update.getMessage().getChatId().toString(), new String(text, StandardCharsets.UTF_8)); - sendMessage.setParseMode(MARKDOWN); + sendMessage.setParseMode(null); Mockito.verify(absSender).execute(sendMessage); } @@ -87,7 +85,7 @@ void shouldCorrectSendGeneric() throws TelegramApiException { response.process(request); SendMessage sendMessage = new SendMessage(update.getMessage().getChatId().toString(), "text = 1"); - sendMessage.setParseMode(MARKDOWN); + sendMessage.setParseMode(null); Mockito.verify(absSender).execute(sendMessage); } diff --git a/src/test/java/io/github/drednote/telegram/session/DefaultTelegramBotSessionTest.java b/src/test/java/io/github/drednote/telegram/session/DefaultTelegramBotSessionTest.java deleted file mode 100644 index cdcdb65..0000000 --- a/src/test/java/io/github/drednote/telegram/session/DefaultTelegramBotSessionTest.java +++ /dev/null @@ -1,107 +0,0 @@ -package io.github.drednote.telegram.session; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import ch.qos.logback.classic.Level; -import ch.qos.logback.classic.Logger; -import io.github.drednote.telegram.core.TelegramBot; -import io.github.drednote.telegram.session.SessionProperties.LongPollingSessionProperties; -import io.github.drednote.telegram.support.builder.UpdateBuilder; -import java.time.Duration; -import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; -import org.slf4j.LoggerFactory; -import org.telegram.telegrambots.meta.api.objects.Update; - -class DefaultTelegramBotSessionTest { - - DefaultTelegramBotSession session; - - TelegramBot telegramBot = Mockito.mock(TelegramBot.class); - - @BeforeEach - void setUp() { - Logger rootLogger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); - rootLogger.setLevel(Level.ALL); - - SessionProperties sessionProperties = new SessionProperties(); - LongPollingSessionProperties longPolling = new LongPollingSessionProperties(); - sessionProperties.setLongPolling(longPolling); - - longPolling.setMaxMessagesInQueue(20); - sessionProperties.setConsumeMaxThreads(5); - sessionProperties.setMaxThreadsPerUser(1); - sessionProperties.setCacheLiveDuration(30); - sessionProperties.setCacheLiveDurationUnit(TimeUnit.MILLISECONDS); - - session = new DefaultTelegramBotSession(sessionProperties, telegramBot) { - @Override - public void start() { - - } - - @Override - public void stop() { - - } - }; - } - - @Test - void shouldWaitIfMaxQueueSizeExceed() throws InterruptedException { - List generate = generate(50); - doAnswer(answer -> { - Thread.sleep(10); - return null; - }).when(telegramBot).onUpdateReceived(any()); - - session.processUpdates(generate); - - for (int i = 0; i < 70; i++) { - Thread.sleep(10); - assertThat(session.updatesCount).hasValueLessThan(21); - } - - verify(telegramBot, times(50)).onUpdateReceived(any()); - } - - @Test - void shouldExecuteUpdateSequentially() throws InterruptedException { - List dateTimes = new ArrayList<>(); - List generate = generate(5, 1L); - doAnswer(answer -> { - Thread.sleep(10); - dateTimes.add(LocalDateTime.now()); - return null; - }).when(telegramBot).onUpdateReceived(any()); - - session.processUpdates(generate); - - Thread.sleep(300); - - verify(telegramBot, times(5)).onUpdateReceived(any()); - assertThat(dateTimes).hasSize(5); - for (int i = 0; i < dateTimes.size() - 1; i++) { - LocalDateTime dateTime = dateTimes.get(i); - LocalDateTime dateTimeNext = dateTimes.get(i + 1); - assertThat(Duration.between(dateTime, dateTimeNext)).isGreaterThan(Duration.ofMillis(10)); - } - } - - private List generate(int count, long... id) { - return Stream.iterate(0, i -> i < count, i -> i + 1) - .map(i -> UpdateBuilder.create().withUpdateId(i).withUser(id.length > 0 ? id[0] : i).message()) - .collect(Collectors.toCollection(ArrayList::new)); - } -} \ No newline at end of file diff --git a/src/test/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessorTest.java b/src/test/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessorTest.java new file mode 100644 index 0000000..877f8bd --- /dev/null +++ b/src/test/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessorTest.java @@ -0,0 +1,120 @@ +package io.github.drednote.telegram.session; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import io.github.drednote.telegram.core.TelegramBot; +import io.github.drednote.telegram.filter.FilterProperties; +import io.github.drednote.telegram.support.builder.UpdateBuilder; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.Timeout.ThreadMode; +import org.mockito.Mockito; +import org.slf4j.LoggerFactory; +import org.telegram.telegrambots.meta.api.objects.Update; +import org.telegram.telegrambots.meta.generics.TelegramClient; + +@Slf4j +class DefaultTelegramUpdateProcessorTest { + + DefaultTelegramUpdateProcessor session; + + TelegramBot telegramBot = Mockito.mock(TelegramBot.class); + TelegramClient telegramClient; + private SessionProperties sessionProperties; + private FilterProperties filterProperties; + + @BeforeEach + void setUp() { + telegramClient = Mockito.mock(TelegramClient.class); + Logger rootLogger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); + rootLogger.setLevel(Level.ALL); + + sessionProperties = new SessionProperties(); + + sessionProperties.setMaxMessagesInQueue(20); + sessionProperties.setConsumeMaxThreads(5); + sessionProperties.setMaxThreadsPerUser(1); + sessionProperties.setCacheLiveDuration(30); + sessionProperties.setCacheLiveDurationUnit(TimeUnit.MILLISECONDS); + + filterProperties = new FilterProperties(); + filterProperties.setUserRateLimitUnit(ChronoUnit.MILLIS); + } + + @RepeatedTest(20) + @Timeout(value = 2, threadMode = ThreadMode.SEPARATE_THREAD) + void shouldWaitIfMaxQueueSizeExceed() throws InterruptedException { + sessionProperties.setCacheLiveDuration(100); + session = new DefaultTelegramUpdateProcessor(sessionProperties, filterProperties, + telegramBot, telegramClient); + + List generate = generate(50); + doAnswer(answer -> { + Thread.sleep(10); + return null; + }).when(telegramBot).onUpdateReceived(any()); + + session.process(generate); + + Thread.sleep(200); + + verify(telegramBot, times(50)).onUpdateReceived(any()); + } + + @Test + void shouldExecuteUpdateOnlyOneOthersRejected() throws InterruptedException { + sessionProperties.setCacheLiveDuration(1500); + session = new DefaultTelegramUpdateProcessor(sessionProperties, filterProperties, + telegramBot, telegramClient); + List generate = generate(5, 1L); + doAnswer(answer -> { + Thread.sleep(100); + return null; + }).when(telegramBot).onUpdateReceived(any()); + + session.process(generate); + + Thread.sleep(200); + + verify(telegramBot, times(1)).onUpdateReceived(any()); + } + + @Test + void shouldExecuteUpdateOnlyOneOthersRejectedByRateLimit() { + sessionProperties.setCacheLiveDuration(1500); + filterProperties.setUserRateLimit(2000); + sessionProperties.setMaxThreadsPerUser(5); + session = new DefaultTelegramUpdateProcessor(sessionProperties, filterProperties, + telegramBot, telegramClient); + List generate = generate(5, 1L); + doAnswer(answer -> { + Thread.sleep(1300); + return null; + }).when(telegramBot).onUpdateReceived(any()); + + session.process(generate); + + verify(telegramBot, times(1)).onUpdateReceived(any()); + } + + private List generate(int count, long... id) { + return Stream.iterate(0, i -> i < count, i -> i + 1) + .map(i -> UpdateBuilder.create().withUpdateId(i).withUser(id.length > 0 ? id[0] : i) + .message()) + .collect(Collectors.toCollection(ArrayList::new)); + } +} \ No newline at end of file From efba885bf00096104bb6c4873989a50049cb3a7a Mon Sep 17 00:00:00 2001 From: Ivan Galushko Date: Thu, 10 Oct 2024 18:07:07 +0300 Subject: [PATCH 2/2] [FIX]: default session and jar name --- .../session/DefaultTelegramUpdateProcessorTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessorTest.java b/src/test/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessorTest.java index 877f8bd..48c1842 100644 --- a/src/test/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessorTest.java +++ b/src/test/java/io/github/drednote/telegram/session/DefaultTelegramUpdateProcessorTest.java @@ -94,7 +94,7 @@ void shouldExecuteUpdateOnlyOneOthersRejected() throws InterruptedException { } @Test - void shouldExecuteUpdateOnlyOneOthersRejectedByRateLimit() { + void shouldExecuteUpdateOnlyOneOthersRejectedByRateLimit() throws InterruptedException { sessionProperties.setCacheLiveDuration(1500); filterProperties.setUserRateLimit(2000); sessionProperties.setMaxThreadsPerUser(5); @@ -102,12 +102,14 @@ void shouldExecuteUpdateOnlyOneOthersRejectedByRateLimit() { telegramBot, telegramClient); List generate = generate(5, 1L); doAnswer(answer -> { - Thread.sleep(1300); + Thread.sleep(20); return null; }).when(telegramBot).onUpdateReceived(any()); session.process(generate); + Thread.sleep(50); + verify(telegramBot, times(1)).onUpdateReceived(any()); }