diff --git a/README.md b/README.md
index 4032c58..5706852 100644
--- a/README.md
+++ b/README.md
@@ -66,3 +66,6 @@ npm publish --access public
git push
git push --tags
```
+
+Domains Diagram:
+![diagram_encapuslated.svg](images%2Fdiagram_encapuslated.svg)
diff --git a/images/diagram_encapuslated.svg b/images/diagram_encapuslated.svg
new file mode 100644
index 0000000..bad2f68
--- /dev/null
+++ b/images/diagram_encapuslated.svg
@@ -0,0 +1,17 @@
+
\ No newline at end of file
diff --git a/package.json b/package.json
index 66c6d91..f9303d9 100644
--- a/package.json
+++ b/package.json
@@ -53,6 +53,16 @@
"ts-node": "^10.9.1",
"tsconfig-paths": "^3.9.0",
"typedoc": "^0.23.21",
- "typescript": "^4.9.3"
+ "typescript": "^4.9.3",
+ "@fast-check/jest": "^1.1.0"
+ },
+ "dependencies": {
+ "@matrixai/async-init": "^1.9.4",
+ "@matrixai/contexts": "^1.2.0",
+ "@matrixai/logger": "^3.1.0",
+ "@matrixai/errors": "^1.2.0",
+ "@matrixai/events": "^3.2.0",
+ "@streamparser/json": "^0.0.17",
+ "ix": "^5.0.0"
}
}
diff --git a/src/RPCClient.ts b/src/RPCClient.ts
new file mode 100644
index 0000000..6d19363
--- /dev/null
+++ b/src/RPCClient.ts
@@ -0,0 +1,592 @@
+import type { WritableStream, ReadableStream } from 'stream/web';
+import type { ContextTimedInput } from '@matrixai/contexts';
+import type {
+ HandlerType,
+ JSONRPCRequestMessage,
+ StreamFactory,
+ ClientManifest,
+ RPCStream,
+ JSONRPCResponseResult,
+} from './types';
+import type { JSONValue, IdGen } from './types';
+import type {
+ JSONRPCRequest,
+ JSONRPCResponse,
+ MiddlewareFactory,
+ MapCallers,
+} from './types';
+import type { ErrorRPCRemote } from './errors';
+import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
+import Logger from '@matrixai/logger';
+import { Timer } from '@matrixai/timer';
+import { createDestroy } from '@matrixai/async-init';
+import * as rpcUtilsMiddleware from './utils/middleware';
+import * as rpcErrors from './errors';
+import * as rpcUtils from './utils/utils';
+import { promise } from './utils';
+import { ErrorRPCStreamEnded, never } from './errors';
+import * as events from './events';
+
+const timerCleanupReasonSymbol = Symbol('timerCleanUpReasonSymbol');
+
+/**
+ * Events:
+ * - {@link events.Event}
+ */
+interface RPCClient
+ extends createDestroy.CreateDestroy {}
+/**
+ * You must provide an error handler `addEventListener('error')`.
+ * Otherwise, errors will just be ignored.
+ *
+ * Events:
+ * - {@link events.EventRPCClientDestroy}
+ * - {@link events.EventRPCClientDestroyed}
+ */
+@createDestroy.CreateDestroy({
+ eventDestroy: events.EventRPCClientDestroy,
+ eventDestroyed: events.EventRPCClientDestroyed,
+})
+class RPCClient {
+ /**
+ * @param obj
+ * @param obj.manifest - Client manifest that defines the types for the rpc
+ * methods.
+ * @param obj.streamFactory - An arrow function that when called, creates a
+ * new stream for each rpc method call.
+ * @param obj.middlewareFactory - Middleware used to process the rpc messages.
+ * The middlewareFactory needs to be a function that creates a pair of
+ * transform streams that convert `JSONRPCRequest` to `Uint8Array` on the forward
+ * path and `Uint8Array` to `JSONRPCResponse` on the reverse path.
+ * @param obj.streamKeepAliveTimeoutTime - Timeout time used if no timeout timer was provided when making a call.
+ * Defaults to 60,000 milliseconds.
+ * for a client call.
+ * @param obj.logger
+ */
+ static async createRPCClient({
+ manifest,
+ streamFactory,
+ middlewareFactory = rpcUtilsMiddleware.defaultClientMiddlewareWrapper(),
+ streamKeepAliveTimeoutTime = Infinity, // 1 minute
+ logger = new Logger(this.name),
+ idGen = () => Promise.resolve(null),
+ }: {
+ manifest: M;
+ streamFactory: StreamFactory;
+ middlewareFactory?: MiddlewareFactory<
+ Uint8Array,
+ JSONRPCRequest,
+ JSONRPCResponse,
+ Uint8Array
+ >;
+ streamKeepAliveTimeoutTime?: number;
+ logger?: Logger;
+ idGen: IdGen;
+ toError?: (errorData, metadata?: JSONValue) => ErrorRPCRemote;
+ }) {
+ logger.info(`Creating ${this.name}`);
+ const rpcClient = new this({
+ manifest,
+ streamFactory,
+ middlewareFactory,
+ streamKeepAliveTimeoutTime: streamKeepAliveTimeoutTime,
+ logger,
+ idGen,
+ });
+ logger.info(`Created ${this.name}`);
+ return rpcClient;
+ }
+ protected onTimeoutCallback?: () => void;
+ protected idGen: IdGen;
+ protected logger: Logger;
+ protected streamFactory: StreamFactory;
+ protected middlewareFactory: MiddlewareFactory<
+ Uint8Array,
+ JSONRPCRequest,
+ JSONRPCResponse,
+ Uint8Array
+ >;
+ protected callerTypes: Record;
+ toError: (errorData: any, metadata?: JSONValue) => Error;
+ public registerOnTimeoutCallback(callback: () => void) {
+ this.onTimeoutCallback = callback;
+ }
+ // Method proxies
+ public readonly streamKeepAliveTimeoutTime: number;
+ public readonly methodsProxy = new Proxy(
+ {},
+ {
+ get: (_, method) => {
+ if (typeof method === 'symbol') return;
+ switch (this.callerTypes[method]) {
+ case 'UNARY':
+ return (params, ctx) => this.unaryCaller(method, params, ctx);
+ case 'SERVER':
+ return (params, ctx) =>
+ this.serverStreamCaller(method, params, ctx);
+ case 'CLIENT':
+ return (ctx) => this.clientStreamCaller(method, ctx);
+ case 'DUPLEX':
+ return (ctx) => this.duplexStreamCaller(method, ctx);
+ case 'RAW':
+ return (header, ctx) => this.rawStreamCaller(method, header, ctx);
+ default:
+ return;
+ }
+ },
+ },
+ );
+
+ public constructor({
+ manifest,
+ streamFactory,
+ middlewareFactory,
+ streamKeepAliveTimeoutTime,
+ logger,
+ idGen = () => Promise.resolve(null),
+ toError,
+ }: {
+ manifest: M;
+ streamFactory: StreamFactory;
+ middlewareFactory: MiddlewareFactory<
+ Uint8Array,
+ JSONRPCRequest,
+ JSONRPCResponse,
+ Uint8Array
+ >;
+ streamKeepAliveTimeoutTime: number;
+ logger: Logger;
+ idGen: IdGen;
+ toError?: (errorData, metadata?: JSONValue) => ErrorRPCRemote;
+ }) {
+ this.idGen = idGen;
+ this.callerTypes = rpcUtils.getHandlerTypes(manifest);
+ this.streamFactory = streamFactory;
+ this.middlewareFactory = middlewareFactory;
+ this.streamKeepAliveTimeoutTime = streamKeepAliveTimeoutTime;
+ this.logger = logger;
+ this.toError = toError || rpcUtils.toError;
+ }
+
+ public async destroy({
+ errorCode = rpcErrors.JSONRPCErrorCode.RPCStopping,
+ errorMessage = '',
+ force = true,
+ }: {
+ errorCode?: number;
+ errorMessage?: string;
+ force?: boolean;
+ } = {}): Promise {
+ this.logger.info(`Destroying ${this.constructor.name}`);
+
+ // You can dispatch an event before the actual destruction starts
+ this.dispatchEvent(new events.EventRPCClientDestroy());
+
+ // Dispatch an event after the client has been destroyed
+ this.dispatchEvent(new events.EventRPCClientDestroyed());
+
+ this.logger.info(`Destroyed ${this.constructor.name}`);
+ }
+
+ @ready(new rpcErrors.ErrorRPCCallerFailed())
+ public get methods(): MapCallers {
+ return this.methodsProxy as MapCallers;
+ }
+
+ /**
+ * Generic caller for unary RPC calls.
+ * This returns the response in the provided type. No validation is done so
+ * make sure the types match the handler types.
+ * @param method - Method name of the RPC call
+ * @param parameters - Parameters to be provided with the RPC message. Matches
+ * the provided I type.
+ * @param ctx - ContextTimed used for timeouts and cancellation.
+ */
+ @ready(new rpcErrors.ErrorMissingCaller())
+ public async unaryCaller(
+ method: string,
+ parameters: I,
+ ctx: Partial = {},
+ ): Promise {
+ const callerInterface = await this.duplexStreamCaller(method, ctx);
+ const reader = callerInterface.readable.getReader();
+ const writer = callerInterface.writable.getWriter();
+ try {
+ await writer.write(parameters);
+ const output = await reader.read();
+ if (output.done) {
+ throw new rpcErrors.ErrorMissingCaller('Missing response', {
+ cause: ctx.signal?.reason,
+ });
+ }
+ await reader.cancel();
+ await writer.close();
+ return output.value;
+ } finally {
+ // Attempt clean up, ignore errors if already cleaned up
+ await reader.cancel().catch(() => {});
+ await writer.close().catch(() => {});
+ }
+ }
+
+ /**
+ * Generic caller for server streaming RPC calls.
+ * This returns a ReadableStream of the provided type. When finished, the
+ * readable needs to be cleaned up, otherwise cleanup happens mostly
+ * automatically.
+ * @param method - Method name of the RPC call
+ * @param parameters - Parameters to be provided with the RPC message. Matches
+ * the provided I type.
+ * @param ctx - ContextTimed used for timeouts and cancellation.
+ */
+ @ready(new rpcErrors.ErrorRPCCallerFailed())
+ public async serverStreamCaller(
+ method: string,
+ parameters: I,
+ ctx: Partial = {},
+ ): Promise> {
+ const callerInterface = await this.duplexStreamCaller(method, ctx);
+ const writer = callerInterface.writable.getWriter();
+ try {
+ await writer.write(parameters);
+ await writer.close();
+ } catch (e) {
+ // Clean up if any problems, ignore errors if already closed
+ await callerInterface.readable.cancel(e);
+ throw e;
+ }
+ return callerInterface.readable;
+ }
+
+ /**
+ * Generic caller for Client streaming RPC calls.
+ * This returns a WritableStream for writing the input to and a Promise that
+ * resolves when the output is received.
+ * When finished the writable stream must be ended. Failing to do so will
+ * hold the connection open and result in a resource leak until the
+ * call times out.
+ * @param method - Method name of the RPC call
+ * @param ctx - ContextTimed used for timeouts and cancellation.
+ */
+ @ready(new rpcErrors.ErrorRPCCallerFailed())
+ public async clientStreamCaller(
+ method: string,
+ ctx: Partial = {},
+ ): Promise<{
+ output: Promise;
+ writable: WritableStream;
+ }> {
+ const callerInterface = await this.duplexStreamCaller(method, ctx);
+ const reader = callerInterface.readable.getReader();
+ const output = reader.read().then(({ value, done }) => {
+ if (done) {
+ throw new rpcErrors.ErrorMissingCaller('Missing response', {
+ cause: ctx.signal?.reason,
+ });
+ }
+ return value;
+ });
+ return {
+ output,
+ writable: callerInterface.writable,
+ };
+ }
+
+ /**
+ * Generic caller for duplex RPC calls.
+ * This returns a `ReadableWritablePair` of the types specified. No validation
+ * is applied to these types so make sure they match the types of the handler
+ * you are calling.
+ * When finished the streams must be ended manually. Failing to do so will
+ * hold the connection open and result in a resource leak until the
+ * call times out.
+ * @param method - Method name of the RPC call
+ * @param ctx - ContextTimed used for timeouts and cancellation.
+ */
+ @ready(new rpcErrors.ErrorRPCCallerFailed())
+ public async duplexStreamCaller(
+ method: string,
+ ctx: Partial = {},
+ ): Promise> {
+ // Setting up abort signal and timer
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ // A promise that will reject if there is an abort signal or timeout
+ const abortRaceProm = promise();
+ // Prevent unhandled rejection when we're done with the promise
+ abortRaceProm.p.catch(() => {});
+ const abortRacePromHandler = () => {
+ abortRaceProm.rejectP(signal.reason);
+ };
+ signal.addEventListener('abort', abortRacePromHandler);
+
+ let abortHandler: () => void;
+ if (ctx.signal != null) {
+ // Propagate signal events
+ abortHandler = () => {
+ abortController.abort(ctx.signal?.reason);
+ };
+ if (ctx.signal.aborted) abortHandler();
+ ctx.signal.addEventListener('abort', abortHandler);
+ }
+ let timer: Timer;
+ if (!(ctx.timer instanceof Timer)) {
+ timer = new Timer({
+ delay: ctx.timer ?? this.streamKeepAliveTimeoutTime,
+ });
+ } else {
+ timer = ctx.timer;
+ }
+ const cleanUp = () => {
+ // Clean up the timer and signal
+ if (ctx.timer == null) timer.cancel(timerCleanupReasonSymbol);
+ if (ctx.signal != null) {
+ ctx.signal.removeEventListener('abort', abortHandler);
+ }
+ signal.addEventListener('abort', abortRacePromHandler);
+ };
+ // Setting up abort events for timeout
+ const timeoutError = new rpcErrors.ErrorRPCTimedOut(
+ 'Error RPC has timed out',
+ { cause: ctx.signal?.reason },
+ );
+ void timer.then(
+ () => {
+ abortController.abort(timeoutError);
+ if (this.onTimeoutCallback) {
+ this.onTimeoutCallback();
+ }
+ },
+ () => {}, // Ignore cancellation error
+ );
+
+ // Hooking up agnostic stream side
+ let rpcStream: RPCStream;
+ const streamFactoryProm = this.streamFactory({ signal, timer });
+ try {
+ rpcStream = await Promise.race([streamFactoryProm, abortRaceProm.p]);
+ } catch (e) {
+ cleanUp();
+ void streamFactoryProm.then((stream) =>
+ stream.cancel(ErrorRPCStreamEnded),
+ );
+ throw e;
+ }
+ void timer.then(
+ () => {
+ rpcStream.cancel(
+ new rpcErrors.ErrorRPCTimedOut('RPC has timed out', {
+ cause: ctx.signal?.reason,
+ }),
+ );
+ },
+ () => {}, // Ignore cancellation error
+ );
+ // Deciding if we want to allow refreshing
+ // We want to refresh timer if none was provided
+ const refreshingTimer: Timer | undefined =
+ ctx.timer == null ? timer : undefined;
+ // Composing stream transforms and middleware
+ const metadata = {
+ ...(rpcStream.meta ?? {}),
+ command: method,
+ };
+ const outputMessageTransformStream =
+ rpcUtils.clientOutputTransformStream(metadata, refreshingTimer);
+ const inputMessageTransformStream = rpcUtils.clientInputTransformStream(
+ method,
+ refreshingTimer,
+ );
+ const middleware = this.middlewareFactory(
+ { signal, timer },
+ rpcStream.cancel,
+ metadata,
+ );
+ // This `Promise.allSettled` is used to asynchronously track the state
+ // of the streams. When both have finished we can clean up resources.
+ void Promise.allSettled([
+ rpcStream.readable
+ .pipeThrough(middleware.reverse)
+ .pipeTo(outputMessageTransformStream.writable)
+ // Ignore any errors, we only care about stream ending
+ .catch(() => {}),
+ inputMessageTransformStream.readable
+ .pipeThrough(middleware.forward)
+ .pipeTo(rpcStream.writable)
+ // Ignore any errors, we only care about stream ending
+ .catch(() => {}),
+ ]).finally(() => {
+ cleanUp();
+ });
+
+ // Returning interface
+ return {
+ readable: outputMessageTransformStream.readable,
+ writable: inputMessageTransformStream.writable,
+ cancel: rpcStream.cancel,
+ meta: metadata,
+ };
+ }
+
+ /**
+ * Generic caller for raw RPC calls.
+ * This returns a `ReadableWritablePair` of the raw RPC stream.
+ * When finished the streams must be ended manually. Failing to do so will
+ * hold the connection open and result in a resource leak until the
+ * call times out.
+ * Raw streams don't support the keep alive timeout. Timeout will only apply\
+ * to the creation of the stream.
+ * @param method - Method name of the RPC call
+ * @param headerParams - Parameters for the header message. The header is a
+ * single RPC message that is sent to specify the method for the RPC call.
+ * Any metadata of extra parameters is provided here.
+ * @param ctx - ContextTimed used for timeouts and cancellation.
+ * @param id - Id is generated only once, and used throughout the stream for the rest of the communication
+ */
+ @ready(new rpcErrors.ErrorRPCCallerFailed())
+ public async rawStreamCaller(
+ method: string,
+ headerParams: JSONValue,
+ ctx: Partial = {},
+ ): Promise<
+ RPCStream<
+ Uint8Array,
+ Uint8Array,
+ Record & { result: JSONValue; command: string }
+ >
+ > {
+ // Setting up abort signal and timer
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ // A promise that will reject if there is an abort signal or timeout
+ const abortRaceProm = promise();
+ // Prevent unhandled rejection when we're done with the promise
+ abortRaceProm.p.catch(() => {});
+ const abortRacePromHandler = () => {
+ abortRaceProm.rejectP(signal.reason);
+ };
+ signal.addEventListener('abort', abortRacePromHandler);
+
+ let abortHandler: () => void;
+ if (ctx.signal != null) {
+ // Propagate signal events
+ abortHandler = () => {
+ abortController.abort(ctx.signal?.reason);
+ };
+ if (ctx.signal.aborted) abortHandler();
+ ctx.signal.addEventListener('abort', abortHandler);
+ }
+ let timer: Timer;
+ if (!(ctx.timer instanceof Timer)) {
+ timer = new Timer({
+ delay: ctx.timer ?? this.streamKeepAliveTimeoutTime,
+ });
+ } else {
+ timer = ctx.timer;
+ }
+ const cleanUp = () => {
+ // Clean up the timer and signal
+ if (ctx.timer == null) timer.cancel(timerCleanupReasonSymbol);
+ if (ctx.signal != null) {
+ ctx.signal.removeEventListener('abort', abortHandler);
+ }
+ signal.addEventListener('abort', abortRacePromHandler);
+ };
+ // Setting up abort events for timeout
+ const timeoutError = new rpcErrors.ErrorRPCTimedOut('RPC has timed out', {
+ cause: ctx.signal?.reason,
+ });
+ void timer.then(
+ () => {
+ abortController.abort(timeoutError);
+ },
+ () => {}, // Ignore cancellation error
+ );
+
+ const setupStream = async (): Promise<
+ [JSONValue, RPCStream]
+ > => {
+ if (signal.aborted) throw signal.reason;
+ const abortProm = promise();
+ // Ignore error if orphaned
+ void abortProm.p.catch(() => {});
+ signal.addEventListener(
+ 'abort',
+ () => {
+ abortProm.rejectP(signal.reason);
+ },
+ { once: true },
+ );
+ const rpcStream = await Promise.race([
+ this.streamFactory({ signal, timer }),
+ abortProm.p,
+ ]);
+ const tempWriter = rpcStream.writable.getWriter();
+ const id = await this.idGen();
+ const header: JSONRPCRequestMessage = {
+ jsonrpc: '2.0',
+ method,
+ params: headerParams,
+ id,
+ };
+ await tempWriter.write(Buffer.from(JSON.stringify(header)));
+ tempWriter.releaseLock();
+ const headTransformStream = rpcUtils.parseHeadStream(
+ rpcUtils.parseJSONRPCResponse,
+ );
+ void rpcStream.readable
+ // Allow us to re-use the readable after reading the first message
+ .pipeTo(headTransformStream.writable)
+ // Ignore any errors here, we only care that it ended
+ .catch(() => {});
+ const tempReader = headTransformStream.readable.getReader();
+ let leadingMessage: JSONRPCResponseResult;
+ try {
+ const message = await Promise.race([tempReader.read(), abortProm.p]);
+ const messageValue = message.value as JSONRPCResponse;
+ if (message.done) never();
+ if ('error' in messageValue) {
+ const metadata = {
+ ...(rpcStream.meta ?? {}),
+ command: method,
+ };
+ throw this.toError(messageValue.error.data, metadata);
+ }
+ leadingMessage = messageValue;
+ } catch (e) {
+ rpcStream.cancel(
+ new ErrorRPCStreamEnded('RPC Stream Ended', { cause: e }),
+ );
+ throw e;
+ }
+ tempReader.releaseLock();
+ const newRpcStream: RPCStream = {
+ writable: rpcStream.writable,
+ readable: headTransformStream.readable as ReadableStream,
+ cancel: rpcStream.cancel,
+ meta: rpcStream.meta,
+ };
+ return [leadingMessage.result, newRpcStream];
+ };
+ let streamCreation: [JSONValue, RPCStream];
+ try {
+ streamCreation = await setupStream();
+ } finally {
+ cleanUp();
+ }
+ const [result, rpcStream] = streamCreation;
+ const metadata = {
+ ...(rpcStream.meta ?? {}),
+ result,
+ command: method,
+ };
+ return {
+ writable: rpcStream.writable,
+ readable: rpcStream.readable,
+ cancel: rpcStream.cancel,
+ meta: metadata,
+ };
+ }
+}
+
+export default RPCClient;
diff --git a/src/RPCServer.ts b/src/RPCServer.ts
new file mode 100644
index 0000000..71d8966
--- /dev/null
+++ b/src/RPCServer.ts
@@ -0,0 +1,662 @@
+import type { ReadableStreamDefaultReadResult } from 'stream/web';
+import type {
+ ClientHandlerImplementation,
+ DuplexHandlerImplementation,
+ JSONRPCError,
+ JSONRPCRequest,
+ JSONRPCResponse,
+ JSONRPCResponseError,
+ JSONRPCResponseResult,
+ ServerManifest,
+ RawHandlerImplementation,
+ ServerHandlerImplementation,
+ UnaryHandlerImplementation,
+ RPCStream,
+ MiddlewareFactory,
+} from './types';
+import type { JSONValue } from './types';
+import type { IdGen } from './types';
+import { ReadableStream, TransformStream } from 'stream/web';
+import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
+import Logger from '@matrixai/logger';
+import { PromiseCancellable } from '@matrixai/async-cancellable';
+import { Timer } from '@matrixai/timer';
+import { createDestroy } from '@matrixai/async-init';
+import { RawHandler } from './handlers';
+import { DuplexHandler } from './handlers';
+import { ServerHandler } from './handlers';
+import { UnaryHandler } from './handlers';
+import { ClientHandler } from './handlers';
+import * as rpcEvents from './events';
+import * as rpcUtils from './utils';
+import * as rpcErrors from './errors';
+import * as rpcUtilsMiddleware from './utils';
+import { ErrorHandlerAborted, JSONRPCErrorCode, never } from './errors';
+import * as events from './events';
+
+const cleanupReason = Symbol('CleanupReason');
+
+/**
+ * You must provide a error handler `addEventListener('error')`.
+ * Otherwise errors will just be ignored.
+ *
+ * Events:
+ * - error
+ */
+interface RPCServer extends createDestroy.CreateDestroy {}
+/**
+ * You must provide an error handler `addEventListener('error')`.
+ * Otherwise, errors will just be ignored.
+ *
+ * Events:
+ * - {@link events.EventRPCServerDestroy}
+ * - {@link events.EventRPCServerDestroyed}
+ */
+@createDestroy.CreateDestroy({
+ eventDestroy: events.EventRPCServerDestroy,
+ eventDestroyed: events.EventRPCServerDestroyed,
+})
+class RPCServer extends EventTarget {
+ /**
+ * Creates RPC server.
+
+ * @param obj
+ * @param obj.manifest - Server manifest used to define the rpc method
+ * handlers.
+ * @param obj.middlewareFactory - Middleware used to process the rpc messages.
+ * The middlewareFactory needs to be a function that creates a pair of
+ * transform streams that convert `Uint8Array` to `JSONRPCRequest` on the forward
+ * path and `JSONRPCResponse` to `Uint8Array` on the reverse path.
+ * @param obj.sensitive - If true, sanitises any rpc error messages of any
+ * sensitive information.
+ * @param obj.streamKeepAliveTimeoutTime - Time before a connection is cleaned up due to no activity. This is the
+ * value used if the handler doesn't specify its own timeout time. This timeout is advisory and only results in a
+ * signal sent to the handler. Stream is forced to end after the timeoutForceCloseTime. Defaults to 60,000
+ * milliseconds.
+ * @param obj.timeoutForceCloseTime - Time before the stream is forced to end after the initial timeout time.
+ * The stream will be forced to close after this amount of time after the initial timeout. This is a grace period for
+ * the handler to handle timeout before it is forced to end. Defaults to 2,000 milliseconds.
+ * @param obj.logger
+ */
+ public static async createRPCServer({
+ manifest,
+ middlewareFactory = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(),
+ sensitive = false,
+ handlerTimeoutTime = Infinity, // 1 minute
+ logger = new Logger(this.name),
+ idGen = () => Promise.resolve(null),
+ fromError = rpcUtils.fromError,
+ replacer = rpcUtils.replacer,
+ }: {
+ manifest: ServerManifest;
+ middlewareFactory?: MiddlewareFactory<
+ JSONRPCRequest,
+ Uint8Array,
+ Uint8Array,
+ JSONRPCResponse
+ >;
+ sensitive?: boolean;
+ handlerTimeoutTime?: number;
+ logger?: Logger;
+ idGen: IdGen;
+ fromError?: (error: Error) => JSONValue;
+ replacer?: (key: string, value: any) => any;
+ }): Promise {
+ logger.info(`Creating ${this.name}`);
+ const rpcServer = new this({
+ manifest,
+ middlewareFactory,
+ sensitive,
+ handlerTimeoutTime,
+ logger,
+ idGen,
+ fromError,
+ replacer,
+ });
+ logger.info(`Created ${this.name}`);
+ return rpcServer;
+ }
+ protected onTimeoutCallback?: () => void;
+ protected idGen: IdGen;
+ protected logger: Logger;
+ protected handlerMap: Map = new Map();
+ protected defaultTimeoutMap: Map = new Map();
+ protected handlerTimeoutTime: number;
+ protected activeStreams: Set> = new Set();
+ protected sensitive: boolean;
+ protected fromError: (error: Error, sensitive?: boolean) => JSONValue;
+ protected replacer: (key: string, value: any) => any;
+ protected middlewareFactory: MiddlewareFactory<
+ JSONRPCRequest,
+ Uint8Array,
+ Uint8Array,
+ JSONRPCResponseResult
+ >;
+ // Function to register a callback for timeout
+ public registerOnTimeoutCallback(callback: () => void) {
+ this.onTimeoutCallback = callback;
+ }
+ public constructor({
+ manifest,
+ middlewareFactory,
+ sensitive,
+ handlerTimeoutTime = Infinity, // 1 minuet
+ logger,
+ idGen = () => Promise.resolve(null),
+ fromError = rpcUtils.fromError,
+ replacer = rpcUtils.replacer,
+ }: {
+ manifest: ServerManifest;
+
+ middlewareFactory: MiddlewareFactory<
+ JSONRPCRequest,
+ Uint8Array,
+ Uint8Array,
+ JSONRPCResponseResult
+ >;
+ handlerTimeoutTime?: number;
+ sensitive: boolean;
+ logger: Logger;
+ idGen: IdGen;
+ fromError?: (error: Error) => JSONValue;
+ replacer?: (key: string, value: any) => any;
+ }) {
+ super();
+ for (const [key, manifestItem] of Object.entries(manifest)) {
+ if (manifestItem instanceof RawHandler) {
+ this.registerRawStreamHandler(
+ key,
+ manifestItem.handle,
+ manifestItem.timeout,
+ );
+ continue;
+ }
+ if (manifestItem instanceof DuplexHandler) {
+ this.registerDuplexStreamHandler(
+ key,
+ manifestItem.handle,
+ manifestItem.timeout,
+ );
+ continue;
+ }
+ if (manifestItem instanceof ServerHandler) {
+ this.registerServerStreamHandler(
+ key,
+ manifestItem.handle,
+ manifestItem.timeout,
+ );
+ continue;
+ }
+ if (manifestItem instanceof ClientHandler) {
+ this.registerClientStreamHandler(
+ key,
+ manifestItem.handle,
+ manifestItem.timeout,
+ );
+ continue;
+ }
+ if (manifestItem instanceof ClientHandler) {
+ this.registerClientStreamHandler(
+ key,
+ manifestItem.handle,
+ manifestItem.timeout,
+ );
+ continue;
+ }
+ if (manifestItem instanceof UnaryHandler) {
+ this.registerUnaryHandler(
+ key,
+ manifestItem.handle,
+ manifestItem.timeout,
+ );
+ continue;
+ }
+ never();
+ }
+ this.idGen = idGen;
+ this.middlewareFactory = middlewareFactory;
+ this.sensitive = sensitive;
+ this.handlerTimeoutTime = handlerTimeoutTime;
+ this.logger = logger;
+ this.fromError = fromError || rpcUtils.fromError;
+ this.replacer = replacer || rpcUtils.replacer;
+ }
+
+ public async destroy(force: boolean = true): Promise {
+ // Log and dispatch an event before starting the destruction
+ this.logger.info(`Destroying ${this.constructor.name}`);
+ this.dispatchEvent(new events.EventRPCServerDestroy());
+
+ // Your existing logic for stopping active streams and other cleanup
+ if (force) {
+ for await (const [activeStream] of this.activeStreams.entries()) {
+ activeStream.cancel(new rpcErrors.ErrorRPCStopping());
+ }
+ }
+
+ for await (const [activeStream] of this.activeStreams.entries()) {
+ await activeStream;
+ }
+
+ // Log and dispatch an event after the destruction has been completed
+ this.dispatchEvent(new events.EventRPCServerDestroyed());
+ this.logger.info(`Destroyed ${this.constructor.name}`);
+ }
+
+ /**
+ * Registers a raw stream handler. This is the basis for all handlers as
+ * handling the streams is done with raw streams only.
+ * The raw streams do not automatically refresh the timeout timer when
+ * messages are sent or received.
+ */
+ protected registerRawStreamHandler(
+ method: string,
+ handler: RawHandlerImplementation,
+ timeout: number | undefined,
+ ) {
+ this.handlerMap.set(method, handler);
+ this.defaultTimeoutMap.set(method, timeout);
+ }
+
+ /**
+ * Registers a duplex stream handler.
+ * This handles all message parsing and conversion from generators
+ * to raw streams.
+ *
+ * @param method - The rpc method name.
+ * @param handler - The handler takes an input async iterable and returns an output async iterable.
+ * @param timeout
+ */
+ /**
+ * The ID is generated only once when the function is called and stored in the id variable.
+ * the ID is associated with the entire stream
+ * Every response (whether successful or an error) produced within this stream will have the
+ * same ID, which is consistent with the originating request.
+ */
+ protected registerDuplexStreamHandler<
+ I extends JSONValue,
+ O extends JSONValue,
+ >(
+ method: string,
+ handler: DuplexHandlerImplementation,
+ timeout: number | undefined,
+ ): void {
+ const rawSteamHandler: RawHandlerImplementation = async (
+ [header, input],
+ cancel,
+ meta,
+ ctx,
+ ) => {
+ // Setting up abort controller
+ const abortController = new AbortController();
+ if (ctx.signal.aborted) abortController.abort(ctx.signal.reason);
+ ctx.signal.addEventListener('abort', () => {
+ abortController.abort(ctx.signal.reason);
+ });
+ const signal = abortController.signal;
+ // Setting up middleware
+ const middleware = this.middlewareFactory(ctx, cancel, meta);
+ // Forward from the client to the server
+ // Transparent TransformStream that re-inserts the header message into the
+ // stream.
+ const headerStream = new TransformStream({
+ start(controller) {
+ controller.enqueue(Buffer.from(JSON.stringify(header)));
+ },
+ transform(chunk, controller) {
+ controller.enqueue(chunk);
+ },
+ });
+ const forwardStream = input
+ .pipeThrough(headerStream)
+ .pipeThrough(middleware.forward);
+ // Reverse from the server to the client
+ const reverseStream = middleware.reverse.writable;
+ // Generator derived from handler
+ const id = await this.idGen();
+ const outputGen = async function* (): AsyncGenerator {
+ if (signal.aborted) throw signal.reason;
+ // Input generator derived from the forward stream
+ const inputGen = async function* (): AsyncIterable {
+ for await (const data of forwardStream) {
+ ctx.timer.refresh();
+ yield data.params as I;
+ }
+ };
+ const handlerG = handler(inputGen(), cancel, meta, {
+ signal,
+ timer: ctx.timer,
+ });
+ for await (const response of handlerG) {
+ ctx.timer.refresh();
+ const responseMessage: JSONRPCResponseResult = {
+ jsonrpc: '2.0',
+ result: response,
+ id,
+ };
+ yield responseMessage;
+ }
+ };
+ const outputGenerator = outputGen();
+ const reverseMiddlewareStream = new ReadableStream({
+ pull: async (controller) => {
+ try {
+ const { value, done } = await outputGenerator.next();
+ if (done) {
+ controller.close();
+ return;
+ }
+ controller.enqueue(value);
+ } catch (e) {
+ const rpcError: JSONRPCError = {
+ code: e.exitCode ?? JSONRPCErrorCode.InternalError,
+ message: e.description ?? '',
+ data: JSON.stringify(this.fromError(e), this.replacer),
+ };
+ const rpcErrorMessage: JSONRPCResponseError = {
+ jsonrpc: '2.0',
+ error: rpcError,
+ id,
+ };
+ controller.enqueue(rpcErrorMessage);
+ // Clean up the input stream here, ignore error if already ended
+ await forwardStream
+ .cancel(
+ new rpcErrors.ErrorRPCHandlerFailed('Error clean up', {
+ cause: e,
+ }),
+ )
+ .catch(() => {});
+ controller.close();
+ }
+ },
+ cancel: async (reason) => {
+ this.dispatchEvent(
+ new rpcEvents.RPCErrorEvent({
+ detail: new rpcErrors.ErrorRPCStreamEnded(
+ 'Stream has been cancelled',
+ {
+ cause: reason,
+ },
+ ),
+ }),
+ );
+ // Abort with the reason
+ abortController.abort(reason);
+ // If the output stream path fails then we need to end the generator
+ // early.
+ await outputGenerator.return(undefined);
+ },
+ });
+ // Ignore any errors here, it should propagate to the ends of the stream
+ void reverseMiddlewareStream.pipeTo(reverseStream).catch(() => {});
+ return [undefined, middleware.reverse.readable];
+ };
+ this.registerRawStreamHandler(method, rawSteamHandler, timeout);
+ }
+
+ protected registerUnaryHandler(
+ method: string,
+ handler: UnaryHandlerImplementation,
+ timeout: number | undefined,
+ ) {
+ const wrapperDuplex: DuplexHandlerImplementation = async function* (
+ input,
+ cancel,
+ meta,
+ ctx,
+ ) {
+ // The `input` is expected to be an async iterable with only 1 value.
+ // Unlike generators, there is no `next()` method.
+ // So we use `break` after the first iteration.
+ for await (const inputVal of input) {
+ yield await handler(inputVal, cancel, meta, ctx);
+ break;
+ }
+ };
+ this.registerDuplexStreamHandler(method, wrapperDuplex, timeout);
+ }
+
+ protected registerServerStreamHandler<
+ I extends JSONValue,
+ O extends JSONValue,
+ >(
+ method: string,
+ handler: ServerHandlerImplementation,
+ timeout: number | undefined,
+ ) {
+ const wrapperDuplex: DuplexHandlerImplementation = async function* (
+ input,
+ cancel,
+ meta,
+ ctx,
+ ) {
+ for await (const inputVal of input) {
+ yield* handler(inputVal, cancel, meta, ctx);
+ break;
+ }
+ };
+ this.registerDuplexStreamHandler(method, wrapperDuplex, timeout);
+ }
+
+ protected registerClientStreamHandler<
+ I extends JSONValue,
+ O extends JSONValue,
+ >(
+ method: string,
+ handler: ClientHandlerImplementation,
+ timeout: number | undefined,
+ ) {
+ const wrapperDuplex: DuplexHandlerImplementation = async function* (
+ input,
+ cancel,
+ meta,
+ ctx,
+ ) {
+ yield await handler(input, cancel, meta, ctx);
+ };
+ this.registerDuplexStreamHandler(method, wrapperDuplex, timeout);
+ }
+
+ /**
+ * ID is associated with the stream, not individual messages.
+ */
+ @ready(new rpcErrors.ErrorRPCHandlerFailed())
+ public handleStream(rpcStream: RPCStream) {
+ // This will take a buffer stream of json messages and set up service
+ // handling for it.
+ // Constructing the PromiseCancellable for tracking the active stream
+ const abortController = new AbortController();
+ // Setting up timeout timer logic
+ const timer = new Timer({
+ delay: this.handlerTimeoutTime,
+ handler: () => {
+ abortController.abort(new rpcErrors.ErrorRPCTimedOut());
+ if (this.onTimeoutCallback) {
+ this.onTimeoutCallback();
+ }
+ },
+ });
+
+ const prom = (async () => {
+ const id = await this.idGen();
+ const headTransformStream = rpcUtilsMiddleware.binaryToJsonMessageStream(
+ rpcUtils.parseJSONRPCRequest,
+ );
+ // Transparent transform used as a point to cancel the input stream from
+ const passthroughTransform = new TransformStream<
+ Uint8Array,
+ Uint8Array
+ >();
+ const inputStream = passthroughTransform.readable;
+ const inputStreamEndProm = rpcStream.readable
+ .pipeTo(passthroughTransform.writable)
+ // Ignore any errors here, we only care that it ended
+ .catch(() => {});
+ void inputStream
+ // Allow us to re-use the readable after reading the first message
+ .pipeTo(headTransformStream.writable, {
+ preventClose: true,
+ preventCancel: true,
+ })
+ // Ignore any errors here, we only care that it ended
+ .catch(() => {});
+ const cleanUp = async (reason: any) => {
+ await inputStream.cancel(reason);
+ await rpcStream.writable.abort(reason);
+ await inputStreamEndProm;
+ timer.cancel(cleanupReason);
+ await timer.catch(() => {});
+ };
+ // Read a single empty value to consume the first message
+ const reader = headTransformStream.readable.getReader();
+ // Allows timing out when waiting for the first message
+ let headerMessage:
+ | ReadableStreamDefaultReadResult
+ | undefined
+ | void;
+ try {
+ headerMessage = await Promise.race([
+ reader.read(),
+ timer.then(
+ () => undefined,
+ () => {},
+ ),
+ ]);
+ } catch (e) {
+ const newErr = new rpcErrors.ErrorRPCHandlerFailed(
+ 'Stream failed waiting for header',
+ { cause: e },
+ );
+ await inputStreamEndProm;
+ timer.cancel(cleanupReason);
+ await timer.catch(() => {});
+ this.dispatchEvent(
+ new rpcEvents.RPCErrorEvent({
+ detail: new rpcErrors.ErrorRPCOutputStreamError(
+ 'Stream failed waiting for header',
+ {
+ cause: newErr,
+ },
+ ),
+ }),
+ );
+ return;
+ }
+ // Downgrade back to the raw stream
+ await reader.cancel();
+ // There are 2 conditions where we just end here
+ // 1. The timeout timer resolves before the first message
+ // 2. the stream ends before the first message
+ if (headerMessage == null) {
+ const newErr = new rpcErrors.ErrorRPCTimedOut(
+ 'Timed out waiting for header',
+ { cause: new rpcErrors.ErrorRPCStreamEnded() },
+ );
+ await cleanUp(newErr);
+ this.dispatchEvent(
+ new rpcEvents.RPCErrorEvent({
+ detail: new rpcErrors.ErrorRPCTimedOut(
+ 'Timed out waiting for header',
+ {
+ cause: newErr,
+ },
+ ),
+ }),
+ );
+ return;
+ }
+ if (headerMessage.done) {
+ const newErr = new rpcErrors.ErrorMissingHeader('Missing header');
+ await cleanUp(newErr);
+ this.dispatchEvent(
+ new rpcEvents.RPCErrorEvent({
+ detail: new rpcErrors.ErrorRPCOutputStreamError('Missing header', {
+ cause: newErr,
+ }),
+ }),
+ );
+ return;
+ }
+ const method = headerMessage.value.method;
+ const handler = this.handlerMap.get(method);
+ if (handler == null) {
+ await cleanUp(new rpcErrors.ErrorRPCHandlerFailed('Missing handler'));
+ return;
+ }
+ if (abortController.signal.aborted) {
+ await cleanUp(
+ new rpcErrors.ErrorHandlerAborted('Aborted', {
+ cause: new ErrorHandlerAborted(),
+ }),
+ );
+ return;
+ }
+ // Setting up Timeout logic
+ const timeout = this.defaultTimeoutMap.get(method);
+ if (timeout != null && timeout < this.handlerTimeoutTime) {
+ // Reset timeout with new delay if it is less than the default
+ timer.reset(timeout);
+ } else {
+ // Otherwise refresh
+ timer.refresh();
+ }
+ this.logger.info(`Handling stream with method (${method})`);
+ let handlerResult: [JSONValue | undefined, ReadableStream];
+ const headerWriter = rpcStream.writable.getWriter();
+ try {
+ handlerResult = await handler(
+ [headerMessage.value, inputStream],
+ rpcStream.cancel,
+ rpcStream.meta,
+ { signal: abortController.signal, timer },
+ );
+ } catch (e) {
+ const rpcError: JSONRPCError = {
+ code: e.exitCode ?? JSONRPCErrorCode.InternalError,
+ message: e.description ?? '',
+ data: JSON.stringify(this.fromError(e), this.replacer),
+ };
+ const rpcErrorMessage: JSONRPCResponseError = {
+ jsonrpc: '2.0',
+ error: rpcError,
+ id,
+ };
+ await headerWriter.write(Buffer.from(JSON.stringify(rpcErrorMessage)));
+ await headerWriter.close();
+ // Clean up and return
+ timer.cancel(cleanupReason);
+ rpcStream.cancel(Error('TMP header message was an error'));
+ return;
+ }
+ const [leadingResult, outputStream] = handlerResult;
+
+ if (leadingResult !== undefined) {
+ // Writing leading metadata
+ const leadingMessage: JSONRPCResponseResult = {
+ jsonrpc: '2.0',
+ result: leadingResult,
+ id,
+ };
+ await headerWriter.write(Buffer.from(JSON.stringify(leadingMessage)));
+ }
+ headerWriter.releaseLock();
+ const outputStreamEndProm = outputStream
+ .pipeTo(rpcStream.writable)
+ .catch(() => {}); // Ignore any errors, we only care that it finished
+ await Promise.allSettled([inputStreamEndProm, outputStreamEndProm]);
+ this.logger.info(`Handled stream with method (${method})`);
+ // Cleaning up abort and timer
+ timer.cancel(cleanupReason);
+ abortController.abort(new rpcErrors.ErrorRPCStreamEnded());
+ })();
+ const handlerProm = PromiseCancellable.from(prom, abortController).finally(
+ () => this.activeStreams.delete(handlerProm),
+ abortController,
+ );
+ // Putting the PromiseCancellable into the active streams map
+ this.activeStreams.add(handlerProm);
+ }
+}
+
+export default RPCServer;
diff --git a/src/callers/Caller.ts b/src/callers/Caller.ts
new file mode 100644
index 0000000..ddc54a8
--- /dev/null
+++ b/src/callers/Caller.ts
@@ -0,0 +1,13 @@
+import type { HandlerType, JSONValue } from '../types';
+
+abstract class Caller<
+ Input extends JSONValue = JSONValue,
+ Output extends JSONValue = JSONValue,
+> {
+ protected _inputType: Input;
+ protected _outputType: Output;
+ // Need this to distinguish the classes when inferring types
+ abstract type: HandlerType;
+}
+
+export default Caller;
diff --git a/src/callers/ClientCaller.ts b/src/callers/ClientCaller.ts
new file mode 100644
index 0000000..7fb44da
--- /dev/null
+++ b/src/callers/ClientCaller.ts
@@ -0,0 +1,11 @@
+import type { JSONValue } from '../types';
+import Caller from './Caller';
+
+class ClientCaller<
+ Input extends JSONValue = JSONValue,
+ Output extends JSONValue = JSONValue,
+> extends Caller {
+ public type: 'CLIENT' = 'CLIENT' as const;
+}
+
+export default ClientCaller;
diff --git a/src/callers/DuplexCaller.ts b/src/callers/DuplexCaller.ts
new file mode 100644
index 0000000..4c079b3
--- /dev/null
+++ b/src/callers/DuplexCaller.ts
@@ -0,0 +1,11 @@
+import type { JSONValue } from '../types';
+import Caller from './Caller';
+
+class DuplexCaller<
+ Input extends JSONValue = JSONValue,
+ Output extends JSONValue = JSONValue,
+> extends Caller {
+ public type: 'DUPLEX' = 'DUPLEX' as const;
+}
+
+export default DuplexCaller;
diff --git a/src/callers/RawCaller.ts b/src/callers/RawCaller.ts
new file mode 100644
index 0000000..a4721cf
--- /dev/null
+++ b/src/callers/RawCaller.ts
@@ -0,0 +1,7 @@
+import type { JSONValue } from '../types';
+import Caller from './Caller';
+class RawCaller extends Caller {
+ public type: 'RAW' = 'RAW' as const;
+}
+
+export default RawCaller;
diff --git a/src/callers/ServerCaller.ts b/src/callers/ServerCaller.ts
new file mode 100644
index 0000000..11a9fe9
--- /dev/null
+++ b/src/callers/ServerCaller.ts
@@ -0,0 +1,11 @@
+import type { JSONValue } from '../types';
+import Caller from './Caller';
+
+class ServerCaller<
+ Input extends JSONValue = JSONValue,
+ Output extends JSONValue = JSONValue,
+> extends Caller {
+ public type: 'SERVER' = 'SERVER' as const;
+}
+
+export default ServerCaller;
diff --git a/src/callers/UnaryCaller.ts b/src/callers/UnaryCaller.ts
new file mode 100644
index 0000000..c446073
--- /dev/null
+++ b/src/callers/UnaryCaller.ts
@@ -0,0 +1,11 @@
+import type { JSONValue } from '../types';
+import Caller from './Caller';
+
+class UnaryCaller<
+ Input extends JSONValue = JSONValue,
+ Output extends JSONValue = JSONValue,
+> extends Caller {
+ public type: 'UNARY' = 'UNARY' as const;
+}
+
+export default UnaryCaller;
diff --git a/src/callers/index.ts b/src/callers/index.ts
new file mode 100644
index 0000000..17e8c87
--- /dev/null
+++ b/src/callers/index.ts
@@ -0,0 +1,6 @@
+export { default as Caller } from './Caller';
+export { default as ClientCaller } from './ClientCaller';
+export { default as DuplexCaller } from './DuplexCaller';
+export { default as RawCaller } from './RawCaller';
+export { default as ServerCaller } from './ServerCaller';
+export { default as UnaryCaller } from './UnaryCaller';
diff --git a/src/errors/errors.ts b/src/errors/errors.ts
new file mode 100644
index 0000000..2acc942
--- /dev/null
+++ b/src/errors/errors.ts
@@ -0,0 +1,266 @@
+import type { Class } from '@matrixai/errors';
+import type { JSONValue } from '@/types';
+import { AbstractError } from '@matrixai/errors';
+
+const enum JSONRPCErrorCode {
+ ParseError = -32700,
+ InvalidRequest = -32600,
+ MethodNotFound = -32601,
+ InvalidParams = -32602,
+ InternalError = -32603,
+ HandlerNotFound = -32000,
+ RPCStopping = -32001,
+ RPCDestroyed = -32002,
+ RPCMessageLength = -32003,
+ RPCMissingResponse = -32004,
+ RPCOutputStreamError = -32005,
+ RPCRemote = -32006,
+ RPCStreamEnded = -32007,
+ RPCTimedOut = -32008,
+ RPCConnectionLocal = -32010,
+ RPCConnectionPeer = -32011,
+ RPCConnectionKeepAliveTimeOut = -32012,
+ RPCConnectionInternal = -32013,
+ MissingHeader = -32014,
+ HandlerAborted = -32015,
+ MissingCaller = -32016,
+}
+interface RPCError extends Error {
+ code?: number;
+}
+class ErrorRPC extends AbstractError implements RPCError {
+ private _description: string = 'Generic Error';
+ constructor(message?: string) {
+ super(message);
+ }
+ code?: number;
+
+ get description(): string {
+ return this._description;
+ }
+
+ set description(value: string) {
+ this._description = value;
+ }
+}
+
+class ErrorRPCDestroyed extends ErrorRPC {
+ constructor(message?: string) {
+ super(message); // Call the parent constructor
+ this.description = 'Rpc is destroyed'; // Set the specific description
+ this.code = JSONRPCErrorCode.MethodNotFound;
+ }
+}
+
+class ErrorRPCParse extends ErrorRPC {
+ static description = 'Failed to parse Buffer stream';
+
+ constructor(message?: string, options?: { cause: Error }) {
+ super(message); // Call the parent constructor
+ this.description = 'Failed to parse Buffer stream'; // Set the specific description
+ this.code = JSONRPCErrorCode.ParseError;
+ }
+}
+
+class ErrorRPCStopping extends ErrorRPC {
+ constructor(message?: string) {
+ super(message); // Call the parent constructor
+ this.description = 'Rpc is stopping'; // Set the specific description
+ this.code = JSONRPCErrorCode.RPCStopping;
+ }
+}
+
+/**
+ * This is an internal error, it should not reach the top level.
+ */
+class ErrorRPCHandlerFailed extends ErrorRPC {
+ constructor(message?: string, options?: { cause: Error }) {
+ super(message); // Call the parent constructor
+ this.description = 'Failed to handle stream'; // Set the specific description
+ this.code = JSONRPCErrorCode.HandlerNotFound;
+ }
+}
+class ErrorRPCCallerFailed extends ErrorRPC {
+ constructor(message?: string, options?: { cause: Error }) {
+ super(message); // Call the parent constructor
+ this.description = 'Failed to call stream'; // Set the specific description
+ this.code = JSONRPCErrorCode.MissingCaller;
+ }
+}
+class ErrorMissingCaller extends ErrorRPC {
+ constructor(message?: string, options?: { cause: Error }) {
+ super(message); // Call the parent constructor
+ this.description = 'Header information is missing'; // Set the specific description
+ this.code = JSONRPCErrorCode.MissingCaller;
+ }
+}
+class ErrorMissingHeader extends ErrorRPC {
+ constructor(message?: string, options?: { cause: Error }) {
+ super(message); // Call the parent constructor
+ this.description = 'Header information is missing'; // Set the specific description
+ this.code = JSONRPCErrorCode.MissingHeader;
+ }
+}
+
+class ErrorHandlerAborted extends ErrorRPC {
+ constructor(message?: string, options?: { cause: Error }) {
+ super(message); // Call the parent constructor
+ this.description = 'Handler Aborted Stream.'; // Set the specific description
+ this.code = JSONRPCErrorCode.HandlerAborted;
+ }
+}
+class ErrorRPCMessageLength extends ErrorRPC {
+ static description = 'RPC Message exceeds maximum size';
+ code? = JSONRPCErrorCode.RPCMessageLength;
+}
+
+class ErrorRPCMissingResponse extends ErrorRPC {
+ constructor(message?: string) {
+ super(message);
+ this.description = 'Stream ended before response';
+ this.code = JSONRPCErrorCode.RPCMissingResponse;
+ }
+}
+
+interface ErrorRPCOutputStreamErrorOptions {
+ cause?: Error;
+}
+class ErrorRPCOutputStreamError extends ErrorRPC {
+ constructor(message: string, options: ErrorRPCOutputStreamErrorOptions) {
+ super(message);
+ this.description = 'Output stream failed, unable to send data';
+ this.code = JSONRPCErrorCode.RPCOutputStreamError;
+ }
+}
+
+class ErrorRPCRemote extends ErrorRPC {
+ static description = 'Remote error from RPC call';
+ static message: string = 'The server responded with an error';
+ metadata: JSONValue | undefined;
+
+ constructor(metadata?: JSONValue, message?: string, options?) {
+ super(message);
+ this.metadata = metadata;
+ this.code = JSONRPCErrorCode.RPCRemote;
+ this.data = options?.data;
+ }
+
+ public static fromJSON>(
+ this: T,
+ json: any,
+ ): InstanceType {
+ if (
+ typeof json !== 'object' ||
+ json.type !== this.name ||
+ typeof json.data !== 'object' ||
+ typeof json.data.message !== 'string' ||
+ isNaN(Date.parse(json.data.timestamp)) ||
+ typeof json.data.metadata !== 'object' ||
+ typeof json.data.data !== 'object' ||
+ ('stack' in json.data && typeof json.data.stack !== 'string')
+ ) {
+ throw new TypeError(`Cannot decode JSON to ${this.name}`);
+ }
+
+ // Here, you can define your own metadata object, or just use the one from JSON directly.
+ const parsedMetadata = json.data.metadata;
+
+ const e = new this(parsedMetadata, json.data.message, {
+ timestamp: new Date(json.data.timestamp),
+ data: json.data.data,
+ cause: json.data.cause,
+ });
+ e.stack = json.data.stack;
+ return e;
+ }
+ public toJSON(): any {
+ return {
+ type: this.name,
+ data: {
+ description: this.description,
+ },
+ };
+ }
+}
+
+class ErrorRPCStreamEnded extends ErrorRPC {
+ constructor(message?: string, options?: { cause: Error }) {
+ super(message);
+ this.description = 'Handled stream has ended';
+ this.code = JSONRPCErrorCode.RPCStreamEnded;
+ }
+}
+
+class ErrorRPCTimedOut extends ErrorRPC {
+ constructor(message?: string, options?: { cause: Error }) {
+ super(message);
+ this.description = 'RPC handler has timed out';
+ this.code = JSONRPCErrorCode.RPCTimedOut;
+ }
+}
+
+class ErrorUtilsUndefinedBehaviour extends ErrorRPC {
+ constructor(message?: string) {
+ super(message);
+ this.description = 'You should never see this error';
+ this.code = JSONRPCErrorCode.MethodNotFound;
+ }
+}
+export function never(): never {
+ throw new ErrorRPC('This function should never be called');
+}
+
+class ErrorRPCMethodNotImplemented extends ErrorRPC {
+ constructor(message?: string) {
+ super(message || 'This method must be overridden'); // Default message if none provided
+ this.name = 'ErrorRPCMethodNotImplemented';
+ this.description =
+ 'This abstract method must be implemented in a derived class';
+ this.code = JSONRPCErrorCode.MethodNotFound;
+ }
+}
+
+class ErrorRPCConnectionLocal extends ErrorRPC {
+ static description = 'RPC Connection local error';
+ code? = JSONRPCErrorCode.RPCConnectionLocal;
+}
+
+class ErrorRPCConnectionPeer extends ErrorRPC {
+ static description = 'RPC Connection peer error';
+ code? = JSONRPCErrorCode.RPCConnectionPeer;
+}
+
+class ErrorRPCConnectionKeepAliveTimeOut extends ErrorRPC {
+ static description = 'RPC Connection keep alive timeout';
+ code? = JSONRPCErrorCode.RPCConnectionKeepAliveTimeOut;
+}
+
+class ErrorRPCConnectionInternal extends ErrorRPC {
+ static description = 'RPC Connection internal error';
+ code? = JSONRPCErrorCode.RPCConnectionInternal;
+}
+
+export {
+ ErrorRPC,
+ ErrorRPCDestroyed,
+ ErrorRPCStopping,
+ ErrorRPCParse,
+ ErrorRPCHandlerFailed,
+ ErrorRPCMessageLength,
+ ErrorRPCMissingResponse,
+ ErrorRPCOutputStreamError,
+ ErrorRPCRemote,
+ ErrorRPCStreamEnded,
+ ErrorRPCTimedOut,
+ ErrorUtilsUndefinedBehaviour,
+ ErrorRPCMethodNotImplemented,
+ ErrorRPCConnectionLocal,
+ ErrorRPCConnectionPeer,
+ ErrorRPCConnectionKeepAliveTimeOut,
+ ErrorRPCConnectionInternal,
+ ErrorMissingHeader,
+ ErrorHandlerAborted,
+ ErrorRPCCallerFailed,
+ ErrorMissingCaller,
+ JSONRPCErrorCode,
+};
diff --git a/src/errors/index.ts b/src/errors/index.ts
new file mode 100644
index 0000000..f72bc43
--- /dev/null
+++ b/src/errors/index.ts
@@ -0,0 +1 @@
+export * from './errors';
diff --git a/src/events.ts b/src/events.ts
new file mode 100644
index 0000000..828cca4
--- /dev/null
+++ b/src/events.ts
@@ -0,0 +1,85 @@
+import type RPCServer from './RPCServer';
+import type RPCClient from './RPCClient';
+import type {
+ ErrorRPCConnectionLocal,
+ ErrorRPCConnectionPeer,
+ ErrorRPCConnectionKeepAliveTimeOut,
+ ErrorRPCConnectionInternal,
+} from './errors';
+import { AbstractEvent } from '@matrixai/events';
+import * as rpcErrors from './errors';
+
+abstract class EventRPC extends AbstractEvent {}
+
+abstract class EventRPCClient extends AbstractEvent {}
+
+abstract class EventRPCServer extends AbstractEvent {}
+
+abstract class EventRPCConnection extends AbstractEvent {}
+
+// Client events
+class EventRPCClientDestroy extends EventRPCClient {}
+
+class EventRPCClientDestroyed extends EventRPCClient {}
+
+class EventRPCClientCreate extends EventRPCClient {}
+
+class EventRPCClientCreated extends EventRPCClient {}
+
+class EventRPCClientError extends EventRPCClient {}
+
+class EventRPCClientConnect extends EventRPCClient {}
+
+// Server events
+
+class EventRPCServerConnection extends EventRPCServer {}
+
+class EventRPCServerCreate extends EventRPCServer {}
+
+class EventRPCServerCreated extends EventRPCServer {}
+
+class EventRPCServerDestroy extends EventRPCServer {}
+
+class EventRPCServerDestroyed extends EventRPCServer {}
+
+class EventRPCServerError extends EventRPCServer {}
+
+class EventRPCConnectionError extends EventRPCConnection<
+ | ErrorRPCConnectionLocal
+ | ErrorRPCConnectionPeer
+ | ErrorRPCConnectionKeepAliveTimeOut
+ | ErrorRPCConnectionInternal
+> {}
+
+class RPCErrorEvent extends Event {
+ public detail: Error;
+ constructor(
+ options: EventInit & {
+ detail: Error;
+ },
+ ) {
+ super('error', options);
+ this.detail = options.detail;
+ }
+}
+
+export {
+ RPCErrorEvent,
+ EventRPC,
+ EventRPCClient,
+ EventRPCServer,
+ EventRPCConnection,
+ EventRPCClientDestroy,
+ EventRPCClientDestroyed,
+ EventRPCClientCreate,
+ EventRPCClientCreated,
+ EventRPCClientError,
+ EventRPCClientConnect,
+ EventRPCServerConnection,
+ EventRPCServerCreate,
+ EventRPCServerCreated,
+ EventRPCServerDestroy,
+ EventRPCServerDestroyed,
+ EventRPCServerError,
+ EventRPCConnectionError,
+};
diff --git a/src/handlers/ClientHandler.ts b/src/handlers/ClientHandler.ts
new file mode 100644
index 0000000..0aea354
--- /dev/null
+++ b/src/handlers/ClientHandler.ts
@@ -0,0 +1,21 @@
+import type { ContainerType, JSONValue } from '../types';
+import type { ContextTimed } from '@matrixai/contexts';
+import Handler from './Handler';
+import { ErrorRPCMethodNotImplemented } from '../errors';
+
+abstract class ClientHandler<
+ Container extends ContainerType = ContainerType,
+ Input extends JSONValue = JSONValue,
+ Output extends JSONValue = JSONValue,
+> extends Handler {
+ public handle = async (
+ input: AsyncIterableIterator,
+ cancel: (reason?: any) => void,
+ meta: Record | undefined,
+ ctx: ContextTimed,
+ ): Promise