Skip to content

Commit

Permalink
Fix prom response in query range sharder (#3287)
Browse files Browse the repository at this point in the history
* Fix prom response in query range sharder

* Store the job.err in jobErr
  • Loading branch information
zalegrala authored Jan 12, 2024
1 parent 2ca7265 commit 5c932ab
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions modules/frontend/query_range_sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/gogo/protobuf/jsonpb" //nolint:all deprecated
"github.com/grafana/dskit/user"
"github.com/opentracing/opentracing-go"
"go.uber.org/atomic"

"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/modules/querier"
Expand Down Expand Up @@ -133,13 +134,19 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) {
totalBlocks, totalBlockBytes := s.backendRequests(tenantID, queryRangeReq, now, samplingRate, reqCh, stopCh)

wg := boundedwaitgroup.New(uint(s.cfg.ConcurrentRequests))
jobErr := atomic.Error{}
c := traceql.QueryRangeCombiner{}
mtx := sync.Mutex{}

startedReqs := 0
for job := range reqCh {
if job.err != nil {
return nil, fmt.Errorf("unexpected err building reqs: %w", job.err)
jobErr.Store(fmt.Errorf("unexpected err building reqs: %w", job.err))
break
}

if jErr := jobErr.Load(); jErr != nil {
break
}

// When we hit capacity of boundedwaitgroup, wg.Add will block
Expand All @@ -166,14 +173,13 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) {

// if the status code is anything but happy, save the error and pass it down the line
if resp.StatusCode != http.StatusOK {
/*statusCode := resp.StatusCode
bytesMsg, err := io.ReadAll(resp.Body)
if err != nil {
_ = level.Error(s.logger).Log("msg", "error reading response body status != ok", "url", innerR.RequestURI, "err", err)
}
statusMsg := fmt.Sprintf("upstream: (%d) %s", statusCode, string(bytesMsg))
progress.setStatus(statusCode, statusMsg)
*/
statusMsg := fmt.Sprintf("upstream: (%d) %s", resp.StatusCode, string(bytesMsg))
jobErr.Store(fmt.Errorf(statusMsg))
/* progress.setStatus(statusCode, statusMsg) */
return
}

Expand Down Expand Up @@ -221,6 +227,10 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) {
span.SetTag("finishedJobs", res.Metrics.CompletedJobs)
span.SetTag("requestThroughput", throughput)

if jErr := jobErr.Load(); jErr != nil {
return s.respErrHandler(isProm, jErr)
}

var bodyString string
if isProm {
promResp := s.convertToPromFormat(res)
Expand Down

0 comments on commit 5c932ab

Please sign in to comment.