diff --git a/packages/ott-vis-datasource/src/datasource.ts b/packages/ott-vis-datasource/src/datasource.ts index f44099502..eb7841f95 100644 --- a/packages/ott-vis-datasource/src/datasource.ts +++ b/packages/ott-vis-datasource/src/datasource.ts @@ -16,6 +16,7 @@ import { Observable, lastValueFrom, merge } from "rxjs"; export class DataSource extends DataSourceApi { baseUrl: string; + socket: WebSocket | null = null; constructor(instanceSettings: DataSourceInstanceSettings) { super(instanceSettings); @@ -38,9 +39,12 @@ export class DataSource extends DataSourceApi { frame.addField({ name: "direction", type: FieldType.string }); const base = this.baseUrl.replace(/^http/, "ws"); - const ws = new WebSocket(`${base}/state/stream`); - ws.addEventListener("message", msg => { + const open = (e: Event) => { + console.log("WebSocket opened", e); + }; + + const message = (msg: MessageEvent) => { const event = JSON.parse(msg.data); frame.add(event); @@ -49,16 +53,57 @@ export class DataSource extends DataSourceApi { key: frame.refId, state: LoadingState.Streaming, }); - }); + }; - ws.addEventListener("error", err => { + const error = (err: Event) => { console.error("WebSocket error", err); subscriber.error(err); - }); + }; - ws.addEventListener("close", () => { - subscriber.complete(); - }); + const close = (e: CloseEvent) => { + // subscriber.complete(); + console.warn("WebSocket closed", e); + if (e.code === 4132) { + console.info("We aborted the connection."); + return; + } + console.log("Datasource reconnecting..."); + setTimeout(() => { + this.socket = new WebSocket(`${base}/state/stream`); + addListeners(this.socket); + }, 1000); + }; + + function addListeners(ws: WebSocket) { + ws.addEventListener("open", open); + ws.addEventListener("message", message); + ws.addEventListener("error", error); + ws.addEventListener("close", close); + } + function removeListeners(ws: WebSocket) { + ws.removeEventListener("open", open); + ws.removeEventListener("message", message); + ws.removeEventListener("error", error); + ws.removeEventListener("close", close); + } + + function teardown(this: DataSource) { + if (!this.socket) { + return; + } + this.socket.close(4132); + removeListeners(this.socket); + window.removeEventListener("beforeunload", teardown); + } + subscriber.add(teardown.bind(this)); + window.addEventListener("beforeunload", teardown); + + if (this.socket) { + teardown.call(this); + } + + this.socket = new WebSocket(`${base}/state/stream`); + addListeners(this.socket); }); } diff --git a/packages/ott-vis-panel/src/components/CorePanel.tsx b/packages/ott-vis-panel/src/components/CorePanel.tsx index 8e620faf7..2015a6ef1 100644 --- a/packages/ott-vis-panel/src/components/CorePanel.tsx +++ b/packages/ott-vis-panel/src/components/CorePanel.tsx @@ -32,13 +32,30 @@ const getStyles = () => { }; }; -export const CorePanel: React.FC = ({ options, data, width, height }) => { +export const CorePanel: React.FC = props => { + const { data } = props; + + if (data.state === LoadingState.Error) { + return ; + } + + return ; +}; + +/** + * Shown when the data source is in a nominal state. + */ +const CoreData: React.FC = ({ options, data, width, height }) => { const styles = useStyles2(getStyles); const stateSeries = data.series[0]; const eventBusSeries = data.series[1]; const eventBus = useEventBus(); + if (!stateSeries) { + console.log("No state series, data:", data); + } + const systemState: SystemState = useMemo(() => { return options.useSampleData ? sampleSystemState @@ -116,6 +133,20 @@ export const CorePanel: React.FC = ({ options, data, width, height }) => ); }; +/** + * Shown when the data source is in an error state. + */ +const CoreError: React.FC> = ({ data }) => { + return ( +
+ Errors: + {data.errors?.map((e, i) => ( +
{e.message}
+ ))} +
+ ); +}; + const Loading: React.FC = () => { return
Loading...
; };