Skip to content

Commit

Permalink
Fix #59: now clients without acceptor will be processed correctly.
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeffcaii committed Jun 5, 2020
1 parent 6f10710 commit f4e64d9
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 20 deletions.
25 changes: 15 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import (
"github.com/rsocket/rsocket-go/payload"
)

var defaultMimeType = []byte("application/binary")
var (
_defaultMimeType = []byte("application/binary")
_noopSocket = NewAbstractSocket()
)

type (
// ClientResumeOptions represents resume options for client.
Expand All @@ -24,11 +27,6 @@ type (
CloseableRSocket
}

setupClientSocket interface {
Client
Setup(ctx context.Context, setup *socket.SetupInfo) error
}

// ClientSocketAcceptor is alias for RSocket handler function.
ClientSocketAcceptor = func(socket RSocket) RSocket

Expand All @@ -49,11 +47,11 @@ type (
ClientTransportBuilder
// Fragment set fragmentation size which default is 16_777_215(16MB).
Fragment(mtu int) ClientBuilder

// KeepAlive defines current client keepalive settings.
KeepAlive(tickPeriod, ackTimeout time.Duration, missedAcks int) ClientBuilder
// Resume enable resume for current RSocket.
// Resume enable the functionality of resume.
Resume(opts ...ClientResumeOptions) ClientBuilder
// Lease enable the functionality of lease.
Lease() ClientBuilder
// DataMimeType is used to set payload data MIME type.
// Default MIME type is `application/binary`.
Expand All @@ -79,6 +77,11 @@ type (
// "wss://127.0.0.1:8080/a/b/c" means a Websocket RSocket transport with HTTPS.
Transport(uri string, opts ...TransportOpts) ClientStarter
}

setupClientSocket interface {
Client
Setup(ctx context.Context, setup *socket.SetupInfo) error
}
)

// Connect create a new RSocket client builder with default settings.
Expand All @@ -89,8 +92,8 @@ func Connect() ClientBuilder {
Version: common.DefaultVersion,
KeepaliveInterval: common.DefaultKeepaliveInterval,
KeepaliveLifetime: common.DefaultKeepaliveMaxLifetime,
DataMimeType: defaultMimeType,
MetadataMimeType: defaultMimeType,
DataMimeType: _defaultMimeType,
MetadataMimeType: _defaultMimeType,
},
}
}
Expand Down Expand Up @@ -229,6 +232,8 @@ func (p *implClientBuilder) start(ctx context.Context, tc *tls.Config) (client C
}
if p.acceptor != nil {
sk.SetResponder(p.acceptor(cs))
} else {
sk.SetResponder(_noopSocket)
}

// bind closers.
Expand Down
20 changes: 10 additions & 10 deletions internal/socket/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
)

var (
errUnsupportedMetadataPush = errors.New("unsupported METADATA_PUSH")
errUnsupportedFireAndForget = errors.New("unsupported FIRE_AND_FORGET")
errUnsupportedRequestResponse = errors.New("unsupported REQUEST_RESPONSE")
errUnsupportedRequestStream = errors.New("unsupported REQUEST_STREAM")
errUnsupportedRequestChannel = errors.New("unsupported REQUEST_CHANNEL")
errUnimplementedMetadataPush = errors.New("METADATA_PUSH is unimplemented")
errUnimplementedFireAndForget = errors.New("FIRE_AND_FORGET is unimplemented")
errUnimplementedRequestResponse = errors.New("REQUEST_RESPONSE is unimplemented")
errUnimplementedRequestStream = errors.New("REQUEST_STREAM is unimplemented")
errUnimplementedRequestChannel = errors.New("REQUEST_CHANNEL is unimplemented")
)

// Closeable represents a closeable target.
Expand Down Expand Up @@ -80,7 +80,7 @@ type AbstractRSocket struct {
// MetadataPush starts a request of MetadataPush.
func (p AbstractRSocket) MetadataPush(message payload.Payload) {
if p.MP == nil {
logger.Errorf("%s\n", errUnsupportedMetadataPush)
logger.Errorf("%s\n", errUnimplementedMetadataPush)
return
}
p.MP(message)
Expand All @@ -89,7 +89,7 @@ func (p AbstractRSocket) MetadataPush(message payload.Payload) {
// FireAndForget starts a request of FireAndForget.
func (p AbstractRSocket) FireAndForget(message payload.Payload) {
if p.FF == nil {
logger.Errorf("%s\n", errUnsupportedFireAndForget)
logger.Errorf("%s\n", errUnimplementedFireAndForget)
return
}
p.FF(message)
Expand All @@ -98,23 +98,23 @@ func (p AbstractRSocket) FireAndForget(message payload.Payload) {
// RequestResponse starts a request of RequestResponse.
func (p AbstractRSocket) RequestResponse(message payload.Payload) mono.Mono {
if p.RR == nil {
return mono.Error(errUnsupportedRequestResponse)
return mono.Error(errUnimplementedRequestResponse)
}
return p.RR(message)
}

// RequestStream starts a request of RequestStream.
func (p AbstractRSocket) RequestStream(message payload.Payload) flux.Flux {
if p.RS == nil {
return flux.Error(errUnsupportedRequestStream)
return flux.Error(errUnimplementedRequestStream)
}
return p.RS(message)
}

// RequestChannel starts a request of RequestChannel.
func (p AbstractRSocket) RequestChannel(messages rx.Publisher) flux.Flux {
if p.RC == nil {
return flux.Error(errUnsupportedRequestChannel)
return flux.Error(errUnimplementedRequestChannel)
}
return p.RC(messages)
}
Expand Down

0 comments on commit f4e64d9

Please sign in to comment.