Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/cover: parallelize filesCoverageWithDetails #5714

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 57 additions & 30 deletions pkg/cover/heatmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,13 @@ where
return stmt
}

func readCoverage(iterManager spannerclient.RowIterator) ([]*fileCoverageWithDetails, error) {
func readCoverage(ctx context.Context, iterManager spannerclient.RowIterator) ([]*fileCoverageWithDetails, error) {
res := []*fileCoverageWithDetails{}
ch := make(chan *fileCoverageWithDetails)
var err error
go func() {
defer close(ch)
err = readIterToChan(context.Background(), iterManager, ch)
err = readIterToChan(ctx, iterManager, ch)
}()
for fc := range ch {
res = append(res, fc)
Expand All @@ -231,18 +231,18 @@ func readCoverage(iterManager spannerclient.RowIterator) ([]*fileCoverageWithDet

// Unique coverage from specific manager is more expensive to get.
// We get unique coverage comparing manager and total coverage on the AppEngine side.
func readCoverageUniq(full, mgr spannerclient.RowIterator,
func readCoverageUniq(ctx context.Context, full, mgr spannerclient.RowIterator,
) ([]*fileCoverageWithDetails, error) {
eg, ctx := errgroup.WithContext(context.Background())
eg, egCtx := errgroup.WithContext(ctx)
fullCh := make(chan *FileCoverageWithLineInfo)
eg.Go(func() error {
defer close(fullCh)
return readIterToChan(ctx, full, fullCh)
return readIterToChan(egCtx, full, fullCh)
})
partCh := make(chan *FileCoverageWithLineInfo)
eg.Go(func() error {
defer close(partCh)
return readIterToChan(ctx, mgr, partCh)
return readIterToChan(egCtx, mgr, partCh)
})
res := []*fileCoverageWithDetails{}
eg.Go(func() error {
Expand Down Expand Up @@ -346,33 +346,60 @@ func readIterToChan[K FileCoverageWithLineInfo | fileCoverageWithDetails](
func filesCoverageWithDetails(
ctx context.Context, client spannerclient.SpannerClient, scope *SelectScope, onlyUnique bool,
) ([]*fileCoverageWithDetails, error) {
var res []*fileCoverageWithDetails
for _, timePeriod := range scope.Periods {
needLinesDetails := onlyUnique
iterManager := client.Single().Query(ctx,
filesCoverageWithDetailsStmt(scope.Ns, scope.Subsystem, scope.Manager, timePeriod, needLinesDetails))
defer iterManager.Stop()
resCh := make(chan *fileCoverageWithDetails)
var egErr error
go func() {
defer close(resCh)
eg, egCtx := errgroup.WithContext(ctx)
for _, timePeriod := range scope.Periods {
eg.Go(func() error {
needLinesDetails := onlyUnique
iterManager := client.Single().Query(egCtx,
filesCoverageWithDetailsStmt(scope.Ns, scope.Subsystem, scope.Manager, timePeriod, needLinesDetails))
defer iterManager.Stop()

var err error
var periodRes []*fileCoverageWithDetails
if onlyUnique {
iterAll := client.Single().Query(ctx,
filesCoverageWithDetailsStmt(scope.Ns, scope.Subsystem, "", timePeriod, needLinesDetails))
defer iterAll.Stop()
periodRes, err = readCoverageUniq(iterAll, iterManager)
if err != nil {
return nil, fmt.Errorf("uniqueFilesCoverageWithDetails: %w", err)
}
} else {
periodRes, err = readCoverage(iterManager)
if err != nil {
return nil, fmt.Errorf("readCoverage: %w", err)
}
var err error
var periodRes []*fileCoverageWithDetails
if onlyUnique {
iterAll := client.Single().Query(egCtx,
filesCoverageWithDetailsStmt(scope.Ns, scope.Subsystem, "", timePeriod, needLinesDetails))
defer iterAll.Stop()
periodRes, err = readCoverageUniq(egCtx, iterAll, iterManager)
if err != nil {
return fmt.Errorf("uniqueFilesCoverageWithDetails: %w", err)
}
} else {
periodRes, err = readCoverage(egCtx, iterManager)
if err != nil {
return fmt.Errorf("readCoverage: %w", err)
}
}
for _, r := range periodRes {
r.TimePeriod = timePeriod
resCh <- r
}
return nil
})
}
for _, r := range periodRes {
r.TimePeriod = timePeriod
egErr = eg.Wait()
}()

var res []*fileCoverageWithDetails
done := false
for !done {
select {
case fcwd := <-resCh:
if fcwd == nil {
done = true
break
}
res = append(res, fcwd)
case <-ctx.Done():
return nil, fmt.Errorf("external context terminated")
}
res = append(res, periodRes...)
}
if egErr != nil {
return nil, egErr
}
return res, nil
}
Expand Down
Loading