diff --git a/cmd/get.go b/cmd/get.go
index 6cd00e6c..d533a261 100644
--- a/cmd/get.go
+++ b/cmd/get.go
@@ -41,7 +41,8 @@ func getCMDsFlags(getCmd *cobra.Command) {
 	getCmd.PersistentFlags().String("prometheus-prefix", "zeno:", "String used as a prefix for the exported Prometheus metrics.")
 	getCmd.PersistentFlags().Int("max-redirect", 20, "Specifies the maximum number of redirections to follow for a resource.")
 	getCmd.PersistentFlags().Int("max-retry", 5, "Number of retry if error happen when executing HTTP request.")
-	getCmd.PersistentFlags().Int("http-timeout", -1, "Number of seconds to wait before timing out a request.")
+	getCmd.PersistentFlags().Int("http-timeout", -1, "Number of seconds to wait before timing out a request. Note: this will CANCEL the download of large files.")
+	getCmd.PersistentFlags().Int("http-read-deadline", 60, "Number of seconds to wait before timing out a (blocking) read.")
 	getCmd.PersistentFlags().StringSlice("domains-crawl", []string{}, "Naive domains, full URLs or regexp to match against any URL to determine hop behaviour for outlinks. If an outlink URL is matched it will be queued to crawl with a hop of 0. This flag helps crawling entire domains while doing non-focused crawls.")
 	getCmd.PersistentFlags().StringSlice("disable-html-tag", []string{}, "Specify HTML tag to not extract assets from")
 	getCmd.PersistentFlags().Bool("capture-alternate-pages", false, "If turned on, HTML tags with \"alternate\" values for their \"rel\" attribute will be archived.")
diff --git a/go.mod b/go.mod
index da6062d3..89093e7a 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module github.com/internetarchive/Zeno
 go 1.23.5
 
 require (
-	github.com/CorentinB/warc v0.8.67
+	github.com/CorentinB/warc v0.8.68
 	github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3
 	github.com/PuerkitoBio/goquery v1.10.1
 	github.com/ada-url/goada v0.0.0-20250104020233-00cbf4dc9da1
diff --git a/go.sum b/go.sum
index 55102804..44aa03b9 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,5 @@
-github.com/CorentinB/warc v0.8.67 h1:wW8Wi+X83RYKyPMS+DxpC8Jz5n4VwSd3yr0Jc9r/qmU=
-github.com/CorentinB/warc v0.8.67/go.mod h1:A9Ds2kT59j2Bzbe5pDZ925XmVODwq9fAlmSSS45SRlk=
+github.com/CorentinB/warc v0.8.68 h1:WJkIuMr+oLhzAAhcjpPpEuzlWiSXUUl//WqofswH0Rg=
+github.com/CorentinB/warc v0.8.68/go.mod h1:A9Ds2kT59j2Bzbe5pDZ925XmVODwq9fAlmSSS45SRlk=
 github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3 h1:ClzzXMDDuUbWfNNZqGeYq4PnYOlwlOVIvSyNaIy0ykg=
 github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3/go.mod h1:we0YA5CsBbH5+/NUzC/AlMmxaDtWlXeNsqrwXjTzmzA=
 github.com/PuerkitoBio/goquery v1.10.1 h1:Y8JGYUkXWTGRB6Ars3+j3kN0xg1YqqlwvdTV8WTFQcU=
diff --git a/internal/pkg/archiver/body.go b/internal/pkg/archiver/body.go
index 1e1e613a..8cf2ebf2 100644
--- a/internal/pkg/archiver/body.go
+++ b/internal/pkg/archiver/body.go
@@ -4,42 +4,46 @@ import (
 	"bytes"
 	"io"
 	"strings"
+	"time"
 
 	"github.com/CorentinB/warc/pkg/spooledtempfile"
 	"github.com/gabriel-vasile/mimetype"
+	"github.com/internetarchive/Zeno/internal/pkg/config"
 	"github.com/internetarchive/Zeno/pkg/models"
 )
 
+// ProcessBody processes the body of a URL response, loading it into memory or a temporary file
 func ProcessBody(u *models.URL, disableAssetsCapture, domainsCrawl bool, maxHops int, WARCTempDir string) error {
 	defer u.GetResponse().Body.Close() // Ensure the response body is closed
 
-	// If we are not capturing assets nor do we want to extract outlinks (and domains crawl is disabled)
-	// we can just consume the body and discard it
+	// Retrieve the underlying connection (if the body exposes one) and apply the configured read deadline
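+	// The type assertion below is deliberately optional: response bodies that
+	// do not expose SetReadDeadline (e.g. fully buffered ones) leave conn nil,
+	// and the copy helpers below then skip the deadline refresh.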
+	conn, ok := u.GetResponse().Body.(interface{ SetReadDeadline(time.Time) error })
+	if ok {
+		conn.SetReadDeadline(time.Now().Add(time.Duration(config.Get().HTTPReadDeadline) * time.Second))
+	}
+
+	// If we are not capturing assets, not extracting outlinks, and domains crawl is disabled,
+	// we can just consume and discard the body
 	if disableAssetsCapture && !domainsCrawl && maxHops == 0 {
-		// Read the rest of the body but discard it
-		_, err := io.Copy(io.Discard, u.GetResponse().Body)
-		if err != nil {
+		if err := copyWithTimeout(io.Discard, u.GetResponse().Body, conn); err != nil {
 			return err
 		}
 	}
 
-	// Create a buffer to hold the body
+	// Create a buffer to hold the body (first 2KB)
 	buffer := new(bytes.Buffer)
-	_, err := io.CopyN(buffer, u.GetResponse().Body, 2048)
-	if err != nil && err != io.EOF {
+	if err := copyWithTimeoutN(buffer, u.GetResponse().Body, 2048, conn); err != nil {
 		return err
 	}
 
-	// We do not use http.DetectContentType because it only supports
-	// a limited number of MIME types, those commonly found in web.
+	// Detect and set the MIME type
 	u.SetMIMEType(mimetype.Detect(buffer.Bytes()))
 
-	// Check if the MIME type is one that we post-process
-	if (u.GetMIMEType().Parent() != nil &&
-		u.GetMIMEType().Parent().String() == "text/plain") ||
+	// Check if the MIME type requires post-processing
+	if (u.GetMIMEType().Parent() != nil && u.GetMIMEType().Parent().String() == "text/plain") ||
 		strings.Contains(u.GetMIMEType().String(), "text/") {
-		// Create a spooled temp file, that is a ReadWriteSeeker that writes to a temporary file
-		// when the in-memory buffer exceeds a certain size. (here, 2MB)
+
+		// Create a spooled temp file: an in-memory buffer that spills to disk past 2MB
 		spooledBuff := spooledtempfile.NewSpooledTempFile("zeno", WARCTempDir, 2097152, false, -1)
 		_, err := io.Copy(spooledBuff, buffer)
 		if err != nil {
@@ -50,9 +54,8 @@ func ProcessBody(u *models.URL, disableAssetsCapture, domainsCrawl bool, maxHops
 			return err
 		}
 
-		// Read the rest of the body and set it in SetBody()
-		_, err = io.Copy(spooledBuff, u.GetResponse().Body)
-		if err != nil && err != io.EOF {
+		// Read the rest of the body into the spooled buffer
+		if err := copyWithTimeout(spooledBuff, u.GetResponse().Body, conn); err != nil {
 			closeErr := spooledBuff.Close()
 			if closeErr != nil {
 				panic(closeErr)
@@ -66,11 +69,47 @@ func ProcessBody(u *models.URL, disableAssetsCapture, domainsCrawl bool, maxHops
 		return nil
 	} else {
 		// Read the rest of the body but discard it
-		_, err := io.Copy(io.Discard, u.GetResponse().Body)
+		if err := copyWithTimeout(io.Discard, u.GetResponse().Body, conn); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// copyWithTimeout copies data and resets the read deadline after each successful read
+func copyWithTimeout(dst io.Writer, src io.Reader, conn interface{ SetReadDeadline(time.Time) error }) error {
+	buf := make([]byte, 4096)
+	for {
+		n, err := src.Read(buf)
+		if n > 0 {
+			// Reset the deadline after each successful read
+			if conn != nil {
+				conn.SetReadDeadline(time.Now().Add(time.Duration(config.Get().HTTPReadDeadline) * time.Second))
+			}
+			if _, writeErr := dst.Write(buf[:n]); writeErr != nil {
+				return writeErr
+			}
+		}
 		if err != nil {
+			if err == io.EOF {
+				break
+			}
 			return err
 		}
 	}
+	return nil
+}
+
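+// Design note: unlike http-timeout, which bounds the whole request and can
+// cancel large downloads, the read deadline is rolling: it only fires when
+// the connection stays silent for http-read-deadline seconds, so slow but
+// steady transfers are left alone.
+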
+// copyWithTimeoutN copies up to n bytes and refreshes the read deadline afterwards
+func copyWithTimeoutN(dst io.Writer, src io.Reader, n int64, conn interface{ SetReadDeadline(time.Time) error }) error {
+	_, err := io.CopyN(dst, src, n)
+	if err != nil && err != io.EOF {
+		return err
+	}
+	// Reset the deadline after the partial read
+	if conn != nil {
+		conn.SetReadDeadline(time.Now().Add(time.Duration(config.Get().HTTPReadDeadline) * time.Second))
+	}
 
 	return nil
 }
diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go
index a10c90bd..dcec199d 100644
--- a/internal/pkg/config/config.go
+++ b/internal/pkg/config/config.go
@@ -63,6 +63,7 @@ type Config struct {
 	MaxRedirect       int `mapstructure:"max-redirect"`
 	MaxRetry          int `mapstructure:"max-retry"`
 	HTTPTimeout       int `mapstructure:"http-timeout"`
+	HTTPReadDeadline  int `mapstructure:"http-read-deadline"`
 	CrawlTimeLimit    int `mapstructure:"crawl-time-limit"`
 	CrawlMaxTimeLimit int `mapstructure:"crawl-max-time-limit"`
 	MinSpaceRequired  int `mapstructure:"min-space-required"`
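
Note for reviewers: below is a minimal, self-contained sketch of the rolling read-deadline pattern that copyWithTimeout introduces, expressed against a plain net.Conn so it can be run outside Zeno. The names readWithRollingDeadline and stallTimeout are hypothetical, not part of this change. The sketch pushes the deadline forward before each read rather than after one, which has the same effect: the timer only fires when the peer stays silent for the whole window, and it never caps total transfer time the way http-timeout does.

package main

import (
	"fmt"
	"io"
	"net"
	"time"
)

// readWithRollingDeadline copies everything from conn to dst, refreshing the
// read deadline before each read so that only a stalled connection times out.
func readWithRollingDeadline(dst io.Writer, conn net.Conn, stallTimeout time.Duration) error {
	buf := make([]byte, 4096)
	for {
		// stallTimeout plays the role of http-read-deadline (in seconds)
		if err := conn.SetReadDeadline(time.Now().Add(stallTimeout)); err != nil {
			return err
		}
		n, err := conn.Read(buf)
		if n > 0 {
			if _, werr := dst.Write(buf[:n]); werr != nil {
				return werr
			}
		}
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err // a stalled read surfaces as a net.Error with Timeout() == true
		}
	}
}

func main() {
	conn, err := net.DialTimeout("tcp", "example.com:80", 10*time.Second)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	fmt.Fprint(conn, "GET / HTTP/1.0\r\nHost: example.com\r\n\r\n")
	if err := readWithRollingDeadline(io.Discard, conn, 60*time.Second); err != nil {
		fmt.Println("read failed:", err)
	}
}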