-
Notifications
You must be signed in to change notification settings - Fork 0
/
publisher.go
135 lines (123 loc) · 3.54 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package qstash
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
)
// Publisher publishes messages to a single QStash topic over HTTP.
//
// Instances are normally built with NewPublisher, which fills in every field
// from the supplied options.
type Publisher struct {
	// token is sent as the bearer token in the Authorization header of
	// every publish request.
	token string
	// url is the base endpoint; Publish posts to "<url>/<topic>".
	url string
	// topic is the path segment appended to url when publishing.
	topic string
	// client executes the HTTP request. It is an interface so that the
	// transport can be substituted; NewPublisher installs an httpClient
	// configured with the timeout, back-off and retry options.
	client interface {
		Do(*http.Request) (*http.Response, error)
	}
	// uuid produces the default deduplication id that Publish uses when the
	// message has no ID and content based deduplication is not requested.
	uuid interface {
		NewV4() (string, error)
	}
	// verbose is copied from the publisher options; it is not read by the
	// methods visible in this file.
	verbose bool
}
// NewPublisher creates a Publisher that posts messages to the given topic.
//
// The supplied options are applied in order, followed by an internal option
// that sets the topic, so the topic argument is always applied last. An error
// returned while applying the options is passed back unchanged and no
// Publisher is created.
func NewPublisher(topic string, opts ...PublisherOption) (*Publisher, error) {
	// Clip the capacity of opts before appending: when the caller passes a
	// slice with spare capacity (e.g. NewPublisher(t, all[:n]...)), a plain
	// append would overwrite the caller's element at index n.
	all := append(opts[:len(opts):len(opts)], withTopic(topic))

	// Apply the options
	var cfg PublisherOptions
	if err := cfg.apply(all...); err != nil {
		return nil, err
	}

	// The HTTP client wraps a standard http.Client with the configured
	// timeout and carries the retry/back-off settings alongside it.
	client := &httpClient{
		client: &http.Client{
			Timeout: cfg.Client.Timeout,
		},
		MaxBackOff: cfg.Client.MaxBackOff,
		MinBackOff: cfg.Client.MinBackOff,
		Retries:    cfg.Client.Retries,
	}

	return &Publisher{
		token:   cfg.QStashToken,
		url:     cfg.QStashURL,
		topic:   cfg.topic,
		uuid:    new(uuid),
		client:  client,
		verbose: cfg.Verbose,
	}, nil
}
// Publish sends the message m to the publisher's topic with an HTTP POST to
// "<url>/<topic>" and, on success, stores the message id returned by the
// service in m.ID.
//
// Every key in m.Headers must start with "Upstash-Forward-" (compared
// case-insensitively). The headers are copied onto the request; m.Headers
// itself is never modified.
//
// Deduplication is resolved in this order:
//   - content based deduplication together with a non-empty m.ID is an error;
//   - content based deduplication sets "Upstash-Content-Based-Deduplication";
//   - a non-empty m.ID is sent as "Upstash-Deduplication-ID";
//   - otherwise a freshly generated uuid is sent as "Upstash-Deduplication-ID".
//
// An error is returned when m is nil, the options are invalid, the request
// cannot be built or completed, the response status is outside 200-299 (the
// response body is included in the error), or the response cannot be decoded.
func (q *Publisher) Publish(ctx context.Context, m *Message, opts ...PublishOption) error {
	// Guard against a nil message instead of panicking on m.Body below.
	if m == nil {
		return fmt.Errorf("message must not be nil")
	}

	// Parse the publish options
	var cfg PublishOptions
	if opts != nil {
		if err := cfg.apply(opts...); err != nil {
			return fmt.Errorf("bad options: %w", err)
		}
	}

	// Create the request, bound to ctx from the start
	r, err := http.NewRequestWithContext(
		ctx,
		http.MethodPost,
		fmt.Sprintf("%s/%s", q.url, q.topic),
		bytes.NewBuffer(m.Body),
	)
	if err != nil {
		return fmt.Errorf("could not create request %w", err)
	}

	// Validate and add the optional message headers
	if m.Headers != nil {
		for k := range m.Headers {
			if !strings.HasPrefix(strings.ToLower(k), "upstash-forward-") {
				return fmt.Errorf("headers must start with 'Upstash-Forward-'")
			}
		}
		// Copy rather than alias the caller's map: the Set calls below would
		// otherwise write the bearer token and deduplication headers into
		// m.Headers, which leaks the token to the caller and makes a second
		// Publish of the same message fail the prefix validation above.
		r.Header = http.Header(m.Headers).Clone()
	}

	// Determine the deduplication id
	hasID := len(m.ID) > 0
	switch {
	case hasID && cfg.ContentBasedDeduplication:
		return fmt.Errorf("you cannot set 'content based deduplication' and pass a custom deduplication id")
	case cfg.ContentBasedDeduplication:
		r.Header.Set("Upstash-Content-Based-Deduplication", "true")
	case hasID:
		r.Header.Set("Upstash-Deduplication-ID", m.ID)
	default:
		// By default, generate a uuid to allow for retries on publish
		deduplicationID, err := q.uuid.NewV4()
		if err != nil {
			return fmt.Errorf("could not generate uuid %w", err)
		}
		r.Header.Set("Upstash-Deduplication-ID", deduplicationID)
	}

	// Set the standard request headers
	r.Header.Set("Authorization", fmt.Sprintf("Bearer %s", q.token))
	r.Header.Set("Content-Type", "application/json")

	// Configure scheduling and retry functionality
	if cfg.Delay > 0 {
		// NOTE(review): Duration.String() yields values such as "1m0s";
		// confirm the service accepts that format for Upstash-Delay.
		r.Header.Set("Upstash-Delay", cfg.Delay.String())
	}
	if cfg.Retries > 0 {
		r.Header.Set("Upstash-Retries", strconv.Itoa(cfg.Retries))
	}

	// Publish the message
	rsp, err := q.client.Do(r)
	if err != nil {
		return fmt.Errorf("could not complete request %w", err)
	}
	defer rsp.Body.Close()

	if rsp.StatusCode < 200 || rsp.StatusCode > 299 {
		// Best effort: the body is only used to enrich the error message, so
		// a read failure is deliberately ignored.
		bs, _ := io.ReadAll(rsp.Body)
		return fmt.Errorf("bad request status %d: %s", rsp.StatusCode, string(bs))
	}

	// Return the message id
	var body struct {
		MessageID string `json:"messageId"`
	}
	if err := json.NewDecoder(rsp.Body).Decode(&body); err != nil {
		return fmt.Errorf("could not decode response %w", err)
	}
	m.ID = body.MessageID

	// Success
	return nil
}
// PublishWithDelay publishes message like Publish, with a WithDelay(delay)
// option appended after the caller's options.
//
// The error, if any, is the one returned by Publish.
func (q *Publisher) PublishWithDelay(ctx context.Context, message *Message, delay time.Duration, opts ...PublishOption) error {
	// Clip the capacity of opts before appending so that a caller-owned slice
	// with spare capacity is not overwritten by the extra option.
	withDelay := append(opts[:len(opts):len(opts)], WithDelay(delay))
	return q.Publish(ctx, message, withDelay...)
}