diff --git a/README.md b/README.md index c93d6f2..7fe6fe4 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,58 @@ func main() { } ``` +#### Stream data to server + +[See full example](https://github.com/sonirico/withttp/blob/main/examples/request_stream/main.go) + +```go +type metric struct { + Time time.Time `json:"t"` + Temp float32 `json:"T"` +} + +func CreateStream() error { + points := []metric{ + { + Time: time.Unix(time.Now().Unix()-1, 0), + Temp: 39, + }, + { + Time: time.Now(), + Temp: 40, + }, + } + + slice := withttp.Slice[metric](points) + + testEndpoint := withttp.NewEndpoint("webhook-site-request-stream-example"). + Request( + withttp.WithBaseURL("https://webhook.site/24e84e8f-75cf-4239-828e-8bed244c0afb"), + ) + + call := withttp.NewCall[any](withttp.NewDefaultFastHttpHttpClientAdapter()). + WithMethod(http.MethodPost). + WithContentType(withttp.ContentTypeJSONEachRow). + WithRequestStreamBody( + withttp.WithRequestStreamBody[any, metric](slice), + ). + WithExpectedStatusCodes(http.StatusOK) + + err := call.CallEndpoint(context.Background(), testEndpoint) + + received := call.Req.Body() + + fmt.Printf("recv: '%s'", string(received)) + /* + {"t":"2022-09-01T00:58:15+02:00","T":39} + {"t":"2022-09-01T00:58:16.15846898+02:00","T":40} + */ + return err +} +``` + +#### Several endpoints + In case of a wide range catalog of endpoints, predefined parameters and behaviours can be defined by employing an endpoint definition. @@ -170,3 +222,7 @@ func main() { } } ``` + +### Caveats: + +- Fasthttp request streams are not supported as of now. \ No newline at end of file diff --git a/adapter_fasthttp.go b/adapter_fasthttp.go index 879c242..2e32147 100644 --- a/adapter_fasthttp.go +++ b/adapter_fasthttp.go @@ -11,6 +11,8 @@ import ( type ( fastHttpReqAdapter struct { + stream io.ReadWriteCloser + req *fasthttp.Request } @@ -58,9 +60,9 @@ func (a *fastHttpReqAdapter) SetURL(u *url.URL) { } } -func (a *fastHttpReqAdapter) SetBodyStream(body io.ReadCloser, bodySize int) { - // TODO: Caveat. Content type may be unknown at the time of setting streams. - a.req.SetBodyStream(body, bodySize) +func (a *fastHttpReqAdapter) SetBodyStream(body io.ReadWriteCloser, bodySize int) { + a.stream = body + a.req.SetBodyStream(a.stream, bodySize) } func (a *fastHttpReqAdapter) SetBody(body []byte) { @@ -71,8 +73,8 @@ func (a *fastHttpReqAdapter) Body() []byte { return a.req.Body() } -func (a *fastHttpReqAdapter) BodyStream() io.ReadCloser { - return io.NopCloser(bytes.NewReader(a.req.Body())) +func (a *fastHttpReqAdapter) BodyStream() io.ReadWriteCloser { + return a.stream } func (a *fastHttpReqAdapter) URL() *url.URL { @@ -125,7 +127,7 @@ func adaptResFastHttp(res *fasthttp.Response) Response { } func (a *fastHttpResAdapter) SetBody(body io.ReadCloser) { - a.res.SetBodyStream(body, 0) + a.res.SetBodyStream(body, -1) } func (a *fastHttpResAdapter) Status() int { diff --git a/adapter_mock.go b/adapter_mock.go index c21c99e..fb38e7a 100644 --- a/adapter_mock.go +++ b/adapter_mock.go @@ -21,7 +21,7 @@ func (a *MockHttpClientAdapter) Request() (Request, error) { return adaptReqMock(req), err } -func (a *MockHttpClientAdapter) Do(ctx context.Context, req Request) (Response, error) { +func (a *MockHttpClientAdapter) Do(_ context.Context, _ Request) (Response, error) { return adaptResMock(&http.Response{}), nil } diff --git a/adapter_native.go b/adapter_native.go index 5a3d660..f0d3fb1 100644 --- a/adapter_native.go +++ b/adapter_native.go @@ -10,6 +10,8 @@ import ( type ( nativeReqAdapter struct { + body io.ReadWriteCloser + req *http.Request } @@ -38,13 +40,15 @@ func (a *nativeReqAdapter) SetURL(u *url.URL) { a.req.URL = u } -func (a *nativeReqAdapter) SetBodyStream(body io.ReadCloser, _ int) { +func (a *nativeReqAdapter) SetBodyStream(body io.ReadWriteCloser, _ int) { + a.body = body a.req.Body = body } func (a *nativeReqAdapter) SetBody(payload []byte) { // TODO: pool these readers - a.req.Body = io.NopCloser(bytes.NewReader(payload)) + a.body = closableReaderWriter{ReadWriter: bytes.NewBuffer(payload)} + a.req.Body = a.body } func (a *nativeReqAdapter) Body() []byte { @@ -52,8 +56,8 @@ func (a *nativeReqAdapter) Body() []byte { return bts } -func (a *nativeReqAdapter) BodyStream() io.ReadCloser { - return a.req.Body +func (a *nativeReqAdapter) BodyStream() io.ReadWriteCloser { + return a.body } func (a *nativeReqAdapter) URL() *url.URL { diff --git a/call.go b/call.go index 9eff18d..8314bf7 100644 --- a/call.go +++ b/call.go @@ -4,7 +4,7 @@ import ( "bytes" "context" "io" - "strconv" + "sync" ) type ( @@ -12,7 +12,7 @@ type ( Parse(c *Call[T], r Response) error } - CalReqOption[T any] interface { + CallReqOption[T any] interface { Configure(c *Call[T], r Request) error } @@ -34,6 +34,9 @@ type ( ReqContentType ContentType ReqBodyRaw []byte + ReqIsStream bool + + ReqStreamWriter func(ctx context.Context, c *Call[T], res Request, wg *sync.WaitGroup) error } ) @@ -68,7 +71,7 @@ func (c *Call[T]) withRes(fn CallResOption[T]) *Call[T] { return c } -func (c *Call[T]) withReq(fn CallReqOptionFunc[T]) *Call[T] { +func (c *Call[T]) withReq(fn CallReqOption[T]) *Call[T] { c.reqOptions = append( c.reqOptions, ReqOptionFunc(func(req Request) error { @@ -96,16 +99,6 @@ func (c *Call[T]) configureReq(req Request) error { return nil } -func (c *Call[T]) Request(opts ...ReqOption) *Call[T] { - c.reqOptions = append(c.reqOptions, opts...) - return c -} - -func (c *Call[T]) Response(opts ...ResOption) *Call[T] { - c.resOptions = append(c.resOptions, opts...) - return c -} - func (c *Call[T]) Call(ctx context.Context) (err error) { req, err := c.client.Request() defer func() { c.Req = req }() @@ -118,8 +111,23 @@ func (c *Call[T]) Call(ctx context.Context) (err error) { return } + var wg *sync.WaitGroup + + if c.ReqIsStream { + wg = &sync.WaitGroup{} + wg.Add(1) + + go func() { + _ = c.ReqStreamWriter(ctx, c, req, wg) + }() + } + res, err := c.client.Do(ctx, req) + if c.ReqIsStream { + wg.Wait() + } + if err != nil { return } @@ -151,8 +159,23 @@ func (c *Call[T]) CallEndpoint(ctx context.Context, e *Endpoint) (err error) { return } + var wg *sync.WaitGroup + + if c.ReqIsStream { + wg = &sync.WaitGroup{} + wg.Add(1) + + go func() { + _ = c.ReqStreamWriter(ctx, c, req, wg) + }() + } + res, err := c.client.Do(ctx, req) + if c.ReqIsStream { + wg.Wait() + } + if err != nil { return } @@ -171,81 +194,3 @@ func (c *Call[T]) CallEndpoint(ctx context.Context, e *Endpoint) (err error) { return } - -func (c *Call[T]) WithURL(raw string) *Call[T] { - return c.withReq(WithURL[T](raw)) -} - -func (c *Call[T]) WithURI(raw string) *Call[T] { - return c.withReq(WithURI[T](raw)) -} - -func (c *Call[T]) WithMethod(method string) *Call[T] { - return c.withReq(WithMethod[T](method)) -} - -// WithBodyStream receives a stream of data to set on the request. Second parameter `bodySize` indicates -// the estimated content-length of this stream. Required when employing fasthttp http client. -func (c *Call[T]) WithBodyStream(rc io.ReadCloser, bodySize int) *Call[T] { - return c.withReq(WithBodyStream[T](rc, bodySize)) -} - -func (c *Call[T]) WithBody(payload any) *Call[T] { - return c.withReq(WithBody[T](payload)) -} - -func (c *Call[T]) WithRawBody(payload []byte) *Call[T] { - return c.withReq(WithRawBody[T](payload)) -} - -func (c *Call[T]) WithContentLength(length int) *Call[T] { - return c.WithHeader("content-length", strconv.FormatInt(int64(length), 10), true) -} - -func (c *Call[T]) WithHeader(key, value string, override bool) *Call[T] { - return c.withReq(WithHeader[T](key, value, override)) -} - -func (c *Call[T]) WithHeaderFunc(fn func() (key, value string, override bool)) *Call[T] { - return c.withReq(WithHeaderFunc[T](fn)) -} - -func (c *Call[T]) WithContentType(ct ContentType) *Call[T] { - return c.withReq(WithContentType[T](ct)) -} - -func (c *Call[T]) WithReadBody() *Call[T] { - return c.withRes(WithParseBodyRaw[T]()) -} - -func (c *Call[T]) WithStreamChan(factory StreamFactory[T], ch chan<- T) *Call[T] { - return c.withRes(WithStreamChan[T](factory, ch)) -} - -func (c *Call[T]) WithStream(factory StreamFactory[T], fn func(T) bool) *Call[T] { - return c.withRes(WithStream[T](factory, fn)) -} - -func (c *Call[T]) WithJSONEachRowChan(out chan<- T) *Call[T] { - return c.WithStreamChan(NewJSONEachRowStreamFactory[T](), out) -} - -func (c *Call[T]) WithJSONEachRow(fn func(T) bool) *Call[T] { - return c.WithStream(NewJSONEachRowStreamFactory[T](), fn) -} - -func (c *Call[T]) WithIgnoreBody() *Call[T] { - return c.withRes(WithIgnoredBody[T]()) -} - -func (c *Call[T]) WithParseJSON() *Call[T] { - return c.withRes(WithJSON[T]()) -} - -func (c *Call[T]) WithAssert(fn func(req Response) error) *Call[T] { - return c.withRes(WithAssertion[T](fn)) -} - -func (c *Call[T]) WithExpectedStatusCodes(states ...int) *Call[T] { - return c.withRes(WithExpectedStatusCodes[T](states...)) -} diff --git a/call_req.go b/call_req.go new file mode 100644 index 0000000..3c31164 --- /dev/null +++ b/call_req.go @@ -0,0 +1,89 @@ +package withttp + +import ( + "io" + "strconv" +) + +type ( + StreamCallReqOption[T any] interface { + CallReqOption[T] + + stream() + } + + StreamCallReqOptionFunc[T any] func(c *Call[T], req Request) error +) + +func (s StreamCallReqOptionFunc[T]) stream() {} + +func (s StreamCallReqOptionFunc[T]) Configure(c *Call[T], req Request) error { + return s(c, req) +} + +func (c *Call[T]) Request(opts ...ReqOption) *Call[T] { + c.reqOptions = append(c.reqOptions, opts...) + return c +} + +func (c *Call[T]) WithURL(raw string) *Call[T] { + return c.withReq(WithURL[T](raw)) +} + +func (c *Call[T]) WithURI(raw string) *Call[T] { + return c.withReq(WithURI[T](raw)) +} + +func (c *Call[T]) WithMethod(method string) *Call[T] { + return c.withReq(WithMethod[T](method)) +} + +type ( + rangeable[T any] interface { + Range(func(int, T) bool) + } + + Slice[T any] []T +) + +func (s Slice[T]) Range(fn func(int, T) bool) { + for i, x := range s { + if !fn(i, x) { + return + } + } +} + +func (c *Call[T]) WithRequestStreamBody(opt StreamCallReqOptionFunc[T]) *Call[T] { + return c.withReq(opt) +} + +// WithBodyStream receives a stream of data to set on the request. Second parameter `bodySize` indicates +// the estimated content-length of this stream. Required when employing fasthttp http client. +func (c *Call[T]) WithBodyStream(rc io.ReadWriteCloser, bodySize int) *Call[T] { + return c.withReq(WithBodyStream[T](rc, bodySize)) +} + +func (c *Call[T]) WithBody(payload any) *Call[T] { + return c.withReq(WithBody[T](payload)) +} + +func (c *Call[T]) WithRawBody(payload []byte) *Call[T] { + return c.withReq(WithRawBody[T](payload)) +} + +func (c *Call[T]) WithContentLength(length int) *Call[T] { + return c.WithHeader("content-length", strconv.FormatInt(int64(length), 10), true) +} + +func (c *Call[T]) WithHeader(key, value string, override bool) *Call[T] { + return c.withReq(WithHeader[T](key, value, override)) +} + +func (c *Call[T]) WithHeaderFunc(fn func() (key, value string, override bool)) *Call[T] { + return c.withReq(WithHeaderFunc[T](fn)) +} + +func (c *Call[T]) WithContentType(ct ContentType) *Call[T] { + return c.withReq(WithContentType[T](ct)) +} diff --git a/call_res.go b/call_res.go new file mode 100644 index 0000000..eceeba5 --- /dev/null +++ b/call_res.go @@ -0,0 +1,42 @@ +package withttp + +func (c *Call[T]) Response(opts ...ResOption) *Call[T] { + c.resOptions = append(c.resOptions, opts...) + return c +} + +func (c *Call[T]) WithReadBody() *Call[T] { + return c.withRes(WithParseBodyRaw[T]()) +} + +func (c *Call[T]) WithParseStreamChan(factory StreamFactory[T], ch chan<- T) *Call[T] { + return c.withRes(WithParseStreamChan[T](factory, ch)) +} + +func (c *Call[T]) WithParseStream(factory StreamFactory[T], fn func(T) bool) *Call[T] { + return c.withRes(WithParseStream[T](factory, fn)) +} + +func (c *Call[T]) WithParseJSONEachRowChan(out chan<- T) *Call[T] { + return c.WithParseStreamChan(NewJSONEachRowStreamFactory[T](), out) +} + +func (c *Call[T]) WithParseJSONEachRow(fn func(T) bool) *Call[T] { + return c.WithParseStream(NewJSONEachRowStreamFactory[T](), fn) +} + +func (c *Call[T]) WithIgnoreResponseBody() *Call[T] { + return c.withRes(WithIgnoredBody[T]()) +} + +func (c *Call[T]) WithParseJSON() *Call[T] { + return c.withRes(WithParseJSON[T]()) +} + +func (c *Call[T]) WithAssert(fn func(req Response) error) *Call[T] { + return c.withRes(WithAssertion[T](fn)) +} + +func (c *Call[T]) WithExpectedStatusCodes(states ...int) *Call[T] { + return c.withRes(WithExpectedStatusCodes[T](states...)) +} diff --git a/codec/constants.go b/codec/constants.go index 50b6127..7540d56 100644 --- a/codec/constants.go +++ b/codec/constants.go @@ -1,5 +1,6 @@ package codec var ( - NativeJSONCodec = NewNativeJsonCodec() + NativeJSONCodec = NewNativeJsonCodec() + NativeJSONEachRowCodec = NewNativeJsonEachRowCodec(NativeJSONCodec) ) diff --git a/codec/json_each_row.go b/codec/json_each_row.go new file mode 100644 index 0000000..1ceb180 --- /dev/null +++ b/codec/json_each_row.go @@ -0,0 +1,25 @@ +package codec + +const ( + LN = byte('\n') +) + +type ( + NativeJsonEachRowCodec struct { + NativeJsonCodec + } +) + +func (c NativeJsonEachRowCodec) Encode(t any) (bts []byte, err error) { + bts, err = c.NativeJsonCodec.Encode(t) + if err != nil { + return + } + + bts = append(bts, LN) + return +} + +func NewNativeJsonEachRowCodec(inner NativeJsonCodec) NativeJsonEachRowCodec { + return NativeJsonEachRowCodec{NativeJsonCodec: inner} +} diff --git a/content_types.go b/content_types.go index 12b7a06..e549cc5 100644 --- a/content_types.go +++ b/content_types.go @@ -3,7 +3,6 @@ package withttp import ( "github.com/pkg/errors" "github.com/sonirico/withttp/codec" - "strings" ) type ( @@ -11,7 +10,8 @@ type ( ) var ( - ContentTypeJSON = "application/json" + ContentTypeJSON ContentType = "application/json" + ContentTypeJSONEachRow ContentType = "application/jsoneachrow" ) var ( @@ -22,17 +22,12 @@ func (c ContentType) String() string { return string(c) } -func (c ContentType) IsJSON() bool { - lower := strings.ToLower(strings.TrimSpace(c.String())) - hasApp := strings.Contains(lower, "application") - hasJSon := strings.Contains(lower, "json") - return hasApp && hasJSon -} - func (c ContentType) Codec() (codec.Codec, error) { - switch { - case c.IsJSON(): + switch c { + case ContentTypeJSON: return codec.NativeJSONCodec, nil + case ContentTypeJSONEachRow: + return codec.NativeJSONEachRowCodec, nil default: return nil, errors.Wrapf(ErrUnknownContentType, "got: '%s'", c.String()) } diff --git a/endpoint.go b/endpoint.go index 4bab4c4..37f041d 100644 --- a/endpoint.go +++ b/endpoint.go @@ -14,11 +14,11 @@ type ( SetURL(*url.URL) // SetBodyStream sets the stream of body data belonging to a request. bodySize parameter is needed // when using fasthttp implementation. - SetBodyStream(rc io.ReadCloser, bodySize int) + SetBodyStream(rc io.ReadWriteCloser, bodySize int) SetBody([]byte) Body() []byte - BodyStream() io.ReadCloser + BodyStream() io.ReadWriteCloser URL() *url.URL } diff --git a/examples/mock/main.go b/examples/mock/main.go index e8f013d..0f76881 100644 --- a/examples/mock/main.go +++ b/examples/mock/main.go @@ -44,7 +44,7 @@ func main() { override = true return }). - WithJSONEachRowChan(res). + WithParseJSONEachRowChan(res). WithExpectedStatusCodes(http.StatusOK) go func() { diff --git a/examples/request_stream/main.go b/examples/request_stream/main.go new file mode 100644 index 0000000..f6e7f5a --- /dev/null +++ b/examples/request_stream/main.go @@ -0,0 +1,58 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/sonirico/withttp" +) + +type metric struct { + Time time.Time `json:"t"` + Temp float32 `json:"T"` +} + +func CreateStream() error { + points := []metric{ + { + Time: time.Unix(time.Now().Unix()-1, 0), + Temp: 39, + }, + { + Time: time.Now(), + Temp: 40, + }, + } + + slice := withttp.Slice[metric](points) + + testEndpoint := withttp.NewEndpoint("webhook-site-request-stream-example"). + Request( + withttp.WithBaseURL("https://webhook.site/24e84e8f-75cf-4239-828e-8bed244c0afb"), + ) + + call := withttp.NewCall[any](withttp.NewDefaultFastHttpHttpClientAdapter()). + WithMethod(http.MethodPost). + WithContentType(withttp.ContentTypeJSONEachRow). + WithRequestStreamBody( + withttp.WithRequestStreamBody[any, metric](slice), + ). + WithExpectedStatusCodes(http.StatusOK) + + err := call.CallEndpoint(context.Background(), testEndpoint) + + received := call.Req.Body() + + fmt.Printf("recv: '%s'", string(received)) + /* + {"t":"2022-09-01T00:58:15+02:00","T":39} + {"t":"2022-09-01T00:58:16.15846898+02:00","T":40} + */ + return err +} + +func main() { + _ = CreateStream() +} diff --git a/testing.go b/testing.go new file mode 100644 index 0000000..87a8f72 --- /dev/null +++ b/testing.go @@ -0,0 +1,36 @@ +package withttp + +import ( + "github.com/pkg/errors" + "strings" + "testing" +) + +func assertError(t *testing.T, expected, actual error) bool { + t.Helper() + + if actual != nil { + if expected != nil { + if !errors.Is(expected, actual) { + t.Errorf("unexpected error, want %s, have %s", + expected, actual) + return false + } + } else { + t.Errorf("unexpected error, want none, have %s", actual) + return false + } + } else { + if expected != nil { + t.Errorf("unexpected error, want %s, have none", + expected) + return false + } + } + + return true +} + +func streamTextJoin(sep string, items []string) []byte { + return []byte(strings.Join(items, sep)) +} diff --git a/utils_buf.go b/utils_buf.go new file mode 100644 index 0000000..a7d0a0b --- /dev/null +++ b/utils_buf.go @@ -0,0 +1,13 @@ +package withttp + +import "io" + +type ( + closableReaderWriter struct { + io.ReadWriter + } +) + +func (b closableReaderWriter) Close() error { + return nil +} diff --git a/utils_str.go b/utils_str.go index b59edc2..6f33400 100644 --- a/utils_str.go +++ b/utils_str.go @@ -12,3 +12,7 @@ func StrIsset(s string) bool { func BtsIsset(bts []byte) bool { return len(bytes.TrimSpace(bts)) > 0 } + +func BytesEquals(a, b []byte) bool { + return bytes.Compare(bytes.TrimSpace(a), bytes.TrimSpace(b)) == 0 +} diff --git a/with_req.go b/with_req.go index 0977cf0..a14829d 100644 --- a/with_req.go +++ b/with_req.go @@ -1,8 +1,12 @@ package withttp import ( + "bytes" + "context" + "github.com/sonirico/withttp/codec" "io" "net/url" + "sync" ) func WithHeader[T any](k, v string, override bool) CallReqOptionFunc[T] { @@ -79,7 +83,29 @@ func WithBody[T any](payload any) CallReqOptionFunc[T] { } } -func WithBodyStream[T any](rc io.ReadCloser, bodySize int) CallReqOptionFunc[T] { +func WithRequestStreamBody[T, U any](r rangeable[U]) StreamCallReqOptionFunc[T] { + return func(c *Call[T], req Request) error { + c.ReqIsStream = true + + buf := closableReaderWriter{ReadWriter: bytes.NewBuffer(nil)} // TODO: pool buffer + req.SetBodyStream(buf, -1) // TODO: bodySize + + c.ReqStreamWriter = func(ctx context.Context, c *Call[T], req Request, wg *sync.WaitGroup) (err error) { + defer func() { wg.Done() }() + + encoder, err := c.ReqContentType.Codec() + if err != nil { + return err + } + + err = EncodeStream(ctx, r, req, encoder) + return nil + } + return nil + } +} + +func WithBodyStream[T any](rc io.ReadWriteCloser, bodySize int) CallReqOptionFunc[T] { return func(c *Call[T], req Request) (err error) { req.SetBodyStream(rc, bodySize) return nil @@ -87,10 +113,37 @@ func WithBodyStream[T any](rc io.ReadCloser, bodySize int) CallReqOptionFunc[T] } func EncodeBody(payload any, contentType ContentType) (bts []byte, err error) { - codec, err := contentType.Codec() + encoder, err := contentType.Codec() if err != nil { return } - bts, err = codec.Encode(payload) + bts, err = encoder.Encode(payload) + return +} + +func EncodeStream[T any](ctx context.Context, r rangeable[T], req Request, encoder codec.Encoder) (err error) { + stream := req.BodyStream() + + defer func() { _ = stream.Close() }() + + var bts []byte + + r.Range(func(i int, x T) bool { + select { + case <-ctx.Done(): + return false + default: + if bts, err = encoder.Encode(x); err != nil { + return false + } + + if _, err = stream.Write(bts); err != nil { + return false + } + + return true + } + }) + return } diff --git a/with_req_test.go b/with_req_test.go new file mode 100644 index 0000000..3600277 --- /dev/null +++ b/with_req_test.go @@ -0,0 +1,98 @@ +package withttp + +import ( + "bytes" + "context" + "io" + "net/http" + "testing" +) + +func TestCall_StreamingRequestFromSlice(t *testing.T) { + type ( + payload struct { + Name string `json:"name"` + } + + args struct { + Stream []payload + } + + want struct { + expectedErr error + ReceivedPayload []byte + } + + testCase struct { + name string + args args + want want + } + ) + + tests := []testCase{ + { + name: "one element in the stream", + args: args{ + Stream: []payload{ + { + Name: "I am the first payload", + }, + }, + }, + want: want{ + ReceivedPayload: []byte(`{"name":"I am the first payload"}`), + }, + }, + { + name: "several elements in the stream", + args: args{ + Stream: []payload{ + { + Name: "I am the first payload", + }, + { + Name: "I am the second payload", + }, + }, + }, + want: want{ + ReceivedPayload: streamTextJoin("\n", []string{ + `{"name":"I am the first payload"}`, + `{"name":"I am the second payload"}`, + }), + }, + }, + } + + endpoint := NewEndpoint("mock"). + Response(WithMockedRes(func(res Response) { + res.SetStatus(http.StatusOK) + res.SetBody(io.NopCloser(bytes.NewReader(nil))) + })) + + for _, test := range tests { + + t.Run(test.name, func(t *testing.T) { + + call := NewCall[any](NewMockHttpClientAdapter()). + WithContentType(ContentTypeJSONEachRow). + WithRequestStreamBody( + WithRequestStreamBody[any, payload](Slice[payload](test.args.Stream)), + ). + WithExpectedStatusCodes(http.StatusOK) + + err := call.CallEndpoint(context.TODO(), endpoint) + + if !assertError(t, test.want.expectedErr, err) { + t.FailNow() + } + actualReceivedBody := call.Req.Body() + + if !BytesEquals(test.want.ReceivedPayload, actualReceivedBody) { + t.Errorf("unexpected received payload\nwant '%s'\nhave '%s'", + string(test.want.ReceivedPayload), string(actualReceivedBody)) + } + }) + } +} diff --git a/with_res.go b/with_res.go index 5fdeb50..07f5926 100644 --- a/with_res.go +++ b/with_res.go @@ -32,20 +32,20 @@ func WithParseBodyRaw[T any]() CallResOptionFunc[T] { } } -func WithJSON[T any]() CallResOptionFunc[T] { +func WithParseJSON[T any]() CallResOptionFunc[T] { return func(c *Call[T], res Response) (err error) { c.BodyParsed, err = ReadJSON[T](c.bodyReader(res)) return } } -func WithStream[T any](factory StreamFactory[T], fn func(T) bool) CallResOptionFunc[T] { +func WithParseStream[T any](factory StreamFactory[T], fn func(T) bool) CallResOptionFunc[T] { return func(c *Call[T], res Response) (err error) { return ReadStream[T](c.bodyReader(res), factory, fn) } } -func WithStreamChan[T any](factory StreamFactory[T], out chan<- T) CallResOptionFunc[T] { +func WithParseStreamChan[T any](factory StreamFactory[T], out chan<- T) CallResOptionFunc[T] { return func(c *Call[T], res Response) (err error) { return ReadStreamChan(c.bodyReader(res), factory, out) }