-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadd_quota_usage.go
112 lines (96 loc) · 2.51 KB
/
add_quota_usage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package andromeda
import (
"context"
"errors"
"fmt"
)
// AddUsageOption configures the quota updater built by NewAddQuotaUsage.
type AddUsageOption struct {
	// ModifiedUsage, when greater than zero, is the amount added to the
	// usage counter instead of the Usage carried by the request.
	ModifiedUsage int64

	// Irreversible keeps the added usage in place even when the next
	// UpdateQuotaUsage in the chain returns an error. When false, the
	// added usage is subtracted again in that case.
	Irreversible bool

	// Listener, when non-nil, is told about the outcome of each update
	// through its OnSuccess and OnError methods.
	Listener UpdateQuotaUsageListener
}
// addQuotaUsage is an UpdateQuotaUsage step that adds usage to a counter
// kept in a Cache, enforces the quota limit, and then hands the request to
// the next step.
type addQuotaUsage struct {
	// cache holds the usage counters; it is incremented via IncrBy and
	// rolled back via DecrBy.
	cache Cache

	// getQuotaUsageKey resolves the cache key of the usage counter for a
	// quota request.
	getQuotaUsageKey GetQuotaKey

	// getQuotaLimit resolves the limit the usage counter is compared with.
	getQuotaLimit GetQuota

	// next is the following step in the chain.
	next UpdateQuotaUsage

	// option carries the behaviour switches and the optional listener.
	option AddUsageOption
}
// Do adds usage to the cached counter of the quota addressed by req and, as
// long as the new total does not exceed the limit, forwards req to the next
// step.
//
// Flow:
//   - If resolving the usage key fails with ErrQuotaNotFound, req is passed
//     straight to the next step and the cache is not touched.
//   - The amount added is req.Usage, unless option.ModifiedUsage is positive,
//     in which case that value is used.
//   - If the incremented total is greater than the limit, the increment is
//     rolled back and the error from NewQuotaLimitExceededError is returned;
//     if the rollback itself fails, the rollback error is returned instead.
//   - If the next step fails, the increment is rolled back unless
//     option.Irreversible is set. A failed rollback replaces the next step's
//     error in the returned value.
//
// When option.Listener is set it is notified on return: OnSuccess with the
// total usage when the returned error is nil, OnError otherwise. The listener
// is not notified when the next step fails after a successful increment,
// unless the subsequent rollback fails too.
func (q *addQuotaUsage) Do(ctx context.Context, req *QuotaUsageRequest) (res interface{}, err error) {
	var (
		totalUsage   int64
		skipListener bool
	)

	// The named results are read here, so every return below must go
	// through them for the listener to see the final outcome.
	defer func() {
		listener := q.option.Listener
		if listener == nil || skipListener {
			return
		}
		if err != nil {
			listener.OnError(ctx, req, err)
			return
		}
		listener.OnSuccess(ctx, req, totalUsage)
	}()

	quotaReq := &QuotaRequest{QuotaID: req.QuotaID, Data: req.Data}

	key, err := q.getQuotaUsageKey.Do(ctx, quotaReq)
	switch {
	case errors.Is(err, ErrQuotaNotFound):
		// Nothing to count against: let the rest of the chain handle it.
		return q.next.Do(ctx, req)
	case err != nil:
		return nil, err
	}

	amount := req.Usage
	if override := q.option.ModifiedUsage; override > 0 {
		amount = override
	}

	limit, err := q.getQuotaLimit.Do(ctx, quotaReq)
	if err != nil {
		return nil, err
	}

	totalUsage, err = q.cache.IncrBy(ctx, key, amount)
	if err != nil {
		return nil, fmt.Errorf("%w: %v", ErrAddQuotaUsage, err)
	}

	if totalUsage > limit {
		// The error reports the usage as it was before this increment.
		err = NewQuotaLimitExceededError(key, limit, totalUsage-amount)
		if rollbackErr := q.reverseUsage(ctx, key, amount); rollbackErr != nil {
			err = rollbackErr
		}
		return nil, err
	}

	res, nextErr := q.next.Do(ctx, req)
	if nextErr == nil {
		return res, nil
	}

	// Failures of the next step are not this step's to report.
	skipListener = true
	if q.option.Irreversible {
		return res, nextErr
	}
	if rollbackErr := q.reverseUsage(ctx, key, amount); rollbackErr != nil {
		// A failed rollback is this step's own error, so report it.
		skipListener = false
		return res, rollbackErr
	}
	return res, nextErr
}
// reverseUsage subtracts usage from the counter stored under key, undoing an
// earlier increment. A cache failure is returned wrapped in
// ErrReduceQuotaUsage; the cause is included in the message only.
func (q *addQuotaUsage) reverseUsage(ctx context.Context, key string, usage int64) error {
	_, decrErr := q.cache.DecrBy(ctx, key, usage)
	if decrErr == nil {
		return nil
	}
	return fmt.Errorf("%w: %v", ErrReduceQuotaUsage, decrErr)
}
// NewAddQuotaUsage returns an UpdateQuotaUsage that adds usage to a counter
// in cache, checks it against the limit from getQuotaLimit, and then calls
// next. The counter key comes from getQuotaUsageKey; option tunes the amount
// added, the rollback behaviour and the optional listener.
func NewAddQuotaUsage(
	cache Cache,
	getQuotaUsageKey GetQuotaKey,
	getQuotaLimit GetQuota,
	next UpdateQuotaUsage,
	option AddUsageOption,
) UpdateQuotaUsage {
	updater := addQuotaUsage{
		option:           option,
		next:             next,
		getQuotaLimit:    getQuotaLimit,
		getQuotaUsageKey: getQuotaUsageKey,
		cache:            cache,
	}
	return &updater
}
// NewQuotaLimitExceededError returns an error wrapping ErrQuotaLimitExceeded
// whose message records the limit, the usage and the key involved.
func NewQuotaLimitExceededError(key string, limit, usage int64) error {
	const format = "%w: limit %d and usage %d for key %s"
	return fmt.Errorf(format, ErrQuotaLimitExceeded, limit, usage, key)
}