-
Notifications
You must be signed in to change notification settings - Fork 7
/
metastore.go
155 lines (120 loc) · 3.39 KB
/
metastore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package streams
import (
"sync"
"sync/atomic"
"unsafe"
)
// Metastore represents a metadata store.
//
// Metastore is only partially concurrency safe. Mark and PullAll
// can be used concurrently, but Mark and Pull cannot. This is done
// to avoid locks and improve performance.
type Metastore interface {
// Pull gets and clears the processors metadata.
Pull(Processor) (Metaitems, error)
// PullAll gets and clears all metadata.
PullAll() (map[Processor]Metaitems, error)
// Mark sets metadata for a processor.
Mark(Processor, Source, Metadata) error
}
// Metaitem represents the source metadata combination.
type Metaitem struct {
Source Source
Metadata Metadata
}
// Metaitems represents a slice of Metaitem pointers.
type Metaitems []*Metaitem
// Merge combines contents of two Metaitems objects, merging the Metadata where necessary.
func (m Metaitems) Merge(items Metaitems, strategy MetadataStrategy) Metaitems {
OUTER:
for _, newItem := range items {
for _, oldItem := range m {
if oldItem.Source == newItem.Source {
if oldItem.Metadata != nil {
oldItem.Metadata = oldItem.Metadata.Merge(newItem.Metadata, strategy)
}
continue OUTER
}
}
m = append(m, newItem)
}
return m
}
type metastore struct {
metadata *atomic.Value // map[Processor][]Metaitem
procMu sync.Mutex
}
// NewMetastore creates a new Metastore instance.
func NewMetastore() Metastore {
s := &metastore{
metadata: &atomic.Value{},
}
s.metadata.Store(&map[Processor]Metaitems{})
return s
}
// Pull gets and clears the processors metadata.
func (s *metastore) Pull(p Processor) (Metaitems, error) {
s.procMu.Lock()
meta := s.metadata.Load().(*map[Processor]Metaitems)
// We dont lock here as the Pump should be locked at this point
items, ok := (*meta)[p]
if ok {
delete(*meta, p)
s.procMu.Unlock()
return items, nil
}
s.procMu.Unlock()
return nil, nil
}
// PullAll gets and clears all metadata.
func (s *metastore) PullAll() (map[Processor]Metaitems, error) {
oldMeta := s.pullMetadata()
// Make sure no marks are happening on the old metadata
s.procMu.Lock()
s.procMu.Unlock() //lint:ignore SA2001 syncpoint
return oldMeta, nil
}
// pullMetadata atomically replaces the current metadata with a new instance and returns the old instance.
func (s *metastore) pullMetadata() map[Processor]Metaitems {
newMeta := atomic.Value{}
newMeta.Store(&map[Processor]Metaitems{})
metaPtr := (*unsafe.Pointer)(unsafe.Pointer(&s.metadata))
oldMetaPtr := atomic.SwapPointer(metaPtr, unsafe.Pointer(&newMeta))
return *(*atomic.Value)(oldMetaPtr).Load().(*map[Processor]Metaitems)
}
// Mark sets metadata for a processor.
func (s *metastore) Mark(p Processor, src Source, meta Metadata) error {
if p == nil {
return nil
}
if meta != nil {
o := ProcessorOrigin
if _, ok := p.(Committer); ok {
o = CommitterOrigin
}
meta.WithOrigin(o)
}
s.procMu.Lock()
procMeta := s.metadata.Load().(*map[Processor]Metaitems)
items, ok := (*procMeta)[p]
if !ok {
(*procMeta)[p] = Metaitems{{Source: src, Metadata: meta}}
s.procMu.Unlock()
return nil
}
if src == nil || meta == nil {
s.procMu.Unlock()
return nil
}
for _, item := range items {
if item.Source == src {
item.Metadata = meta.Merge(item.Metadata, Dupless)
s.procMu.Unlock()
return nil
}
}
items = append(items, &Metaitem{Source: src, Metadata: meta})
(*procMeta)[p] = items
s.procMu.Unlock()
return nil
}