From 10d29ffa55c2e041c07ab46d9fa6f9c3c422bcd3 Mon Sep 17 00:00:00 2001 From: i-norden Date: Wed, 19 Jul 2023 14:59:33 -0500 Subject: [PATCH] GetManyIpldStore --- store.go | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/store.go b/store.go index 8210cf3..598df55 100644 --- a/store.go +++ b/store.go @@ -3,6 +3,7 @@ package cbornode import ( "bytes" "context" + "errors" "fmt" block "github.com/ipfs/go-block-format" @@ -207,3 +208,66 @@ func (mb *mockBlocks) Put(ctx context.Context, b block.Block) error { mb.data[b.Cid()] = b return nil } + +type Cursor struct { + CID cid.Cid + Index int + Err error +} + +// IpldGetManyStore wraps a GetManyBlockstore and provides an interface for retrieving CBOR encoded data in batches +type IpldGetManyStore interface { + GetMany(ctx context.Context, cs []cid.Cid, outs []interface{}) (<-chan Cursor, []cid.Cid, error) +} + +// IpldGetManyBlockStore defines a subset of the go-ipfs-blockstore Blockstore interface providing a method +// for retrieving block-centered data in batches +type IpldGetManyBlockStore interface { + IpldBlockstore + GetMany(ctx context.Context, cs []cid.Cid) ([]block.Block, []cid.Cid, error) +} + +// GetManyIpldStore wraps and IpldBlockstore and implements the IpldGetManyStore interface. +type GetManyIpldStore struct { + *BasicIpldStore + GetManyBlocks IpldGetManyBlockStore +} + +var _ IpldStore = &GetManyIpldStore{} +var _ IpldGetManyStore = &GetManyIpldStore{} + +// NewGetManyCborStore returns an IpldStore implementation backed by the provided IpldGetManyBlockStore. +func NewGetManyCborStore(bs IpldGetManyBlockStore) *GetManyIpldStore { + viewer, _ := bs.(IpldBlockstoreViewer) + bis := &BasicIpldStore{Blocks: bs, Viewer: viewer} + return &GetManyIpldStore{GetManyBlocks: bs, BasicIpldStore: bis} +} + +// GetMany reads and unmarshals the content at `cs` into `outs` +// it returns a channel for tracking the position, identify, and any decode errors in output list +// as well as a list of all the CIDs that could not be retrieved from the underlying blockstore +func (g *GetManyIpldStore) GetMany(ctx context.Context, cs []cid.Cid, outs []interface{}) (<-chan Cursor, []cid.Cid, error) { + if len(cs) != len(outs) { + return nil, nil, errors.New("expected list of cids to be the same length as the destination decode list") + } + blks, missingCIDs, err := g.GetManyBlocks.GetMany(ctx, cs) + if err != nil { + return nil, nil, err + } + // tempted to make this all-or-nothing, where if there are any missing CIDs we simply return an error + // because this all feels very hacky... + cursors := make(chan Cursor) + go func() { + for i, blk := range blks { + err := g.decode(blk.RawData(), outs[i]) + cursors <- Cursor{ + CID: blk.Cid(), + Index: i, + Err: err, + } + } + close(cursors) // this will never be closed if the receiver encounters an error and decides to stop receiving off the channel + // but that's OK, it will eventually be garbage collected in that case + }() + return cursors, missingCIDs, nil +}