From f4e64d9e21f978612ede7f4f68c08cf140e5fd09 Mon Sep 17 00:00:00 2001 From: Jeffsky Date: Fri, 5 Jun 2020 22:54:49 +0800 Subject: [PATCH] Fix #59: now clients without acceptor will be processed correctly. --- client.go | 25 +++++++++++++++---------- internal/socket/socket.go | 20 ++++++++++---------- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/client.go b/client.go index d5217e6..b9bb5ec 100644 --- a/client.go +++ b/client.go @@ -13,7 +13,10 @@ import ( "github.com/rsocket/rsocket-go/payload" ) -var defaultMimeType = []byte("application/binary") +var ( + _defaultMimeType = []byte("application/binary") + _noopSocket = NewAbstractSocket() +) type ( // ClientResumeOptions represents resume options for client. @@ -24,11 +27,6 @@ type ( CloseableRSocket } - setupClientSocket interface { - Client - Setup(ctx context.Context, setup *socket.SetupInfo) error - } - // ClientSocketAcceptor is alias for RSocket handler function. ClientSocketAcceptor = func(socket RSocket) RSocket @@ -49,11 +47,11 @@ type ( ClientTransportBuilder // Fragment set fragmentation size which default is 16_777_215(16MB). Fragment(mtu int) ClientBuilder - // KeepAlive defines current client keepalive settings. KeepAlive(tickPeriod, ackTimeout time.Duration, missedAcks int) ClientBuilder - // Resume enable resume for current RSocket. + // Resume enable the functionality of resume. Resume(opts ...ClientResumeOptions) ClientBuilder + // Lease enable the functionality of lease. Lease() ClientBuilder // DataMimeType is used to set payload data MIME type. // Default MIME type is `application/binary`. @@ -79,6 +77,11 @@ type ( // "wss://127.0.0.1:8080/a/b/c" means a Websocket RSocket transport with HTTPS. Transport(uri string, opts ...TransportOpts) ClientStarter } + + setupClientSocket interface { + Client + Setup(ctx context.Context, setup *socket.SetupInfo) error + } ) // Connect create a new RSocket client builder with default settings. @@ -89,8 +92,8 @@ func Connect() ClientBuilder { Version: common.DefaultVersion, KeepaliveInterval: common.DefaultKeepaliveInterval, KeepaliveLifetime: common.DefaultKeepaliveMaxLifetime, - DataMimeType: defaultMimeType, - MetadataMimeType: defaultMimeType, + DataMimeType: _defaultMimeType, + MetadataMimeType: _defaultMimeType, }, } } @@ -229,6 +232,8 @@ func (p *implClientBuilder) start(ctx context.Context, tc *tls.Config) (client C } if p.acceptor != nil { sk.SetResponder(p.acceptor(cs)) + } else { + sk.SetResponder(_noopSocket) } // bind closers. diff --git a/internal/socket/socket.go b/internal/socket/socket.go index 829c515..f4fee83 100644 --- a/internal/socket/socket.go +++ b/internal/socket/socket.go @@ -16,11 +16,11 @@ import ( ) var ( - errUnsupportedMetadataPush = errors.New("unsupported METADATA_PUSH") - errUnsupportedFireAndForget = errors.New("unsupported FIRE_AND_FORGET") - errUnsupportedRequestResponse = errors.New("unsupported REQUEST_RESPONSE") - errUnsupportedRequestStream = errors.New("unsupported REQUEST_STREAM") - errUnsupportedRequestChannel = errors.New("unsupported REQUEST_CHANNEL") + errUnimplementedMetadataPush = errors.New("METADATA_PUSH is unimplemented") + errUnimplementedFireAndForget = errors.New("FIRE_AND_FORGET is unimplemented") + errUnimplementedRequestResponse = errors.New("REQUEST_RESPONSE is unimplemented") + errUnimplementedRequestStream = errors.New("REQUEST_STREAM is unimplemented") + errUnimplementedRequestChannel = errors.New("REQUEST_CHANNEL is unimplemented") ) // Closeable represents a closeable target. @@ -80,7 +80,7 @@ type AbstractRSocket struct { // MetadataPush starts a request of MetadataPush. func (p AbstractRSocket) MetadataPush(message payload.Payload) { if p.MP == nil { - logger.Errorf("%s\n", errUnsupportedMetadataPush) + logger.Errorf("%s\n", errUnimplementedMetadataPush) return } p.MP(message) @@ -89,7 +89,7 @@ func (p AbstractRSocket) MetadataPush(message payload.Payload) { // FireAndForget starts a request of FireAndForget. func (p AbstractRSocket) FireAndForget(message payload.Payload) { if p.FF == nil { - logger.Errorf("%s\n", errUnsupportedFireAndForget) + logger.Errorf("%s\n", errUnimplementedFireAndForget) return } p.FF(message) @@ -98,7 +98,7 @@ func (p AbstractRSocket) FireAndForget(message payload.Payload) { // RequestResponse starts a request of RequestResponse. func (p AbstractRSocket) RequestResponse(message payload.Payload) mono.Mono { if p.RR == nil { - return mono.Error(errUnsupportedRequestResponse) + return mono.Error(errUnimplementedRequestResponse) } return p.RR(message) } @@ -106,7 +106,7 @@ func (p AbstractRSocket) RequestResponse(message payload.Payload) mono.Mono { // RequestStream starts a request of RequestStream. func (p AbstractRSocket) RequestStream(message payload.Payload) flux.Flux { if p.RS == nil { - return flux.Error(errUnsupportedRequestStream) + return flux.Error(errUnimplementedRequestStream) } return p.RS(message) } @@ -114,7 +114,7 @@ func (p AbstractRSocket) RequestStream(message payload.Payload) flux.Flux { // RequestChannel starts a request of RequestChannel. func (p AbstractRSocket) RequestChannel(messages rx.Publisher) flux.Flux { if p.RC == nil { - return flux.Error(errUnsupportedRequestChannel) + return flux.Error(errUnimplementedRequestChannel) } return p.RC(messages) }