Skip to content

Commit

Permalink
Merge pull request #153 from SiaFoundation/christopher/fix-host-scanning
Browse files Browse the repository at this point in the history
Fix host scanning bug
  • Loading branch information
n8maninger authored Jan 3, 2025
2 parents 996d8af + bff0bfd commit 827504e
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 125 deletions.
2 changes: 1 addition & 1 deletion explorer/explorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Store interface {
SiafundElements(ids []types.SiafundOutputID) (result []SiafundOutput, err error)

Hosts(pks []types.PublicKey) ([]Host, error)
HostsForScanning(maxLastScan, minLastAnnouncement time.Time, offset, limit uint64) ([]Host, error)
HostsForScanning(maxLastScan, minLastAnnouncement time.Time, limit uint64) ([]Host, error)
}

// Explorer implements a Sia explorer.
Expand Down
169 changes: 48 additions & 121 deletions explorer/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"net"
"sync"
"time"

crhpv2 "go.sia.tech/core/rhp/v2"
Expand Down Expand Up @@ -159,70 +158,6 @@ func (e *Explorer) scanV2Host(locator geoip.Locator, host Host) (HostScan, error
}, nil
}

func (e *Explorer) addHostScans(hosts chan Host) {
// use default included ip2location database
locator, err := geoip.NewIP2LocationLocator("")
if err != nil {
e.log.Error("Failed to create geoip database", zap.Error(err))
return
}
defer locator.Close()

worker := func() {
var scans []HostScan
for host := range hosts {
if e.isClosed() {
break
}

var scan HostScan
var addr string
var ok bool
var err error

if host.IsV2() {
addr, ok = host.V2SiamuxAddr()
if !ok {
e.log.Debug("Host did not have any v2 siamux net addresses in its announcement, unable to scan", zap.Stringer("pk", host.PublicKey))
continue
}
scan, err = e.scanV2Host(locator, host)
} else {
scan, err = e.scanV1Host(locator, host)
}
if err != nil {
scans = append(scans, HostScan{
PublicKey: host.PublicKey,
Success: false,
Timestamp: types.CurrentTimestamp(),
})
e.log.Debug("Scanning host failed", zap.String("addr", addr), zap.Stringer("pk", host.PublicKey), zap.Error(err))
continue
}

e.log.Debug("Scanning host succeeded", zap.String("addr", addr), zap.Stringer("pk", host.PublicKey))
scans = append(scans, scan)
}

if err := e.s.AddHostScans(scans); err != nil {
e.log.Error("Failed to add host scans to DB", zap.Error(err))
}
}

// launch all workers
var wg sync.WaitGroup
for t := 0; t < e.scanCfg.Threads; t++ {
wg.Add(1)
go func() {
defer wg.Done()
worker()
}()
}

// wait until they're done
wg.Wait()
}

