From b9992cc2a7648d6e566f408a4a6b321ffff4bd5d Mon Sep 17 00:00:00 2001 From: Miu Date: Fri, 31 Mar 2023 20:09:52 +0800 Subject: [PATCH] fix(express): global processing queue is not completely processed at startup (#15) update(express): use launcher instead of threadpool --- .../nexus/express/core/NexusExpressCore.kt | 51 ++++++++++--------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/src/main/java/pw/mihou/nexus/express/core/NexusExpressCore.kt b/src/main/java/pw/mihou/nexus/express/core/NexusExpressCore.kt index 2fb8028d..2d9b53a1 100644 --- a/src/main/java/pw/mihou/nexus/express/core/NexusExpressCore.kt +++ b/src/main/java/pw/mihou/nexus/express/core/NexusExpressCore.kt @@ -4,7 +4,6 @@ import org.javacord.api.DiscordApi import org.javacord.api.entity.server.Server import pw.mihou.nexus.Nexus import pw.mihou.nexus.core.exceptions.NexusFailedActionException -import pw.mihou.nexus.core.threadpool.NexusThreadPool import pw.mihou.nexus.express.NexusExpress import pw.mihou.nexus.express.event.NexusExpressEvent import pw.mihou.nexus.express.event.core.NexusExpressEventCore @@ -27,14 +26,14 @@ internal class NexusExpressCore: NexusExpress { private val localQueue: MutableMap> = ConcurrentHashMap() fun ready(shard: DiscordApi) { - NexusThreadPool.executorService.submit { + Nexus.launcher.launch { val local = localQueue(shard.currentShard) while (!local.isEmpty()) { try { val event = local.poll() if (event != null) { - NexusThreadPool.executorService.submit { + Nexus.launcher.launch { (event as NexusExpressEventCore).process(shard) } } @@ -51,7 +50,7 @@ internal class NexusExpressCore: NexusExpress { if (!predicate.test(shard)) continue val (_, event) = predicateQueue.poll() - NexusThreadPool.executorService.submit { + Nexus.launcher.launch { (event as NexusExpressEventCore).process(shard) } } catch (exception: Exception) { @@ -61,18 +60,20 @@ internal class NexusExpressCore: NexusExpress { } } - NexusThreadPool.executorService.submit { + Nexus.launcher.launch { globalQueueProcessingLock.withLock { - try { - val event = globalQueue.poll() + while(globalQueue.isNotEmpty()) { + try { + val event = globalQueue.poll() - if (event != null) { - NexusThreadPool.executorService.submit { - (event as NexusExpressEventCore).process(shard) + if (event != null) { + Nexus.launcher.launch { + (event as NexusExpressEventCore).process(shard) + } } + } catch (exception: Exception) { + Nexus.logger.error("An uncaught exception was caught from Nexus Express Way.", exception) } - } catch (exception: Exception) { - Nexus.logger.error("An uncaught exception was caught from Nexus Express Way.", exception) } } } @@ -90,7 +91,7 @@ internal class NexusExpressCore: NexusExpress { val maximumTimeout = Nexus.configuration.express.maximumTimeout if (!maximumTimeout.isZero && !maximumTimeout.isNegative) { - NexusThreadPool.schedule({ + Nexus.launch.scheduler.launch(maximumTimeout.toMillis()) { expressEvent.`do` { if (status() == NexusExpressEventStatus.WAITING) { val removed = localQueue(shard).remove(this) @@ -105,10 +106,10 @@ internal class NexusExpressCore: NexusExpress { expire() } } - }, maximumTimeout.toMillis(), TimeUnit.MILLISECONDS) + } } } else { - NexusThreadPool.executorService.submit { expressEvent.process(Nexus.sharding[shard]!!) } + Nexus.launcher.launch { expressEvent.process(Nexus.sharding[shard]!!) } } return expressEvent @@ -124,25 +125,25 @@ internal class NexusExpressCore: NexusExpress { val maximumTimeout = Nexus.configuration.express.maximumTimeout if (!maximumTimeout.isZero && !maximumTimeout.isNegative) { - NexusThreadPool.schedule({ + Nexus.launch.scheduler.launch(maximumTimeout.toMillis()) { expressEvent.`do` { if (status() == NexusExpressEventStatus.WAITING) { val removed = predicateQueue.remove(pair) if (Nexus.configuration.express.showExpiredWarnings) { Nexus.logger.warn( - "An express request that was specified " + - "for a predicate has expired after ${maximumTimeout.toMillis()} milliseconds " + - "without any matching shard connecting with Nexus. [acknowledged=$removed]" + "An express request that was specified " + + "for a predicate has expired after ${maximumTimeout.toMillis()} milliseconds " + + "without any matching shard connecting with Nexus. [acknowledged=$removed]" ) } expire() } } - }, maximumTimeout.toMillis(), TimeUnit.MILLISECONDS) + } } } else { - NexusThreadPool.executorService.submit { expressEvent.process(shard) } + Nexus.launcher.launch { expressEvent.process(shard) } } return expressEvent @@ -156,7 +157,7 @@ internal class NexusExpressCore: NexusExpress { val maximumTimeout = Nexus.configuration.express.maximumTimeout if (!maximumTimeout.isZero && !maximumTimeout.isNegative) { - NexusThreadPool.schedule({ + Nexus.launch.scheduler.launch(maximumTimeout.toMillis()) { expressEvent.`do` { if (status() == NexusExpressEventStatus.WAITING) { val removed = globalQueue.remove(this) @@ -171,10 +172,10 @@ internal class NexusExpressCore: NexusExpress { expire() } } - }, maximumTimeout.toMillis(), TimeUnit.MILLISECONDS) + } } } else { - NexusThreadPool.executorService.submit { expressEvent.process(Nexus.sharding.collection().first()) } + Nexus.launcher.launch { expressEvent.process(Nexus.sharding.collection().first()) } } return expressEvent @@ -238,7 +239,7 @@ internal class NexusExpressCore: NexusExpress { override fun broadcast(event: Consumer) { Nexus.sharding.collection().forEach { shard -> - NexusThreadPool.executorService.submit { + Nexus.launcher.launch { try { event.accept(shard) } catch (exception: Exception) {