From d69285e65741ec9b0341790814162249d3d99080 Mon Sep 17 00:00:00 2001 From: Jeffsky Date: Wed, 31 Jul 2019 14:04:23 +0800 Subject: [PATCH] add new rsocket-cli implementation. --- .gitignore | 2 + cmd/echo/echo_benchmark_test.go | 2 +- cmd/rsocket-cli-ng/rsocket-cli.go | 306 ------------------- cmd/{rsocket-cli => rsocket-cli-old}/go.mod | 0 cmd/{rsocket-cli => rsocket-cli-old}/go.sum | 0 cmd/{rsocket-cli => rsocket-cli-old}/main.go | 0 cmd/rsocket-cli/rsocket-cli.go | 152 +++++++++ cmd/rsocket-cli/runner.go | 287 +++++++++++++++++ extension/mime.go | 6 + go.mod | 2 +- go.sum | 6 +- internal/common/misc.go | 16 - internal/framing/frame.go | 7 +- internal/framing/frame_error.go | 8 +- internal/framing/frame_metadata_push.go | 2 +- internal/framing/frame_payload.go | 4 +- internal/framing/frame_request_channel.go | 4 +- internal/framing/frame_request_response.go | 2 +- internal/framing/frame_request_stream.go | 4 +- internal/framing/frame_setup.go | 8 +- internal/transport/connection_tcp.go | 6 +- payload/payload_raw.go | 10 +- payload/payload_str.go | 4 - rx/flux/flux.go | 3 + rx/flux/proxy.go | 64 ++-- server.go | 3 +- 26 files changed, 533 insertions(+), 375 deletions(-) delete mode 100644 cmd/rsocket-cli-ng/rsocket-cli.go rename cmd/{rsocket-cli => rsocket-cli-old}/go.mod (100%) rename cmd/{rsocket-cli => rsocket-cli-old}/go.sum (100%) rename cmd/{rsocket-cli => rsocket-cli-old}/main.go (100%) create mode 100644 cmd/rsocket-cli/rsocket-cli.go create mode 100644 cmd/rsocket-cli/runner.go diff --git a/.gitignore b/.gitignore index f5758ff..aa6673e 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,5 @@ suppressions/ *.zip .idea + +cmd/rsocket-cli/rsocket-cli diff --git a/cmd/echo/echo_benchmark_test.go b/cmd/echo/echo_benchmark_test.go index 0f4c573..4f154f5 100644 --- a/cmd/echo/echo_benchmark_test.go +++ b/cmd/echo/echo_benchmark_test.go @@ -30,7 +30,7 @@ func TestClient_RequestResponse(t *testing.T) { _ = client.Close() }() wg := &sync.WaitGroup{} - n := 50 * 10000 + n := 100 * 10000 wg.Add(n) data := []byte(common.RandAlphanumeric(1024)) diff --git a/cmd/rsocket-cli-ng/rsocket-cli.go b/cmd/rsocket-cli-ng/rsocket-cli.go deleted file mode 100644 index 84fcd02..0000000 --- a/cmd/rsocket-cli-ng/rsocket-cli.go +++ /dev/null @@ -1,306 +0,0 @@ -package main - -import ( - "bufio" - "context" - "fmt" - "log" - "os" - "os/signal" - "strings" - "syscall" - "time" - - "github.com/rsocket/rsocket-go" - "github.com/rsocket/rsocket-go/logger" - "github.com/rsocket/rsocket-go/payload" - "github.com/rsocket/rsocket-go/rx" - "github.com/rsocket/rsocket-go/rx/mono" - "github.com/urfave/cli" -) - -func init() { - logger.DisablePrefix() - fn := func(s string, i ...interface{}) { - fmt.Printf(s, i...) - } - logger.SetFunc(logger.LevelDebug, fn) - logger.SetFunc(logger.LevelInfo, fn) - logger.SetFunc(logger.LevelError, func(s string, i ...interface{}) { - _, _ = os.Stderr.WriteString(fmt.Sprintf(s, i...)) - }) -} - -func main() { - conf := &Runner{} - app := cli.NewApp() - //app.UsageText = "rsocket-cli [global options] [target]" - app.Name = "rsocket-cli" - app.Usage = "CLI for RSocket." - app.Version = "alpha" - app.Flags = newFlags(conf) - app.ArgsUsage = "[target]" - app.Action = func(c *cli.Context) (err error) { - conf.URI = c.Args().Get(0) - return conf.Run() - } - err := app.Run(os.Args) - if err != nil { - log.Fatal(err) - } -} - -func newFlags(args *Runner) []cli.Flag { - return []cli.Flag{ - cli.StringSliceFlag{ - Name: "header,H", - Usage: "Custom header to pass to server", - }, - cli.StringSliceFlag{ - Name: "transport-header,T", - Usage: "Custom header to pass to the transport", - }, - cli.BoolFlag{ - Name: "stream", - Usage: "Request Stream", - Destination: &(args.Stream), - }, - cli.BoolFlag{ - Name: "request", - Usage: "Request Response", - Destination: &(args.Request), - }, - cli.BoolFlag{ - Name: "fnf", - Usage: "Fire And Forget", - Destination: &(args.FNF), - }, - cli.BoolFlag{ - Name: "channel", - Usage: "Channel", - Destination: &(args.Channel), - }, - cli.BoolFlag{ - Name: "metadataPush", - Usage: "Metadata Push", - Destination: &(args.MetadataPush), - }, - cli.BoolFlag{ - Name: "server,s", - Usage: "Start server instead of client", - Destination: &(args.ServerMode), - }, - cli.StringFlag{ - Name: "input,i", - Usage: "String input, '-' (STDIN) or @path/to/file", - Destination: &(args.Input), - }, - cli.StringFlag{ - Name: "metadata, m", - Usage: "Metadata input string input or @path/to/file", - Destination: &(args.Metadata), - }, - cli.StringFlag{ - Name: "metadataFormat", - Usage: "Metadata Format", - Value: "json", - Destination: &(args.MetadataFormat), - }, - cli.StringFlag{ - Name: "dataFormat", - Usage: "Data Format", - Value: "binary", - Destination: &(args.DataFormat), - }, - cli.StringFlag{ - Name: "setup", - Usage: "String input or @path/to/file for setup metadata", - Destination: &(args.Setup), - }, - cli.BoolFlag{ - Name: "debug,d", - Usage: "Debug Output", - Destination: &(args.Debug), - }, - cli.IntFlag{ - Name: "ops", - Usage: "Operation Count", - Value: 1, - Destination: &(args.Ops), - }, - cli.DurationFlag{ - Name: "timeout", - Usage: "Timeout in seconds", - Destination: &(args.Timeout), - }, - cli.DurationFlag{ - Name: "keepalive,k", - Usage: "Keepalive period", - Value: 20 * time.Second, - Destination: &(args.Keepalive), - }, - cli.IntFlag{ - Name: "requestn, r", - Usage: "Request N credits", - Value: rx.RequestMax, - Destination: &(args.N), - }, - cli.BoolFlag{ - Name: "resume", - Usage: "resume enabled", - Destination: &(args.Resume), - }, - } - -} - -type Runner struct { - Stream bool - Request bool - FNF bool - Channel bool - MetadataPush bool - ServerMode bool - Input string - Metadata string - MetadataFormat string - DataFormat string - Setup string - Debug bool - Ops int - Timeout time.Duration - Keepalive time.Duration - N int - Resume bool - URI string -} - -func (p *Runner) PreFlight() (err error) { - if p.Debug { - logger.SetLevel(logger.LevelDebug) - } - if p.Input == "-" { - fmt.Println("Type commands to send to the server......") - reader := bufio.NewReader(os.Stdin) - text, _ := reader.ReadString('\n') - p.Input = strings.Trim(text, "\n") - } - return -} - -func (p *Runner) Run() error { - if err := p.PreFlight(); err != nil { - return err - } - ctx, cancel := context.WithCancel(context.Background()) - - defer func() { - cancel() - }() - - go func() { - cs := make(chan os.Signal, 1) - signal.Notify(cs, os.Interrupt, syscall.SIGTERM) - <-cs - cancel() - }() - - if p.ServerMode { - return p.runServerMode(ctx) - } - return p.runClientMode(ctx) -} - -func (p *Runner) runClientMode(ctx context.Context) (err error) { - var cb rsocket.ClientBuilder - if p.Resume { - cb = rsocket.Connect().Resume() - } else { - cb = rsocket.Connect() - } - c, err := cb. - DataMimeType(p.DataFormat). - MetadataMimeType(p.MetadataFormat). - SetupPayload(payload.NewString(p.Setup, "")). - Transport(p.URI). - Start(context.Background()) - if err != nil { - return - } - defer func() { - _ = c.Close() - }() - send := p.CreatePayload() - if p.Stream { - err = p.RequestStream(ctx, c, send) - } else { - err = p.RequestResponse(ctx, c, send) - } - return -} - -func (p *Runner) runServerMode(ctx context.Context) error { - var sb rsocket.ServerBuilder - if p.Resume { - sb = rsocket.Receive().Resume() - } else { - sb = rsocket.Receive() - } - ch := make(chan error) - go func() { - err := sb. - Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) rsocket.RSocket { - var options []rsocket.OptAbstractSocket - - if p.Channel { - - } else if p.Stream { - - } else { - options = append(options, rsocket.RequestResponse(func(msg payload.Payload) mono.Mono { - p.showPayload(msg) - return mono.Just(p.CreatePayload()) - })) - } - return rsocket.NewAbstractSocket(options...) - }). - Transport(p.URI). - Serve(ctx) - ch <- err - close(ch) - }() - return <-ch -} - -func (p *Runner) RequestResponse(ctx context.Context, c rsocket.Client, send payload.Payload) (err error) { - res, err := c.RequestResponse(send).Block(ctx) - if err != nil { - return - } - p.showPayload(res) - return -} - -func (p *Runner) RequestStream(ctx context.Context, c rsocket.Client, send payload.Payload) (err error) { - first := true - _, err = c.RequestStream(send). - DoOnNext(func(input payload.Payload) { - if first { - first = false - p.showPayload(input) - } else { - logger.Infof("\n") - p.showPayload(input) - } - }). - BlockLast(ctx) - return -} - -func (p *Runner) showPayload(pa payload.Payload) { - logger.Infof("%s", pa.DataUTF8()) -} - -func (p *Runner) CreatePayload() payload.Payload { - return payload.NewString(p.Input, p.Metadata) -} diff --git a/cmd/rsocket-cli/go.mod b/cmd/rsocket-cli-old/go.mod similarity index 100% rename from cmd/rsocket-cli/go.mod rename to cmd/rsocket-cli-old/go.mod diff --git a/cmd/rsocket-cli/go.sum b/cmd/rsocket-cli-old/go.sum similarity index 100% rename from cmd/rsocket-cli/go.sum rename to cmd/rsocket-cli-old/go.sum diff --git a/cmd/rsocket-cli/main.go b/cmd/rsocket-cli-old/main.go similarity index 100% rename from cmd/rsocket-cli/main.go rename to cmd/rsocket-cli-old/main.go diff --git a/cmd/rsocket-cli/rsocket-cli.go b/cmd/rsocket-cli/rsocket-cli.go new file mode 100644 index 0000000..4ba0f6f --- /dev/null +++ b/cmd/rsocket-cli/rsocket-cli.go @@ -0,0 +1,152 @@ +package main + +import ( + "fmt" + "log" + "os" + "time" + + "github.com/rsocket/rsocket-go/extension" + "github.com/rsocket/rsocket-go/logger" + "github.com/rsocket/rsocket-go/rx" + "github.com/urfave/cli" +) + +func init() { + logger.DisablePrefix() + fn := func(s string, i ...interface{}) { + fmt.Printf(s, i...) + } + logger.SetFunc(logger.LevelDebug, fn) + logger.SetFunc(logger.LevelInfo, fn) + logger.SetFunc(logger.LevelError, func(s string, i ...interface{}) { + _, _ = os.Stderr.WriteString(fmt.Sprintf(s, i...)) + }) +} + +func main() { + conf := &Runner{} + app := cli.NewApp() + app.UsageText = "rsocket-cli [global options] [URI]" + app.Name = "rsocket-cli" + app.Usage = "CLI for RSocket." + app.Version = "alpha" + app.Flags = newFlags(conf) + app.ArgsUsage = "[URI]" + app.Action = func(c *cli.Context) (err error) { + if c.NArg() < 1 { + cli.ShowAppHelpAndExit(c, 1) + return + } + conf.URI = c.Args().First() + return conf.Run() + } + err := app.Run(os.Args) + if err != nil { + log.Fatal(err) + } +} + +func newFlags(args *Runner) []cli.Flag { + return []cli.Flag{ + cli.StringSliceFlag{ + Name: "header,H", + Usage: "Custom header to pass to server", + }, + cli.StringSliceFlag{ + Name: "transport-header,T", + Usage: "Custom header to pass to the transport", + }, + cli.BoolFlag{ + Name: "stream", + Usage: "Request Stream", + Destination: &(args.Stream), + }, + cli.BoolFlag{ + Name: "request", + Usage: "Request Response", + Destination: &(args.Request), + }, + cli.BoolFlag{ + Name: "fnf", + Usage: "Fire And Forget", + Destination: &(args.FNF), + }, + cli.BoolFlag{ + Name: "channel", + Usage: "Channel", + Destination: &(args.Channel), + }, + cli.BoolFlag{ + Name: "metadataPush", + Usage: "Metadata Push", + Destination: &(args.MetadataPush), + }, + cli.BoolFlag{ + Name: "server,s", + Usage: "Start server instead of client", + Destination: &(args.ServerMode), + }, + cli.StringFlag{ + Name: "input,i", + Usage: "String input, '-' (STDIN) or @path/to/file", + Destination: &(args.Input), + }, + cli.StringFlag{ + Name: "metadata, m", + Usage: "Metadata input string input or @path/to/file", + Destination: &(args.Metadata), + }, + cli.StringFlag{ + Name: "metadataFormat", + Usage: "Metadata Format", + Value: extension.ApplicationJSON.String(), + Destination: &(args.MetadataFormat), + }, + cli.StringFlag{ + Name: "dataFormat", + Usage: "Data Format", + Value: "application/binary", + Destination: &(args.DataFormat), + }, + cli.StringFlag{ + Name: "setup", + Usage: "String input or @path/to/file for setup metadata", + Destination: &(args.Setup), + }, + cli.BoolFlag{ + Name: "debug,d", + Usage: "Debug Output", + Destination: &(args.Debug), + }, + cli.IntFlag{ + Name: "ops", + Usage: "Operation Count", + Value: 1, + Destination: &(args.Ops), + }, + cli.DurationFlag{ + Name: "timeout", + Usage: "Timeout in seconds", + Destination: &(args.Timeout), + }, + cli.DurationFlag{ + Name: "keepalive,k", + Usage: "Keepalive period", + Value: 20 * time.Second, + Destination: &(args.Keepalive), + }, + cli.IntFlag{ + Name: "requestn, r", + Usage: "Request N credits", + Value: rx.RequestMax, + Destination: &(args.N), + }, + cli.BoolFlag{ + Name: "resume", + Usage: "resume enabled", + Destination: &(args.Resume), + }, + } + +} diff --git a/cmd/rsocket-cli/runner.go b/cmd/rsocket-cli/runner.go new file mode 100644 index 0000000..b7038e7 --- /dev/null +++ b/cmd/rsocket-cli/runner.go @@ -0,0 +1,287 @@ +package main + +import ( + "bufio" + "context" + "fmt" + "io/ioutil" + "os" + "strings" + "time" + + "github.com/rsocket/rsocket-go" + "github.com/rsocket/rsocket-go/logger" + "github.com/rsocket/rsocket-go/payload" + "github.com/rsocket/rsocket-go/rx" + "github.com/rsocket/rsocket-go/rx/flux" + "github.com/rsocket/rsocket-go/rx/mono" +) + +type Runner struct { + Stream bool + Request bool + FNF bool + Channel bool + MetadataPush bool + ServerMode bool + Input string + Metadata string + MetadataFormat string + DataFormat string + Setup string + Debug bool + Ops int + Timeout time.Duration + Keepalive time.Duration + N int + Resume bool + URI string +} + +func (p *Runner) preflight() (err error) { + if p.Debug { + logger.SetLevel(logger.LevelDebug) + } + + return +} + +func (p *Runner) Run() error { + if err := p.preflight(); err != nil { + return err + } + ctx, cancel := context.WithCancel(context.Background()) + + defer func() { + cancel() + }() + + //go func() { + // cs := make(chan os.Signal, 1) + // signal.Notify(cs, os.Interrupt, syscall.SIGTERM) + // <-cs + // cancel() + //}() + + if p.ServerMode { + return p.runServerMode(ctx) + } + return p.runClientMode(ctx) +} + +func (p *Runner) runClientMode(ctx context.Context) (err error) { + var cb rsocket.ClientBuilder + if p.Resume { + cb = rsocket.Connect().Resume() + } else { + cb = rsocket.Connect() + } + + setupData, err := p.readData(p.Setup) + if err != nil { + return + } + setupPayload := payload.New(setupData, nil) + sendings := p.createPayload() + c, err := cb. + DataMimeType(p.DataFormat). + MetadataMimeType(p.MetadataFormat). + SetupPayload(setupPayload). + Transport(p.URI). + Start(ctx) + if err != nil { + return + } + defer func() { + _ = c.Close() + }() + + for i := 0; i < p.Ops; i++ { + if i > 0 { + logger.Infof("\n") + } + var first payload.Payload + if !p.Channel { + first, err = sendings.BlockFirst(ctx) + if err != nil { + return + } + } + + if p.Request { + err = p.execRequestResponse(ctx, c, first) + } else if p.FNF { + err = p.execFireAndForget(ctx, c, first) + } else if p.Stream { + err = p.execRequestStream(ctx, c, first) + } else if p.Channel { + err = p.execRequestChannel(ctx, c, sendings) + } else if p.MetadataPush { + err = p.execMetadataPush(ctx, c, first) + } else { + err = p.execRequestResponse(ctx, c, first) + } + if err != nil { + break + } + } + return +} + +func (p *Runner) runServerMode(ctx context.Context) error { + var sb rsocket.ServerBuilder + if p.Resume { + sb = rsocket.Receive().Resume() + } else { + sb = rsocket.Receive() + } + ch := make(chan error) + go func() { + sendings := p.createPayload() + var first payload.Payload + if !p.Channel { + var err error + first, err = sendings.BlockFirst(ctx) + if err != nil { + ch <- err + return + } + } + ch <- sb. + Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) rsocket.RSocket { + var options []rsocket.OptAbstractSocket + if p.Channel { + + } else if p.Stream { + + } else { + options = append(options, rsocket.RequestResponse(func(msg payload.Payload) mono.Mono { + p.showPayload(msg) + return mono.Just(first) + })) + } + return rsocket.NewAbstractSocket(options...) + }). + Transport(p.URI). + Serve(ctx) + close(ch) + }() + return <-ch +} + +func (p *Runner) execMetadataPush(ctx context.Context, c rsocket.Client, send payload.Payload) (err error) { + c.MetadataPush(send) + return +} + +func (p *Runner) execFireAndForget(ctx context.Context, c rsocket.Client, send payload.Payload) (err error) { + c.FireAndForget(send) + return +} + +func (p *Runner) execRequestResponse(ctx context.Context, c rsocket.Client, send payload.Payload) (err error) { + res, err := c.RequestResponse(send).Block(ctx) + if err != nil { + return + } + p.showPayload(res) + return +} + +func (p *Runner) execRequestChannel(ctx context.Context, c rsocket.Client, send flux.Flux) error { + var f flux.Flux + if p.N < rx.RequestMax { + f = c.RequestChannel(send).Take(p.N) + } else { + f = c.RequestChannel(send) + } + return p.printFlux(ctx, f) +} + +func (p *Runner) execRequestStream(ctx context.Context, c rsocket.Client, send payload.Payload) error { + var f flux.Flux + if p.N < rx.RequestMax { + f = c.RequestStream(send).Take(p.N) + } else { + f = c.RequestStream(send) + } + return p.printFlux(ctx, f) +} + +func (p *Runner) printFlux(ctx context.Context, f flux.Flux) (err error) { + var requested int + _, err = f. + DoOnNext(func(input payload.Payload) { + if requested == 0 { + p.showPayload(input) + } else { + logger.Infof("\n") + p.showPayload(input) + } + requested++ + }). + BlockLast(ctx) + return +} + +func (p *Runner) showPayload(pa payload.Payload) { + logger.Infof("%s", pa.DataUTF8()) +} + +func (p *Runner) createPayload() flux.Flux { + var md []byte + if strings.HasPrefix(p.Metadata, "@") { + var err error + md, err = ioutil.ReadFile(p.Metadata[1:]) + if err != nil { + return flux.Error(err) + } + } else { + md = []byte(p.Metadata) + } + + if p.Input == "-" { + fmt.Println("Type commands to send to the server......") + reader := bufio.NewReader(os.Stdin) + text, _ := reader.ReadString('\n') + return flux.Just(payload.New([]byte(strings.Trim(text, "\n")), md)) + } + + if !strings.HasPrefix(p.Input, "@") { + return flux.Just(payload.New([]byte(p.Input), md)) + } + + return flux.Create(func(ctx context.Context, s flux.Sink) { + f, err := os.Open(p.Input[1:]) + if err != nil { + fmt.Println("error:", err) + s.Error(err) + return + } + defer func() { + _ = f.Close() + }() + scanner := bufio.NewScanner(f) + + for scanner.Scan() { + select { + case <-ctx.Done(): + s.Error(ctx.Err()) + return + default: + s.Next(payload.New([]byte(scanner.Text()), md)) + } + } + s.Complete() + }) +} + +func (p *Runner) readData(input string) (data []byte, err error) { + switch { + case strings.HasPrefix(input, "@"): + data, err = ioutil.ReadFile(input[1:]) + case input != "": + data = []byte(input) + } + return +} diff --git a/extension/mime.go b/extension/mime.go index 755aa43..b055be0 100644 --- a/extension/mime.go +++ b/extension/mime.go @@ -62,6 +62,9 @@ const ( VideoH264 VideoH265 VideoVP8 + Hessian + JavaObject + CloudEventsJSON MessageZipkin MIME = 0x7D MessageRouting MIME = 0x7E MessageCompositeMetadata MIME = 0x7F @@ -107,6 +110,9 @@ func init() { VideoH264: "video/H264", VideoH265: "video/H265", VideoVP8: "video/VP8", + Hessian: "application/x-hessian", + JavaObject: "application/x-java-object", + CloudEventsJSON: "application/cloudevents+json", MessageZipkin: "message/x.rsocket.tracing-zipkin.v0", MessageRouting: "message/x.rsocket.routing.v0", MessageCompositeMetadata: "message/x.rsocket.composite-metadata.v0", diff --git a/go.mod b/go.mod index f0032be..409966b 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.12 require ( github.com/google/uuid v1.1.1 github.com/gorilla/websocket v1.4.0 - github.com/jjeffcaii/reactor-go v0.0.12 + github.com/jjeffcaii/reactor-go v0.0.14 github.com/pkg/errors v0.8.1 github.com/stretchr/testify v1.3.0 github.com/urfave/cli v1.20.0 diff --git a/go.sum b/go.sum index 3a9d2b7..9050c55 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= -github.com/jjeffcaii/reactor-go v0.0.12 h1:2aJc30+IzWoTe+Q+/kmx12Kfiht5fYQgGrMVb8789ZM= -github.com/jjeffcaii/reactor-go v0.0.12/go.mod h1:yxYBt62huNjDF5+tuzzGhjHM/SCcscp6GeYPsNjU7eA= +github.com/jjeffcaii/reactor-go v0.0.14 h1:bvAxdxB1LTADNy/m3giSokCFUtvOYaHmeM4jLmf/80w= +github.com/jjeffcaii/reactor-go v0.0.14/go.mod h1:yxYBt62huNjDF5+tuzzGhjHM/SCcscp6GeYPsNjU7eA= github.com/panjf2000/ants v1.0.0 h1:MZBsUt8W6ktQfhIswUZpw17IJlXY6ly2+U5b9jxwad4= github.com/panjf2000/ants v1.0.0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= @@ -19,5 +19,3 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/urfave/cli v1.20.0 h1:fDqGv3UG/4jbVl/QkFwEdddtEDjh/5Ov6X+0B/3bPaw= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= -github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= -github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= diff --git a/internal/common/misc.go b/internal/common/misc.go index 692af8b..14b55da 100644 --- a/internal/common/misc.go +++ b/internal/common/misc.go @@ -2,7 +2,6 @@ package common import ( "time" - "unsafe" ) const ( @@ -12,18 +11,3 @@ const ( DefaultKeepaliveMaxLifetime = 90 * time.Second ) -// Str2bytes convert string to bytes. -func Str2bytes(s string) []byte { - x := (*[2]uintptr)(unsafe.Pointer(&s)) - h := [3]uintptr{x[0], x[1], x[1]} - return *(*[]byte)(unsafe.Pointer(&h)) -} - -// Bytes2str convert bytes to string. -func Bytes2str(b []byte) (s string) { - if len(b) < 1 { - return - } - s = *(*string)(unsafe.Pointer(&b)) - return -} diff --git a/internal/framing/frame.go b/internal/framing/frame.go index 6fe3116..51d12df 100644 --- a/internal/framing/frame.go +++ b/internal/framing/frame.go @@ -2,6 +2,7 @@ package framing import ( "errors" + "fmt" "io" "strings" @@ -123,6 +124,7 @@ func newFlags(flags ...FrameFlag) FrameFlag { // Frame is a single message containing a request, response, or protocol processing. type Frame interface { + fmt.Stringer io.WriterTo // Header returns frame FrameHeader. Header() FrameHeader @@ -210,7 +212,10 @@ func (p *BaseFrame) WriteTo(w io.Writer) (n int64, err error) { // Bytes returns frame in bytes. func (p *BaseFrame) Bytes() []byte { - return append(p.header[:], p.body.Bytes()...) + ret := make([]byte, HeaderLen+p.body.Len()) + copy(ret[:HeaderLen], p.header[:]) + copy(ret[HeaderLen:], p.body.Bytes()) + return ret } // NewBaseFrame returns a new BaseFrame. diff --git a/internal/framing/frame_error.go b/internal/framing/frame_error.go index 81507dd..71ea95a 100644 --- a/internal/framing/frame_error.go +++ b/internal/framing/frame_error.go @@ -18,6 +18,10 @@ type FrameError struct { *BaseFrame } +func (p *FrameError) String() string { + return fmt.Sprintf("FrameError{%s,code=%s,data=%s}", p.header, p.ErrorCode(), p.ErrorData()) +} + // Validate returns error if frame is invalid. func (p *FrameError) Validate() (err error) { if p.Len() < minErrorFrameLen { @@ -47,11 +51,11 @@ func NewFrameError(streamID uint32, code common.ErrorCode, data []byte) *FrameEr var b4 [4]byte binary.BigEndian.PutUint32(b4[:], uint32(code)) if _, err := bf.Write(b4[:]); err != nil { - + panic(err) } if _, err := bf.Write(data); err != nil { - + panic(err) } return &FrameError{ diff --git a/internal/framing/frame_metadata_push.go b/internal/framing/frame_metadata_push.go index 4310ac1..3c94dc1 100644 --- a/internal/framing/frame_metadata_push.go +++ b/internal/framing/frame_metadata_push.go @@ -37,7 +37,7 @@ func (p *FrameMetadataPush) Data() []byte { func (p *FrameMetadataPush) MetadataUTF8() (metadata string, ok bool) { raw, ok := p.Metadata() if ok { - metadata = common.Bytes2str(raw) + metadata = string(raw) } return } diff --git a/internal/framing/frame_payload.go b/internal/framing/frame_payload.go index cff3fa3..1f0d1c3 100644 --- a/internal/framing/frame_payload.go +++ b/internal/framing/frame_payload.go @@ -35,14 +35,14 @@ func (p *FramePayload) Data() []byte { func (p *FramePayload) MetadataUTF8() (metadata string, ok bool) { raw, ok := p.Metadata() if ok { - metadata = common.Bytes2str(raw) + metadata = string(raw) } return } // DataUTF8 returns data as UTF8 string. func (p *FramePayload) DataUTF8() string { - return common.Bytes2str(p.Data()) + return string(p.Data()) } // NewFramePayload returns a new payload frame. diff --git a/internal/framing/frame_request_channel.go b/internal/framing/frame_request_channel.go index cc2234d..8b59972 100644 --- a/internal/framing/frame_request_channel.go +++ b/internal/framing/frame_request_channel.go @@ -50,14 +50,14 @@ func (p *FrameRequestChannel) Data() []byte { func (p *FrameRequestChannel) MetadataUTF8() (metadata string, ok bool) { raw, ok := p.Metadata() if ok { - metadata = common.Bytes2str(raw) + metadata = string(raw) } return } // DataUTF8 returns data as UTF8 string. func (p *FrameRequestChannel) DataUTF8() string { - return common.Bytes2str(p.Data()) + return string(p.Data()) } // NewFrameRequestChannel returns a new RequestChannel frame. diff --git a/internal/framing/frame_request_response.go b/internal/framing/frame_request_response.go index 865a2db..865f6d7 100644 --- a/internal/framing/frame_request_response.go +++ b/internal/framing/frame_request_response.go @@ -35,7 +35,7 @@ func (p *FrameRequestResponse) Data() []byte { func (p *FrameRequestResponse) MetadataUTF8() (metadata string, ok bool) { raw, ok := p.Metadata() if ok { - metadata = common.Bytes2str(raw) + metadata = string(raw) } return } diff --git a/internal/framing/frame_request_stream.go b/internal/framing/frame_request_stream.go index 897d52f..05ae921 100644 --- a/internal/framing/frame_request_stream.go +++ b/internal/framing/frame_request_stream.go @@ -49,14 +49,14 @@ func (p *FrameRequestStream) Data() []byte { func (p *FrameRequestStream) MetadataUTF8() (metadata string, ok bool) { raw, ok := p.Metadata() if ok { - metadata = common.Bytes2str(raw) + metadata = string(raw) } return } // DataUTF8 returns data as UTF8 string. func (p *FrameRequestStream) DataUTF8() string { - return common.Bytes2str(p.Data()) + return string(p.Data()) } // NewFrameRequestStream returns a new request stream frame. diff --git a/internal/framing/frame_setup.go b/internal/framing/frame_setup.go index b391314..3e40e5f 100644 --- a/internal/framing/frame_setup.go +++ b/internal/framing/frame_setup.go @@ -76,13 +76,13 @@ func (p *FrameSetup) Token() []byte { // DataMimeType returns MIME of data. func (p *FrameSetup) DataMimeType() (mime string) { _, b := p.mime() - return common.Bytes2str(b) + return string(b) } // MetadataMimeType returns MIME of metadata. func (p *FrameSetup) MetadataMimeType() string { a, _ := p.mime() - return common.Bytes2str(a) + return string(a) } // Metadata returns metadata bytes. @@ -111,14 +111,14 @@ func (p *FrameSetup) Data() []byte { func (p *FrameSetup) MetadataUTF8() (metadata string, ok bool) { raw, ok := p.Metadata() if ok { - metadata = common.Bytes2str(raw) + metadata = string(raw) } return } // DataUTF8 returns data as UTF8 string. func (p *FrameSetup) DataUTF8() string { - return common.Bytes2str(p.Data()) + return string(p.Data()) } func (p *FrameSetup) mime() (metadata []byte, data []byte) { diff --git a/internal/transport/connection_tcp.go b/internal/transport/connection_tcp.go index 5aebc68..1d0d48f 100644 --- a/internal/transport/connection_tcp.go +++ b/internal/transport/connection_tcp.go @@ -81,13 +81,17 @@ func (p *tcpConn) Write(frame framing.Frame) (err error) { err = errors.Wrap(err, "write frame failed") return } + var debugStr string + if logger.IsDebugEnabled() { + debugStr = frame.String() + } _, err = frame.WriteTo(p.writer) if err != nil { err = errors.Wrap(err, "write frame failed") return } if logger.IsDebugEnabled() { - logger.Debugf("---> snd: %s\n", frame) + logger.Debugf("---> snd: %s\n", debugStr) } return } diff --git a/payload/payload_raw.go b/payload/payload_raw.go index 7e2b47a..5092a2d 100644 --- a/payload/payload_raw.go +++ b/payload/payload_raw.go @@ -2,8 +2,6 @@ package payload import ( "fmt" - - "github.com/rsocket/rsocket-go/internal/common" ) type rawPayload struct { @@ -23,7 +21,7 @@ func (p *rawPayload) Metadata() (metadata []byte, ok bool) { func (p *rawPayload) MetadataUTF8() (metadata string, ok bool) { raw, ok := p.Metadata() if ok { - metadata = common.Bytes2str(raw) + metadata = string(raw) } return } @@ -33,9 +31,5 @@ func (p *rawPayload) Data() []byte { } func (p *rawPayload) DataUTF8() string { - return common.Bytes2str(p.data) -} - -func (*rawPayload) Release() { - // ignore + return string(p.data) } diff --git a/payload/payload_str.go b/payload/payload_str.go index f5b953b..128f423 100644 --- a/payload/payload_str.go +++ b/payload/payload_str.go @@ -30,7 +30,3 @@ func (p *strPayload) Data() []byte { func (p *strPayload) DataUTF8() string { return p.data } - -func (*strPayload) Release() { - // ignore -} diff --git a/rx/flux/flux.go b/rx/flux/flux.go index 65ec99d..16f15f5 100644 --- a/rx/flux/flux.go +++ b/rx/flux/flux.go @@ -17,6 +17,7 @@ type Sink interface { type Flux interface { rx.Publisher + Take(n int) Flux Filter(rx.FnPredicate) Flux DoOnError(rx.FnOnError) Flux DoOnNext(rx.FnOnNext) Flux @@ -28,7 +29,9 @@ type Flux interface { SwitchOnFirst(FnSwitchOnFirst) Flux SubscribeOn(scheduler.Scheduler) Flux Raw() flux.Flux + BlockFirst(context.Context) (payload.Payload, error) BlockLast(context.Context) (payload.Payload, error) + ToChan(ctx context.Context, cap int) (c <-chan payload.Payload, e <-chan error) } type Processor interface { diff --git a/rx/flux/proxy.go b/rx/flux/proxy.go index 7631248..6cf6dac 100644 --- a/rx/flux/proxy.go +++ b/rx/flux/proxy.go @@ -7,6 +7,7 @@ import ( "github.com/jjeffcaii/reactor-go/flux" "github.com/jjeffcaii/reactor-go/scheduler" "github.com/pkg/errors" + "github.com/rsocket/rsocket-go/internal/framing" "github.com/rsocket/rsocket-go/payload" "github.com/rsocket/rsocket-go/rx" ) @@ -45,6 +46,10 @@ func (p proxy) Error(e error) { p.mustProcessor().Error(e) } +func (p proxy) Take(n int) Flux { + return newProxy(p.Flux.Take(n)) +} + func (p proxy) Filter(fn rx.FnPredicate) Flux { return newProxy(p.Flux.Filter(func(i interface{}) bool { return fn(i.(payload.Payload)) @@ -65,25 +70,50 @@ func (p proxy) DoOnNext(fn rx.FnOnNext) Flux { })) } -func (p proxy) BlockLast(ctx context.Context) (last payload.Payload, err error) { - done := make(chan struct{}) - sub := rs.NewSubscriber( - rs.OnNext(func(v interface{}) { - last = v.(payload.Payload) - }), - rs.OnError(func(e error) { - err = e - }), - ) - p.Flux. - DoFinally(func(s rs.SignalType) { - if s == rs.SignalTypeCancel { - err = rs.ErrSubscribeCancelled +func (p proxy) ToChan(ctx context.Context, cap int) (c <-chan payload.Payload, e <-chan error) { + if cap < 1 { + cap = 1 + } + ch := make(chan payload.Payload, cap) + err := make(chan error, 1) + p. + DoFinally(func(s rx.SignalType) { + if s == rx.SignalCancel { + err <- rs.ErrSubscribeCancelled } - close(done) + close(ch) + close(err) }). - SubscribeWith(ctx, sub) - <-done + Subscribe(ctx, + rx.OnNext(func(v payload.Payload) { + if _, ok := v.(framing.Frame); ok { + ch <- payload.Clone(v) + } else { + ch <- v + } + }), + rx.OnError(func(e error) { + err <- e + }), + ) + return ch, err +} + +func (p proxy) BlockFirst(ctx context.Context) (first payload.Payload, err error) { + v, err := p.Flux.BlockFirst(ctx) + if err != nil { + return + } + first = v.(payload.Payload) + return +} + +func (p proxy) BlockLast(ctx context.Context) (last payload.Payload, err error) { + v, err := p.Flux.BlockLast(ctx) + if err != nil { + return + } + last = v.(payload.Payload) return } diff --git a/server.go b/server.go index e303564..91ab156 100644 --- a/server.go +++ b/server.go @@ -3,7 +3,6 @@ package rsocket import ( "context" "crypto/tls" - "fmt" "time" "github.com/rsocket/rsocket-go/internal/common" @@ -269,7 +268,7 @@ func (p *server) doResume(frame *framing.FrameResume, tp *transport.Transport, s sending = framing.NewFrameError( 0, common.ErrorCodeRejectedResume, - common.Str2bytes(fmt.Sprintf("no such session")), + []byte("no such session"), ) } if err := tp.Send(sending, true); err != nil {