Skip to content

Commit

Permalink
all: resolve and test for issues in rethinkdb coldstart - closes #207
Browse files Browse the repository at this point in the history
  • Loading branch information
arekkas authored Aug 17, 2016
1 parent 0baa04e commit 6a0bbd7
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 43 deletions.
24 changes: 10 additions & 14 deletions client/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,24 +176,20 @@ func BenchmarkRethinkAuthenticate(b *testing.B) {
}

func TestColdStartRethinkManager(t *testing.T) {
err := rethinkManager.CreateClient(&Client{
ID: "2341234",
Secret: "secret",
RedirectURIs: []string{"http://redirect"},
TermsOfServiceURI: "foo",
})
assert.Nil(t, err)
time.Sleep(100 * time.Millisecond)
_, err = rethinkManager.GetClient("2341234")
assert.Nil(t, err)
assert.Nil(t, rethinkManager.CreateClient(&Client{ID: "foo" }))
assert.Nil(t, rethinkManager.CreateClient(&Client{ID: "bar" }))

time.Sleep(time.Second / 2)
rethinkManager.Clients = make(map[string]Client)
_, err = rethinkManager.GetClient("2341234")
assert.NotNil(t, err)
assert.Nil(t, rethinkManager.ColdStart())

rethinkManager.ColdStart()
_, err = rethinkManager.GetClient("2341234")
c1, err := rethinkManager.GetClient("foo")
assert.Nil(t, err)
c2, err := rethinkManager.GetClient("bar")
assert.Nil(t, err)

assert.NotEqual(t, c1, c2)
assert.Equal(t, "foo", c1.GetID())

rethinkManager.Clients = make(map[string]Client)
}
Expand Down
17 changes: 17 additions & 0 deletions connection/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,23 @@ func BenchmarkRethinkGet(b *testing.B) {
}
}

func TestColdStart(t *testing.T) {
pkg.AssertError(t, false, rethinkManager.Create(&Connection{ID: "foo"}))
pkg.AssertError(t, false, rethinkManager.Create(&Connection{ID: "bar"}))

time.Sleep(time.Second / 2)
rethinkManager.Connections = map[string]Connection{}
pkg.AssertError(t, false, rethinkManager.ColdStart())

c1, err := rethinkManager.Get("foo")
pkg.AssertError(t, false, err)
c2, err := rethinkManager.Get("bar")
pkg.AssertError(t, false, err)

assert.NotEqual(t, c1, c2)
assert.Equal(t, "foo", c1.ID)
}

