From 9d8257ee2f557b944c323a8b2b883a90b9c27bae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Filipowicz?= Date: Fri, 21 Jun 2024 19:42:22 +0200 Subject: [PATCH] patched partition collector --- service/reader/service.go | 48 ++++++++++++++++----------------------- view/collector.go | 32 +++++++++++++++++++++++++- view/match.go | 4 ++-- 3 files changed, 53 insertions(+), 31 deletions(-) diff --git a/service/reader/service.go b/service/reader/service.go index 479366fe..0f8f3062 100644 --- a/service/reader/service.go +++ b/service/reader/service.go @@ -53,7 +53,7 @@ func (s *Service) read(ctx context.Context, session *Session) error { } wg := sync.WaitGroup{} - collector := session.View.Collector(session.DataPtr, session.HandleViewMeta, session.View.MatchStrategy.SupportsParallel()) + collector := session.View.Collector(session.DataPtr, session.HandleViewMeta, session.View.MatchStrategy.ReadAll()) errors := shared.NewErrors(0) s.readAll(ctx, session, collector, &wg, errors, session.Parent) @@ -491,11 +491,10 @@ func (s *Service) queryWithPartitions(ctx context.Context, session *Session, aVi repeat := max(1, len(partitions.Partitions)) var executions []*response.SQLExecution var mux sync.Mutex - xSlice := collector.View().Schema.Slice() var err error var rateLimit = make(chan bool, concurrency) - var results = make([]*reflect.Value, 0, repeat) + var collectors = make([]*view.Collector, repeat) for i := 0; i < repeat; i++ { clone := *partitions wg.Add(1) @@ -508,16 +507,10 @@ func (s *Service) queryWithPartitions(ctx context.Context, session *Session, aVi if index < len(partitions.Partitions) { partitions.Partition = partitions.Partitions[index] } - parametrizedSQL, columnInMatcher, e := s.buildParametrizedSQL(ctx, aView, selector, batchData, collector, session, partitions) + + collectors[index] = collector.Clone() + parametrizedSQL, columnInMatcher, e := s.buildParametrizedSQL(ctx, aView, selector, batchData, collectors[index], session, partitions) readData := 0 - if e != nil { - err = e - fmt.Printf("error: %v\n", err) - return - } - localSlice := reflect.New(aView.Schema.SliceType()) - slicePtr := unsafe.Pointer(localSlice.Pointer()) - appender := xSlice.Appender(slicePtr) handler := func(row interface{}) error { row, err = aView.UnwrapDatabaseType(ctx, row) if err != nil { @@ -529,45 +522,44 @@ func (s *Service) queryWithPartitions(ctx context.Context, session *Session, aVi return err } } - appender.Append(row) return nil } exec, e := s.queryWithHandler(ctx, session, aView, collector, columnInMatcher, parametrizedSQL, db, handler, &readData) mux.Lock() - defer mux.Unlock() if exec != nil { executions = append(executions, exec...) } + mux.Unlock() if e != nil { err = e } - if readData > 0 { - results = append(results, &localSlice) - } }(i, &clone) if err != nil { break } } wg.Wait() - if len(results) == 0 || err != nil { + if len(collectors) == 0 || err != nil { return executions, err } - result := results[0] - for i := 1; i < len(results); i++ { - second := results[i] - merged := combineSlices(result.Elem().Interface(), second.Elem().Interface()) - result.Elem().Set(reflect.ValueOf(merged)) + + result := collectors[0] + + for i := 1; i < len(collectors); i++ { + second := collectors[i] + merged := combineSlices(result.Dest(), second.Dest()) + result.SetDest(merged) } + if newReducer, ok := partitioner.(view.ReducerProvider); ok { reducer := newReducer.Reducer(ctx) - reduced := reducer.Reduce(result.Elem().Interface()) - result.Elem().Set(reflect.ValueOf(reduced)) + reduced := reducer.Reduce(result.Dest()) + collector.SetDest(reduced) } - values := result.Elem() - for i := 0; i < values.Len(); i++ { - value := values.Index(i).Interface() + resultValue := reflect.ValueOf(result.Dest()) + for i := 0; i < resultValue.Len(); i++ { + value := resultValue.Index(i).Interface() if err := visitor(value); err != nil { return executions, err } diff --git a/view/collector.go b/view/collector.go index 40a485ed..62441aa5 100644 --- a/view/collector.go +++ b/view/collector.go @@ -47,6 +47,36 @@ type Collector struct { viewMetaHandler viewSummaryHandlerFn } +func (r *Collector) SetDest(dest interface{}) { + r.dest = dest + r.appender = r.slice.Appender(xunsafe.AsPointer(dest)) +} + +func (r *Collector) Clone() *Collector { + dest := reflect.MakeSlice(r.view.Schema.SliceType(), 0, 1).Interface() + return &Collector{ + parent: r.parent, + dest: dest, + appender: r.slice.Appender(xunsafe.AsPointer(dest)), + valuePosition: r.valuePosition, + types: r.types, + relation: r.relation, + values: r.values, + slice: r.slice, + view: r.view, + relations: r.relations, + wg: r.wg, + readAll: r.readAll, + wgDelta: r.wgDelta, + indexCounter: r.indexCounter, + manyCounter: r.manyCounter, + codecSlice: r.codecSlice, + codecSliceDest: r.codecSliceDest, + codecAppender: r.codecAppender, + viewMetaHandler: r.viewMetaHandler, + } +} + func (r *Collector) Lock() *sync.Mutex { if r.parent == nil { return &r.mutex @@ -401,7 +431,7 @@ func (r *Collector) Relations(selector *Statelet) ([]*Collector, error) { slice: slice, view: &r.view.With[i].Of.View, relation: r.view.With[i], - readAll: r.view.With[i].Of.MatchStrategy.SupportsParallel(), + readAll: r.view.With[i].Of.MatchStrategy.ReadAll(), wg: &wg, wgDelta: delta, } diff --git a/view/match.go b/view/match.go index 0222c11e..880f9299 100644 --- a/view/match.go +++ b/view/match.go @@ -17,8 +17,8 @@ func (s MatchStrategy) Validate() error { return fmt.Errorf("unsupported match strategy %v", s) } -// SupportsParallel indicates whether MatchStrategy support parallel read. -func (s MatchStrategy) SupportsParallel() bool { +// ReadAll indicates whether MatchStrategy support parallel read. +func (s MatchStrategy) ReadAll() bool { return s == ReadAll }