func (e *Explorer) isClosed() bool {
select {
case <-e.ctx.Done():
Expand All @@ -232,33 +167,6 @@ func (e *Explorer) isClosed() bool {
}
}

func (e *Explorer) fetchHosts(hosts chan Host) {
var exhausted bool
offset := 0

t := types.CurrentTimestamp()
cutoff := t.Add(-e.scanCfg.MaxLastScan)
lastAnnouncement := t.Add(-e.scanCfg.MinLastAnnouncement)

for !exhausted && !e.isClosed() {
batch, err := e.s.HostsForScanning(cutoff, lastAnnouncement, uint64(offset), scanBatchSize)
if err != nil {
e.log.Error("failed to get hosts for scanning", zap.Error(err))
return
} else if len(batch) < scanBatchSize {
exhausted = true
}

for _, host := range batch {
select {
case <-e.ctx.Done():
return
case hosts <- host:
}
}
}
}

func (e *Explorer) scanHosts() {
e.log.Info("Waiting for syncing to complete before scanning hosts")
// don't scan hosts till we're at least nearly done with syncing
Expand All @@ -268,40 +176,59 @@ func (e *Explorer) scanHosts() {
}
e.log.Info("Syncing complete, will begin scanning hosts")

locator, err := geoip.NewIP2LocationLocator("")
if err != nil {
e.log.Info("failed to create geoip database:", zap.Error(err))
return
}
defer locator.Close()

for !e.isClosed() {
// fetch hosts
hosts := make(chan Host, scanBatchSize)
e.wg.Add(1)
go func() {
defer e.wg.Done()
e.fetchHosts(hosts)
close(hosts)
}()

// scan hosts
e.wg.Add(1)
go func() {
defer e.wg.Done()
e.addHostScans(hosts)
}()

// wait for scans to complete
waitChan := make(chan struct{})
go func() {
e.wg.Wait()
close(waitChan)
}()
select {
case <-waitChan:
case <-e.ctx.Done():
lastScanCutoff := time.Now().Add(-e.scanCfg.MaxLastScan)
lastAnnouncementCutoff := time.Now().Add(-e.scanCfg.MinLastAnnouncement)

batch, err := e.s.HostsForScanning(lastScanCutoff, lastAnnouncementCutoff, scanBatchSize)
if err != nil {
e.log.Info("failed to get hosts for scanning:", zap.Error(err))
return
} else if len(batch) == 0 {
select {
case <-e.ctx.Done():
e.log.Info("shutdown:", zap.Error(e.ctx.Err()))
return
case <-time.After(15 * time.Second):
continue // check again
}
}

// pause
select {
case <-e.ctx.Done():
results := make([]HostScan, len(batch))
for i, host := range batch {
e.wg.Add(1)
go func(i int, host Host) {
defer e.wg.Done()

var err error
if len(host.V2NetAddresses) > 0 {
results[i], err = e.scanV2Host(locator, host)
} else {
results[i], err = e.scanV1Host(locator, host)
}
if err != nil {
e.log.Debug("host scan failed", zap.Stringer("pk", host.PublicKey), zap.Error(err))
results[i] = HostScan{
PublicKey: host.PublicKey,
Success: false,
Timestamp: types.CurrentTimestamp(),
}
return
}
}(i, host)
}
e.wg.Wait()

if err := e.s.AddHostScans(results); err != nil {
e.log.Info("failed to add host scans to DB:", zap.Error(err))
return
case <-time.After(30 * time.Second):
}
}
}
4 changes: 2 additions & 2 deletions persist/sqlite/addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ func (st *Store) Hosts(pks []types.PublicKey) (result []explorer.Host, err error
// HostsForScanning returns hosts ordered by the transaction they were created in.
// Note that only the PublicKey, NetAddress, and V2NetAddresses fields are
// populated.
func (s *Store) HostsForScanning(maxLastScan, minLastAnnouncement time.Time, offset, limit uint64) (result []explorer.Host, err error) {
func (s *Store) HostsForScanning(maxLastScan, minLastAnnouncement time.Time, limit uint64) (result []explorer.Host, err error) {
err = s.transaction(func(tx *txn) error {
rows, err := tx.Query(`SELECT public_key, net_address FROM host_info WHERE last_scan <= ? AND last_announcement >= ? ORDER BY last_scan ASC LIMIT ? OFFSET ?`, encode(maxLastScan), encode(minLastAnnouncement), limit, offset)
rows, err := tx.Query(`SELECT public_key, net_address FROM host_info WHERE last_scan <= ? AND last_announcement >= ? ORDER BY last_scan ASC LIMIT ?`, encode(maxLastScan), encode(minLastAnnouncement), limit)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion persist/sqlite/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1651,7 +1651,7 @@ func TestHostAnnouncement(t *testing.T) {
}

ts := time.Unix(0, 0)
hosts, err := db.HostsForScanning(ts, ts, 0, 100)
hosts, err := db.HostsForScanning(ts, ts, 100)
if err != nil {
t.Fatal(err)
}
Expand Down
50 changes: 50 additions & 0 deletions persist/sqlite/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,45 @@ func TestScan(t *testing.T) {
t.Fatal(err)
}

for {
tip, err := e.Tip()
if err != nil {
t.Fatal(err)
}
if tip != cm.Tip() {
time.Sleep(time.Second)
} else {
break
}
}

{
lastScanCutoff := time.Now().Add(-cfg.MaxLastScan)
lastAnnouncementCutoff := time.Now().Add(-cfg.MinLastAnnouncement)

dbHosts, err := db.HostsForScanning(lastScanCutoff, lastAnnouncementCutoff, 100)
if err != nil {
t.Fatal(err)
}
testutil.Equal(t, "len(hostsForScanning)", 3, len(dbHosts))

sort.Slice(dbHosts, func(i, j int) bool {
return dbHosts[i].NetAddress < dbHosts[j].NetAddress
})

host1 := dbHosts[0]
testutil.Equal(t, "host1.V2NetAddresses", ha3, host1.V2NetAddresses)
testutil.Equal(t, "host1.PublicKey", pubkey3, host1.PublicKey)

host2 := dbHosts[1]
testutil.Equal(t, "host2.NetAddress", ha2.NetAddress, host2.NetAddress)
testutil.Equal(t, "host2.PublicKey", ha2.PublicKey, host2.PublicKey)

host3 := dbHosts[2]
testutil.Equal(t, "host3.NetAddress", "sia1.euregiohosting.nl:9982", host3.NetAddress)
testutil.Equal(t, "host3.PublicKey", pubkey1, host3.PublicKey)
}

time.Sleep(4 * cfg.Timeout)

{
Expand Down Expand Up @@ -302,4 +341,15 @@ func TestScan(t *testing.T) {
log.Fatal("SectorSize = 0 on host that's supposed to be active")
}
}

{
lastScanCutoff := time.Now().Add(-cfg.MaxLastScan)
lastAnnouncementCutoff := time.Now().Add(-cfg.MinLastAnnouncement)

hosts, err := db.HostsForScanning(lastScanCutoff, lastAnnouncementCutoff, 100)
if err != nil {
t.Fatal(err)
}
testutil.Equal(t, "len(hostsForScanning)", 0, len(hosts))
}
}

0 comments on commit 827504e

Please sign in to comment.