Skip to content

Commit

Permalink
Merge pull request #506 from oasisprotocol/pro-wh/feature/holders12
Browse files Browse the repository at this point in the history
NFT analyzer
  • Loading branch information
pro-wh authored Oct 3, 2023
2 parents c0b1933 + a8d53b2 commit 2f01561
Show file tree
Hide file tree
Showing 18 changed files with 539 additions and 15 deletions.
134 changes: 134 additions & 0 deletions analyzer/evmnfts/evm_nfts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package evmnfts

import (
"context"
"fmt"

"github.com/oasisprotocol/nexus/analyzer"
"github.com/oasisprotocol/nexus/analyzer/evmnfts/ipfsclient"
"github.com/oasisprotocol/nexus/analyzer/item"
"github.com/oasisprotocol/nexus/analyzer/queries"
"github.com/oasisprotocol/nexus/analyzer/runtime/evm"
"github.com/oasisprotocol/nexus/common"
"github.com/oasisprotocol/nexus/config"
"github.com/oasisprotocol/nexus/log"
"github.com/oasisprotocol/nexus/storage"
"github.com/oasisprotocol/nexus/storage/client"
"github.com/oasisprotocol/nexus/storage/oasis/nodeapi"
)

const (
evmNFTsAnalyzerPrefix = "evm_nfts_"
)

type processor struct {
runtime common.Runtime
source nodeapi.RuntimeApiLite
ipfsClient ipfsclient.Client
target storage.TargetStorage
logger *log.Logger
}

var _ item.ItemProcessor[*StaleNFT] = (*processor)(nil)

func NewAnalyzer(
runtime common.Runtime,
cfg config.ItemBasedAnalyzerConfig,
sourceClient nodeapi.RuntimeApiLite,
ipfsClient ipfsclient.Client,
target storage.TargetStorage,
logger *log.Logger,
) (analyzer.Analyzer, error) {
logger = logger.With("analyzer", evmNFTsAnalyzerPrefix+runtime)
p := &processor{
runtime: runtime,
source: sourceClient,
ipfsClient: ipfsClient,
target: target,
logger: logger,
}
return item.NewAnalyzer[*StaleNFT](
evmNFTsAnalyzerPrefix+string(runtime),
cfg,
p,
target,
logger,
)
}

type StaleNFT struct {
Addr string
ID common.BigInt
Type *evm.EVMTokenType
AddrContextIdentifier string
AddrContextVersion int
AddrData []byte
DownloadRound uint64
}

func (p *processor) GetItems(ctx context.Context, limit uint64) ([]*StaleNFT, error) {
var staleNFTs []*StaleNFT
rows, err := p.target.Query(ctx, queries.RuntimeEVMNFTAnalysisStale, p.runtime, limit)
if err != nil {
return nil, fmt.Errorf("querying discovered NFTs: %w", err)
}
defer rows.Close()
for rows.Next() {
var staleNFT StaleNFT
if err = rows.Scan(
&staleNFT.Addr,
&staleNFT.ID,
&staleNFT.Type,
&staleNFT.AddrContextIdentifier,
&staleNFT.AddrContextVersion,
&staleNFT.AddrData,
&staleNFT.DownloadRound,
); err != nil {
return nil, fmt.Errorf("scanning discovered nft: %w", err)
}
staleNFTs = append(staleNFTs, &staleNFT)
}
return staleNFTs, nil
}

