Skip to content

Commit

Permalink
Add error logging to redisWriteOpQueue (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sean-Der authored Mar 27, 2024
1 parent 0d14376 commit cec3a0e
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions internal/bus/bus_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,16 @@ func (q *redisWriteOpQueue) drain() {
for q.ops.Len() > 0 {
op := q.ops.PopFront()
q.mu.Unlock()
op.run()
if err := op.run(); err != nil {
logger.Error(err, "redis write message failed")
}
q.mu.Lock()
}
q.mu.Unlock()
}

type redisWriteOp interface {
run()
run() error
}

type redisPublishOp struct {
Expand All @@ -239,8 +241,8 @@ type redisPublishOp struct {
message []byte
}

func (r *redisPublishOp) run() {
r.rc.Publish(r.ctx, r.channel, r.message)
func (r *redisPublishOp) run() error {
return r.rc.Publish(r.ctx, r.channel, r.message).Err()
}

type redisExecPublishOp struct {
Expand All @@ -249,8 +251,9 @@ type redisExecPublishOp struct {
ops *redisWriteOpQueue
}

func (r *redisExecPublishOp) run() {
func (r *redisExecPublishOp) run() error {
go r.exec()
return nil
}

func (r *redisExecPublishOp) exec() {
Expand All @@ -268,7 +271,7 @@ type redisReconcileSubscriptionsOp struct {
*redisMessageBus
}

func (r *redisReconcileSubscriptionsOp) run() {
func (r *redisReconcileSubscriptionsOp) run() error {
r.mu.Lock()
for len(r.dirtyChannels) > 0 {
subscribe := make(map[string]struct{}, len(r.dirtyChannels))
Expand Down Expand Up @@ -313,6 +316,7 @@ func (r *redisReconcileSubscriptionsOp) run() {
}
}
r.mu.Unlock()
return nil
}

type redisSubList struct {
Expand Down

0 comments on commit cec3a0e

Please sign in to comment.