Skip to content

Commit 2750996

Browse files
authored
Merge pull request #6 from PandaTechAM/development
updated readme
2 parents 27c5869 + d7b5b96 commit 2750996

16 files changed

+344
-363
lines changed

Readme.md

+34-25
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,23 @@
1-
# Pandatech.MassTransit.PostgresOutbox
1+
- [1. Pandatech.MassTransit.PostgresOutbox](#1-pandatechmasstransitpostgresoutbox)
2+
- [1.1. Features](#11-features)
3+
- [1.2. Getting Started](#12-getting-started)
4+
- [1.3. Installation](#13-installation)
5+
- [1.4. Configuration](#14-configuration)
6+
- [1.5. Usage](#15-usage)
7+
- [1.5.1. Configuration](#151-configuration)
8+
- [1.5.2. Publishing Messages (Outbox Pattern)](#152-publishing-messages-outbox-pattern)
9+
- [1.5.3. Consuming Messages (Inbox Pattern)](#153-consuming-messages-inbox-pattern)
10+
- [1.6. License](#16-license)
11+
12+
# 1. Pandatech.MassTransit.PostgresOutbox
213

314
Welcome to the Pandatech MassTransit PostgreSQL Outbox Extension repository. This library is designed to enhance
415
MassTransit's capabilities by introducing robust support for the Outbox and Inbox patterns with a particular focus on
516
PostgreSQL, alongside seamless integration with multiple DbContexts in Entity Framework Core. This extension is ideal
617
for developers seeking to ensure reliable message delivery and processing in distributed, microservice-oriented
718
architectures.
819

9-
## Features
20+
## 1.1. Features
1021

1122
- **Multiple DbContext Support**: Operate within complex systems using multiple data contexts without hassle.
1223
- **Outbox Pattern Implementation**: Reliably handle message sending operations, ensuring no messages are lost in
@@ -17,23 +28,23 @@ architectures.
1728
control, making your message handling processes more robust.
1829
- **Seamless Integration**: Designed to fit effortlessly into existing MassTransit and EF Core based projects.
1930

20-
## Getting Started
31+
## 1.2. Getting Started
2132

2233
To get started with the Pandatech MassTransit PostgreSQL Outbox Extension, ensure you have the following prerequisites:
2334

2435
- .NET Core 8 or later
2536
- An existing MassTransit project
2637
- PostgreSQL database
2738

28-
## Installation
39+
## 1.3. Installation
2940

3041
The library can be installed via NuGet Package Manager. Use the following command:
3142

3243
```bash
3344
Install-Package Pandatech.MassTransit.PostgresOutbox
3445
```
3546

36-
## Configuration
47+
## 1.4. Configuration
3748

3849
Before diving into the usage, it's essential to configure the Pandatech MassTransit PostgreSQL Outbox Extension in your
3950
application. This involves setting up your DbContexts, configuring MassTransit to use the extension, and initializing
@@ -42,47 +53,45 @@ the Outbox and Inbox features.
4253
Stay tuned for the next sections where we'll cover the usage details, showcasing how you can leverage this powerful
4354
extension to enhance your distributed systems.
4455

45-
## Usage
56+
## 1.5. Usage
57+
4658
Take into account that examples below are given for configuring both inbox and outbox patterns.
47-
If you need only one of those , consider using appropriate methods available(eg. instead of AddOutboxInboxServices use AddInboxServices and etc).
59+
If you need only one of those , consider using appropriate methods available(eg. instead of AddOutboxInboxServices use
60+
AddInboxServices and etc).
4861

49-
Configuration
62+
### 1.5.1. Configuration
5063

51-
Entity Configuration: Ensure your DbContext implements the IOutboxDbContext and IInboxDbContext interfaces.
52-
Configure your entities and generate migrations.
53-
Call ConfigureInboxOutboxEntities on your ModelBuilder to configure the necessary tables for inbox and outbox patterns.
64+
**Entity Configuration:** Ensure your `DbContext` implements the `IOutboxDbContext` and `IInboxDbContext` interfaces.
65+
Configure your entities and generate migrations.
66+
Call `ConfigureInboxOutboxEntities` on your `ModelBuilder` to configure the necessary tables for inbox and outbox patterns.
5467

5568
```csharp
56-
5769
protected override void OnModelCreating(ModelBuilder modelBuilder)
5870
{
5971
modelBuilder.ConfigureInboxOutboxEntities();
6072
}
6173
```
62-
63-
And you need to call UseQueryLocks() inside AddDbContext or AddDbContextPool , this needs for enabling ForUpdate feature.
64-
74+
And you need to call `UseQueryLocks()` inside `AddDbContext` or `AddDbContextPool` , this needs for enabling `ForUpdate`
75+
feature.
6576
```csharp
66-
builder.Services.AddDbContextPool<PostgresContext>(options =>
77+
builder.Services.AddDbContextPool<PostgresContext>(options =>
6778
options.UseNpgsql(connectionString)
6879
.UseQueryLocks());
6980
```
7081

71-
Service Registration: Register essential services on startup, specifying the DbContext type.
82+
**Service Registration:** Register essential services on startup, specifying the `DbContext` type.
7283
You can optionally override settings(its optional parameter).
7384

7485
```csharp
75-
7686
services.AddOutboxInboxServices<PostgresContext>();
7787
```
7888

79-
Publishing Messages (Outbox Pattern)
89+
### 1.5.2. Publishing Messages (Outbox Pattern)
8090

81-
To publish a message using the outbox pattern, call the AddToOutbox method on your DbContext,
82-
specifying your message. Remember to call SaveChanges() to persist the message to the database.
91+
To publish a message using the outbox pattern, call the `AddToOutbox` method on your `DbContext`,
92+
specifying your message. Remember to call `SaveChanges()` to persist the message to the database.
8393

8494
```csharp
85-
8695
dbContext.Orders.Add(new Order
8796
{
8897
Amount = 555,
@@ -96,10 +105,10 @@ dbContext.AddToOutbox(new OrderCreatedEvent());
96105
dbContext.SaveChanges();
97106
```
98107

99-
Consuming Messages (Inbox Pattern)
108+
### 1.5.3. Consuming Messages (Inbox Pattern)
100109

101110
To consume messages using the inbox pattern, create a consumer that inherits from
102-
InboxConsumer<TMessage, TDbContext> class, specifying the message type and DbContext type as generic arguments.
111+
`InboxConsumer<TMessage, TDbContext>` class, specifying the message type and `DbContext` type as generic arguments.
103112

104113
```csharp
105114

@@ -120,6 +129,6 @@ public class YourConsumer : InboxConsumer<YourMessage, PostgresContext>
120129
}
121130
```
122131

123-
## License
132+
## 1.6. License
124133

125134
Pandatech.MassTransit.PostgresOutbox is licensed under the MIT License.
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
using MassTransit.PostgresOutbox.Entities;
22
using Microsoft.EntityFrameworkCore;
33

4-
namespace MassTransit.PostgresOutbox.Abstractions
4+
namespace MassTransit.PostgresOutbox.Abstractions;
5+
6+
public interface IInboxDbContext
57
{
6-
public interface IInboxDbContext
7-
{
8-
public DbSet<InboxMessage> InboxMessages { get; set; }
9-
}
10-
}
8+
public DbSet<InboxMessage> InboxMessages { get; set; }
9+
}
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
using MassTransit.PostgresOutbox.Entities;
22
using Microsoft.EntityFrameworkCore;
33

4-
namespace MassTransit.PostgresOutbox.Abstractions
4+
namespace MassTransit.PostgresOutbox.Abstractions;
5+
6+
public interface IOutboxDbContext
57
{
6-
public interface IOutboxDbContext
7-
{
8-
public DbSet<OutboxMessage> OutboxMessages { get; set; }
9-
}
10-
}
8+
public DbSet<OutboxMessage> OutboxMessages { get; set; }
9+
}

src/MassTransit.PostgresOutbox/Abstractions/InboxConsumer.cs

+58-59
Original file line numberDiff line numberDiff line change
@@ -6,76 +6,75 @@
66
using Microsoft.Extensions.DependencyInjection;
77
using Microsoft.Extensions.Logging;
88

9-
namespace MassTransit.PostgresOutbox.Abstractions
9+
namespace MassTransit.PostgresOutbox.Abstractions;
10+
11+
public abstract class InboxConsumer<TMessage, TDbContext> : IConsumer<TMessage>
12+
where TMessage : class
13+
where TDbContext : DbContext, IInboxDbContext
1014
{
11-
public abstract class InboxConsumer<TMessage, TDbContext> : IConsumer<TMessage>
12-
where TMessage : class
13-
where TDbContext : DbContext, IInboxDbContext
14-
{
15-
private readonly string _consumerId;
16-
private readonly IServiceScopeFactory _serviceScopeFactory;
15+
private readonly string _consumerId;
16+
private readonly IServiceScopeFactory _serviceScopeFactory;
1717

18-
protected InboxConsumer(IServiceScopeFactory serviceScopeFactory)
19-
{
20-
_consumerId = GetType().ToString();
21-
_serviceScopeFactory = serviceScopeFactory;
22-
}
18+
protected InboxConsumer(IServiceScopeFactory serviceScopeFactory)
19+
{
20+
_consumerId = GetType().ToString();
21+
_serviceScopeFactory = serviceScopeFactory;
22+
}
2323

24-
public async Task Consume(ConsumeContext<TMessage> context)
25-
{
26-
using var scope = _serviceScopeFactory.CreateScope();
27-
var messageId = context.Headers.Get<Guid>(Constants.OutboxMessageId);
24+
public async Task Consume(ConsumeContext<TMessage> context)
25+
{
26+
using var scope = _serviceScopeFactory.CreateScope();
27+
var messageId = context.Headers.Get<Guid>(Constants.OutboxMessageId);
2828

29-
var dbContext = scope.ServiceProvider.GetRequiredService<TDbContext>();
30-
var logger = scope.ServiceProvider.GetRequiredService<ILogger<InboxConsumer<TMessage, TDbContext>>>();
29+
var dbContext = scope.ServiceProvider.GetRequiredService<TDbContext>();
30+
var logger = scope.ServiceProvider.GetRequiredService<ILogger<InboxConsumer<TMessage, TDbContext>>>();
3131

32-
var exists = await dbContext.InboxMessages.AnyAsync(x => x.MessageId == messageId && x.ConsumerId == _consumerId);
32+
var exists = await dbContext.InboxMessages.AnyAsync(x => x.MessageId == messageId && x.ConsumerId == _consumerId);
3333

34-
if (!exists)
34+
if (!exists)
35+
{
36+
dbContext.InboxMessages.Add(new InboxMessage
3537
{
36-
dbContext.InboxMessages.Add(new InboxMessage
37-
{
38-
MessageId = messageId!.Value,
39-
CreatedAt = DateTime.UtcNow,
40-
State = MessageState.New,
41-
ConsumerId = _consumerId,
42-
});
38+
MessageId = messageId!.Value,
39+
CreatedAt = DateTime.UtcNow,
40+
State = MessageState.New,
41+
ConsumerId = _consumerId,
42+
});
4343

44-
await dbContext.SaveChangesAsync();
45-
}
46-
47-
using var transactionScope = await dbContext.Database.BeginTransactionAsync(System.Data.IsolationLevel.ReadCommitted);
44+
await dbContext.SaveChangesAsync();
45+
}
4846

49-
var inboxMessage = await dbContext.InboxMessages
50-
.Where(x => x.MessageId == messageId)
51-
.Where(x => x.ConsumerId == _consumerId)
52-
.Where(x => x.State == MessageState.New)
53-
.ForUpdate(LockBehavior.SkipLocked)
54-
.FirstOrDefaultAsync();
47+
using var transactionScope = await dbContext.Database.BeginTransactionAsync(System.Data.IsolationLevel.ReadCommitted);
5548

56-
if (inboxMessage == null)
57-
{
58-
return;
59-
}
49+
var inboxMessage = await dbContext.InboxMessages
50+
.Where(x => x.MessageId == messageId)
51+
.Where(x => x.ConsumerId == _consumerId)
52+
.Where(x => x.State == MessageState.New)
53+
.ForUpdate(LockBehavior.SkipLocked)
54+
.FirstOrDefaultAsync();
6055

61-
try
62-
{
63-
await Consume(context.Message);
64-
inboxMessage.State = MessageState.Done;
65-
}
66-
catch (Exception ex)
67-
{
68-
logger.LogError(ex, "Exception thrown while consuming message");
69-
throw;
70-
}
71-
finally
72-
{
73-
inboxMessage!.UpdatedAt = DateTime.UtcNow;
74-
await dbContext.SaveChangesAsync();
75-
await transactionScope.CommitAsync();
76-
}
56+
if (inboxMessage == null)
57+
{
58+
return;
7759
}
7860

79-
public abstract Task Consume(TMessage message);
61+
try
62+
{
63+
await Consume(context.Message);
64+
inboxMessage.State = MessageState.Done;
65+
}
66+
catch (Exception ex)
67+
{
68+
logger.LogError(ex, "Exception thrown while consuming message");
69+
throw;
70+
}
71+
finally
72+
{
73+
inboxMessage!.UpdatedAt = DateTime.UtcNow;
74+
await dbContext.SaveChangesAsync();
75+
await transactionScope.CommitAsync();
76+
}
8077
}
81-
}
78+
79+
public abstract Task Consume(TMessage message);
80+
}
+5-6
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
namespace MassTransit.PostgresOutbox
1+
namespace MassTransit.PostgresOutbox;
2+
3+
public class Constants
24
{
3-
public class Constants
4-
{
5-
public const string OutboxMessageId = "OutboxMessageId";
6-
}
7-
}
5+
public const string OutboxMessageId = "OutboxMessageId";
6+
}
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
using MassTransit.PostgresOutbox.Enums;
22

3-
namespace MassTransit.PostgresOutbox.Entities
3+
namespace MassTransit.PostgresOutbox.Entities;
4+
5+
public class InboxMessage
46
{
5-
public class InboxMessage
6-
{
7-
public required Guid MessageId { get; set; }
8-
public required string ConsumerId { get; set; }
9-
public MessageState State { get; set; } = MessageState.New;
10-
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
11-
public DateTime? UpdatedAt { get; set; }
12-
}
13-
}
7+
public required Guid MessageId { get; set; }
8+
public required string ConsumerId { get; set; }
9+
public MessageState State { get; set; } = MessageState.New;
10+
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
11+
public DateTime? UpdatedAt { get; set; }
12+
}
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
using MassTransit.PostgresOutbox.Enums;
22

3-
namespace MassTransit.PostgresOutbox.Entities
4-
{
5-
public class OutboxMessage
6-
{
7-
public required Guid Id { get; set; }
8-
public MessageState State { get; set; } = MessageState.New;
9-
public required string Payload { get; set; }
10-
public required string Type { get; set; }
11-
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
12-
public DateTime? UpdatedAt { get; set; }
3+
namespace MassTransit.PostgresOutbox.Entities;
134

14-
}
15-
}
5+
public class OutboxMessage
6+
{
7+
public required Guid Id { get; set; }
8+
public MessageState State { get; set; } = MessageState.New;
9+
public required string Payload { get; set; }
10+
public required string Type { get; set; }
11+
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
12+
public DateTime? UpdatedAt { get; set; }
13+
}
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
namespace MassTransit.PostgresOutbox.Enums
1+
namespace MassTransit.PostgresOutbox.Enums;
2+
3+
public enum MessageState
24
{
3-
public enum MessageState
4-
{
5-
New = 1,
6-
Done = 2,
7-
}
8-
}
5+
New = 1,
6+
Done = 2,
7+
}

0 commit comments

Comments
 (0)