
refactor: Improve redirection logic #54
buraksezer committed Oct 12, 2020
1 parent 63c6d11 commit fe2d6f0
Showing 18 changed files with 49 additions and 60 deletions.
6 changes: 3 additions & 3 deletions dmap_backup_test.go
@@ -49,7 +49,7 @@ func TestDMap_PutBackup(t *testing.T) {
         key := bkey(i)
         owner, hkey := dm.db.findPartitionOwner(mname, key)
         var backup = db1
-        if hostCmp(owner, db1.this) {
+        if cmpMembersByID(owner, db1.this) {
             backup = db2
         }
         partID := db1.getPartitionID(hkey)
@@ -113,7 +113,7 @@ func TestDMap_DeleteBackup(t *testing.T) {
         key := bkey(i)
         owner, hkey := dm.db.findPartitionOwner(mname, key)
         var backup = db1
-        if hostCmp(owner, db1.this) {
+        if cmpMembersByID(owner, db1.this) {
             backup = db2
         }
         partID := db1.getPartitionID(hkey)
@@ -161,7 +161,7 @@ func TestDMap_GetBackup(t *testing.T) {
         key := bkey(i)
         owner, hkey := dm.db.findPartitionOwner(mname, key)
         var kloc = db1
-        if !hostCmp(owner, db1.this) {
+        if !cmpMembersByID(owner, db1.this) {
             kloc = db2
         }
 
6 changes: 2 additions & 4 deletions dmap_delete.go
@@ -96,11 +96,11 @@ func (db *Olric) delKeyVal(dm *dmap, hkey uint64, name, key string) error {
 
 func (db *Olric) deleteKey(name, key string) error {
     member, hkey := db.findPartitionOwner(name, key)
-    if !hostCmp(member, db.this) {
+    if !cmpMembersByName(member, db.this) {
         req := protocol.NewDMapMessage(protocol.OpDelete)
         req.SetDMap(name)
         req.SetKey(key)
-        _, err := db.redirectTo(member, req)
+        _, err := db.requestTo(member.String(), req)
         return err
     }
 
@@ -181,8 +181,6 @@ func (db *Olric) deleteKeyValBackup(hkey uint64, name, key string) error {
     backupOwners := db.getBackupPartitionOwners(hkey)
     var g errgroup.Group
     for _, backup := range backupOwners {
-        if hostCmp(db.this, backup) { continue }
-
         mem := backup
         g.Go(func() error {
             // TODO: Add retry with backoff

2 changes: 1 addition & 1 deletion dmap_delete_test.go
@@ -238,7 +238,7 @@ func TestDMap_DeleteKeyValFromPreviousOwners(t *testing.T) {
 
     data := []discovery.Member{}
     for _, member := range db1.discovery.GetMembers() {
-        if hostCmp(member, owner) {
+        if cmpMembersByID(member, owner) {
             continue
         }
         data = append(data, member)

4 changes: 2 additions & 2 deletions dmap_expire.go
@@ -118,13 +118,13 @@ func (db *Olric) callExpireOnCluster(hkey uint64, w *writeop) error {
 
 func (db *Olric) expire(w *writeop) error {
     member, hkey := db.findPartitionOwner(w.dmap, w.key)
-    if hostCmp(member, db.this) {
+    if cmpMembersByName(member, db.this) {
         // We are on the partition owner.
         return db.callExpireOnCluster(hkey, w)
     }
     // Redirect to the partition owner
     req := w.toReq(protocol.OpExpire)
-    _, err := db.redirectTo(member, req)
+    _, err := db.requestTo(member.String(), req)
     return err
 }
 
2 changes: 1 addition & 1 deletion dmap_expire_test.go
@@ -189,7 +189,7 @@ func TestDMap_ExpireWriteQuorum(t *testing.T) {
     for i := 0; i < 10; i++ {
         key := bkey(i)
         host, _ := db1.findPartitionOwner(dm.name, key)
-        if hostCmp(db1.this, host) {
+        if cmpMembersByID(db1.this, host) {
             err = dm.Expire(key, time.Millisecond)
             if err != ErrWriteQuorum {
                 t.Fatalf("Expected ErrWriteQuorum. Got: %v", err)

8 changes: 3 additions & 5 deletions dmap_get.go
@@ -138,8 +138,6 @@ func (db *Olric) lookupOnReplicas(hkey uint64, name, key string) []*version {
     // Check backups.
     backups := db.getBackupPartitionOwners(hkey)
     for _, replica := range backups {
-        if hostCmp(db.this, replica) { continue }
-
         req := protocol.NewDMapMessage(protocol.OpGetBackup)
         req.SetDMap(name)
         req.SetKey(key)
@@ -189,7 +187,7 @@ func (db *Olric) readRepair(name string, dm *dmap, winner *version, versions []*
     }
 
     // Sync
-    if hostCmp(*ver.host, db.this) {
+    if cmpMembersByID(*ver.host, db.this) {
         hkey := db.getHKey(name, winner.data.Key)
         w := &writeop{
             dmap: name,
@@ -268,15 +266,15 @@ func (db *Olric) callGetOnCluster(hkey uint64, name, key string) ([]byte, error)
 func (db *Olric) get(name, key string) ([]byte, error) {
     member, hkey := db.findPartitionOwner(name, key)
     // We are on the partition owner
-    if hostCmp(member, db.this) {
+    if cmpMembersByName(member, db.this) {
         return db.callGetOnCluster(hkey, name, key)
     }
 
     // Redirect to the partition owner
     req := protocol.NewDMapMessage(protocol.OpGet)
     req.SetDMap(name)
     req.SetKey(key)
-    resp, err := db.redirectTo(member, req)
+    resp, err := db.requestTo(member.String(), req)
     if err != nil {
         return nil, err
     }

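Every redirecting entry point now follows the same shape as get above: compare the partition owner to the local member by name, serve the request locally on a match, and otherwise send it straight to the owner's address with requestTo. A self-contained, runnable sketch of that shape follows; all identifiers in it are stand-ins for illustration, not Olric's real API.

// owner_or_forward.go: a minimal sketch of the pattern shared by get, put,
// expire and unlock after this commit. Types and names are hypothetical.
package main

import "fmt"

type member struct{ name string }

// cmpMembersByName mirrors the new comparison: two members are interchangeable
// for redirection purposes if they resolve to the same name (host:port).
func cmpMembersByName(one, two member) bool { return one.name == two.name }

type node struct{ this member }

// findPartitionOwner is a stub; the real one hashes the key onto a partition
// and looks the owner up in the routing table.
func (n *node) findPartitionOwner(key string) member {
    return member{name: "node2.olric:3320"}
}

func (n *node) get(key string) (string, error) {
    owner := n.findPartitionOwner(key)
    if cmpMembersByName(owner, n.this) {
        // We own the partition: answer from local storage.
        return "local-value", nil
    }
    // Forward directly to the owner; the old redirectTo wrapper and its
    // cycle detection are gone because a name match is handled above.
    return "", fmt.Errorf("forward %q to %s", key, owner.name)
}

func main() {
    n := &node{this: member{name: "node1.olric:3320"}}
    fmt.Println(n.get("mykey"))
}
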
4 changes: 2 additions & 2 deletions dmap_get_test.go
@@ -243,7 +243,7 @@ func TestDMap_GetReadQuorum(t *testing.T) {
     for i := 0; i < 10; i++ {
         key := bkey(i)
         host, _ := db1.findPartitionOwner(dm.name, key)
-        if hostCmp(db1.this, host) {
+        if cmpMembersByID(db1.this, host) {
             _, err = dm.Get(key)
             if err != ErrReadQuorum {
                 t.Errorf("Expected ErrReadQuorum. Got: %v", err)
@@ -328,7 +328,7 @@ func TestDMap_ReadRepair(t *testing.T) {
     }
     dm3.RLock()
     owners := db3.getBackupPartitionOwners(hkey)
-    if hostCmp(owners[0], db3.this) {
+    if cmpMembersByID(owners[0], db3.this) {
         vdata, err := dm3.storage.Get(hkey)
         if err != nil {
             t.Fatalf("Expected nil. Got: %v", err)

4 changes: 2 additions & 2 deletions dmap_lock.go
@@ -82,14 +82,14 @@ func (db *Olric) unlockKey(name, key string, token []byte) error {
 // It redirects the request to the partition owner, if required.
 func (db *Olric) unlock(name, key string, token []byte) error {
     member, _ := db.findPartitionOwner(name, key)
-    if hostCmp(member, db.this) {
+    if cmpMembersByName(member, db.this) {
         return db.unlockKey(name, key, token)
     }
     req := protocol.NewDMapMessage(protocol.OpUnlock)
     req.SetDMap(name)
     req.SetKey(key)
     req.SetValue(token)
-    _, err := db.redirectTo(member, req)
+    _, err := db.requestTo(member.String(), req)
     return err
 }
 
4 changes: 2 additions & 2 deletions dmap_put.go
@@ -304,13 +304,13 @@ func (db *Olric) callPutOnCluster(hkey uint64, w *writeop) error {
 // if the key belongs to another host.
 func (db *Olric) put(w *writeop) error {
     member, hkey := db.findPartitionOwner(w.dmap, w.key)
-    if hostCmp(member, db.this) {
+    if cmpMembersByName(member, db.this) {
         // We are on the partition owner.
         return db.callPutOnCluster(hkey, w)
     }
     // Redirect to the partition owner.
     req := w.toReq(w.opcode)
-    _, err := db.redirectTo(member, req)
+    _, err := db.requestTo(member.String(), req)
     return err
 }
 
2 changes: 1 addition & 1 deletion dmap_put_test.go
@@ -187,7 +187,7 @@ func TestDMap_PutWriteQuorum(t *testing.T) {
     for i := 0; i < 10; i++ {
         key := bkey(i)
         host, _ := db1.findPartitionOwner(dm.name, key)
-        if hostCmp(db1.this, host) {
+        if cmpMembersByID(db1.this, host) {
             err = dm.Put(key, bval(i))
             if err != ErrWriteQuorum {
                 t.Fatalf("Expected ErrWriteQuorum. Got: %v", err)

2 changes: 1 addition & 1 deletion dmap_query.go
@@ -174,7 +174,7 @@ func (c *Cursor) runQueryOnOwners(partID uint64) ([]*storage.VData, error) {
     owners := c.db.partitions[partID].loadOwners()
     var responses []queryResponse
     for _, owner := range owners {
-        if hostCmp(owner, c.db.this) {
+        if cmpMembersByID(owner, c.db.this) {
             response, err := c.db.runLocalQuery(partID, c.name, c.query)
             if err != nil {
                 return nil, err

2 changes: 1 addition & 1 deletion dtopic.go
@@ -236,7 +236,7 @@ func (db *Olric) publishDTopicMessageToAddr(member discovery.Member, topic strin
     defer db.wg.Done()
     defer sem.Release(1)
 
-    if hostCmp(member, db.this) {
+    if cmpMembersByID(member, db.this) {
         // Dispatch messages in this process.
         err := db.dtopic.dispatch(topic, msg)
         if err != nil {

3 changes: 3 additions & 0 deletions internal/discovery/discovery.go
@@ -82,6 +82,7 @@ type Discovery struct {
 // Member represents a node in the cluster.
 type Member struct {
     Name      string
+    NameHash  uint64
     ID        uint64
     Birthdate int64
 }
@@ -106,8 +107,10 @@ func New(log *flog.Logger, c *config.Config) (*Discovery, error) {
     buf = append(buf, []byte(c.MemberlistConfig.Name)...)
 
     id := c.Hasher.Sum64(buf)
+    nameHash := c.Hasher.Sum64([]byte(c.MemberlistConfig.Name))
    host := &Member{
         Name:      c.MemberlistConfig.Name,
+        NameHash:  nameHash,
         ID:        id,
         Birthdate: birthdate,
     }

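The point of the new NameHash field: ID hashes the member's birthdate together with its name, so every restart of a process yields a new ID, while NameHash depends on the name alone and survives re-joins. A runnable sketch under stated assumptions — FNV-1a stands in for the configurable c.Hasher, and the byte layout of buf is a simplification of the code in discovery.New:

// namehash_sketch.go: illustrates why ID changes across restarts while
// NameHash does not. The hasher and buffer layout here are assumptions
// made for the sake of a self-contained example.
package main

import (
    "encoding/binary"
    "fmt"
    "hash/fnv"
    "time"
)

func sum64(data []byte) uint64 {
    h := fnv.New64a()
    h.Write(data)
    return h.Sum64()
}

// memberIdentity mimics discovery.New: the ID covers birthdate plus name,
// the name hash covers the name only.
func memberIdentity(name string, birthdate int64) (id, nameHash uint64) {
    buf := make([]byte, 8, 8+len(name))
    binary.BigEndian.PutUint64(buf, uint64(birthdate))
    buf = append(buf, []byte(name)...)
    return sum64(buf), sum64([]byte(name))
}

func main() {
    id1, nh1 := memberIdentity("node1.olric:3320", time.Now().UnixNano())
    id2, nh2 := memberIdentity("node1.olric:3320", time.Now().UnixNano()+42)

    fmt.Println("same ID:", id1 == id2)       // false: a restart is a new incarnation
    fmt.Println("same NameHash:", nh1 == nh2) // true: the address did not change
}
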
26 changes: 8 additions & 18 deletions olric.go
@@ -64,9 +64,6 @@ var (
     ErrKeyTooLarge = errors.New("key too large")
 
     ErrNotImplemented = errors.New("not implemented")
-
-    // ErrRedirectionCycle means that a node redirects requests to itself.
-    ErrRedirectionCycle = errors.New("redirection cycle detected")
 )
@@ -659,21 +656,14 @@ func isKeyExpired(ttl int64) bool {
     return (time.Now().UnixNano() / 1000000) >= ttl
 }
 
-// hostCmp returns true if o1 and o2 is the same.
-func hostCmp(o1, o2 discovery.Member) bool {
-    return o1.ID == o2.ID
+// cmpMembersByID returns true if the two members denote the same member in the cluster.
+func cmpMembersByID(one, two discovery.Member) bool {
+    // The ID is calculated by combining the member's name and birthdate.
+    return one.ID == two.ID
 }
 
-func (db *Olric) redirectTo(member discovery.Member, req protocol.EncodeDecoder) (protocol.EncodeDecoder, error) {
-    if hostCmp(db.this, member) {
-        return nil, errors.WithMessage(ErrRedirectionCycle, fmt.Sprintf("OpCode: %v", req.OpCode()))
-    }
-    // Here I write the following lines to detect a bug which I suspected. See #Issue:54
-    if db.this.String() == member.String() {
-        db.log.V(1).Printf("[ERROR] There are members in the consistent hash ring with the same name and "+
-            "different IDs. Name: %s, ID of this node: %d, ID of the target node: %d. Please report this.", db.this, db.this.ID, member.ID)
-        return nil, errors.WithMessage(ErrRedirectionCycle,
-            fmt.Sprintf("Member names are the same: %s OpCode: %v", db.this, req.OpCode()))
-    }
-    return db.requestTo(member.String(), req)
+// cmpMembersByName returns true if the two members have the same name in the cluster.
+// This function is intended for redirecting requests to the partition owner.
+func cmpMembersByName(one, two discovery.Member) bool {
+    return one.NameHash == two.NameHash
 }

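Taken together, the two comparators split the old hostCmp into two questions. cmpMembersByID asks "is this the same incarnation of a member?" and keeps serving ownership and rebalancing checks. cmpMembersByName asks "does this member resolve to my own address?", which is the right question before redirecting: if the routing table still holds a stale incarnation of the local node, an ID comparison answers "not me" and the node forwards the request to itself — the redirection cycle that issue #54 describes and that the deleted redirectTo tried to detect at runtime. A small, self-contained illustration with made-up values:

// comparator_sketch.go: how the two comparisons differ for a member that
// crashed and re-joined under the same name. Member mirrors the struct in
// internal/discovery; all values below are invented for illustration.
package main

import "fmt"

type Member struct {
    Name      string
    NameHash  uint64
    ID        uint64
    Birthdate int64
}

func cmpMembersByID(one, two Member) bool   { return one.ID == two.ID }
func cmpMembersByName(one, two Member) bool { return one.NameHash == two.NameHash }

func main() {
    // The stale entry a routing table might still hold for this node...
    stale := Member{Name: "node1.olric:3320", NameHash: 42, ID: 1001, Birthdate: 100}
    // ...and the node's identity after it re-joined.
    current := Member{Name: "node1.olric:3320", NameHash: 42, ID: 2002, Birthdate: 200}

    fmt.Println(cmpMembersByID(stale, current))   // false: different incarnations
    fmt.Println(cmpMembersByName(stale, current)) // true: same address, don't redirect to self
}
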
6 changes: 3 additions & 3 deletions rebalancer.go
@@ -164,7 +164,7 @@ func (db *Olric) rebalancePrimaryPartitions() {
         }
 
         owner := part.owner()
-        if hostCmp(owner, db.this) {
+        if cmpMembersByID(owner, db.this) {
             // Already belongs to me.
             continue
         }
@@ -206,7 +206,7 @@ func (db *Olric) rebalanceBackupPartitions() {
     offset := len(owners) - 1 - (db.config.ReplicaCount - 1)
     for i := len(owners) - 1; i > offset; i-- {
         owner := owners[i]
-        if hostCmp(db.this, owner) {
+        if cmpMembersByID(db.this, owner) {
             continue
         }
         ids = append(ids, owner.ID)
@@ -262,7 +262,7 @@ func (db *Olric) rebalancer() {
 func (db *Olric) checkOwnership(part *partition) bool {
     owners := part.loadOwners()
     for _, owner := range owners {
-        if hostCmp(owner, db.this) {
+        if cmpMembersByID(owner, db.this) {
             return true
         }
     }

8 changes: 4 additions & 4 deletions rebalancer_test.go
@@ -51,7 +51,7 @@ func TestRebalance_Merge(t *testing.T) {
 
     for partID := uint64(0); partID < db1.config.PartitionCount; partID++ {
         part := db1.partitions[partID]
-        if !hostCmp(part.owner(), db1.this) {
+        if !cmpMembersByID(part.owner(), db1.this) {
             if part.length() != 0 {
                 t.Fatalf("Expected key count is 0 for PartID: %d on %s. Got: %d",
                     partID, db1.this, part.length())
@@ -61,7 +61,7 @@
 
     for partID := uint64(0); partID < db2.config.PartitionCount; partID++ {
         part := db2.partitions[partID]
-        if hostCmp(part.owner(), db2.this) {
+        if cmpMembersByID(part.owner(), db2.this) {
             if part.length() == 0 {
                 t.Fatalf("Expected key count is different than zero for PartID: %d on %s", partID, db2.this)
             }
@@ -192,7 +192,7 @@ func TestRebalance_MergeBackups(t *testing.T) {
 
     part := db.partitions[partID]
     for _, backupOwner := range backup.loadOwners() {
-        if hostCmp(backupOwner, part.owner()) {
+        if cmpMembersByID(backupOwner, part.owner()) {
             t.Fatalf("Partition owner is also backup owner. PartID: %d: %s",
                 partID, backupOwner)
         }
@@ -218,7 +218,7 @@ func TestRebalance_CheckOwnership(t *testing.T) {
         t.Fatalf("Invalid ownership distribution")
     }
     for _, backupOwner := range backup.loadOwners() {
-        if hostCmp(backupOwner, part.owner()) {
+        if cmpMembersByID(backupOwner, part.owner()) {
             t.Fatalf("Partition owner is also backup owner. PartID: %d: %s",
                 partID, backupOwner)
         }

14 changes: 7 additions & 7 deletions routing.go
@@ -90,7 +90,7 @@ func (db *Olric) distributeBackups(partID uint64) []discovery.Member {
             i--
             continue
         }
-        if !hostCmp(backup, cur) {
+        if !cmpMembersByID(backup, cur) {
             db.log.V(3).Printf("[WARN] One of the backup owners is probably re-joined: %s", cur)
             // Delete it.
             owners = append(owners[:i], owners[i+1:]...)
@@ -134,7 +134,7 @@ func (db *Olric) distributeBackups(partID uint64) []discovery.Member {
     for _, backup := range newOwners {
         var exists bool
         for i, bkp := range owners {
-            if hostCmp(bkp, backup.(discovery.Member)) {
+            if cmpMembersByID(bkp, backup.(discovery.Member)) {
                 exists = true
                 // Remove it from the current position
                 owners = append(owners[:i], owners[i+1:]...)
@@ -175,7 +175,7 @@ func (db *Olric) distributePrimaryCopies(partID uint64) []discovery.Member {
             i--
             continue
         }
-        if !hostCmp(owner, current) {
+        if !cmpMembersByID(owner, current) {
             db.log.V(4).Printf("[WARN] One of the partitions owners is probably re-joined: %s", current)
             owners = append(owners[:i], owners[i+1:]...)
             i--
@@ -213,7 +213,7 @@ func (db *Olric) distributePrimaryCopies(partID uint64) []discovery.Member {
 
     // Here add the new partition newOwner.
     for i, owner := range owners {
-        if hostCmp(owner, newOwner.(discovery.Member)) {
+        if cmpMembersByID(owner, newOwner.(discovery.Member)) {
             // Remove it from the current position
             owners = append(owners[:i], owners[i+1:]...)
             // Append it again to head
@@ -318,7 +318,7 @@ func (db *Olric) updateRouting() {
 func (db *Olric) processOwnershipReports(reports map[discovery.Member]ownershipReport) {
     check := func(member discovery.Member, owners []discovery.Member) bool {
         for _, owner := range owners {
-            if hostCmp(member, owner) {
+            if cmpMembersByID(member, owner) {
                 return true
             }
         }
@@ -422,7 +422,7 @@ func (db *Olric) checkAndGetCoordinator(id uint64) (discovery.Member, error) {
     }
 
     myCoordinator := db.discovery.GetCoordinator()
-    if !hostCmp(coordinator, myCoordinator) {
+    if !cmpMembersByID(coordinator, myCoordinator) {
         return discovery.Member{}, fmt.Errorf("unrecognized cluster coordinator: %s: %s", coordinator, myCoordinator)
     }
     return coordinator, nil
@@ -432,7 +432,7 @@ func (db *Olric) setOwnedPartitionCount() {
     var count uint64
     for partID := uint64(0); partID < db.config.PartitionCount; partID++ {
         part := db.partitions[partID]
-        if hostCmp(part.owner(), db.this) {
+        if cmpMembersByID(part.owner(), db.this) {
             count++
         }
     }