Skip to content

Commit

Permalink
Merge pull request #14 from PuerkitoBio/cancel
Browse files Browse the repository at this point in the history
add Queue.Cancel to quickly stop a fetcher, closes #10
  • Loading branch information
mna committed Jul 27, 2015
2 parents 3bf8bb0 + 2ec4a1f commit 4587cf0
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 19 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The [API documentation is available on godoc.org](http://godoc.org/github.com/Pu

## Changes

* 2015-07-25 : add `Cancel` method on the `Queue`, to close and drain without requesting any pending commands, unlike `Close` that waits for all pending commands to be processed (thanks to [@buro9][buro9] for the feature request).
* 2015-07-24 : add `HandlerCmd` and call the Command's `Handler` function if it implements the `Handler` interface, bypassing the `Fetcher`'s handler. Support a `Custom` matcher on the `Mux`, using a predicate. (thanks to [@mmcdole][mmcdole] for the feature requests).
* 2015-06-18 : add `Scheme` criteria on the muxer (thanks to [@buro9][buro9]).
* 2015-06-10 : add `DisablePoliteness` field on the `Fetcher` to optionally bypass robots.txt checks (thanks to [@oli-g][oli]).
Expand Down
50 changes: 39 additions & 11 deletions example/full/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ var (
dup = map[string]bool{}

// Command-line flags
seed = flag.String("seed", "http://golang.org", "seed URL")
stopAfter = flag.Duration("stopafter", 0, "automatically stop the fetchbot after a given time")
stopAtURL = flag.String("stopat", "", "automatically stop the fetchbot at a given URL")
memStats = flag.Duration("memstats", 0, "display memory statistics at a given interval")
seed = flag.String("seed", "http://golang.org", "seed URL")
cancelAfter = flag.Duration("cancelafter", 0, "automatically cancel the fetchbot after a given time")
cancelAtURL = flag.String("cancelat", "", "automatically cancel the fetchbot at a given URL")
stopAfter = flag.Duration("stopafter", 0, "automatically stop the fetchbot after a given time")
stopAtURL = flag.String("stopat", "", "automatically stop the fetchbot at a given URL")
memStats = flag.Duration("memstats", 0, "display memory statistics at a given interval")
)

func main() {
Expand Down Expand Up @@ -71,10 +73,15 @@ func main() {

// Create the Fetcher, handle the logging first, then dispatch to the Muxer
h := logHandler(mux)
if *stopAtURL != "" {
h = stopHandler(*stopAtURL, logHandler(mux))
if *stopAtURL != "" || *cancelAtURL != "" {
stopURL := *stopAtURL
if *cancelAtURL != "" {
stopURL = *cancelAtURL
}
h = stopHandler(stopURL, *cancelAtURL != "", logHandler(mux))
}
f := fetchbot.New(h)

// First mem stat print must be right after creating the fetchbot
if *memStats > 0 {
// Print starting stats
Expand All @@ -87,15 +94,27 @@ func main() {
printMemStats(nil)
}()
}

// Start processing
q := f.Start()
if *stopAfter > 0 {

// if a stop or cancel is requested after some duration, launch the goroutine
// that will stop or cancel.
if *stopAfter > 0 || *cancelAfter > 0 {
after := *stopAfter
stopFunc := q.Close
if *cancelAfter != 0 {
after = *cancelAfter
stopFunc = q.Cancel
}

go func() {
c := time.After(*stopAfter)
c := time.After(after)
<-c
q.Close()
stopFunc()
}()
}

// Enqueue the seed, which is the first entry in the dup map
dup[*seed] = true
_, err = q.SendStringGet(*seed)
Expand Down Expand Up @@ -147,10 +166,19 @@ func printMemStats(di *fetchbot.DebugInfo) {

// stopHandler stops the fetcher if the stopurl is reached. Otherwise it dispatches
// the call to the wrapped Handler.
func stopHandler(stopurl string, wrapped fetchbot.Handler) fetchbot.Handler {
func stopHandler(stopurl string, cancel bool, wrapped fetchbot.Handler) fetchbot.Handler {
return fetchbot.HandlerFunc(func(ctx *fetchbot.Context, res *http.Response, err error) {
if ctx.Cmd.URL().String() == stopurl {
ctx.Q.Close()
fmt.Printf(">>>>> STOP URL %s\n", ctx.Cmd.URL())
// generally not a good idea to stop/block from a handler goroutine
// so do it in a separate goroutine
go func() {
if cancel {
ctx.Q.Cancel()
} else {
ctx.Q.Close()
}
}()
return
}
wrapped.Handle(ctx, res, err)
Expand Down
49 changes: 43 additions & 6 deletions fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,12 @@ func New(h Handler) *Fetcher {
// Queue offers methods to send Commands to the Fetcher, and to Stop the crawling process.
// It is safe to use from concurrent goroutines.
type Queue struct {
	// ch carries enqueued Commands to the Fetcher's single queue-processing
	// goroutine (see Fetcher.Start, which creates it with a buffer of 1).
	ch chan Command

	// signal channels:
	//   closed    — signals that the Queue no longer accepts new Commands;
	//   cancelled — closed by Cancel so that pending commands are drained
	//               without being processed;
	//   done      — signals that processing has finished; Block waits on it.
	closed, cancelled, done chan struct{}

	// wg tracks pending commands so that closing can wait for them to drain.
	wg sync.WaitGroup
}

// Close closes the Queue so that no more Commands can be sent. It blocks until
Expand Down Expand Up @@ -153,6 +156,22 @@ func (q *Queue) Block() {
<-q.done
}

// Cancel closes the Queue and drains the pending commands without processing
// them, allowing for a fast "stop immediately"-ish operation.
//
// It is idempotent: calling Cancel on an already-cancelled Queue is a no-op
// and returns nil. Otherwise it returns the error from Close.
func (q *Queue) Cancel() error {
	select {
	case <-q.cancelled:
		// already cancelled, no-op
		return nil
	default:
		// mark the queue as cancelled
		//
		// NOTE(review): this check-then-close is not atomic — two goroutines
		// calling Cancel concurrently can both reach this branch before
		// either closes the channel, and the second close(q.cancelled) would
		// panic. Confirm Cancel is only ever invoked from one goroutine, or
		// guard the close with a sync.Once / mutex on the Queue.
		close(q.cancelled)
		// Close the Queue, that will wait for pending commands to drain
		// will unblock any callers waiting on q.Block
		return q.Close()
	}
}

// Send enqueues a Command into the Fetcher. If the Queue has been closed, it
// returns ErrQueueClosed. The Command's URL must have a Host.
func (q *Queue) Send(c Command) error {
Expand Down Expand Up @@ -213,9 +232,10 @@ func (f *Fetcher) Start() *Queue {
f.hosts = make(map[string]chan Command)

f.q = &Queue{
ch: make(chan Command, 1),
closed: make(chan struct{}),
done: make(chan struct{}),
ch: make(chan Command, 1),
closed: make(chan struct{}),
cancelled: make(chan struct{}),
done: make(chan struct{}),
}

// Start the one and only queue processing goroutine.
Expand Down Expand Up @@ -249,6 +269,13 @@ loop:
// Keep going
}
}
select {
case <-f.q.cancelled:
// queue got cancelled, drain
continue
default:
// go on
}

// Get the URL to enqueue
u := v.URL()
Expand Down Expand Up @@ -326,6 +353,8 @@ func (f *Fetcher) processChan(ch <-chan Command, hostKey string) {
loop:
for {
select {
case <-f.q.cancelled:
break loop
case v, ok := <-ch:
if !ok {
// Terminate this goroutine, channel is closed
Expand All @@ -337,6 +366,14 @@ loop:
<-wait
}

// was it cancelled during the wait? check again
select {
case <-f.q.cancelled:
break loop
default:
// go on
}

switch r, ok := v.(robotCommand); {
case ok:
// This is the robots.txt request
Expand Down
49 changes: 47 additions & 2 deletions fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -412,7 +413,7 @@ Crawl-delay: 1
t.Errorf("expected no errors, got %d", cnt)
}
// Assert that the total elapsed time is around 2 seconds
if delay < 2*time.Second || delay > (2*time.Second+10*time.Millisecond) {
if delay < 2*time.Second || delay > (2*time.Second+100*time.Millisecond) {
t.Errorf("expected delay to be around 2s, got %s", delay)
}
}
Expand Down Expand Up @@ -644,13 +645,15 @@ func TestOverflowBuffer(t *testing.T) {
}))
defer srv.Close()
cases := []string{srv.URL + "/a", srv.URL + "/b", srv.URL + "/c", srv.URL + "/d", srv.URL + "/e", srv.URL + "/f"}
signal := make(chan struct{})
sh := &spyHandler{fn: HandlerFunc(func(ctx *Context, res *http.Response, err error) {
if ctx.Cmd.URL().Path == "/a" {
// Enqueue a bunch, while this host's goroutine is busy waiting for this call
_, err := ctx.Q.SendStringGet(cases[1:]...)
if err != nil {
t.Fatal(err)
}
close(signal)
}
})}
f := New(sh)
Expand All @@ -660,7 +663,7 @@ func TestOverflowBuffer(t *testing.T) {
if err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond)
<-signal
q.Close()
// Assert that the handler got called with the right values
if ok := sh.CalledWithExactly(cases...); !ok {
Expand All @@ -671,3 +674,45 @@ func TestOverflowBuffer(t *testing.T) {
t.Errorf("expected no errors, got %d", cnt)
}
}

// TestCancel exercises Queue.Cancel: 1000 commands are enqueued for one host,
// the handler is released once, and after the first URL (/0) is handled the
// queue is cancelled — the remaining commands must be drained without the
// handler ever seeing them.
func TestCancel(t *testing.T) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("ok"))
	}))
	defer srv.Close()
	// allowHandler gates the handler so no response is processed until the
	// test is ready; allowCancel is closed once /0 has been handled, telling
	// the main goroutine it may call Cancel.
	allowHandler := make(chan struct{})
	allowCancel := make(chan struct{})
	sh := &spyHandler{fn: HandlerFunc(func(ctx *Context, res *http.Response, err error) {
		// allow cancel as soon as /0 is received
		// NOTE(review): res.Request is dereferenced without checking err —
		// if a fetch ever failed here, res could be nil and this would panic;
		// confirm the spy handler is only reached on successful fetches.
		<-allowHandler
		if res.Request.URL.Path == "/0" {
			close(allowCancel)
		}
	})}

	f := New(sh)
	// the 1-second crawl delay keeps the other 999 commands pending long
	// enough for Cancel to discard them before a second fetch happens.
	f.CrawlDelay = time.Second
	f.DisablePoliteness = true
	q := f.Start()
	// enqueue a bunch of URLs
	for i := 0; i < 1000; i++ {
		_, err := q.SendStringGet(srv.URL + "/" + strconv.Itoa(i))
		if err != nil {
			t.Fatal(err)
		}
	}
	// allow one to proceed
	close(allowHandler)
	// wait for cancel signal
	<-allowCancel
	q.Cancel()

	// Assert that the handler got called with the right values
	if ok := sh.CalledWithExactly(srv.URL + "/0"); !ok {
		t.Error("expected handler to be called only with /0")
	}
	// Assert that there was no error
	if cnt := sh.Errors(); cnt > 0 {
		t.Errorf("expected no errors, got %d", cnt)
	}
}

0 comments on commit 4587cf0

Please sign in to comment.