func (p *processor) ProcessItem(ctx context.Context, batch *storage.QueryBatch, staleNFT *StaleNFT) error {
p.logger.Info("downloading", "stale_nft", staleNFT)
tokenEthAddr, err := client.EVMEthAddrFromPreimage(staleNFT.AddrContextIdentifier, staleNFT.AddrContextVersion, staleNFT.AddrData)
if err != nil {
return fmt.Errorf("token address: %w", err)
}
nftData, err := evm.EVMDownloadNewNFT(
ctx,
p.logger,
p.source,
p.ipfsClient,
staleNFT.DownloadRound,
tokenEthAddr,
*staleNFT.Type,
&staleNFT.ID.Int,
)
if err != nil {
return fmt.Errorf("downloading NFT %s %v: %w", staleNFT.Addr, staleNFT.ID, err)
}
batch.Queue(
queries.RuntimeEVMNFTUpdate,
p.runtime,
staleNFT.Addr,
staleNFT.ID,
staleNFT.DownloadRound,
nftData.MetadataURI,
nftData.MetadataAccessed,
nftData.Name,
nftData.Description,
nftData.Image,
)
return nil
}

func (p *processor) QueueLength(ctx context.Context) (int, error) {
var queueLength int
if err := p.target.QueryRow(ctx, queries.RuntimeEVMNFTAnalysisStaleCount, p.runtime).Scan(&queueLength); err != nil {
return 0, fmt.Errorf("querying number of stale NFTs: %w", err)
}
return queueLength, nil
}
52 changes: 52 additions & 0 deletions analyzer/evmnfts/httpmisc/http_misc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package httpmisc

import (
"context"
"fmt"
"net/http"
"time"
)

// httpmisc is a bunch of opinions that are common to a few places that use
// HTTP.

const ClientTimeout = 30 * time.Second

func GetWithContextWithClient(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
return client.Do(req)
}

type ResourceError struct {
// Note: .error is the implementation of .Error, .Unwrap etc. It is not
// in the Unwrap chain. Use something like
// `ResourceError{fmt.Errorf("...: %w", err)}` to set up an
// instance with `err` in the Unwrap chain.
error
}

func (err ResourceError) Is(target error) bool {
if _, ok := target.(ResourceError); ok {
return true
}
return false
}

func ResponseOK(resp *http.Response) error {
if resp.StatusCode >= 500 || resp.StatusCode == 429 {
if err := resp.Body.Close(); err != nil {
return fmt.Errorf("HTTP closing body due to HTTP %d: %w", resp.StatusCode, err)
}
return fmt.Errorf("HTTP %d", resp.StatusCode)
}
if resp.StatusCode != 200 {
if err := resp.Body.Close(); err != nil {
return fmt.Errorf("HTTP closing body: %w", err)
}
return ResourceError{fmt.Errorf("HTTP %d", resp.StatusCode)}
}
return nil
}
14 changes: 14 additions & 0 deletions analyzer/evmnfts/ipfsclient/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package ipfsclient

import (
"context"
"io"
)

// ipfsclient is a super-simple interface for getting a file from ipfs. It
// comes with an implementation that uses an IPFS HTTP gateway.

// Client is a simplified IPFS interface.
type Client interface {
Get(ctx context.Context, path string) (io.ReadCloser, error)
}
41 changes: 41 additions & 0 deletions analyzer/evmnfts/ipfsclient/gateway.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package ipfsclient

import (
"context"
"fmt"
"io"
"net/http"

"github.com/oasisprotocol/nexus/analyzer/evmnfts/httpmisc"
)

// gatewayHTTPClient is an *http.Client to use for accessing gateways.
// We can't use the pubclient package because we use a local ipfs gateway.
var gatewayHTTPClient = &http.Client{
Timeout: httpmisc.ClientTimeout,
}

// Gateway is a Client that fetches through an IPFS HTTP gateway.
type Gateway struct {
base string
}

var _ Client = (*Gateway)(nil)

func NewGateway(base string) (*Gateway, error) {
return &Gateway{
base: base,
}, nil
}

func (g *Gateway) Get(ctx context.Context, path string) (io.ReadCloser, error) {
url := g.base + "/ipfs/" + path
resp, err := httpmisc.GetWithContextWithClient(ctx, gatewayHTTPClient, url)
if err != nil {
return nil, fmt.Errorf("requesting from gateway: %w", err)
}
if err = httpmisc.ResponseOK(resp); err != nil {
return nil, err
}
return resp.Body, nil
}
4 changes: 4 additions & 0 deletions analyzer/evmnfts/ipfsclient/kubo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package ipfsclient

