Skip to content

Commit

Permalink
Merge pull request #103 from EventStore/configurable-delays
Browse files Browse the repository at this point in the history
Configurable delays
  • Loading branch information
alexeyzimarev authored Oct 2, 2024
2 parents 9027db5 + f206caf commit f0f2c04
Show file tree
Hide file tree
Showing 13 changed files with 62 additions and 51 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/docs-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ jobs:
steps:
-
name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
-
name: Setup Hugo
uses: peaceiris/actions-hugo@v2
uses: peaceiris/actions-hugo@v3
with:
hugo-version: 'latest'
extended: true
-
name: Install Node
uses: actions/setup-node@v4
with:
node-version: 19.9.0
node-version: 20
-
name: Install Dependencies
run: npm install
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pull-request-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
steps:
-
name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
-
name: Check pull requests
uses: EventStore/Automations/pr-check@master
2 changes: 1 addition & 1 deletion .github/workflows/replicator-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
steps:
-
name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
-
name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/replicator-publish-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
steps:
-
name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
-
name: Set up QEMU
uses: docker/setup-qemu-action@v3
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/replicator-publish-helm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
steps:
-
name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
-
name: Install Helm
uses: azure/setup-helm@v4
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ ARG TARGETARCH
WORKDIR /app

ARG RUNTIME
RUN curl -sL https://deb.nodesource.com/setup_19.x | bash - \
RUN curl -fsSL https://deb.nodesource.com/setup_22.x | bash - \
&& apt-get install -y --no-install-recommends nodejs \
&& npm install -g yarn

Expand All @@ -18,7 +18,7 @@ COPY ./src/es-replicator/ClientApp/package.json ./src/es-replicator/ClientApp/
COPY ./src/es-replicator/ClientApp/yarn.lock ./src/es-replicator/ClientApp/
RUN cd ./src/es-replicator/ClientApp && yarn install

FROM builder as publish
FROM builder AS publish
ARG TARGETARCH
COPY ./src ./src
RUN dotnet publish ./src/es-replicator -c Release -a $TARGETARCH -clp:NoSummary --no-self-contained -o /app/publish
Expand Down
46 changes: 24 additions & 22 deletions docs/content/docs/Configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,30 @@ The settings file has the `replicator` root level, all settings are children to

Available configuration options are:

| Option | Description |
|:----------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `replicator.reader.connectionString` | Connection string for the source cluster or instance |
| `replicator.reader.protocol` | Reader protocol (`tcp` or `grpc`) |
| `replicator.reader.pageSize` | Reader page size (only applicable for TCP protocol |
| `replicator.sink.connectionString` | Connection string for the target cluster or instance |
| `replicator.sink.protocol` | Writer protocol (`tcp` or `grpc`) |
| `replicator.sink.partitionCount` | Number of [partitioned]({{% ref "writers" %}}) concurrent writers |
| `replicator.sink.partitioner` | Custom JavaScript [partitioner]({{% ref "writers" %}}) |
| `replicator.sink.bufferSize` | Size of the sink buffer, `1000` events by default |
| `replicator.scavenge` | Enable real-time [scavenge]({{% ref "scavenge" %}}) |
| `replicator.runContinuously` | Set to `false` if you want Replicator to stop when it reaches the end of `$all` stream. Default is `true`, so the replication continues until you stop it explicitly. |
| `replicator.filters` | Add one or more of provided [filters]({{% ref "filters" %}}) |
| `replicator.transform` | Configure the [event transformation]({{% ref "Transforms" %}}) |
| `replicator.transform.bufferSize` | Size of the prepare buffer (filtering and transformations), `1000` events by default |
| `replicator.checkpoint.type` | Type of checkpoint store (`file` or `mongo`), `file` by default |
| `replicator.checkpoint.path` | The file path or connection string, `./checkpoint` by default |
| `replicator.checkpoint.checkpointAfter` | The number of events that must be replicated before a checkpoint is stored, `1000` events by default |
| `replicator.checkpoint.database` | The name of the Mongo database, `replicator` by default |
| `replicator.checkpoint.instanceId` | The name of the replicator instance to isolate checkpoints with in the Mongo database, `default` by default |
| `replicator.checkpoint.seeder.type` | Type of checkpoint seeder to use (`none` or `chaser`), `none` by default |
| `replicator.checkpoint.seeder.path` | The file path of the `chaser.chk`, empty by default |
| Option | Description |
|:---------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `replicator.reader.connectionString` | Connection string for the source cluster or instance |
| `replicator.reader.protocol` | Reader protocol (`tcp` or `grpc`) |
| `replicator.reader.pageSize` | Reader page size (only applicable for TCP protocol |
| `replicator.sink.connectionString` | Connection string for the target cluster or instance |
| `replicator.sink.protocol` | Writer protocol (`tcp` or `grpc`) |
| `replicator.sink.partitionCount` | Number of [partitioned]({{% ref "writers" %}}) concurrent writers |
| `replicator.sink.partitioner` | Custom JavaScript [partitioner]({{% ref "writers" %}}) |
| `replicator.sink.bufferSize` | Size of the sink buffer, `1000` events by default |
| `replicator.scavenge` | Enable real-time [scavenge]({{% ref "scavenge" %}}) |
| `replicator.runContinuously` | Set to `false` if you want Replicator to stop when it reaches the end of `$all` stream. Default is `true`, so the replication continues until you stop it explicitly. |
| `replicator.filters` | Add one or more of provided [filters]({{% ref "filters" %}}) |
| `replicator.transform` | Configure the [event transformation]({{% ref "Transforms" %}}) |
| `replicator.transform.bufferSize` | Size of the prepare buffer (filtering and transformations), `1000` events by default |
| `replicator.checkpoint.type` | Type of checkpoint store (`file` or `mongo`), `file` by default |
| `replicator.checkpoint.path` | The file path or connection string, `./checkpoint` by default |
| `replicator.checkpoint.checkpointAfter` | The number of events that must be replicated before a checkpoint is stored, `1000` events by default |
| `replicator.checkpoint.database` | The name of the Mongo database, `replicator` by default |
| `replicator.checkpoint.instanceId` | The name of the replicator instance to isolate checkpoints with in the Mongo database, `default` by default |
| `replicator.checkpoint.seeder.type` | Type of checkpoint seeder to use (`none` or `chaser`), `none` by default |
| `replicator.checkpoint.seeder.path` | The file path of the `chaser.chk`, empty by default |
| `replicator.restartDelayInSeconds` | The number of seconds between replication restarts, `5` by default |
| `replicator.reportMetricsFrequencyInSeconds` | The frequency at which to report certain metrics expressed in seconds, `5` by default |

## Enable verbose logging

Expand Down
18 changes: 10 additions & 8 deletions src/EventStore.Replicator/Replicator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,16 @@ CancellationToken stoppingToken
break;
}

Log.Info("Will restart in 5 sec");
Log.Info("Will restart in {0} sec", replicatorOptions.RestartDelay.TotalSeconds);

try {
await Task.Delay(5000, stoppingToken);
}
catch (OperationCanceledException) {
// stopping now
break;
if (replicatorOptions.RestartDelay != TimeSpan.Zero) {
try {
await Task.Delay(replicatorOptions.RestartDelay, stoppingToken);
}
catch (OperationCanceledException) {
// stopping now
break;
}
}
}
}
Expand Down Expand Up @@ -172,7 +174,7 @@ async Task Report() {
ReplicationMetrics.LastSourcePosition.Set(position.Value);
}

