diff --git a/internal/bus/bus_redis.go b/internal/bus/bus_redis.go index e66f604..3a28b0d 100644 --- a/internal/bus/bus_redis.go +++ b/internal/bus/bus_redis.go @@ -223,14 +223,16 @@ func (q *redisWriteOpQueue) drain() { for q.ops.Len() > 0 { op := q.ops.PopFront() q.mu.Unlock() - op.run() + if err := op.run(); err != nil { + logger.Error(err, "redis write message failed") + } q.mu.Lock() } q.mu.Unlock() } type redisWriteOp interface { - run() + run() error } type redisPublishOp struct { @@ -239,8 +241,8 @@ type redisPublishOp struct { message []byte } -func (r *redisPublishOp) run() { - r.rc.Publish(r.ctx, r.channel, r.message) +func (r *redisPublishOp) run() error { + return r.rc.Publish(r.ctx, r.channel, r.message).Err() } type redisExecPublishOp struct { @@ -249,8 +251,9 @@ type redisExecPublishOp struct { ops *redisWriteOpQueue } -func (r *redisExecPublishOp) run() { +func (r *redisExecPublishOp) run() error { go r.exec() + return nil } func (r *redisExecPublishOp) exec() { @@ -268,7 +271,7 @@ type redisReconcileSubscriptionsOp struct { *redisMessageBus } -func (r *redisReconcileSubscriptionsOp) run() { +func (r *redisReconcileSubscriptionsOp) run() error { r.mu.Lock() for len(r.dirtyChannels) > 0 { subscribe := make(map[string]struct{}, len(r.dirtyChannels)) @@ -313,6 +316,7 @@ func (r *redisReconcileSubscriptionsOp) run() { } } r.mu.Unlock() + return nil } type redisSubList struct {