diff --git a/modules/frontend/query_range_sharding.go b/modules/frontend/query_range_sharding.go index 5ea937ef7ab..d69bf4d799f 100644 --- a/modules/frontend/query_range_sharding.go +++ b/modules/frontend/query_range_sharding.go @@ -19,6 +19,7 @@ import ( "github.com/gogo/protobuf/jsonpb" //nolint:all deprecated "github.com/grafana/dskit/user" "github.com/opentracing/opentracing-go" + "go.uber.org/atomic" "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/modules/querier" @@ -133,13 +134,19 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) { totalBlocks, totalBlockBytes := s.backendRequests(tenantID, queryRangeReq, now, samplingRate, reqCh, stopCh) wg := boundedwaitgroup.New(uint(s.cfg.ConcurrentRequests)) + jobErr := atomic.Error{} c := traceql.QueryRangeCombiner{} mtx := sync.Mutex{} startedReqs := 0 for job := range reqCh { if job.err != nil { - return nil, fmt.Errorf("unexpected err building reqs: %w", job.err) + jobErr.Store(fmt.Errorf("unexpected err building reqs: %w", job.err)) + break + } + + if jErr := jobErr.Load(); jErr != nil { + break } // When we hit capacity of boundedwaitgroup, wg.Add will block @@ -166,14 +173,13 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) { // if the status code is anything but happy, save the error and pass it down the line if resp.StatusCode != http.StatusOK { - /*statusCode := resp.StatusCode bytesMsg, err := io.ReadAll(resp.Body) if err != nil { _ = level.Error(s.logger).Log("msg", "error reading response body status != ok", "url", innerR.RequestURI, "err", err) } - statusMsg := fmt.Sprintf("upstream: (%d) %s", statusCode, string(bytesMsg)) - progress.setStatus(statusCode, statusMsg) - */ + statusMsg := fmt.Sprintf("upstream: (%d) %s", resp.StatusCode, string(bytesMsg)) + jobErr.Store(fmt.Errorf(statusMsg)) + /* progress.setStatus(statusCode, statusMsg) */ return } @@ -221,6 +227,10 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) { span.SetTag("finishedJobs", res.Metrics.CompletedJobs) span.SetTag("requestThroughput", throughput) + if jErr := jobErr.Load(); jErr != nil { + return s.respErrHandler(isProm, jErr) + } + var bodyString string if isProm { promResp := s.convertToPromFormat(res)