-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmutex.go
193 lines (169 loc) · 4.03 KB
/
mutex.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package distlock
import (
"bytes"
"context"
"math/rand"
"sync"
"time"
)
type (
// Mutex / mutual exclusion lock
Mutex struct {
pool []mutexStorageDriver
tries uint
quorum int
genValueFunc func() []byte
delayFunc func(tries int) time.Duration
kv sync.Map
}
// a lock value and expiration time
lockVal struct {
value []byte
until time.Time
}
// Option of mutex
Option func(*Mutex)
// the interface of a driver to manage lock keys
mutexStorageDriver interface {
Set(ctx context.Context, key string, value []byte, ttl time.Duration) error // Should return error if a key is already exists
Get(ctx context.Context, key string) ([]byte, error)
Delete(ctx context.Context, key string) error
}
)
// New mutex instance with options
func New(opt ...Option) *Mutex {
m := &Mutex{
tries: 1,
genValueFunc: genValue,
delayFunc: defaultDelayFunc,
}
for _, o := range opt {
o(m)
}
if m.pool == nil {
panic("add at least one storage driver in pool")
}
if len(m.pool)%2 != 1 {
panic("must be odd number of storage nodes to prevent split brain")
}
m.quorum = (len(m.pool) / 2) + 1
return m
}
// Lock func
// Returns error if the lock is already in use.
func (m *Mutex) Lock(key string, ttl time.Duration) error {
return m.LockContext(nil, key, ttl)
}
// LockContext func is the same as Lock but has context :)
// Returns error if the lock is already in use.
func (m *Mutex) LockContext(ctx context.Context, key string, ttl time.Duration) error {
val := m.genValueFunc()
for i := 0; i < int(m.tries); i++ {
if i != 0 {
time.Sleep(m.delayFunc(i))
}
start := time.Now()
n := storeAsync(ctx, m.pool, key, val, ttl)
now := time.Now()
until := now.Add(ttl - now.Sub(start))
if n >= m.quorum && now.Before(until) {
m.kv.Store(key, lockVal{
value: val,
until: until,
})
return nil
}
deleteAsync(ctx, m.pool, key, val)
}
return ErrAlreadyTaken
}
// Unlock func
// Returns error if the lock is already in use.
func (m *Mutex) Unlock(key string) error {
return m.UnlockContext(nil, key)
}
// UnlockContext func is the same as Unlock but has context :)
// Returns error if the lock is already in use.
func (m *Mutex) UnlockContext(ctx context.Context, key string) error {
if value, ok := m.kv.Load(key); ok {
if v, ok := value.(lockVal); ok {
n := deleteAsync(ctx, m.pool, key, v.value)
if n < m.quorum {
return ErrQuorumNotDeleted
}
}
}
return nil
}
// Stores a lock key into each storage
func storeAsync(ctx context.Context, pool []mutexStorageDriver, key string, value []byte, ttl time.Duration) int {
// try to store lock key into each storage in pool
ch := make(chan bool)
for _, sd := range pool {
go func(s mutexStorageDriver) {
err := s.Set(ctx, key, value, ttl)
ch <- (err == nil)
}(sd)
}
// count all successful storing attempt
n := 0
for range pool {
r := <-ch
if r {
n++
}
}
return n
}
// Delete a lock key from each storage
func deleteAsync(ctx context.Context, pool []mutexStorageDriver, key string, value []byte) int {
// try to delete lock key from each storage in pool
ch := make(chan bool)
for _, sd := range pool {
go func(s mutexStorageDriver) {
v, err := s.Get(ctx, key)
if err != nil {
ch <- false
return
}
// delete only records with the same values to avoid release another lock
if bytes.Compare(v, value) == 0 {
err := s.Delete(ctx, key)
ch <- (err == nil)
} else {
ch <- false
}
}(sd)
}
// count all successful deleting attempt
n := 0
for range pool {
r := <-ch
if r {
n++
}
}
return n
}
// generates random value
func genValue() []byte {
b := make([]byte, 16)
_, err := rand.Read(b)
if err != nil {
return []byte("")
}
return b
}
// generates pseudo-random delay time
// to avoid run all pods at the same time
// after failed first lock attempting
func defaultDelayFunc(tries int) time.Duration {
if tries < 1 {
tries = 1
}
rn := rand.Intn(tries)
if rn == 0 {
rn = 1
}
return time.Duration(rn * 100 * int(time.Millisecond))
}