Skip to content

Commit

Permalink
Working on heartbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
robmoffat committed Sep 24, 2024
1 parent 3b8a021 commit 49a62c4
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 76 deletions.
10 changes: 5 additions & 5 deletions toolbox/fdc3-for-web/demo/src/client/da/dummy-desktop-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ import { v4 as uuid } from 'uuid'
import { APP_GOODBYE, APP_HELLO, DA_HELLO, FDC3_APP_EVENT } from "../../message-types";
import { DemoServerContext } from "./DemoServerContext";
import { FDC3_2_1_JSONDirectory } from "./FDC3_2_1_JSONDirectory";
import { DefaultFDC3Server, DirectoryApp, ServerContext } from "@kite9/fdc3-web-impl";
import { AppRegistration, DefaultFDC3Server, DirectoryApp, ServerContext } from "@kite9/fdc3-web-impl";
import { ChannelState, ChannelType } from "@kite9/fdc3-web-impl/src/handlers/BroadcastHandler";
import { link } from "./util";
import { BrowserTypes } from "@kite9/fdc3-schema";

type WebConnectionProtocol2LoadURL = BrowserTypes.WebConnectionProtocol2LoadURL

function createAppStartButton(app: DirectoryApp, sc: ServerContext<any>): HTMLDivElement {
function createAppStartButton(app: DirectoryApp, sc: ServerContext<AppRegistration>): HTMLDivElement {
const div = document.createElement("div") as HTMLDivElement
div.classList.add("app")
const h3 = document.createElement("h3")
Expand Down Expand Up @@ -55,8 +55,8 @@ window.addEventListener("load", () => {
socket.emit(DA_HELLO, desktopAgentUUID)

const directory = new FDC3_2_1_JSONDirectory()
//await directory.load("/static/da/appd.json")
await directory.load("/static/da/local-conformance-2_0.v2.json")
await directory.load("/static/da/appd.json")
//await directory.load("/static/da/local-conformance-2_0.v2.json")
//await directory.load("/static/da/training-appd.v2.json")
const sc = new DemoServerContext(socket, directory)

Expand All @@ -65,7 +65,7 @@ window.addEventListener("load", () => {
{ id: "two", type: ChannelType.user, context: [], displayMetadata: { name: "THE BLUE CHANNEL", color: "blue" } },
{ id: "three", type: ChannelType.user, context: [], displayMetadata: { name: "THE GREEN CHANNEL", color: "green" } }
]
const fdc3Server = new DefaultFDC3Server(sc, directory, channelDetails, false, 20000, 10000)
const fdc3Server = new DefaultFDC3Server(sc, directory, channelDetails, true, 20000, 10000)

socket.on(FDC3_APP_EVENT, (msg, from) => {
console.log(`App Event ${JSON.stringify(msg, null, 2)} from ${from}`)
Expand Down
10 changes: 5 additions & 5 deletions toolbox/fdc3-for-web/fdc3-web-impl/src/BasicFDC3Server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { FDC3Server } from "./FDC3Server";
import { InstanceID, ServerContext } from "./ServerContext";
import { AppRegistration, InstanceID, ServerContext } from "./ServerContext";
import { BroadcastHandler, ChannelState } from "./handlers/BroadcastHandler";
import { IntentHandler } from "./handlers/IntentHandler";
import { Directory } from "./directory/DirectoryInterface";
Expand All @@ -15,7 +15,7 @@ export interface MessageHandler {
/**
* Handles an AgentRequestMessage from the messaging source
*/
accept(msg: any, sc: ServerContext<any>, from: InstanceID): void
accept(msg: any, sc: ServerContext<AppRegistration>, from: InstanceID): void

shutdown(): void
}
Expand All @@ -26,9 +26,9 @@ export interface MessageHandler {
export class BasicFDC3Server implements FDC3Server {

private handlers: MessageHandler[]
private sc: ServerContext<any>
private sc: ServerContext<AppRegistration>

constructor(handlers: MessageHandler[], sc: ServerContext<any>) {
constructor(handlers: MessageHandler[], sc: ServerContext<AppRegistration>) {
this.handlers = handlers
this.sc = sc;
}
Expand All @@ -45,7 +45,7 @@ export class BasicFDC3Server implements FDC3Server {

export class DefaultFDC3Server extends BasicFDC3Server {

constructor(sc: ServerContext<any>, directory: Directory, userChannels: ChannelState[], heartbeats: boolean, intentTimeoutMs: number = 20000, openHandlerTimeoutMs: number = 3000) {
constructor(sc: ServerContext<AppRegistration>, directory: Directory, userChannels: ChannelState[], heartbeats: boolean, intentTimeoutMs: number = 20000, openHandlerTimeoutMs: number = 3000) {
const handlers: MessageHandler[] = [
new BroadcastHandler(userChannels),
new IntentHandler(directory, intentTimeoutMs),
Expand Down
8 changes: 5 additions & 3 deletions toolbox/fdc3-for-web/fdc3-web-impl/src/ServerContext.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AppIntent, AppIdentifier } from "@kite9/fdc3-standard";
import { AppIntent } from "@kite9/fdc3-standard";
import { Context } from "@kite9/fdc3-context";

export enum State {
Expand All @@ -8,8 +8,10 @@ export enum State {
Terminated /* App has sent a termination message */
}

export type AppRegistration = AppIdentifier & {
state: State
export type AppRegistration = {
state: State,
appId: string;
instanceId: InstanceID
}

/**
Expand Down
32 changes: 16 additions & 16 deletions toolbox/fdc3-for-web/fdc3-web-impl/src/handlers/BroadcastHandler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { MessageHandler } from "../BasicFDC3Server";
import { InstanceID, ServerContext } from "../ServerContext";
import { AppRegistration, InstanceID, ServerContext } from "../ServerContext";
import { Context } from "@kite9/fdc3-context";
import { AppIdentifier, ChannelError, DisplayMetadata } from "@kite9/fdc3-standard";
import { successResponse, errorResponse, onlyUnique } from "./support";
Expand Down Expand Up @@ -102,7 +102,7 @@ export class BroadcastHandler implements MessageHandler {
}
}

accept(msg: any, sc: ServerContext<any>, uuid: InstanceID) {
accept(msg: any, sc: ServerContext<AppRegistration>, uuid: InstanceID) {
const from = sc.getInstanceDetails(uuid)

if (from == null) {
Expand Down Expand Up @@ -146,7 +146,7 @@ export class BroadcastHandler implements MessageHandler {
}
}

handleCreatePrivateChannelRequest(arg0: CreatePrivateChannelRequest, sc: ServerContext<any>, from: AppIdentifier) {
handleCreatePrivateChannelRequest(arg0: CreatePrivateChannelRequest, sc: ServerContext<AppRegistration>, from: AppIdentifier) {
const id = sc.createUUID()
this.state.push({
id,
Expand All @@ -159,7 +159,7 @@ export class BroadcastHandler implements MessageHandler {
successResponse(sc, arg0, from, { privateChannel: { id, type: this.convertChannelTypeToString(ChannelType.private) } }, 'createPrivateChannelResponse')
}

handleGetCurrentContextRequest(arg0: GetCurrentContextRequest, sc: ServerContext<any>, from: AppIdentifier) {
handleGetCurrentContextRequest(arg0: GetCurrentContextRequest, sc: ServerContext<AppRegistration>, from: AppIdentifier) {
const channel = this.getChannelById(arg0.payload.channelId)
const type = arg0.payload.contextType

Expand All @@ -171,7 +171,7 @@ export class BroadcastHandler implements MessageHandler {
}
}

handlePrivateChannelUnsubscribeEventListenerRequest(arg0: PrivateChannelUnsubscribeEventListenerRequest, sc: ServerContext<any>, from: AppIdentifier) {
handlePrivateChannelUnsubscribeEventListenerRequest(arg0: PrivateChannelUnsubscribeEventListenerRequest, sc: ServerContext<AppRegistration>, from: AppIdentifier) {
const i = this.eventListeners.findIndex(r => r.listenerUuid == arg0.payload.listenerUUID)
if (i > -1) {
this.eventListeners.splice(i, 1)
Expand All @@ -181,7 +181,7 @@ export class BroadcastHandler implements MessageHandler {
}
}

handlePrivateChannelDisconnectRequest(arg0: PrivateChannelDisconnectRequest, sc: ServerContext<any>, from: AppIdentifier) {
handlePrivateChannelDisconnectRequest(arg0: PrivateChannelDisconnectRequest, sc: ServerContext<AppRegistration>, from: AppIdentifier) {
const toUnsubscribe = this.contextListeners
.filter(r => (r.appId == from.appId) && (r.instanceId == from.instanceId))
.filter(r => r.channelId == arg0.payload.channelId)
Expand All @@ -195,7 +195,7 @@ export class BroadcastHandler implements MessageHandler {
successResponse(sc, arg0, from, {}, 'privateChannelDisconnectResponse')
}

handleContextListenerUnsubscribeRequest(arg0: ContextListenerUnsubscribeRequest, sc: ServerContext<any>, from: AppIdentifier) {
handleContextListenerUnsubscribeRequest(arg0: ContextListenerUnsubscribeRequest, sc: ServerContext<AppRegistration>, from: AppIdentifier) {
const i = this.contextListeners
.findIndex(r => (r.listenerUuid == arg0.payload.listenerUUID) && (r.instanceId == from.instanceId))

Expand All @@ -210,7 +210,7 @@ export class BroadcastHandler implements MessageHandler {
}
}

handleAddContextListenerRequest(arg0: AddContextListenerRequest, sc: ServerContext<any>, from: AppIdentifier) {
handleAddContextListenerRequest(arg0: AddContextListenerRequest, sc: ServerContext<AppRegistration>, from: AppIdentifier) {
var channelId = null
var channelType = ChannelType.user

Expand Down Expand Up @@ -241,7 +241,7 @@ export class BroadcastHandler implements MessageHandler {

}

handleBroadcastRequest(arg0: BroadcastRequest, sc: ServerContext<any>, from: AppIdentifier) {
handleBroadcastRequest(arg0: BroadcastRequest, sc: ServerContext<AppRegistration>, from: AppIdentifier) {
const matchingListeners = this.contextListeners
.filter(r => r.channelId == arg0.payload.channelId)
.filter(r => r.contextType == null || r.contextType == arg0.payload.context.type)
Expand All @@ -268,7 +268,7 @@ export class BroadcastHandler implements MessageHandler {
successResponse(sc, arg0, from, {}, 'broadcastResponse')
}

handleGetCurrentChannelRequest(arg0: GetCurrentChannelRequest, sc: ServerContext<any>, from: AppIdentifier) {
handleGetCurrentChannelRequest(arg0: GetCurrentChannelRequest, sc: ServerContext<AppRegistration>, from: AppIdentifier) {
const currentChannel = this.getCurrentChannel(from)
if (currentChannel) {
successResponse(sc, arg0, from, {
Expand All @@ -283,7 +283,7 @@ export class BroadcastHandler implements MessageHandler {
}
}

handleJoinUserChannelRequest(arg0: JoinUserChannelRequest, sc: ServerContext<any>, from: AppIdentifier) {
handleJoinUserChannelRequest(arg0: JoinUserChannelRequest, sc: ServerContext<AppRegistration>, from: AppIdentifier) {
// check it's a user channel
const newChannel = this.getChannelById(arg0.payload.channelId)
if ((newChannel == null) || (newChannel.type != ChannelType.user)) {
Expand All @@ -297,7 +297,7 @@ export class BroadcastHandler implements MessageHandler {
successResponse(sc, arg0, from, {}, 'joinUserChannelResponse')
}

handleLeaveCurrentChannelRequest(arg0: LeaveCurrentChannelRequest, sc: ServerContext<any>, from: AppIdentifier) {
handleLeaveCurrentChannelRequest(arg0: LeaveCurrentChannelRequest, sc: ServerContext<AppRegistration>, from: AppIdentifier) {
const instanceId = from.instanceId ?? 'no-instance-id'
const currentChannel = this.currentChannel[instanceId]
if (currentChannel) {
Expand All @@ -307,7 +307,7 @@ export class BroadcastHandler implements MessageHandler {
successResponse(sc, arg0, from, {}, 'leaveCurrentChannelResponse')
}

handleGetOrCreateRequest(arg0: GetOrCreateChannelRequest, sc: ServerContext<any>, from: AppIdentifier) {
handleGetOrCreateRequest(arg0: GetOrCreateChannelRequest, sc: ServerContext<AppRegistration>, from: AppIdentifier) {
const id = arg0.payload.channelId
var channel = this.getChannelById(id)
if (channel) {
Expand All @@ -328,12 +328,12 @@ export class BroadcastHandler implements MessageHandler {
}


handleGetUserChannelsRequest(arg0: GetUserChannelsRequest, sc: ServerContext<any>, from: AppIdentifier) {
handleGetUserChannelsRequest(arg0: GetUserChannelsRequest, sc: ServerContext<AppRegistration>, from: AppIdentifier) {
const userChannels = this.state.filter(c => c.type == ChannelType.user)
successResponse(sc, arg0, from, { userChannels: userChannels.map(c => ({ id: c.id, type: this.convertChannelTypeToString(c.type), displayMetadata: c.displayMetadata })) }, 'getUserChannelsResponse')
}

handlePrivateChannelAddEventListenerRequest(arg0: PrivateChannelAddEventListenerRequest, from: AppIdentifier, sc: ServerContext<any>) {
handlePrivateChannelAddEventListenerRequest(arg0: PrivateChannelAddEventListenerRequest, from: AppIdentifier, sc: ServerContext<AppRegistration>) {
const channel = this.getChannelById(arg0.payload.privateChannelId)

if ((channel == null) || (channel.type != ChannelType.private)) {
Expand All @@ -351,7 +351,7 @@ export class BroadcastHandler implements MessageHandler {
}
}

invokeEventListeners(privateChannelId: string | null, eventType: PrivateChannelEventListenerTypes, messageType: NotificationAgentEventMessage, sc: ServerContext<any>, contextType?: string) {
invokeEventListeners(privateChannelId: string | null, eventType: PrivateChannelEventListenerTypes, messageType: NotificationAgentEventMessage, sc: ServerContext<AppRegistration>, contextType?: string) {
if (privateChannelId) {
const msg = {
type: messageType,
Expand Down
21 changes: 10 additions & 11 deletions toolbox/fdc3-for-web/fdc3-web-impl/src/handlers/HeartbeatHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,15 @@ export class HeartbeatHandler implements MessageHandler {

private readonly contexts: ServerContext<AppRegistration>[] = []
private readonly lastHeartbeats: Map<InstanceID, number> = new Map()
private readonly warnings: Set<InstanceID> = new Set()
private readonly timerFunction: NodeJS.Timeout

constructor(pingInterval: number = 1000, warnAfter: number = 5000, deadAfter: number = 10000) {
constructor(pingInterval: number = 1000, disconnectedAfter: number = 5000, deadAfter: number = 10000) {

this.timerFunction = setInterval(() => {
console.log(`Contexts: ${this.contexts.length} Last Heartbeats: `, this.heartbeatTimes())

this.contexts.forEach(async (sc) => {
const allAops = await sc.getAllApps()

console.log(`Last Heartbeats: `, this.heartbeatTimes())

allAops
.filter(app => (app.state == State.Connected) || (app.state == State.NotResponding))
.forEach(app => {
Expand All @@ -33,14 +31,14 @@ export class HeartbeatHandler implements MessageHandler {
if (lastHeartbeat != undefined) {
const timeSinceLastHeartbeat = now - lastHeartbeat

if (timeSinceLastHeartbeat < warnAfter) {
this.warnings.delete(app.instanceId!!)
} else if ((timeSinceLastHeartbeat > warnAfter) && (!this.warnings.has(app.instanceId!!))) {
console.warn(`No heartbeat from ${app.instanceId} for ${timeSinceLastHeartbeat}ms`)
this.warnings.add(app.instanceId!!)
} else if (timeSinceLastHeartbeat > deadAfter) {
if (timeSinceLastHeartbeat < disconnectedAfter) {
sc.setAppState(app.instanceId!!, State.Connected)
} else if ((timeSinceLastHeartbeat > disconnectedAfter) && (!this.warnings.has(app.instanceId!!))) {
console.error(`No heartbeat from ${app.instanceId} for ${timeSinceLastHeartbeat}ms. App is considered not responding.`)
sc.setAppState(app.instanceId!!, State.NotResponding)
} else if (timeSinceLastHeartbeat > deadAfter) {
console.error(`No heartbeat from ${app.instanceId} for ${timeSinceLastHeartbeat}ms. App is considered terminated.`)
sc.setAppState(app.instanceId!!, State.Terminated)
} else {
// no action
}
Expand Down Expand Up @@ -81,6 +79,7 @@ export class HeartbeatHandler implements MessageHandler {
const app = sc.getInstanceDetails(from)
if (app) {
sc.setAppState(from, State.Terminated)
this
}
}
}
Expand Down
Loading

0 comments on commit 49a62c4

Please sign in to comment.