Skip to content

Commit

Permalink
Refactor NexusEngineX to be more efficient in CPU
Browse files Browse the repository at this point in the history
  • Loading branch information
ShindouMihou committed Mar 30, 2022
1 parent bdb00bd commit 158327e
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 101 deletions.
106 changes: 83 additions & 23 deletions src/main/java/pw/mihou/nexus/core/enginex/core/NexusEngineXCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class NexusEngineXCore implements NexusEngineX {

private final ConcurrentLinkedQueue<NexusEngineEvent> globalQueue = new ConcurrentLinkedQueue<>();
private final Map<Long, ConcurrentLinkedQueue<NexusEngineEvent>> localQueue = new ConcurrentHashMap<>();
private final BlockingQueue<NexusEngineEvent> globalQueue = new LinkedBlockingQueue<>();
private final Map<Integer, BlockingQueue<NexusEngineEvent>> localQueue = new ConcurrentHashMap<>();
private final AtomicBoolean hasGlobalProcessed = new AtomicBoolean(false);
private final Nexus nexus;

/**
Expand All @@ -37,52 +39,110 @@ public NexusEngineXCore(Nexus nexus) {
*
* @return The blocking queue that any shard can accept.
*/
public ConcurrentLinkedQueue<NexusEngineEvent> getGlobalQueue() {
public BlockingQueue<NexusEngineEvent> getGlobalQueue() {
return globalQueue;
}

/**
* An open executable method that is used by {@link pw.mihou.nexus.core.managers.NexusShardManager} to tell the EngineX
* to proceed with sending requests to the specific shard.
*
* @param shard The shard to process the events.
*/
public void onShardReady(DiscordApi shard) {
CompletableFuture.runAsync(() -> {
while (!getLocalQueue(shard.getCurrentShard()).isEmpty()) {
NexusEngineEvent event = getLocalQueue(shard.getCurrentShard()).poll();

if (event != null) {
((NexusEngineEventCore) event).process(shard);
}
}
});

if (!hasGlobalProcessed.get() && !getGlobalQueue().isEmpty()) {
hasGlobalProcessed.set(true);
CompletableFuture.runAsync(() -> {
while (!getGlobalQueue().isEmpty()) {
NexusEngineEvent event = getGlobalQueue().poll();

if (event != null) {
((NexusEngineEventCore) event).process(shard);
}
}
hasGlobalProcessed.set(false);
});
}
}

/**
* Gets the local queue for this shard. If it doesn't exist then it will add
* a queue instead and return the newly created queue.
*
* @param shard The shard to get the queue of.
* @return The blocking queue for this shard.
*/
public ConcurrentLinkedQueue<NexusEngineEvent> getLocalQueue(long shard) {
public BlockingQueue<NexusEngineEvent> getLocalQueue(int shard) {
if (!localQueue.containsKey(shard)) {
localQueue.put(shard, new ConcurrentLinkedQueue<>());
localQueue.put(shard, new LinkedBlockingQueue<>());
}

return localQueue.get(shard);
}

@Override
public NexusEngineEvent queue(long shard, NexusEngineQueuedEvent event) {
public NexusEngineEvent queue(int shard, NexusEngineQueuedEvent event) {
NexusEngineEventCore engineEvent = new NexusEngineEventCore(event);
getLocalQueue(shard).add(engineEvent);

Duration expiration = nexus.getConfiguration().timeBeforeExpiringEngineRequests();
if (!(expiration.isZero() || expiration.isNegative())) {
NexusThreadPool.schedule(() -> {
if (engineEvent.status() == NexusEngineEventStatus.WAITING) {
boolean removeFromQueue = localQueue.get(shard).remove(engineEvent);
NexusCore.logger.warn(
"An engine request that was specified for a shard was expired because the shard failed to take hold of the request before expiration. " +
"[shard={};acknowledged={}]",
shard, removeFromQueue
);
engineEvent.expire();
}
}, expiration.toMillis(), TimeUnit.MILLISECONDS);

if (nexus.getShardManager().getShard(shard) == null) {
getLocalQueue(shard).add(engineEvent);

Duration expiration = nexus.getConfiguration().timeBeforeExpiringEngineRequests();
if (!(expiration.isZero() || expiration.isNegative())) {
NexusThreadPool.schedule(() -> {
if (engineEvent.status() == NexusEngineEventStatus.WAITING) {
boolean removeFromQueue = localQueue.get(shard).remove(engineEvent);
NexusCore.logger.warn(
"An engine request that was specified for a shard was expired because the shard failed to take hold of the request before expiration. " +
"[shard={};acknowledged={}]",
shard, removeFromQueue
);
engineEvent.expire();
}
}, expiration.toMillis(), TimeUnit.MILLISECONDS);
}
} else {
engineEvent.process(nexus.getShardManager().getShard(shard));
}

return engineEvent;
}

@Override
public NexusEngineEvent queue(NexusEngineQueuedEvent event) {
NexusEngineEvent engineEvent = new NexusEngineEventCore(event);
globalQueue.add(engineEvent);
NexusEngineEventCore engineEvent = new NexusEngineEventCore(event);

if (nexus.getShardManager().size() == 0) {
globalQueue.add(engineEvent);

Duration expiration = nexus.getConfiguration().timeBeforeExpiringEngineRequests();
if (!(expiration.isZero() || expiration.isNegative())) {
NexusThreadPool.schedule(() -> {
if (engineEvent.status() == NexusEngineEventStatus.WAITING) {
boolean removeFromQueue = globalQueue.remove(engineEvent);
NexusCore.logger.warn(
"An engine request that was specified for a shard was expired because the shard failed to take hold of the request before expiration. " +
"[acknowledged={}]",
removeFromQueue
);
engineEvent.expire();
}
}, expiration.toMillis(), TimeUnit.MILLISECONDS);
}
} else {
DiscordApi shard = nexus.getShardManager().asStream().findFirst().orElseThrow();
engineEvent.process(shard);
}

return engineEvent;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public interface NexusEngineX {
* @param event The event to execute for this shard.
* @return The controller and status viewer for the event.
*/
NexusEngineEvent queue(long shard, NexusEngineQueuedEvent event);
NexusEngineEvent queue(int shard, NexusEngineQueuedEvent event);

/**
* Queues an event to be executed by any specific shard that is available
Expand Down
32 changes: 18 additions & 14 deletions src/main/java/pw/mihou/nexus/core/managers/NexusShardManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import org.javacord.api.DiscordApi;
import pw.mihou.nexus.Nexus;
import pw.mihou.nexus.core.NexusCore;
import pw.mihou.nexus.core.managers.wrappers.NexusDiscordShardWrapper;
import pw.mihou.nexus.core.enginex.core.NexusEngineXCore;

import javax.annotation.Nullable;
import java.util.*;
Expand All @@ -12,7 +12,7 @@

public class NexusShardManager {

private final ConcurrentHashMap<Integer, NexusDiscordShardWrapper> shards;
private final ConcurrentHashMap<Integer, DiscordApi> shards;
private final Nexus nexus;

/**
Expand All @@ -23,9 +23,7 @@ public class NexusShardManager {
*/
public NexusShardManager(Nexus nexus, DiscordApi... shards) {
this(nexus);
Arrays.stream(shards)
.forEach(discordApi -> this.shards
.put(discordApi.getCurrentShard(), new NexusDiscordShardWrapper((NexusCore) nexus, discordApi)));
Arrays.stream(shards).forEach(this::put);
}

/**
Expand All @@ -45,7 +43,7 @@ public NexusShardManager(Nexus nexus) {
*/
@Nullable
public DiscordApi getShard(int number) {
return shards.get(number).api();
return shards.get(number);
}

/**
Expand All @@ -67,8 +65,9 @@ public Optional<DiscordApi> getShardOf(long server) {
* @param api The Discord API to store.
*/
public void put(DiscordApi api) {
remove(api.getCurrentShard());
this.shards.put(api.getCurrentShard(), new NexusDiscordShardWrapper((NexusCore) nexus, api));
this.shards.put(api.getCurrentShard(), api);

((NexusEngineXCore) ((NexusCore) nexus).getEngineX()).onShardReady(api);
}

/**
Expand All @@ -77,10 +76,6 @@ public void put(DiscordApi api) {
* @param shard The number of the shard to remove.
*/
public void remove(int shard) {
if (this.shards.containsKey(shard)) {
this.shards.get(shard).disable();
}

this.shards.remove(shard);
}

Expand All @@ -91,7 +86,7 @@ public void remove(int shard) {
* @return A stream of all the shards registered in the shard manager.
*/
public Stream<DiscordApi> asStream() {
return shards.values().stream().map(NexusDiscordShardWrapper::api);
return shards.values().stream();
}

/**
Expand All @@ -101,6 +96,15 @@ public Stream<DiscordApi> asStream() {
* @return A {@link Collection} of all the shards registered in the shard manager.
*/
public Collection<DiscordApi> asCollection() {
return shards.values().stream().map(NexusDiscordShardWrapper::api).toList();
return shards.values();
}

/**
* Gets the current size of the shard manager.
*
* @return The current size of the shard manager.
*/
public int size() {
return shards.size();
}
}

This file was deleted.

0 comments on commit 158327e

Please sign in to comment.