Skip to content

Commit

Permalink
feat: #66 precisely control variables for TCP deadlines/keepalive
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Nov 19, 2020
1 parent f403e56 commit 5049f65
Show file tree
Hide file tree
Showing 32 changed files with 671 additions and 392 deletions.
29 changes: 25 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -978,13 +978,34 @@ In order to get more info about installation and configuration of the plugins, s

Olric nodes supports setting `KeepAlivePeriod` on TCP sockets.

Server-side:
**Server-side:**

* **config.KeepAlivePeriod**: KeepAlivePeriod denotes whether the operating system should send keep-alive messages on the connection.
##### config.KeepAlivePeriod

Client-side:
KeepAlivePeriod denotes whether the operating system should send keep-alive messages on the connection.

**Client-side:**

* **config.DialTimeout**: Timeout for TCP dial.
##### config.DialTimeout

Timeout for TCP dial. The timeout includes name resolution, if required. When using TCP, and the host in the address
parameter resolves to multiple IP addresses, the timeout is spread over each consecutive dial, such that each is
given an appropriate fraction of the time to connect.

##### config.ReadTimeout

Timeout for socket reads. If reached, commands will fail with a timeout instead of blocking. Use value -1 for no
timeout and 0 for default. The default is config.DefaultReadTimeout

##### config.WriteTimeout

Timeout for socket writes. If reached, commands will fail with a timeout instead of blocking. The default is config.DefaultWriteTimeout

##### config.KeepAlive

KeepAlive specifies the interval between keep-alive probes for an active network connection. If zero, keep-alive probes
are sent with a default value (currently 15 seconds), if supported by the protocol and operating system. Network protocols
or operating systems that do not support keep-alives ignore this field. If negative, keep-alive probes are disabled.

## Architecture

Expand Down
45 changes: 27 additions & 18 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"log"
"os"
"sync"
"time"

