Skip to content

Commit

Permalink
feat: streaming requests (slices only)
Browse files Browse the repository at this point in the history
  • Loading branch information
sonirico committed Aug 31, 2022
1 parent ac994da commit 0e571de
Show file tree
Hide file tree
Showing 19 changed files with 544 additions and 123 deletions.
56 changes: 56 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,58 @@ func main() {
}
```

#### Stream data to server

[See full example](https://github.com/sonirico/withttp/blob/main/examples/request_stream/main.go)

```go
type metric struct {
Time time.Time `json:"t"`
Temp float32 `json:"T"`
}

func CreateStream() error {
points := []metric{
{
Time: time.Unix(time.Now().Unix()-1, 0),
Temp: 39,
},
{
Time: time.Now(),
Temp: 40,
},
}

slice := withttp.Slice[metric](points)

testEndpoint := withttp.NewEndpoint("webhook-site-request-stream-example").
Request(
withttp.WithBaseURL("https://webhook.site/24e84e8f-75cf-4239-828e-8bed244c0afb"),
)

call := withttp.NewCall[any](withttp.NewDefaultFastHttpHttpClientAdapter()).
WithMethod(http.MethodPost).
WithContentType(withttp.ContentTypeJSONEachRow).
WithRequestStreamBody(
withttp.WithRequestStreamBody[any, metric](slice),
).
WithExpectedStatusCodes(http.StatusOK)

err := call.CallEndpoint(context.Background(), testEndpoint)

received := call.Req.Body()

fmt.Printf("recv: '%s'", string(received))
/*
{"t":"2022-09-01T00:58:15+02:00","T":39}
{"t":"2022-09-01T00:58:16.15846898+02:00","T":40}
*/
return err
}
```

#### Several endpoints

In case of a wide range catalog of endpoints, predefined parameters and behaviours can be
defined by employing an endpoint definition.

Expand Down Expand Up @@ -170,3 +222,7 @@ func main() {
}
}
```

### Caveats:

- Fasthttp request streams are not supported as of now.
14 changes: 8 additions & 6 deletions adapter_fasthttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

type (
fastHttpReqAdapter struct {
stream io.ReadWriteCloser

req *fasthttp.Request
}

Expand Down Expand Up @@ -58,9 +60,9 @@ func (a *fastHttpReqAdapter) SetURL(u *url.URL) {
}
}

