From 20d68a1268a9c9d136d9bc707499c14bc518eec3 Mon Sep 17 00:00:00 2001 From: Pierrick HYMBERT Date: Sat, 7 Dec 2024 18:18:17 +0100 Subject: [PATCH] Fix #23 Fix request is not cancelled on Close triggering to many tcp connections on Close() --- README.md | 2 +- sse.go | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 933c1d9..f5e35c9 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ See the [K6 SSE Extension design](docs/design/021-sse-api.md). ## k6 version -This extension is tested with `k6` version `v0.55.0` last release is [v0.1.5](https://github.com/phymbert/xk6-sse/releases/tag/v0.1.5). +This extension is tested with `k6` version `v0.55.0` last release is [v0.1.6](https://github.com/phymbert/xk6-sse/releases/tag/v0.1.6). ## Build diff --git a/sse.go b/sse.go index c88fd8c..95fe46d 100644 --- a/sse.go +++ b/sse.go @@ -56,6 +56,7 @@ type Client struct { samplesOutput chan<- metrics.SampleContainer builtinMetrics *metrics.BuiltinMetrics sseMetrics *sseMetrics + cancelRequest context.CancelFunc } // HTTPResponse is the http response returned by sse.open. @@ -169,6 +170,8 @@ func (mi *sse) Open(url string, args ...sobek.Value) (*HTTPResponse, error) { func (mi *sse) open(ctx context.Context, state *lib.State, rt *sobek.Runtime, url string, args *sseOpenArgs, ) (*Client, func(), error) { + reqCtx, cancel := context.WithCancel(ctx) + sseClient := Client{ ctx: ctx, rt: rt, @@ -179,6 +182,7 @@ func (mi *sse) open(ctx context.Context, state *lib.State, rt *sobek.Runtime, tagsAndMeta: args.tagsAndMeta, builtinMetrics: state.BuiltinMetrics, sseMetrics: mi.metrics, + cancelRequest: cancel, } // Overriding the NextProtos to avoid talking http2 @@ -206,7 +210,7 @@ func (mi *sse) open(ctx context.Context, state *lib.State, rt *sobek.Runtime, httpMethod = args.method } - req, err := http.NewRequestWithContext(ctx, httpMethod, url, strings.NewReader(args.body)) + req, err := http.NewRequestWithContext(reqCtx, httpMethod, url, strings.NewReader(args.body)) if err != nil { return &sseClient, nil, err } @@ -259,7 +263,9 @@ func (c *Client) On(event string, handler sobek.Value) { // Close the event loop func (c *Client) Close() error { - return c.closeResponseBody() + err := c.closeResponseBody() + c.cancelRequest() + return err } func (c *Client) handleEvent(event string, args ...sobek.Value) { @@ -283,8 +289,6 @@ func (c *Client) closeResponseBody() error { c.handleEvent("error", c.rt.ToValue(err)) } close(c.done) - // Ensure response body is read in order for http tcp connection to be reused - _, _ = io.Copy(io.Discard, c.resp.Body) }) return err