
Stuck and inconsistent consumer when deleting non-acked messages #6374

Closed
roeschter opened this issue Jan 14, 2025 · 5 comments · Fixed by #6387 or #6399

@roeschter

Observed behavior

A consumer gets stuck and eventually becomes unstuck after a random time, ranging from a few seconds to an hour. Even before the consumer gets stuck, the statistics reported by nats consumer info are seemingly incorrect: outstanding acks are too high, and the redelivery count jumps wildly and sometimes goes backwards.

The effect was observed independently in two separate environments. It is somewhat random, though, and may take a few minutes to appear:

  1. Java, Windows 11, local 3-node cluster
  2. Go, Cloud Linux, 3-node cluster

Expected behavior

Consistent consumer behavior even when messages marked for redelivery are deleted

Server and client version

NATS server 2.10.24
Latest Java client
Latest Go client

Host environment

Windows 11 as well as Cloud Linux

Steps to reproduce

Conceptually (a minimal sketch of this pattern follows the list):

  1. A high percentage of messages not acked and being redelivered
  2. A high percentage of messages deleted while pending redelivery
  3. A 3-node cluster
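
A minimal sketch of this pattern, assuming the nats.go jetstream API (stream and consumer names, the message count, and the ack/delete ratios are illustrative and not taken from the attached reproducer):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/nats-io/nats.go/jetstream"
)

func reproSketch(ctx context.Context, js jetstream.JetStream) error {
	stream, err := js.CreateStream(ctx, jetstream.StreamConfig{
		Name:     "REPRO",
		Subjects: []string{"repro.>"},
		Replicas: 3, // step 3: run against a 3-node cluster
	})
	if err != nil {
		return err
	}
	cons, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
		Durable:    "repro_consumer",
		AckPolicy:  jetstream.AckExplicitPolicy,
		AckWait:    100 * time.Millisecond, // short ack wait forces frequent redeliveries
		MaxDeliver: 4,
	})
	if err != nil {
		return err
	}
	for i := 0; i < 1000; i++ {
		if _, err := js.Publish(ctx, fmt.Sprintf("repro.%d", i), []byte("x")); err != nil {
			return err
		}
	}
	batch, err := cons.Fetch(500)
	if err != nil {
		return err
	}
	i := 0
	for msg := range batch.Messages() {
		meta, err := msg.Metadata()
		if err != nil {
			return err
		}
		switch {
		case i%6 == 0: // step 2: delete a sizable fraction while they are pending redelivery
			if err := stream.DeleteMsg(ctx, meta.Sequence.Stream); err != nil {
				return err
			}
		case i%14 == 0:
			_ = msg.Ack()
		default: // step 1: leave most messages unacked so the server redelivers them
		}
		i++
	}
	return batch.Error()
}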

Exact steps:

  1. Start a 3 node cluster
  2. Start the Feeder (recreates a 3-replica stream)
  3. Start the Consumer
  4. Wait for "consumer stalled"
  5. It may be required to stop and restart the consumer.
  6. Observe the consumer info: the redelivery count may go down, and outstanding acks may grow to unrealistic values.

Reproducer20250114_noack_delete.zip

@roeschter roeschter added the defect Suspected defect such as a bug or regression label Jan 14, 2025
@tehsphinx

tehsphinx commented Jan 14, 2025

@roeschter thank you for reporting this!

We are seeing this behaviour as well on a Linux-based 3-node cluster installation. I was also able to reproduce it on a Docker-based local cluster (macOS).

After a few hundred to a few thousand messages, our consumers have unprocessed messages that do not get processed even though there is 1 pull request waiting per consumer (screenshot attached).

Also, the Ack Pending count doesn't seem to make sense. The numbers keep increasing although we have limited redeliveries to 4 (with a 35 s delay).

The consumers seem to wake up randomly and deliver the unprocessed messages, but that happens with increasing delays the longer the consumers run. We have observed delays of several hours.

The consumers are durable in our case. Restarting the subscription on a consumer often does nothing. Unprocessed messages stay unprocessed. Removing the consumers and recreating them with the same config (DeliverAll) on the existing stream helps for a while. As mentioned above: after a few hundred/thousand new messages things start to pile up again.
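
For context, a consumer configured along those lines with the nats.go jetstream API would look roughly like this (the durable name is illustrative; the 35 s redelivery delay is expressed via AckWait here, while the real setup may use BackOff instead):

func createConsumer(ctx context.Context, stream jetstream.Stream) (jetstream.Consumer, error) {
	return stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
		Durable:       "example_durable",          // durable, as described above
		DeliverPolicy: jetstream.DeliverAllPolicy, // DeliverAll
		AckPolicy:     jetstream.AckExplicitPolicy,
		AckWait:       35 * time.Second, // ~35 s before a redelivery is attempted
		MaxDeliver:    4,                // redeliveries limited to 4
	})
}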

@MauriceVanVeen MauriceVanVeen self-assigned this Jan 20, 2025
@MauriceVanVeen
Member

@tehsphinx, could you share how you reproduced this? Do you also have redeliveries, and do you delete messages from the stream?

@tehsphinx

tehsphinx commented Jan 21, 2025

@MauriceVanVeen apart from the steps and scripts @roeschter provided above, I've written a (relatively simple) Go test to reproduce this. However, it could still take quite a while until the issue was hit. Once JetStream hit that "broken state", it was reproducible with every run of the test.

import (
	"context"
	"crypto/tls"
	"errors"
	"fmt"
	"net"
	"os"
	"os/signal"
	"slices"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func TestDeleteMsg(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ctxSignal, cancel := signal.NotifyContext(ctx, os.Interrupt, os.Kill)
	defer cancel()

	nc, js, err := getJS()
	if err != nil {
		t.Fatalf("failed to connect to nats: %v", err)
	}
	defer nc.Close()

	streamName := "test_delete_msg"
	// streamName := "bench" + randString(12)
	stream, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
		Name:      streamName,
		Subjects:  []string{streamName + ".>"},
		Retention: jetstream.LimitsPolicy,
		MaxAge:    30 * 24 * time.Hour,
	})
	if err != nil {
		t.Fatal(err)
	}

	consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
		Durable:       streamName + "_consumer",
		DeliverPolicy: jetstream.DeliverAllPolicy,
		AckPolicy:     jetstream.AckExplicitPolicy,
		AckWait:       100 * time.Millisecond,
		MaxDeliver:    4,
		MaxAckPending: -1,
	})
	if err != nil {
		t.Fatal(err)
	}

	fmt.Println("consuming messages...")
	messagesCtx, err := consumer.Messages()
	if err != nil {
		t.Fatal(err)
	}
	defer messagesCtx.Stop()

	chErr := make(chan error)

	// Counters are updated in the consumer goroutine and read elsewhere without
	// synchronization; good enough for a rough reproduction.
	var count, processed, acked, termed, deleted, unhandled int
	go func() {
		for {
			msg, r := messagesCtx.Next()
			if r != nil {
				chErr <- r
				return
			}

			count++

			meta, r := msg.Metadata()
			if r != nil {
				chErr <- r
				return
			}
			if 4 <= meta.NumDelivered {
				processed++
				unhandled++
			} else if count%14 == 0 {
				processed++
				acked++
				_ = msg.Ack()
			} else if count%8 == 0 {
				processed++
				termed++
				_ = msg.Term()
			} else if count%6 == 0 {
				processed++
				deleted++
				if e := stream.DeleteMsg(ctx, meta.Sequence.Stream); e != nil {
					chErr <- e
					return
				}
			}

			fmt.Println(count, processed, meta.NumDelivered, "got msg", msg.Subject())
		}
	}()

	// Publish messages until a kill/interrupt signal is received; the function is
	// invoked immediately, so sendCount holds the total published afterwards.
	sendCount := func() int {
		var i int
		for {
			select {
			case <-ctxSignal.Done():
				return i
			default:
			}

			i++
			subj := streamName + "." + strconv.Itoa(i%2) + ".something." + strconv.Itoa(i)
			msg := "hello " + strconv.Itoa(i)
			if _, r := js.Publish(ctx, subj, []byte(msg)); r != nil {
				t.Fatal(r)
			}
		}
	}()

	// Once everything published has been processed, drain the consumer so the
	// message iterator closes and the summary below gets printed.
	go func() {
		for {
			time.Sleep(50 * time.Millisecond)
			if sendCount <= processed {
				break
			}
		}
		time.Sleep(1000 * time.Millisecond)
		messagesCtx.Drain()
	}()

	var processingErr error
	select {
	case <-ctx.Done():
	case processingErr = <-chErr:
	}

	fmt.Println("-----------------------------------")
	fmt.Println("sent", sendCount, "processed", processed)
	fmt.Println("received", count)
	fmt.Println("acked", acked, "termed", termed)
	fmt.Println("deleted", deleted, "unhandled", unhandled)

	info, err := consumer.Info(context.Background())
	if err != nil {
		t.Fatal(err)
	}
	fmt.Println("-----------------------------------")
	fmt.Println("Stream:", info.Stream)
	fmt.Println("Consumer:", info.Name)
	fmt.Println("NumAckPending: ", info.NumAckPending)
	fmt.Println("NumRedelivered: ", info.NumRedelivered)
	fmt.Println("NumWaiting: ", info.NumWaiting)
	fmt.Println("NumPending: ", info.NumPending)

	fmt.Println("-----------------------------------")
	fmt.Printf("%+v\n", info)

	if r := processingErr; r != nil && !errors.Is(r, jetstream.ErrMsgIteratorClosed) {
		t.Fatal(r)
	}
}

func getJS(addresses ...string) (*nats.Conn, jetstream.JetStream, error) {
	address := "127.0.0.1"
	if addr, ok := os.LookupEnv("NATS_ADDRESS"); ok {
		address = addr
	}
	if len(addresses) != 0 {
		address = addresses[0]
	}

	host, _, err := net.SplitHostPort(address)
	if err != nil {
		if !strings.Contains(err.Error(), "missing port in address") {
			return nil, nil, err
		}
		host = address
	}

	var opts []nats.Option
	if !slices.Contains([]string{"localhost", "127.0.0.1", "nats"}, host) {
		opts = append(opts, nats.Secure(&tls.Config{InsecureSkipVerify: true}))
		opts = append(opts, nats.UserInfo("js", ""))
	}

	fmt.Println("connecting to NATS at", address)
	nc, err := nats.Connect(address, opts...)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to connect to NATS at %s: %w", address, err)
	}

	js, err := jetstream.New(nc)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to connect to JetStream: %w", err)
	}
	return nc, js, nil
}

The test can be aborted with a kill signal and will print out the state of the consumer after.

Note: sorry if the test doesn't work out of the box; it was copy/pasted from our code.
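
Assuming a standard Go module with nats.go as a dependency, it should run against a local cluster with something like NATS_ADDRESS=127.0.0.1:4222 go test -run TestDeleteMsg -v.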

@MauriceVanVeen
Member

@tehsphinx, I have been running the code for ~30 minutes, but it didn't seem to repro for me thus far.
Seeing the stream.DeleteMsg call and how similar it is to @roeschter's repro, I'm assuming it should also be fixed by the linked PR.

@tehsphinx

tehsphinx commented Jan 22, 2025

@MauriceVanVeen Just saw your comment about the differences between Purge and Delete in the PR. While in the test script above messages only get deleted, in our implementation we use both Purge by subject and Delete by stream sequence. Some of the messages are sent to (mobile) clients, and we only remove them after they are "acknowledged". Some acknowledgement messages only let me reconstruct the subject (which is unique) and don't contain a sequence id.

Not sure if that is of consequence for this fix.
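
For reference, the two removal paths mentioned above look roughly like this with the nats.go jetstream API (subject and sequence values are illustrative):

func removeDelivered(ctx context.Context, stream jetstream.Stream) error {
	// Purge by subject, when only the (unique) subject can be derived from the acknowledgement:
	if err := stream.Purge(ctx, jetstream.WithPurgeSubject("acks.device-42.msg-17")); err != nil {
		return err
	}
	// Delete by stream sequence, when the sequence id is known:
	return stream.DeleteMsg(ctx, 12345)
}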

derekcollison added a commit that referenced this issue Jan 23, 2025 (#6399)

Follow-up of #6387

Deleting from `o.pending` and `o.rdc` in `o.getNextMsg` was a correctness issue. Because `o.processTerm` is invoked via `go o.processTerm`, we could enter `o.getNextMsg` before `o.processTerm` had adjusted the accounting for the deleted message. So we must not change `o.pending`/`o.rdc` ourselves; we can simply skip the message and wait for `o.processTerm` to run.

Resolves #6374

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
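
Schematically, the pattern the fix enforces can be pictured like this (illustrative Go only, not the actual nats-server code; the names mirror the commit message):

package sketch

import "sync"

type consumer struct {
	mu      sync.Mutex
	pending map[uint64]struct{} // o.pending: sequences awaiting ack
	rdc     map[uint64]uint64   // o.rdc: redelivery counts
}

// getNextMsg encounters a sequence that was deleted from the stream.
func (o *consumer) getNextMsg(deletedSeq uint64) {
	// Accounting is adjusted asynchronously...
	go o.processTerm(deletedSeq)
	// ...so getNextMsg must not touch o.pending/o.rdc itself: it may run again
	// before processTerm has done its adjustment, and mutating the maps here as
	// well left them inconsistent. Just skip the deleted sequence instead.
}

// processTerm is the single place that adjusts accounting for a deleted message.
func (o *consumer) processTerm(seq uint64) {
	o.mu.Lock()
	defer o.mu.Unlock()
	delete(o.pending, seq)
	delete(o.rdc, seq)
}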