Skip to content

Commit

Permalink
p2p/dnsdisc: add enode.Iterator API (#20437)
Browse files Browse the repository at this point in the history
* p2p/dnsdisc: add support for enode.Iterator

This changes the dnsdisc.Client API to support the enode.Iterator
interface.

* p2p/dnsdisc: rate-limit DNS requests

* p2p/dnsdisc: preserve linked trees across root updates

This improves the way links are handled when the link root changes.
Previously, sync would simply remove all links from the current tree and
garbage-collect all unreachable trees before syncing the new list of
links.

This behavior isn't great in certain cases: Consider a structure where
trees A, B, and C reference each other and D links to A. If D's link
root changed, the sync code would first remove trees A, B and C, only to
re-sync them later when the link to A was found again.

The fix for this problem is to track the current set of links in each
clientTree and removing old links only AFTER all links are synced.

* p2p/dnsdisc: deflake iterator test

* cmd/devp2p: adapt dnsClient to new p2p/dnsdisc API

* p2p/dnsdisc: tiny comment fix
  • Loading branch information
fjl authored and karalabe committed Dec 12, 2019
1 parent d90d1db commit 191364c
Show file tree
Hide file tree
Showing 8 changed files with 393 additions and 219 deletions.
3 changes: 1 addition & 2 deletions cmd/devp2p/dnscmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,7 @@ func dnsClient(ctx *cli.Context) *dnsdisc.Client {
if commandHasFlag(ctx, dnsTimeoutFlag) {
cfg.Timeout = ctx.Duration(dnsTimeoutFlag.Name)
}
c, _ := dnsdisc.NewClient(cfg) // cannot fail because no URLs given
return c
return dnsdisc.NewClient(cfg)
}

// There are two file formats for DNS node trees on disk:
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ require (
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7
golang.org/x/text v0.3.2
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20190213234257-ec84240a7772
gopkg.in/sourcemap.v1 v1.0.5 // indirect
Expand Down
248 changes: 155 additions & 93 deletions p2p/dnsdisc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math/rand"
"net"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum/common/mclock"
Expand All @@ -31,15 +32,13 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
lru "github.com/hashicorp/golang-lru"
"golang.org/x/time/rate"
)

// Client discovers nodes by querying DNS servers.
type Client struct {
cfg Config
clock mclock.Clock
linkCache linkCache
trees map[string]*clientTree

cfg Config
clock mclock.Clock
entries *lru.Cache
}

Expand All @@ -48,6 +47,7 @@ type Config struct {
Timeout time.Duration // timeout used for DNS lookups (default 5s)
RecheckInterval time.Duration // time between tree root update checks (default 30min)
CacheLimit int // maximum number of cached records (default 1000)
RateLimit float64 // maximum DNS requests / second (default 3)
ValidSchemes enr.IdentityScheme // acceptable ENR identity schemes (default enode.ValidSchemes)
Resolver Resolver // the DNS resolver to use (defaults to system DNS)
Logger log.Logger // destination of client log messages (defaults to root logger)
Expand All @@ -60,9 +60,10 @@ type Resolver interface {

func (cfg Config) withDefaults() Config {
const (
defaultTimeout = 5 * time.Second
defaultRecheck = 30 * time.Minute
defaultCache = 1000
defaultTimeout = 5 * time.Second
defaultRecheck = 30 * time.Minute
defaultRateLimit = 3
defaultCache = 1000
)
if cfg.Timeout == 0 {
cfg.Timeout = defaultTimeout
Expand All @@ -73,6 +74,9 @@ func (cfg Config) withDefaults() Config {
if cfg.CacheLimit == 0 {
cfg.CacheLimit = defaultCache
}
if cfg.RateLimit == 0 {
cfg.RateLimit = defaultRateLimit
}
if cfg.ValidSchemes == nil {
cfg.ValidSchemes = enode.ValidSchemes
}
Expand All @@ -86,32 +90,24 @@ func (cfg Config) withDefaults() Config {
}

// NewClient creates a client.
func NewClient(cfg Config, urls ...string) (*Client, error) {
c := &Client{
cfg: cfg.withDefaults(),
clock: mclock.System{},
trees: make(map[string]*clientTree),
}
var err error
if c.entries, err = lru.New(c.cfg.CacheLimit); err != nil {
return nil, err
}
for _, url := range urls {
if err := c.AddTree(url); err != nil {
return nil, err
}
func NewClient(cfg Config) *Client {
cfg = cfg.withDefaults()
cache, err := lru.New(cfg.CacheLimit)
if err != nil {
panic(err)
}
return c, nil
rlimit := rate.NewLimiter(rate.Limit(cfg.RateLimit), 10)
cfg.Resolver = &rateLimitResolver{cfg.Resolver, rlimit}
return &Client{cfg: cfg, entries: cache, clock: mclock.System{}}
}

// SyncTree downloads the entire node tree at the given URL. This doesn't add the tree for
// later use, but any previously-synced entries are reused.
// SyncTree downloads the entire node tree at the given URL.
func (c *Client) SyncTree(url string) (*Tree, error) {
le, err := parseLink(url)
if err != nil {
return nil, fmt.Errorf("invalid enrtree URL: %v", err)
}
ct := newClientTree(c, le)
ct := newClientTree(c, new(linkCache), le)
t := &Tree{entries: make(map[string]entry)}
if err := ct.syncAll(t.entries); err != nil {
return nil, err
Expand All @@ -120,75 +116,16 @@ func (c *Client) SyncTree(url string) (*Tree, error) {
return t, nil
}

// AddTree adds a enrtree:// URL to crawl.
func (c *Client) AddTree(url string) error {
le, err := parseLink(url)
if err != nil {
return fmt.Errorf("invalid enrtree URL: %v", err)
}
ct, err := c.ensureTree(le)
if err != nil {
return err
}
c.linkCache.add(ct)
return nil
}

func (c *Client) ensureTree(le *linkEntry) (*clientTree, error) {
if tree, ok := c.trees[le.domain]; ok {
if !tree.matchPubkey(le.pubkey) {
return nil, fmt.Errorf("conflicting public keys for domain %q", le.domain)
}
return tree, nil
}
ct := newClientTree(c, le)
c.trees[le.domain] = ct
return ct, nil
}

// RandomNode retrieves the next random node.
func (c *Client) RandomNode(ctx context.Context) *enode.Node {
for {
ct := c.randomTree()
if ct == nil {
return nil
}
n, err := ct.syncRandom(ctx)
if err != nil {
if err == ctx.Err() {
return nil // context canceled.
}
c.cfg.Logger.Debug("Error in DNS random node sync", "tree", ct.loc.domain, "err", err)
continue
}
if n != nil {
return n
}
}
}

// randomTree returns a random tree.
func (c *Client) randomTree() *clientTree {
if !c.linkCache.valid() {
c.gcTrees()
}
limit := rand.Intn(len(c.trees))
for _, ct := range c.trees {
if limit == 0 {
return ct
// NewIterator creates an iterator that visits all nodes at the
// given tree URLs.
func (c *Client) NewIterator(urls ...string) (enode.Iterator, error) {
it := c.newRandomIterator()
for _, url := range urls {
if err := it.addTree(url); err != nil {
return nil, err
}
limit--
}
return nil
}

// gcTrees rebuilds the 'trees' map.
func (c *Client) gcTrees() {
trees := make(map[string]*clientTree)
for t := range c.linkCache.all() {
trees[t.loc.domain] = t
}
c.trees = trees
return it, nil
}

// resolveRoot retrieves a root entry via DNS.
Expand Down Expand Up @@ -258,3 +195,128 @@ func (c *Client) doResolveEntry(ctx context.Context, domain, hash string) (entry
}
return nil, nameError{name, errNoEntry}
}

// rateLimitResolver applies a rate limit to a Resolver.
type rateLimitResolver struct {
r Resolver
limiter *rate.Limiter
}

func (r *rateLimitResolver) LookupTXT(ctx context.Context, domain string) ([]string, error) {
if err := r.limiter.Wait(ctx); err != nil {
return nil, err
}
return r.r.LookupTXT(ctx, domain)
}

// randomIterator traverses a set of trees and returns nodes found in them.
type randomIterator struct {
cur *enode.Node
ctx context.Context
cancelFn context.CancelFunc
c *Client

mu sync.Mutex
trees map[string]*clientTree // all trees
lc linkCache // tracks tree dependencies
}

func (c *Client) newRandomIterator() *randomIterator {
ctx, cancel := context.WithCancel(context.Background())
return &randomIterator{
c: c,
ctx: ctx,
cancelFn: cancel,
trees: make(map[string]*clientTree),
}
}

// Node returns the current node.
func (it *randomIterator) Node() *enode.Node {
return it.cur
}

// Close closes the iterator.
func (it *randomIterator) Close() {
it.mu.Lock()
defer it.mu.Unlock()

it.cancelFn()
it.trees = nil
}

// Next moves the iterator to the next node.
func (it *randomIterator) Next() bool {
it.cur = it.nextNode()
return it.cur != nil
}

// addTree adds a enrtree:// URL to the iterator.
func (it *randomIterator) addTree(url string) error {
le, err := parseLink(url)
if err != nil {
return fmt.Errorf("invalid enrtree URL: %v", err)
}
it.lc.addLink("", le.str)
return nil
}

// nextNode syncs random tree entries until it finds a node.
func (it *randomIterator) nextNode() *enode.Node {
for {
ct := it.nextTree()
if ct == nil {
return nil
}
n, err := ct.syncRandom(it.ctx)
if err != nil {
if err == it.ctx.Err() {
return nil // context canceled.
}
it.c.cfg.Logger.Debug("Error in DNS random node sync", "tree", ct.loc.domain, "err", err)
continue
}
if n != nil {
return n
}
}
}

// nextTree returns a random tree.
func (it *randomIterator) nextTree() *clientTree {
it.mu.Lock()
defer it.mu.Unlock()

if it.lc.changed {
it.rebuildTrees()
it.lc.changed = false
}
if len(it.trees) == 0 {
return nil
}
limit := rand.Intn(len(it.trees))
for _, ct := range it.trees {
if limit == 0 {
return ct
}
limit--
}
return nil
}

// rebuildTrees rebuilds the 'trees' map.
func (it *randomIterator) rebuildTrees() {
// Delete removed trees.
for loc := range it.trees {
if !it.lc.isReferenced(loc) {
delete(it.trees, loc)
}
}
// Add new trees.
for loc := range it.lc.backrefs {
if it.trees[loc] == nil {
link, _ := parseLink(linkPrefix + loc)
it.trees[loc] = newClientTree(it.c, &it.lc, link)
}
}
}
Loading

0 comments on commit 191364c

Please sign in to comment.