add: HTTP read deadline & bump warc lib to v0.8.68
CorentinB committed Jan 31, 2025
1 parent 96d0204 commit 91071ea
Showing 5 changed files with 64 additions and 23 deletions.
3 changes: 2 additions & 1 deletion cmd/get.go
@@ -41,7 +41,8 @@ func getCMDsFlags(getCmd *cobra.Command) {
getCmd.PersistentFlags().String("prometheus-prefix", "zeno:", "String used as a prefix for the exported Prometheus metrics.")
getCmd.PersistentFlags().Int("max-redirect", 20, "Specifies the maximum number of redirections to follow for a resource.")
getCmd.PersistentFlags().Int("max-retry", 5, "Number of retry if error happen when executing HTTP request.")
getCmd.PersistentFlags().Int("http-timeout", -1, "Number of seconds to wait before timing out a request.")
getCmd.PersistentFlags().Int("http-timeout", -1, "Number of seconds to wait before timing out a request. Note: this will CANCEL large files download.")
getCmd.PersistentFlags().Int("http-read-deadline", 60, "Number of seconds to wait before timing out a (blocking) read.")
getCmd.PersistentFlags().StringSlice("domains-crawl", []string{}, "Naive domains, full URLs or regexp to match against any URL to determine hop behaviour for outlinks. If an outlink URL is matched it will be queued to crawl with a hop of 0. This flag helps crawling entire domains while doing non-focused crawls.")
getCmd.PersistentFlags().StringSlice("disable-html-tag", []string{}, "Specify HTML tag to not extract assets from")
getCmd.PersistentFlags().Bool("capture-alternate-pages", false, "If turned on, <link> HTML tags with \"alternate\" values for their \"rel\" attribute will be archived.")
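
The two flags address different failure modes: http-timeout (as its new note warns) bounds the whole request, so a healthy but large download gets cancelled once the budget is spent, while the new http-read-deadline only fires when no bytes arrive for the configured window. Below is a standalone sketch of the first behaviour, assuming http-timeout maps to a whole-request timeout as its description suggests; the test server and durations are made up for illustration and none of this is Zeno code.

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

func main() {
	// A server that streams data slowly but steadily for about 3 seconds.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		flusher, _ := w.(http.Flusher)
		for i := 0; i < 30; i++ {
			w.Write(make([]byte, 1024))
			if flusher != nil {
				flusher.Flush()
			}
			time.Sleep(100 * time.Millisecond)
		}
	}))
	defer srv.Close()

	// Whole-request timeout: the 1-second budget also covers reading the body,
	// so the download is cancelled even though bytes keep arriving.
	client := &http.Client{Timeout: 1 * time.Second}
	resp, err := client.Get(srv.URL)
	if err == nil {
		_, err = io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}
	fmt.Println("with a 1s whole-request timeout:", err) // a timeout error mid-body

	// Without a whole-request timeout, the slow-but-steady stream completes.
	resp, err = http.Get(srv.URL)
	if err == nil {
		n, copyErr := io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
		fmt.Println("without one:", n, "bytes copied, err:", copyErr)
	}
}
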
2 changes: 1 addition & 1 deletion go.mod
@@ -3,7 +3,7 @@ module github.com/internetarchive/Zeno
go 1.23.5

require (
github.com/CorentinB/warc v0.8.67
github.com/CorentinB/warc v0.8.68
github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3
github.com/PuerkitoBio/goquery v1.10.1
github.com/ada-url/goada v0.0.0-20250104020233-00cbf4dc9da1
4 changes: 2 additions & 2 deletions go.sum
@@ -1,5 +1,5 @@
github.com/CorentinB/warc v0.8.67 h1:wW8Wi+X83RYKyPMS+DxpC8Jz5n4VwSd3yr0Jc9r/qmU=
github.com/CorentinB/warc v0.8.67/go.mod h1:A9Ds2kT59j2Bzbe5pDZ925XmVODwq9fAlmSSS45SRlk=
github.com/CorentinB/warc v0.8.68 h1:WJkIuMr+oLhzAAhcjpPpEuzlWiSXUUl//WqofswH0Rg=
github.com/CorentinB/warc v0.8.68/go.mod h1:A9Ds2kT59j2Bzbe5pDZ925XmVODwq9fAlmSSS45SRlk=
github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3 h1:ClzzXMDDuUbWfNNZqGeYq4PnYOlwlOVIvSyNaIy0ykg=
github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3/go.mod h1:we0YA5CsBbH5+/NUzC/AlMmxaDtWlXeNsqrwXjTzmzA=
github.com/PuerkitoBio/goquery v1.10.1 h1:Y8JGYUkXWTGRB6Ars3+j3kN0xg1YqqlwvdTV8WTFQcU=
77 changes: 58 additions & 19 deletions internal/pkg/archiver/body.go
@@ -4,42 +4,46 @@ import (
"bytes"
"io"
"strings"
"time"

"github.com/CorentinB/warc/pkg/spooledtempfile"
"github.com/gabriel-vasile/mimetype"
"github.com/internetarchive/Zeno/internal/pkg/config"
"github.com/internetarchive/Zeno/pkg/models"
)

// ProcessBody processes the body of a URL response, loading it into memory or a temporary file
func ProcessBody(u *models.URL, disableAssetsCapture, domainsCrawl bool, maxHops int, WARCTempDir string) error {
defer u.GetResponse().Body.Close() // Ensure the response body is closed

// If we are not capturing assets nor do we want to extract outlinks (and domains crawl is disabled)
// we can just consume the body and discard it
// If the response body exposes SetReadDeadline, apply the configured read deadline
conn, ok := u.GetResponse().Body.(interface{ SetReadDeadline(time.Time) error })
if ok {
conn.SetReadDeadline(time.Now().Add(time.Duration(config.Get().HTTPReadDeadline)))
}

// If we are not capturing assets, not extracting outlinks, and domains crawl is disabled
// we can just consume and discard the body
if disableAssetsCapture && !domainsCrawl && maxHops == 0 {
// Read the rest of the body but discard it
_, err := io.Copy(io.Discard, u.GetResponse().Body)
if err != nil {
if err := copyWithTimeout(io.Discard, u.GetResponse().Body, conn); err != nil {
return err
}
}

// Create a buffer to hold the body
// Create a buffer to hold the body (first 2KB)
buffer := new(bytes.Buffer)
_, err := io.CopyN(buffer, u.GetResponse().Body, 2048)
if err != nil && err != io.EOF {
if err := copyWithTimeoutN(buffer, u.GetResponse().Body, 2048, conn); err != nil {
return err
}

// We do not use http.DetectContentType because it only supports
// a limited number of MIME types, those commonly found in web.
// Detect and set MIME type
u.SetMIMEType(mimetype.Detect(buffer.Bytes()))

// Check if the MIME type is one that we post-process
if (u.GetMIMEType().Parent() != nil &&
u.GetMIMEType().Parent().String() == "text/plain") ||
// Check if the MIME type requires post-processing
if (u.GetMIMEType().Parent() != nil && u.GetMIMEType().Parent().String() == "text/plain") ||
strings.Contains(u.GetMIMEType().String(), "text/") {
// Create a spooled temp file, that is a ReadWriteSeeker that writes to a temporary file
// when the in-memory buffer exceeds a certain size. (here, 2MB)

// Create a temp file with a 2MB memory buffer
spooledBuff := spooledtempfile.NewSpooledTempFile("zeno", WARCTempDir, 2097152, false, -1)
_, err := io.Copy(spooledBuff, buffer)
if err != nil {
@@ -50,9 +54,8 @@ func ProcessBody(u *models.URL, disableAssetsCapture, domainsCrawl bool, maxHops
return err
}

// Read the rest of the body and set it in SetBody()
_, err = io.Copy(spooledBuff, u.GetResponse().Body)
if err != nil && err != io.EOF {
// Read the rest of the body into the spooled buffer
if err := copyWithTimeout(spooledBuff, u.GetResponse().Body, conn); err != nil {
closeErr := spooledBuff.Close()
if closeErr != nil {
panic(closeErr)
@@ -66,11 +69,47 @@ func ProcessBody(u *models.URL, disableAssetsCapture, domainsCrawl bool, maxHops
return nil
} else {
// Read the rest of the body but discard it
_, err := io.Copy(io.Discard, u.GetResponse().Body)
if err := copyWithTimeout(io.Discard, u.GetResponse().Body, conn); err != nil {
return err
}
}

return nil
}

// copyWithTimeout copies data and resets the read deadline after each successful read
func copyWithTimeout(dst io.Writer, src io.Reader, conn interface{ SetReadDeadline(time.Time) error }) error {
buf := make([]byte, 4096)
for {
n, err := src.Read(buf)
if n > 0 {
// Reset the deadline after each successful read
if conn != nil {
conn.SetReadDeadline(time.Now().Add(time.Duration(config.Get().HTTPReadDeadline)))
}
if _, writeErr := dst.Write(buf[:n]); writeErr != nil {
return writeErr
}
}
if err != nil {
if err == io.EOF {
break
}
return err
}
}
return nil
}

// copyWithTimeoutN copies a limited number of bytes and applies the timeout
func copyWithTimeoutN(dst io.Writer, src io.Reader, n int64, conn interface{ SetReadDeadline(time.Time) error }) error {
_, err := io.CopyN(dst, src, n)
if err != nil && err != io.EOF {
return err
}
// Reset deadline after partial read
if conn != nil {
conn.SetReadDeadline(time.Now().Add(time.Duration(config.Get().HTTPReadDeadline)))
}
return nil
}
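
copyWithTimeout and copyWithTimeoutN refresh the read deadline after every successful read, so a slow but steady transfer keeps going and only a genuine stall times out. Below is a self-contained sketch of that sliding-deadline pattern on a plain net.Conn pair; copyWithSlidingDeadline, the pipe setup, and the 1-second window are illustrative stand-ins rather than Zeno code. Note the explicit "* time.Second" when converting a seconds-valued setting: time.Duration(n) on its own means n nanoseconds.

package main

import (
	"fmt"
	"io"
	"net"
	"time"
)

// copyWithSlidingDeadline mirrors the idea in body.go: refresh the read
// deadline around every read so that only a genuine stall times out.
func copyWithSlidingDeadline(dst io.Writer, src net.Conn, window time.Duration) (int64, error) {
	var total int64
	buf := make([]byte, 4096)
	for {
		src.SetReadDeadline(time.Now().Add(window))
		n, err := src.Read(buf)
		if n > 0 {
			total += int64(n)
			if _, werr := dst.Write(buf[:n]); werr != nil {
				return total, werr
			}
		}
		if err == io.EOF {
			return total, nil
		}
		if err != nil {
			return total, err
		}
	}
}

func main() {
	client, server := net.Pipe()

	// Peer that sends a chunk every 50ms for a while, then stalls forever.
	go func() {
		for i := 0; i < 10; i++ {
			server.Write([]byte("chunk"))
			time.Sleep(50 * time.Millisecond)
		}
		select {} // simulate a stalled connection; the process exits from main
	}()

	readDeadlineSeconds := 1 // stand-in for config.Get().HTTPReadDeadline
	window := time.Duration(readDeadlineSeconds) * time.Second

	// All ten steady chunks are copied; the stall then trips the deadline.
	n, err := copyWithSlidingDeadline(io.Discard, client, window)
	fmt.Println("copied", n, "bytes, err:", err) // err is an i/o timeout
}
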
1 change: 1 addition & 0 deletions internal/pkg/config/config.go
@@ -63,6 +63,7 @@ type Config struct {
MaxRedirect int `mapstructure:"max-redirect"`
MaxRetry int `mapstructure:"max-retry"`
HTTPTimeout int `mapstructure:"http-timeout"`
HTTPReadDeadline int `mapstructure:"http-read-deadline"`
CrawlTimeLimit int `mapstructure:"crawl-time-limit"`
CrawlMaxTimeLimit int `mapstructure:"crawl-max-time-limit"`
MinSpaceRequired int `mapstructure:"min-space-required"`
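
The diff does not show how the new flag reaches this field, but the mapstructure tag points at viper-style decoding. Here is a minimal sketch of the usual cobra + viper wiring for a field like HTTPReadDeadline, assumed rather than taken from Zeno's actual config loading: the flag set is bound with BindPFlags and decoded with Unmarshal.

package main

import (
	"fmt"

	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

// Config mimics the relevant field of internal/pkg/config.
type Config struct {
	HTTPReadDeadline int `mapstructure:"http-read-deadline"`
}

func main() {
	cmd := &cobra.Command{Use: "get"}
	cmd.PersistentFlags().Int("http-read-deadline", 60, "Number of seconds to wait before timing out a (blocking) read.")

	v := viper.New()
	_ = v.BindPFlags(cmd.PersistentFlags()) // flag names become viper keys

	var cfg Config
	_ = v.Unmarshal(&cfg) // mapstructure tags map those keys onto struct fields

	fmt.Println(cfg.HTTPReadDeadline) // prints the flag's default, 60
}
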
