Skip to content

Commit

Permalink
improve new consumers joining
Browse files Browse the repository at this point in the history
  • Loading branch information
covrom committed Nov 28, 2022
1 parent a1dd3c6 commit 7afd82e
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 45 deletions.
19 changes: 4 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,13 @@ This library does not have this disadvantage.
The connection string must be defined in the `REDIS_URL` environment value.

## Warning about creating a topic consumer group for the first time
A consumer group (but not a consumer!) must be created before posting messages to topic with unattached consumers.
This driver does not support new consumers attaching with a new group name after the publisher has sent multiple messages to a topic, because they do not receive previous messages.
All consumers already have a group, even if there is only one consumer in the group.

You can attach topic consumers before attach publishers, or create groups manually:
Consumer groups receive the same messages from the topic, and consumers within the group receive these messages exclusively.

```go
opt, err := redis.ParseURL(os.Getenv("REDIS_URL"))
if err != nil {
return err
}
rdb := redis.NewClient(opt)
![Messages flow](flow.png)

if _, err := rdb.XGroupCreateMkStream(context.Background(),
// here $ is needed, see https://redis.io/commands/xgroup-create/
"topics/1", "group1", "$").Result(); err != nil {
return err
}
```
This driver supports new consumers joining with a new group name after the publisher has sent multiple messages to a topic before the group was created. These consumers will receive all previous non-ACK-ed messages from the beginning of the topic.

## How to open topic and send message
```go
Expand Down
73 changes: 46 additions & 27 deletions basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package redispubsub_test
import (
"bytes"
"context"
"sync"
"testing"
"time"

Expand All @@ -12,23 +11,10 @@ import (
)

func TestBasicUsage(t *testing.T) {
// consumer group must be created before posting messages with unattached consumers
if _, err := redisCli.XGroupCreateMkStream(context.Background(),
"topics/1", "group1", "$").Result(); err != nil {
t.Error(err)
return
}

ctx := context.Background()
topic, err := pubsub.OpenTopic(ctx, "redis://topics/1")
if err != nil {
t.Errorf("could not open topic: %v", err)
return
}
defer topic.Shutdown(ctx)

orig := &pubsub.Message{
Body: []byte("Hello, World!\n"),
orig1 := &pubsub.Message{
Body: []byte("Message #1"),
// Metadata is optional and can be nil.
Metadata: map[string]string{
// These are examples of metadata.
Expand All @@ -38,7 +24,46 @@ func TestBasicUsage(t *testing.T) {
},
}

orig2 := &pubsub.Message{
Body: []byte("Message #2"),
// Metadata is optional and can be nil.
Metadata: map[string]string{
// These are examples of metadata.
// There is nothing special about the key names.
"language": "en",
},
}

// send before consumer attach
pubTest(ctx, orig1, t)
time.Sleep(100 * time.Millisecond)
pubTest(ctx, orig2, t)
time.Sleep(100 * time.Millisecond)
pubTest(ctx, orig1, t)
time.Sleep(100 * time.Millisecond)

// attach consumer and create group if needed
subTest(ctx, orig1, t)
time.Sleep(100 * time.Millisecond)
subTest(ctx, orig2, t)
time.Sleep(100 * time.Millisecond)
subTest(ctx, orig1, t)
time.Sleep(100 * time.Millisecond)

res, err := redisCli.XPending(ctx, "topics/1", "group1").Result()
if res.Count != 0 {
t.Error(res.Count, err)
}
}

func pubTest(ctx context.Context, orig *pubsub.Message, t *testing.T) {
topic, err := pubsub.OpenTopic(ctx, "redis://topics/1")
if err != nil {
t.Errorf("could not open topic: %v", err)
return
}
defer topic.Shutdown(ctx)

err = topic.Send(ctx, orig)
if err != nil {
t.Error(err)
Expand All @@ -56,12 +81,12 @@ func TestBasicUsage(t *testing.T) {
t.Error(err)
return
}
}

wg := &sync.WaitGroup{}
wg.Add(1)

func subTest(ctx context.Context, orig *pubsub.Message, t *testing.T) {
done := make(chan struct{})
go func() {
defer wg.Done()
defer close(done)
subs, err := pubsub.OpenSubscription(ctx, "redis://group1?consumer=cons1&topic=topics/1")
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -97,11 +122,5 @@ func TestBasicUsage(t *testing.T) {
}
}
}()

wg.Wait()

res, err := redisCli.XPending(ctx, "topics/1", "group1").Result()
if res.Count != 0 {
t.Error(res.Count, err)
}
<-done
}
Binary file added flow.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 2 additions & 2 deletions subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type subscription struct {

// SubscriptionOptions contains configuration for subscriptions.
type SubscriptionOptions struct {
From string // starting id, $ by default
From string // starting id ($ after tail of stream), 0 by default (from head of stream)
Consumer string // unique consumer name
NoAck bool
}
Expand All @@ -52,7 +52,7 @@ func openSubscription(broker *redis.Client, group, topic string, opts *Subscript
opts = &SubscriptionOptions{}
}
if opts.From == "" {
opts.From = "$"
opts.From = "0"
}
// Create a consumer group eater on the stream, and start consuming from
// the latest message (represented by $) or From id
Expand Down
2 changes: 1 addition & 1 deletion urlopener.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic
func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error) {
var topic, consumer string
var noack bool
from := "$"
from := ""
for param, value := range u.Query() {
switch param {
case "topic":
Expand Down

0 comments on commit 7afd82e

Please sign in to comment.