func (a *fastHttpReqAdapter) SetBodyStream(body io.ReadCloser, bodySize int) {
// TODO: Caveat. Content type may be unknown at the time of setting streams.
a.req.SetBodyStream(body, bodySize)
func (a *fastHttpReqAdapter) SetBodyStream(body io.ReadWriteCloser, bodySize int) {
a.stream = body
a.req.SetBodyStream(a.stream, bodySize)
}

func (a *fastHttpReqAdapter) SetBody(body []byte) {
Expand All @@ -71,8 +73,8 @@ func (a *fastHttpReqAdapter) Body() []byte {
return a.req.Body()
}

func (a *fastHttpReqAdapter) BodyStream() io.ReadCloser {
return io.NopCloser(bytes.NewReader(a.req.Body()))
func (a *fastHttpReqAdapter) BodyStream() io.ReadWriteCloser {
return a.stream
}

func (a *fastHttpReqAdapter) URL() *url.URL {
Expand Down Expand Up @@ -125,7 +127,7 @@ func adaptResFastHttp(res *fasthttp.Response) Response {
}

func (a *fastHttpResAdapter) SetBody(body io.ReadCloser) {
a.res.SetBodyStream(body, 0)
a.res.SetBodyStream(body, -1)
}

func (a *fastHttpResAdapter) Status() int {
Expand Down
2 changes: 1 addition & 1 deletion adapter_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (a *MockHttpClientAdapter) Request() (Request, error) {
return adaptReqMock(req), err
}

func (a *MockHttpClientAdapter) Do(ctx context.Context, req Request) (Response, error) {
func (a *MockHttpClientAdapter) Do(_ context.Context, _ Request) (Response, error) {
return adaptResMock(&http.Response{}), nil
}

Expand Down
12 changes: 8 additions & 4 deletions adapter_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

type (
nativeReqAdapter struct {
body io.ReadWriteCloser

req *http.Request
}

Expand Down Expand Up @@ -38,22 +40,24 @@ func (a *nativeReqAdapter) SetURL(u *url.URL) {
a.req.URL = u
}

func (a *nativeReqAdapter) SetBodyStream(body io.ReadCloser, _ int) {
func (a *nativeReqAdapter) SetBodyStream(body io.ReadWriteCloser, _ int) {
a.body = body
a.req.Body = body
}

func (a *nativeReqAdapter) SetBody(payload []byte) {
// TODO: pool these readers
a.req.Body = io.NopCloser(bytes.NewReader(payload))
a.body = closableReaderWriter{ReadWriter: bytes.NewBuffer(payload)}
a.req.Body = a.body
}

func (a *nativeReqAdapter) Body() []byte {
bts, _ := io.ReadAll(a.req.Body)
return bts
}

func (a *nativeReqAdapter) BodyStream() io.ReadCloser {
return a.req.Body
func (a *nativeReqAdapter) BodyStream() io.ReadWriteCloser {
return a.body
}

func (a *nativeReqAdapter) URL() *url.URL {
Expand Down
127 changes: 36 additions & 91 deletions call.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import (
"bytes"
"context"
"io"
"strconv"
"sync"
)

type (
CallResOption[T any] interface {
Parse(c *Call[T], r Response) error
}

CalReqOption[T any] interface {
CallReqOption[T any] interface {
Configure(c *Call[T], r Request) error
}

Expand All @@ -34,6 +34,9 @@ type (

ReqContentType ContentType
ReqBodyRaw []byte
ReqIsStream bool

ReqStreamWriter func(ctx context.Context, c *Call[T], res Request, wg *sync.WaitGroup) error
}
)

Expand Down Expand Up @@ -68,7 +71,7 @@ func (c *Call[T]) withRes(fn CallResOption[T]) *Call[T] {
return c
}

func (c *Call[T]) withReq(fn CallReqOptionFunc[T]) *Call[T] {
func (c *Call[T]) withReq(fn CallReqOption[T]) *Call[T] {
c.reqOptions = append(
c.reqOptions,
ReqOptionFunc(func(req Request) error {
Expand Down Expand Up @@ -96,16 +99,6 @@ func (c *Call[T]) configureReq(req Request) error {
return nil
}

func (c *Call[T]) Request(opts ...ReqOption) *Call[T] {
c.reqOptions = append(c.reqOptions, opts...)
return c
}

func (c *Call[T]) Response(opts ...ResOption) *Call[T] {
c.resOptions = append(c.resOptions, opts...)
return c
}

func (c *Call[T]) Call(ctx context.Context) (err error) {
req, err := c.client.Request()
defer func() { c.Req = req }()
Expand All @@ -118,8 +111,23 @@ func (c *Call[T]) Call(ctx context.Context) (err error) {
return
}

var wg *sync.WaitGroup

if c.ReqIsStream {
wg = &sync.WaitGroup{}
wg.Add(1)

go func() {
_ = c.ReqStreamWriter(ctx, c, req, wg)
}()
}

res, err := c.client.Do(ctx, req)

if c.ReqIsStream {
wg.Wait()
}

if err != nil {
return
}
Expand Down Expand Up @@ -151,8 +159,23 @@ func (c *Call[T]) CallEndpoint(ctx context.Context, e *Endpoint) (err error) {
return
}

var wg *sync.WaitGroup

if c.ReqIsStream {
wg = &sync.WaitGroup{}
wg.Add(1)

go func() {
_ = c.ReqStreamWriter(ctx, c, req, wg)
}()
}

res, err := c.client.Do(ctx, req)

if c.ReqIsStream {
wg.Wait()
}

if err != nil {
return
}
Expand All @@ -171,81 +194,3 @@ func (c *Call[T]) CallEndpoint(ctx context.Context, e *Endpoint) (err error) {

return
}

func (c *Call[T]) WithURL(raw string) *Call[T] {
return c.withReq(WithURL[T](raw))
}

func (c *Call[T]) WithURI(raw string) *Call[T] {
return c.withReq(WithURI[T](raw))
}

func (c *Call[T]) WithMethod(method string) *Call[T] {
return c.withReq(WithMethod[T](method))
}

// WithBodyStream receives a stream of data to set on the request. Second parameter `bodySize` indicates
// the estimated content-length of this stream. Required when employing fasthttp http client.
func (c *Call[T]) WithBodyStream(rc io.ReadCloser, bodySize int) *Call[T] {
return c.withReq(WithBodyStream[T](rc, bodySize))
}

func (c *Call[T]) WithBody(payload any) *Call[T] {
return c.withReq(WithBody[T](payload))
}

func (c *Call[T]) WithRawBody(payload []byte) *Call[T] {
return c.withReq(WithRawBody[T](payload))
}

func (c *Call[T]) WithContentLength(length int) *Call[T] {
return c.WithHeader("content-length", strconv.FormatInt(int64(length), 10), true)
}

func (c *Call[T]) WithHeader(key, value string, override bool) *Call[T] {
return c.withReq(WithHeader[T](key, value, override))
}

func (c *Call[T]) WithHeaderFunc(fn func() (key, value string, override bool)) *Call[T] {
return c.withReq(WithHeaderFunc[T](fn))
}

func (c *Call[T]) WithContentType(ct ContentType) *Call[T] {
return c.withReq(WithContentType[T](ct))
}

func (c *Call[T]) WithReadBody() *Call[T] {
return c.withRes(WithParseBodyRaw[T]())
}

func (c *Call[T]) WithStreamChan(factory StreamFactory[T], ch chan<- T) *Call[T] {
return c.withRes(WithStreamChan[T](factory, ch))
}

func (c *Call[T]) WithStream(factory StreamFactory[T], fn func(T) bool) *Call[T] {
return c.withRes(WithStream[T](factory, fn))
}

func (c *Call[T]) WithJSONEachRowChan(out chan<- T) *Call[T] {
return c.WithStreamChan(NewJSONEachRowStreamFactory[T](), out)
}

func (c *Call[T]) WithJSONEachRow(fn func(T) bool) *Call[T] {
return c.WithStream(NewJSONEachRowStreamFactory[T](), fn)
}

func (c *Call[T]) WithIgnoreBody() *Call[T] {
return c.withRes(WithIgnoredBody[T]())
}

func (c *Call[T]) WithParseJSON() *Call[T] {
return c.withRes(WithJSON[T]())
}

func (c *Call[T]) WithAssert(fn func(req Response) error) *Call[T] {
return c.withRes(WithAssertion[T](fn))
}

func (c *Call[T]) WithExpectedStatusCodes(states ...int) *Call[T] {
return c.withRes(WithExpectedStatusCodes[T](states...))
}
Loading

0 comments on commit 0e571de

Please sign in to comment.