Skip to content

Commit

Permalink
Merge pull request #26 from phymbert/bug/issue-23-tcp-connection-not-…
Browse files Browse the repository at this point in the history
…released

fix: request http is not cancelled on Close
phymbert authored Dec 7, 2024
2 parents deb3ed3 + 20d68a1 commit c2f5a4e
Showing 2 changed files with 9 additions and 5 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ See the [K6 SSE Extension design](docs/design/021-sse-api.md).

## k6 version

This extension is tested with `k6` version `v0.55.0` last release is [v0.1.5](https://github.com/phymbert/xk6-sse/releases/tag/v0.1.5).
This extension is tested with `k6` version `v0.55.0` last release is [v0.1.6](https://github.com/phymbert/xk6-sse/releases/tag/v0.1.6).

## Build

12 changes: 8 additions & 4 deletions sse.go
Original file line number Diff line number Diff line change
@@ -56,6 +56,7 @@ type Client struct {
samplesOutput chan<- metrics.SampleContainer
builtinMetrics *metrics.BuiltinMetrics
sseMetrics *sseMetrics
cancelRequest context.CancelFunc
}

// HTTPResponse is the http response returned by sse.open.
@@ -169,6 +170,8 @@ func (mi *sse) Open(url string, args ...sobek.Value) (*HTTPResponse, error) {
func (mi *sse) open(ctx context.Context, state *lib.State, rt *sobek.Runtime,
url string, args *sseOpenArgs,
) (*Client, func(), error) {
reqCtx, cancel := context.WithCancel(ctx)

sseClient := Client{
ctx: ctx,
rt: rt,
@@ -179,6 +182,7 @@ func (mi *sse) open(ctx context.Context, state *lib.State, rt *sobek.Runtime,
tagsAndMeta: args.tagsAndMeta,
builtinMetrics: state.BuiltinMetrics,
sseMetrics: mi.metrics,
cancelRequest: cancel,
}

// Overriding the NextProtos to avoid talking http2
@@ -206,7 +210,7 @@ func (mi *sse) open(ctx context.Context, state *lib.State, rt *sobek.Runtime,
httpMethod = args.method
}

req, err := http.NewRequestWithContext(ctx, httpMethod, url, strings.NewReader(args.body))
req, err := http.NewRequestWithContext(reqCtx, httpMethod, url, strings.NewReader(args.body))
if err != nil {
return &sseClient, nil, err
}
@@ -259,7 +263,9 @@ func (c *Client) On(event string, handler sobek.Value) {

// Close the event loop
func (c *Client) Close() error {
return c.closeResponseBody()
err := c.closeResponseBody()
c.cancelRequest()
return err
}

func (c *Client) handleEvent(event string, args ...sobek.Value) {
@@ -283,8 +289,6 @@ func (c *Client) closeResponseBody() error {
c.handleEvent("error", c.rt.ToValue(err))
}
close(c.done)
// Ensure response body is read in order for http tcp connection to be reused
_, _ = io.Copy(io.Discard, c.resp.Body)
})

return err

0 comments on commit c2f5a4e

Please sign in to comment.