"github.com/buraksezer/olric"
"github.com/buraksezer/olric/config"
"github.com/buraksezer/olric/internal/bufpool"
"github.com/buraksezer/olric/internal/protocol"
"github.com/buraksezer/olric/internal/transport"
Expand All @@ -41,18 +41,18 @@ var (
type Client struct {
config *Config
client *transport.Client
roundRobin *roundRobin
serializer serializer.Serializer
streams *streams
wg sync.WaitGroup
}

// Config includes configuration parameters for the Client.
type Config struct {
Addrs []string
Serializer serializer.Serializer
DialTimeout time.Duration
KeepAlive time.Duration
MaxConn int
Servers []string
Serializer serializer.Serializer
Client *config.Client
// TODO: This item may be moved to config.Client
MaxListenersPerStream int
}

Expand All @@ -61,37 +61,40 @@ func New(c *Config) (*Client, error) {
if c == nil {
return nil, fmt.Errorf("config cannot be nil")
}
if len(c.Addrs) == 0 {
return nil, fmt.Errorf("addrs list cannot be empty")
if len(c.Servers) == 0 {
return nil, fmt.Errorf("servers cannot be empty")
}
if c.Serializer == nil {
c.Serializer = serializer.NewGobSerializer()
}
if c.MaxConn == 0 {
c.MaxConn = 1
}
if c.MaxListenersPerStream <= 0 {
c.MaxListenersPerStream = maxListenersPerStream
}
cc := &transport.ClientConfig{
Addrs: c.Addrs,
DialTimeout: c.DialTimeout,
KeepAlive: c.KeepAlive,
MaxConn: c.MaxConn,
}
client := transport.NewClient(cc)
c.Client.Sanitize()
client := transport.NewClient(c.Client)
// About the hack: This looks weird, but I need to mock client.CreateStream function to test streams
// independently. I don't want to use a mocking library for this. So I created a function named
// createStreamFunction and I overwrite that function in test.
createStreamFunction = client.CreateStream
return &Client{
roundRobin: newRoundRobin(c.Servers),
config: c,
client: client,
serializer: c.Serializer,
streams: &streams{m: make(map[uint64]*stream)},
}, nil
}

// AddServer adds a new server to the servers list. Incoming requests are distributed evenly among the servers.
func (c *Client) AddServer(addr string) {
c.roundRobin.add(addr)
}

// DeleteServer deletes a server from the servers list.
func (c *Client) DeleteServer(addr string) error {
return c.roundRobin.delete(addr)
}

// Ping sends a dummy protocol messsage to the given host. This is useful to
// measure RTT between hosts. It also can be used as aliveness check.
func (c *Client) Ping(addr string) error {
Expand All @@ -100,6 +103,12 @@ func (c *Client) Ping(addr string) error {
return err
}

// Request initiates a request-response cycle to randomly selected host.
func (c *Client) request(req protocol.EncodeDecoder) (protocol.EncodeDecoder, error) {
addr := c.roundRobin.get()
return c.client.RequestTo(addr, req)
}

// Stats exposes some useful metrics to monitor an Olric node.
func (c *Client) Stats(addr string) (stats.Stats, error) {
s := stats.Stats{}
Expand Down
14 changes: 8 additions & 6 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import (
)

var testConfig = &Config{
DialTimeout: time.Second,
KeepAlive: time.Second,
MaxConn: 10,
Client: &config.Client{
DialTimeout: time.Second,
KeepAlive: time.Second,
MaxConn: 10,
},
}

func getFreePort() (int, error) {
Expand Down Expand Up @@ -74,7 +76,7 @@ func newDB() (*olric.Olric, chan struct{}, error) {
close(done)
}()
time.Sleep(100 * time.Millisecond)
testConfig.Addrs = []string{"127.0.0.1:" + strconv.Itoa(port)}
testConfig.Servers = []string{"127.0.0.1:" + strconv.Itoa(port)}
return db, done, nil
}

Expand All @@ -96,7 +98,7 @@ func TestClient_Ping(t *testing.T) {
t.Fatalf("Expected nil. Got: %v", err)
}

addr := testConfig.Addrs[0]
addr := testConfig.Servers[0]
err = c.Ping(addr)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
Expand Down Expand Up @@ -130,7 +132,7 @@ func TestClient_Stats(t *testing.T) {
}
}

addr := testConfig.Addrs[0]
addr := testConfig.Servers[0]
s, err := c.Stats(addr)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
Expand Down
28 changes: 14 additions & 14 deletions client/dmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (d *DMap) Get(key string) (interface{}, error) {
req := protocol.NewDMapMessage(protocol.OpGet)
req.SetDMap(d.name)
req.SetKey(key)
resp, err := d.client.Request(req)
resp, err := d.request(req)
if err != nil {
return nil, err
}
Expand All @@ -56,7 +56,7 @@ func (d *DMap) GetEntry(key string) (*olric.Entry, error) {
req := protocol.NewDMapMessage(protocol.OpGet)
req.SetDMap(d.name)
req.SetKey(key)
resp, err := d.client.Request(req)
resp, err := d.request(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -91,7 +91,7 @@ func (d *DMap) Put(key string, value interface{}) error {
req.SetExtra(protocol.PutExExtra{
Timestamp: time.Now().UnixNano(),
})
resp, err := d.client.Request(req)
resp, err := d.request(req)
if err != nil {
return err
}
Expand All @@ -113,7 +113,7 @@ func (d *DMap) PutEx(key string, value interface{}, timeout time.Duration) error
TTL: timeout.Nanoseconds(),
Timestamp: time.Now().UnixNano(),
})
resp, err := d.client.Request(req)
resp, err := d.request(req)
if err != nil {
return err
}
Expand All @@ -126,7 +126,7 @@ func (d *DMap) Delete(key string) error {
req := protocol.NewDMapMessage(protocol.OpDelete)
req.SetDMap(d.name)
req.SetKey(key)
resp, err := d.client.Request(req)
resp, err := d.request(req)
if err != nil {
return err
}
Expand Down Expand Up @@ -156,7 +156,7 @@ func (d *DMap) LockWithTimeout(key string, timeout, deadline time.Duration) (*Lo
Timeout: timeout.Nanoseconds(),
Deadline: deadline.Nanoseconds(),
})
resp, err := d.client.Request(req)
resp, err := d.request(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func (d *DMap) Lock(key string, deadline time.Duration) (*LockContext, error) {
req.SetExtra(protocol.LockExtra{
Deadline: deadline.Nanoseconds(),
})
resp, err := d.client.Request(req)
resp, err := d.request(req)
if err != nil {
return nil, err
}
Expand All @@ -209,7 +209,7 @@ func (l *LockContext) Unlock() error {
req.SetDMap(l.name)
req.SetKey(l.key)
req.SetValue(l.token)
resp, err := l.dmap.client.Request(req)
resp, err := l.dmap.request(req)
if err != nil {
return err
}
Expand All @@ -222,7 +222,7 @@ func (l *LockContext) Unlock() error {
func (d *DMap) Destroy() error {
req := protocol.NewDMapMessage(protocol.OpDestroy)
req.SetDMap(d.name)
resp, err := d.client.Request(req)
resp, err := d.request(req)
if err != nil {
return err
}
Expand Down Expand Up @@ -252,7 +252,7 @@ func (c *Client) incrDecr(op protocol.OpCode, name, key string, delta int) (int,
req.SetExtra(protocol.AtomicExtra{
Timestamp: time.Now().UnixNano(),
})
resp, err := c.client.Request(req)
resp, err := c.request(req)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -296,7 +296,7 @@ func (d *DMap) GetPut(key string, value interface{}) (interface{}, error) {
req.SetExtra(protocol.AtomicExtra{
Timestamp: time.Now().UnixNano(),
})
resp, err := d.client.Request(req)
resp, err := d.request(req)
if err != nil {
return nil, err
}
Expand All @@ -313,7 +313,7 @@ func (d *DMap) Expire(key string, timeout time.Duration) error {
TTL: timeout.Nanoseconds(),
Timestamp: time.Now().UnixNano(),
})
resp, err := d.client.Request(req)
resp, err := d.request(req)
if err != nil {
return err
}
Expand Down Expand Up @@ -342,7 +342,7 @@ func (d *DMap) PutIf(key string, value interface{}, flags int16) error {
Flags: flags,
Timestamp: time.Now().UnixNano(),
})
resp, err := d.client.Request(req)
resp, err := d.request(req)
if err != nil {
return err
}
Expand Down Expand Up @@ -372,7 +372,7 @@ func (d *DMap) PutIfEx(key string, value interface{}, timeout time.Duration, fla
TTL: timeout.Nanoseconds(),
Timestamp: time.Now().UnixNano(),
})
resp, err := d.client.Request(req)
resp, err := d.request(req)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions client/dtopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (dt *DTopic) Publish(msg interface{}) error {
req := protocol.NewDTopicMessage(protocol.OpDTopicPublish)
req.SetDTopic(dt.name)
req.SetValue(value)
resp, err := dt.client.Request(req)
resp, err := dt.request(req)
if err != nil {
return err
}
Expand Down Expand Up @@ -129,7 +129,7 @@ func (dt *DTopic) AddListener(f func(olric.DTopicMessage)) (uint64, error) {
ListenerID: listenerID,
StreamID: streamID,
})
resp, err := dt.client.Request(req)
resp, err := dt.request(req)
if err != nil {
_ = dt.removeStreamListener(listenerID)
return 0, err
Expand Down Expand Up @@ -169,7 +169,7 @@ func (dt *DTopic) RemoveListener(listenerID uint64) error {
req.SetExtra(protocol.DTopicRemoveListenerExtra{
ListenerID: listenerID,
})
resp, err := dt.client.Request(req)
resp, err := dt.request(req)
if err != nil {
return err
}
Expand All @@ -184,7 +184,7 @@ func (dt *DTopic) RemoveListener(listenerID uint64) error {
func (dt *DTopic) Destroy() error {
req := protocol.NewDTopicMessage(protocol.OpDTopicDestroy)
req.SetDTopic(dt.name)
resp, err := dt.client.Request(req)
resp, err := dt.request(req)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions client/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ package client

import (
"bytes"
"github.com/hashicorp/go-multierror"
"io"
"sync"
"time"

"github.com/buraksezer/olric/internal/protocol"
"github.com/hashicorp/go-multierror"
)

// Pipeline implements pipelining feature for Olric Binary Protocol.
Expand Down Expand Up @@ -251,7 +251,7 @@ func (p *Pipeline) Flush() ([]PipelineResponse, error) {

req := protocol.NewPipelineMessage(protocol.OpPipeline)
req.SetValue(p.buf.Bytes())
resp, err := p.c.client.Request(req)
resp, err := p.c.request(req)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion client/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (c *Cursor) runQueryOnPartition(partID uint64) (olric.QueryResponse, error)
req.SetExtra(protocol.QueryExtra{
PartID: partID,
})
resp, err := c.dm.client.Request(req)
resp, err := c.dm.request(req)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 5049f65

Please sign in to comment.