Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: replication: implement REPLCONF GETACK & ACK commands #15

Merged
merged 2 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 126 additions & 11 deletions internal/app/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
)

const (
defaultListenAddr = ":6379"
defaultListenAddr = ":6379"
replicaPingInterval = 45 * time.Second
replconfGetAckInterval = 30 * time.Second
)

type Server struct {
Expand All @@ -34,6 +36,7 @@ type Server struct {
cfg *config.Config
done chan struct{}
store keyval.KV
offset uint64
queueMu sync.Mutex
clients map[*client.Client]struct{}
cFactory *command.CommandFactory
Expand Down Expand Up @@ -129,6 +132,9 @@ func (s *Server) Start(ctx context.Context) error {
if err := s.startReplication(ctx); err != nil {
return fmt.Errorf("failed to start replication: %v", err)
}
} else {
go s.pingReplicas(ctx)
go s.sendReplConfGetAck(ctx)
}

if err := s.Listen(ctx); err != nil {
Expand Down Expand Up @@ -195,6 +201,9 @@ func (s *Server) handleMasterConnection(ctx context.Context) {
}
s.dispatchResponse(r)
s.handleMasterResponse(ctx, r, reader)
if r.Type == resp.Array {
s.offset += uint64(len(buffer.Bytes()))
}
buffer.Reset()
}
}
Expand Down Expand Up @@ -256,16 +265,19 @@ func (s *Server) handleMasterResponse(ctx context.Context, r *resp.Resp, reader
case resp.Array:
cmdName := strings.ToLower(r.Data.([]*resp.Resp)[0].String())
args := r.Data.([]*resp.Resp)[1:]
cmd, err := s.cFactory.GetCommand(cmdName)
if err != nil {
logger.Error(ctx, "Unknown command from master: %s", cmdName)
return
}
logger.Info(ctx, "Received command from master, cmd: %s, args: %v", cmdName, args)
tmpWriter := resp.NewWriter(&bytes.Buffer{}, resp.RESP3)
err = cmd.Execute(s.masterClient, tmpWriter, args)
if err != nil {
logger.Error(ctx, "Failed to execute command from master: %v", err)
if cmdName == "replconf" && len(args) >= 2 && strings.ToLower(args[0].String()) == "getack" && args[1].String() == "*" {
s.respondToGetAck(ctx)
} else {
cmd, err := s.cFactory.GetCommand(cmdName)
if err != nil {
logger.Error(ctx, "Unknown command from master: %s", cmdName)
return
}
tmpWriter := resp.NewWriter(&bytes.Buffer{}, resp.RESP3)
err = cmd.Execute(s.masterClient, tmpWriter, args)
if err != nil {
logger.Error(ctx, "Failed to execute command from master: %v", err)
}
}
case resp.BulkString:
case resp.SimpleError, resp.BulkError:
Expand All @@ -276,6 +288,19 @@ func (s *Server) handleMasterResponse(ctx context.Context, r *resp.Resp, reader
}
}

func (s *Server) respondToGetAck(ctx context.Context) {
ackMessage := resp.CreateCommand("REPLCONF", "ACK", strconv.FormatUint(s.offset, 10))
_, err := s.masterClient.Writer.Write(ackMessage)
if err != nil {
logger.Error(ctx, "Failed to write REPLCONF ACK to master: %v", err)
}

err = s.masterClient.Writer.Flush()
if err != nil {
logger.Error(ctx, "Failed to flush REPLCONF ACK to master: %v", err)
}
}

func (s *Server) commandSender(ctx context.Context) {
for {
select {
Expand Down Expand Up @@ -517,6 +542,96 @@ func (s *Server) propagateCommand(ctx context.Context, r *resp.Resp) {
}
}

func (s *Server) pingReplicas(ctx context.Context) {
ticker := time.NewTicker(replicaPingInterval)
defer ticker.Stop()

for {
select {
case <-s.done:
return
case <-ticker.C:
s.mu.Lock()
for replica := range s.replicas {
go func(replica *client.Client) {
err := s.sendPing(replica)
if err != nil {
logger.Error(ctx, "Failed to ping replica %s: %v", replica.ID, err)
s.removeReplica(ctx, replica)
}
}(replica)
}
s.mu.Unlock()
}
}
}

func (s *Server) sendReplConfGetAck(ctx context.Context) {
ticker := time.NewTicker(replconfGetAckInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.mu.Lock()
replicas := make([]*client.Client, 0, len(s.replicas))
for replica := range s.replicas {
replicas = append(replicas, replica)
}
s.mu.Unlock()

for _, replica := range replicas {
go func(replica *client.Client) {
getAckCmd := resp.CreateCommand("REPLCONF", "GETACK", "*")
_, err := replica.Writer.Write(getAckCmd)
if err != nil {
logger.Error(ctx, "Failed to write REPLCONF GETACK to replica %s: %v", replica.ID, err)
s.removeReplica(ctx, replica)
return
}
err = replica.Writer.Flush()
if err != nil {
if errors.Is(err, net.ErrClosed) {
logger.Info(ctx, "Failed to flush REPLCONF GETACK to replica %s, replica disconnected", replica.ID)
} else {
logger.Error(ctx, "Failed to flush REPLCONF GETACK to replica %s: %v", replica.ID, err)
}
s.removeReplica(ctx, replica)
return
}
}(replica)
}
}
}
}

func (s *Server) sendPing(replica *client.Client) error {
pingCmd := resp.CreatePingCommand()

_, err := replica.Conn().Write(pingCmd)
if err != nil {
return fmt.Errorf("error sending PING to replica %s: %w", replica.ID, err)
}

err = replica.Writer.Flush()
if err != nil {
return fmt.Errorf("error flushing PING to replica %s: %w", replica.ID, err)
}

return nil
}

func (s *Server) removeReplica(ctx context.Context, replica *client.Client) {
s.mu.Lock()
defer s.mu.Unlock()

delete(s.replicas, replica)
replica.Close(ctx)
logger.Info(ctx, "Removed replica %s due to failed PING", replica.ID)
}

func (s *Server) addReplica(c *client.Client) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
15 changes: 15 additions & 0 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"io"
"net"
"sync"
"time"

"github.com/codecrafters-io/redis-starter-go/pkg/resp"
Expand All @@ -26,6 +27,8 @@ type Info struct {

type Client struct {
ID string
offset uint64
mu sync.RWMutex
conn net.Conn
authenticated bool
info Info
Expand All @@ -51,6 +54,18 @@ func NewClient(conn net.Conn, messageChan chan<- Message) *Client {
}
}

func (c *Client) UpdateOffset(offset uint64) {
c.mu.Lock()
defer c.mu.Unlock()
c.offset = offset
}

func (c *Client) Offset() uint64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.offset
}

func (c *Client) SetRespVersion(version resp.RESPVersion) {
c.Writer.SetVersion(version)
}
Expand Down
19 changes: 15 additions & 4 deletions pkg/command/replconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package command
import (
"errors"
"fmt"
"strconv"
"strings"

"github.com/codecrafters-io/redis-starter-go/internal/client"
Expand All @@ -25,10 +26,20 @@ func (rc *ReplConf) Execute(c *client.Client, wr *resp.Writer, args []*resp.Resp
port := args[1].String()
c.ListeningPort = port
case "capa":
// if len(args) < 2 {
// return wr.WriteError(errors.New("wrong number of arguments for 'replconf capa' command"))
// }
// We don't need to handle/save the capa arguments
// if len(args) < 2 {
// return wr.WriteError(errors.New("wrong number of arguments for 'replconf capa' command"))
// }
// We don't need to handle/save the capa arguments
case "ack":
if len(args) != 2 {
return wr.WriteError(errors.New("wrong number of arguments for 'replconf ack' command"))
}
offsetStr := args[1].String()
offset, err := strconv.ParseUint(offsetStr, 10, 64)
if err != nil {
return wr.WriteError(fmt.Errorf("invalid offset in REPLCONF ACK: %v", err))
}
c.UpdateOffset(offset)
default:
return wr.WriteError(fmt.Errorf("unknown replconf subcommand: %s", subCommand))
}
Expand Down
Loading