Skip to content

Commit

Permalink
simplify index
Browse files Browse the repository at this point in the history
  • Loading branch information
mjudeikis committed Dec 10, 2023
1 parent 64db180 commit 323486f
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 71 deletions.
43 changes: 29 additions & 14 deletions pkg/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,17 @@ import (

// Index implements a mapping from logical cluster to (shard) URL.
type Index interface {
Lookup(path logicalcluster.Path) (shard string, cluster, canonicalPath logicalcluster.Path, found bool)
LookupURL(logicalCluster logicalcluster.Path) (url string, canonicalPath logicalcluster.Path, found bool)
Lookup(path logicalcluster.Path) Result
LookupURL(logicalCluster logicalcluster.Path) Result
}

// Result is the result of a lookup.
type Result struct {
URL string
Found bool
Shard string
// Cluster canonical path
Cluster logicalcluster.Name
}

// PathRewriter can rewrite a logical cluster path before the actual mapping through
Expand Down Expand Up @@ -188,7 +197,7 @@ func (c *State) DeleteShard(shardName string) {
delete(c.shardClusterParentCluster, shardName)
}

func (c *State) Lookup(path logicalcluster.Path) (shard string, cluster logicalcluster.Name, found bool) {
func (c *State) Lookup(path logicalcluster.Path) Result {
segments := strings.Split(path.String(), ":")

for _, rewriter := range c.rewriters {
Expand All @@ -198,13 +207,16 @@ func (c *State) Lookup(path logicalcluster.Path) (shard string, cluster logicalc
c.lock.RLock()
defer c.lock.RUnlock()

var shard string
var cluster logicalcluster.Name

// walk through index graph to find the final logical cluster and shard
for i, s := range segments {
if i == 0 {
var found bool
shard, found = c.clusterShards[logicalcluster.Name(s)]
if !found {
return "", "", false
return Result{}
}
cluster = logicalcluster.Name(s)
continue
Expand All @@ -213,27 +225,30 @@ func (c *State) Lookup(path logicalcluster.Path) (shard string, cluster logicalc
var found bool
cluster, found = c.shardWorkspaceNameCluster[shard][cluster][s]
if !found {
return "", "", false
return Result{}
}
shard, found = c.clusterShards[cluster]
if !found {
return "", "", false
return Result{}
}
}

return shard, cluster, true
return Result{Shard: shard, Cluster: cluster, Found: true}
}

func (c *State) LookupURL(path logicalcluster.Path) (url string, found bool) {
shard, cluster, found := c.Lookup(path)
if !found {
return "", false
func (c *State) LookupURL(path logicalcluster.Path) Result {
result := c.Lookup(path)
if !result.Found {
return Result{}
}

baseURL, found := c.shardBaseURLs[shard]
baseURL, found := c.shardBaseURLs[result.Shard]
if !found {
return "", false
return Result{}
}

return strings.TrimSuffix(baseURL, "/") + cluster.Path().RequestPath(), true
return Result{
URL: strings.TrimSuffix(baseURL, "/") + result.Cluster.Path().RequestPath(),
Found: true,
}
}
98 changes: 49 additions & 49 deletions pkg/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,21 +244,21 @@ func TestLookup(t *testing.T) {
target.UpsertLogicalCluster(shardName, lc)
}
}
actualShard, actualCluster, found := target.Lookup(scenario.targetPath)
if scenario.expectFound && !found {
r := target.Lookup(scenario.targetPath)
if scenario.expectFound && !r.Found {
t.Fatalf("expected to lookup the path = %v", scenario.targetPath)
}
if !scenario.expectFound && found {
if !scenario.expectFound && r.Found {
t.Errorf("didn't expect to lookup the path = %v", scenario.targetPath)
}
if !scenario.expectFound {
return
}
if actualShard != scenario.expectedShard {
t.Errorf("unexpected shard = %v, for path = %v, expected = %v", actualShard, scenario.targetPath, scenario.expectedShard)
if r.Shard != scenario.expectedShard {
t.Errorf("unexpected shard = %v, for path = %v, expected = %v", r.Shard, scenario.targetPath, scenario.expectedShard)
}
if actualCluster != scenario.expectedCluster {
t.Errorf("unexpected cluster = %v, for path = %v, expected = %v", actualCluster, scenario.targetPath, scenario.expectedCluster)
if r.Cluster != scenario.expectedCluster {
t.Errorf("unexpected cluster = %v, for path = %v, expected = %v", r.Cluster, scenario.targetPath, scenario.expectedCluster)
}
})
}
Expand All @@ -278,16 +278,16 @@ func TestDeleteShard(t *testing.T) {
target.UpsertLogicalCluster("root", newLogicalCluster("34"))
target.UpsertLogicalCluster("amber", newLogicalCluster("43"))

shard, cluster, found := target.Lookup(logicalcluster.NewPath("root:org1"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), shard, cluster, found, "amber", "43", true)
r := target.Lookup(logicalcluster.NewPath("root:org1"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), r.Shard, r.Cluster, r.Found, "amber", "43", true)

// delete the shard and ensure we cannot look up a path on it
target.DeleteShard("amber")
shard, cluster, found = target.Lookup(logicalcluster.NewPath("root:org1"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), shard, cluster, found, "", "", false)
r = target.Lookup(logicalcluster.NewPath("root:org1"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), r.Shard, r.Cluster, r.Found, "", "", false)

shard, cluster, found = target.Lookup(logicalcluster.NewPath("root:org"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), shard, cluster, found, "root", "34", true)
r = target.Lookup(logicalcluster.NewPath("root:org"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), r.Shard, r.Cluster, r.Found, "root", "34", true)
}

func TestDeleteLogicalCluster(t *testing.T) {
Expand All @@ -301,17 +301,17 @@ func TestDeleteLogicalCluster(t *testing.T) {
target.UpsertLogicalCluster("root", newLogicalCluster("root"))
target.UpsertLogicalCluster("root", newLogicalCluster("34"))

shard, cluster, found := target.Lookup(logicalcluster.NewPath("root:org"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), shard, cluster, found, "root", "34", true)
r := target.Lookup(logicalcluster.NewPath("root:org"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), r.Shard, r.Cluster, r.Found, "root", "34", true)

// ensure that after deleting the logical cluster it cannot be looked up
target.DeleteLogicalCluster("root", newLogicalCluster("34"))

shard, cluster, found = target.Lookup(logicalcluster.NewPath("root:org"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), shard, cluster, found, "", "", false)
r = target.Lookup(logicalcluster.NewPath("root:org"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), r.Shard, r.Cluster, r.Found, "", "", false)

shard, cluster, found = target.Lookup(logicalcluster.NewPath("root"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), shard, cluster, found, "root", "root", true)
r = target.Lookup(logicalcluster.NewPath("root"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), r.Shard, r.Cluster, r.Found, "root", "root", true)
}

func TestDeleteWorkspace(t *testing.T) {
Expand All @@ -327,16 +327,16 @@ func TestDeleteWorkspace(t *testing.T) {
target.UpsertLogicalCluster("root", newLogicalCluster("34"))
target.UpsertLogicalCluster("root", newLogicalCluster("43"))

shard, cluster, found := target.Lookup(logicalcluster.NewPath("root:org"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), shard, cluster, found, "root", "34", true)
r := target.Lookup(logicalcluster.NewPath("root:org"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), r.Shard, r.Cluster, r.Found, "root", "34", true)

target.DeleteWorkspace("root", newWorkspace("org", "root", "34"))

shard, cluster, found = target.Lookup(logicalcluster.NewPath("root:org"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), shard, cluster, found, "", "", false)
r = target.Lookup(logicalcluster.NewPath("root:org"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), r.Shard, r.Cluster, r.Found, "", "", false)

shard, cluster, found = target.Lookup(logicalcluster.NewPath("root:org1"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), shard, cluster, found, "root", "43", true)
r = target.Lookup(logicalcluster.NewPath("root:org1"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), r.Shard, r.Cluster, r.Found, "root", "43", true)
}

func TestUpsertLogicalCluster(t *testing.T) {
Expand All @@ -348,12 +348,12 @@ func TestUpsertLogicalCluster(t *testing.T) {
target.UpsertLogicalCluster("root", newLogicalCluster("root"))
target.UpsertLogicalCluster("root", newLogicalCluster("34"))

shard, cluster, found := target.Lookup(logicalcluster.NewPath("root:org"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), shard, cluster, found, "root", "34", true)
r := target.Lookup(logicalcluster.NewPath("root:org"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), r.Shard, r.Cluster, r.Found, "root", "34", true)

target.UpsertLogicalCluster("amber", newLogicalCluster("34"))
shard, cluster, found = target.Lookup(logicalcluster.NewPath("root:org"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), shard, cluster, found, "amber", "34", true)
r = target.Lookup(logicalcluster.NewPath("root:org"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), r.Shard, r.Cluster, r.Found, "amber", "34", true)
}

// Since LookupURL uses Lookup method the following test is just a smoke tests.
Expand All @@ -365,20 +365,20 @@ func TestLookupURL(t *testing.T) {
target.UpsertLogicalCluster("root", newLogicalCluster("root"))
target.UpsertLogicalCluster("root", newLogicalCluster("34"))

url, found := target.LookupURL(logicalcluster.NewPath("root:org"))
if !found {
r := target.LookupURL(logicalcluster.NewPath("root:org"))
if !r.Found {
t.Fatalf("expected to find a URL for %q path", "root:org")
}
if url != "https://root.io/clusters/34" {
t.Fatalf("unexpected url = %v returned, expected = %v for %q path", url, "https://root.io/clusters/34", "root:org")
if r.URL != "https://root.io/clusters/34" {
t.Fatalf("unexpected url = %v returned, expected = %v for %q path", r.URL, "https://root.io/clusters/34", "root:org")
}

url, found = target.LookupURL(logicalcluster.NewPath("root:org:rh"))
if found {
r = target.LookupURL(logicalcluster.NewPath("root:org:rh"))
if r.Found {
t.Fatalf("didn't expected to find a URL for %q path", "root:org:rh")
}
if len(url) > 0 {
t.Fatalf("unexpected url = %v returned for %q path", url, "root:org:rh")
if len(r.URL) > 0 {
t.Fatalf("unexpected url = %v returned for %q path", r.URL, "root:org:rh")
}
}

Expand All @@ -390,21 +390,21 @@ func TestUpsertShard(t *testing.T) {
target.UpsertLogicalCluster("root", newLogicalCluster("root"))
target.UpsertLogicalCluster("root", newLogicalCluster("34"))

url, found := target.LookupURL(logicalcluster.NewPath("root:org"))
if !found {
r := target.LookupURL(logicalcluster.NewPath("root:org"))
if !r.Found {
t.Fatalf("expected to find a URL for %q path", "root:org")
}
if url != "https://root.io/clusters/34" {
t.Fatalf("unexpected url = %v returned, expected = %v for %q path", url, "https://root.io/clusters/34", "root:org")
if r.URL != "https://root.io/clusters/34" {
t.Fatalf("unexpected url = %v returned, expected = %v for %q path", r.URL, "https://root.io/clusters/34", "root:org")
}

target.UpsertShard("root", "https://new-root.io")
url, found = target.LookupURL(logicalcluster.NewPath("root:org"))
if !found {
r = target.LookupURL(logicalcluster.NewPath("root:org"))
if !r.Found {
t.Fatalf("expected to find a URL for %q path", "root:org")
}
if url != "https://new-root.io/clusters/34" {
t.Fatalf("unexpected url = %v returned, expected = %v for %q path", url, "https://new-root.io/clusters/34", "root:org")
if r.URL != "https://new-root.io/clusters/34" {
t.Fatalf("unexpected url = %v returned, expected = %v for %q path", r.URL, "https://new-root.io/clusters/34", "root:org")
}
}

Expand All @@ -417,12 +417,12 @@ func TestUpsertWorkspace(t *testing.T) {
target.UpsertLogicalCluster("root", newLogicalCluster("34"))
target.UpsertLogicalCluster("root", newLogicalCluster("44"))

shard, cluster, found := target.Lookup(logicalcluster.NewPath("root:org"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), shard, cluster, found, "root", "34", true)
r := target.Lookup(logicalcluster.NewPath("root:org"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), r.Shard, r.Cluster, r.Found, "root", "34", true)

target.UpsertWorkspace("root", newWorkspace("org", "root", "44"))
shard, cluster, found = target.Lookup(logicalcluster.NewPath("root:org"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), shard, cluster, found, "root", "44", true)
r = target.Lookup(logicalcluster.NewPath("root:org"))
validateLookupOutput(t, logicalcluster.NewPath("root:org"), r.Shard, r.Cluster, r.Found, "root", "44", true)
}

func validateLookupOutput(t *testing.T, path logicalcluster.Path, shard string, cluster logicalcluster.Name, found bool, expectedShard string, expectedCluster logicalcluster.Name, expectToFind bool) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/metadata/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (t *metadataTransport) RoundTrip(req *http.Request) (*http.Response, error)
}

func partialType(req *http.Request) (string, error) {
// strip off /cluster/<lcluster>
// strip off /clusters/<lcluster>
baseReq := *req
if strings.HasPrefix(req.URL.Path, "/clusters/") {
parts := strings.SplitN(req.URL.Path, "/", 4)
Expand All @@ -88,6 +88,7 @@ func partialType(req *http.Request) (string, error) {
if err != nil {
return "", err
}

switch info.Verb {
case "list":
return "PartialObjectMetadataList", nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/proxy/index/index_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,5 +287,6 @@ func (c *Controller) stopShard(shardName string) {
}

func (c *Controller) LookupURL(path logicalcluster.Path) (url string, found bool) {
return c.state.LookupURL(path)
r := c.state.LookupURL(path)
return r.URL, r.Found
}
13 changes: 7 additions & 6 deletions pkg/server/localproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,10 @@ func WithLocalProxy(
}

// lookup in our local, potentially partial index
requestShardName, rewrittenClusterName, foundInIndex := indexState.Lookup(path)
if foundInIndex && requestShardName != shardName {
logger.WithValues("cluster", cluster.Name, "requestedShard", requestShardName, "actualShard", shardName).Info("cluster is not on this shard, but on another")

r := indexState.Lookup(path)
if r.Found && r.Shard != shardName && r.URL == "" {
logger.WithValues("cluster", cluster.Name, "requestedShard", r.Shard, "actualShard", shardName).Info("cluster is not on this shard, but on another")

w.Header().Set("Retry-After", fmt.Sprintf("%d", 1))
http.Error(w, "Not found on this shard", http.StatusTooManyRequests)
Expand All @@ -154,10 +155,10 @@ func WithLocalProxy(
}

if foundInIndex {
if rewrittenClusterName.Path() != path {
logger.WithValues("from", path, "to", rewrittenClusterName).V(4).Info("rewriting cluster")
if r.Cluster.Path() != path {
logger.WithValues("from", path, "to", r.Cluster).V(4).Info("rewriting cluster")
}
cluster.Name = rewrittenClusterName
cluster.Name = r.Cluster
} else {
// we check "!isName && !foundInIndex" above. So, here it is a name.
cluster.Name = clusterName
Expand Down

0 comments on commit 323486f

Please sign in to comment.