-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathProcessingLoop.cs
81 lines (60 loc) · 2.16 KB
/
ProcessingLoop.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// ReSharper disable UnusedVariable
public class ProcessingLoop
{
SqlConnection sqlConnection = null!;
string connectionString = null!;
async Task RowTracking()
{
long newRowVersion = 0;
#region RowVersionTracker
var versionTracker = new RowVersionTracker();
// create table
await versionTracker.CreateTable(sqlConnection);
// save row version
await versionTracker.Save(sqlConnection, newRowVersion);
// get row version
var startingRow = await versionTracker.Get(sqlConnection);
#endregion
}
async Task ReadLoop()
{
#region ProcessingLoop
var rowVersionTracker = new RowVersionTracker();
var startingRow = await rowVersionTracker.Get(sqlConnection);
static async Task Callback(
SqlTransaction transaction,
IncomingMessage message,
Cancel cancel)
{
if (message.Body == null)
{
return;
}
using var reader = new StreamReader(message.Body);
var bodyText = await reader.ReadToEndAsync(cancel);
Console.WriteLine($"Message received in error message:\r\n{bodyText}");
}
static void ErrorCallback(Exception exception) =>
Environment.FailFast("Message processing loop failed", exception);
Task<SqlTransaction> BuildTransaction(Cancel cancel) =>
ConnectionHelpers.BeginTransaction(connectionString, cancel);
Task PersistRowVersion(
SqlTransaction transaction,
long rowVersion,
Cancel cancel) =>
rowVersionTracker.Save(sqlConnection, rowVersion, cancel);
var processingLoop = new MessageProcessingLoop(
table: "error",
delay: TimeSpan.FromSeconds(1),
transactionBuilder: BuildTransaction,
callback: Callback,
errorCallback: ErrorCallback,
startingRow: startingRow,
persistRowVersion: PersistRowVersion);
processingLoop.Start();
Console.ReadKey();
await processingLoop.Stop();
#endregion
}
}