Skip to content

Commit

Permalink
Merge pull request #49 from peterargue/add-contexts-datastore-v5
Browse files Browse the repository at this point in the history
Add contexts to support datastore v0.5
  • Loading branch information
whyrusleeping authored Nov 19, 2021
2 parents ff7104a + 6a9f7db commit 7318f1b
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 146 deletions.
61 changes: 31 additions & 30 deletions datastore.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package badger

import (
"context"
"errors"
"fmt"
"runtime"
Expand Down Expand Up @@ -204,7 +205,7 @@ func (d *Datastore) periodicGC() {
// NewTransaction starts a new transaction. The resulting transaction object
// can be mutated without incurring changes to the underlying Datastore until
// the transaction is Committed.
func (d *Datastore) NewTransaction(readOnly bool) (ds.Txn, error) {
func (d *Datastore) NewTransaction(ctx context.Context, readOnly bool) (ds.Txn, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -220,7 +221,7 @@ func (d *Datastore) newImplicitTransaction(readOnly bool) *txn {
return &txn{d, d.DB.NewTransaction(!readOnly), true}
}

func (d *Datastore) Put(key ds.Key, value []byte) error {
func (d *Datastore) Put(ctx context.Context, key ds.Key, value []byte) error {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -237,7 +238,7 @@ func (d *Datastore) Put(key ds.Key, value []byte) error {
return txn.commit()
}

func (d *Datastore) Sync(prefix ds.Key) error {
func (d *Datastore) Sync(ctx context.Context, prefix ds.Key) error {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -251,7 +252,7 @@ func (d *Datastore) Sync(prefix ds.Key) error {
return d.DB.Sync()
}

func (d *Datastore) PutWithTTL(key ds.Key, value []byte, ttl time.Duration) error {
func (d *Datastore) PutWithTTL(ctx context.Context, key ds.Key, value []byte, ttl time.Duration) error {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -268,7 +269,7 @@ func (d *Datastore) PutWithTTL(key ds.Key, value []byte, ttl time.Duration) erro
return txn.commit()
}

func (d *Datastore) SetTTL(key ds.Key, ttl time.Duration) error {
func (d *Datastore) SetTTL(ctx context.Context, key ds.Key, ttl time.Duration) error {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -285,7 +286,7 @@ func (d *Datastore) SetTTL(key ds.Key, ttl time.Duration) error {
return txn.commit()
}

func (d *Datastore) GetExpiration(key ds.Key) (time.Time, error) {
func (d *Datastore) GetExpiration(ctx context.Context, key ds.Key) (time.Time, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -298,7 +299,7 @@ func (d *Datastore) GetExpiration(key ds.Key) (time.Time, error) {
return txn.getExpiration(key)
}

func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
func (d *Datastore) Get(ctx context.Context, key ds.Key) (value []byte, err error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -311,7 +312,7 @@ func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
return txn.get(key)
}

func (d *Datastore) Has(key ds.Key) (bool, error) {
func (d *Datastore) Has(ctx context.Context, key ds.Key) (bool, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -324,7 +325,7 @@ func (d *Datastore) Has(key ds.Key) (bool, error) {
return txn.has(key)
}

func (d *Datastore) GetSize(key ds.Key) (size int, err error) {
func (d *Datastore) GetSize(ctx context.Context, key ds.Key) (size int, err error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -337,7 +338,7 @@ func (d *Datastore) GetSize(key ds.Key) (size int, err error) {
return txn.getSize(key)
}

func (d *Datastore) Delete(key ds.Key) error {
func (d *Datastore) Delete(ctx context.Context, key ds.Key) error {
d.closeLk.RLock()
defer d.closeLk.RUnlock()

Expand All @@ -352,7 +353,7 @@ func (d *Datastore) Delete(key ds.Key) error {
return txn.commit()
}

func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
func (d *Datastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -368,7 +369,7 @@ func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {

// DiskUsage implements the PersistentDatastore interface.
// It returns the sum of lsm and value log files sizes in bytes.
func (d *Datastore) DiskUsage() (uint64, error) {
func (d *Datastore) DiskUsage(ctx context.Context) (uint64, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -391,9 +392,9 @@ func (d *Datastore) Close() error {
return d.DB.Close()
}

// Batch creats a new Batch object. This provides a way to do many writes, when
// Batch creates a new Batch object. This provides a way to do many writes, when
// there may be too many to fit into a single transaction.
func (d *Datastore) Batch() (ds.Batch, error) {
func (d *Datastore) Batch(ctx context.Context) (ds.Batch, error) {
d.closeLk.RLock()
defer d.closeLk.RUnlock()
if d.closed {
Expand All @@ -411,7 +412,7 @@ func (d *Datastore) Batch() (ds.Batch, error) {
return b, nil
}

func (d *Datastore) CollectGarbage() (err error) {
func (d *Datastore) CollectGarbage(ctx context.Context) (err error) {
// The idea is to keep calling DB.RunValueLogGC() till Badger no longer has any log files
// to GC(which would be indicated by an error, please refer to Badger GC docs).
for err == nil {
Expand All @@ -436,7 +437,7 @@ func (d *Datastore) gcOnce() error {

var _ ds.Batch = (*batch)(nil)

func (b *batch) Put(key ds.Key, value []byte) error {
func (b *batch) Put(ctx context.Context, key ds.Key, value []byte) error {
b.ds.closeLk.RLock()
defer b.ds.closeLk.RUnlock()
if b.ds.closed {
Expand All @@ -449,7 +450,7 @@ func (b *batch) put(key ds.Key, value []byte) error {
return b.writeBatch.Set(key.Bytes(), value)
}

func (b *batch) Delete(key ds.Key) error {
func (b *batch) Delete(ctx context.Context, key ds.Key) error {
b.ds.closeLk.RLock()
defer b.ds.closeLk.RUnlock()
if b.ds.closed {
Expand All @@ -463,7 +464,7 @@ func (b *batch) delete(key ds.Key) error {
return b.writeBatch.Delete(key.Bytes())
}

func (b *batch) Commit() error {
func (b *batch) Commit(ctx context.Context) error {
b.ds.closeLk.RLock()
defer b.ds.closeLk.RUnlock()
if b.ds.closed {
Expand Down Expand Up @@ -503,7 +504,7 @@ func (b *batch) cancel() {
var _ ds.Datastore = (*txn)(nil)
var _ ds.TTLDatastore = (*txn)(nil)

func (t *txn) Put(key ds.Key, value []byte) error {
func (t *txn) Put(ctx context.Context, key ds.Key, value []byte) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -516,7 +517,7 @@ func (t *txn) put(key ds.Key, value []byte) error {
return t.txn.Set(key.Bytes(), value)
}

func (t *txn) Sync(prefix ds.Key) error {
func (t *txn) Sync(ctx context.Context, prefix ds.Key) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -526,7 +527,7 @@ func (t *txn) Sync(prefix ds.Key) error {
return nil
}

func (t *txn) PutWithTTL(key ds.Key, value []byte, ttl time.Duration) error {
func (t *txn) PutWithTTL(ctx context.Context, key ds.Key, value []byte, ttl time.Duration) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -539,7 +540,7 @@ func (t *txn) putWithTTL(key ds.Key, value []byte, ttl time.Duration) error {
return t.txn.SetEntry(badger.NewEntry(key.Bytes(), value).WithTTL(ttl))
}

func (t *txn) GetExpiration(key ds.Key) (time.Time, error) {
func (t *txn) GetExpiration(ctx context.Context, key ds.Key) (time.Time, error) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -559,7 +560,7 @@ func (t *txn) getExpiration(key ds.Key) (time.Time, error) {
return time.Unix(int64(item.ExpiresAt()), 0), nil
}

func (t *txn) SetTTL(key ds.Key, ttl time.Duration) error {
func (t *txn) SetTTL(ctx context.Context, key ds.Key, ttl time.Duration) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -580,7 +581,7 @@ func (t *txn) setTTL(key ds.Key, ttl time.Duration) error {

}

func (t *txn) Get(key ds.Key) ([]byte, error) {
func (t *txn) Get(ctx context.Context, key ds.Key) ([]byte, error) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -602,7 +603,7 @@ func (t *txn) get(key ds.Key) ([]byte, error) {
return item.ValueCopy(nil)
}

func (t *txn) Has(key ds.Key) (bool, error) {
func (t *txn) Has(ctx context.Context, key ds.Key) (bool, error) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -624,7 +625,7 @@ func (t *txn) has(key ds.Key) (bool, error) {
}
}

func (t *txn) GetSize(key ds.Key) (int, error) {
func (t *txn) GetSize(ctx context.Context, key ds.Key) (int, error) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -646,7 +647,7 @@ func (t *txn) getSize(key ds.Key) (int, error) {
}
}

func (t *txn) Delete(key ds.Key) error {
func (t *txn) Delete(ctx context.Context, key ds.Key) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand All @@ -660,7 +661,7 @@ func (t *txn) delete(key ds.Key) error {
return t.txn.Delete(key.Bytes())
}

func (t *txn) Query(q dsq.Query) (dsq.Results, error) {
func (t *txn) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand Down Expand Up @@ -850,7 +851,7 @@ func (t *txn) query(q dsq.Query) (dsq.Results, error) {
return qrb.Results(), nil
}

func (t *txn) Commit() error {
func (t *txn) Commit(ctx context.Context) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand Down Expand Up @@ -878,7 +879,7 @@ func (t *txn) close() error {
return t.txn.Commit()
}

func (t *txn) Discard() {
func (t *txn) Discard(ctx context.Context) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
Expand Down
Loading

0 comments on commit 7318f1b

Please sign in to comment.