Stuck and inconsistent consumer when deleting non-acked messages #6374
@roeschter thank you for reporting this! We are seeing this behaviour as well on a Linux-based 3-node cluster installation. I was also able to reproduce it on a Docker-based local cluster (macOS). After a few hundred to a few thousand messages, our consumers have unprocessed messages that do not get processed even though there is 1 pull request waiting per consumer. The consumers seem to wake up randomly and deliver the unprocessed messages, but that happens with increasing delays the longer the consumers run; we have observed delays of several hours. The consumers are durable in our case. Restarting the subscription on a consumer often does nothing: unprocessed messages stay unprocessed. Removing the consumers and recreating them with the same config (DeliverAll) on the existing stream helps for a while, but as mentioned above, after a few hundred/thousand new messages things start to pile up again.
@tehsphinx, could you share how you reproduced this? Do you also have redeliveries and delete messages from the stream?
@MauriceVanVeen apart from the steps and scripts @roeschter provided above, I've written a (relatively simple) Go test to reproduce this. However, it can still take quite a while until the issue is hit. Once JetStream hit that "broken state", it was reproducible with every run of the test script.

```go
func TestDeleteMsg(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ctxSignal, cancel := signal.NotifyContext(ctx, os.Interrupt, os.Kill)
	defer cancel()

	nc, js, err := getJS()
	if err != nil {
		log.Err(err).Msg("failed to connect to nats")
		t.Fatal(err)
	}
	defer nc.Close()

	streamName := "test_delete_msg"
	// streamName := "bench" + randString(12)
	stream, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
		Name:      streamName,
		Subjects:  []string{streamName + ".>"},
		Retention: jetstream.LimitsPolicy,
		MaxAge:    30 * 24 * time.Hour,
	})
	if err != nil {
		t.Fatal(err)
	}
	consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
		Durable:       streamName + "_consumer",
		DeliverPolicy: jetstream.DeliverAllPolicy,
		AckPolicy:     jetstream.AckExplicitPolicy,
		AckWait:       100 * time.Millisecond,
		MaxDeliver:    4,
		MaxAckPending: -1,
	})
	if err != nil {
		t.Fatal(err)
	}

	fmt.Println("consuming messages...")
	messagesCtx, err := consumer.Messages()
	if err != nil {
		t.Fatal(err)
	}
	defer messagesCtx.Stop()

	chErr := make(chan error)
	var count, processed, acked, termed, deleted, unhandled int
	go func() {
		for {
			msg, r := messagesCtx.Next()
			if r != nil {
				chErr <- r
				return
			}
			count++
			meta, r := msg.Metadata()
			if r != nil {
				chErr <- r
				return
			}
			if 4 <= meta.NumDelivered {
				// last allowed delivery (MaxDeliver: 4): deliberately left unhandled
				processed++
				unhandled++
			} else if count%14 == 0 {
				processed++
				acked++
				_ = msg.Ack()
			} else if count%8 == 0 {
				processed++
				termed++
				_ = msg.Term()
			} else if count%6 == 0 {
				// delete the message from the stream while it is pending redelivery
				processed++
				deleted++
				if e := stream.DeleteMsg(ctx, meta.Sequence.Stream); e != nil {
					chErr <- e
					return
				}
			}
			fmt.Println(count, processed, meta.NumDelivered, "got msg", msg.Subject())
		}
	}()

	// publish until interrupted; blocks, then yields the number of messages sent
	sendCount := func() int {
		var i int
		for {
			select {
			case <-ctxSignal.Done():
				return i
			default:
			}
			i++
			subj := streamName + "." + strconv.Itoa(i%2) + ".something." + strconv.Itoa(i)
			msg := "hello " + strconv.Itoa(i)
			if _, r := js.Publish(ctx, subj, []byte(msg)); r != nil {
				t.Fatal(r)
			}
		}
	}()

	go func() {
		for {
			time.Sleep(50 * time.Millisecond)
			if sendCount <= processed {
				break
			}
		}
		time.Sleep(1000 * time.Millisecond)
		messagesCtx.Drain()
	}()

	var processingErr error
	select {
	case <-ctx.Done():
	case processingErr = <-chErr:
	}

	fmt.Println("-----------------------------------")
	fmt.Println("sent", sendCount, "processed", processed)
	fmt.Println("received", count)
	fmt.Println("acked", acked, "termed", termed)
	fmt.Println("deleted", deleted, "unhandled", unhandled)

	info, err := consumer.Info(context.Background())
	if err != nil {
		t.Fatal(err)
	}
	fmt.Println("-----------------------------------")
	fmt.Println("Stream:", info.Stream)
	fmt.Println("Consumer:", info.Name)
	fmt.Println("NumAckPending: ", info.NumAckPending)
	fmt.Println("NumRedelivered: ", info.NumRedelivered)
	fmt.Println("NumWaiting: ", info.NumWaiting)
	fmt.Println("NumPending: ", info.NumPending)
	fmt.Println("-----------------------------------")
	fmt.Printf("%+v\n", info)

	if r := processingErr; r != nil && !errors.Is(r, jetstream.ErrMsgIteratorClosed) {
		t.Fatal(r)
	}
}

func getJS(addresses ...string) (*nats.Conn, jetstream.JetStream, error) {
	address := "127.0.0.1"
	if addr, ok := os.LookupEnv("NATS_ADDRESS"); ok {
		address = addr
	}
	if len(addresses) != 0 {
		address = addresses[0]
	}
	host, _, err := net.SplitHostPort(address)
	if err != nil {
		if !strings.Contains(err.Error(), "missing port in address") {
			return nil, nil, err
		}
		host = address
	}
	var opts []nats.Option
	if !slices.Contains([]string{"localhost", "127.0.0.1", "nats"}, host) {
		opts = append(opts, nats.Secure(&tls.Config{InsecureSkipVerify: true}))
		opts = append(opts, nats.UserInfo("js", ""))
	}
	fmt.Println("connecting to NATS at", address)
	nc, err := nats.Connect(address, opts...)
	if err != nil {
		return nil, nil, errs.WithField(err, "address", address)
	}
	js, err := jetstream.New(nc)
	if err != nil {
		return nil, nil, errs.WithMsg(err, "failed to connect to JetStream")
	}
	return nc, js, nil
}
```

The test can be aborted with a kill signal and will print out the state of the consumer afterwards. Note: sorry if the test doesn't work out of the box; it was just copy/pasted from our code (`log` and `errs` are internal helper packages).
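The consumer config in the test above (`AckWait: 100ms`, `MaxDeliver: 4`) means an unacked message is redelivered until it has been delivered four times in total. A stdlib-only sketch of that redelivery policy (this is not NATS code; `deliver` is a hypothetical helper written purely to illustrate the semantics):

```go
package main

import (
	"fmt"
	"time"
)

// deliver simulates a pull consumer's redelivery loop: a message is handed
// to the handler; if it is not acked within ackWait, it is redelivered,
// up to maxDeliver total attempts. Illustrative sketch only.
func deliver(ackWait time.Duration, maxDeliver int, handle func(attempt int) bool) int {
	attempts := 0
	for attempts < maxDeliver {
		attempts++
		if handle(attempts) {
			return attempts // acked in time; no further redelivery
		}
		time.Sleep(ackWait) // server waits AckWait, then redelivers
	}
	return attempts // MaxDeliver reached; message stays unhandled
}

func main() {
	// A handler that never acks: the message is delivered MaxDeliver times.
	n := deliver(10*time.Millisecond, 4, func(int) bool { return false })
	fmt.Println("delivery attempts (never acked):", n)

	// A handler that acks on the second attempt.
	n = deliver(10*time.Millisecond, 4, func(a int) bool { return a == 2 })
	fmt.Println("delivery attempts (acked on 2nd):", n)
}
```

This matches why the test checks `4 <= meta.NumDelivered`: the fourth delivery is the last one the server will make.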
@tehsphinx, I have been running the code for ~30 minutes, but it didn't seem to repro for me thus far.
@MauriceVanVeen Just saw your comment about differences on … Not sure if that is of consequence for this fix.
(#6399) Follow-up of #6387. Deleting from `o.pending` and `o.rdc` in `o.getNextMsg` was a correctness issue. Because we call `go o.processTerm`, we could enter `o.getNextMsg` before `o.processTerm` has adjusted the accounting for the message that was deleted. So we must not change `o.pending`/`o.rdc` ourselves; we can simply skip the message and wait for `o.processTerm` to run. Resolves #6374. Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
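The race described in the fix can be sketched with plain Go: an asynchronous cleanup goroutine (the analog of `go o.processTerm`) is the sole owner of the accounting, while the synchronous read path (the analog of `o.getNextMsg`) must only skip deleted messages, never mutate the shared state itself. This is a simplified illustration, not the actual nats-server code; the `consumer` type and its fields here are stand-ins:

```go
package main

import (
	"fmt"
	"sync"
)

// consumer is a simplified stand-in for the server's consumer state;
// pending mirrors the role of o.pending (outstanding delivery accounting).
type consumer struct {
	mu      sync.Mutex
	pending map[uint64]struct{}
}

// processTerm runs asynchronously (like `go o.processTerm`) and is the
// only place allowed to adjust accounting for a removed message.
func (o *consumer) processTerm(seq uint64, done chan<- struct{}) {
	o.mu.Lock()
	delete(o.pending, seq)
	o.mu.Unlock()
	done <- struct{}{}
}

// getNextMsg must NOT delete from pending itself. If it races ahead of
// processTerm and sees a deleted message, it just skips it; deleting here
// as well would double-adjust the accounting (the bug the patch removes).
func (o *consumer) getNextMsg(deletedFromStream map[uint64]bool) (uint64, bool) {
	o.mu.Lock()
	defer o.mu.Unlock()
	for seq := range o.pending {
		if deletedFromStream[seq] {
			continue // skip; processTerm will clean up the accounting
		}
		return seq, true
	}
	return 0, false
}

func main() {
	o := &consumer{pending: map[uint64]struct{}{1: {}, 2: {}}}
	done := make(chan struct{})

	// seq 1 was deleted from the stream; its cleanup runs asynchronously.
	go o.processTerm(1, done)

	// Whether getNextMsg wins or loses the race, it only ever delivers seq 2.
	if seq, ok := o.getNextMsg(map[uint64]bool{1: true}); ok {
		fmt.Println("delivering seq", seq)
	}
	<-done
	fmt.Println("pending after processTerm:", len(o.pending))
}
```

The point of the sketch: because the read path never mutates `pending`, the outcome is the same regardless of which goroutine runs first, which is the invariant the fix restores.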
Observed behavior
A consumer gets stuck and eventually unstuck after a random time, ranging from a few seconds to an hour. Even before the consumer gets stuck, the statistics reported by `nats consumer info` are seemingly incorrect: outstanding acks are too high, and the redelivery count jumps wildly and sometimes goes backwards. The effect was observed independently in separate environments. It is a bit random, though, and may take a few minutes to appear:
Expected behavior
Consistent consumer behavior even when messages marked for redelivery are deleted
Server and client version
NATS server 2.10.24
Latest Java client
Latest Go client
Host environment
Windows 11 as well as Cloud Linux
Steps to reproduce
Conceptually:
Exact steps:
Reproducer20250114_noack_delete.zip