Skip to content

Commit

Permalink
Merge pull request #3581 from ingef/release
Browse files Browse the repository at this point in the history
Merge Release
  • Loading branch information
thoniTUB authored Oct 28, 2024
2 parents 7fa49e3 + c636dd8 commit 3e3d0d1
Show file tree
Hide file tree
Showing 47 changed files with 618 additions and 475 deletions.
6 changes: 3 additions & 3 deletions backend/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.19.0</version>
<version>2.24.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
Expand Down Expand Up @@ -350,7 +350,7 @@
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>5.0.1</version>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
Expand All @@ -367,7 +367,7 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>1.17.6</version>
<version>1.20.1</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.future.DefaultWriteFuture;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.IoSession;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -40,11 +41,16 @@ public WriteFuture send(final NetworkMessage<?> message) {
}
}
catch (InterruptedException e) {
log.error("Unexpected interruption", e);
return send(message);
log.error("Unexpected interruption, while trying to queue: {}", message, e);
return DefaultWriteFuture.newNotWrittenFuture(session, e);
}
WriteFuture future = session.write(message);
future.addListener(f -> queuedMessages.remove(message));
future.addListener(f -> {
if(f instanceof WriteFuture writeFuture && ! writeFuture.isWritten()) {
log.error("Could not write message: {}", message, writeFuture.getException());
}
queuedMessages.remove(message);
});

