diff --git a/bus.go b/bus.go index 837f23b..cfce1cf 100644 --- a/bus.go +++ b/bus.go @@ -21,18 +21,8 @@ import ( "github.com/livekit/psrpc/internal/bus" ) -const ( - LegacySubLegacyPub = bus.LegacySubLegacyPub - LegacySubCompatiblePub = bus.LegacySubCompatiblePub - WildcardSubCompatiblePub = bus.RouterSubCompatiblePub - WildcardSubWildcardPub = bus.RouterSubWildcardPub -) - -func SetChannelMode(m uint32) { - if m <= WildcardSubWildcardPub { - bus.ChannelMode.Store(m) - } -} +// TODO: clean up +func SetChannelMode(m uint32) {} type Channel = bus.Channel type MessageBus bus.MessageBus diff --git a/internal/bus/bus.go b/internal/bus/bus.go index cb3d3e9..38ac325 100644 --- a/internal/bus/bus.go +++ b/internal/bus/bus.go @@ -16,7 +16,6 @@ package bus import ( "context" - "sync/atomic" "google.golang.org/protobuf/proto" ) @@ -25,15 +24,6 @@ const ( DefaultChannelSize = 100 ) -const ( - LegacySubLegacyPub = iota - LegacySubCompatiblePub - RouterSubCompatiblePub - RouterSubWildcardPub -) - -var ChannelMode atomic.Uint32 - type Channel struct { Legacy, Server, Local string } diff --git a/internal/bus/bus_nats.go b/internal/bus/bus_nats.go index 285d8d7..34bc769 100644 --- a/internal/bus/bus_nats.go +++ b/internal/bus/bus_nats.go @@ -17,11 +17,9 @@ package bus import ( "context" "fmt" - "io" "sync" "github.com/nats-io/nats.go" - "go.uber.org/multierr" "google.golang.org/protobuf/proto" ) @@ -29,13 +27,13 @@ type natsMessageBus struct { nc *nats.Conn mu sync.Mutex - routers map[string]*natsWildcardRouter + routers map[string]*natsRouter } func NewNatsMessageBus(nc *nats.Conn) MessageBus { return &natsMessageBus{ nc: nc, - routers: map[string]*natsWildcardRouter{}, + routers: map[string]*natsRouter{}, } } @@ -44,45 +42,22 @@ func (n *natsMessageBus) Publish(_ context.Context, channel Channel, msg proto.M if err != nil { return err } - - if ChannelMode.Load() != RouterSubWildcardPub { - err = multierr.Append(err, n.nc.Publish(channel.Legacy, b)) - } - if ChannelMode.Load() != LegacySubLegacyPub { - err = multierr.Append(err, n.nc.Publish(channel.Server, b)) - } - return err + return n.nc.Publish(channel.Server, b) } func (n *natsMessageBus) Subscribe(_ context.Context, channel Channel, size int) (Reader, error) { if channel.Local == "" { - if ChannelMode.Load() == RouterSubWildcardPub { - return n.subscribe(channel.Server, size, false) - } else { - return n.subscribeCompatible(channel, size, false) - } + return n.subscribe(channel.Server, size, false) } else { - if ChannelMode.Load() == RouterSubWildcardPub { - return n.subscribeRouter(channel, size, false) - } else { - return n.subscribeCompatibleRouter(channel, size, false) - } + return n.subscribeRouter(channel, size, false) } } func (n *natsMessageBus) SubscribeQueue(_ context.Context, channel Channel, size int) (Reader, error) { if channel.Local == "" { - if ChannelMode.Load() == RouterSubWildcardPub { - return n.subscribe(channel.Server, size, true) - } else { - return n.subscribeCompatible(channel, size, true) - } + return n.subscribe(channel.Server, size, true) } else { - if ChannelMode.Load() == RouterSubWildcardPub { - return n.subscribeRouter(channel, size, true) - } else { - return n.subscribeCompatibleRouter(channel, size, true) - } + return n.subscribeRouter(channel, size, true) } } @@ -105,39 +80,33 @@ func (n *natsMessageBus) subscribe(channel string, size int, queue bool) (*natsS }, nil } -func (n *natsMessageBus) subscribeCompatible(channel Channel, size int, queue bool) (*natsCompatibleSubscription, error) { - sub, err := n.subscribe(channel.Server, size, queue) - if err != nil { - return nil, err - } - legacySub, err := n.subscribe(channel.Legacy, size, queue) - if err != nil { - sub.Close() - return nil, err +func (n *natsMessageBus) unsubscribeRouter(r *natsRouter, channel string) { + n.mu.Lock() + defer n.mu.Unlock() + if r.close(channel) { + delete(n.routers, r.channel) } - - return &natsCompatibleSubscription{ - sub: sub, - legacySub: legacySub, - msgChan: sub.msgChan, - legacyMsgChan: legacySub.msgChan, - }, nil } -func (n *natsMessageBus) subscribeWildcardRouter(channel string, sub *natsWildcardSubscription, queue bool) error { +func (n *natsMessageBus) subscribeRouter(channel Channel, size int, queue bool) (*natsRouterSubscription, error) { + sub := &natsRouterSubscription{ + msgChan: make(chan *nats.Msg, size), + channel: channel.Local, + } + n.mu.Lock() - r, ok := n.routers[channel] + r, ok := n.routers[channel.Server] if !ok { - r = &natsWildcardRouter{ - routes: map[string]*natsWildcardSubscription{}, + r = &natsRouter{ + routes: map[string]*natsRouterSubscription{}, bus: n, - channel: channel, + channel: channel.Server, queue: queue, } - n.routers[channel] = r + n.routers[channel.Server] = r } else if r.queue != queue { n.mu.Unlock() - return fmt.Errorf("subscription type mismatch for channel %q %q", channel, sub.channel) + return nil, fmt.Errorf("subscription type mismatch for channel %q %q", channel, sub.channel) } r.open(sub.channel, sub) @@ -145,63 +114,25 @@ func (n *natsMessageBus) subscribeWildcardRouter(channel string, sub *natsWildca n.mu.Unlock() if ok { - return nil + return sub, nil } var err error if queue { - r.sub, err = n.nc.QueueSubscribe(channel, "bus", r.write) + r.sub, err = n.nc.QueueSubscribe(channel.Server, "bus", r.write) } else { - r.sub, err = n.nc.Subscribe(channel, r.write) + r.sub, err = n.nc.Subscribe(channel.Server, r.write) } if err != nil { n.mu.Lock() - delete(n.routers, channel) + delete(n.routers, channel.Server) n.mu.Unlock() - } - return err -} - -func (n *natsMessageBus) unsubscribeWildcardRouter(r *natsWildcardRouter, channel string) { - n.mu.Lock() - defer n.mu.Unlock() - if r.close(channel) { - delete(n.routers, r.channel) - } -} - -func (n *natsMessageBus) subscribeRouter(channel Channel, size int, queue bool) (*natsWildcardSubscription, error) { - sub := &natsWildcardSubscription{ - msgChan: make(chan *nats.Msg, size), - channel: channel.Local, - } - - if err := n.subscribeWildcardRouter(channel.Server, sub, queue); err != nil { return nil, err } return sub, nil } -func (n *natsMessageBus) subscribeCompatibleRouter(channel Channel, size int, queue bool) (*natsCompatibleSubscription, error) { - sub, err := n.subscribeRouter(channel, size, queue) - if err != nil { - return nil, err - } - legacySub, err := n.subscribe(channel.Legacy, size, queue) - if err != nil { - sub.Close() - return nil, err - } - - return &natsCompatibleSubscription{ - sub: sub, - legacySub: legacySub, - msgChan: sub.msgChan, - legacyMsgChan: legacySub.msgChan, - }, nil -} - type natsSubscription struct { sub *nats.Subscription msgChan chan *nats.Msg @@ -221,22 +152,22 @@ func (n *natsSubscription) Close() error { return err } -type natsWildcardRouter struct { +type natsRouter struct { sub *nats.Subscription mu sync.Mutex - routes map[string]*natsWildcardSubscription + routes map[string]*natsRouterSubscription bus *natsMessageBus channel string queue bool } -func (n *natsWildcardRouter) open(channel string, s *natsWildcardSubscription) { +func (n *natsRouter) open(channel string, s *natsRouterSubscription) { n.mu.Lock() defer n.mu.Unlock() n.routes[channel] = s } -func (n *natsWildcardRouter) close(channel string) bool { +func (n *natsRouter) close(channel string) bool { n.mu.Lock() defer n.mu.Unlock() delete(n.routes, channel) @@ -247,7 +178,7 @@ func (n *natsWildcardRouter) close(channel string) bool { return false } -func (n *natsWildcardRouter) write(m *nats.Msg) { +func (n *natsRouter) write(m *nats.Msg) { channel, err := deserializeChannel(m.Data) if err != nil { return @@ -260,20 +191,17 @@ func (n *natsWildcardRouter) write(m *nats.Msg) { } } -type natsWildcardSubscription struct { +type natsRouterSubscription struct { msgChan chan *nats.Msg - router *natsWildcardRouter + router *natsRouter channel string } -func (n *natsWildcardSubscription) write(m *nats.Msg) { - select { - case n.msgChan <- m: - default: - } +func (n *natsRouterSubscription) write(m *nats.Msg) { + n.msgChan <- m } -func (n *natsWildcardSubscription) read() ([]byte, bool) { +func (n *natsRouterSubscription) read() ([]byte, bool) { msg, ok := <-n.msgChan if !ok { return nil, false @@ -281,41 +209,8 @@ func (n *natsWildcardSubscription) read() ([]byte, bool) { return msg.Data, true } -func (n *natsWildcardSubscription) Close() error { - n.router.bus.unsubscribeWildcardRouter(n.router, n.channel) +func (n *natsRouterSubscription) Close() error { + n.router.bus.unsubscribeRouter(n.router, n.channel) close(n.msgChan) return nil } - -type natsCompatibleSubscription struct { - sub, legacySub io.Closer - msgChan, legacyMsgChan chan *nats.Msg -} - -func (n *natsCompatibleSubscription) read() ([]byte, bool) { - for { - select { - case msg, ok := <-n.msgChan: - if !ok { - return nil, false - } - switch ChannelMode.Load() { - case RouterSubCompatiblePub, RouterSubWildcardPub: - return msg.Data, true - } - - case msg, ok := <-n.legacyMsgChan: - if !ok { - return nil, false - } - switch ChannelMode.Load() { - case LegacySubLegacyPub, LegacySubCompatiblePub: - return msg.Data, true - } - } - } -} - -func (n *natsCompatibleSubscription) Close() error { - return multierr.Combine(n.sub.Close(), n.legacySub.Close()) -} diff --git a/internal/bus/bus_nats_test.go b/internal/bus/bus_nats_test.go deleted file mode 100644 index d0e47e9..0000000 --- a/internal/bus/bus_nats_test.go +++ /dev/null @@ -1,95 +0,0 @@ -package bus - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/nats-io/nats.go" - "github.com/stretchr/testify/require" - - "github.com/livekit/psrpc/internal" -) - -func TestNats(t *testing.T) { - ctx := context.Background() - - t.Run("wildcard", func(t *testing.T) { - nc, _ := nats.Connect(nats.DefaultURL) - t.Cleanup(nc.Close) - bus := NewNatsMessageBus(nc) - - cases := []struct { - minPubMode int - subMode int - localRouter bool - }{ - {LegacySubLegacyPub, LegacySubLegacyPub, false}, - {LegacySubLegacyPub, LegacySubCompatiblePub, false}, - {LegacySubLegacyPub, LegacySubLegacyPub, true}, - {LegacySubLegacyPub, LegacySubCompatiblePub, true}, - {LegacySubLegacyPub, RouterSubCompatiblePub, true}, - {LegacySubCompatiblePub, RouterSubWildcardPub, true}, - } - - for _, c := range cases { - ChannelMode.Store(uint32(c.subMode)) - - chA := Channel{ - Legacy: "test|foo|bar", - Server: "test", - } - chB := Channel{ - Legacy: "test|foo|baz", - Server: "test", - } - if c.localRouter { - chA.Local = "foo.bar" - chB.Local = "foo.baz" - } - - subA, err := Subscribe[*internal.Request](ctx, bus, chA, DefaultChannelSize) - require.NoError(t, err) - subB, err := Subscribe[*internal.Request](ctx, bus, chB, DefaultChannelSize) - require.NoError(t, err) - - for pubMode := c.minPubMode; pubMode <= c.subMode; pubMode++ { - ChannelMode.Store(uint32(pubMode)) - - t.Run(fmt.Sprintf("sub:%d/pub:%d/wild:%t", c.subMode, pubMode, c.localRouter), func(t *testing.T) { - require.NoError(t, bus.Publish(ctx, chA, &internal.Request{RequestId: "1"})) - - select { - case <-subA.Channel(): - case <-time.After(100 * time.Millisecond): - require.FailNow(t, "expected message from channel A") - } - - select { - case <-subB.Channel(): - require.FailNow(t, "expected no message from channel B") - case <-time.After(100 * time.Millisecond): - } - - require.NoError(t, bus.Publish(ctx, chB, &internal.Request{RequestId: "1"})) - - select { - case <-subB.Channel(): - case <-time.After(100 * time.Millisecond): - require.FailNow(t, "expected message from channel B") - } - - select { - case <-subA.Channel(): - require.FailNow(t, "expected no message from channel A") - case <-time.After(100 * time.Millisecond): - } - }) - } - - require.NoError(t, subA.Close()) - require.NoError(t, subB.Close()) - } - }) -} diff --git a/internal/bus/subscription.go b/internal/bus/subscription.go index ed0bd3c..5a45fae 100644 --- a/internal/bus/subscription.go +++ b/internal/bus/subscription.go @@ -16,8 +16,6 @@ package bus import ( "google.golang.org/protobuf/proto" - - "github.com/livekit/psrpc/internal/logger" ) type Subscription[MessageType proto.Message] interface { @@ -42,7 +40,6 @@ func newSubscription[MessageType proto.Message](sub Reader, size int) Subscripti p, err := deserialize(b) if err != nil { - logger.Error(err, "failed to deserialize message") continue } msgChan <- p.(MessageType) diff --git a/internal/test/my_service/my_service_test.go b/internal/test/my_service/my_service_test.go index dc0026a..08085b7 100644 --- a/internal/test/my_service/my_service_test.go +++ b/internal/test/my_service/my_service_test.go @@ -31,26 +31,38 @@ import ( func TestGeneratedService(t *testing.T) { t.Run("Local", func(t *testing.T) { - testGeneratedService(t, psrpc.NewLocalMessageBus()) + testGeneratedService(t, (func() func() psrpc.MessageBus { + bus := psrpc.NewLocalMessageBus() + return func() psrpc.MessageBus { return bus } + })()) }) t.Run("Redis", func(t *testing.T) { - rc := redis.NewUniversalClient(&redis.UniversalOptions{Addrs: []string{"localhost:6379"}}) - testGeneratedService(t, psrpc.NewRedisMessageBus(rc)) + testGeneratedService(t, func() psrpc.MessageBus { + rc := redis.NewUniversalClient(&redis.UniversalOptions{Addrs: []string{"localhost:6379"}}) + return psrpc.NewRedisMessageBus(rc) + }) }) t.Run("Nats", func(t *testing.T) { - nc, _ := nats.Connect(nats.DefaultURL) - testGeneratedService(t, psrpc.NewNatsMessageBus(nc)) + testGeneratedService(t, func() psrpc.MessageBus { + nc, _ := nats.Connect(nats.DefaultURL) + return psrpc.NewNatsMessageBus(nc) + }) }) } -func testGeneratedService(t *testing.T, bus psrpc.MessageBus) { +func testGeneratedService(t *testing.T, bus func() psrpc.MessageBus) { ctx := context.Background() req := &MyRequest{} update := &MyUpdate{} - sA := createServer(t, bus) - sB := createServer(t, bus) + sA := createServer(t, bus()) + sB := createServer(t, bus()) + + t.Cleanup(func() { + shutdown(t, sA) + shutdown(t, sB) + }) requestCount := 0 requestHook := func(ctx context.Context, req proto.Message, rpcInfo psrpc.RPCInfo) { @@ -60,8 +72,8 @@ func testGeneratedService(t *testing.T, bus psrpc.MessageBus) { responseHook := func(ctx context.Context, req proto.Message, rpcInfo psrpc.RPCInfo, res proto.Message, err error) { responseCount++ } - cA := createClient(t, bus, psrpc.WithClientRequestHooks(requestHook), psrpc.WithClientResponseHooks(responseHook)) - cB := createClient(t, bus) + cA := createClient(t, bus(), psrpc.WithClientRequestHooks(requestHook), psrpc.WithClientResponseHooks(responseHook)) + cB := createClient(t, bus()) // rpc NormalRPC(MyRequest) returns (MyResponse); _, err := cA.NormalRPC(ctx, req) @@ -101,7 +113,7 @@ func testGeneratedService(t *testing.T, bus psrpc.MessageBus) { require.NotNil(t, res) require.NoError(t, res.Err) case <-time.After(time.Second * 3): - t.Fatalf("timed out") + require.FailNow(t, "timed out") } } @@ -121,7 +133,7 @@ func testGeneratedService(t *testing.T, bus psrpc.MessageBus) { require.NoError(t, stream.Close(nil)) // let the service goroutine run - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Second) sA.Lock() sB.Lock() @@ -138,7 +150,7 @@ func testGeneratedService(t *testing.T, bus psrpc.MessageBus) { require.NoError(t, sA.server.RegisterGetRegionStatsTopic("regionB")) sA.server.DeregisterGetRegionStatsTopic("regionB") require.NoError(t, sB.server.RegisterGetRegionStatsTopic("regionB")) - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Second) respChan, err = cB.GetRegionStats(ctx, "regionB", req) require.NoError(t, err) @@ -147,7 +159,7 @@ func testGeneratedService(t *testing.T, bus psrpc.MessageBus) { require.NotNil(t, res) require.NoError(t, res.Err) case <-time.After(time.Second): - t.Fatalf("timed out") + require.FailNow(t, "timed out") } sA.Lock() @@ -157,56 +169,20 @@ func testGeneratedService(t *testing.T, bus psrpc.MessageBus) { sA.Unlock() sB.Unlock() - // rpc ProcessUpdate(Ignored) returns (MyUpdate) { - // option (psrpc.options).subscription = true; - subA, err := cA.SubscribeProcessUpdate(ctx) - require.NoError(t, err) - subB, err := cB.SubscribeProcessUpdate(ctx) - require.NoError(t, err) - time.Sleep(time.Millisecond * 100) - - require.NoError(t, sA.server.PublishProcessUpdate(ctx, update)) - requireOne(t, subA, subB) - require.NoError(t, subA.Close()) - require.NoError(t, subB.Close()) - // rpc UpdateRegionState(Ignored) returns (MyUpdate) { // option (psrpc.options).subscription = true; // option (psrpc.options).topics = true; // option (psrpc.options).type = MULTI; - subA, err = cA.SubscribeUpdateRegionState(ctx, "regionA") + subA, err := cA.SubscribeUpdateRegionState(ctx, "regionA") require.NoError(t, err) - subB, err = cB.SubscribeUpdateRegionState(ctx, "regionA") + subB, err := cB.SubscribeUpdateRegionState(ctx, "regionA") require.NoError(t, err) - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Second) require.NoError(t, sB.server.PublishUpdateRegionState(ctx, "regionA", update)) requireTwo(t, subA, subB) require.NoError(t, subA.Close()) require.NoError(t, subB.Close()) - - shutdown(t, sA) - shutdown(t, sB) -} - -func requireOne(t *testing.T, subA, subB psrpc.Subscription[*MyUpdate]) { - for i := 0; i < 2; i++ { - select { - case <-subA.Channel(): - if i == 0 { - continue - } - case <-subB.Channel(): - if i == 0 { - continue - } - case <-time.After(time.Second): - if i == 1 { - continue - } - } - t.Fatalf("%d responses received", i*2) - } } func requireTwo(t *testing.T, subA, subB psrpc.Subscription[*MyUpdate]) { @@ -215,7 +191,7 @@ func requireTwo(t *testing.T, subA, subB psrpc.Subscription[*MyUpdate]) { case <-subA.Channel(): case <-subB.Channel(): case <-time.After(time.Second): - t.Fatalf("timed out") + require.FailNow(t, "timed out") } } } @@ -246,7 +222,7 @@ func shutdown(t *testing.T, s *MyService) { case <-done: // continue case <-time.After(time.Second * 3): - t.Fatalf("shutdown not returning") + require.FailNow(t, "shutdown not returning") } } diff --git a/internal/test/psrpc_test.go b/internal/test/psrpc_test.go index 55c7828..909055d 100644 --- a/internal/test/psrpc_test.go +++ b/internal/test/psrpc_test.go @@ -40,7 +40,10 @@ func TestRPC(t *testing.T) { }{ { label: "Local", - bus: func() psrpc.MessageBus { return psrpc.NewLocalMessageBus() }, + bus: (func() func() psrpc.MessageBus { + bus := psrpc.NewLocalMessageBus() + return func() psrpc.MessageBus { return bus } + })(), }, { label: "Redis", @@ -61,29 +64,29 @@ func TestRPC(t *testing.T) { for _, c := range cases { c := c t.Run(fmt.Sprintf("RPC/%s", c.label), func(t *testing.T) { - testRPC(t, c.bus()) + testRPC(t, c.bus) }) t.Run(fmt.Sprintf("Stream/%s", c.label), func(t *testing.T) { - testStream(t, c.bus()) + testStream(t, c.bus) }) } } -func testRPC(t *testing.T, bus psrpc.MessageBus) { +func testRPC(t *testing.T, bus func() psrpc.MessageBus) { serviceName := "test" serverA := server.NewRPCServer(&info.ServiceDefinition{ Name: serviceName, ID: rand.NewString(), - }, bus) + }, bus()) serverB := server.NewRPCServer(&info.ServiceDefinition{ Name: serviceName, ID: rand.NewString(), - }, bus) + }, bus()) serverC := server.NewRPCServer(&info.ServiceDefinition{ Name: serviceName, ID: rand.NewString(), - }, bus) + }, bus()) t.Cleanup(func() { serverA.Close(true) @@ -94,7 +97,7 @@ func testRPC(t *testing.T, bus psrpc.MessageBus) { c, err := client.NewRPCClient(&info.ServiceDefinition{ Name: serviceName, ID: rand.NewString(), - }, bus) + }, bus()) require.NoError(t, err) retErr := psrpc.NewErrorf(psrpc.Internal, "foo") @@ -119,6 +122,7 @@ func testRPC(t *testing.T, bus psrpc.MessageBus) { require.NoError(t, err) err = server.RegisterHandler[*internal.Request, *internal.Response](serverB, rpc, nil, addOne, nil) require.NoError(t, err) + time.Sleep(time.Second) ctx := context.Background() requestID := rand.NewRequestID() @@ -141,6 +145,7 @@ func testRPC(t *testing.T, bus psrpc.MessageBus) { require.NoError(t, err) err = server.RegisterHandler[*internal.Request, *internal.Response](serverC, multiRpc, nil, returnError, nil) require.NoError(t, err) + time.Sleep(time.Second) requestID = rand.NewRequestID() resChan, err := client.RequestMulti[*internal.Response]( @@ -168,13 +173,13 @@ func testRPC(t *testing.T, bus psrpc.MessageBus) { } } -func testStream(t *testing.T, bus psrpc.MessageBus) { +func testStream(t *testing.T, bus func() psrpc.MessageBus) { serviceName := "test_stream" serverA := server.NewRPCServer(&info.ServiceDefinition{ Name: serviceName, ID: rand.NewString(), - }, bus) + }, bus()) t.Cleanup(func() { serverA.Close(true) @@ -183,7 +188,7 @@ func testStream(t *testing.T, bus psrpc.MessageBus) { c, err := client.NewRPCClientWithStreams(&info.ServiceDefinition{ Name: serviceName, ID: rand.NewString(), - }, bus) + }, bus()) require.NoError(t, err) serverClose := make(chan struct{}) diff --git a/pkg/info/channels.go b/pkg/info/channels.go index d3452da..9bb666a 100644 --- a/pkg/info/channels.go +++ b/pkg/info/channels.go @@ -128,21 +128,9 @@ func formatServerChannel(service string, topic []string, queue bool) string { } func formatChannel(delim byte, parts ...any) string { - buf := make([]byte, 0, 4*channelPartsLen(parts...)/3) - return string(appendChannelParts(buf, delim, parts...)) -} - -func channelPartsLen[T any](parts ...T) int { - var n int - for _, t := range parts { - switch v := any(t).(type) { - case string: - n += len(v) + 1 - case []string: - n += channelPartsLen(v...) - } - } - return n + p := scratch.Get().(*[]byte) + defer scratch.Put(p) + return string(appendChannelParts(*p, delim, parts...)) } func appendChannelParts[T any](buf []byte, delim byte, parts ...T) []byte {