await Task.Delay(5000, stoppingToken).ConfigureAwait(false);
await Task.Delay(replicatorOptions.ReportMetricsFrequency, stoppingToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException) {
Expand Down
2 changes: 1 addition & 1 deletion src/EventStore.Replicator/ReplicatorOptions.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
namespace EventStore.Replicator;

public record ReplicatorOptions(bool RestartOnFailure, bool RunContinuously);
public record ReplicatorOptions(bool RestartOnFailure, bool RunContinuously, TimeSpan RestartDelay, TimeSpan ReportMetricsFrequency);
18 changes: 10 additions & 8 deletions src/es-replicator/Settings/ReplicatorSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@ public record Filter {
}

public record Replicator {
public EsdbSettings Reader { get; init; }
public SinkSettings Sink { get; init; }
public bool Scavenge { get; init; }
public bool RestartOnFailure { get; init; } = true;
public bool RunContinuously { get; init; } = true;
public Checkpoint Checkpoint { get; init; } = new();
public TransformSettings Transform { get; init; } = new();
public Filter[] Filters { get; init; }
public EsdbSettings Reader { get; init; }
public SinkSettings Sink { get; init; }
public bool Scavenge { get; init; }
public bool RestartOnFailure { get; init; } = true;
public bool RunContinuously { get; init; } = true;
public int RestartDelayInSeconds { get; init; } = 5;
public int ReportMetricsFrequencyInSeconds { get; init; } = 5;
public Checkpoint Checkpoint { get; init; } = new();
public TransformSettings Transform { get; init; } = new();
public Filter[] Filters { get; init; }
}

public static class ConfigExtensions {
Expand Down
7 changes: 6 additions & 1 deletion src/es-replicator/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ public static void ConfigureServices(WebApplicationBuilder builder) {
);

services.AddSingleton(
new ReplicatorOptions(replicatorOptions.RestartOnFailure, replicatorOptions.RunContinuously)
new ReplicatorOptions(
replicatorOptions.RestartOnFailure,
replicatorOptions.RunContinuously,
TimeSpan.FromSeconds(replicatorOptions.RestartDelayInSeconds),
TimeSpan.FromSeconds(replicatorOptions.ReportMetricsFrequencyInSeconds)
)
);

RegisterCheckpointStore(replicatorOptions.Checkpoint, services);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public async Task Verify() {
new PreparePipelineOptions(null, null),
new ChaserCheckpointSeeder(chaser_chk_copy, store),
store,
new ReplicatorOptions(false, false),
new ReplicatorOptions(false, false, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5)),
CancellationToken.None
);

Expand Down
2 changes: 1 addition & 1 deletion test/EventStore.Replicator.Tests/CustomPartitionerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public async Task ShouldKeepOrderWithinPartition() {
prepareOptions,
new NoCheckpointSeeder(),
_fixture.CheckpointStore,
new ReplicatorOptions(false, false),
new ReplicatorOptions(false, false, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5)),
CancellationToken.None
);
await Timing.Measure("Replication", replication);
Expand Down

0 comments on commit f0f2c04

Please sign in to comment.