Skip to content

Commit

Permalink
feat: replication: implement REPLCONF GETACK & ACK commands
Browse files Browse the repository at this point in the history
  • Loading branch information
mhughdo committed Sep 23, 2024
1 parent 5c4fff9 commit ec93ec2
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 15 deletions.
139 changes: 128 additions & 11 deletions internal/app/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
)

const (
defaultListenAddr = ":6379"
defaultListenAddr = ":6379"
// replicaPingInterval is the ticker period pingReplicas waits between PING rounds.
replicaPingInterval = 45 * time.Second
// replconfGetAckInterval is the ticker period sendReplConfGetAck waits between
// REPLCONF GETACK rounds.
replconfGetAckInterval = 30 * time.Second
)

type Server struct {
Expand All @@ -34,6 +36,7 @@ type Server struct {
cfg *config.Config
done chan struct{}
store keyval.KV
offset uint64
queueMu sync.Mutex
clients map[*client.Client]struct{}
cFactory *command.CommandFactory
Expand Down Expand Up @@ -129,6 +132,9 @@ func (s *Server) Start(ctx context.Context) error {
if err := s.startReplication(ctx); err != nil {
return fmt.Errorf("failed to start replication: %v", err)
}
} else {
go s.pingReplicas(ctx)
go s.sendReplConfGetAck(ctx)
}

if err := s.Listen(ctx); err != nil {
Expand Down Expand Up @@ -195,6 +201,9 @@ func (s *Server) handleMasterConnection(ctx context.Context) {
}
s.dispatchResponse(r)
s.handleMasterResponse(ctx, r, reader)
if r.Type == resp.Array {
s.offset += uint64(len(buffer.Bytes()))
}
buffer.Reset()
}
}
Expand Down Expand Up @@ -256,16 +265,19 @@ func (s *Server) handleMasterResponse(ctx context.Context, r *resp.Resp, reader
case resp.Array:
cmdName := strings.ToLower(r.Data.([]*resp.Resp)[0].String())
args := r.Data.([]*resp.Resp)[1:]
cmd, err := s.cFactory.GetCommand(cmdName)
if err != nil {
logger.Error(ctx, "Unknown command from master: %s", cmdName)
return
}
logger.Info(ctx, "Received command from master, cmd: %s, args: %v", cmdName, args)
tmpWriter := resp.NewWriter(&bytes.Buffer{}, resp.RESP3)
err = cmd.Execute(s.masterClient, tmpWriter, args)
if err != nil {
logger.Error(ctx, "Failed to execute command from master: %v", err)
if cmdName == "replconf" && len(args) >= 2 && strings.ToLower(args[0].String()) == "getack" && args[1].String() == "*" {
s.respondToGetAck(ctx)
} else {
cmd, err := s.cFactory.GetCommand(cmdName)
if err != nil {
logger.Error(ctx, "Unknown command from master: %s", cmdName)
return
}
tmpWriter := resp.NewWriter(&bytes.Buffer{}, resp.RESP3)
err = cmd.Execute(s.masterClient, tmpWriter, args)
if err != nil {
logger.Error(ctx, "Failed to execute command from master: %v", err)
}
}
case resp.BulkString:
case resp.SimpleError, resp.BulkError:
Expand All @@ -276,6 +288,19 @@ func (s *Server) handleMasterResponse(ctx context.Context, r *resp.Resp, reader
}
}

// respondToGetAck answers a "REPLCONF GETACK *" request from the master by
// sending "REPLCONF ACK <offset>", where <offset> is the current value of
// s.offset, through the master client's writer.
//
// Failures are logged rather than returned. A failed write returns
// immediately: the ACK never reached the writer, so a flush afterwards has
// nothing useful to deliver.
func (s *Server) respondToGetAck(ctx context.Context) {
	ackMessage := resp.CreateCommand("REPLCONF", "ACK", strconv.FormatUint(s.offset, 10))
	if _, err := s.masterClient.Writer.Write(ackMessage); err != nil {
		logger.Error(ctx, "Failed to write REPLCONF ACK to master: %v", err)
		return
	}

	if err := s.masterClient.Writer.Flush(); err != nil {
		logger.Error(ctx, "Failed to flush REPLCONF ACK to master: %v", err)
	}
}

func (s *Server) commandSender(ctx context.Context) {
for {
select {
Expand Down Expand Up @@ -517,6 +542,98 @@ func (s *Server) propagateCommand(ctx context.Context, r *resp.Resp) {
}
}

func (s *Server) pingReplicas(ctx context.Context) {
ticker := time.NewTicker(replicaPingInterval)
defer ticker.Stop()

for {
select {
case <-s.done:
return
case <-ticker.C:
s.mu.Lock()
for replica := range s.replicas {
go func(replica *client.Client) {
err := s.sendPing(replica)
if err != nil {
logger.Error(ctx, "Failed to ping replica %s: %v", replica.ID, err)
s.removeReplica(ctx, replica)
} else {

Check failure on line 561 in internal/app/server/server.go

View workflow job for this annotation

GitHub Actions / Lint and Test

SA9003: empty branch (staticcheck)

Check failure on line 561 in internal/app/server/server.go

View workflow job for this annotation

GitHub Actions / Lint and Test

SA9003: empty branch (staticcheck)
// logger.Info(ctx, "Successfully pinged replica %s", replica.ID)
}
}(replica)
}
s.mu.Unlock()
}
}
}

// sendReplConfGetAck sends "REPLCONF GETACK *" to every registered replica
// once per replconfGetAckInterval until ctx is cancelled. Each replica is
// handled in its own goroutine; a replica whose write or flush fails is
// removed via removeReplica.
func (s *Server) sendReplConfGetAck(ctx context.Context) {
	// requestAck delivers a single GETACK to target and drops the replica on
	// any failure.
	requestAck := func(target *client.Client) {
		payload := resp.CreateCommand("REPLCONF", "GETACK", "*")
		if _, writeErr := target.Writer.Write(payload); writeErr != nil {
			logger.Error(ctx, "Failed to write REPLCONF GETACK to replica %s: %v", target.ID, writeErr)
			s.removeReplica(ctx, target)
			return
		}

		flushErr := target.Writer.Flush()
		if flushErr == nil {
			return
		}

		// A closed connection is an expected way for a replica to go away,
		// so it is reported at info level instead of as an error.
		switch {
		case errors.Is(flushErr, net.ErrClosed):
			logger.Info(ctx, "Failed to flush REPLCONF GETACK to replica %s, replica disconnected", target.ID)
		default:
			logger.Error(ctx, "Failed to flush REPLCONF GETACK to replica %s: %v", target.ID, flushErr)
		}
		s.removeReplica(ctx, target)
	}

	tick := time.NewTicker(replconfGetAckInterval)
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
		}

		// Copy the current replica set so the lock is released before any
		// network I/O starts.
		s.mu.Lock()
		targets := make([]*client.Client, 0, len(s.replicas))
		for r := range s.replicas {
			targets = append(targets, r)
		}
		s.mu.Unlock()

		for _, target := range targets {
			go requestAck(target)
		}
	}
}

// sendPing writes a PING command to replica through replica.Writer and
// flushes it, returning a wrapped error if either step fails.
//
// The command goes through the same writer that is flushed afterwards (and
// that the other replica-facing senders use) rather than straight to the
// underlying connection, so it cannot overtake data still pending in that
// writer.
func (s *Server) sendPing(replica *client.Client) error {
	pingCmd := resp.CreatePingCommand()

	if _, err := replica.Writer.Write(pingCmd); err != nil {
		return fmt.Errorf("error sending PING to replica %s: %w", replica.ID, err)
	}

	if err := replica.Writer.Flush(); err != nil {
		return fmt.Errorf("error flushing PING to replica %s: %w", replica.ID, err)
	}

	return nil
}

// removeReplica stops tracking replica in s.replicas and closes it.
//
// It is invoked from per-replica goroutines started by both the PING and the
// REPLCONF GETACK senders, so the same replica may be reported more than
// once. Only the call that still finds the replica in s.replicas closes it
// and logs the removal; later calls are no-ops.
func (s *Server) removeReplica(ctx context.Context, replica *client.Client) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if _, tracked := s.replicas[replica]; !tracked {
		return
	}

	delete(s.replicas, replica)
	replica.Close(ctx)
	// The trigger may be a PING or a GETACK failure, so the message does not
	// name a specific command.
	logger.Info(ctx, "Removed replica %s after failed send", replica.ID)
}

func (s *Server) addReplica(c *client.Client) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
15 changes: 15 additions & 0 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"io"
"net"
"sync"
"time"

"github.com/codecrafters-io/redis-starter-go/pkg/resp"
Expand All @@ -26,6 +27,8 @@ type Info struct {

type Client struct {
ID string
offset uint64
mu sync.RWMutex
conn net.Conn
authenticated bool
info Info
Expand All @@ -51,6 +54,18 @@ func NewClient(conn net.Conn, messageChan chan<- Message) *Client {
}
}

// UpdateOffset replaces the client's stored offset with offset while holding
// the write lock.
func (c *Client) UpdateOffset(offset uint64) {
	c.mu.Lock()
	c.offset = offset
	c.mu.Unlock()
}

// Offset returns the client's stored offset, read under the read lock.
func (c *Client) Offset() uint64 {
	c.mu.RLock()
	current := c.offset
	c.mu.RUnlock()
	return current
}

// SetRespVersion forwards version to the client's writer via
// c.Writer.SetVersion.
func (c *Client) SetRespVersion(version resp.RESPVersion) {
c.Writer.SetVersion(version)
}
Expand Down
19 changes: 15 additions & 4 deletions pkg/command/replconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package command
import (
"errors"
"fmt"
"strconv"
"strings"

"github.com/codecrafters-io/redis-starter-go/internal/client"
Expand All @@ -25,10 +26,20 @@ func (rc *ReplConf) Execute(c *client.Client, wr *resp.Writer, args []*resp.Resp
port := args[1].String()
c.ListeningPort = port
case "capa":
// if len(args) < 2 {
// return wr.WriteError(errors.New("wrong number of arguments for 'replconf capa' command"))
// }
// We don't need to handle/save the capa arguments
// if len(args) < 2 {
// return wr.WriteError(errors.New("wrong number of arguments for 'replconf capa' command"))
// }
// We don't need to handle/save the capa arguments
case "ack":
if len(args) != 2 {
return wr.WriteError(errors.New("wrong number of arguments for 'replconf ack' command"))
}
offsetStr := args[1].String()
offset, err := strconv.ParseUint(offsetStr, 10, 64)
if err != nil {
return wr.WriteError(fmt.Errorf("invalid offset in REPLCONF ACK: %v", err))
}
c.UpdateOffset(offset)
default:
return wr.WriteError(fmt.Errorf("unknown replconf subcommand: %s", subCommand))
}
Expand Down

0 comments on commit ec93ec2

Please sign in to comment.