diff --git a/README.md b/README.md index 9f81731..c7e848f 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,6 @@ And the server side of the above: func main() { server, _ := execrpc.NewServer( execrpc.ServerOptions[model.ExampleRequest, model.ExampleResponse]{ - Codec: codecs.JSONCodec[model.ExampleResponse, model.ExampleRequest]{}, Call: func(d execrpc.Dispatcher, req model.ExampleRequest) model.ExampleResponse { return model.ExampleResponse{ Hello: "Hello " + req.Text + "!", diff --git a/client.go b/client.go index 01435b8..135d1b3 100644 --- a/client.go +++ b/client.go @@ -18,10 +18,20 @@ import ( // is about to be shut down. var ErrShutdown = errors.New("connection is shut down") +const ( + // Signal to server about what codec to use. + envClientCodec = "EXECRPC_CLIENT_CODEC" +) + +// StartClient starts a client for the given options. func StartClient[Q, R any](opts ClientOptions[Q, R]) (*Client[Q, R], error) { if opts.Codec == nil { return nil, errors.New("opts: Codec is required") } + + // Pass default settings to the server. + envhelpers.SetEnvVars(&opts.Env, envClientCodec, opts.Codec.Name()) + rawClient, err := StartClientRaw(opts.ClientRawOptions) if err != nil { return nil, err @@ -33,6 +43,7 @@ func StartClient[Q, R any](opts ClientOptions[Q, R]) (*Client[Q, R], error) { }, nil } +// Client is a strongly typed RPC client. type Client[Q, R any] struct { rawClient *ClientRaw codec codecs.Codec[Q, R] @@ -63,10 +74,12 @@ func (c *Client[Q, R]) Execute(r Q) (R, error) { return resp, nil } +// Close closes the client. func (c *Client[Q, R]) Close() error { return c.rawClient.Close() } +// StartClientRaw starts a untyped client client for the given options. func StartClientRaw(opts ClientRawOptions) (*ClientRaw, error) { if opts.Timeout == 0 { opts.Timeout = time.Second * 10 @@ -115,6 +128,8 @@ func StartClientRaw(opts ClientRawOptions) (*ClientRaw, error) { return client, nil } +// ClientRaw is a raw RPC client. +// Raw means that the client doesn't do any type conversion, a byte slice is what you get. type ClientRaw struct { version uint8 @@ -135,6 +150,7 @@ type ClientRaw struct { pending map[uint32]*call } +// Close closes the server connection and waits for the server process to quit. func (c *ClientRaw) Close() error { if err := c.conn.Close(); err != nil { return c.addErrContext("close", err) @@ -261,11 +277,13 @@ func (c *ClientRaw) send(call *call) error { return call.Request.Write(c.conn) } +// ClientOptions are options for the client. type ClientOptions[Q, R any] struct { ClientRawOptions Codec codecs.Codec[Q, R] } +// ClientRawOptions are options for the raw part of the client. type ClientRawOptions struct { // Version number passed to the server. Version uint8 diff --git a/client_test.go b/client_test.go index 9c8b80f..0b8bbc1 100644 --- a/client_test.go +++ b/client_test.go @@ -86,17 +86,17 @@ func TestExecTyped(t *testing.T) { } c.Run("JSON", func(c *qt.C) { - client := newClient(c, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=json") + client := newClient(c, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}) runBasicTestForClient(c, client) }) c.Run("TOML", func(c *qt.C) { - client := newClient(c, codecs.TOMLCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=toml") + client := newClient(c, codecs.TOMLCodec[model.ExampleRequest, model.ExampleResponse]{}) runBasicTestForClient(c, client) }) c.Run("Gob", func(c *qt.C) { - client := newClient(c, codecs.GobCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=gob") + client := newClient(c, codecs.GobCodec[model.ExampleRequest, model.ExampleResponse]{}) runBasicTestForClient(c, client) }) @@ -109,7 +109,7 @@ func TestExecTyped(t *testing.T) { Version: 1, Cmd: "go", Args: []string{"run", "./examples/servers/typed"}, - Env: []string{"EXECRPC_CODEC=json", "EXECRPC_SEND_TWO_LOG_MESSAGES=true"}, + Env: []string{"EXECRPC_SEND_TWO_LOG_MESSAGES=true"}, Timeout: 4 * time.Second, OnMessage: func(msg execrpc.Message) { logMessages = append(logMessages, msg) @@ -128,7 +128,7 @@ func TestExecTyped(t *testing.T) { }) c.Run("Error", func(c *qt.C) { - client := newClient(c, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=json", "EXECRPC_CALL_SHOULD_FAIL=true") + client := newClient(c, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CALL_SHOULD_FAIL=true") result, err := client.Execute(model.ExampleRequest{Text: "hello"}) c.Assert(err, qt.IsNil) c.Assert(result.Err(), qt.IsNotNil) @@ -160,7 +160,7 @@ func TestExecTyped(t *testing.T) { } func TestExecTypedConcurrent(t *testing.T) { - client := newTestClient(t, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=json") + client := newTestClient(t, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}) var g errgroup.Group for i := 0; i < 100; i++ { @@ -195,7 +195,7 @@ func BenchmarkClient(b *testing.B) { const word = "World" b.Run("JSON", func(b *testing.B) { - client := newTestClient(b, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=json") + client := newTestClient(b, codecs.JSONCodec[model.ExampleRequest, model.ExampleResponse]{}) b.RunParallel(func(pb *testing.PB) { for pb.Next() { _, err := client.Execute(model.ExampleRequest{Text: word}) @@ -207,7 +207,7 @@ func BenchmarkClient(b *testing.B) { }) b.Run("TOML", func(b *testing.B) { - client := newTestClient(b, codecs.TOMLCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=toml") + client := newTestClient(b, codecs.TOMLCodec[model.ExampleRequest, model.ExampleResponse]{}) b.RunParallel(func(pb *testing.PB) { for pb.Next() { _, err := client.Execute(model.ExampleRequest{Text: word}) @@ -219,7 +219,7 @@ func BenchmarkClient(b *testing.B) { }) b.Run("Gob", func(b *testing.B) { - client := newTestClient(b, codecs.GobCodec[model.ExampleRequest, model.ExampleResponse]{}, "EXECRPC_CODEC=gob") + client := newTestClient(b, codecs.GobCodec[model.ExampleRequest, model.ExampleResponse]{}) b.RunParallel(func(pb *testing.PB) { for pb.Next() { _, err := client.Execute(model.ExampleRequest{Text: word}) diff --git a/codecs/codecs.go b/codecs/codecs.go index 9f0a8d6..73c2a80 100644 --- a/codecs/codecs.go +++ b/codecs/codecs.go @@ -4,14 +4,34 @@ import ( "bytes" "encoding/gob" "encoding/json" + "errors" + "strings" "github.com/pelletier/go-toml/v2" ) -// Codec defines the interface for a two way conversion between Q and R. +// Codec defines the interface for a two way conversion between Q and R. type Codec[Q, R any] interface { Encode(Q) ([]byte, error) Decode([]byte, *R) error + Name() string +} + +// ErrUnknownCodec is returned when no codec is found for the given name. +var ErrUnknownCodec = errors.New("unknown codec") + +// ForName returns the codec for the given name or ErrUnknownCodec if no codec is found. +func ForName[Q, R any](name string) (Codec[Q, R], error) { + switch strings.ToLower(name) { + case "toml": + return TOMLCodec[Q, R]{}, nil + case "json": + return JSONCodec[Q, R]{}, nil + case "gob": + return GobCodec[Q, R]{}, nil + default: + return nil, ErrUnknownCodec + } } // TOMLCodec is a Codec that uses TOML as the underlying format. @@ -30,6 +50,10 @@ func (c TOMLCodec[Q, R]) Encode(q Q) ([]byte, error) { return b.Bytes(), nil } +func (c TOMLCodec[Q, R]) Name() string { + return "TOML" +} + // JSONCodec is a Codec that uses JSON as the underlying format. type JSONCodec[Q, R any] struct{} @@ -41,6 +65,10 @@ func (c JSONCodec[Q, R]) Encode(q Q) ([]byte, error) { return json.Marshal(q) } +func (c JSONCodec[Q, R]) Name() string { + return "JSON" +} + // GobCodec is a Codec that uses gob as the underlying format. type GobCodec[Q, R any] struct{} @@ -58,3 +86,7 @@ func (c GobCodec[Q, R]) Encode(q Q) ([]byte, error) { } return b.Bytes(), nil } + +func (c GobCodec[Q, R]) Name() string { + return "Gob" +} diff --git a/examples/model/model.go b/examples/model/model.go index 211b829..a582ef0 100644 --- a/examples/model/model.go +++ b/examples/model/model.go @@ -1,14 +1,17 @@ package model +// ExampleRequest is just a simple example request. type ExampleRequest struct { Text string `json:"text"` } +// ExampleResponse is just a simple example response. type ExampleResponse struct { Hello string `json:"hello"` Error *Error `json:"err"` } +// Err is just a simple example error. func (r ExampleResponse) Err() error { if r.Error == nil { // Make sure that resp.Err() == nil. @@ -17,6 +20,7 @@ func (r ExampleResponse) Err() error { return r.Error } +// Error holds an error message. type Error struct { Msg string `json:"msg"` } diff --git a/examples/servers/typed/main.go b/examples/servers/typed/main.go index 97d19cb..aeb9024 100644 --- a/examples/servers/typed/main.go +++ b/examples/servers/typed/main.go @@ -6,7 +6,6 @@ import ( "os" "github.com/bep/execrpc" - "github.com/bep/execrpc/codecs" "github.com/bep/execrpc/examples/model" ) @@ -16,7 +15,6 @@ func main() { // Some test flags from the client. var ( - codecID = os.Getenv("EXECRPC_CODEC") printOutsideServerBefore = os.Getenv("EXECRPC_PRINT_OUTSIDE_SERVER_BEFORE") != "" printOutsideServerAfter = os.Getenv("EXECRPC_PRINT_OUTSIDE_SERVER_AFTER") != "" printInsideServer = os.Getenv("EXECRPC_PRINT_INSIDE_SERVER") != "" @@ -24,23 +22,12 @@ func main() { sendLogMessage = os.Getenv("EXECRPC_SEND_TWO_LOG_MESSAGES") != "" ) - var codec codecs.Codec[model.ExampleResponse, model.ExampleRequest] - switch codecID { - case "toml": - codec = codecs.TOMLCodec[model.ExampleResponse, model.ExampleRequest]{} - case "gob": - codec = codecs.GobCodec[model.ExampleResponse, model.ExampleRequest]{} - default: - codec = codecs.JSONCodec[model.ExampleResponse, model.ExampleRequest]{} - } - if printOutsideServerBefore { fmt.Println("Printing outside server before") } server, err := execrpc.NewServer( execrpc.ServerOptions[model.ExampleRequest, model.ExampleResponse]{ - Codec: codec, Call: func(d execrpc.Dispatcher, req model.ExampleRequest) model.ExampleResponse { if printInsideServer { fmt.Println("Printing inside server") diff --git a/message.go b/message.go index 4b67f33..bc2443c 100644 --- a/message.go +++ b/message.go @@ -29,6 +29,8 @@ func (m *Message) Write(w io.Writer) error { return err } +// Header is the header of a message. +// ID and Size are set by the system. type Header struct { ID uint32 Version uint8 @@ -36,6 +38,7 @@ type Header struct { Size uint32 } +// Read reads the header from the reader. func (h *Header) Read(r io.Reader) error { buf := make([]byte, 10) _, err := io.ReadFull(r, buf) @@ -49,6 +52,7 @@ func (h *Header) Read(r io.Reader) error { return nil } +// Write writes the header to the writer. func (h Header) Write(w io.Writer) error { buff := make([]byte, 10) binary.BigEndian.PutUint32(buff[0:4], h.ID) diff --git a/server.go b/server.go index 7612f4a..8c263a9 100644 --- a/server.go +++ b/server.go @@ -1,6 +1,7 @@ package execrpc import ( + "errors" "fmt" "io" "os" @@ -22,7 +23,7 @@ const ( MessageStatusSystemReservedMax = 99 ) -// NewServerRaw creates a new Server. using the given options. +// NewServerRaw creates a new Server using the given options. func NewServerRaw(opts ServerRawOptions) (*ServerRaw, error) { if opts.Call == nil { return nil, fmt.Errorf("opts: Call function is required") @@ -42,7 +43,13 @@ func NewServer[Q, R any](opts ServerOptions[Q, R]) (*Server[Q, R], error) { return nil, fmt.Errorf("opts: Call function is required") } if opts.Codec == nil { - return nil, fmt.Errorf("opts: Codec is required") + if opts.Codec == nil { + var err error + opts.Codec, err = codecs.ForName[R, Q](os.Getenv(envClientCodec)) + if err != nil { + return nil, errors.New("opts: Codec is required") + } + } } var rawServer *ServerRaw @@ -61,7 +68,6 @@ func NewServer[Q, R any](opts ServerOptions[Q, R]) (*Server[Q, R], error) { r := opts.Call(rawServer.dispatcher, q) b, err := opts.Codec.Encode(r) if err != nil { - m := Message{ Header: message.Header, Body: []byte(fmt.Sprintf("failed to encode response: %s. Check that client and server uses the same codec.", err)), @@ -93,7 +99,11 @@ func NewServer[Q, R any](opts ServerOptions[Q, R]) (*Server[Q, R], error) { // ServerOptions is the options for a server. type ServerOptions[Q, R any] struct { - Call func(Dispatcher, Q) R + // Call is the function that will be called when a request is received. + Call func(Dispatcher, Q) R + + // Codec is the codec that will be used to encode and decode requests and responses. + // The client will tell the server what codec is in use, so in most cases you should just leave this unset. Codec codecs.Codec[R, Q] } @@ -241,12 +251,13 @@ func (s *ServerRaw) inputOutput() error { return err } +// ServerRawOptions is the options for a raw portion of the server. type ServerRawOptions struct { // Call is the message exhcange between the client and server. // Note that any error returned by this function will be treated as a fatal error and the server is stopped. // Validation errors etc. should be returned in the response message. // The Dispatcher can be used to send messages to the client outside of the request/response loop, e.g. log messages. - // Note that these messages can not have an ID. + // Note that these messages must have ID 0. Call func(Dispatcher, Message) (Message, error) } @@ -254,6 +265,7 @@ type messageDispatcher struct { s *ServerRaw } +// Dispatcher is the interface for dispatching standalone messages to the client, e.g. log messages. type Dispatcher interface { // Send sends one or more message back to the client. // This is normally used for log messages and similar,