Skip to content

Commit

Permalink
Merge pull request #2 from sonirico/feat/stream-requests-reader-channel
Browse files Browse the repository at this point in the history
feat: advanced streaming requests
  • Loading branch information
sonirico authored Sep 1, 2022
2 parents de788da + 2fc2d28 commit 5eb1fc5
Show file tree
Hide file tree
Showing 16 changed files with 480 additions and 121 deletions.
5 changes: 5 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
root = true

[*]

tab_width = 2
9 changes: 6 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@ build:
.PHONY: format
format:
$(info: Make: Format)
gofmt -w ./**/*
goimports -w ./**/*
golines -w ./**/*
gofmt -w ./**/*.go
gofmt -w ./*.go
goimports -w ./**/*.go
goimports -w ./*.go
golines -w ./**/*.go
golines -w ./*.go

.PHONY: lint
lint:
Expand Down
97 changes: 86 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func main() {
}
```

#### Stream data to server
#### Stream data to server (from a slice)

[See full example](https://github.com/sonirico/withttp/blob/main/examples/request_stream/main.go)

Expand All @@ -62,7 +62,7 @@ func CreateStream() error {
},
}

slice := withttp.Slice[metric](points)
stream := withttp.Slice[metric](points)

testEndpoint := withttp.NewEndpoint("webhook-site-request-stream-example").
Request(
Expand All @@ -72,21 +72,96 @@ func CreateStream() error {
call := withttp.NewCall[any](withttp.NewDefaultFastHttpHttpClientAdapter()).
WithMethod(http.MethodPost).
WithContentType(withttp.ContentTypeJSONEachRow).
WithRequestSniffed(func(data []byte, err error) {
fmt.Printf("recv: '%s', err: %v", string(data), err)
}).
WithRequestStreamBody(
withttp.WithRequestStreamBody[any, metric](slice),
withttp.WithRequestStreamBody[any, metric](stream),
).
WithExpectedStatusCodes(http.StatusOK)

err := call.CallEndpoint(context.Background(), testEndpoint)
return call.CallEndpoint(context.Background(), testEndpoint)
}
```

#### Stream data to server (from a channel)

[See full example](https://github.com/sonirico/withttp/blob/main/examples/request_stream/main.go)

```go
func CreateStreamChannel() error {
points := make(chan metric, 2)

received := call.Req.Body()
go func() {
points <- metric{
Time: time.Unix(time.Now().Unix()-1, 0),
Temp: 39,
}

points <- metric{
Time: time.Now(),
Temp: 40,
}

close(points)
}()

stream := withttp.Channel[metric](points)

testEndpoint := withttp.NewEndpoint("webhook-site-request-stream-example").
Request(
withttp.WithBaseURL("https://webhook.site/24e84e8f-75cf-4239-828e-8bed244c0afb"),
)

call := withttp.NewCall[any](withttp.NewDefaultFastHttpHttpClientAdapter()).
WithMethod(http.MethodPost).
WithContentType(withttp.ContentTypeJSONEachRow).
WithRequestSniffed(func(data []byte, err error) {
fmt.Printf("recv: '%s', err: %v", string(data), err)
}).
WithRequestStreamBody(
withttp.WithRequestStreamBody[any, metric](stream),
).
WithExpectedStatusCodes(http.StatusOK)

return call.CallEndpoint(context.Background(), testEndpoint)
}
```

#### Stream data to server (from a reader)

[See full example](https://github.com/sonirico/withttp/blob/main/examples/request_stream/main.go)

```go
func CreateStreamReader() error {
buf := bytes.NewBuffer(nil)

go func() {
buf.WriteString("{\"t\":\"2022-09-01T00:58:15+02:00\"")
buf.WriteString(",\"T\":39}\n{\"t\":\"2022-09-01T00:59:15+02:00\",\"T\":40}\n")
}()

streamFactory := withttp.NewProxyStreamFactory(1 << 10)

stream := withttp.NewStreamFromReader(buf, streamFactory)

testEndpoint := withttp.NewEndpoint("webhook-site-request-stream-example").
Request(
withttp.WithBaseURL("https://webhook.site/24e84e8f-75cf-4239-828e-8bed244c0afb"),
)

call := withttp.NewCall[any](withttp.NewDefaultNativeHttpClientAdapter()).
WithMethod(http.MethodPost).
WithRequestSniffed(func(data []byte, err error) {
fmt.Printf("recv: '%s', err: %v", string(data), err)
}).
WithContentType(withttp.ContentTypeJSONEachRow).
WithRequestStreamBody(
withttp.WithRequestStreamBody[any, []byte](stream),
).
WithExpectedStatusCodes(http.StatusOK)

fmt.Printf("recv: '%s'", string(received))
/*
{"t":"2022-09-01T00:58:15+02:00","T":39}
{"t":"2022-09-01T00:58:16.15846898+02:00","T":40}
*/
return err
return call.CallEndpoint(context.Background(), testEndpoint)
}
```

Expand Down
8 changes: 5 additions & 3 deletions adapter_fasthttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package withttp
import (
"bytes"
"context"
"github.com/valyala/fasthttp"
"io"
"net/http"
"net/url"

"github.com/valyala/fasthttp"
)

type (
Expand Down Expand Up @@ -69,8 +70,9 @@ func (a *fastHttpReqAdapter) SetBody(body []byte) {
a.req.SetBody(body)
}

func (a *fastHttpReqAdapter) Body() []byte {
return a.req.Body()
func (a *fastHttpReqAdapter) Body() (bts []byte) {
bts, _ = io.ReadAll(a.stream)
return
}

func (a *fastHttpReqAdapter) BodyStream() io.ReadWriteCloser {
Expand Down
4 changes: 3 additions & 1 deletion call.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ type (
ReqBodyRaw []byte
ReqIsStream bool

ReqStreamWriter func(ctx context.Context, c *Call[T], res Request, wg *sync.WaitGroup) error
ReqStreamWriter func(ctx context.Context, c *Call[T], res Request, wg *sync.WaitGroup) error
ReqStreamSniffer func([]byte, error)
ReqShouldSniff bool
}
)

Expand Down
16 changes: 2 additions & 14 deletions call_req.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,8 @@ func (c *Call[T]) WithMethod(method string) *Call[T] {
return c.withReq(WithMethod[T](method))
}

type (
rangeable[T any] interface {
Range(func(int, T) bool)
}

Slice[T any] []T
)

func (s Slice[T]) Range(fn func(int, T) bool) {
for i, x := range s {
if !fn(i, x) {
return
}
}
func (c *Call[T]) WithRequestSniffed(fn func([]byte, error)) *Call[T] {
return c.withReq(WithRequestSniffer[T](fn))
}

func (c *Call[T]) WithRequestStreamBody(opt StreamCallReqOptionFunc[T]) *Call[T] {
Expand Down
7 changes: 7 additions & 0 deletions codec/constants.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package codec

import "github.com/pkg/errors"

var (
NativeJSONCodec = NewNativeJsonCodec()
NativeJSONEachRowCodec = NewNativeJsonEachRowCodec(NativeJSONCodec)
ProxyBytesEncoder = ProxyBytesCodec{}
)

var (
ErrTypeAssertion = errors.New("unexpected type")
)
5 changes: 5 additions & 0 deletions codec/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@ type (
Encoder
Decoder
}

NoopCodec struct{}
)

func (c NoopCodec) Encode(_ any) (bts []byte, err error) { return }
func (c NoopCodec) Decode(_ []byte, _ any) (err error) { return }
20 changes: 20 additions & 0 deletions codec/proxy_bytes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package codec

import "github.com/pkg/errors"

type (
ProxyBytesCodec struct{}
)

func (e ProxyBytesCodec) Encode(x any) ([]byte, error) {
bts, ok := x.([]byte)
if !ok {
return nil, errors.Wrapf(ErrTypeAssertion, "want '[]byte', have %T", x)
}

return bts, nil
}

func (e ProxyBytesCodec) Decode(_ []byte, _ any) error {
panic("not implemented")
}
86 changes: 76 additions & 10 deletions examples/request_stream/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"context"
"fmt"
"net/http"
Expand All @@ -26,7 +27,7 @@ func CreateStream() error {
},
}

slice := withttp.Slice[metric](points)
stream := withttp.Slice[metric](points)

testEndpoint := withttp.NewEndpoint("webhook-site-request-stream-example").
Request(
Expand All @@ -36,23 +37,88 @@ func CreateStream() error {
call := withttp.NewCall[any](withttp.NewDefaultFastHttpHttpClientAdapter()).
WithMethod(http.MethodPost).
WithContentType(withttp.ContentTypeJSONEachRow).
WithRequestSniffed(func(data []byte, err error) {
fmt.Printf("recv: '%s', err: %v", string(data), err)
}).
WithRequestStreamBody(
withttp.WithRequestStreamBody[any, metric](slice),
withttp.WithRequestStreamBody[any, metric](stream),
).
WithExpectedStatusCodes(http.StatusOK)

err := call.CallEndpoint(context.Background(), testEndpoint)
return call.CallEndpoint(context.Background(), testEndpoint)
}

func CreateStreamChannel() error {
points := make(chan metric, 2)

go func() {
points <- metric{
Time: time.Unix(time.Now().Unix()-1, 0),
Temp: 39,
}

points <- metric{
Time: time.Now(),
Temp: 40,
}

close(points)
}()

stream := withttp.Channel[metric](points)

testEndpoint := withttp.NewEndpoint("webhook-site-request-stream-example").
Request(
withttp.WithBaseURL("https://webhook.site/24e84e8f-75cf-4239-828e-8bed244c0afb"),
)

call := withttp.NewCall[any](withttp.NewDefaultFastHttpHttpClientAdapter()).
WithMethod(http.MethodPost).
WithContentType(withttp.ContentTypeJSONEachRow).
WithRequestSniffed(func(data []byte, err error) {
fmt.Printf("recv: '%s', err: %v", string(data), err)
}).
WithRequestStreamBody(
withttp.WithRequestStreamBody[any, metric](stream),
).
WithExpectedStatusCodes(http.StatusOK)

return call.CallEndpoint(context.Background(), testEndpoint)
}

received := call.Req.Body()
func CreateStreamReader() error {
buf := bytes.NewBuffer(nil)

go func() {
buf.WriteString("{\"t\":\"2022-09-01T00:58:15+02:00\"")
buf.WriteString(",\"T\":39}\n{\"t\":\"2022-09-01T00:59:15+02:00\",\"T\":40}\n")
}()

streamFactory := withttp.NewProxyStreamFactory(1 << 10)

stream := withttp.NewStreamFromReader(buf, streamFactory)

testEndpoint := withttp.NewEndpoint("webhook-site-request-stream-example").
Request(
withttp.WithBaseURL("https://webhook.site/24e84e8f-75cf-4239-828e-8bed244c0afb"),
)

call := withttp.NewCall[any](withttp.NewDefaultNativeHttpClientAdapter()).
WithMethod(http.MethodPost).
WithRequestSniffed(func(data []byte, err error) {
fmt.Printf("recv: '%s', err: %v", string(data), err)
}).
WithContentType(withttp.ContentTypeJSONEachRow).
WithRequestStreamBody(
withttp.WithRequestStreamBody[any, []byte](stream),
).
WithExpectedStatusCodes(http.StatusOK)

fmt.Printf("recv: '%s'", string(received))
/*
{"t":"2022-09-01T00:58:15+02:00","T":39}
{"t":"2022-09-01T00:58:16.15846898+02:00","T":40}
*/
return err
return call.CallEndpoint(context.Background(), testEndpoint)
}

func main() {
_ = CreateStream()
_ = CreateStreamChannel()
_ = CreateStreamReader()
}
Loading

0 comments on commit 5eb1fc5

Please sign in to comment.