From fe2d6f083f3e83848aa0fdfc33fecde0714590b3 Mon Sep 17 00:00:00 2001 From: Burak Sezer Date: Mon, 12 Oct 2020 22:18:07 +0300 Subject: [PATCH] refactor: Improve redirection logic #54 --- dmap_backup_test.go | 6 +++--- dmap_delete.go | 6 ++---- dmap_delete_test.go | 2 +- dmap_expire.go | 4 ++-- dmap_expire_test.go | 2 +- dmap_get.go | 8 +++----- dmap_get_test.go | 4 ++-- dmap_lock.go | 4 ++-- dmap_put.go | 4 ++-- dmap_put_test.go | 2 +- dmap_query.go | 2 +- dtopic.go | 2 +- internal/discovery/discovery.go | 3 +++ olric.go | 26 ++++++++------------------ rebalancer.go | 6 +++--- rebalancer_test.go | 8 ++++---- routing.go | 14 +++++++------- stats_test.go | 6 +++--- 18 files changed, 49 insertions(+), 60 deletions(-) diff --git a/dmap_backup_test.go b/dmap_backup_test.go index f73d830f..6d388f0f 100644 --- a/dmap_backup_test.go +++ b/dmap_backup_test.go @@ -49,7 +49,7 @@ func TestDMap_PutBackup(t *testing.T) { key := bkey(i) owner, hkey := dm.db.findPartitionOwner(mname, key) var backup = db1 - if hostCmp(owner, db1.this) { + if cmpMembersByID(owner, db1.this) { backup = db2 } partID := db1.getPartitionID(hkey) @@ -113,7 +113,7 @@ func TestDMap_DeleteBackup(t *testing.T) { key := bkey(i) owner, hkey := dm.db.findPartitionOwner(mname, key) var backup = db1 - if hostCmp(owner, db1.this) { + if cmpMembersByID(owner, db1.this) { backup = db2 } partID := db1.getPartitionID(hkey) @@ -161,7 +161,7 @@ func TestDMap_GetBackup(t *testing.T) { key := bkey(i) owner, hkey := dm.db.findPartitionOwner(mname, key) var kloc = db1 - if !hostCmp(owner, db1.this) { + if !cmpMembersByID(owner, db1.this) { kloc = db2 } diff --git a/dmap_delete.go b/dmap_delete.go index bcccd752..28178767 100644 --- a/dmap_delete.go +++ b/dmap_delete.go @@ -96,11 +96,11 @@ func (db *Olric) delKeyVal(dm *dmap, hkey uint64, name, key string) error { func (db *Olric) deleteKey(name, key string) error { member, hkey := db.findPartitionOwner(name, key) - if !hostCmp(member, db.this) { + if !cmpMembersByName(member, db.this) { req := protocol.NewDMapMessage(protocol.OpDelete) req.SetDMap(name) req.SetKey(key) - _, err := db.redirectTo(member, req) + _, err := db.requestTo(member.String(), req) return err } @@ -181,8 +181,6 @@ func (db *Olric) deleteKeyValBackup(hkey uint64, name, key string) error { backupOwners := db.getBackupPartitionOwners(hkey) var g errgroup.Group for _, backup := range backupOwners { - if hostCmp(db.this, backup) { continue } - mem := backup g.Go(func() error { // TODO: Add retry with backoff diff --git a/dmap_delete_test.go b/dmap_delete_test.go index 8f66a56e..c74c00d5 100644 --- a/dmap_delete_test.go +++ b/dmap_delete_test.go @@ -238,7 +238,7 @@ func TestDMap_DeleteKeyValFromPreviousOwners(t *testing.T) { data := []discovery.Member{} for _, member := range db1.discovery.GetMembers() { - if hostCmp(member, owner) { + if cmpMembersByID(member, owner) { continue } data = append(data, member) diff --git a/dmap_expire.go b/dmap_expire.go index 9c4075bf..b3de78bf 100644 --- a/dmap_expire.go +++ b/dmap_expire.go @@ -118,13 +118,13 @@ func (db *Olric) callExpireOnCluster(hkey uint64, w *writeop) error { func (db *Olric) expire(w *writeop) error { member, hkey := db.findPartitionOwner(w.dmap, w.key) - if hostCmp(member, db.this) { + if cmpMembersByName(member, db.this) { // We are on the partition owner. 
return db.callExpireOnCluster(hkey, w) } // Redirect to the partition owner req := w.toReq(protocol.OpExpire) - _, err := db.redirectTo(member, req) + _, err := db.requestTo(member.String(), req) return err } diff --git a/dmap_expire_test.go b/dmap_expire_test.go index 48f12b84..ccfda1bb 100644 --- a/dmap_expire_test.go +++ b/dmap_expire_test.go @@ -189,7 +189,7 @@ func TestDMap_ExpireWriteQuorum(t *testing.T) { for i := 0; i < 10; i++ { key := bkey(i) host, _ := db1.findPartitionOwner(dm.name, key) - if hostCmp(db1.this, host) { + if cmpMembersByID(db1.this, host) { err = dm.Expire(key, time.Millisecond) if err != ErrWriteQuorum { t.Fatalf("Expected ErrWriteQuorum. Got: %v", err) diff --git a/dmap_get.go b/dmap_get.go index fa16f5d0..9bda9f1d 100644 --- a/dmap_get.go +++ b/dmap_get.go @@ -138,8 +138,6 @@ func (db *Olric) lookupOnReplicas(hkey uint64, name, key string) []*version { // Check backups. backups := db.getBackupPartitionOwners(hkey) for _, replica := range backups { - if hostCmp(db.this, replica) { continue } - req := protocol.NewDMapMessage(protocol.OpGetBackup) req.SetDMap(name) req.SetKey(key) @@ -189,7 +187,7 @@ func (db *Olric) readRepair(name string, dm *dmap, winner *version, versions []* } // Sync - if hostCmp(*ver.host, db.this) { + if cmpMembersByID(*ver.host, db.this) { hkey := db.getHKey(name, winner.data.Key) w := &writeop{ dmap: name, @@ -268,7 +266,7 @@ func (db *Olric) callGetOnCluster(hkey uint64, name, key string) ([]byte, error) func (db *Olric) get(name, key string) ([]byte, error) { member, hkey := db.findPartitionOwner(name, key) // We are on the partition owner - if hostCmp(member, db.this) { + if cmpMembersByName(member, db.this) { return db.callGetOnCluster(hkey, name, key) } @@ -276,7 +274,7 @@ func (db *Olric) get(name, key string) ([]byte, error) { req := protocol.NewDMapMessage(protocol.OpGet) req.SetDMap(name) req.SetKey(key) - resp, err := db.redirectTo(member, req) + resp, err := db.requestTo(member.String(), req) if err != nil { return nil, err } diff --git a/dmap_get_test.go b/dmap_get_test.go index f653d419..5c49de4d 100644 --- a/dmap_get_test.go +++ b/dmap_get_test.go @@ -243,7 +243,7 @@ func TestDMap_GetReadQuorum(t *testing.T) { for i := 0; i < 10; i++ { key := bkey(i) host, _ := db1.findPartitionOwner(dm.name, key) - if hostCmp(db1.this, host) { + if cmpMembersByID(db1.this, host) { _, err = dm.Get(key) if err != ErrReadQuorum { t.Errorf("Expected ErrReadQuorum. Got: %v", err) @@ -328,7 +328,7 @@ func TestDMap_ReadRepair(t *testing.T) { } dm3.RLock() owners := db3.getBackupPartitionOwners(hkey) - if hostCmp(owners[0], db3.this) { + if cmpMembersByID(owners[0], db3.this) { vdata, err := dm3.storage.Get(hkey) if err != nil { t.Fatalf("Expected nil. Got: %v", err) diff --git a/dmap_lock.go b/dmap_lock.go index f288b8c7..d18b4e2d 100644 --- a/dmap_lock.go +++ b/dmap_lock.go @@ -82,14 +82,14 @@ func (db *Olric) unlockKey(name, key string, token []byte) error { // It redirects the request to the partition owner, if required. 
func (db *Olric) unlock(name, key string, token []byte) error { member, _ := db.findPartitionOwner(name, key) - if hostCmp(member, db.this) { + if cmpMembersByName(member, db.this) { return db.unlockKey(name, key, token) } req := protocol.NewDMapMessage(protocol.OpUnlock) req.SetDMap(name) req.SetKey(key) req.SetValue(token) - _, err := db.redirectTo(member, req) + _, err := db.requestTo(member.String(), req) return err } diff --git a/dmap_put.go b/dmap_put.go index e02e648a..135c5d69 100644 --- a/dmap_put.go +++ b/dmap_put.go @@ -304,13 +304,13 @@ func (db *Olric) callPutOnCluster(hkey uint64, w *writeop) error { // if the key belongs to another host. func (db *Olric) put(w *writeop) error { member, hkey := db.findPartitionOwner(w.dmap, w.key) - if hostCmp(member, db.this) { + if cmpMembersByName(member, db.this) { // We are on the partition owner. return db.callPutOnCluster(hkey, w) } // Redirect to the partition owner. req := w.toReq(w.opcode) - _, err := db.redirectTo(member, req) + _, err := db.requestTo(member.String(), req) return err } diff --git a/dmap_put_test.go b/dmap_put_test.go index e6ab801d..a78c7a20 100644 --- a/dmap_put_test.go +++ b/dmap_put_test.go @@ -187,7 +187,7 @@ func TestDMap_PutWriteQuorum(t *testing.T) { for i := 0; i < 10; i++ { key := bkey(i) host, _ := db1.findPartitionOwner(dm.name, key) - if hostCmp(db1.this, host) { + if cmpMembersByID(db1.this, host) { err = dm.Put(key, bval(i)) if err != ErrWriteQuorum { t.Fatalf("Expected ErrWriteQuorum. Got: %v", err) diff --git a/dmap_query.go b/dmap_query.go index a4a28893..222bf082 100644 --- a/dmap_query.go +++ b/dmap_query.go @@ -174,7 +174,7 @@ func (c *Cursor) runQueryOnOwners(partID uint64) ([]*storage.VData, error) { owners := c.db.partitions[partID].loadOwners() var responses []queryResponse for _, owner := range owners { - if hostCmp(owner, c.db.this) { + if cmpMembersByID(owner, c.db.this) { response, err := c.db.runLocalQuery(partID, c.name, c.query) if err != nil { return nil, err diff --git a/dtopic.go b/dtopic.go index c2a61f04..96afe5b9 100644 --- a/dtopic.go +++ b/dtopic.go @@ -236,7 +236,7 @@ func (db *Olric) publishDTopicMessageToAddr(member discovery.Member, topic strin defer db.wg.Done() defer sem.Release(1) - if hostCmp(member, db.this) { + if cmpMembersByID(member, db.this) { // Dispatch messages in this process. err := db.dtopic.dispatch(topic, msg) if err != nil { diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go index b353e904..03c43983 100644 --- a/internal/discovery/discovery.go +++ b/internal/discovery/discovery.go @@ -82,6 +82,7 @@ type Discovery struct { // Member represents a node in the cluster. type Member struct { Name string + NameHash uint64 ID uint64 Birthdate int64 } @@ -106,8 +107,10 @@ func New(log *flog.Logger, c *config.Config) (*Discovery, error) { buf = append(buf, []byte(c.MemberlistConfig.Name)...) id := c.Hasher.Sum64(buf) + nameHash := c.Hasher.Sum64([]byte(c.MemberlistConfig.Name)) host := &Member{ Name: c.MemberlistConfig.Name, + NameHash: nameHash, ID: id, Birthdate: birthdate, } diff --git a/olric.go b/olric.go index 2d3e9a47..0c51a07e 100644 --- a/olric.go +++ b/olric.go @@ -64,9 +64,6 @@ var ( ErrKeyTooLarge = errors.New("key too large") ErrNotImplemented = errors.New("not implemented") - - // ErrRedirectionCycle means that a node redirects requests to itself. 
- ErrRedirectionCycle = errors.New("redirection cycle detected") ) // ReleaseVersion is the current stable version of Olric @@ -659,21 +656,14 @@ func isKeyExpired(ttl int64) bool { return (time.Now().UnixNano() / 1000000) >= ttl } -// hostCmp returns true if o1 and o2 is the same. -func hostCmp(o1, o2 discovery.Member) bool { - return o1.ID == o2.ID +// cmpMembersByID returns true if two members denote the same member in the cluster. +func cmpMembersByID(one, two discovery.Member) bool { + // ID variable is calculated by combining member's name and birthdate + return one.ID == two.ID } -func (db *Olric) redirectTo(member discovery.Member, req protocol.EncodeDecoder) (protocol.EncodeDecoder, error) { - if hostCmp(db.this, member) { - return nil, errors.WithMessage(ErrRedirectionCycle, fmt.Sprintf("OpCode: %v", req.OpCode())) - } - // Here I write the following lines to detect a bug which I suspected. See #Issue:54 - if db.this.String() == member.String() { - db.log.V(1).Printf("[ERROR] There are members in the consistent hash ring with the same name and "+ - "different IDs. Name: %s, ID of this node: %d, ID of the target node: %d. Please report this.", db.this, db.this.ID, member.ID) - return nil, errors.WithMessage(ErrRedirectionCycle, - fmt.Sprintf("Member names are the same: %s OpCode: %v", db.this, req.OpCode())) - } - return db.requestTo(member.String(), req) +// cmpMembersByName returns true if the two members has the same name in the cluster. +// This function is intended to redirect the requests to the partition owner. +func cmpMembersByName(one, two discovery.Member) bool { + return one.NameHash == two.NameHash } diff --git a/rebalancer.go b/rebalancer.go index 0402516d..276a4f45 100644 --- a/rebalancer.go +++ b/rebalancer.go @@ -164,7 +164,7 @@ func (db *Olric) rebalancePrimaryPartitions() { } owner := part.owner() - if hostCmp(owner, db.this) { + if cmpMembersByID(owner, db.this) { // Already belongs to me. continue } @@ -206,7 +206,7 @@ func (db *Olric) rebalanceBackupPartitions() { offset := len(owners) - 1 - (db.config.ReplicaCount - 1) for i := len(owners) - 1; i > offset; i-- { owner := owners[i] - if hostCmp(db.this, owner) { + if cmpMembersByID(db.this, owner) { continue } ids = append(ids, owner.ID) @@ -262,7 +262,7 @@ func (db *Olric) rebalancer() { func (db *Olric) checkOwnership(part *partition) bool { owners := part.loadOwners() for _, owner := range owners { - if hostCmp(owner, db.this) { + if cmpMembersByID(owner, db.this) { return true } } diff --git a/rebalancer_test.go b/rebalancer_test.go index d4600b95..68e5c615 100644 --- a/rebalancer_test.go +++ b/rebalancer_test.go @@ -51,7 +51,7 @@ func TestRebalance_Merge(t *testing.T) { for partID := uint64(0); partID < db1.config.PartitionCount; partID++ { part := db1.partitions[partID] - if !hostCmp(part.owner(), db1.this) { + if !cmpMembersByID(part.owner(), db1.this) { if part.length() != 0 { t.Fatalf("Expected key count is 0 for PartID: %d on %s. 
Got: %d", partID, db1.this, part.length()) @@ -61,7 +61,7 @@ func TestRebalance_Merge(t *testing.T) { for partID := uint64(0); partID < db2.config.PartitionCount; partID++ { part := db2.partitions[partID] - if hostCmp(part.owner(), db2.this) { + if cmpMembersByID(part.owner(), db2.this) { if part.length() == 0 { t.Fatalf("Expected key count is different than zero for PartID: %d on %s", partID, db2.this) } @@ -192,7 +192,7 @@ func TestRebalance_MergeBackups(t *testing.T) { part := db.partitions[partID] for _, backupOwner := range backup.loadOwners() { - if hostCmp(backupOwner, part.owner()) { + if cmpMembersByID(backupOwner, part.owner()) { t.Fatalf("Partition owner is also backup owner. PartID: %d: %s", partID, backupOwner) } @@ -218,7 +218,7 @@ func TestRebalance_CheckOwnership(t *testing.T) { t.Fatalf("Invalid ownership distribution") } for _, backupOwner := range backup.loadOwners() { - if hostCmp(backupOwner, part.owner()) { + if cmpMembersByID(backupOwner, part.owner()) { t.Fatalf("Partition owner is also backup owner. PartID: %d: %s", partID, backupOwner) } diff --git a/routing.go b/routing.go index 43a38cb9..e6a2b818 100644 --- a/routing.go +++ b/routing.go @@ -90,7 +90,7 @@ func (db *Olric) distributeBackups(partID uint64) []discovery.Member { i-- continue } - if !hostCmp(backup, cur) { + if !cmpMembersByID(backup, cur) { db.log.V(3).Printf("[WARN] One of the backup owners is probably re-joined: %s", cur) // Delete it. owners = append(owners[:i], owners[i+1:]...) @@ -134,7 +134,7 @@ func (db *Olric) distributeBackups(partID uint64) []discovery.Member { for _, backup := range newOwners { var exists bool for i, bkp := range owners { - if hostCmp(bkp, backup.(discovery.Member)) { + if cmpMembersByID(bkp, backup.(discovery.Member)) { exists = true // Remove it from the current position owners = append(owners[:i], owners[i+1:]...) @@ -175,7 +175,7 @@ func (db *Olric) distributePrimaryCopies(partID uint64) []discovery.Member { i-- continue } - if !hostCmp(owner, current) { + if !cmpMembersByID(owner, current) { db.log.V(4).Printf("[WARN] One of the partitions owners is probably re-joined: %s", current) owners = append(owners[:i], owners[i+1:]...) i-- @@ -213,7 +213,7 @@ func (db *Olric) distributePrimaryCopies(partID uint64) []discovery.Member { // Here add the new partition newOwner. for i, owner := range owners { - if hostCmp(owner, newOwner.(discovery.Member)) { + if cmpMembersByID(owner, newOwner.(discovery.Member)) { // Remove it from the current position owners = append(owners[:i], owners[i+1:]...) 
// Append it again to head @@ -318,7 +318,7 @@ func (db *Olric) updateRouting() { func (db *Olric) processOwnershipReports(reports map[discovery.Member]ownershipReport) { check := func(member discovery.Member, owners []discovery.Member) bool { for _, owner := range owners { - if hostCmp(member, owner) { + if cmpMembersByID(member, owner) { return true } } @@ -422,7 +422,7 @@ func (db *Olric) checkAndGetCoordinator(id uint64) (discovery.Member, error) { } myCoordinator := db.discovery.GetCoordinator() - if !hostCmp(coordinator, myCoordinator) { + if !cmpMembersByID(coordinator, myCoordinator) { return discovery.Member{}, fmt.Errorf("unrecognized cluster coordinator: %s: %s", coordinator, myCoordinator) } return coordinator, nil @@ -432,7 +432,7 @@ func (db *Olric) setOwnedPartitionCount() { var count uint64 for partID := uint64(0); partID < db.config.PartitionCount; partID++ { part := db.partitions[partID] - if hostCmp(part.owner(), db.this) { + if cmpMembersByID(part.owner(), db.this) { count++ } } diff --git a/stats_test.go b/stats_test.go index a9706610..4ef859be 100644 --- a/stats_test.go +++ b/stats_test.go @@ -47,7 +47,7 @@ func TestStatsStandalone(t *testing.T) { t.Fatalf("Expected nil. Got: %v", err) } - if !hostCmp(s.ClusterCoordinator, db.this) { + if !cmpMembersByID(s.ClusterCoordinator, db.this) { t.Fatalf("Expected cluster coordinator: %v. Got: %v", db.this, s.ClusterCoordinator) } @@ -64,7 +64,7 @@ func TestStatsStandalone(t *testing.T) { if part.Length <= 0 { t.Fatalf("Unexpected Length: %d", part.Length) } - if !hostCmp(part.Owner, db.this) { + if !cmpMembersByID(part.Owner, db.this) { t.Fatalf("Expected partition owner: %s. Got: %s", db.this, part.Owner) } } @@ -136,7 +136,7 @@ func TestStatsCluster(t *testing.T) { t.Fatalf("Expected nil. Got: %v", err) } - if !hostCmp(s.ClusterCoordinator, db1.this) { + if !cmpMembersByID(s.ClusterCoordinator, db1.this) { t.Fatalf("Expected cluster coordinator: %v. Got: %v", db1.this, s.ClusterCoordinator) } })
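
Note on the new comparison helpers: cmpMembersByID compares the ID field, which is derived from the member's name and birthdate, so a node that restarts and re-joins under the same name counts as a different member (this is what the ownership and rebalancing checks need). cmpMembersByName compares only NameHash, so a re-joined node is still recognized as the correct target when forwarding requests in put, get, deleteKey, expire and unlock, which is why redirectTo and ErrRedirectionCycle could be dropped in favor of a plain requestTo call. The standalone Go sketch below illustrates the distinction; the fnv-based hasher and the hand-built Member values are illustrative stand-ins for Olric's configured hasher and discovery layer, not the real API.

package main

import (
	"fmt"
	"hash/fnv"
)

// Member mirrors the fields this patch relies on: ID is derived from the
// name combined with the birthdate, NameHash from the name alone.
type Member struct {
	Name      string
	NameHash  uint64
	ID        uint64
	Birthdate int64
}

// sum64 stands in for the configured Hasher.Sum64 (assumption: FNV-1a here).
func sum64(b []byte) uint64 {
	h := fnv.New64a()
	_, _ = h.Write(b)
	return h.Sum64()
}

// newMember builds a member the way the patch describes: the ID mixes the
// birthdate into the hash, the NameHash does not.
func newMember(name string, birthdate int64) Member {
	return Member{
		Name:      name,
		NameHash:  sum64([]byte(name)),
		ID:        sum64([]byte(fmt.Sprintf("%d%s", birthdate, name))),
		Birthdate: birthdate,
	}
}

// cmpMembersByID: true only for the same incarnation of a node.
func cmpMembersByID(one, two Member) bool { return one.ID == two.ID }

// cmpMembersByName: true for the same node name, even across restarts.
func cmpMembersByName(one, two Member) bool { return one.NameHash == two.NameHash }

func main() {
	a := newMember("node1.olric:3320", 1602530000) // original process
	b := newMember("node1.olric:3320", 1602530300) // same node, re-joined later

	fmt.Println(cmpMembersByID(a, b))   // false: a different incarnation
	fmt.Println(cmpMembersByName(a, b)) // true: still the right node to forward requests to
}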