diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..c0ea65c --- /dev/null +++ b/.editorconfig @@ -0,0 +1,5 @@ +root = true + +[*] + +tab_width = 2 \ No newline at end of file diff --git a/Makefile b/Makefile index ae0b825..e4ffd39 100644 --- a/Makefile +++ b/Makefile @@ -48,9 +48,12 @@ build: .PHONY: format format: $(info: Make: Format) - gofmt -w ./**/* - goimports -w ./**/* - golines -w ./**/* + gofmt -w ./**/*.go + gofmt -w ./*.go + goimports -w ./**/*.go + goimports -w ./*.go + golines -w ./**/*.go + golines -w ./*.go .PHONY: lint lint: diff --git a/README.md b/README.md index a2a3d47..2f40f3f 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ func main() { } ``` -#### Stream data to server +#### Stream data to server (from a slice) [See full example](https://github.com/sonirico/withttp/blob/main/examples/request_stream/main.go) @@ -62,7 +62,7 @@ func CreateStream() error { }, } - slice := withttp.Slice[metric](points) + stream := withttp.Slice[metric](points) testEndpoint := withttp.NewEndpoint("webhook-site-request-stream-example"). Request( @@ -72,21 +72,96 @@ func CreateStream() error { call := withttp.NewCall[any](withttp.NewDefaultFastHttpHttpClientAdapter()). WithMethod(http.MethodPost). WithContentType(withttp.ContentTypeJSONEachRow). + WithRequestSniffed(func(data []byte, err error) { + fmt.Printf("recv: '%s', err: %v", string(data), err) + }). WithRequestStreamBody( - withttp.WithRequestStreamBody[any, metric](slice), + withttp.WithRequestStreamBody[any, metric](stream), ). WithExpectedStatusCodes(http.StatusOK) - err := call.CallEndpoint(context.Background(), testEndpoint) + return call.CallEndpoint(context.Background(), testEndpoint) +} +``` + +#### Stream data to server (from a channel) + +[See full example](https://github.com/sonirico/withttp/blob/main/examples/request_stream/main.go) + +```go +func CreateStreamChannel() error { + points := make(chan metric, 2) - received := call.Req.Body() + go func() { + points <- metric{ + Time: time.Unix(time.Now().Unix()-1, 0), + Temp: 39, + } + + points <- metric{ + Time: time.Now(), + Temp: 40, + } + + close(points) + }() + + stream := withttp.Channel[metric](points) + + testEndpoint := withttp.NewEndpoint("webhook-site-request-stream-example"). + Request( + withttp.WithBaseURL("https://webhook.site/24e84e8f-75cf-4239-828e-8bed244c0afb"), + ) + + call := withttp.NewCall[any](withttp.NewDefaultFastHttpHttpClientAdapter()). + WithMethod(http.MethodPost). + WithContentType(withttp.ContentTypeJSONEachRow). + WithRequestSniffed(func(data []byte, err error) { + fmt.Printf("recv: '%s', err: %v", string(data), err) + }). + WithRequestStreamBody( + withttp.WithRequestStreamBody[any, metric](stream), + ). + WithExpectedStatusCodes(http.StatusOK) + + return call.CallEndpoint(context.Background(), testEndpoint) +} +``` + +#### Stream data to server (from a reader) + +[See full example](https://github.com/sonirico/withttp/blob/main/examples/request_stream/main.go) + +```go +func CreateStreamReader() error { + buf := bytes.NewBuffer(nil) + + go func() { + buf.WriteString("{\"t\":\"2022-09-01T00:58:15+02:00\"") + buf.WriteString(",\"T\":39}\n{\"t\":\"2022-09-01T00:59:15+02:00\",\"T\":40}\n") + }() + + streamFactory := withttp.NewProxyStreamFactory(1 << 10) + + stream := withttp.NewStreamFromReader(buf, streamFactory) + + testEndpoint := withttp.NewEndpoint("webhook-site-request-stream-example"). + Request( + withttp.WithBaseURL("https://webhook.site/24e84e8f-75cf-4239-828e-8bed244c0afb"), + ) + + call := withttp.NewCall[any](withttp.NewDefaultNativeHttpClientAdapter()). + WithMethod(http.MethodPost). + WithRequestSniffed(func(data []byte, err error) { + fmt.Printf("recv: '%s', err: %v", string(data), err) + }). + WithContentType(withttp.ContentTypeJSONEachRow). + WithRequestStreamBody( + withttp.WithRequestStreamBody[any, []byte](stream), + ). + WithExpectedStatusCodes(http.StatusOK) - fmt.Printf("recv: '%s'", string(received)) - /* - {"t":"2022-09-01T00:58:15+02:00","T":39} - {"t":"2022-09-01T00:58:16.15846898+02:00","T":40} - */ - return err + return call.CallEndpoint(context.Background(), testEndpoint) } ``` diff --git a/adapter_fasthttp.go b/adapter_fasthttp.go index 2e32147..b17c776 100644 --- a/adapter_fasthttp.go +++ b/adapter_fasthttp.go @@ -3,10 +3,11 @@ package withttp import ( "bytes" "context" - "github.com/valyala/fasthttp" "io" "net/http" "net/url" + + "github.com/valyala/fasthttp" ) type ( @@ -69,8 +70,9 @@ func (a *fastHttpReqAdapter) SetBody(body []byte) { a.req.SetBody(body) } -func (a *fastHttpReqAdapter) Body() []byte { - return a.req.Body() +func (a *fastHttpReqAdapter) Body() (bts []byte) { + bts, _ = io.ReadAll(a.stream) + return } func (a *fastHttpReqAdapter) BodyStream() io.ReadWriteCloser { diff --git a/call.go b/call.go index 8314bf7..901bfaa 100644 --- a/call.go +++ b/call.go @@ -36,7 +36,9 @@ type ( ReqBodyRaw []byte ReqIsStream bool - ReqStreamWriter func(ctx context.Context, c *Call[T], res Request, wg *sync.WaitGroup) error + ReqStreamWriter func(ctx context.Context, c *Call[T], res Request, wg *sync.WaitGroup) error + ReqStreamSniffer func([]byte, error) + ReqShouldSniff bool } ) diff --git a/call_req.go b/call_req.go index 3c31164..e4df1c8 100644 --- a/call_req.go +++ b/call_req.go @@ -38,20 +38,8 @@ func (c *Call[T]) WithMethod(method string) *Call[T] { return c.withReq(WithMethod[T](method)) } -type ( - rangeable[T any] interface { - Range(func(int, T) bool) - } - - Slice[T any] []T -) - -func (s Slice[T]) Range(fn func(int, T) bool) { - for i, x := range s { - if !fn(i, x) { - return - } - } +func (c *Call[T]) WithRequestSniffed(fn func([]byte, error)) *Call[T] { + return c.withReq(WithRequestSniffer[T](fn)) } func (c *Call[T]) WithRequestStreamBody(opt StreamCallReqOptionFunc[T]) *Call[T] { diff --git a/codec/constants.go b/codec/constants.go index 7540d56..a2210b0 100644 --- a/codec/constants.go +++ b/codec/constants.go @@ -1,6 +1,13 @@ package codec +import "github.com/pkg/errors" + var ( NativeJSONCodec = NewNativeJsonCodec() NativeJSONEachRowCodec = NewNativeJsonEachRowCodec(NativeJSONCodec) + ProxyBytesEncoder = ProxyBytesCodec{} +) + +var ( + ErrTypeAssertion = errors.New("unexpected type") ) diff --git a/codec/noop.go b/codec/noop.go index 6f8cd5a..93372da 100644 --- a/codec/noop.go +++ b/codec/noop.go @@ -13,4 +13,9 @@ type ( Encoder Decoder } + + NoopCodec struct{} ) + +func (c NoopCodec) Encode(_ any) (bts []byte, err error) { return } +func (c NoopCodec) Decode(_ []byte, _ any) (err error) { return } diff --git a/codec/proxy_bytes.go b/codec/proxy_bytes.go new file mode 100644 index 0000000..255186b --- /dev/null +++ b/codec/proxy_bytes.go @@ -0,0 +1,20 @@ +package codec + +import "github.com/pkg/errors" + +type ( + ProxyBytesCodec struct{} +) + +func (e ProxyBytesCodec) Encode(x any) ([]byte, error) { + bts, ok := x.([]byte) + if !ok { + return nil, errors.Wrapf(ErrTypeAssertion, "want '[]byte', have %T", x) + } + + return bts, nil +} + +func (e ProxyBytesCodec) Decode(_ []byte, _ any) error { + panic("not implemented") +} diff --git a/examples/request_stream/main.go b/examples/request_stream/main.go index f6e7f5a..4da98f9 100644 --- a/examples/request_stream/main.go +++ b/examples/request_stream/main.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "context" "fmt" "net/http" @@ -26,7 +27,7 @@ func CreateStream() error { }, } - slice := withttp.Slice[metric](points) + stream := withttp.Slice[metric](points) testEndpoint := withttp.NewEndpoint("webhook-site-request-stream-example"). Request( @@ -36,23 +37,88 @@ func CreateStream() error { call := withttp.NewCall[any](withttp.NewDefaultFastHttpHttpClientAdapter()). WithMethod(http.MethodPost). WithContentType(withttp.ContentTypeJSONEachRow). + WithRequestSniffed(func(data []byte, err error) { + fmt.Printf("recv: '%s', err: %v", string(data), err) + }). WithRequestStreamBody( - withttp.WithRequestStreamBody[any, metric](slice), + withttp.WithRequestStreamBody[any, metric](stream), ). WithExpectedStatusCodes(http.StatusOK) - err := call.CallEndpoint(context.Background(), testEndpoint) + return call.CallEndpoint(context.Background(), testEndpoint) +} + +func CreateStreamChannel() error { + points := make(chan metric, 2) + + go func() { + points <- metric{ + Time: time.Unix(time.Now().Unix()-1, 0), + Temp: 39, + } + + points <- metric{ + Time: time.Now(), + Temp: 40, + } + + close(points) + }() + + stream := withttp.Channel[metric](points) + + testEndpoint := withttp.NewEndpoint("webhook-site-request-stream-example"). + Request( + withttp.WithBaseURL("https://webhook.site/24e84e8f-75cf-4239-828e-8bed244c0afb"), + ) + + call := withttp.NewCall[any](withttp.NewDefaultFastHttpHttpClientAdapter()). + WithMethod(http.MethodPost). + WithContentType(withttp.ContentTypeJSONEachRow). + WithRequestSniffed(func(data []byte, err error) { + fmt.Printf("recv: '%s', err: %v", string(data), err) + }). + WithRequestStreamBody( + withttp.WithRequestStreamBody[any, metric](stream), + ). + WithExpectedStatusCodes(http.StatusOK) + + return call.CallEndpoint(context.Background(), testEndpoint) +} - received := call.Req.Body() +func CreateStreamReader() error { + buf := bytes.NewBuffer(nil) + + go func() { + buf.WriteString("{\"t\":\"2022-09-01T00:58:15+02:00\"") + buf.WriteString(",\"T\":39}\n{\"t\":\"2022-09-01T00:59:15+02:00\",\"T\":40}\n") + }() + + streamFactory := withttp.NewProxyStreamFactory(1 << 10) + + stream := withttp.NewStreamFromReader(buf, streamFactory) + + testEndpoint := withttp.NewEndpoint("webhook-site-request-stream-example"). + Request( + withttp.WithBaseURL("https://webhook.site/24e84e8f-75cf-4239-828e-8bed244c0afb"), + ) + + call := withttp.NewCall[any](withttp.NewDefaultNativeHttpClientAdapter()). + WithMethod(http.MethodPost). + WithRequestSniffed(func(data []byte, err error) { + fmt.Printf("recv: '%s', err: %v", string(data), err) + }). + WithContentType(withttp.ContentTypeJSONEachRow). + WithRequestStreamBody( + withttp.WithRequestStreamBody[any, []byte](stream), + ). + WithExpectedStatusCodes(http.StatusOK) - fmt.Printf("recv: '%s'", string(received)) - /* - {"t":"2022-09-01T00:58:15+02:00","T":39} - {"t":"2022-09-01T00:58:16.15846898+02:00","T":40} - */ - return err + return call.CallEndpoint(context.Background(), testEndpoint) } func main() { _ = CreateStream() + _ = CreateStreamChannel() + _ = CreateStreamReader() } diff --git a/stream.go b/stream.go deleted file mode 100644 index 78b3b5d..0000000 --- a/stream.go +++ /dev/null @@ -1,70 +0,0 @@ -package withttp - -import ( - "bufio" - "context" - "encoding/json" - "io" -) - -type ( - Stream[T any] interface { - Next(ctx context.Context) bool - Data() T - Err() error - } - - StreamFactory[T any] interface { - Get(r io.Reader) Stream[T] - } - - StreamFactoryFunc[T any] func(reader io.Reader) Stream[T] -) - -func (f StreamFactoryFunc[T]) Get(r io.Reader) Stream[T] { - return f(r) -} - -type ( - JSONEachRowStream[T any] struct { - current T - - scanner *bufio.Scanner - - err error - } -) - -func (s *JSONEachRowStream[T]) Next(_ context.Context) bool { - if !s.scanner.Scan() { - return false - } - - s.err = json.Unmarshal(s.scanner.Bytes(), &s.current) - - return true -} - -func (s *JSONEachRowStream[T]) Data() T { - return s.current -} - -func (s *JSONEachRowStream[T]) Err() error { - if s.err != nil { - return s.err - } - - return s.scanner.Err() -} - -func NewJSONEachRowStream[T any](r io.Reader) Stream[T] { - return &JSONEachRowStream[T]{ - scanner: bufio.NewScanner(r), - } -} - -func NewJSONEachRowStreamFactory[T any]() StreamFactory[T] { - return StreamFactoryFunc[T](func(r io.Reader) Stream[T] { - return NewJSONEachRowStream[T](r) - }) -} diff --git a/stream_factory.go b/stream_factory.go new file mode 100644 index 0000000..4c239b1 --- /dev/null +++ b/stream_factory.go @@ -0,0 +1,149 @@ +package withttp + +import ( + "bufio" + "context" + "encoding/json" + "io" +) + +type ( + Stream[T any] interface { + Next(ctx context.Context) bool + Data() T + Err() error + } + + StreamFactory[T any] interface { + Get(r io.Reader) Stream[T] + } + + StreamFactoryFunc[T any] func(reader io.Reader) Stream[T] +) + +func (f StreamFactoryFunc[T]) Get(r io.Reader) Stream[T] { + return f(r) +} + +type ( + JSONEachRowStream[T any] struct { + current T + + inner Stream[[]byte] + + err error + } + + NewLineStream struct { + current []byte + + scanner *bufio.Scanner + + err error + } + + ProxyStream struct { + err error + current []byte + buffer []byte + reader io.Reader + } +) + +func (s *ProxyStream) Next(_ context.Context) bool { + s.err = nil + bts := s.buffer[:cap(s.buffer)] + read, err := s.reader.Read(bts) + + if err != nil || read == 0 { + s.err = err + return false + } + + s.current = make([]byte, read) + copy(s.current, bts) + return true +} + +func (s *ProxyStream) Data() []byte { + return s.current +} + +func (s *ProxyStream) Err() error { + return s.err +} + +func (s *NewLineStream) Next(_ context.Context) bool { + if !s.scanner.Scan() { + s.err = s.scanner.Err() + return false + } + s.current = s.scanner.Bytes() + return true +} + +func (s *NewLineStream) Data() []byte { + return s.current +} + +func (s *NewLineStream) Err() error { + if s.err != nil { + return s.err + } + + return s.scanner.Err() +} + +func (s *JSONEachRowStream[T]) Next(ctx context.Context) bool { + if !s.inner.Next(ctx) { + return false + } + + s.err = json.Unmarshal(s.inner.Data(), &s.current) + + return true +} + +func (s *JSONEachRowStream[T]) Data() T { + return s.current +} + +func (s *JSONEachRowStream[T]) Err() error { + if s.err != nil { + return s.err + } + + return s.inner.Err() +} + +func NewNewLineStream(r io.Reader) Stream[[]byte] { + return &NewLineStream{scanner: bufio.NewScanner(r)} +} + +func NewNewLineStreamFactory() StreamFactory[[]byte] { + return StreamFactoryFunc[[]byte](func(r io.Reader) Stream[[]byte] { + return NewNewLineStream(r) + }) +} + +func NewProxyStream(r io.Reader, bufferSize int) Stream[[]byte] { + return &ProxyStream{reader: r, buffer: make([]byte, bufferSize)} +} + +func NewProxyStreamFactory(bufferSize int) StreamFactory[[]byte] { + return StreamFactoryFunc[[]byte](func(r io.Reader) Stream[[]byte] { + return NewProxyStream(r, bufferSize) + }) +} + +func NewJSONEachRowStream[T any](r io.Reader) Stream[T] { + return &JSONEachRowStream[T]{ + inner: NewNewLineStream(r), + } +} + +func NewJSONEachRowStreamFactory[T any]() StreamFactory[T] { + return StreamFactoryFunc[T](func(r io.Reader) Stream[T] { + return NewJSONEachRowStream[T](r) + }) +} diff --git a/streams.go b/streams.go new file mode 100644 index 0000000..a934e5b --- /dev/null +++ b/streams.go @@ -0,0 +1,70 @@ +package withttp + +import ( + "context" + "io" +) + +type ( + rangeable[T any] interface { + Range(func(int, T) bool) + Serialize() bool + } + + Slice[T any] []T + + Channel[T any] chan T + + StreamFromReader struct { + io.Reader + streamFactory StreamFactory[[]byte] + } +) + +func NewStreamFromReader(r io.Reader, sf StreamFactory[[]byte]) StreamFromReader { + return StreamFromReader{ + Reader: r, + streamFactory: sf, + } +} + +func (s Slice[T]) Range(fn func(int, T) bool) { + for i, x := range s { + if !fn(i, x) { + return + } + } +} + +func (s Slice[T]) Serialize() bool { return true } + +func (c Channel[T]) Range(fn func(int, T) bool) { + i := 0 + for { + x, ok := <-c + if !ok { + return + } + + fn(i, x) + + i++ + } +} + +func (c Channel[T]) Serialize() bool { return true } + +func (r StreamFromReader) Range(fn func(int, []byte) bool) { + stream := r.streamFactory.Get(r) + i := 0 + for stream.Next(context.TODO()) { + if stream.Err() != nil { + return + } + + fn(i, stream.Data()) + i++ + } +} + +func (r StreamFromReader) Serialize() bool { return false } diff --git a/testing.go b/testing.go index 87a8f72..dfbe195 100644 --- a/testing.go +++ b/testing.go @@ -1,9 +1,10 @@ package withttp import ( - "github.com/pkg/errors" "strings" "testing" + + "github.com/pkg/errors" ) func assertError(t *testing.T, expected, actual error) bool { diff --git a/with_req.go b/with_req.go index a14829d..294b70f 100644 --- a/with_req.go +++ b/with_req.go @@ -3,10 +3,11 @@ package withttp import ( "bytes" "context" - "github.com/sonirico/withttp/codec" "io" "net/url" "sync" + + "github.com/sonirico/withttp/codec" ) func WithHeader[T any](k, v string, override bool) CallReqOptionFunc[T] { @@ -83,6 +84,14 @@ func WithBody[T any](payload any) CallReqOptionFunc[T] { } } +func WithRequestSniffer[T any](fn func([]byte, error)) CallReqOptionFunc[T] { + return func(c *Call[T], req Request) error { + c.ReqShouldSniff = true + c.ReqStreamSniffer = fn + return nil + } +} + func WithRequestStreamBody[T, U any](r rangeable[U]) StreamCallReqOptionFunc[T] { return func(c *Call[T], req Request) error { c.ReqIsStream = true @@ -93,13 +102,28 @@ func WithRequestStreamBody[T, U any](r rangeable[U]) StreamCallReqOptionFunc[T] c.ReqStreamWriter = func(ctx context.Context, c *Call[T], req Request, wg *sync.WaitGroup) (err error) { defer func() { wg.Done() }() - encoder, err := c.ReqContentType.Codec() - if err != nil { - return err + var encoder codec.Encoder + if r.Serialize() { + encoder, err = c.ReqContentType.Codec() + + if err != nil { + return + } + } else { + encoder = codec.ProxyBytesEncoder + } + + var sniffer func([]byte, error) + + if c.ReqShouldSniff { + sniffer = c.ReqStreamSniffer + } else { + sniffer = func(_ []byte, _ error) {} } - err = EncodeStream(ctx, r, req, encoder) - return nil + err = EncodeStream(ctx, r, req, encoder, sniffer) + + return } return nil } @@ -121,7 +145,14 @@ func EncodeBody(payload any, contentType ContentType) (bts []byte, err error) { return } -func EncodeStream[T any](ctx context.Context, r rangeable[T], req Request, encoder codec.Encoder) (err error) { +func EncodeStream[T any]( + ctx context.Context, + r rangeable[T], + req Request, + encoder codec.Encoder, + sniffer func([]byte, error), +) (err error) { + stream := req.BodyStream() defer func() { _ = stream.Close() }() @@ -129,6 +160,10 @@ func EncodeStream[T any](ctx context.Context, r rangeable[T], req Request, encod var bts []byte r.Range(func(i int, x T) bool { + defer func() { + sniffer(bts, err) + }() + select { case <-ctx.Done(): return false diff --git a/with_res.go b/with_res.go index 07f5926..833b893 100644 --- a/with_res.go +++ b/with_res.go @@ -2,8 +2,9 @@ package withttp import ( "encoding/json" - "github.com/pkg/errors" "io" + + "github.com/pkg/errors" ) func WithCloseBody[T any]() CallResOptionFunc[T] {