From bcf683f403bf7e35308bdbcf2a8627c87136d628 Mon Sep 17 00:00:00 2001 From: Jose Ignacio Acin Pozo Date: Fri, 29 Nov 2024 20:00:56 +0000 Subject: [PATCH 1/2] add streamer apis intro docs and streamer docs, streamer client docs are still pending --- .../index.mdx | 5 +- .../01_streamer/_category_.yml | 3 + .../02_streamer-apis/01_streamer/index.mdx | 308 ++++++++++++++++++ .../02_streamer-client/_category_.yml | 3 + .../02_streamer-client/index.mdx | 35 ++ .../02_streamer-apis/_category_.yml | 2 +- .../02_streamer-apis/index.mdx | 48 ++- 7 files changed, 397 insertions(+), 7 deletions(-) create mode 100644 docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/01_streamer/_category_.yml create mode 100644 docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/01_streamer/index.mdx create mode 100644 docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/02_streamer-client/_category_.yml create mode 100644 docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/02_streamer-client/index.mdx diff --git a/docs/001_develop/02_server-capabilities/009_real-time-aggregation-consolidator/index.mdx b/docs/001_develop/02_server-capabilities/009_real-time-aggregation-consolidator/index.mdx index 97183e97ef..13fdd5f5ec 100644 --- a/docs/001_develop/02_server-capabilities/009_real-time-aggregation-consolidator/index.mdx +++ b/docs/001_develop/02_server-capabilities/009_real-time-aggregation-consolidator/index.mdx @@ -1183,10 +1183,7 @@ It is important to note that at the beginning of a cold start, all fields affect -## Testing - - -### Integration testing (legacy) +## Testing (legacy) :::info For the latest information on testing, go to our page on [Integration testing](/operations/testing/integration-testing/). diff --git a/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/01_streamer/_category_.yml b/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/01_streamer/_category_.yml new file mode 100644 index 0000000000..fa84163e90 --- /dev/null +++ b/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/01_streamer/_category_.yml @@ -0,0 +1,3 @@ +label: 'Streamer' +className: 'menu__nested-category' +collapsed: false diff --git a/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/01_streamer/index.mdx b/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/01_streamer/index.mdx new file mode 100644 index 0000000000..430e9a18d6 --- /dev/null +++ b/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/01_streamer/index.mdx @@ -0,0 +1,308 @@ +--- +title: 'Streamer' +sidebar_label: 'Streamer' +id: server-integrations-streamer +keywords: [integration, integrations, streamer, ingress, egress, consume, produce] +tags: +- integration +- integrations +- streamer +- ingress +- egress +- consume +- produce +sidebar_position: 1 +--- + +import Log from '/snippet/_LOG.md' +import CommonProcesses from '/snippet/_common-see-processes.md' +import CommonFieldOverrides from '/snippet/_common-field-overrides.md' + +## Overview + +**Streamers** provide reliable data streams to [streamer clients](/develop/server-capabilities/integrations/streamer-apis/streamer-client) based on a database table or view. These data streams are reliable as they are driven by an underlying unique, sequenced and monotonically increasing field found in the table or view itself. 

The core streamer behaviour is the following:
- Streamer receives a **logon** message from a **streamer client** with or without a specific sequence, for a specific streamer definition.
- The streamer will then act differently depending on whether it received a sequence or not.
  - If no sequence has been received, the streamer will mark the streamer client connection as up to date.
  - If a sequence has been received, the streamer will use it to read a batch of records (see [`batchSize`](#batchsize)) from the database using the defined table index. These records will be sent to the streamer client with an additional flag indicating whether there is more data available in the table or not.
    - If more data is available, the streamer client will request the next batch of data.
    - If no more data is available, the streamer marks the streamer client connection as up to date.
  - Once the connection is marked as up to date, the streamer starts streaming real-time updates for any new **inserted** records belonging to the target table/view. **Important:** only insert updates are eligible for streamers, as we assume the underlying table represents a sequence of messages or record changes that work in an "append-only" fashion.
  - The data can be transformed and filtered before being passed on to the **streamer client**.

:::warning
Streamers can only work as intended by using a numeric field in the underlying table. This field must represent a numeric sequence (perhaps defined as an auto-increment field) and be both unique and monotonically increasing. This field must also be indexed.
:::

Additionally, streamers may have `xlator` plugins that enhance the available streamer configuration with new configuration blocks and data transformations. See the FIX xlator documentation [here](/develop/business-components/fix/fix-xlator/) for more information.


## Example configuration

A sample streamer configuration can be seen below:

```kotlin
streams {
    stream("FIX_IN_X", FIX_IN.BY_RX_SEQUENCE) {
        batchSize = 500

        fields {
            FIX_IN.FIX_DATA
        }

        where { fixIn ->
            fixIn.connectionName == "X"
        }
    }
}
```

And the FIX_IN table is defined as seen below:

```kotlin
  table(name = "FIX_IN", id = 7003) {
    field(name = "CONNECTION_NAME", type = STRING)
    field(name = "RX_SEQUENCE", type = LONG)
    field(name = "SENDER_ID", type = STRING)
    field(name = "TARGET_ID", type = STRING)
    field(name = "CL_ORD_ID", type = STRING)
    field(name = "ORIG_CL_ORD_ID", type = STRING)
    field(name = "INTERNAL_TARGET", type = STRING)
    field(name = "SIZE", type = INT)
    field(name = "FIX_DATA", type = STRING)
    primaryKey("RX_SEQUENCE")
    indices {
      unique("CONNECTION_NAME", "RX_SEQUENCE")
    }
  }
```

In this case we are defining a stream named "FIX_IN_X", using the FIX_IN table and the BY_RX_SEQUENCE index. The BY_RX_SEQUENCE index contains a single field named RX_SEQUENCE.

The `batchSize` configuration has been set to 500, so the streamer will only read 500 records at a time when replaying records.

The `fields` configuration has been defined to only extract the FIX_DATA field from the FIX_IN table, so no other fields will be provided to any **streamer clients** connecting to this streamer.

The `where` configuration filters all data to ensure we only send records with CONNECTION_NAME equal to "X".
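
To illustrate the warning above, any process feeding this stream must only ever insert rows, with RX_SEQUENCE strictly increasing. The snippet below is a minimal, hypothetical producer-side sketch; the `entityDb` call follows the standard Genesis database API, but the `FixIn` builder usage and the sequence-tracking helper are assumptions for the example, not part of the streamer API:

```kotlin
// Hypothetical producer-side sketch: append-only writes with a monotonic sequence.
var lastRxSequence = 0L // in practice, this would be recovered from the table on startup

suspend fun appendFixMessage(connection: String, rawFix: String) {
    entityDb.insert(
        FixIn {
            connectionName = connection
            rxSequence = ++lastRxSequence // unique and monotonically increasing
            fixData = rawFix
        }
    )
    // No modify or delete operations are ever issued: streamers only consume insert updates.
}
```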

The sample workflow diagram below shows the interactions between a hypothetical streamer client and the streamer defined in our example:

```mermaid
sequenceDiagram
Streamer client->>+Streamer: Requests data for Stream "FIX_IN" with sequence 5000
Streamer->>+Database: Requests 500 rows from FIX_IN with RX_SEQUENCE > 5000 ordered by RX_SEQUENCE
Database->>+Streamer: Provides 500 records, last record has RX_SEQUENCE 5500.
Streamer->>+Streamer client: Sends 500 records, MORE_DATA_AVAILABLE=true
Streamer client->>+Streamer: Requests next batch
Streamer->>+Database: Requests 500 rows from FIX_IN with RX_SEQUENCE > 5500 ordered by RX_SEQUENCE
Database->>+Streamer: Provides 253 records, last record has RX_SEQUENCE 5753 (reached end of table).
Streamer->>+Streamer client: Sends 253 records, MORE_DATA_AVAILABLE=false
Update queue->>+Streamer: New FIX_IN table record
Streamer->>+Streamer client: Send record
```

## Configuration options

### `stream`
You can use the `stream` configuration to declare a reliable stream.

The simplest Streamer definition is:
```kotlin
streams {
    stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP)
}
```

This example creates a stream called `ORDERS_OUT`, based on the `ORDER_OUT` table (or view). The data will be streamed, ordered by timestamp.

`stream` contains additional configuration items explained below.

#### `batchSize`

`batchSize` determines how many records will be read from the database in each query when the streamer is replaying data after a successful streamer client connection.

The default value is 100.

Example usage below:
```kotlin
streams {
    stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP){
        batchSize = 500
    }
}
```

#### `logoffTimeout`

`logoffTimeout` determines how often the streamer will review its current streamer client connections to check if they are still alive. If a connection is not alive anymore, it will be closed. The value is measured in seconds.

The default value is 5000.

Example usage below:
```kotlin
streams {
    stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP){
        logoffTimeout = 60
    }
}
```

#### `maxLogons`

`maxLogons` determines how many streamer client connections can be established to this particular streamer. This ensures no more connections than absolutely necessary are allowed.

The default value is 1.

Example usage below:
```kotlin
streams {
    stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP){
        maxLogons = 2
    }
}
```

#### `terminateOnError`

`terminateOnError` determines the behaviour of the streamer when an exception is thrown. For example, in the sample code below we apply a transformation to the outgoing `GenesisSet` message that will throw an exception:

```kotlin
streams {
    stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP) {
        toGenesisSet { ordersOut ->
            throw RuntimeException()
        }
    }
}
```

By default `terminateOnError` is set to true, which means the streamer process will crash as soon as the first record hits the `toGenesisSet` transformation. If set to `false`, the exception will be logged, but the streamer will continue processing rows.


#### `where`

The `where` tag enables the stream to be filtered. It is available in two versions: one that has the streamed row as a parameter, and one that also has the logon message.

Here, we only stream orders with a quantity greater than 1,000.

```kotlin
streams {
    stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP) {
        where { ordersOut ->
            ordersOut.quantity > 1_000
        }
    }
}
```

In this example, we only stream orders with a quantity greater than 1,000 and where the logon message has provided a secret key.
```kotlin
streams {
    stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP) {
        where { ordersOut, logonMessage ->
            ordersOut.quantity > 1_000 && logonMessage.getString("KEY") == "SECRET"
        }
    }
}
```

#### `fields`

The `fields` tag enables you to transform the output in a similar way to views, data server and req rep definitions. For example, here we output three fields:
```kotlin
streams {
    stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP) {
        fields {
            ORDERS_OUT.CLIENT_ID
            ORDERS_OUT.QUANTITY withPrefix "ORDER"
            ORDERS_OUT.CLIENT_ID withAlias "CLIENT"
        }
    }
}
```

<CommonFieldOverrides />

#### `toGenesisSet`

The `toGenesisSet` tag enables you to create a custom [`GenesisSet`](/develop/server-capabilities/communications-meta/#genesisset) from the table/view entity before it is automatically converted and sent to the streamer client.

Example below:
```kotlin
streams {
    stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP) {
        toGenesisSet { ordersOut ->
            genesisSet {
                "ORDER_QUANTITY" with ordersOut.quantity
                "ORDER" with ordersOut.orderId
            }
        }
    }
}
```

#### `toEntity`

The `toEntity` tag allows you to create a custom [`DbEntity`](/develop/server-capabilities/data-access-apis/#database-entities) from the table/view entity before it is automatically converted and sent to the streamer client as a `GenesisSet`.

Example below:
```kotlin
streams {
    stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP) {
        toEntity { ordersOut ->
            Order {
                quantity = ordersOut.quantity
                orderId = ordersOut.orderId
            }
        }
    }
}
```

#### Custom Log messages

<Log />

## Metrics

:::info
Ensure you have [enabled metrics](/build-deploy-operate/operate/metrics/#enabling-metrics) in your environment to view them.
:::

The metrics for the Streamer measure how long it takes to replay a single message batch when working in replay mode:


| Metric | Explanation |
|:--------------------------|:------------------------------------------|
| replay_processing_latency | The latency for processing a replay batch |


## Runtime configuration

To include your `*-streamer.kts` file definitions in a runtime process, check the following requirements in the process definition:

1. Ensure `genesis-pal-streamer` is included in `module`
2. Ensure `global.genesis.streamer.pal` is included in `package`
3. Ensure your streamer.kts file(s) are defined in `script`
4. Ensure `pal` is set in `language`
If you wish to run a dedicated process for a streamer, the following gives an example full process definition:

```xml{13}
<process name="POSITION_STREAMER">
    <!-- Element tags reconstructed from the original example; the script filename is illustrative -->
    <groupId>POSITION</groupId>
    <description>Streams trades to external FIX gateway</description>
    <start>true</start>
    <options>-Xmx256m -DRedirectStreamsToLog=true -DXSD_VALIDATE=false -XX:MaxHeapFreeRatio=70 -XX:MinHeapFreeRatio=30 -XX:+UseG1GC -XX:+UseStringDeduplication -XX:OnOutOfMemoryError="handleOutOfMemoryError.sh %p"</options>
    <module>genesis-pal-streamer</module>
    <package>global.genesis.streamer.pal</package>
    <primaryOnly>true</primaryOnly>
    <script>position-streamer.kts</script>
    <loggingLevel>INFO,DATADUMP_ON</loggingLevel>
    <language>pal</language>
</process>
```

<CommonProcesses />
\ No newline at end of file
diff --git a/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/02_streamer-client/_category_.yml b/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/02_streamer-client/_category_.yml
new file mode 100644
index 0000000000..4557136675
--- /dev/null
+++ b/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/02_streamer-client/_category_.yml
@@ -0,0 +1,3 @@
+label: 'Streamer Client'
+className: 'menu__nested-category'
+collapsed: false
diff --git a/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/02_streamer-client/index.mdx b/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/02_streamer-client/index.mdx
new file mode 100644
index 0000000000..a4344fc25a
--- /dev/null
+++ b/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/02_streamer-client/index.mdx
@@ -0,0 +1,35 @@
+---
+title: 'Streamer Client'
+sidebar_label: 'Streamer Client'
+id: server-integrations-streamer-client
+keywords: [integration, integrations, streamer, ingress, egress, consume, produce]
+tags:
+- integration
+- integrations
+- streamerclient
+- ingress
+- egress
+- consume
+- produce
+sidebar_position: 2
+---
+
+import Log from '/snippet/_LOG.md'
+import CommonProcesses from '/snippet/_common-see-processes.md'
+
+## Overview
+
+
+## Example configuration
+
+
+## Configuration options
+
+
+## Metrics
+
+
+## Runtime configuration
+
+
+## Testing
\ No newline at end of file
diff --git a/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/_category_.yml b/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/_category_.yml
index 5f68f5452b..a4ba13a08b 100644
--- a/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/_category_.yml
+++ b/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/_category_.yml
@@ -1,3 +1,3 @@
 label: 'Streamer APIs'
 className: 'menu__nested-category'
-collapsed: true
+collapsed: false
diff --git a/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/index.mdx b/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/index.mdx
index a9f6d3db60..b448625b39 100644
--- a/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/index.mdx
+++ b/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/index.mdx
@@ -2,16 +2,60 @@
 title: 'Streamer APIs'
 sidebar_label: 'Streamer APIs'
 id: server-integrations-streamer-apis
-keywords: [integration, integrations, streamer, ingress, egress, consume, produce]
+keywords: [integration, integrations, streamer, ingress, egress, consume, produce, gateway]
 tags:
 - integration
 - integrations
 - streamer
+- streamerclient
 - ingress
 - egress
 - consume
 - produce
+- gateway
 sidebar_position: 2
 ---
 
-Coming soon...
+If your application needs to integrate with external systems through any sort of gateway (e.g. 
[FIX](/develop/business-components/fix/fix-gateway/)), you need to be able to interpret incoming messages in the format of the external system. Equally, you need to be able to reformat information from your Genesis application when you send messages out to that system.

For this, Genesis uses Streamers, Streamer Clients and gateways.

* A **Streamer** listens to a table or view, and streams data out to Streamer Clients. In many cases, you listen to a reliable, auditable table with unique time-stamped updates - an audit table. Alternatively, you can listen to a table that represents a unique and sorted sequence of messages and behaves in an **insert-only** fashion. This ensures that, in the event of failure, a Streamer is able to go back and stream from a specific timestamp.
Follow this link to learn more about [streamers](/develop/server-capabilities/integrations/streamer-apis/streamer).
* A **Streamer Client** links a stream of messages provided by a **Streamer** with an [event handler](/develop/server-capabilities/core-business-logic-event-handler/). On startup, it will request data from **streamer** processes depending on its definition. Once it receives the data from the **streamer**, it will apply any defined transformations and pass the result to a target event handler.
Streamer clients are usually configured for replayability. In that case, if the **streamer client** is killed or crashes for any reason, it will request data from the **streamer** again on the next restart, but this time using the last valid processed timestamp/sequence to avoid reprocessing previously successful messages.
Follow this link to learn more about [streamer clients](/develop/server-capabilities/integrations/streamer-apis/streamer-client).
* A **Gateway** is a message routing service that connects to an external service, and exposes [event handler](/develop/server-capabilities/core-business-logic-event-handler/) endpoints. For example, a [FIX gateway](/develop/business-components/fix/fix-gateway/) uses the FIX protocol to connect to exchanges or other trading hubs.

As Streamer and Streamer Client processes can be run separately, it is possible to serve multiple clients with a single **Streamer** to perform different activities.

For example, you could have two separate Streamer Clients (A and B) listening to Streamer X:

* Streamer Client A sends data to the FIX gateway.
* Streamer Client B sends the data to a separate external application.

A sample inbound flow from a FIX Gateway could look like this:

```mermaid
flowchart LR
A[FIX Gateway] -->|writes to| B(Table)
C --> |reads from / listens to| B(Table)
C --> |streams data to| D(Streamer client)
D --> |connects to| C(Streamer)
D --> |sends data to| E(Event handler)
E --> |sends data acknowledgment to| D(Streamer client)
```

A sample outbound flow to a FIX Gateway would look like this instead:

```mermaid
flowchart LR
A[Genesis Process] -->|writes to| B(Table)
C --> |reads from / listens to| B(Table)
C --> |streams data to| D(Streamer client)
D --> |connects to| C(Streamer)
D --> |sends data to| E(FIX Gateway)
E --> |sends data acknowledgment to| D(Streamer client)
```

As you can see, both inbound and outbound flows are in fact the same, because the FIX gateway is both a process that writes to a table, and an event handler in itself.
\ No newline at end of file
From f698c149b695355ceb4061730afcaf80c6d34c94 Mon Sep 17 00:00:00 2001
From: Jose Ignacio Acin Pozo
Date: Sun, 1 Dec 2024 22:37:09 +0000
Subject: [PATCH 2/2] add streamer client docs, fix broken links in fix-xlator, rework some bits in streamer docs

---
 .../02_streamer-apis/01_streamer/index.mdx    |  76 +--
 .../02_streamer-client/index.mdx              | 508 +++++++++++++++++-
 .../08_fix/fix-xlator.mdx                     |  29 +-
 3 files changed, 558 insertions(+), 55 deletions(-)

diff --git a/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/01_streamer/index.mdx b/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/01_streamer/index.mdx
index 430e9a18d6..5f41b75482 100644
--- a/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/01_streamer/index.mdx
+++ b/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/01_streamer/index.mdx
@@ -23,11 +23,11 @@ import CommonFieldOverrides from '/snippet/_common-field-overrides.md'

## Overview

**Streamers** provide reliable data streams to [streamer clients](/develop/server-capabilities/integrations/streamer-apis/streamer-client) based on a database table or view. These data streams are reliable as they are driven by an underlying unique, sequenced and monotonically increasing field found in the table or view itself.

The core streamer behaviour is the following:
- Streamer receives a **logon** message from a **streamer client** with or without a specific sequence number, for a specific streamer definition.
- The streamer will act differently depending on whether it received a sequence number or not.
  - If no sequence number has been received, the streamer will mark the streamer client connection as up to date.
  - If a sequence number has been received, the streamer will use it to read a batch of records (see [`batchSize`](#batchsize)) from the database using the defined table index. These records will be sent to the streamer client with an additional flag indicating whether there is more data available in the table or not.
    - If more data is available, the streamer client will request the next batch of data once the message is processed. This repeats the steps explained in the previous point, although no sequence number is needed, as the streamer knows how to read the next batch of records based on the previous batch.
    - If no more data is available, the streamer marks the streamer client connection as up to date.
  - Once the connection is marked as up to date, the streamer starts streaming real-time updates for any new **inserted** records belonging to the target table/view. **Important:** only insert updates are eligible for streamers, as we assume the underlying table represents a sequence of messages or record changes that work in an "append-only" fashion.
- The data can be transformed and filtered before being passed on to the **streamer client**.

:::warning
Streamers can only work as intended by using a numeric field in the underlying table. This field must represent a numeric sequence (perhaps defined as an auto-increment field) and be both unique and monotonically increasing. This field must also be indexed.
:::

Additionally, streamers may use `xlator` plugins that enhance the available streamer configuration with new configuration blocks and data transformations. See the FIX xlator documentation [here](/develop/business-components/fix/fix-xlator/) for more information.

In this case we are defining a stream named "FIX_IN_X", using the `FIX_IN` table and the `BY_RX_SEQUENCE` index. The `BY_RX_SEQUENCE` index contains a single field named `RX_SEQUENCE`.

The `batchSize` configuration has been set to 500, so the streamer will only read 500 records at a time when replaying records.

The `fields` configuration has been defined to only extract the `FIX_DATA` field from the `FIX_IN` table, so no other fields will be provided to any **streamer clients** connecting to this streamer. In this case, `FIX_DATA` contains a string representation of the whole FIX message.

The `where` configuration filters all data to ensure we only send records with `CONNECTION_NAME` equal to "X".

The sample workflow diagram below shows the interactions between a hypothetical streamer client and the streamer defined in our example:

```mermaid
sequenceDiagram
Streamer client->>+Streamer: Requests data for Stream "FIX_IN" with sequence 5000
Streamer->>+Database: Requests 500 rows from FIX_IN with RX_SEQUENCE > 5000 ordered by RX_SEQUENCE
Database->>+Streamer: Provides 500 rows, last row has RX_SEQUENCE 5500.
Streamer->>+Streamer client: Sends 500 rows, MORE_DATA_AVAILABLE=true
Streamer client->>+Streamer: Requests next batch
Streamer->>+Database: Requests 500 rows from FIX_IN with RX_SEQUENCE > 5500 ordered by RX_SEQUENCE
Database->>+Streamer: Provides 253 rows, last row has RX_SEQUENCE 5753 (reached end of table).
+Streamer->>+Streamer client: Sends 253 rows, MORE_DATA_AVAILABLE=false +Update queue->>+Streamer: New FIX_IN table row +Streamer->>+Streamer client: Send row ``` ## Configuration options @@ -111,11 +111,11 @@ You can use the `stream` configuration to declare a reliable stream. The simplest Streamer definition is: ```kotlin streams { - stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP) + stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP) } ``` -This example creates a stream called `ORDERS_OUT`, based on the `ORDER_OUT` table (or view). The data will be streamed, ordered by timestamp. +This example creates a stream called `ORDER_OUT`, based on the `ORDER_OUT` table (or view). The data will be streamed, ordered by timestamp. `stream` contains additional configuration items explained below. @@ -128,7 +128,7 @@ The default value is 100. Example usage below: ```kotlin streams { - stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP){ + stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP){ batchSize = 500 } } @@ -143,7 +143,7 @@ The default value is 5000. Example usage below: ```kotlin streams { - stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP){ + stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP){ logoffTimeout = 60 } } @@ -158,7 +158,7 @@ The default value is 1. Example usage below: ```kotlin streams { - stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP){ + stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP){ maxLogons = 2 } } @@ -170,7 +170,7 @@ streams { ```kotlin streams { - stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP) { + stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP) { toGenesisSet { ordersOut -> throw RuntimeException() } @@ -188,7 +188,7 @@ The `where` tag enables the stream to be filtered. It is available in two versio Here, we only stream orders with a quantity greater than 1,000. ```kotlin streams { - stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP) { + stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP) { where { ordersOut -> ordersOut.quantity > 1_000 } @@ -199,7 +199,7 @@ streams { In this example, we only stream orders with a quantity greater than 1,000 and where the logon message has provided a secret key. ```kotlin streams { - stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP) { + stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP) { where { ordersOut, logonMessage -> ordersOut.quantity > 1_000 && logonMessage.getString("KEY") == "SECRET" } @@ -209,14 +209,14 @@ streams { #### `fields` -The `fields` tag enables you to transform the output in a similar way to views, data server and req rep definitions. For example, here we output three fields: +The `fields` tag enables you to transform the message output in a similar way to views, data server and req rep definitions. For example, here we output three fields: ```kotlin streams { - stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP) { + stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP) { fields { - ORDERS_OUT.CLIENT_ID - ORDERS_OUT.QUANTITY withPrefix "ORDER" - ORDERS_OUT.CLIENT_ID withAlias "CLIENT" + ORDER_OUT.CLIENT_ID + ORDER_OUT.QUANTITY withPrefix "ORDER" + ORDER_OUT.CLIENT_ID withAlias "CLIENT" } } } @@ -227,12 +227,12 @@ streams { #### `toGenesisSet` -The `toGenesisSet` tag enables you to create a custom GenesisSet (TODO add link) from the table/view entity before it is automatically converted and sent to the streamer client. +The `toGenesisSet` tag enables you to create a custom [`GenesisSet`](/develop/server-capabilities/communications-meta/#genesisset) from the table/view entity before it is automatically converted and sent to the streamer client. 
Example below: ```kotlin streams { - stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP) { + stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP) { toGenesisSet { ordersOut -> genesisSet { "ORDER_QUANTITY" with ordersOut.quantity @@ -245,16 +245,16 @@ streams { #### `toEntity` -The `toEntity` tag allows you to create a custom `DbEntity` (TODO add link) from the table/view entity before it is automatically converted and sent to the streamer client as a `GenesisSet`. +The `toEntity` tag allows you to create a custom [`DbEntity`](/develop/server-capabilities/data-access-apis/#database-entities) from the table/view entity before it is automatically converted and sent to the streamer client as a `GenesisSet`. Example below: ```kotlin streams { - stream("ORDERS_OUT", ORDER_OUT.BY_TIMESTAMP) { - toGenesisSet { ordersOut -> - genesisSet { - "ORDER_QUANTITY" with ordersOut.quantity - "ORDER" with ordersOut.orderId + stream("ORDER_OUT", ORDER_OUT.BY_TIMESTAMP) { + toEntity { ordersOut -> + Order { + quantity = ordersOut.quantity + orderId = ordersOut.orderId } } } @@ -271,7 +271,7 @@ streams { Ensure you have [enabled metrics](/build-deploy-operate/operate/metrics/#enabling-metrics) in your environment to view them. ::: -The metrics for Streamer measure how long it takes to replay a single message batch when working in replay mode: +The metrics for the Streamer measure how long it takes to replay a single message batch when working in recovery mode: | Metric | Explanation | diff --git a/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/02_streamer-client/index.mdx b/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/02_streamer-client/index.mdx index a4344fc25a..a8d772110b 100644 --- a/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/02_streamer-client/index.mdx +++ b/docs/001_develop/02_server-capabilities/011_integrations/02_streamer-apis/02_streamer-client/index.mdx @@ -1,6 +1,6 @@ --- -title: 'Streamer Client' -sidebar_label: 'Streamer Client' +title: 'Streamer client' +sidebar_label: 'Streamer client' id: server-integrations-streamer-client keywords: [integration, integrations, streamer, ingress, egress, consume, produce] tags: @@ -16,20 +16,522 @@ sidebar_position: 2 import Log from '/snippet/_LOG.md' import CommonProcesses from '/snippet/_common-see-processes.md' +import CommonFieldOverrides from '/snippet/_common-field-overrides.md' ## Overview +**Streamer clients** link data streams from [streamers](/develop/server-capabilities/integrations/streamer-apis/streamer/) to a target [event handler](/develop/server-capabilities/core-business-logic-event-handler/) based on their configuration. +This linkage takes into account whether the data has been processed by the event handler in order to provide recoverability in case of failure. + +The core streamer client behaviour is the following: +- Streamer client sends a **logon** message to a **streamer** with or without a specific sequence number, for a specific streamer definition. +- The streamer client will now start receiving messages from the **streamer**, either as part of a recovery message workflow, or real-time messages. + - If the message received contains a flag named `MORE_DATA_AVAILABLE` set to true, the streamer client will send a message to the **streamer** requesting the next batch of data once it has finished processing the message. + - The previous step will be repeated as long as the `MORE_DATA_AVAILABLE` is set to true in the incoming message from the streamer. 
- Once `MORE_DATA_AVAILABLE` is received as `false`, the streamer client will stop requesting new batches of data and just process real-time messages as they come.
- These messages can be transformed and filtered before being passed on to the **event handler**.
- Streamer clients also have a backpressure mechanism to avoid overloading the event handler with too many messages; a minimal sketch of this bookkeeping is shown at the end of this overview.
  - For each message sent to an event handler, an internal counter is increased by one. This counter represents the currently unacknowledged messages.
  - The expectation is that each message will receive an acknowledgement from the event handler. For each acknowledgement, the internal counter is decreased by one.
  - The [`eventHandlerBuffer`](#eventhandlerbuffer) configuration item determines the maximum number of unacknowledged messages allowed on a streamer client configuration.
  - If the `eventHandlerBuffer` number is reached, the streamer client will assume the event handler is overloaded, and stop sending messages until the number is decreased. Any messages received from the streamer will be queued in the meantime.
- The streamer client data recovery mechanism is handled by an internal file cache as well as a database record stored in the `PROCESS_REF` table.
  - On a failure event, the streamer client will read from the file cache first (and the table second if the cache is not available) to determine the least recent unacknowledged message sequence number.
  - If there are no unacknowledged messages, it will pick the latest acknowledged sequence number.
  - It will then use this sequence number to request data from the streamer. This ensures that any records after the last processed sequence will be streamed to the event handler.
  - More information about sequence numbers and how they are used can be found in the [streamer page](/develop/server-capabilities/integrations/streamer-apis/streamer/).

:::warning
Streamer client processes should run in `primaryOnly` mode to avoid sending a duplicated message stream to an event handler.
:::

Additionally, streamer clients may use `xlator` plugins that enhance the available streamer configuration with new configuration blocks and data transformations. See the FIX xlator documentation [here](/develop/business-components/fix/fix-xlator/) for more information.
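
The backpressure bookkeeping described above can be pictured with a small sketch. This is illustrative only: the class and member names below are assumptions for the example, not the actual streamer client implementation.

```kotlin
import java.util.ArrayDeque
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch of the unacknowledged-message counter and queueing behaviour.
class BackpressureGate<T>(private val maxInFlight: Int) { // maxInFlight plays the role of eventHandlerBuffer
    private val unacknowledged = AtomicInteger(0)
    private val queued = ArrayDeque<T>()

    // Called for every message that should go to the event handler.
    fun trySend(message: T, send: (T) -> Unit) {
        if (unacknowledged.get() >= maxInFlight) {
            queued.add(message) // event handler looks overloaded: hold the message back
        } else {
            unacknowledged.incrementAndGet()
            send(message)
        }
    }

    // Called for every acknowledgement received from the event handler.
    fun onAck(send: (T) -> Unit) {
        unacknowledged.decrementAndGet()
        // Drain queued messages while capacity is available.
        while (queued.isNotEmpty() && unacknowledged.get() < maxInFlight) {
            unacknowledged.incrementAndGet()
            send(queued.poll())
        }
    }
}
```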

## Example configuration

A sample streamer client configuration can be seen below:

```kotlin
streamerClients {
    streamerClient(clientName = "ORDER_OUT") {
        eventHandlerBuffer = 500
        dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
        onMessage {
            send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
        }
    }

    streamerClient(clientName = "FILTERED_QUOTE", selectOn = QUOTE.SYMBOL) {
        dataSource(processName = "POSITION_STREAMER", sourceName = "QUOTE_STREAM")

        onMessage(where = "VODL") {
            sendFormatted(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_VODL_QUOTE") {
                QUOTE.SYMBOL
                QUOTE.PRICE withAlias "BID_PRICE"
            }
        }

        onMessage(where = "MSFT") {
            sendFormatted(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_MSFT_QUOTE") {
                QUOTE.SYMBOL
                QUOTE.SECURITY_EXCHANGE withAlias "EXCHANGE"
                QUOTE.PRICE withAlias "BID_PRICE"
            }
        }
    }
}
```

And the QUOTE table is defined as seen below:
```kotlin
  table(name = "QUOTE", id = 7005) {
    field(name = "SYMBOL", type = STRING)
    field(name = "SECURITY_EXCHANGE", type = STRING)
    field(name = "PRICE", type = DOUBLE)
    field(name = "QUOTE_DATETIME", type = DATETIME)
    primaryKey("SYMBOL")
  }
```

In the example we see two different streamer client definitions, one named "ORDER_OUT" and another one named "FILTERED_QUOTE".

The "ORDER_OUT" definition is pretty simple:
- The `dataSource` configuration has two parameters, `processName` and `sourceName`, with the values "POSITION_STREAMER" for the former and "ORDER_OUT" for the latter. These correspond to the **streamer** process name, and the **stream** name defined within the streamer we are connecting to.
- The `eventHandlerBuffer` is configured to 500, so a maximum of 500 events can ever be in flight, as explained in the [overview](#overview) section.
- The `onMessage` block defines the simplest `send` configuration to redirect the data received from the streamer to an event handler without any transformations or filtering. In this case, the `targetProcess` parameter refers to the event handler process name (POSITION_EVENT_HANDLER) and the `messageType` parameter refers to the specific event handler endpoint we should route the message to.

In the case of the "FILTERED_QUOTE" definition, we see a few more configuration settings.
- The `streamerClient` has been defined with a name, but also a `selectOn` field. This allows the streamer client to apply specific logic based on one of the message fields.
- In this case the `selectOn` parameter is using the `SYMBOL` field of the `QUOTE` table, as we know the incoming message from the streamer is actually equivalent to the `QUOTE` table entity definition.
- The `dataSource` configuration is similar to "ORDER_OUT", but in this case we are targeting a different streamer definition: "QUOTE_STREAM".
- The `onMessage` blocks have a `where` parameter this time, and it is based on the `selectOn` field: "SYMBOL".
  - In this configuration, we have two `onMessage` definitions to define different behaviour depending on whether the incoming "SYMBOL" value from a "QUOTE_STREAM" record is "MSFT" or "VODL".
  - In the case of "VODL", we use the `sendFormatted` configuration to target the "POSITION_EVENT_HANDLER" process and the "EVENT_VODL_QUOTE" event handler. Then the message is formatted to send only the values of the SYMBOL and PRICE fields.
It is important to note that the field names can also be formatted; in this case, the PRICE field name is changed to BID_PRICE using the `withAlias` configuration.
  - In the case of "MSFT", we use `sendFormatted` to target "POSITION_EVENT_HANDLER", but this time a different event handler endpoint: "EVENT_MSFT_QUOTE". The message itself is also formatted like the previous one, but we also include the field SECURITY_EXCHANGE and rename it as EXCHANGE.

A sample workflow for the first streamer client definition can be seen below:
```mermaid
sequenceDiagram
    participant Streamer
    participant Streamer client
    participant Event handler
Streamer client->>+Streamer: Requests data for Stream "ORDER_OUT" with sequence 5000
Streamer->>+Streamer client: Sends 500 rows, MORE_DATA_AVAILABLE=true
Streamer client->>+Streamer client: Processes and queues the 500 rows
Note right of Streamer client: All processed events will be sent asynchronously at the same time a new batch is requested
Streamer client->>+Event handler: Sends Event 1
Streamer client->>+Event handler: Sends Event 2
Streamer client->>+Streamer: Requests next batch
Streamer client->>+Event handler: Sends Event 3, etc
Note right of Event handler: All acknowledgements will be sent asynchronously as soon as the events have been processed.
Event handler->>+Streamer client: ACK Event 1
Event handler->>+Streamer client: ACK Event 2
Event handler->>+Streamer client: ACK Event 3, etc
Streamer->>+Streamer client: Sends 253 rows, MORE_DATA_AVAILABLE=false
Streamer client->>+Streamer client: Processes and queues the 253 rows.
Note right of Streamer client: The same asynchronous pattern of sending events and receiving acks applies.
Streamer->>+Streamer client: Sends 1 row, MORE_DATA_AVAILABLE=false.
Streamer client->>+Streamer client: Processes and queues 1 row.
Streamer client->>+Event handler: Sends Event X
Event handler->>+Streamer client: ACK Event X
```

## Configuration options

### `streamerClient`
You can use the `streamerClient` configuration to declare a reliable streamer client.

The simplest streamer client definition is:
```kotlin
streamerClients {
    streamerClient(clientName = "ORDER_OUT") {
        dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
        onMessage {
            send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
        }
    }
}
```

The `clientName` parameter is used to uniquely identify a `streamerClient` definition.

In more detail, there are three types of streamer client definitions, including the one seen above:

1. `GenesisSet` streamer client. It uses the raw [`GenesisSet`](/develop/server-capabilities/communications-meta/#genesisset) message received from the streamer for transformation and configuration purposes.
```kotlin
// builds a GenesisSet-based streamer client
streamerClient(clientName = "{name}") { ... }
```

2. Table or View entity streamer client. It uses an existing [`DbEntity`](/develop/server-capabilities/data-access-apis/#database-entities) for type-safe configuration. Most useful when the streamer is streaming `DbEntity` objects.
```kotlin
// builds a type-safe QUOTE streamer client
streamerClient(clientName = "{name}", source = QUOTES) { ... }
```

3. Finally, selective streamer clients. These types of streamer client definitions combine a `selectOn` parameter with different `onMessage` blocks to ensure messages are handled differently depending on the selected field value.
The example below enables you to handle VODL quotes one way and MSFT quotes in another:

```kotlin
streamerClient(clientName = "CLIENT", selectOn = QUOTE.SYMBOL) {
    onMessage(where = "VODL") { ... }
    onMessage(where = "MSFT") { ... }
}
```

In the case of `GenesisSet` streamer clients, the field selection syntax can be one of the following:

```kotlin
// use the Fields object:
streamerClient(clientName = "CLIENT", selectionField = Fields.SYMBOL) { ... }

// specify a field and type
streamerClient(clientName = "CLIENT", selectionField = "SYMBOL", type = INTEGER) { ... }

// if no type is specified it will default to STRING
streamerClient(clientName = "CLIENT", selectionField = "SYMBOL") { ... }
```

#### `dataSource`

The `dataSource` setting specifies which streamer process, and which stream within that process, should be used as part of the `streamerClient` definition.

The parameters are `processName` and `sourceName`, identifying the streamer process name and stream name respectively.

Example usage below:
```kotlin
streamerClients {
    streamerClient(clientName = "ORDER_OUT") {
        dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
        onMessage(subscriptionKey = "POSITION_EVENT_HANDLER") {
            send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
        }
    }
}
```

#### `onMessage`
The `onMessage` configuration defines what the streamer client does with each streamer message. It has three operations: `where`, `send` and `sendFormatted`.

- When using non-selective [`streamerClient`](#streamerclient) definitions, `onMessage` may optionally have a `subscriptionKey` parameter to identify each `onMessage` block.
Different `onMessage` blocks may have different transformations and `where` configurations too.
In those cases, the `subscriptionKey` is important: a single streamer client definition may connect to multiple processes or multiple event handler endpoints with different formatting logic, and `subscriptionKey` provides an easy way to identify each `onMessage` stream.
By default, `subscriptionKey` is equal to the `streamerClient` definition `clientName` parameter.

- In the case of selective `streamerClient` definitions, a `where` parameter must be provided, and optionally a `subscriptionKey` parameter as well. `subscriptionKey` is necessary for the same reason explained in the previous point, and by default it will be equal to the `where` parameter value.

Some examples for non-selective and selective `onMessage` blocks can be seen below:

```kotlin
streamerClients {
    streamerClient(clientName = "ORDER_OUT") {
        eventHandlerBuffer = 500
        dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
        onMessage(subscriptionKey = "POSITION") {
            send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
        }
        onMessage(subscriptionKey = "OMS") {
            send(targetProcess = "OMS_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
        }
    }

    streamerClient(clientName = "FILTERED_QUOTE", selectOn = QUOTE.SYMBOL) {
        dataSource(processName = "POSITION_STREAMER", sourceName = "QUOTE_STREAM")

        onMessage(where = "VODL") {
            sendFormatted(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_VODL_QUOTE") {
                QUOTE.SYMBOL
                QUOTE.PRICE withAlias "BID_PRICE"
            }
        }

        onMessage(where = "MSFT", subscriptionKey = "OMS") {
            sendFormatted(targetProcess = "OMS_EVENT_HANDLER", messageType = "EVENT_MSFT_QUOTE") {
                QUOTE.SYMBOL
                QUOTE.SECURITY_EXCHANGE withAlias "EXCHANGE"
                QUOTE.PRICE withAlias "BID_PRICE"
            }
        }
    }
}
```

##### `where`

The `where` configuration ensures only certain messages are sent to the event handler, based on a predicate. This operation has one parameter corresponding to the streamer client message type. This can be:
- a table or view entity
- a `GenesisSet`

As this operation is a predicate, it must return a Boolean value.

Example below for a `DbEntity` approach using the `QUOTE` table:

```kotlin
where { quote ->
    quote.price > 0.0
}
```

Another example, this time using `GenesisSet`:

```kotlin
where { quote ->
    quote.getDouble("PRICE", 0.0) > 0.0
}
```

##### `send` and `sendFormatted`

The `send` configuration directs and optionally formats the outgoing message.
It requires two parameters: `targetProcess` and `messageType`.

Each `onMessage` block must have at least one `send` block.

For example:
```kotlin
send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_QUOTE_INSERT")
```

This will send the full content of the streamer message on to the target.

In addition, for entity streamers, you can format the message in the same way as you would define the output of a view, data server, or request reply.

Use `sendFormatted` in this scenario:

```kotlin
sendFormatted("POSITION_EVENT_HANDLER", "EVENT_QUOTE_INSERT") {
    QUOTES.SYMBOL
    QUOTES.PRICE withAlias "BID_PRICE"
}
```

<CommonFieldOverrides />

Finally, you can craft the message from scratch.

This example uses just the streamer message as a parameter, which may be a `GenesisSet` or a `DbEntity` as explained in the [`where`](#where) section:

```kotlin
send("POSITION_EVENT_HANDLER", "EVENT_QUOTE_INSERT") { quote ->
    genesisSet {
        "SYMBOL" with quote.symbol
        "BID_PRICE" with quote.price
    }
}
```
The example below uses the streamer message as a parameter (in this case a `DbEntity` object for `QUOTE`) and an optional `GenesisSet` parameter that acts as the output object.
The `GenesisSet` object will be empty; this method is just a convenient way of writing the same code as in the previous example, without having to create a new `GenesisSet` object.

```kotlin
send("QUOTE_HANDLER", "QUOTE_EVENT") { quote, set ->
    set.setString("SYMBOL", quote.symbol)
}
```
When using these `send` overloads, you need to name the lambda parameters explicitly (`quote ->` or `quote, set ->`); the default `it` Kotlin parameter does not work in this case.
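
Putting these operations together, a single `onMessage` block can combine a `where` filter with a formatted send. The sketch below reuses the QUOTES entity from the examples above; the client name and process names are illustrative assumptions:

```kotlin
streamerClients {
    // Hypothetical client name; processes reuse the names from earlier examples.
    streamerClient(clientName = "QUOTE_FORWARDER", source = QUOTES) {
        dataSource(processName = "POSITION_STREAMER", sourceName = "QUOTE_STREAM")
        onMessage {
            // Drop records with non-positive prices before they reach the event handler.
            where { quote ->
                quote.price > 0.0
            }
            // Forward a trimmed projection of the record, renaming PRICE to BID_PRICE.
            sendFormatted(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_QUOTE_INSERT") {
                QUOTES.SYMBOL
                QUOTES.PRICE withAlias "BID_PRICE"
            }
        }
    }
}
```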

#### `isReplayable`

`isReplayable` is a boolean flag that determines whether the stream should keep track of sequence numbers for recoverability purposes on process start (see the [overview](#overview) for more information on recoverability).

The default value is `false`.

Example usage below:
```kotlin
streamerClients {
    streamerClient(clientName = "ORDER_OUT") {
        isReplayable = true
        dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
        onMessage {
            send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
        }
    }
}
```

#### `eventHandlerBuffer`

`eventHandlerBuffer` specifies how many unacknowledged messages can be "in-flight" at any one time. If the event handler fails to acknowledge messages and this number is reached, the streamer client stops sending messages.

The default value is 50.

Example usage below:
```kotlin
streamerClients {
    streamerClient(clientName = "ORDER_OUT") {
        eventHandlerBuffer = 50
        dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
        onMessage {
            send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
        }
    }
}
```

#### `sentWarningRange`

`sentWarningRange` specifies a time range in seconds to wait before marking the streamer client as WARNING or ERROR once `eventHandlerBuffer` has been reached.

It compares the last time a message was sent to the event handler against the current time to determine the elapsed time:
- If there are no unacknowledged messages, the process is marked as HEALTHY.
- If the elapsed time is less than the lower part of the range, the process is marked as HEALTHY.
- If the elapsed time is between the lower part of the range and the upper part of the range, the process is marked as WARNING.
- If the elapsed time is above the upper part of the range, the process is marked as ERROR.

The default value is 5 seconds for the lower part of the range, and 60 seconds for the upper part of the range.

Example usage below:
```kotlin
streamerClients {
    streamerClient(clientName = "ORDER_OUT") {
        sentWarningRange = 5L..60L
        dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
        onMessage {
            send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
        }
    }
}
```

#### `receiveWarningRange`

`receiveWarningRange` is similar to the previous setting, but it applies even when the `eventHandlerBuffer` max size has not been reached. This can capture odd scenarios in which only a handful of messages are never acknowledged, but otherwise the data stream seems to work as expected.

It compares the last time a message was received from the event handler against the current time to determine the elapsed time:
- First, the checks in `sentWarningRange` apply. Additionally, if the elapsed time since the last message was sent is less than the lower part of the `sentWarningRange`, the process is always marked as HEALTHY.
- If the previous point doesn't apply, the same logic used in `sentWarningRange` applies here:
  - If the elapsed time is less than the lower part of the range, the process is marked as HEALTHY.
  - If the elapsed time is between the lower part of the range and the upper part of the range, the process is marked as WARNING.
  - If the elapsed time is above the upper part of the range, the process is marked as ERROR.


The default value is 5 seconds for the lower part of the range, and 60 seconds for the upper part of the range.

Example usage below:
```kotlin
streamerClients {
    streamerClient(clientName = "ORDER_OUT") {
        receiveWarningRange = 5L..60L
        dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
        onMessage {
            send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
        }
    }
}
```

#### `updateFrequency`

`updateFrequency` is the time interval in milliseconds used to perform the checks for `receiveWarningRange` and `sentWarningRange`.

The default value is 2000 milliseconds.

Example usage below:
```kotlin
streamerClients {
    streamerClient(clientName = "ORDER_OUT") {
        updateFrequency = 5000 // interval in milliseconds
        dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
        onMessage {
            send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
        }
    }
}
```

#### `cacheEntries`

:::warning
This setting must be used with extreme caution. If the default value is changed, the corresponding cache file (found inside $GENESIS_HOME/runtime/cache) for the relevant streamer client definition needs to be deleted before the streamer client process is restarted. This setting is marked as deprecated in the code to highlight the importance of this fact.
:::

`cacheEntries` sets how many entries will be allocated in the streamer client cache file.

Each streamer client subscription creates a `ChronicleMap` instance with capacity for 2000 entries by default.
This is used as the cache for events that have been received from the streamer and sent to the event handler.
The map uses the unique numeric sequence value provided by the streamer as a key, and a status (`SENT` or `COMPLETE`) as the value.

Once an event has been sent to the event handler, we mark the sequence reference as `SENT`. When the streamer client receives a response, it is marked as `COMPLETE`.

Every two seconds, a job runs to update the PROCESS_REF database table with the data stored in the `ChronicleMap` instance (if there is any). The value stored in the database is either:

- the oldest `SENT` value (if available), or
- if no `SENT` values are stored, the latest `COMPLETE` value.

The idea is that whenever it is necessary, we can always replay events from the streamer, starting from the oldest `SENT` (there was no response for that event, and we can't guarantee it was processed successfully). If no `SENT` values are available, then it is safe to replay from the latest `COMPLETE` event. As part of this job, we also remove entries from the `ChronicleMap` that have been marked as `COMPLETE`.

If the `ChronicleMap` file is not present, the database value will be used.

`ChronicleMap` instances need to be pre-allocated with a fixed set of parameters. If the streamer client tries to store more than 2000 entries before the cleaning/flushing job runs (i.e. within two seconds), it exceeds the maximum capacity. This is a possibility when the stream is overloaded. In this scenario, `cacheEntries` should be changed to accommodate higher volumes.

Example usage below:
```kotlin
streamerClients {
    streamerClient(clientName = "ORDER_OUT") {
        cacheEntries = 2000
        dataSource(processName = "POSITION_STREAMER", sourceName = "ORDER_OUT")
        onMessage {
            send(targetProcess = "POSITION_EVENT_HANDLER", messageType = "EVENT_ORDER_OUT")
        }
    }
}
```

#### Custom Log messages

<Log />

## Metrics

:::info
Ensure you have [enabled metrics](/build-deploy-operate/operate/metrics/#enabling-metrics) in your environment to view them.
+::: + +The metrics for the Streamer client measure how long it takes for each streamer client to be processed by the target event handlers, as well as the total number of unacknowledged messages. + +These metrics allow monitoring tools to understand if some specific event handlers are overwhelmed, as well as how quickly they can deal with each streamer client message. + +If the unacknowledged message size stays at a high number, the possible causes are: +- The event handler is unable to keep up with the message flow. +- The event handler is not sending an acknowledgement in response to the messages for any reason. + +| Metric | Explanation | +|:----------------------------|:--------------------------------------------------------------------------------| +| unreplied_messages_size | The number of outstanding messages for each streamer client definition | +| outbound_processing_latency | The event handler processing latency for each streamer client definition | + ## Runtime configuration +To include your `*-streamer-client.kts` file definitions in a runtime process, you will need to ensure the process definition: + +1. Ensure `genesis-pal-streamerclient` is included in `module` +2. Ensure `global.genesis.streamerclient.pal` is included in `package` +3. Ensure your streamer-client.kts file(s) are defined in `script` +4. Ensure `pal` is set in `language` + +If you wish to run a dedicated process for a streamer, the following gives an example full process definition: + +```xml{13} + + POSITION + Streams trades to external FIX gateway + true + -Xmx256m -DRedirectStreamsToLog=true -DXSD_VALIDATE=false -XX:MaxHeapFreeRatio=70 -XX:MinHeapFreeRatio=30 -XX:+UseG1GC -XX:+UseStringDeduplication -XX:OnOutOfMemoryError="handleOutOfMemoryError.sh %p" + genesis-pal-streamerclient + global.genesis.streamerclient.pal + true + + INFO,DATADUMP_ON + pal + +``` -## Testing \ No newline at end of file + \ No newline at end of file diff --git a/docs/001_develop/04_business-components/08_fix/fix-xlator.mdx b/docs/001_develop/04_business-components/08_fix/fix-xlator.mdx index 4c4285304d..cfacfafaf1 100644 --- a/docs/001_develop/04_business-components/08_fix/fix-xlator.mdx +++ b/docs/001_develop/04_business-components/08_fix/fix-xlator.mdx @@ -13,7 +13,8 @@ tags: - FIX --- -The FIX Xlator is a plugin for the [Streamer](../../../server/integration/gateways-and-streamers/streamer/) and [Streamer client](../../../server/integration/gateways-and-streamers/streamer-client/), which enables type-safe handling of FIX messages. It also gives access to a set of vital integration features, such as FIX_IN, EXECUTION_REPORT and CUSTOM_FIX. +The FIX Xlator is a plugin for the [Streamer](/develop/server-capabilities/integrations/streamer-apis/streamer/) and [Streamer client](/develop/server-capabilities/integrations/streamer-apis/streamer-client/), which enables type-safe handling of FIX messages. +It also allows you to easily define streams and streamer client definitions that work out of the box with the existing `FIX_IN` table defined in the FIX business component, as well as potential custom FIX tables containing fix data. ### Enabling the FIX Xlator @@ -43,32 +44,32 @@ fixConfiguration { ``` The `plugins` tag enables the plugin, and the `fixConfiguration` tag specifies the version to use. The version refers to a class generated by the fix-codegen-plugin as part of step 2, and will match the version name specified in the plugin configuration in the pom.xml file. 
-
-
 ### Streamer
 
-Enabling the plugin in a Streamer definition enables the `fixStream` definition.
+Enabling the plugin in a streamer definition enables the `fixStream` configuration syntax.
 
 **FIX Streams**:
 
-FIX Streams are enhanced stream definitions that come with a few useful defaults, enhanced fixed handling and automatic conversion to GenesisSet.
+FIX Streams are enhanced stream definitions that come with a few useful defaults, enhanced FIX handling and automatic conversion to `GenesisSet`.
 
 #### Types of fixStream
 
-There are three separate types of fixStream configuration:
+There are three separate types of fixStream configuration: base `FIX_IN` streams, specific message type streams based on `FIX_IN` contents, and custom table streams for specific custom types.
+See the sample code below:
 
 ```kotlin
-fixStream("FIX_IN")
+fixStream("FIX_IN") // Base FIX_IN table
 
-fixStream("EXECUTION_REPORT")
+fixStream("EXECUTION_REPORT") // Stream for FIX ExecutionReport types
 
-fixStream("CUSTOM", CUSTOM_FIX.FIX_INDEX, CUSTOM_FIX.DATA, ExecutionReport::class)
+fixStream("CUSTOM", CUSTOM_FIX.FIX_INDEX, CUSTOM_FIX.DATA, ExecutionReport::class) // Custom stream for a custom table and optionally a FIX type.
 ```
+See the configuration for the stream examples defined above in the table below:
 
-| Name | Source Table | Stream Index | Fix Column | Stream Type |
-| --- | --- | --- | --- | --- |
-| FIX_IN | FIX_IN | BY_RX_SEQUENCE | FIX_DATA | Message |
-| EXECUTION_REPORT | FIX_IN | BY_RX_SEQUENCE | FIX_DATA | ExecutionReport |
-| CUSTOM | CUSTOM_FIX | FIX_INDEX | DATA | ExecutionReport |
+| Stream Name | Source Table | Source Table Index | Table Column | Stream Message Type |
+|------------------|--------------|--------------------|--------------|---------------------|
+| FIX_IN | FIX_IN | BY_RX_SEQUENCE | FIX_DATA | Message |
+| EXECUTION_REPORT | FIX_IN | BY_RX_SEQUENCE | FIX_DATA | ExecutionReport |
+| CUSTOM | CUSTOM_FIX | FIX_INDEX | DATA | ExecutionReport |
 
 When using the FIX_IN table, the appropriate index and column are selected automatically.