patched partition collector
gopherus committed Jun 21, 2024
1 parent 0c550e3 commit 9d8257e
Showing 3 changed files with 53 additions and 31 deletions.
48 changes: 20 additions & 28 deletions service/reader/service.go
@@ -53,7 +53,7 @@ func (s *Service) read(ctx context.Context, session *Session) error {
}

wg := sync.WaitGroup{}
collector := session.View.Collector(session.DataPtr, session.HandleViewMeta, session.View.MatchStrategy.SupportsParallel())
collector := session.View.Collector(session.DataPtr, session.HandleViewMeta, session.View.MatchStrategy.ReadAll())
errors := shared.NewErrors(0)

s.readAll(ctx, session, collector, &wg, errors, session.Parent)
@@ -491,11 +491,10 @@ func (s *Service) queryWithPartitions(ctx context.Context, session *Session, aVi
repeat := max(1, len(partitions.Partitions))
var executions []*response.SQLExecution
var mux sync.Mutex
xSlice := collector.View().Schema.Slice()
var err error

var rateLimit = make(chan bool, concurrency)
var results = make([]*reflect.Value, 0, repeat)
var collectors = make([]*view.Collector, repeat)
for i := 0; i < repeat; i++ {
clone := *partitions
wg.Add(1)
@@ -508,16 +507,10 @@
if index < len(partitions.Partitions) {
partitions.Partition = partitions.Partitions[index]
}
parametrizedSQL, columnInMatcher, e := s.buildParametrizedSQL(ctx, aView, selector, batchData, collector, session, partitions)

collectors[index] = collector.Clone()
parametrizedSQL, columnInMatcher, e := s.buildParametrizedSQL(ctx, aView, selector, batchData, collectors[index], session, partitions)
readData := 0
if e != nil {
err = e
fmt.Printf("error: %v\n", err)
return
}
localSlice := reflect.New(aView.Schema.SliceType())
slicePtr := unsafe.Pointer(localSlice.Pointer())
appender := xSlice.Appender(slicePtr)
handler := func(row interface{}) error {
row, err = aView.UnwrapDatabaseType(ctx, row)
if err != nil {
@@ -529,45 +522,44 @@
return err
}
}
appender.Append(row)
return nil
}
exec, e := s.queryWithHandler(ctx, session, aView, collector, columnInMatcher, parametrizedSQL, db, handler, &readData)
mux.Lock()
defer mux.Unlock()
if exec != nil {
executions = append(executions, exec...)
}
mux.Unlock()
if e != nil {
err = e
}
if readData > 0 {
results = append(results, &localSlice)
}
}(i, &clone)
if err != nil {
break
}
}
wg.Wait()
if len(results) == 0 || err != nil {
if len(collectors) == 0 || err != nil {
return executions, err
}
result := results[0]
for i := 1; i < len(results); i++ {
second := results[i]
merged := combineSlices(result.Elem().Interface(), second.Elem().Interface())
result.Elem().Set(reflect.ValueOf(merged))

result := collectors[0]

for i := 1; i < len(collectors); i++ {
second := collectors[i]
merged := combineSlices(result.Dest(), second.Dest())
result.SetDest(merged)
}

if newReducer, ok := partitioner.(view.ReducerProvider); ok {
reducer := newReducer.Reducer(ctx)
reduced := reducer.Reduce(result.Elem().Interface())
result.Elem().Set(reflect.ValueOf(reduced))
reduced := reducer.Reduce(result.Dest())
collector.SetDest(reduced)
}

values := result.Elem()
for i := 0; i < values.Len(); i++ {
value := values.Index(i).Interface()
resultValue := reflect.ValueOf(result.Dest())
for i := 0; i < resultValue.Len(); i++ {
value := resultValue.Index(i).Interface()
if err := visitor(value); err != nil {
return executions, err
}
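
The reworked queryWithPartitions no longer appends every partition's rows into one reflect-built slice; each goroutine gets its own collector clone, the clones' destinations are merged with combineSlices after wg.Wait(), and an optional ReducerProvider collapses the merged result before the visitor runs. A minimal, self-contained sketch of that fan-out/merge pattern (stand-in types and helpers, not the actual view.Collector API):

package main

import (
	"fmt"
	"sync"
)

// collector is a stand-in for view.Collector: it owns a private destination slice.
type collector struct {
	dest []int
}

// clone mirrors Collector.Clone: same wiring, fresh empty destination.
func (c *collector) clone() *collector {
	return &collector{dest: make([]int, 0)}
}

func main() {
	partitions := [][]int{{1, 2}, {3, 4}, {5}}
	root := &collector{}
	collectors := make([]*collector, len(partitions))

	var wg sync.WaitGroup
	for i := range partitions {
		wg.Add(1)
		go func(index int) {
			defer wg.Done()
			// Each partition appends into its own clone, so no shared appender is raced.
			collectors[index] = root.clone()
			for _, row := range partitions[index] {
				collectors[index].dest = append(collectors[index].dest, row)
			}
		}(i)
	}
	wg.Wait()

	// Merge the per-partition destinations into the first clone,
	// as the service does with combineSlices + SetDest.
	merged := collectors[0]
	for _, c := range collectors[1:] {
		merged.dest = append(merged.dest, c.dest...)
	}

	// A reducer (cf. view.ReducerProvider) could collapse the merged rows here;
	// summing stands in for that step.
	sum := 0
	for _, v := range merged.dest {
		sum += v
	}
	fmt.Println(merged.dest, sum) // [1 2 3 4 5] 15
}

The point of the change is visible in the sketch: each goroutine writes only its own slot, so the per-partition appends need no locking and results stay deterministic regardless of scheduling.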
32 changes: 31 additions & 1 deletion view/collector.go
@@ -47,6 +47,36 @@ type Collector struct {
viewMetaHandler viewSummaryHandlerFn
}

func (r *Collector) SetDest(dest interface{}) {
r.dest = dest
r.appender = r.slice.Appender(xunsafe.AsPointer(dest))
}

func (r *Collector) Clone() *Collector {
dest := reflect.MakeSlice(r.view.Schema.SliceType(), 0, 1).Interface()
return &Collector{
parent: r.parent,
dest: dest,
appender: r.slice.Appender(xunsafe.AsPointer(dest)),
valuePosition: r.valuePosition,
types: r.types,
relation: r.relation,
values: r.values,
slice: r.slice,
view: r.view,
relations: r.relations,
wg: r.wg,
readAll: r.readAll,
wgDelta: r.wgDelta,
indexCounter: r.indexCounter,
manyCounter: r.manyCounter,
codecSlice: r.codecSlice,
codecSliceDest: r.codecSliceDest,
codecAppender: r.codecAppender,
viewMetaHandler: r.viewMetaHandler,
}
}

func (r *Collector) Lock() *sync.Mutex {
if r.parent == nil {
return &r.mutex
@@ -401,7 +431,7 @@ func (r *Collector) Relations(selector *Statelet) ([]*Collector, error) {
slice: slice,
view: &r.view.With[i].Of.View,
relation: r.view.With[i],
readAll: r.view.With[i].Of.MatchStrategy.SupportsParallel(),
readAll: r.view.With[i].Of.MatchStrategy.ReadAll(),
wg: &wg,
wgDelta: delta,
}
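
Clone shares the collector's wiring (view, slice, relations, counters) but allocates a brand-new destination with reflect.MakeSlice and binds a fresh appender to it, and SetDest re-binds the appender whenever a merged or reduced slice is swapped in. A rough sketch of that dest/appender coupling using plain reflect and a closure instead of xunsafe (hypothetical miniCollector type, not the real Collector):

package main

import (
	"fmt"
	"reflect"
)

// miniCollector holds a destination slice plus an appender permanently bound to it.
type miniCollector struct {
	sliceType reflect.Type
	destPtr   reflect.Value         // *[]T, so the appender can grow it in place
	appender  func(row interface{}) // always bound to destPtr
}

func newMiniCollector(sliceType reflect.Type) *miniCollector {
	c := &miniCollector{sliceType: sliceType}
	c.SetDest(reflect.MakeSlice(sliceType, 0, 1).Interface())
	return c
}

// SetDest swaps the destination and re-binds the appender, as Collector.SetDest does.
func (c *miniCollector) SetDest(dest interface{}) {
	ptr := reflect.New(c.sliceType)
	ptr.Elem().Set(reflect.ValueOf(dest))
	c.destPtr = ptr
	c.appender = func(row interface{}) {
		ptr.Elem().Set(reflect.Append(ptr.Elem(), reflect.ValueOf(row)))
	}
}

// Clone keeps the wiring but gives the copy its own empty destination.
func (c *miniCollector) Clone() *miniCollector {
	return newMiniCollector(c.sliceType)
}

// Dest returns the current destination slice.
func (c *miniCollector) Dest() interface{} {
	return c.destPtr.Elem().Interface()
}

func main() {
	base := newMiniCollector(reflect.TypeOf([]string{}))
	a, b := base.Clone(), base.Clone()
	a.appender("rows from partition 0")
	b.appender("rows from partition 1")
	fmt.Println(a.Dest(), b.Dest()) // independent slices, no shared appends
}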
4 changes: 2 additions & 2 deletions view/match.go
@@ -17,8 +17,8 @@ func (s MatchStrategy) Validate() error {
return fmt.Errorf("unsupported match strategy %v", s)
}

// SupportsParallel indicates whether MatchStrategy support parallel read.
func (s MatchStrategy) SupportsParallel() bool {
// ReadAll indicates whether the MatchStrategy supports parallel read.
func (s MatchStrategy) ReadAll() bool {
return s == ReadAll
}
