Skip to content

Commit

Permalink
Add: --hq-rate-limiting-send-back
Browse files Browse the repository at this point in the history
  • Loading branch information
CorentinB committed Apr 1, 2024
1 parent 1260c48 commit b17ca8c
Show file tree
Hide file tree
Showing 25 changed files with 138 additions and 96 deletions.
4 changes: 2 additions & 2 deletions cmd/all/all.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package all

import (
_ "github.com/CorentinB/Zeno/cmd/get"
_ "github.com/CorentinB/Zeno/cmd/version"
_ "github.com/internetarchive/Zeno/cmd/get"
_ "github.com/internetarchive/Zeno/cmd/version"
)
7 changes: 6 additions & 1 deletion cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"

"github.com/CorentinB/Zeno/config"
"github.com/internetarchive/Zeno/config"
)

var GlobalFlags = []cli.Flag{
Expand Down Expand Up @@ -288,6 +288,11 @@ var GlobalFlags = []cli.Flag{
Value: "lifo",
Destination: &config.App.Flags.HQStrategy,
},
&cli.BoolFlag{
Name: "hq-rate-limiting-send-back",
Usage: "If turned on, the crawler will send back URLs that hit a rate limit to crawl HQ.",
Destination: &config.App.Flags.HQRateLimitingSendBack,
},
&cli.StringFlag{
Name: "es-url",
Usage: "ElasticSearch URL to use for indexing crawl logs.",
Expand Down
6 changes: 3 additions & 3 deletions cmd/get/get.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package get

import (
"github.com/CorentinB/Zeno/cmd"
"github.com/CorentinB/Zeno/config"
"github.com/internetarchive/Zeno/cmd"
"github.com/internetarchive/Zeno/config"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)

func initLogging(c *cli.Context) (err error) {
func initLogging() (err error) {
// Log as JSON instead of the default ASCII formatter.
if config.App.Flags.JSON {
log.SetFormatter(&log.JSONFormatter{})
Expand Down
6 changes: 3 additions & 3 deletions cmd/get/hq.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package get

import (
"github.com/CorentinB/Zeno/cmd"
"github.com/CorentinB/Zeno/config"
"github.com/internetarchive/Zeno/cmd"
"github.com/internetarchive/Zeno/config"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
Expand All @@ -19,7 +19,7 @@ func newGetHQCmd() *cli.Command {
}

func cmdGetHQ(c *cli.Context) error {
err := initLogging(c)
err := initLogging()
if err != nil {
log.Error("Unable to parse arguments")
return err
Expand Down
8 changes: 4 additions & 4 deletions cmd/get/list.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package get

import (
"github.com/CorentinB/Zeno/cmd"
"github.com/CorentinB/Zeno/config"
"github.com/CorentinB/Zeno/internal/pkg/frontier"
"github.com/internetarchive/Zeno/cmd"
"github.com/internetarchive/Zeno/config"
"github.com/internetarchive/Zeno/internal/pkg/frontier"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
Expand All @@ -20,7 +20,7 @@ func newGetListCmd() *cli.Command {
}

func cmdGetList(c *cli.Context) error {
err := initLogging(c)
err := initLogging()
if err != nil {
log.Error("Unable to parse arguments")
return err
Expand Down
10 changes: 5 additions & 5 deletions cmd/get/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package get
import (
"net/url"

"github.com/CorentinB/Zeno/cmd"
"github.com/CorentinB/Zeno/config"
"github.com/CorentinB/Zeno/internal/pkg/frontier"
"github.com/internetarchive/Zeno/cmd"
"github.com/internetarchive/Zeno/config"
"github.com/internetarchive/Zeno/internal/pkg/frontier"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)
Expand All @@ -21,7 +21,7 @@ func newGetURLCmd() *cli.Command {
}

func cmdGetURL(c *cli.Context) error {
err := initLogging(c)
err := initLogging()
if err != nil {
logrus.Error("Unable to parse arguments")
return err
Expand All @@ -40,7 +40,7 @@ func cmdGetURL(c *cli.Context) error {
return err
}

crawl.SeedList = append(crawl.SeedList, *frontier.NewItem(input, nil, "seed", 0, ""))
crawl.SeedList = append(crawl.SeedList, *frontier.NewItem(input, nil, "seed", 0, "", false))

// Start crawl
err = crawl.Start()
Expand Down
9 changes: 5 additions & 4 deletions cmd/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"path"
"time"

"github.com/CorentinB/Zeno/config"
"github.com/CorentinB/Zeno/internal/pkg/crawl"
"github.com/CorentinB/Zeno/internal/pkg/frontier"
"github.com/CorentinB/Zeno/internal/pkg/utils"
"github.com/google/uuid"
"github.com/internetarchive/Zeno/config"
"github.com/internetarchive/Zeno/internal/pkg/crawl"
"github.com/internetarchive/Zeno/internal/pkg/frontier"
"github.com/internetarchive/Zeno/internal/pkg/utils"
"github.com/paulbellamy/ratecounter"
"github.com/remeh/sizedwaitgroup"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -130,6 +130,7 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl {
c.HQStrategy = flags.HQStrategy
c.HQBatchSize = int(flags.HQBatchSize)
c.HQContinuousPull = flags.HQContinuousPull
c.HQRateLimitingSendBack = flags.HQRateLimitingSendBack

return c
}
4 changes: 2 additions & 2 deletions cmd/version/version.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package version

import (
"github.com/CorentinB/Zeno/cmd"
"github.com/CorentinB/Zeno/internal/pkg/utils"
"github.com/internetarchive/Zeno/cmd"
"github.com/internetarchive/Zeno/internal/pkg/utils"
"github.com/urfave/cli/v2"
)

Expand Down
17 changes: 9 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@ type Flags struct {
WARCTempDir string
WARCCustomCookie string

UseHQ bool
HQBatchSize int64
HQAddress string
HQProject string
HQKey string
HQSecret string
HQStrategy string
HQContinuousPull bool
UseHQ bool
HQBatchSize int64
HQAddress string
HQProject string
HQKey string
HQSecret string
HQStrategy string
HQContinuousPull bool
HQRateLimitingSendBack bool

CDXDedupeServer string
DisableLocalDedupe bool
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/CorentinB/Zeno
module github.com/internetarchive/Zeno

go 1.22

Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/crawl/assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"regexp"
"strings"

"github.com/CorentinB/Zeno/internal/pkg/crawl/sitespecific/cloudflarestream"
"github.com/CorentinB/Zeno/internal/pkg/frontier"
"github.com/CorentinB/Zeno/internal/pkg/utils"
"github.com/PuerkitoBio/goquery"
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/cloudflarestream"
"github.com/internetarchive/Zeno/internal/pkg/frontier"
"github.com/internetarchive/Zeno/internal/pkg/utils"
)

func (c *Crawl) extractAssets(base *url.URL, item *frontier.Item, doc *goquery.Document) (assets []*url.URL, err error) {
Expand Down
39 changes: 26 additions & 13 deletions internal/pkg/crawl/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ import (
"sync/atomic"
"time"

"github.com/CorentinB/Zeno/internal/pkg/crawl/sitespecific/cloudflarestream"
"github.com/CorentinB/Zeno/internal/pkg/crawl/sitespecific/telegram"
"github.com/CorentinB/Zeno/internal/pkg/crawl/sitespecific/tiktok"
"github.com/CorentinB/Zeno/internal/pkg/crawl/sitespecific/vk"
"github.com/CorentinB/Zeno/internal/pkg/utils"
"github.com/PuerkitoBio/goquery"
"github.com/clbanning/mxj/v2"
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/cloudflarestream"
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/telegram"
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/tiktok"
"github.com/internetarchive/Zeno/internal/pkg/crawl/sitespecific/vk"
"github.com/internetarchive/Zeno/internal/pkg/utils"
"github.com/remeh/sizedwaitgroup"
"github.com/tomnomnom/linkheader"

"github.com/CorentinB/Zeno/internal/pkg/frontier"
"github.com/internetarchive/Zeno/internal/pkg/frontier"
)

func (c *Crawl) executeGET(item *frontier.Item, req *http.Request, isRedirection bool) (resp *http.Response, err error) {
Expand Down Expand Up @@ -98,22 +98,31 @@ func (c *Crawl) executeGET(item *frontier.Item, req *http.Request, isRedirection
"sleepTime": sleepTime.String(),
"retryCount": retry,
"statusCode": resp.StatusCode,
})).Warn("we are being rate limited, sleeping then retrying..")
})).Debugf("we are being rate limited")

// This ensures we aren't leaving the warc dialer hanging.
// Do note, 429s are filtered out by WARC writer regardless.
io.Copy(io.Discard, resp.Body)
resp.Body.Close()

time.Sleep(sleepTime)
// If --hq-rate-limiting-send-back is enabled, we send the URL back to HQ
if c.UseHQ && c.HQRateLimitingSendBack {
return nil, errors.New("URL is being rate limited, sending back to HQ")
} else {
logWarning.WithFields(c.genLogFields(err, req.URL, map[string]interface{}{
"sleepTime": sleepTime.String(),
"retryCount": retry,
"statusCode": resp.StatusCode,
})).Warn("URL is being rate limited")
}

continue
} else {
c.logCrawlSuccess(executionStart, resp.StatusCode, item)
break
}
}

c.logCrawlSuccess(executionStart, resp.StatusCode, item)

// If a redirection is caught, then we execute the redirection
if isStatusCodeRedirect(resp.StatusCode) {
if resp.Header.Get("location") == utils.URLToString(req.URL) || item.Redirect >= c.MaxRedirect {
Expand Down Expand Up @@ -153,7 +162,7 @@ func (c *Crawl) executeGET(item *frontier.Item, req *http.Request, isRedirection
}
}

newItem = frontier.NewItem(URL, item, item.Type, item.Hop, item.ID)
newItem = frontier.NewItem(URL, item, item.Type, item.Hop, item.ID, false)
newItem.Redirect = item.Redirect + 1

// Prepare GET request
Expand Down Expand Up @@ -239,7 +248,7 @@ func (c *Crawl) Capture(item *frontier.Item) {
telegram.TransformURL(item.URL)

// Then we create an item
embedItem := frontier.NewItem(item.URL, item, item.Type, item.Hop, item.ID)
embedItem := frontier.NewItem(item.URL, item, item.Type, item.Hop, item.ID, false)

// And capture it
c.Capture(embedItem)
Expand All @@ -251,6 +260,10 @@ func (c *Crawl) Capture(item *frontier.Item) {
resp, err = c.executeGET(item, req, false)
if err != nil && err.Error() == "URL from redirection has already been seen" {
return
} else if err != nil && err.Error() == "URL is being rate limited, sending back to HQ" {
c.HQFinishedChannel <- item
c.HQProducerChannel <- frontier.NewItem(item.URL, item.ParentItem, item.Type, item.Hop, "", true)
return
} else if err != nil {
logError.WithFields(c.genLogFields(err, item.URL, nil)).Error("error while executing GET request")
return
Expand Down Expand Up @@ -503,7 +516,7 @@ func (c *Crawl) Capture(item *frontier.Item) {
defer swg.Done()

// Create the asset's item
newAsset := frontier.NewItem(asset, item, "asset", item.Hop, "")
newAsset := frontier.NewItem(asset, item, "asset", item.Hop, "", false)

// Capture the asset
err = c.captureAsset(newAsset, resp.Cookies())
Expand Down
29 changes: 15 additions & 14 deletions internal/pkg/crawl/crawl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"time"

"git.archive.org/wb/gocrawlhq"
"github.com/CorentinB/Zeno/internal/pkg/frontier"
"github.com/CorentinB/Zeno/internal/pkg/utils"
"github.com/CorentinB/warc"
"github.com/internetarchive/Zeno/internal/pkg/frontier"
"github.com/internetarchive/Zeno/internal/pkg/utils"
"github.com/paulbellamy/ratecounter"
"github.com/prometheus/client_golang/prometheus"
"github.com/remeh/sizedwaitgroup"
Expand Down Expand Up @@ -105,18 +105,19 @@ type Crawl struct {
WARCCustomCookie string

// Crawl HQ settings
UseHQ bool
HQAddress string
HQProject string
HQKey string
HQSecret string
HQStrategy string
HQBatchSize int
HQContinuousPull bool
HQClient *gocrawlhq.Client
HQFinishedChannel chan *frontier.Item
HQProducerChannel chan *frontier.Item
HQChannelsWg *sync.WaitGroup
UseHQ bool
HQAddress string
HQProject string
HQKey string
HQSecret string
HQStrategy string
HQBatchSize int
HQContinuousPull bool
HQClient *gocrawlhq.Client
HQFinishedChannel chan *frontier.Item
HQProducerChannel chan *frontier.Item
HQChannelsWg *sync.WaitGroup
HQRateLimitingSendBack bool
}

// Start fires up the crawling process.
Expand Down
27 changes: 23 additions & 4 deletions internal/pkg/crawl/hq.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"time"

"git.archive.org/wb/gocrawlhq"
"github.com/CorentinB/Zeno/internal/pkg/frontier"
"github.com/CorentinB/Zeno/internal/pkg/utils"
"github.com/internetarchive/Zeno/internal/pkg/frontier"
"github.com/internetarchive/Zeno/internal/pkg/utils"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -104,15 +104,34 @@ func (c *Crawl) HQProducer() {

// listen to the discovered channel and add the URLs to the discoveredArray
for discoveredItem := range c.HQProducerChannel {
var via string

if discoveredItem.ParentItem != nil {
via = utils.URLToString(discoveredItem.ParentItem.URL)
}

discoveredURL := gocrawlhq.URL{
Value: utils.URLToString(discoveredItem.URL),
Via: utils.URLToString(discoveredItem.ParentItem.URL),
Via: via,
}

for i := 0; uint8(i) < discoveredItem.Hop; i++ {
discoveredURL.Path += "L"
}

if discoveredItem.BypassSeencheck {
for {
_, err := c.HQClient.Discovered([]gocrawlhq.URL{discoveredURL}, "seed", true, false)
if err != nil {
logrus.WithFields(c.genLogFields(err, nil, nil)).Errorln("error sending payload to crawl HQ, waiting 1s then retrying..")
time.Sleep(time.Second)
continue
}
break
}
continue
}

mutex.Lock()
discoveredArray = append(discoveredArray, discoveredURL)
mutex.Unlock()
Expand Down Expand Up @@ -169,7 +188,7 @@ func (c *Crawl) HQConsumer() {
})).Errorln("unable to parse URL received from crawl HQ, discarding")
}

c.Frontier.PushChan <- frontier.NewItem(newURL, nil, "seed", uint8(strings.Count(URL.Path, "L")), URL.ID)
c.Frontier.PushChan <- frontier.NewItem(newURL, nil, "seed", uint8(strings.Count(URL.Path, "L")), URL.ID, false)
}
}
}
Expand Down
Loading

0 comments on commit b17ca8c

Please sign in to comment.