// Alternative:
// https://pkg.go.dev/github.com/ipfs/kubo/core/coreapi#UnixfsAPI.Get
45 changes: 45 additions & 0 deletions analyzer/evmnfts/multiproto/multiproto.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package multiproto

import (
"context"
"fmt"
"io"
"net/url"

"github.com/oasisprotocol/nexus/analyzer/evmnfts/httpmisc"
"github.com/oasisprotocol/nexus/analyzer/evmnfts/ipfsclient"
"github.com/oasisprotocol/nexus/analyzer/evmnfts/pubclient"
)

// multiproto gets a file from a URL by looking at the URL's scheme and
// requesting it from suitable client. Cross-scheme redirects thus don't work,
// as we only switch once on protocol out here.

func Get(ctx context.Context, ipfsClient ipfsclient.Client, whyDidTheyNameTheirLibraryURLNowICantUseThatName string) (io.ReadCloser, error) {
parsed, err := url.Parse(whyDidTheyNameTheirLibraryURLNowICantUseThatName)
if err != nil {
return nil, fmt.Errorf("parse URL: %w", err)
}
if parsed.Scheme == "" {
return nil, fmt.Errorf("scheme not set")
}
switch parsed.Scheme {
case "http", "https":
resp, err1 := pubclient.GetWithContext(ctx, whyDidTheyNameTheirLibraryURLNowICantUseThatName)
if err1 != nil {
return nil, fmt.Errorf("HTTP get: %w", err1)
}
if err1 = httpmisc.ResponseOK(resp); err1 != nil {
return nil, err1
}
return resp.Body, nil
case "ipfs":
rc, err1 := ipfsClient.Get(ctx, parsed.Host+"/"+parsed.Path)
if err1 != nil {
return nil, fmt.Errorf("IPFS get: %w", err1)
}
return rc, nil
default:
return nil, fmt.Errorf("unsupported protocol %s", parsed.Scheme)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"time"

coreCommon "github.com/oasisprotocol/oasis-core/go/common"

"github.com/oasisprotocol/nexus/analyzer/evmnfts/httpmisc"
)

// Use this package for connecting to untrusted URLs.
Expand Down Expand Up @@ -83,17 +85,9 @@ var client = &http.Client{
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
Timeout: 30 * time.Second,
}

func getWithContextWithClient(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
return client.Do(req)
Timeout: httpmisc.ClientTimeout,
}

func GetWithContext(ctx context.Context, url string) (*http.Response, error) {
return getWithContextWithClient(ctx, client, url)
return httpmisc.GetWithContextWithClient(ctx, client, url)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"

"github.com/stretchr/testify/require"

"github.com/oasisprotocol/nexus/analyzer/evmnfts/httpmisc"
)

func wasteResp(resp *http.Response) error {
Expand Down Expand Up @@ -58,11 +60,11 @@ func TestMisc(t *testing.T) {
ctx := context.Background()

// Default client should reach local server. This makes sure the test server is working.
resp, err := getWithContextWithClient(ctx, http.DefaultClient, "http://localhost:8001/test.json")
resp, err := httpmisc.GetWithContextWithClient(ctx, http.DefaultClient, "http://localhost:8001/test.json")
requireNoErrorAndWaste(t, resp, err)
require.True(t, requested)
requested = false
resp, err = getWithContextWithClient(ctx, http.DefaultClient, "http://[::1]:8001/test.json")
resp, err = httpmisc.GetWithContextWithClient(ctx, http.DefaultClient, "http://[::1]:8001/test.json")
requireNoErrorAndWaste(t, resp, err)
require.True(t, requested)
requested = false
Expand Down
Loading

0 comments on commit 2f01561

Please sign in to comment.