Skip to content

Commit 0b363ad

Browse files
authored
feat: implement RDB saver (#12)
* feat: replication: send PSYNC command to the master server * feat: add GetReplicationID() * feat: implement rdb saver * feat: improve AuxiliaryField writing in RDBSaver * fix: lint err
1 parent 8769a63 commit 0b363ad

File tree

14 files changed

+979
-10
lines changed

14 files changed

+979
-10
lines changed

.DS_Store

0 Bytes
Binary file not shown.

doc/RDB File Format.html

+500
Large diffs are not rendered by default.

dump2.rdb

99 Bytes
Binary file not shown.

internal/app/server/server.go

+31
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type Server struct {
3838
masterAddr string
3939
messageChan chan client.Message
4040
masterClient *client.Client
41+
replicationID string
4142
disconnectChan chan *client.Client
4243
}
4344

@@ -50,6 +51,10 @@ func NewServer(cfg *config.Config) *Server {
5051
parts := strings.Split(replicaOf, " ")
5152
masterAddr = fmt.Sprintf("%s:%s", parts[0], parts[1])
5253
}
54+
var replicationID string
55+
if isMaster {
56+
replicationID = utils.GenerateRandomAlphanumeric(40)
57+
}
5358
s := &Server{
5459
mu: sync.Mutex{},
5560
cfg: cfg,
@@ -61,6 +66,7 @@ func NewServer(cfg *config.Config) *Server {
6166
masterAddr: masterAddr,
6267
messageChan: make(chan client.Message),
6368
masterClient: nil,
69+
replicationID: replicationID,
6470
disconnectChan: make(chan *client.Client),
6571
}
6672
s.cFactory = command.NewCommandFactory(store, cfg, s)
@@ -160,6 +166,10 @@ func (s *Server) startReplication(ctx context.Context) (err error) {
160166
return fmt.Errorf("failed to send REPLCONF to master: %v", err)
161167
}
162168

169+
if err := s.sendPsyncToMaster(ctx); err != nil {
170+
return fmt.Errorf("failed to send PSYNC to master: %v", err)
171+
}
172+
163173
return nil
164174
}
165175

@@ -222,6 +232,21 @@ func (s *Server) sendReplconfToMaster(ctx context.Context) error {
222232
return nil
223233
}
224234

235+
func (s *Server) sendPsyncToMaster(ctx context.Context) error {
236+
psyncCmd := resp.CreatePsyncCommand("?", "-1")
237+
response, err := s.sendAndReceive(psyncCmd)
238+
if err != nil {
239+
return fmt.Errorf("failed to send PSYNC to master: %v", err)
240+
}
241+
242+
if response.Type != resp.SimpleString || !strings.HasPrefix(response.String(), "FULLRESYNC") {
243+
return fmt.Errorf("unexpected response from master for PSYNC: %v", response)
244+
}
245+
246+
logger.Info(ctx, "Successfully sent PSYNC to master and received FULLRESYNC")
247+
return nil
248+
}
249+
225250
func (s *Server) loop(ctx context.Context) {
226251
for {
227252
select {
@@ -342,3 +367,9 @@ func (s *Server) Close(_ context.Context) error {
342367
}
343368
return nil
344369
}
370+
371+
func (s *Server) GetReplicationID() string {
372+
s.mu.Lock()
373+
defer s.mu.Unlock()
374+
return s.replicationID
375+
}

pkg/command/command.go

+5
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ var (
1616

1717
// ServerInfoProvider is the view of the server that commands use to read
// replication-related state.
type ServerInfoProvider interface {
1818
// GetReplicaInfo returns a slice of string maps; the INFO handler uses its
// length as the connected_slaves count.
GetReplicaInfo() []map[string]string
19+
// GetReplicationID returns the replication ID reported as master_replid
// and in the PSYNC FULLRESYNC reply.
GetReplicationID() string
1920
}
2021

2122
type Command interface {
@@ -74,6 +75,10 @@ func NewCommandFactory(kv keyval.KV, cfg *config.Config, serverInfo ServerInfoPr
7475
"xrange": &XRange{kv: kv},
7576
"xread": &XRead{kv: kv},
7677
"replconf": &ReplConf{},
78+
"psync": &Psync{
79+
serverInfo: serverInfo,
80+
},
81+
"save": &Save{kv: kv},
7782
},
7883
}
7984
}

pkg/command/info.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"github.com/codecrafters-io/redis-starter-go/internal/app/server/config"
88
"github.com/codecrafters-io/redis-starter-go/internal/client"
99
"github.com/codecrafters-io/redis-starter-go/pkg/resp"
10-
"github.com/codecrafters-io/redis-starter-go/pkg/utils"
1110
)
1211

1312
const (
@@ -36,7 +35,7 @@ var sections = map[string]SectionInfo{
3635
REPLICATION: {
3736
DynamicFields: map[string]DynamicFieldHandler{
3837
"role": determineRole,
39-
"master_replid": generateMasterReplID,
38+
"master_replid": getMasterReplID,
4039
"master_repl_offset": getMasterReplOffset,
4140
"connected_slaves": func(cfg *config.Config, serverInfo ServerInfoProvider) string {
4241
return fmt.Sprintf("%d", len(serverInfo.GetReplicaInfo()))
@@ -107,10 +106,6 @@ func determineRole(cfg *config.Config, _ ServerInfoProvider) string {
107106
return "master"
108107
}
109108

110-
func generateMasterReplID(_ *config.Config, _ ServerInfoProvider) string {
111-
return utils.GenerateRandomAlphanumeric(40)
112-
}
113-
114109
// getMasterReplOffset is the dynamic-field handler for "master_repl_offset".
// It ignores both arguments and always reports "0".
func getMasterReplOffset(_ *config.Config, _ ServerInfoProvider) string {
115110
return "0"
116111
}
@@ -125,3 +120,7 @@ func buildReplicaInfo(cfg *config.Config, serverInfo ServerInfoProvider) string
125120
}
126121
return sb.String()
127122
}
123+
124+
func getMasterReplID(_ *config.Config, serverInfo ServerInfoProvider) string {
125+
return serverInfo.GetReplicationID()
126+
}

pkg/command/psync.go

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package command
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
7+
"github.com/codecrafters-io/redis-starter-go/internal/client"
8+
"github.com/codecrafters-io/redis-starter-go/pkg/resp"
9+
)
10+
11+
// Psync is the handler registered under the "psync" command name.
type Psync struct {
12+
// serverInfo supplies the replication ID used in the FULLRESYNC reply.
serverInfo ServerInfoProvider
13+
}
14+
15+
func (p *Psync) Execute(c *client.Client, wr *resp.Writer, args []*resp.Resp) error {
16+
if len(args) != 2 {
17+
return wr.WriteError(errors.New("wrong number of arguments for 'psync' command"))
18+
}
19+
20+
replicationID := args[0].String()
21+
offset := args[1].String()
22+
23+
if replicationID != "?" || offset != "-1" {
24+
return wr.WriteError(errors.New("invalid arguments for 'psync' command"))
25+
}
26+
27+
response := fmt.Sprintf("FULLRESYNC %s 0", p.serverInfo.GetReplicationID())
28+
29+
return wr.WriteSimpleValue(resp.SimpleString, []byte(response))
30+
}
31+
32+
// IsBlocking always reports false for PSYNC; the arguments are ignored.
func (p *Psync) IsBlocking(_ []*resp.Resp) bool {
33+
return false
34+
}

pkg/command/replconf.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
type ReplConf struct{}
1313

1414
func (rc *ReplConf) Execute(c *client.Client, wr *resp.Writer, args []*resp.Resp) error {
15-
if len(args) < 2 {
15+
if len(args) < 1 {
1616
return wr.WriteError(errors.New("wrong number of arguments for 'replconf' command"))
1717
}
1818

@@ -25,11 +25,10 @@ func (rc *ReplConf) Execute(c *client.Client, wr *resp.Writer, args []*resp.Resp
2525
port := args[1].String()
2626
c.ListeningPort = port
2727
case "capa":
28-
if len(args) != 2 {
28+
if len(args) < 2 {
2929
return wr.WriteError(errors.New("wrong number of arguments for 'replconf capa' command"))
3030
}
31-
// capability := args[1].String()
32-
// Ignore capability for now
31+
// We don't need to handle/save the capa arguments
3332
default:
3433
return wr.WriteError(fmt.Errorf("unknown replconf subcommand: %s", subCommand))
3534
}

pkg/command/save.go

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package command
2+
3+
import (
4+
"fmt"
5+
"os"
6+
7+
"github.com/codecrafters-io/redis-starter-go/internal/client"
8+
"github.com/codecrafters-io/redis-starter-go/pkg/keyval"
9+
"github.com/codecrafters-io/redis-starter-go/pkg/rdb"
10+
"github.com/codecrafters-io/redis-starter-go/pkg/resp"
11+
)
12+
13+
// Save implements the SAVE command.
14+
type Save struct {
15+
// kv is the key-value store whose exported contents are written out.
kv keyval.KV
16+
}
17+
18+
func (c *Save) Execute(cl *client.Client, wr *resp.Writer, args []*resp.Resp) error {
19+
file, err := os.Create("dump2.rdb")
20+
if err != nil {
21+
return wr.WriteError(fmt.Errorf("failed to create dump.rdb: %w", err))
22+
}
23+
defer file.Close()
24+
25+
saver := rdb.NewRDBSaver(c.kv.Export())
26+
if err := saver.SaveRDB(file); err != nil {
27+
return wr.WriteError(fmt.Errorf("failed to save RDB: %w", err))
28+
}
29+
30+
return wr.WriteSimpleValue(resp.SimpleString, []byte("OK"))
31+
}
32+
33+
// IsBlocking always reports true for SAVE; the arguments are ignored.
func (c *Save) IsBlocking(_ []*resp.Resp) bool {
34+
return true
35+
}

pkg/keyval/kv.go

+31
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ type Value struct {
4646

4747
type KV interface {
4848
RestoreRDB(data map[string]Value)
49+
GetExpiry(key string) uint64
50+
IsExpired(key string) bool
4951
Get(key string) []byte
5052
Set(key string, value []byte) error
5153
Expire(key string, duration time.Duration)
@@ -57,6 +59,7 @@ type KV interface {
5759
Keys() []string
5860
Type(key string) string
5961
GetStream(key string, createIfNotExists bool) (*Stream, error)
62+
Export() map[string]Value
6063
}
6164

6265
type kv struct {
@@ -188,3 +191,31 @@ func (kv *kv) GetStream(key string, createIfNotExist bool) (*Stream, error) {
188191
}
189192
return s.Data.(*Stream), nil
190193
}
194+
195+
func (kv *kv) GetExpiry(key string) uint64 {
196+
kv.mu.RLock()
197+
defer kv.mu.RUnlock()
198+
if v, ok := kv.store[key]; ok {
199+
return v.Expiry
200+
}
201+
return 0
202+
}
203+
204+
func (kv *kv) IsExpired(key string) bool {
205+
kv.mu.RLock()
206+
defer kv.mu.RUnlock()
207+
if v, ok := kv.store[key]; ok {
208+
return v.Expiry > 0 && v.Expiry < uint64(time.Now().UnixMilli())
209+
}
210+
return false
211+
}
212+
213+
func (kv *kv) Export() map[string]Value {
214+
kv.mu.RLock()
215+
defer kv.mu.RUnlock()
216+
exportedData := make(map[string]Value, len(kv.store))
217+
for k, v := range kv.store {
218+
exportedData[k] = v
219+
}
220+
return exportedData
221+
}

pkg/rdb/rdb.go

+8
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,14 @@ func (p *RDBParser) handlerAuxiliaryField(ctx context.Context) error {
201201
return fmt.Errorf("failed to parse used-mem: %w", err)
202202
}
203203
logger.Info(ctx, "RDB memory usage when created %.2f MB", float64(usedMem)/1024/1024)
204+
case "redis-bits":
205+
redisBits, err := strconv.ParseInt(value, 10, 64)
206+
if err != nil {
207+
return fmt.Errorf("failed to parse redis-bits: %w", err)
208+
}
209+
logger.Info(ctx, "RDB is %d bit", redisBits)
210+
case "aof-base":
211+
logger.Info(ctx, "AOF enabled: %s", value)
204212
}
205213

206214
return nil

0 commit comments

Comments
 (0)