diff --git a/turms-plugins/turms-plugin-antispam/src/main/java/im/turms/plugin/antispam/AntiSpamHandler.java b/turms-plugins/turms-plugin-antispam/src/main/java/im/turms/plugin/antispam/AntiSpamHandler.java index cedc599c42..b5201a0392 100644 --- a/turms-plugins/turms-plugin-antispam/src/main/java/im/turms/plugin/antispam/AntiSpamHandler.java +++ b/turms-plugins/turms-plugin-antispam/src/main/java/im/turms/plugin/antispam/AntiSpamHandler.java @@ -81,7 +81,7 @@ public AntiSpamHandler(AntiSpamProperties properties) { } @Override - protected void onStarted() { + protected Mono onStarted() { AntiSpamProperties properties = loadProperties(AntiSpamProperties.class); enabled = properties.isEnabled(); textPreprocessor = new TextPreprocessor(properties.getTextParsingStrategy()); @@ -95,6 +95,7 @@ protected void onStarted() { : null; textTypeToProperties = createTextTypeToPropertiesMap(properties.getTextTypes(), properties.getSilentIllegalTextTypes()); + return Mono.empty(); } private Map createTextTypeToPropertiesMap( diff --git a/turms-plugins/turms-plugin-minio/src/main/java/im/turms/plugin/minio/MinioStorageServiceProvider.java b/turms-plugins/turms-plugin-minio/src/main/java/im/turms/plugin/minio/MinioStorageServiceProvider.java index 2d885c2bb2..6f81ce622b 100644 --- a/turms-plugins/turms-plugin-minio/src/main/java/im/turms/plugin/minio/MinioStorageServiceProvider.java +++ b/turms-plugins/turms-plugin-minio/src/main/java/im/turms/plugin/minio/MinioStorageServiceProvider.java @@ -48,6 +48,7 @@ import org.springframework.context.ApplicationContext; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; import im.turms.plugin.minio.core.BucketPolicy; import im.turms.plugin.minio.core.BucketPolicyAction; @@ -85,6 +86,8 @@ import im.turms.server.common.infra.security.MacUtil; import im.turms.server.common.infra.time.DateRange; import im.turms.server.common.infra.time.DurationConst; +import im.turms.server.common.infra.tracing.TracingCloseableContext; +import im.turms.server.common.infra.tracing.TracingContext; import im.turms.server.common.storage.mongo.TurmsMongoClient; import im.turms.service.domain.group.service.GroupMemberService; import im.turms.service.domain.storage.bo.StorageResourceInfo; @@ -122,8 +125,10 @@ public class MinioStorageServiceProvider extends TurmsExtension implements Stora * @implNote 1. We use HMAC(key, message) instead of a HASH(key + message) to avoid the length * extension attack. To put simply, if a hacker knows the signature of the resource * "1", and he can also know the signature of resource "12", "13", "123", and so on - * without knowledge of the key. 2. Use MD5 because its output size (128 bits) is - * small, and it is a 22-character Base62-encoded string. + * without knowledge of the key. + *

+ * 2. Use MD5 because its output size (128 bits) is small, and it is a 22-character + * Base62-encoded string. */ private boolean isMacEnabled; @Nullable @@ -156,30 +161,30 @@ public class MinioStorageServiceProvider extends TurmsExtension implements Stora } @Override - public void onStarted() { - setUp(); + public Mono onStarted() { + return setUp(); } - private void setUp() { + private Mono setUp() { MinioStorageProperties properties = loadProperties(MinioStorageProperties.class); if (!properties.isEnabled()) { - return; + return Mono.empty(); } String endpoint = properties.getEndpoint(); URI uri; try { uri = new URI(endpoint); } catch (URISyntaxException e) { - throw new IllegalArgumentException( + return Mono.error(new IllegalArgumentException( "Illegal endpoint URL: " + endpoint, - e); + e)); } if (!uri.isAbsolute()) { - throw new IllegalArgumentException( + return Mono.error(new IllegalArgumentException( "The endpoint URL (" + endpoint - + ") must be absolute"); + + ") must be absolute")); } ApplicationContext context = getContext(); node = context.getBean(Node.class); @@ -217,15 +222,15 @@ private void setUp() { key = Base64.getDecoder() .decode(base64Key); } catch (Exception e) { - throw new IllegalArgumentException( + return Mono.error(new IllegalArgumentException( "The HMAC key must be Base64-encoded, but got: " + base64Key, - e); + e)); } if (key.length < 16) { - throw new IllegalArgumentException( + return Mono.error(new IllegalArgumentException( "The length of HMAC key must be greater than or equal to 16, but got: " - + key.length); + + key.length)); } macKey = new SecretKeySpec(key, "HmacMD5"); } else { @@ -235,42 +240,32 @@ private void setUp() { properties.getRegion(), properties.getAccessKey(), properties.getSecretKey()); - Duration timeout = Duration.ofSeconds(INIT_BUCKETS_TIMEOUT_SECONDS); - try { - initBuckets().block(timeout); - isServing = true; - } catch (Exception e) { - MinioStorageProperties.Retry retry = properties.getRetry(); - int maxAttempts = retry.getMaxAttempts(); - if (!retry.isEnabled() || maxAttempts <= 0) { - throw new RuntimeException("Failed to initialize the MinIO client", e); - } - LOGGER.error("Failed to initialize the MinIO client. Retry times: 0", e); - try { - Thread.sleep(retry.getInitialIntervalMillis()); - } catch (InterruptedException ex) { - throw new RuntimeException("Failed to initialize the MinIO client", e); - } - for (int currentRetryTimes = 1; currentRetryTimes <= maxAttempts; currentRetryTimes++) { - try { - initBuckets().block(timeout); - } catch (Exception ex) { - LOGGER.error("Failed to initialize the MinIO client. Retry times: " - + currentRetryTimes, ex); - if (currentRetryTimes == maxAttempts) { - throw new RuntimeException( - "Failed to initialize the MinIO client with retries exhausted: " - + maxAttempts); + + Mono initBuckets = + initBuckets().timeout(Duration.ofSeconds(INIT_BUCKETS_TIMEOUT_SECONDS)) + .doOnSuccess(unused -> isServing = true); + + MinioStorageProperties.Retry retry = properties.getRetry(); + int maxAttempts = retry.getMaxAttempts(); + if (!retry.isEnabled() || maxAttempts <= 0) { + return initBuckets.onErrorMap(t -> true, + t -> new RuntimeException("Failed to initialize the MinIO client", t)); + } + return initBuckets.retryWhen(Retry.max(maxAttempts) + .doBeforeRetryAsync(retrySignal -> Mono.deferContextual(contextView -> { + long totalRetries = retrySignal.totalRetries(); + try (TracingCloseableContext ignored = + TracingContext.getCloseableContext(contextView)) { + LOGGER.error("Failed to initialize the MinIO client. Retry times: " + + totalRetries, retrySignal.failure()); } - try { - Thread.sleep(retry.getIntervalMillis()); - } catch (InterruptedException ignored) { - throw new RuntimeException("Failed to initialize the MinIO client", ex); + if (0 == totalRetries) { + return Mono.delay(Duration.ofMillis(retry.getInitialIntervalMillis())) + .then(); } - } - } - isServing = true; - } + return Mono.delay(Duration.ofMillis(retry.getIntervalMillis())) + .then(); + }))); } private void initClient(String endpoint, String region, String accessKey, String secretKey) { diff --git a/turms-plugins/turms-plugin-push/src/main/java/im/turms/plugin/push/NotificationPusher.java b/turms-plugins/turms-plugin-push/src/main/java/im/turms/plugin/push/NotificationPusher.java index 12b40787b8..fb87d7bc1d 100644 --- a/turms-plugins/turms-plugin-push/src/main/java/im/turms/plugin/push/NotificationPusher.java +++ b/turms-plugins/turms-plugin-push/src/main/java/im/turms/plugin/push/NotificationPusher.java @@ -38,7 +38,6 @@ import im.turms.server.common.infra.logging.core.logger.Logger; import im.turms.server.common.infra.logging.core.logger.LoggerFactory; import im.turms.server.common.infra.plugin.TurmsExtension; -import im.turms.server.common.infra.time.DurationConst; import im.turms.server.common.infra.tracing.TracingCloseableContext; import im.turms.server.common.infra.tracing.TracingContext; import im.turms.service.access.servicerequest.dto.RequestHandlerResult; @@ -59,19 +58,19 @@ public class NotificationPusher extends TurmsExtension implements RequestHandler private List deviceTokenFieldNames; @Override - protected void onStarted() { + protected Mono onStarted() { PushNotificationProperties properties = loadProperties(PushNotificationProperties.class); manager = new PushNotificationManager(properties); userService = getContext().getBean(UserService.class); userStatusService = getContext().getBean(UserStatusService.class); deviceTokenFieldNames = manager.getDeviceTokenFieldNames(); + return Mono.empty(); } @Override - protected void onStopped() { - manager.close() - .block(DurationConst.ONE_MINUTE); + protected Mono onStopped() { + return manager.close(); } @Override diff --git a/turms-plugins/turms-plugin-rasa/src/main/java/im/turms/plugin/rasa/RasaResponser.java b/turms-plugins/turms-plugin-rasa/src/main/java/im/turms/plugin/rasa/RasaResponser.java index 4dfc94d55a..a30a508844 100644 --- a/turms-plugins/turms-plugin-rasa/src/main/java/im/turms/plugin/rasa/RasaResponser.java +++ b/turms-plugins/turms-plugin-rasa/src/main/java/im/turms/plugin/rasa/RasaResponser.java @@ -64,19 +64,19 @@ public class RasaResponser extends TurmsExtension implements RequestHandlerResul private Map idToClientInfo; @Override - public void onStarted() { - setUp(); + public Mono onStarted() { + return setUp(); } - private void setUp() { + private Mono setUp() { RasaProperties properties = loadProperties(RasaProperties.class); if (!properties.isEnabled() || properties.getInstanceFindStrategy() != InstanceFindStrategy.PROPERTY) { - return; + return Mono.empty(); } List instancePropertiesList = properties.getInstances(); if (instancePropertiesList.isEmpty()) { - return; + return Mono.empty(); } int size = instancePropertiesList.size(); Map uriToClientInfo = CollectionUtil.newMapWithExpectedSize(size); @@ -87,10 +87,10 @@ private void setUp() { try { uri = new URI(url); } catch (URISyntaxException e) { - throw new IllegalArgumentException( + return Mono.error(new IllegalArgumentException( "Illegal endpoint URL: " + url, - e); + e)); } int requestTimeoutMillis = instanceProperties.getRequest() .getTimeoutMillis(); @@ -101,15 +101,16 @@ private void setUp() { Long chatbotUserId = instanceProperties.getChatbotUserId(); RasaClientInfo existingClientInfo = idToClientInfo.put(chatbotUserId, newClientInfo); if (existingClientInfo != null) { - throw new IllegalArgumentException( + return Mono.error(new IllegalArgumentException( "Found a duplicate chatbot user ID: " - + chatbotUserId); + + chatbotUserId)); } } this.idToClientInfo = Map.copyOf(idToClientInfo); chatbotUserIds = CollectionUtil.toImmutableSet(idToClientInfo.keySet()); ApplicationContext context = getContext(); messageService = context.getBean(MessageService.class); + return Mono.empty(); } @Override diff --git a/turms-server-common/src/main/java/im/turms/server/common/access/admin/dto/response/HttpHandlerResult.java b/turms-server-common/src/main/java/im/turms/server/common/access/admin/dto/response/HttpHandlerResult.java index d15ab5371c..f8c03f02f8 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/access/admin/dto/response/HttpHandlerResult.java +++ b/turms-server-common/src/main/java/im/turms/server/common/access/admin/dto/response/HttpHandlerResult.java @@ -143,6 +143,19 @@ public static HttpHandlerResult> updateResult(long return okIfTruthy(new UpdateResultDTO(modifiedCount, modifiedCount)); } + public static Mono>> updateResultByIntegerMono( + Mono data) { + return okIfTruthy(data.map(number -> { + Long count = number.longValue(); + return new UpdateResultDTO(count, count); + })); + } + + public static Mono>> updateResultByLongMono( + Mono data) { + return okIfTruthy(data.map(number -> new UpdateResultDTO(number, number))); + } + public static Mono>> deleteResult( Mono data) { return okIfTruthy(data.map(DeleteResultDTO::get)); diff --git a/turms-server-common/src/main/java/im/turms/server/common/domain/plugin/access/admin/controller/PluginController.java b/turms-server-common/src/main/java/im/turms/server/common/domain/plugin/access/admin/controller/PluginController.java index b78213c660..18af72ae7a 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/domain/plugin/access/admin/controller/PluginController.java +++ b/turms-server-common/src/main/java/im/turms/server/common/domain/plugin/access/admin/controller/PluginController.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Set; +import reactor.core.publisher.Mono; + import im.turms.server.common.access.admin.dto.response.HttpHandlerResult; import im.turms.server.common.access.admin.dto.response.ResponseDTO; import im.turms.server.common.access.admin.dto.response.UpdateResultDTO; @@ -106,20 +108,20 @@ public HttpHandlerResult>> getPlugins( @PutMapping @RequiredPermission(AdminPermission.PLUGIN_UPDATE) - public HttpHandlerResult> updatePlugins( + public Mono>> updatePlugins( Set ids, @RequestBody UpdatePluginDTO updatePluginDTO) { UpdatePluginDTO.PluginStatus status = updatePluginDTO.status(); if (status == null) { - return HttpHandlerResult.okIfTruthy(UpdateResultDTO.NONE); + return HttpHandlerResult.okIfTruthy(Mono.just(UpdateResultDTO.NONE)); } - long count = switch (status) { + Mono count = switch (status) { case STARTED -> pluginManager.startPlugins(ids); case STOPPED -> pluginManager.stopPlugins(ids); case RESUMED -> pluginManager.resumePlugins(ids); case PAUSED -> pluginManager.pausePlugins(ids); }; - return HttpHandlerResult.okIfTruthy(new UpdateResultDTO(count, count)); + return HttpHandlerResult.updateResultByIntegerMono(count); } @PostMapping("java") @@ -162,11 +164,10 @@ public HttpHandlerResult> createJsPlugins( @DeleteMapping @RequiredPermission(AdminPermission.PLUGIN_DELETE) - public HttpHandlerResult> deletePlugins( + public Mono>> deletePlugins( Set ids, boolean deleteLocalFiles) { - pluginManager.deletePlugins(ids, deleteLocalFiles); - return HttpHandlerResult.RESPONSE_OK; + return HttpHandlerResult.okIfTruthy(pluginManager.deletePlugins(ids, deleteLocalFiles)); } } \ No newline at end of file diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/JsExtensionPointInvocationHandler.java b/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/JsExtensionPointInvocationHandler.java index d095276da2..a4200cec42 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/JsExtensionPointInvocationHandler.java +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/JsExtensionPointInvocationHandler.java @@ -21,7 +21,7 @@ import java.lang.reflect.Method; import java.util.Map; import java.util.Set; -import java.util.function.Consumer; +import jakarta.annotation.Nullable; import org.graalvm.polyglot.Value; import reactor.core.publisher.Mono; @@ -44,6 +44,7 @@ public JsExtensionPointInvocationHandler( this.extensionPointToFunction = extensionPointToFunction; } + @Nullable @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (OBJECT_METHODS.contains(method.getName())) { @@ -63,8 +64,8 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl Value function = nameToFunction.get(method.getName()); if (function == null) { if (isAsync) { - // Keep it simple because we have only - // the return type of Mono currently + // Keep it simple because we only use + // the return type of Mono currently. return Mono.empty(); } else if (void.class == returnType) { return null; @@ -97,38 +98,15 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl } private Object parseReturnValue(boolean isAsync, Value returnValue) { - if (returnValue.getMetaObject() - .getMetaSimpleName() - .equals("Promise")) { - return Mono.create(sink -> { - try { - Consumer resolve = o -> { - if (o == null) { - sink.success(); - } else if (o instanceof Value v) { - sink.success(ValueDecoder.decode(v)); - } else { - sink.success(o); - } - }; - Consumer reject = - error -> sink.error(ValueDecoder.translateException(error)); - returnValue.invokeMember("then", resolve) - .invokeMember("catch", reject); - } catch (Exception e) { - sink.error(new ScriptExecutionException( - "Failed to run the promise", - e, - ScriptExceptionSource.HOST)); - } - }); - } Object val = ValueDecoder.decode(returnValue); if (isAsync) { + if (val instanceof Mono mono) { + return mono; + } return Mono.just(val); } else { return val; } } -} +} \ No newline at end of file diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/JsTurmsExtensionAdaptor.java b/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/JsTurmsExtensionAdaptor.java index 1d5a60d2cd..139f97d43d 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/JsTurmsExtensionAdaptor.java +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/JsTurmsExtensionAdaptor.java @@ -21,7 +21,9 @@ import lombok.Getter; import org.graalvm.polyglot.Value; +import reactor.core.publisher.Mono; +import im.turms.server.common.infra.plugin.script.ValueDecoder; import im.turms.server.common.infra.plugin.script.ValueInspector; /** @@ -56,31 +58,34 @@ ExtensionPoint getExtensionPoint() { } @Override - protected void onStarted() { - if (onStarted != null) { - onStarted.execute(); - } + protected Mono onStarted() { + return execute(onStarted); } @Override - protected void onStopped() { - if (onStopped != null) { - onStopped.execute(); - } + protected Mono onStopped() { + return execute(onStopped); } @Override - protected void onResumed() { - if (onResumed != null) { - onResumed.execute(); - } + protected Mono onResumed() { + return execute(onResumed); } @Override - protected void onPaused() { - if (onPaused != null) { - onPaused.execute(); + protected Mono onPaused() { + return execute(onPaused); + } + + private Mono execute(Value callback) { + if (callback == null) { + return Mono.empty(); + } + Mono mono = ValueDecoder.decodeAsMonoIfPromise(callback.execute(), false); + if (mono == null) { + return Mono.empty(); } + return mono.then(); } -} +} \ No newline at end of file diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/Plugin.java b/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/Plugin.java index 4aa5682cf8..fb299bf0ba 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/Plugin.java +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/Plugin.java @@ -22,8 +22,8 @@ import lombok.Data; import lombok.experimental.Accessors; +import reactor.core.publisher.Mono; -import im.turms.server.common.infra.exception.ThrowableUtil; import im.turms.server.common.infra.logging.core.logger.Logger; import im.turms.server.common.infra.logging.core.logger.LoggerFactory; @@ -48,109 +48,99 @@ public abstract sealed class Plugin permits JavaPlugin, JsPlugin { private final PluginDescriptor descriptor; private final List extensions; - void start() { - List runnables = new ArrayList<>(extensions.size()); + Mono start() { + List> startMonos = new ArrayList<>(extensions.size()); for (TurmsExtension extension : extensions) { - runnables.add(() -> { - try { - extension.start(); - } catch (Exception | LinkageError e) { - throw new RuntimeException( + startMonos.add(extension.start() + .onErrorResume(t -> Mono.error(new RuntimeException( "Caught an error while starting the extension: " + extension.getClass() .getName(), - e); - } - }); + t)))); } - ThrowableUtil.delayError(runnables, - "Caught errors while starting extensions of the plugin: " - + descriptor.getId()); - LOGGER.info("The plugin ({}) has been started", descriptor.getId()); + return Mono.whenDelayError(startMonos) + .onErrorResume(t -> Mono.error(new RuntimeException( + "Caught errors while starting extensions of the plugin: " + + descriptor.getId()))) + .doOnSuccess( + unused -> LOGGER.info("The extensions of the plugin ({}) have been started", + descriptor.getId())); } - void stop() { - List runnables = new ArrayList<>(extensions.size()); + Mono stop() { + List> stopMonos = new ArrayList<>(extensions.size()); for (TurmsExtension extension : extensions) { - runnables.add(() -> { - try { - extension.stop(); - } catch (Exception | LinkageError e) { - throw new RuntimeException( + stopMonos.add(extension.stop() + .onErrorResume(t -> Mono.error(new RuntimeException( "Caught an error while stopping the extension: " + extension.getClass() .getName(), - e); - } - }); + t)))); } - RuntimeException stopExtensionsException = null; - RuntimeException closeContextException = null; - try { - ThrowableUtil.delayError(runnables, "Caught errors while stopping extensions"); - } catch (RuntimeException e) { - stopExtensionsException = e; - } - try { - closeContext(); - } catch (RuntimeException e) { - closeContextException = e; - } - if (stopExtensionsException != null || closeContextException != null) { - RuntimeException e = new RuntimeException( - "Caught errors while stopping the plugin: " - + descriptor.getId()); - if (stopExtensionsException != null) { - e.addSuppressed(stopExtensionsException); - } - if (closeContextException != null) { - e.addSuppressed(closeContextException); - } - throw e; - } - LOGGER.info("The plugin ({}) has been stopped", descriptor.getId()); + return Mono.whenDelayError(stopMonos) + .materialize() + .flatMap(signal -> { + Throwable stopExtensionsException = signal.getThrowable(); + RuntimeException closeContextException = null; + try { + closeContext(); + } catch (RuntimeException e) { + closeContextException = e; + } + if (stopExtensionsException != null || closeContextException != null) { + RuntimeException e = new RuntimeException( + "Caught errors while stopping the plugin: " + + descriptor.getId()); + if (stopExtensionsException != null) { + e.addSuppressed(stopExtensionsException); + } + if (closeContextException != null) { + e.addSuppressed(closeContextException); + } + return Mono.error(e); + } + LOGGER.info("The extensions of the plugin ({}) have been stopped", + descriptor.getId()); + return Mono.empty(); + }); } - void resume() { - List runnables = new ArrayList<>(extensions.size()); + Mono resume() { + List> resumeMonos = new ArrayList<>(extensions.size()); for (TurmsExtension extension : extensions) { - runnables.add(() -> { - try { - extension.resume(); - } catch (Exception | LinkageError e) { - throw new RuntimeException( + resumeMonos.add(extension.resume() + .onErrorResume(t -> Mono.error(new RuntimeException( "Caught an error while resuming the extension: " + extension.getClass() .getName(), - e); - } - }); + t)))); } - ThrowableUtil.delayError(runnables, - "Caught errors while resuming extensions of the plugin: " - + descriptor.getId()); - LOGGER.info("The plugin ({}) has been resumed", descriptor.getId()); + return Mono.whenDelayError(resumeMonos) + .onErrorResume(t -> Mono.error(new RuntimeException( + "Caught errors while resuming extensions of the plugin: " + + descriptor.getId()))) + .doOnSuccess( + unused -> LOGGER.info("The extensions of the plugin ({}) have been resumed", + descriptor.getId())); } - void pause() { - List runnables = new ArrayList<>(extensions.size()); + Mono pause() { + List> pauseMonos = new ArrayList<>(extensions.size()); for (TurmsExtension extension : extensions) { - runnables.add(() -> { - try { - extension.pause(); - } catch (Exception | LinkageError e) { - throw new RuntimeException( + pauseMonos.add(extension.pause() + .onErrorResume(t -> Mono.error(new RuntimeException( "Caught an error while pausing the extension: " + extension.getClass() .getName(), - e); - } - }); + t)))); } - ThrowableUtil.delayError(runnables, - "Caught errors while pausing extensions of the plugin: " - + descriptor.getId()); - LOGGER.info("The plugin ({}) has been paused", descriptor.getId()); + return Mono.whenDelayError(pauseMonos) + .onErrorResume(t -> Mono.error(new RuntimeException( + "Caught errors while pausing extensions of the plugin: " + + descriptor.getId()))) + .doOnSuccess( + unused -> LOGGER.info("The extensions of the plugin ({}) have been paused", + descriptor.getId())); } abstract void closeContext(); diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/PluginManager.java b/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/PluginManager.java index f77ef0b4de..be99d6eaa4 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/PluginManager.java +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/PluginManager.java @@ -55,7 +55,6 @@ import im.turms.server.common.infra.context.JobShutdownOrder; import im.turms.server.common.infra.context.TurmsApplicationContext; import im.turms.server.common.infra.exception.FeatureDisabledException; -import im.turms.server.common.infra.exception.ThrowableUtil; import im.turms.server.common.infra.io.FileUtil; import im.turms.server.common.infra.io.InputOutputException; import im.turms.server.common.infra.lang.ClassUtil; @@ -98,6 +97,7 @@ public class PluginManager implements ApplicationListener private final String jsInspectHost; private final int jsInspectPort; + @Getter private final PluginRepository pluginRepository; private final NodeType nodeType; private final ApplicationContext context; @@ -154,12 +154,14 @@ public PluginManager( /** * @implNote Start plugins after all beans are ready so that plugins can get any bean when - * starting + * starting. */ @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (enabled) { - startPlugins(); + startPlugins().timeout(Duration.ofMinutes(10)) + .subscribe(unused -> LOGGER.info("All plugins are started"), + t -> LOGGER.error("Caught an error while starting plugins", t)); } } @@ -169,31 +171,29 @@ private Path getPluginDir(Path home, String pluginsDir) { } private Mono destroy() { - Exception stopPluginsException = null; - Exception closeEngineException = null; - try { - stopPlugins(); - } catch (Exception e) { - stopPluginsException = e; - } - if (engine != null) { - try { - ((Engine) engine).close(true); - } catch (Exception e) { - closeEngineException = e; - } - } - if (stopPluginsException != null || closeEngineException != null) { - Exception e = new RuntimeException("Caught errors while destroying"); - if (stopPluginsException != null) { - e.addSuppressed(stopPluginsException); - } - if (closeEngineException != null) { - e.addSuppressed(closeEngineException); - } - return Mono.error(e); - } - return Mono.empty(); + return stopPlugins().materialize() + .flatMap(signal -> { + Throwable stopPluginsException = signal.getThrowable(); + Exception closeEngineException = null; + if (engine != null) { + try { + ((Engine) engine).close(true); + } catch (Exception e) { + closeEngineException = e; + } + } + if (stopPluginsException != null || closeEngineException != null) { + Exception e = new RuntimeException("Caught errors while destroying"); + if (stopPluginsException != null) { + e.addSuppressed(stopPluginsException); + } + if (closeEngineException != null) { + e.addSuppressed(closeEngineException); + } + return Mono.error(e); + } + return Mono.empty(); + }); } private void loadNetworkPlugins(NetworkProperties properties) { @@ -531,68 +531,78 @@ public List getPlugins(Set ids) { return pluginRepository.getPlugins(ids); } - public int startPlugins() { + public Mono startPlugins() { Collection plugins = pluginRepository.getPlugins(); return startPlugins(plugins); } - public int startPlugins(Set ids) { + public Mono startPlugins(Set ids) { List plugins = pluginRepository.getPlugins(ids); return startPlugins(plugins); } - public int startPlugins(Collection plugins) { - List runnables = new ArrayList<>(plugins.size()); + public Mono startPlugins(Collection plugins) { + List> startMonos = new ArrayList<>(plugins.size()); for (Plugin plugin : plugins) { - runnables.add(plugin::start); + startMonos.add(plugin.start()); } - ThrowableUtil.delayError(runnables, "Caught errors while starting plugins"); - return plugins.size(); + return Mono.whenDelayError(startMonos) + .onErrorResume(t -> Mono + .error(new RuntimeException("Caught errors while starting plugins"))) + .thenReturn(plugins.size()); } - public int stopPlugins() { + public Mono stopPlugins() { Collection plugins = pluginRepository.getPlugins(); return stopPlugins(plugins); } - public int stopPlugins(Set ids) { + public Mono stopPlugins(Set ids) { List plugins = pluginRepository.getPlugins(ids); return stopPlugins(plugins); } - public int stopPlugins(Collection plugins) { - List runnables = new ArrayList<>(plugins.size()); + public Mono stopPlugins(Collection plugins) { + List> stopMonos = new ArrayList<>(plugins.size()); for (Plugin plugin : plugins) { - runnables.add(plugin::stop); + stopMonos.add(plugin.stop()); } - ThrowableUtil.delayError(runnables, "Caught errors while stopping plugins"); - return plugins.size(); + return Mono.whenDelayError(stopMonos) + .onErrorResume(t -> Mono + .error(new RuntimeException("Caught errors while stopping plugins"))) + .thenReturn(plugins.size()); } - public int resumePlugins(Set ids) { + public Mono resumePlugins(Set ids) { List plugins = pluginRepository.getPlugins(ids); - List runnables = new ArrayList<>(plugins.size()); + List> resumeMonos = new ArrayList<>(plugins.size()); for (Plugin plugin : plugins) { - runnables.add(plugin::resume); + resumeMonos.add(plugin.resume()); } - ThrowableUtil.delayError(runnables, "Caught errors while resuming plugins"); - return plugins.size(); + return Mono.whenDelayError(resumeMonos) + .onErrorResume(t -> Mono + .error(new RuntimeException("Caught errors while resuming plugins"))) + .thenReturn(plugins.size()); } - public int pausePlugins(Set ids) { + public Mono pausePlugins(Set ids) { List plugins = pluginRepository.getPlugins(ids); - List runnables = new ArrayList<>(plugins.size()); + List> pauseMonos = new ArrayList<>(plugins.size()); for (Plugin plugin : plugins) { - runnables.add(plugin::pause); + pauseMonos.add(plugin.pause()); } - ThrowableUtil.delayError(runnables, "Caught errors while pausing plugins"); - return plugins.size(); + return Mono.whenDelayError(pauseMonos) + .onErrorResume(t -> Mono + .error(new RuntimeException("Caught errors while pausing plugins"))) + .thenReturn(plugins.size()); } - public void deletePlugins(Set ids, boolean deleteLocalFiles) { + public Mono deletePlugins(Set ids, boolean deleteLocalFiles) { List plugins = pluginRepository.removePlugins(ids); - stopPlugins(plugins); - if (deleteLocalFiles) { + return stopPlugins(plugins).then(Mono.fromRunnable(() -> { + if (!deleteLocalFiles) { + return; + } for (Plugin plugin : plugins) { try { Path path = plugin.descriptor() @@ -604,7 +614,7 @@ public void deletePlugins(Set ids, boolean deleteLocalFiles) { // ignored } } - } + })); } public boolean hasRunningExtensions(Class extensionPointClass) { @@ -651,8 +661,7 @@ public Mono invokeExtensionPointsSequentially( @Nullable R initialValue, SequentialExtensionPointInvoker invoker) { List extensionPoints = pluginRepository.getExtensionPoints(extensionPointClass); - int size = extensionPoints.size(); - if (size == 0) { + if (extensionPoints.isEmpty()) { return initialValue == null ? Mono.empty() : Mono.just(initialValue); diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/TurmsExtension.java b/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/TurmsExtension.java index 5b6347bdf1..3e2bc8293f 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/TurmsExtension.java +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/plugin/TurmsExtension.java @@ -25,12 +25,14 @@ import lombok.Getter; import lombok.Setter; import org.springframework.context.ApplicationContext; +import reactor.core.publisher.Mono; import im.turms.server.common.access.admin.web.HttpRequestDispatcher; import im.turms.server.common.infra.lang.ClassUtil; import im.turms.server.common.infra.logging.core.logger.Logger; import im.turms.server.common.infra.logging.core.logger.LoggerFactory; import im.turms.server.common.infra.property.TurmsPropertiesManager; +import im.turms.server.common.infra.reactor.TaskScheduler; /** * @author James Chen @@ -39,6 +41,8 @@ public abstract class TurmsExtension { private static final Logger LOGGER = LoggerFactory.getLogger(TurmsExtension.class); + private final TaskScheduler taskScheduler = new TaskScheduler(); + @Setter(AccessLevel.PACKAGE) @Getter private ApplicationContext context; @@ -93,76 +97,79 @@ List> getExtensionPointClasses() { return extensionPointClasses; } - synchronized void start() { + Mono start() { if (initialized || started) { - return; + return Mono.empty(); } - onStarted(); - initialized = true; - started = true; - running = true; - LOGGER.info("The extension ({}) of the plugin ({}) has been started", - getClass().getName(), - plugin.descriptor() - .getId()); - } - - synchronized void stop() { + return taskScheduler.addTask(Mono.defer(this::onStarted) + .doOnSuccess(unused -> { + initialized = true; + started = true; + running = true; + LOGGER.info("The extension ({}) of the plugin ({}) has been started", + getClass().getName(), + plugin.descriptor() + .getId()); + })); + } + + Mono stop() { if (!started) { - return; - } - try { - onStopped(); - } finally { - running = false; - started = false; - LOGGER.info("The extension ({}) of the plugin ({}) has been stopped", - getClass().getName(), - plugin.descriptor() - .getId()); + return Mono.empty(); } - } - - synchronized void resume() { + return taskScheduler.addTask(Mono.defer(this::onStopped) + .doFinally(signalType -> { + running = false; + started = false; + LOGGER.info("The extension ({}) of the plugin ({}) has been stopped", + getClass().getName(), + plugin.descriptor() + .getId()); + })); + } + + Mono resume() { if (!started || running) { - return; - } - try { - onResumed(); - } finally { - running = true; - LOGGER.info("The extension ({}) of the plugin ({}) has been resumed", - getClass().getName(), - plugin.descriptor() - .getId()); + return Mono.empty(); } + return taskScheduler.addTask(Mono.defer(this::onResumed) + .doOnSuccess(signalType -> { + running = true; + LOGGER.info("The extension ({}) of the plugin ({}) has been resumed", + getClass().getName(), + plugin.descriptor() + .getId()); + })); } - synchronized void pause() { + Mono pause() { if (!running) { - return; - } - try { - onPaused(); - } finally { - running = false; - LOGGER.info("The extension ({}) of the plugin ({}) has been paused", - getClass().getName(), - plugin.descriptor() - .getId()); + return Mono.empty(); } + return taskScheduler.addTask(Mono.defer(this::onPaused) + .doFinally(signalType -> { + running = false; + LOGGER.info("The extension ({}) of the plugin ({}) has been paused", + getClass().getName(), + plugin.descriptor() + .getId()); + })); } - protected void onStarted() { + protected Mono onStarted() { + return Mono.empty(); } - protected void onStopped() { + protected Mono onStopped() { + return Mono.empty(); } - protected void onResumed() { + protected Mono onResumed() { + return Mono.empty(); } - protected void onPaused() { + protected Mono onPaused() { + return Mono.empty(); } } \ No newline at end of file diff --git a/turms-server-common/src/main/java/im/turms/server/common/infra/reactor/TaskScheduler.java b/turms-server-common/src/main/java/im/turms/server/common/infra/reactor/TaskScheduler.java new file mode 100644 index 0000000000..1c6942ed25 --- /dev/null +++ b/turms-server-common/src/main/java/im/turms/server/common/infra/reactor/TaskScheduler.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2019 The Turms Project + * https://github.com/turms-im/turms + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package im.turms.server.common.infra.reactor; + +import java.util.ArrayDeque; + +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; + +import im.turms.server.common.infra.thread.ThreadSafe; + +/** + * The class is used to run task one by one. + * + * @author James Chen + */ +@ThreadSafe +public class TaskScheduler { + + private volatile boolean hasRunningTask; + private final ArrayDeque> tasks = new ArrayDeque<>(16); + + public Mono addTask(Mono mono) { + mono = mono.doFinally(signalType -> { + synchronized (this) { + Sinks.One sink = tasks.poll(); + if (sink == null) { + hasRunningTask = false; + } else { + sink.tryEmitEmpty(); + } + } + }); + synchronized (this) { + if (!hasRunningTask) { + hasRunningTask = true; + return mono; + } + Sinks.One sink = Sinks.one(); + tasks.add(sink); + return sink.asMono() + .then(mono); + } + } + +} \ No newline at end of file diff --git a/turms-server-common/src/test/java/unit/im/turms/server/common/infra/reactor/TaskSchedulerTests.java b/turms-server-common/src/test/java/unit/im/turms/server/common/infra/reactor/TaskSchedulerTests.java new file mode 100644 index 0000000000..780dd53d1a --- /dev/null +++ b/turms-server-common/src/test/java/unit/im/turms/server/common/infra/reactor/TaskSchedulerTests.java @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2019 The Turms Project + * https://github.com/turms-im/turms + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.im.turms.server.common.infra.reactor; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import im.turms.server.common.infra.reactor.TaskScheduler; + +/** + * @author James Chen + */ +class TaskSchedulerTests { + + @Test + void test() { + TaskScheduler scheduler = new TaskScheduler(); + Mono mono1 = scheduler.addTask(Mono.delay(Duration.ofMillis(100)) + .thenReturn(1)); + Mono mono2 = scheduler.addTask(Mono.just(2)); + Mono mono3 = scheduler.addTask(Mono.delay(Duration.ofMillis(200)) + .thenReturn(3)); + Mono mono4 = scheduler.addTask(Mono.just(4)); + + Flux result = Flux.merge(mono1, mono2, mono3, mono4); + StepVerifier.create(result) + .expectNext(1, 2, 3, 4) + .verifyComplete(); + } +} \ No newline at end of file