func TestCreateGetFindDelete(t *testing.T) {
for m, store := range managers {
_, err := store.Get("asdf")
Expand Down
27 changes: 14 additions & 13 deletions internal/fosite_store_rethinkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ func requestFromRDB(s *RdbSchema, proto interface{}) (*fosite.Request, error) {
}

func (m *FositeRehinkDBStore) ColdStart() error {
if err := m.AccessTokens.coldStart(m.Session, m.RWMutex, m.AccessTokensTable); err != nil {
if err := m.AccessTokens.coldStart(m.Session, &m.RWMutex, m.AccessTokensTable); err != nil {
return err
} else if err := m.AuthorizeCodes.coldStart(m.Session, m.RWMutex, m.AuthorizeCodesTable); err != nil {
} else if err := m.AuthorizeCodes.coldStart(m.Session, &m.RWMutex, m.AuthorizeCodesTable); err != nil {
return err
} else if err := m.IDSessions.coldStart(m.Session, m.RWMutex, m.IDSessionsTable); err != nil {
} else if err := m.IDSessions.coldStart(m.Session, &m.RWMutex, m.IDSessionsTable); err != nil {
return err
} else if err := m.Implicit.coldStart(m.Session, m.RWMutex, m.ImplicitTable); err != nil {
} else if err := m.Implicit.coldStart(m.Session, &m.RWMutex, m.ImplicitTable); err != nil {
return err
} else if err := m.RefreshTokens.coldStart(m.Session, m.RWMutex, m.RefreshTokensTable); err != nil {
} else if err := m.RefreshTokens.coldStart(m.Session, &m.RWMutex, m.RefreshTokensTable); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -259,14 +259,14 @@ func (s *FositeRehinkDBStore) PersistRefreshTokenGrantSession(ctx context.Contex
}

func (m *FositeRehinkDBStore) Watch(ctx context.Context) {
m.AccessTokens.watch(ctx, m.Session, m.RWMutex, m.AccessTokensTable)
m.AuthorizeCodes.watch(ctx, m.Session, m.RWMutex, m.AuthorizeCodesTable)
m.IDSessions.watch(ctx, m.Session, m.RWMutex, m.IDSessionsTable)
m.Implicit.watch(ctx, m.Session, m.RWMutex, m.ImplicitTable)
m.RefreshTokens.watch(ctx, m.Session, m.RWMutex, m.RefreshTokensTable)
m.AccessTokens.watch(ctx, m.Session, &m.RWMutex, m.AccessTokensTable)
m.AuthorizeCodes.watch(ctx, m.Session, &m.RWMutex, m.AuthorizeCodesTable)
m.IDSessions.watch(ctx, m.Session, &m.RWMutex, m.IDSessionsTable)
m.Implicit.watch(ctx, m.Session, &m.RWMutex, m.ImplicitTable)
m.RefreshTokens.watch(ctx, m.Session, &m.RWMutex, m.RefreshTokensTable)
}

func (items RDBItems) coldStart(sess *r.Session, lock sync.RWMutex, table r.Term) error {
func (items RDBItems) coldStart(sess *r.Session, lock *sync.RWMutex, table r.Term) error {
rows, err := table.Run(sess)
if err != nil {
return errors.New(err)
Expand All @@ -276,7 +276,8 @@ func (items RDBItems) coldStart(sess *r.Session, lock sync.RWMutex, table r.Term
lock.Lock()
defer lock.Unlock()
for rows.Next(&item) {
items[item.ID] = &item
var cp = item
items[item.ID] = &cp
}

if rows.Err() != nil {
Expand All @@ -285,7 +286,7 @@ func (items RDBItems) coldStart(sess *r.Session, lock sync.RWMutex, table r.Term
return nil
}

func (items RDBItems) watch(ctx context.Context, sess *r.Session, lock sync.RWMutex, table r.Term) {
func (items RDBItems) watch(ctx context.Context, sess *r.Session, lock *sync.RWMutex, table r.Term) {
go pkg.Retry(time.Second*15, time.Minute, func() error {
changes, err := table.Changes().Run(sess)
if err != nil {
Expand Down
14 changes: 13 additions & 1 deletion internal/fosite_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/pborman/uuid"
"golang.org/x/net/context"
r "gopkg.in/dancannon/gorethink.v2"
"github.com/stretchr/testify/assert"
)

var rethinkManager *FositeRehinkDBStore
Expand Down Expand Up @@ -109,6 +110,12 @@ func TestColdStartRethinkManager(t *testing.T) {

err := m.CreateAuthorizeCodeSession(ctx, id, &defaultRequest)
pkg.AssertError(t, false, err)
err = m.CreateAccessTokenSession(ctx, "12345", &fosite.Request{
RequestedAt: time.Now().Round(time.Second),
Client: &client.Client{ID: "baz"},
})
pkg.AssertError(t, false, err)

err = m.CreateAccessTokenSession(ctx, id, &defaultRequest)
pkg.AssertError(t, false, err)

Expand All @@ -119,6 +126,7 @@ func TestColdStartRethinkManager(t *testing.T) {

delete(rethinkManager.AuthorizeCodes, id)
delete(rethinkManager.AccessTokens, id)
delete(rethinkManager.AccessTokens, "12345")

_, err = m.GetAuthorizeCodeSession(ctx, id, &testSession{})
pkg.AssertError(t, true, err)
Expand All @@ -130,8 +138,12 @@ func TestColdStartRethinkManager(t *testing.T) {

_, err = m.GetAuthorizeCodeSession(ctx, id, &testSession{})
pkg.AssertError(t, false, err)
_, err = m.GetAccessTokenSession(ctx, id, &testSession{})

s1, err := m.GetAccessTokenSession(ctx, id, &testSession{})
pkg.AssertError(t, false, err)
s2, err := m.GetAccessTokenSession(ctx, "12345", &testSession{})
pkg.AssertError(t, false, err)
assert.NotEqual(t, s1, s2)
}

func TestCreateGetDeleteAuthorizeCodes(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions jwk/manager_rethinkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func (m *RethinkManager) Watch(ctx context.Context) {
logrus.Debug("Received update from RethinkDB Cluster in JSON Web Key manager.")
newVal := update["new_val"]
oldVal := update["old_val"]
m.Lock()
if newVal == nil && oldVal != nil {
m.watcherRemove(oldVal)
} else if newVal != nil && oldVal != nil {
Expand All @@ -185,7 +184,6 @@ func (m *RethinkManager) Watch(ctx context.Context) {
} else {
m.watcherInsert(newVal)
}
m.Unlock()
}

if connections.Err() != nil {
Expand All @@ -210,6 +208,8 @@ func (m *RethinkManager) watcherInsert(val *rethinkSchema) {
return
}

m.Lock()
defer m.Unlock()
keys := m.Keys[val.Set]
keys.Keys = append(keys.Keys, c)
m.Keys[val.Set] = keys
Expand All @@ -221,6 +221,8 @@ func (m *RethinkManager) watcherRemove(val *rethinkSchema) {
return
}

m.Lock()
defer m.Unlock()
keys.Keys = filter(keys.Keys, func(k jose.JsonWebKey) bool {
return k.KeyID != val.KID
})
Expand Down
31 changes: 18 additions & 13 deletions jwk/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func init() {
managers["http"] = &HTTPManager{Client: httpClient, Endpoint: u}
}

var rethinkManager *RethinkManager
var rethinkManager = new(RethinkManager)

func TestMain(m *testing.M) {
var session *r.Session
Expand Down Expand Up @@ -128,26 +128,31 @@ func BenchmarkRethinkGet(b *testing.B) {
}
}

func TestColdStartRethinkManager(t *testing.T) {
func TestColdStart(t *testing.T) {
ks, _ := testGenerator.Generate("")
priv := ks.Key("private")
p1 := ks.Key("private")

err := rethinkManager.AddKey("testcoldstart", First(priv))
assert.Nil(t, err)
ks, _ = testGenerator.Generate("")
p2 := ks.Key("private")

time.Sleep(100 * time.Millisecond)
_, err = rethinkManager.GetKey("testcoldstart", "private")
assert.Nil(t, err)
pkg.AssertError(t, false, rethinkManager.AddKey("foo", First(p1)))
pkg.AssertError(t, false, rethinkManager.AddKey("bar", First(p2)))

time.Sleep(time.Second / 2)
rethinkManager.Lock()
rethinkManager.Keys = make(map[string]jose.JsonWebKeySet)
_, err = rethinkManager.GetKey("testcoldstart", "private")
assert.NotNil(t, err)
rethinkManager.Unlock()
pkg.AssertError(t, false, rethinkManager.ColdStart())

rethinkManager.ColdStart()
_, err = rethinkManager.GetKey("testcoldstart", "private")
assert.Nil(t, err)
c1, err := rethinkManager.GetKey("foo", "private")
pkg.AssertError(t, false, err)
c2, err := rethinkManager.GetKey("bar", "private")
pkg.AssertError(t, false, err)

assert.NotEqual(t, c1, c2)
rethinkManager.Lock()
rethinkManager.Keys = make(map[string]jose.JsonWebKeySet)
rethinkManager.Unlock()
}

func TestManagerKey(t *testing.T) {
Expand Down

0 comments on commit 6a0bbd7

Please sign in to comment.