-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathconsumer.go
115 lines (95 loc) · 2.62 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package migration
import (
"context"
"encoding/json"
"github.com/IBM/sarama"
"github.com/tidepool-org/clinic-worker/cdc"
"github.com/tidepool-org/go-common/events"
"go.uber.org/fx"
"go.uber.org/zap"
"strconv"
)
// migrationsTopic is the Kafka topic name assigned to the consumer group
// configuration in CreateConsumerGroup.
const migrationsTopic = "clinic.migrations"
// Module is the fx option that provides this package's constructors:
// NewRateLimiter, NewMigrator, and CreateConsumerGroup, the latter being
// annotated so its result is contributed to the "consumers" group.
var Module = fx.Provide(
	NewRateLimiter,
	NewMigrator,
	fx.Annotated{Group: "consumers", Target: CreateConsumerGroup},
)
// MigrationCDCConsumer handles Kafka messages carrying migration change
// events and hands insert events over to a Migrator.
type MigrationCDCConsumer struct {
	// logger receives the debug/info/warn/error output of message handling.
	logger *zap.SugaredLogger
	// migrator performs the patient migration for each processed event.
	migrator Migrator
}
// Params is the fx parameter struct (it embeds fx.In) listing the
// dependencies needed to build the migration consumer.
type Params struct {
	fx.In

	// Logger is copied into the consumer for structured logging.
	Logger *zap.SugaredLogger
	// Migrator is copied into the consumer and invoked for insert events.
	Migrator Migrator
}
// CreateConsumerGroup builds the event consumer for the migrations topic.
//
// It loads the configuration through cdc.GetConfig, points it at
// migrationsTopic and passes it, together with the factory returned by
// CreateConsumer, to events.NewFaultTolerantConsumerGroup. An error from
// cdc.GetConfig is returned unchanged with a nil consumer.
func CreateConsumerGroup(p Params) (events.EventConsumer, error) {
	cfg, err := cdc.GetConfig()
	if err != nil {
		return nil, err
	}

	cfg.KafkaTopic = migrationsTopic
	factory := CreateConsumer(p)
	return events.NewFaultTolerantConsumerGroup(cfg, factory)
}
// CreateConsumer returns a factory that, on every call, constructs a new
// consumer via NewPatientCDCConsumer and wraps it with
// cdc.NewRetryingConsumer. A construction error is returned from the factory
// with a nil consumer.
func CreateConsumer(p Params) events.ConsumerFactory {
	factory := func() (events.MessageConsumer, error) {
		inner, err := NewPatientCDCConsumer(p)
		if err != nil {
			return nil, err
		}
		wrapped := cdc.NewRetryingConsumer(inner)
		return wrapped, nil
	}
	return factory
}
// NewPatientCDCConsumer constructs a MigrationCDCConsumer from the logger and
// migrator carried by p. Despite its name, the value it returns is the
// migration consumer defined in this file. The returned error is always nil.
func NewPatientCDCConsumer(p Params) (events.MessageConsumer, error) {
	consumer := &MigrationCDCConsumer{
		migrator: p.Migrator,
		logger:   p.Logger,
	}
	return consumer, nil
}
// Initialize is a no-op for this consumer: the supplied config is not used
// and the method always returns nil.
func (p *MigrationCDCConsumer) Initialize(config *events.CloudEventsConfig) error {
	return nil
}
// HandleKafkaMessage processes a single Kafka message by delegating to
// handleMessage. A nil message is ignored and reported as success.
func (p *MigrationCDCConsumer) HandleKafkaMessage(cm *sarama.ConsumerMessage) error {
	if cm != nil {
		return p.handleMessage(cm)
	}
	// Nothing to process.
	return nil
}
// handleMessage decodes the message value into a MigrationCDCEvent (seeded
// with the message offset) and passes it to handleCDCEvent.
//
// Decoding failures are logged at warn level and processing failures at error
// level; in both cases the underlying error is returned to the caller.
func (p *MigrationCDCConsumer) handleMessage(cm *sarama.ConsumerMessage) error {
	offset := cm.Offset
	p.logger.Debugw("handling kafka message", "offset", offset)

	event := MigrationCDCEvent{Offset: offset}

	err := p.unmarshalEvent(cm.Value, &event)
	if err != nil {
		p.logger.Warnw("unable to unmarshal message", "offset", offset, zap.Error(err))
		return err
	}

	err = p.handleCDCEvent(event)
	if err != nil {
		p.logger.Errorw("unable to process cdc event", "offset", offset, zap.Error(err))
		return err
	}

	return nil
}
// unmarshalEvent decodes a quoted message payload into event.
//
// value must be a quoted string whose unquoted content is the JSON document
// of the event. The quoting is removed first and the result is then decoded
// with json.Unmarshal into event.
//
// It returns the unquoting error when value cannot be unquoted, or the JSON
// decoding error when the unquoted content does not unmarshal into event.
func (p *MigrationCDCConsumer) unmarshalEvent(value []byte, event *MigrationCDCEvent) error {
	message, err := strconv.Unquote(string(value))
	if err != nil {
		// strconv.Unquote implements Go string-literal syntax, which rejects
		// some escapes that are valid in a JSON string (for example `\/` and
		// UTF-16 surrogate pairs such as `\ud83d\ude00`). Retry with the JSON
		// decoder before giving up; when that fails too, report the original
		// unquoting error so existing failure behavior is unchanged.
		if jsonErr := json.Unmarshal(value, &message); jsonErr != nil {
			return err
		}
	}

	return json.Unmarshal([]byte(message), event)
}
// handleCDCEvent triggers a patient migration for insert events.
//
// Events whose operation type is not cdc.OperationTypeInsert are skipped with
// a debug log and reported as success. For insert events the user id and
// clinic id are read from the full document and passed to
// Migrator.MigratePatients, whose error (if any) is returned.
func (p *MigrationCDCConsumer) handleCDCEvent(event MigrationCDCEvent) error {
	if event.OperationType == cdc.OperationTypeInsert {
		p.logger.Infow("processing event", "event", event, "offset", event.Offset)

		doc := event.FullDocument
		userID, clinicID := doc.UserId, doc.ClinicId.Value

		// No caller context is available on this path, so a background
		// context is used for the migration call.
		return p.migrator.MigratePatients(context.Background(), userID, clinicID)
	}

	p.logger.Debugw("skipping handling of event", "offset", event.Offset)
	return nil
}