return future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,39 @@ public void sessionCreated(IoSession session) {
public void sessionOpened(IoSession session) {
NetworkSession networkSession = new NetworkSession(session);

context = new NetworkMessageContext.ShardNodeNetworkContext(networkSession, workers, config, environment.getValidator());
log.info("Connected to ManagerNode @ `{}`", session.getRemoteAddress());
// Schedule ShardNode and Worker registration, so we don't block this thread which does the actual sending
scheduler.schedule(() -> {
context = new NetworkMessageContext.ShardNodeNetworkContext(networkSession, workers, config, environment.getValidator());
log.info("Connected to ManagerNode @ `{}`", session.getRemoteAddress());

// Authenticate with ManagerNode
context.send(new AddShardNode());

for (Worker w : workers.getWorkers().values()) {
w.setSession(new NetworkSession(session));
WorkerInformation info = w.getInfo();
log.info("Sending worker identity '{}'", info.getName());
networkSession.send(new RegisterWorker(info));
}
}, 0, TimeUnit.SECONDS);

// Authenticate with ManagerNode
context.send(new AddShardNode());

for (Worker w : workers.getWorkers().values()) {
w.setSession(new NetworkSession(session));
WorkerInformation info = w.getInfo();
log.info("Sending worker identity '{}'", info.getName());
networkSession.send(new RegisterWorker(info));
}

scheduleIdleLogger(scheduler, session, config.getCluster().getIdleTimeOut());
}

private static void scheduleIdleLogger(ScheduledExecutorService scheduler, IoSession session, Duration timeout) {
scheduler.scheduleAtFixedRate(
() -> {
final Duration elapsed = Duration.milliseconds(System.currentTimeMillis() - session.getLastIoTime());
if (elapsed.compareTo(timeout) > 0) {
log.trace("No message sent or received since {}", elapsed);
}
},
timeout.toSeconds(), timeout.toSeconds() / 2, TimeUnit.SECONDS
);
}

@Override
public void sessionClosed(IoSession session) {
log.info("Disconnected from ManagerNode.");
Expand All @@ -96,56 +113,6 @@ public void sessionClosed(IoSession session) {
}
}

@Override
public void sessionIdle(IoSession session, IdleStatus status) {
log.trace("Session idle {}.", status);
}

@Override
public void exceptionCaught(IoSession session, Throwable cause) {
log.error("Exception caught", cause);
}


@Override
public void messageReceived(IoSession session, Object message) {
if (!(message instanceof MessageToShardNode)) {
log.error("Unknown message type {} in {}", message.getClass(), message);
return;
}

log.trace("{} received {} from {}", environment.getName(), message.getClass().getSimpleName(), session.getRemoteAddress());
ReactingJob<MessageToShardNode, NetworkMessageContext.ShardNodeNetworkContext> job = new ReactingJob<>((MessageToShardNode) message, context);

if (message instanceof SlowMessage slowMessage) {
slowMessage.setProgressReporter(job.getProgressReporter());
jobManager.addSlowJob(job);
}
else {
jobManager.addFastJob(job);
}
}


@Override
public void messageSent(IoSession session, Object message) {
log.trace("Message sent: {}", message);
}


@Override
public void inputClosed(IoSession session) {
log.info("Input closed.");
session.closeNow();
scheduler.schedule(this::disconnectFromCluster, 0, TimeUnit.SECONDS);
}


@Override
public void event(IoSession session, FilterEvent event) throws Exception {
log.trace("Event handled: {}", event);
}

private void connectToCluster() {
InetSocketAddress address = new InetSocketAddress(
config.getCluster().getManagerURL().getHostAddress(),
Expand Down Expand Up @@ -184,7 +151,6 @@ private void connectToCluster() {
}
}


private void disconnectFromCluster() {
if (future != null) {
future.cancel();
Expand All @@ -201,7 +167,6 @@ private void disconnectFromCluster() {
}
}


@NotNull
private NioSocketConnector getClusterConnector(IdResolveContext workers) {
ObjectMapper om = internalMapperFactory.createShardCommunicationMapper();
Expand All @@ -216,19 +181,65 @@ private NioSocketConnector getClusterConnector(IdResolveContext workers) {
return connector;
}

@Override
public void sessionIdle(IoSession session, IdleStatus status) {
log.trace("Session idle {}.", status);
}

private static void scheduleIdleLogger(ScheduledExecutorService scheduler, IoSession session, Duration timeout) {
scheduler.scheduleAtFixedRate(
() -> {
final Duration elapsed = Duration.milliseconds(System.currentTimeMillis() - session.getLastIoTime());
if (elapsed.compareTo(timeout) > 0) {
log.trace("No message sent or received since {}", elapsed);
}
},
timeout.toSeconds(), timeout.toSeconds() / 2, TimeUnit.SECONDS
);
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
log.error("Exception caught", cause);
}

@Override
public void messageReceived(IoSession session, Object message) {
if (!(message instanceof MessageToShardNode)) {
log.error("Unknown message type {} in {}", message.getClass(), message);
return;
}

log.trace("{} received {} from {}", environment.getName(), message.getClass().getSimpleName(), session.getRemoteAddress());
ReactingJob<MessageToShardNode, NetworkMessageContext.ShardNodeNetworkContext> job = new ReactingJob<>((MessageToShardNode) message, context);

if (message instanceof SlowMessage slowMessage) {
slowMessage.setProgressReporter(job.getProgressReporter());
jobManager.addSlowJob(job);
}
else {
jobManager.addFastJob(job);
}
}

@Override
public void messageSent(IoSession session, Object message) {
log.trace("Message sent: {}", message);
}

@Override
public void inputClosed(IoSession session) {
log.info("Input closed.");
session.closeNow();
scheduler.schedule(this::disconnectFromCluster, 0, TimeUnit.SECONDS);
}

@Override
public void event(IoSession session, FilterEvent event) throws Exception {
log.trace("Event handled: {}", event);
}

@Override
public void start() throws Exception {


jobManager = new JobManager(environment.getName(), config.isFailOnError());

scheduler = environment.lifecycle().scheduledExecutorService("cluster-connection-shard").build();
// Connect async as the manager might not be up jet or is started by a test in succession
scheduler.schedule(this::connectToCluster, 0, TimeUnit.MINUTES);

scheduler.scheduleAtFixedRate(this::reportJobManagerStatus, 30, 1, TimeUnit.SECONDS);

}

private void reportJobManagerStatus() {
if (context == null || !context.isConnected()) {
Expand Down Expand Up @@ -257,20 +268,6 @@ private void reportJobManagerStatus() {
}
}

@Override
public void start() throws Exception {


jobManager = new JobManager(environment.getName(), config.isFailOnError());

scheduler = environment.lifecycle().scheduledExecutorService("cluster-connection-shard").build();
// Connect async as the manager might not be up jet or is started by a test in succession
scheduler.schedule(this::connectToCluster, 0, TimeUnit.MINUTES);

scheduler.scheduleAtFixedRate(this::reportJobManagerStatus, 30, 1, TimeUnit.SECONDS);

}

public boolean isBusy() {
return jobManager.isSlowWorkerBusy();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ public Set<ConqueryPermission> getEffectivePermissions() {
return permissions;
}

public synchronized void addMember(User user) {
if (members.add(user.getId())) {
log.trace("Added user {} to group {}", user.getId(), getId());
updateStorage();
}
}

@Override
public void updateStorage() {
storage.updateGroup(this);
Expand All @@ -55,16 +62,9 @@ public GroupId createId() {
return new GroupId(name);
}

public synchronized void addMember(User user) {
if (members.add(user.getId())) {
log.trace("Added user {} to group {}", user.getId(), getId());
updateStorage();
}
}

public synchronized void removeMember(User user) {
if (members.remove(user.getId())) {
log.trace("Removed user {} from group {}", user.getId(), getId());
public synchronized void removeMember(UserId user) {
if (members.remove(user)) {
log.trace("Removed user {} from group {}", user, getId());
updateStorage();
}
}
Expand All @@ -84,9 +84,9 @@ public synchronized void addRole(Role role) {
}
}

public synchronized void removeRole(Role role) {
if (roles.remove(role.getId())) {
log.trace("Removed role {} from group {}", role.getId(), getId());
public synchronized void removeRole(RoleId role) {
if (roles.remove(role)) {
log.trace("Removed role {} from group {}", role, getId());
updateStorage();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,14 @@

import java.util.Set;

import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.models.auth.permissions.ConqueryPermission;
import com.bakdata.conquery.models.identifiable.ids.specific.RoleId;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;

public interface RoleOwner {

void addRole(Role role);

void removeRole(Role role);
void removeRole(RoleId role);

/**
* Return a copy of the roles hold by the owner.
Expand Down
Loading

0 comments on commit 3e3d0d1

Please sign in to comment.