Skip to content

Commit

Permalink
fix(client-presence): disabled (failing) attendees unit tests (#23419)
Browse files Browse the repository at this point in the history
## Description

[ADO Bug
24395](https://dev.azure.com/fluidframework/internal/_workitems/edit/24395)

This PR enables the previously failing tests presence attendee tests as
well as the new ones added in this PR:
#23351

When the local attendee disconnects, we temporarily lose connectivity
status for remote attendees. To fix this we mark all 'Connected' remote
attendee connections as stale upon reconnect and update their status to
disconnected after 30 seconds of inactivity.

This PR also fixes some spelling mistakes: "seassionId-2" to
"sessionId-2"

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
  • Loading branch information
WillieHabi and Copilot authored Jan 17, 2025
1 parent 9ea731b commit 3e36525
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 64 deletions.
113 changes: 73 additions & 40 deletions packages/framework/presence/src/systemWorkspace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import type {
} from "./presence.js";
import { SessionClientStatus } from "./presence.js";
import type { PresenceStatesInternal } from "./presenceStates.js";
import { TimerManager } from "./timerManager.js";
import type { PresenceStates, PresenceStatesSchema } from "./types.js";

/**
Expand All @@ -37,17 +38,12 @@ class SessionClient implements ISessionClient {
*/
public order: number = 0;

private connectionStatus: SessionClientStatus;
private connectionStatus: SessionClientStatus = SessionClientStatus.Disconnected;

public constructor(
public readonly sessionId: ClientSessionId,
private connectionId: ClientConnectionId | undefined = undefined,
) {
this.connectionStatus =
connectionId === undefined
? SessionClientStatus.Disconnected
: SessionClientStatus.Connected;
}
public connectionId: ClientConnectionId | undefined = undefined,
) {}

public getConnectionId(): ClientConnectionId {
if (this.connectionId === undefined) {
Expand All @@ -60,8 +56,7 @@ class SessionClient implements ISessionClient {
return this.connectionStatus;
}

public setConnectionId(connectionId: ClientConnectionId): void {
this.connectionId = connectionId;
public setConnected(): void {
this.connectionStatus = SessionClientStatus.Connected;
}

Expand Down Expand Up @@ -103,6 +98,12 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
*/
private readonly attendees = new Map<ClientConnectionId | ClientSessionId, SessionClient>();

// When local client disconnects, we lose the connectivity status updates for remote attendees in the session.
// Upon reconnect, we mark all other attendees connections as stale and update their status to disconnected after 30 seconds of inactivity.
private readonly staleConnectionClients = new Set<SessionClient>();

private readonly staleConnectionTimer = new TimerManager();

public constructor(
clientSessionId: ClientSessionId,
private readonly datastore: SystemWorkspaceDatastore,
Expand Down Expand Up @@ -135,36 +136,24 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
},
senderConnectionId: ClientConnectionId,
): void {
const postUpdateActions: (() => void)[] = [];
const audienceMembers = this.audience.getMembers();
const connectedAttendees = new Set<SessionClient>();
const joiningAttendees = new Set<SessionClient>();
for (const [clientConnectionId, value] of Object.entries(
remoteDatastore.clientToSessionId,
)) {
const clientSessionId = value.value;
const { attendee, isNew } = this.ensureAttendee(
const { attendee, isJoining } = this.ensureAttendee(
clientSessionId,
clientConnectionId,
/* order */ value.rev,
// If the attendee is present in audience OR if the attendee update is from the sending remote client itself,
// then the attendee is considered connected.
/* isConnected */ senderConnectionId === clientConnectionId ||
audienceMembers.has(clientConnectionId),
);

// Check new attendee against audience to see if they're currently connected
const isAttendeeConnected = audienceMembers.has(clientConnectionId);

if (isAttendeeConnected) {
connectedAttendees.add(attendee);
if (attendee.getConnectionStatus() === SessionClientStatus.Disconnected) {
attendee.setConnectionId(clientConnectionId);
}
if (isNew) {
// If the attendee is both new and in audience (i.e. currently connected), emit an attendeeJoined event.
postUpdateActions.push(() => this.events.emit("attendeeJoined", attendee));
}
}

// If the attendee is not in the audience, they are considered disconnected.
if (!connectedAttendees.has(attendee)) {
attendee.setDisconnected();
// If the attendee is joining the session, add them to the list of joining attendees to be announced later.
if (isJoining) {
joiningAttendees.add(attendee);
}

const knownSessionId: InternalTypes.ValueRequiredState<ClientSessionId> | undefined =
Expand All @@ -177,20 +166,44 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
}

// TODO: reorganize processUpdate and caller to process actions after all updates are processed.
for (const action of postUpdateActions) {
action();
for (const announcedAttendee of joiningAttendees) {
this.events.emit("attendeeJoined", announcedAttendee);
}
}

public onConnectionAdded(clientConnectionId: ClientConnectionId): void {
assert(
this.selfAttendee.getConnectionStatus() === SessionClientStatus.Disconnected,
"Local client should be 'Disconnected' before adding new connection.",
);

this.datastore.clientToSessionId[clientConnectionId] = {
rev: this.selfAttendee.order++,
timestamp: Date.now(),
value: this.selfAttendee.sessionId,
};

this.selfAttendee.setConnectionId(clientConnectionId);
// Mark 'Connected' remote attendees connections as stale
for (const staleConnectionClient of this.attendees.values()) {
if (staleConnectionClient.getConnectionStatus() === SessionClientStatus.Connected) {
this.staleConnectionClients.add(staleConnectionClient);
}
}

// Update the self attendee
this.selfAttendee.connectionId = clientConnectionId;
this.selfAttendee.setConnected();
this.attendees.set(clientConnectionId, this.selfAttendee);

this.staleConnectionTimer.setTimeout(() => {
for (const client of this.staleConnectionClients) {
client.setDisconnected();
}
for (const client of this.staleConnectionClients) {
this.events.emit("attendeeDisconnected", client);
}
this.staleConnectionClients.clear();
}, 30_000);
}

public removeClientConnectionId(clientConnectionId: ClientConnectionId): void {
Expand All @@ -199,13 +212,19 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
return;
}

// If the local connection is being removed, clear the stale connection timer
if (attendee === this.selfAttendee) {
this.staleConnectionTimer.clearTimeout();
}

// If the last known connectionID is different from the connection ID being removed, the attendee has reconnected,
// therefore we should not change the attendee connection status or emit a disconnect event.
const attendeeReconnected = attendee.getConnectionId() !== clientConnectionId;
const connected = attendee.getConnectionStatus() === SessionClientStatus.Connected;
if (!attendeeReconnected && connected) {
attendee.setDisconnected();
this.events.emit("attendeeDisconnected", attendee);
this.staleConnectionClients.delete(attendee);
}
}

Expand Down Expand Up @@ -239,27 +258,41 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
clientSessionId: ClientSessionId,
clientConnectionId: ClientConnectionId,
order: number,
): { attendee: SessionClient; isNew: boolean } {
isConnected: boolean,
): { attendee: SessionClient; isJoining: boolean } {
let attendee = this.attendees.get(clientSessionId);
let isNew = false;
let isJoining = false;

if (attendee === undefined) {
// New attendee. Create SessionClient and add session ID based
// entry to map.
attendee = new SessionClient(clientSessionId, clientConnectionId);
this.attendees.set(clientSessionId, attendee);
isNew = true;
if (isConnected) {
attendee.setConnected();
isJoining = true;
}
} else if (order > attendee.order) {
// The given association is newer than the one we have.
// Update the order and current connection ID.
attendee.order = order;
attendee.setConnectionId(clientConnectionId);
isNew = true;
// Known attendee is joining the session if they are currently disconnected
if (attendee.getConnectionStatus() === SessionClientStatus.Disconnected && isConnected) {
attendee.setConnected();
isJoining = true;
}
attendee.connectionId = clientConnectionId;
}

if (isConnected) {
// If the attendee is connected, remove them from the stale connection set
this.staleConnectionClients.delete(attendee);
}

// Always update entry for the connection ID. (Okay if already set.)
this.attendees.set(clientConnectionId, attendee);

return { attendee, isNew };
return { attendee, isJoining };
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ describe("Presence", () => {

it("sends join when connected during initialization", () => {
// Setup, Act (call to createPresenceManager), & Verify (post createPresenceManager call)
prepareConnectedPresence(runtime, "seassionId-2", "client2", clock, logger);
prepareConnectedPresence(runtime, "sessionId-2", "client2", clock, logger);
});

describe("responds to ClientJoin", () => {
let presence: ReturnType<typeof createPresenceManager>;

beforeEach(() => {
presence = prepareConnectedPresence(runtime, "seassionId-2", "client2", clock, logger);
presence = prepareConnectedPresence(runtime, "sessionId-2", "client2", clock, logger);

// Pass a little time (to mimic reality)
clock.tick(10);
Expand All @@ -83,7 +83,7 @@ describe("Presence", () => {
"client2": {
"rev": 0,
"timestamp": initialTime,
"value": "seassionId-2",
"value": "sessionId-2",
},
},
},
Expand Down Expand Up @@ -153,7 +153,7 @@ describe("Presence", () => {
"client2": {
"rev": 0,
"timestamp": initialTime,
"value": "seassionId-2",
"value": "sessionId-2",
},
},
},
Expand Down
Loading

0 comments on commit 3e36525

Please sign in to comment.