diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml index 2d03714..40011f3 100644 --- a/.github/workflows/linters.yml +++ b/.github/workflows/linters.yml @@ -18,6 +18,6 @@ jobs: - name: Run linter uses: golangci/golangci-lint-action@v6.0.1 # Action page: with: - version: v1.58 # without patch version + version: v1.59 # without patch version only-new-issues: false # show only new issues if it's a pull request args: --timeout=10m --build-tags=race ./... diff --git a/.gitignore b/.gitignore index c67582c..4f660bc 100644 --- a/.gitignore +++ b/.gitignore @@ -16,4 +16,5 @@ vendor/ tests/php_test_files/composer.lock .idea -.DS_Store \ No newline at end of file +.DS_Store +**/composer.lock diff --git a/.golangci.yml b/.golangci.yml index 8ed2f68..9f0d252 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -36,7 +36,6 @@ linters: # All available linters list: 0 { + maps.Copy(i.headers, headers) + } + + // requeue the message + err := i.Options.requeueFn(context.Background(), i) + if err != nil { + // Nack on fail + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + nr, err2 := i.Options.message.NackWithResult().Get(ctx) + cancel() + return handleResult(stderr.Join(err, err2), nr) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ar, err := i.Options.message.AckWithResult().Get(ctx) + cancel() + if err != nil { + return handleResult(err, ar) + } + return nil } @@ -138,10 +215,10 @@ func (i *Item) Respond(_ []byte, _ string) error { func fromJob(job jobs.Message) *Item { return &Item{ - Job: job.Name(), - Ident: job.ID(), - Payload: string(job.Payload()), - Metadata: job.Headers(), + Job: job.Name(), + Ident: job.ID(), + Payload: job.Payload(), + headers: job.Headers(), Options: &Options{ Priority: job.Priority(), Pipeline: job.GroupID(), @@ -151,14 +228,6 @@ func fromJob(job jobs.Message) *Item { } } -func strToBytes(data string) []byte { - if data == "" { - return nil - } - - return unsafe.Slice(unsafe.StringData(data), len(data)) -} - func (d *Driver) unpack(message *pubsub.Message) *Item { attributes := message.Attributes @@ -204,18 +273,19 @@ func (d *Driver) unpack(message *pubsub.Message) *Item { } return &Item{ - Job: rrj, - Ident: rrid, - Payload: string(message.Data), - Metadata: h, + Job: rrj, + Ident: rrid, + Payload: message.Data, + headers: h, Options: &Options{ AutoAck: autoAck, Delay: int64(dl), Priority: int64(priority), Pipeline: (*d.pipeline.Load()).Name(), // private - message: message, - stopped: &d.stopped, + message: message, + stopped: &d.stopped, + requeueFn: d.handlePush, }, } } @@ -235,3 +305,21 @@ func stob(s string) bool { return false } + +func handleResult(err error, ar pubsub.AcknowledgeStatus) error { + switch ar { + case pubsub.AcknowledgeStatusSuccess: + // no error + return nil + case pubsub.AcknowledgeStatusPermissionDenied: + return fmt.Errorf("acknowledge status: PermissionDenied, err: %w", err) + case pubsub.AcknowledgeStatusFailedPrecondition: + return fmt.Errorf("acknowledge status: FailedPrecondition, err: %w", err) + case pubsub.AcknowledgeStatusInvalidAckID: + return fmt.Errorf("acknowledge status: InvalidAckID, err: %w", err) + case pubsub.AcknowledgeStatusOther: + return fmt.Errorf("acknowledge status: Other, err: %w", err) + default: + return err + } +} diff --git a/pubsubjobs/listener.go b/pubsubjobs/listener.go index d3e5dbc..b8dd803 100644 --- a/pubsubjobs/listener.go +++ b/pubsubjobs/listener.go @@ -2,14 +2,20 @@ package pubsubjobs import ( "context" + "errors" "sync/atomic" "cloud.google.com/go/pubsub" + "github.com/roadrunner-server/events" "go.opentelemetry.io/otel/propagation" "go.uber.org/zap" "google.golang.org/grpc/status" ) +const ( + restartStr string = "restart" +) + func (d *Driver) listen() { // context used to stop the listener d.atomicCtx() @@ -18,17 +24,17 @@ func (d *Driver) listen() { d.log.Debug("receive message", zap.Stringp("ID", &message.ID)) item := d.unpack(message) - ctxspan, span := d.tracer.Tracer(tracerName).Start(d.prop.Extract(ctx, propagation.HeaderCarrier(item.Metadata)), "google_pub_sub_listener") + ctxspan, span := d.tracer.Tracer(tracerName).Start(d.prop.Extract(ctx, propagation.HeaderCarrier(item.headers)), "google_pub_sub_listener") if item.Options.AutoAck { message.Ack() d.log.Debug("auto ack is turned on, message acknowledged") } - if item.Metadata == nil { - item.Metadata = make(map[string][]string, 2) + if item.headers == nil { + item.headers = make(map[string][]string, 2) } - d.prop.Inject(ctxspan, propagation.HeaderCarrier(item.Metadata)) + d.prop.Inject(ctxspan, propagation.HeaderCarrier(item.headers)) d.pq.Insert(item) d.log.Debug("message pushed to the priority queue", zap.Uint64("queue size", d.pq.Len())) @@ -36,9 +42,13 @@ func (d *Driver) listen() { span.End() }) if err != nil { + if errors.Is(err, context.Canceled) { + atomic.StoreUint32(&d.listeners, 0) + return + } st := status.Convert(err) if st != nil && st.Message() == "grpc: the client connection is closing" { - // reduce number of listeners + // reduce the number of listeners if atomic.LoadUint32(&d.listeners) > 0 { atomic.AddUint32(&d.listeners, ^uint32(0)) } @@ -47,9 +57,72 @@ func (d *Driver) listen() { return } - d.log.Error("subscribing error", zap.Error(err)) + atomic.StoreUint32(&d.listeners, 0) + // the pipeline was stopped + if atomic.LoadUint64(&d.stopped) == 1 { + return + } + + // recreate pipeline on fail + pipe := (*d.pipeline.Load()).Name() + d.eventsCh <- events.NewEvent(events.EventJOBSDriverCommand, pipe, restartStr) + d.log.Error("subscribing error, restarting the pipeline", zap.Error(err), zap.String("pipeline", pipe)) } }() + + if d.dlsub != nil { + go func() { + err := d.dlsub.Receive(d.rctx, func(ctx context.Context, message *pubsub.Message) { + d.log.Debug("dead-letter receive message", zap.Stringp("ID", &message.ID)) + item := d.unpack(message) + + ctxspan, span := d.tracer.Tracer(tracerName).Start(d.prop.Extract(ctx, propagation.HeaderCarrier(item.headers)), "google_pub_sub_dl_listener") + if item.Options.AutoAck { + message.Ack() + // it is not possible to requeue a message from the dead-letter queue when auto ack is turned on + d.log.Debug("dead-letter auto ack is turned on, message acknowledged") + } + + if item.headers == nil { + item.headers = make(map[string][]string, 2) + } + + d.prop.Inject(ctxspan, propagation.HeaderCarrier(item.headers)) + + d.pq.Insert(item) + d.log.Debug("dead-letter message pushed to the priority queue", zap.Uint64("queue size", d.pq.Len())) + + span.End() + }) + if err != nil { + if errors.Is(err, context.Canceled) { + atomic.StoreUint32(&d.listeners, 0) + return + } + st := status.Convert(err) + if st != nil && st.Message() == "grpc: the client connection is closing" { + // reduce the number of listeners + if atomic.LoadUint32(&d.listeners) > 0 { + atomic.AddUint32(&d.listeners, ^uint32(0)) + } + + d.log.Debug("dead-letter listener was stopped") + return + } + + atomic.StoreUint32(&d.listeners, 0) + + // the pipeline was stopped + if atomic.LoadUint64(&d.stopped) == 1 { + return + } + // recreate pipeline on fail + pipe := (*d.pipeline.Load()).Name() + d.eventsCh <- events.NewEvent(events.EventJOBSDriverCommand, pipe, restartStr) + d.log.Error("dead-letter subscribing error", zap.Error(err), zap.String("pipeline", pipe)) + } + }() + } } func (d *Driver) atomicCtx() { diff --git a/tests/configs/.rr-init.yaml b/tests/configs/.rr-init.yaml index 4d07079..82b03f3 100644 --- a/tests/configs/.rr-init.yaml +++ b/tests/configs/.rr-init.yaml @@ -1,4 +1,4 @@ -version: '3' +version: "3" rpc: listen: tcp://127.0.0.1:6001 @@ -33,6 +33,7 @@ jobs: project_id: test topic: rrTopic1 skip_topic_declaration: false + dead_letter_topic: "dead-letter-topic" tags: test: "tag" @@ -44,4 +45,4 @@ jobs: skip_topic_declaration: false tags: test: "tag" - consume: [ "test-1", "test-2" ] + consume: ["test-1", "test-2"] diff --git a/tests/configs/.rr-pq.yaml b/tests/configs/.rr-pq.yaml index 4dd3a2b..3c04eed 100644 --- a/tests/configs/.rr-pq.yaml +++ b/tests/configs/.rr-pq.yaml @@ -1,4 +1,4 @@ -version: '3' +version: "3" rpc: listen: tcp://127.0.0.1:6601 @@ -42,4 +42,4 @@ jobs: skip_topic_declaration: false tags: test: "tag" - consume: [ "test-3", "test-4" ] + consume: ["test-3", "test-4"] diff --git a/tests/go.mod b/tests/go.mod index 3eda788..6740878 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -1,35 +1,35 @@ module tests -go 1.22.4 +go 1.22.5 -replace github.com/roadrunner-server/google-pub-sub/v4 => ../ +replace github.com/roadrunner-server/google-pub-sub/v5 => ../ require ( - cloud.google.com/go/pubsub v1.39.0 + cloud.google.com/go/pubsub v1.40.0 github.com/google/uuid v1.6.0 - github.com/roadrunner-server/api/v4 v4.12.0 - github.com/roadrunner-server/config/v4 v4.9.2 + github.com/roadrunner-server/api/v4 v4.15.0 + github.com/roadrunner-server/config/v4 v4.9.3 github.com/roadrunner-server/endure/v2 v2.4.5 - github.com/roadrunner-server/google-pub-sub/v4 v4.0.0-20240315194731-a530eba2bb5f + github.com/roadrunner-server/google-pub-sub/v4 v4.0.1 github.com/roadrunner-server/goridge/v3 v3.8.2 - github.com/roadrunner-server/informer/v4 v4.5.4 - github.com/roadrunner-server/jobs/v4 v4.9.4 - github.com/roadrunner-server/logger/v4 v4.4.4 - github.com/roadrunner-server/resetter/v4 v4.3.4 - github.com/roadrunner-server/rpc/v4 v4.4.4 - github.com/roadrunner-server/server/v4 v4.8.4 + github.com/roadrunner-server/informer/v4 v4.5.5 + github.com/roadrunner-server/jobs/v4 v4.10.0 + github.com/roadrunner-server/logger/v4 v4.4.5 + github.com/roadrunner-server/resetter/v4 v4.3.5 + github.com/roadrunner-server/rpc/v4 v4.4.5 + github.com/roadrunner-server/server/v4 v4.8.5 github.com/stretchr/testify v1.9.0 go.uber.org/zap v1.27.0 - google.golang.org/api v0.185.0 - google.golang.org/grpc v1.64.0 + google.golang.org/api v0.187.0 + google.golang.org/grpc v1.65.0 ) require ( cloud.google.com/go v0.115.0 // indirect - cloud.google.com/go/auth v0.5.1 // indirect + cloud.google.com/go/auth v0.6.1 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect - cloud.google.com/go/compute/metadata v0.3.0 // indirect - cloud.google.com/go/iam v1.1.8 // indirect + cloud.google.com/go/compute/metadata v0.4.0 // indirect + cloud.google.com/go/iam v1.1.10 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -51,15 +51,16 @@ require ( github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect - github.com/prometheus/common v0.54.0 // indirect + github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/roadrunner-server/errors v1.4.0 // indirect - github.com/roadrunner-server/sdk/v4 v4.7.3 // indirect - github.com/roadrunner-server/tcplisten v1.4.0 // indirect + github.com/roadrunner-server/sdk/v4 v4.8.0 // indirect + github.com/roadrunner-server/tcplisten v1.5.0 // indirect github.com/sagikazarmark/locafero v0.6.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect @@ -75,13 +76,13 @@ require ( github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.52.0 // indirect - go.opentelemetry.io/contrib/propagators/jaeger v1.27.0 // indirect - go.opentelemetry.io/otel v1.27.0 // indirect - go.opentelemetry.io/otel/metric v1.27.0 // indirect - go.opentelemetry.io/otel/sdk v1.27.0 // indirect - go.opentelemetry.io/otel/trace v1.27.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect + go.opentelemetry.io/contrib/propagators/jaeger v1.28.0 // indirect + go.opentelemetry.io/otel v1.28.0 // indirect + go.opentelemetry.io/otel/metric v1.28.0 // indirect + go.opentelemetry.io/otel/sdk v1.28.0 // indirect + go.opentelemetry.io/otel/trace v1.28.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.24.0 // indirect golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 // indirect @@ -91,9 +92,9 @@ require ( golang.org/x/sys v0.21.0 // indirect golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.5.0 // indirect - google.golang.org/genproto v0.0.0-20240617180043-68d350f18fd4 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4 // indirect + google.golang.org/genproto v0.0.0-20240701130421-f6361c86f094 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect diff --git a/tests/go.sum b/tests/go.sum index 00162c6..48ce5b4 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -1,20 +1,20 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.115.0 h1:CnFSK6Xo3lDYRoBKEcAtia6VSC837/ZkJuRduSFnr14= cloud.google.com/go v0.115.0/go.mod h1:8jIM5vVgoAEoiVxQ/O4BFTfHqulPZgs/ufEzMcFMdWU= -cloud.google.com/go/auth v0.5.1 h1:0QNO7VThG54LUzKiQxv8C6x1YX7lUrzlAa1nVLF8CIw= -cloud.google.com/go/auth v0.5.1/go.mod h1:vbZT8GjzDf3AVqCcQmqeeM32U9HBFc32vVVAbwDsa6s= +cloud.google.com/go/auth v0.6.1 h1:T0Zw1XM5c1GlpN2HYr2s+m3vr1p2wy+8VN+Z1FKxW38= +cloud.google.com/go/auth v0.6.1/go.mod h1:eFHG7zDzbXHKmjJddFG/rBlcGp6t25SwRUiEQSlO4x4= cloud.google.com/go/auth/oauth2adapt v0.2.2 h1:+TTV8aXpjeChS9M+aTtN/TjdQnzJvmzKFt//oWu7HX4= cloud.google.com/go/auth/oauth2adapt v0.2.2/go.mod h1:wcYjgpZI9+Yu7LyYBg4pqSiaRkfEK3GQcpb7C/uyF1Q= -cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc= -cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= -cloud.google.com/go/iam v1.1.8 h1:r7umDwhj+BQyz0ScZMp4QrGXjSTI3ZINnpgU2nlB/K0= -cloud.google.com/go/iam v1.1.8/go.mod h1:GvE6lyMmfxXauzNq8NbgJbeVQNspG+tcdL/W8QO1+zE= -cloud.google.com/go/kms v1.17.1 h1:5k0wXqkxL+YcXd4viQzTqCgzzVKKxzgrK+rCZJytEQs= -cloud.google.com/go/kms v1.17.1/go.mod h1:DCMnCF/apA6fZk5Cj4XsD979OyHAqFasPuA5Sd0kGlQ= -cloud.google.com/go/longrunning v0.5.7 h1:WLbHekDbjK1fVFD3ibpFFVoyizlLRl73I7YKuAKilhU= -cloud.google.com/go/longrunning v0.5.7/go.mod h1:8GClkudohy1Fxm3owmBGid8W0pSgodEMwEAztp38Xng= -cloud.google.com/go/pubsub v1.39.0 h1:qt1+S6H+wwW8Q/YvDwM8lJnq+iIFgFEgaD/7h3lMsAI= -cloud.google.com/go/pubsub v1.39.0/go.mod h1:FrEnrSGU6L0Kh3iBaAbIUM8KMR7LqyEkMboVxGXCT+s= +cloud.google.com/go/compute/metadata v0.4.0 h1:vHzJCWaM4g8XIcm8kopr3XmDA4Gy/lblD3EhhSux05c= +cloud.google.com/go/compute/metadata v0.4.0/go.mod h1:SIQh1Kkb4ZJ8zJ874fqVkslA29PRXuleyj6vOzlbK7M= +cloud.google.com/go/iam v1.1.10 h1:ZSAr64oEhQSClwBL670MsJAW5/RLiC6kfw3Bqmd5ZDI= +cloud.google.com/go/iam v1.1.10/go.mod h1:iEgMq62sg8zx446GCaijmA2Miwg5o3UbO+nI47WHJps= +cloud.google.com/go/kms v1.18.1 h1:tz1oSpKokgn1+FF7mEMMmsu0FVHQebZjtKetX3fbYdo= +cloud.google.com/go/kms v1.18.1/go.mod h1:fOsmW0fzDVYXM0AOJWmpB0gFVOVgC33giwYi0kcTdBA= +cloud.google.com/go/longrunning v0.5.8 h1:QThI5BFSlYlS7K0wnABCdmKsXbG/htLc3nTPzrfOgeU= +cloud.google.com/go/longrunning v0.5.8/go.mod h1:oJDErR/mm5h44gzsfjQlxd6jyjFvuBPOxR1TLy2+cQk= +cloud.google.com/go/pubsub v1.40.0 h1:0LdP+zj5XaPAGtWr2V6r88VXJlmtaB/+fde1q3TU8M0= +cloud.google.com/go/pubsub v1.40.0/go.mod h1:BVJI4sI2FyXp36KFKvFwcfDRDfR8MiLT8mMhmIhdAeA= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= @@ -99,6 +99,8 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -109,36 +111,38 @@ github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJL github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= -github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8= -github.com/prometheus/common v0.54.0/go.mod h1:/TQgMJP5CuVYveyT7n/0Ix8yLNNXy9yRSkhnLTHPDIQ= +github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= +github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= -github.com/roadrunner-server/api/v4 v4.12.0 h1:N8AC+b7uzrDpTPnFTBVWNIs9ZMV42hwKDCo29X84iS8= -github.com/roadrunner-server/api/v4 v4.12.0/go.mod h1:nLV2f4O7tDh5DaMDff4oX1bNJ9erz7eyq+4TajgKGck= -github.com/roadrunner-server/config/v4 v4.9.2 h1:AvZj6b9Jz8dR1FB3Suugcn+8Cok2WnIla4iznbuwmKo= -github.com/roadrunner-server/config/v4 v4.9.2/go.mod h1:yYYIkw1u2Hq8521v/qoqDuOHwOKH9X8Mx+t+TWX7UME= +github.com/roadrunner-server/api/v4 v4.15.0 h1:/CUbaHb9AT2PSWQFhjUxnuSC5p0ypkACrFgiIvNHsoo= +github.com/roadrunner-server/api/v4 v4.15.0/go.mod h1:fAiaKHRVkt49S302Jhj60jd+nUNfs8iyjKE5rIoQ+kE= +github.com/roadrunner-server/config/v4 v4.9.3 h1:YZ2OkNT8OLwG/ulc2+GcQJzCCclGe5sy/onnInAxGY4= +github.com/roadrunner-server/config/v4 v4.9.3/go.mod h1:fcIWS7ZlG5h4fg1uE97lE0BVZRTS+/gLuDdNLPRNoIY= github.com/roadrunner-server/endure/v2 v2.4.5 h1:GoZm/1HjKCKm8TpaP/Pm2KbN0X9gLyN840cA3Fn/TCE= github.com/roadrunner-server/endure/v2 v2.4.5/go.mod h1:83UvLdt+RNxELTSna+SZMWQiu+Thj6wOz6hmlp65XFI= github.com/roadrunner-server/errors v1.4.0 h1:Odjg3VZrj1q5Y8ILwoN+JgERyv0pkhrWPNOM4h68iQ8= github.com/roadrunner-server/errors v1.4.0/go.mod h1:78PvraAFj+Sxy5nDmo0S+h6rEMLFIDszWZxA3B0sPAs= +github.com/roadrunner-server/google-pub-sub/v4 v4.0.1 h1:/w52FHQS4BCByL2bKjQExzPDTBQGc6jWefbTum2eYXU= +github.com/roadrunner-server/google-pub-sub/v4 v4.0.1/go.mod h1:r67PNusyAvwQvYL6kG/Jcv2ftEAL8PZyOn/j5Iq7pLk= github.com/roadrunner-server/goridge/v3 v3.8.2 h1:4TpIJAMylMIVTva/L/STB4ZvYNVoQ77+Syr6abxj95c= github.com/roadrunner-server/goridge/v3 v3.8.2/go.mod h1:7IIDW50j1saxnOxktFeUPpkSIfyvM/dYopTrbGWXboA= -github.com/roadrunner-server/informer/v4 v4.5.4 h1:mDYYR3xgYNDeFl9hvJPo0fw1byAe/LYsO643WFVgPY0= -github.com/roadrunner-server/informer/v4 v4.5.4/go.mod h1:48pRPU0bFaWLbQkIkNT142wFzPnJr25Bg0GyOzUiqpk= -github.com/roadrunner-server/jobs/v4 v4.9.4 h1:HBFVi/yZAzKihvad2a3rQTTrtqAhGJ6nqa2vefCD7RE= -github.com/roadrunner-server/jobs/v4 v4.9.4/go.mod h1:jQ3ZqNMLt1KYNEKOaRzjhUDpZSnZmiedUgxBEVEkqYM= -github.com/roadrunner-server/logger/v4 v4.4.4 h1:GM+eMhrBIaNR0TP94krK3H0rLOiVEP3woYhQTKDTUxA= -github.com/roadrunner-server/logger/v4 v4.4.4/go.mod h1:gV0HHkfuLW/2DH1jtRUCFCkYd7FWOg8cP26TnPXUavY= -github.com/roadrunner-server/resetter/v4 v4.3.4 h1:vbEnrka/i+PmnDcr+meijE+DW+c34o7POj3oLdpDQww= -github.com/roadrunner-server/resetter/v4 v4.3.4/go.mod h1:YWE7nD1t8Lcyu3HmlniBWmBkJeszAUfjjuQOqrU/7kk= -github.com/roadrunner-server/rpc/v4 v4.4.4 h1:6uAG2Q/QDvOO/+fhGwnEjlCu1C57wtIRoOPxkHWDZMA= -github.com/roadrunner-server/rpc/v4 v4.4.4/go.mod h1:IYOiuBhE2LD/244uySz5s4aB7PFggt0BoN+Fsh1PEJY= -github.com/roadrunner-server/sdk/v4 v4.7.3 h1:w007xSk96SDFH/IJ3ZsP3AkU5opnw2/g+cBjQxd+WGA= -github.com/roadrunner-server/sdk/v4 v4.7.3/go.mod h1:pOHmaPzvxOn/xhKC9tHibHYSyNXWlUDQvp7pcRiuDGE= -github.com/roadrunner-server/server/v4 v4.8.4 h1:R9PUH+M09MmAdbCyIya1vaaZ+zD8l96CLrpe5YpcHCc= -github.com/roadrunner-server/server/v4 v4.8.4/go.mod h1:62B5R5WWVJsAGYeqPWnYD2w22mIwWfGYQZleRzzi7ho= -github.com/roadrunner-server/tcplisten v1.4.0 h1:yWo09zktv/CSV6VywLfw4pwNcUchgTiIrW4uIICtO5M= -github.com/roadrunner-server/tcplisten v1.4.0/go.mod h1:A6+VSnW2ETGnN/e/CMdP63ZXqQDaC0UDMU6QmyuB0yM= +github.com/roadrunner-server/informer/v4 v4.5.5 h1:WERgFpHlElVIU1TJgpB+HI7C9rU7dKp1bM88qvoEyUI= +github.com/roadrunner-server/informer/v4 v4.5.5/go.mod h1:48pRPU0bFaWLbQkIkNT142wFzPnJr25Bg0GyOzUiqpk= +github.com/roadrunner-server/jobs/v4 v4.10.0 h1:XU0rT+rOdOpebxzAA5rO/RbmnFmM+ScfIohKVCzsqas= +github.com/roadrunner-server/jobs/v4 v4.10.0/go.mod h1:G+GopnW5mK5DKiiLNp0UpW+t+tw7pIZSn2ipFkWIQos= +github.com/roadrunner-server/logger/v4 v4.4.5 h1:KST5ElyLB01H9k3/KliYG8L88JVQi6su+ajhEORypVg= +github.com/roadrunner-server/logger/v4 v4.4.5/go.mod h1:gV0HHkfuLW/2DH1jtRUCFCkYd7FWOg8cP26TnPXUavY= +github.com/roadrunner-server/resetter/v4 v4.3.5 h1:nN7RQSs4IZqI8G7/PyT85RDrRVQ13xwDCG3ERlukaHE= +github.com/roadrunner-server/resetter/v4 v4.3.5/go.mod h1:YWE7nD1t8Lcyu3HmlniBWmBkJeszAUfjjuQOqrU/7kk= +github.com/roadrunner-server/rpc/v4 v4.4.5 h1:8Ojui6+lPy8deLyoZlarJ5/UYsq0XRTiaunjdtBrH2k= +github.com/roadrunner-server/rpc/v4 v4.4.5/go.mod h1:IYOiuBhE2LD/244uySz5s4aB7PFggt0BoN+Fsh1PEJY= +github.com/roadrunner-server/sdk/v4 v4.8.0 h1:MpvRRlqJCjjqeCYah5K1ce64Z9zFqnuTYa1SgdhKJ5Y= +github.com/roadrunner-server/sdk/v4 v4.8.0/go.mod h1:CD9AE/wsGfhI7nbx+Bl727oQiGjh62ExrRJ/DhBg91s= +github.com/roadrunner-server/server/v4 v4.8.5 h1:L/lCbPNthQsmjyu1XBBZlps+TmQ2rPTXtO37Dl3Qtz4= +github.com/roadrunner-server/server/v4 v4.8.5/go.mod h1:NCD4/SO5WJTwYipb7QfqcyXBfsQ0sp/DOHdfAK2egu8= +github.com/roadrunner-server/tcplisten v1.5.0 h1:kwJZAAjPt250fBpnglUJ9mKYlbMeKEXzHwOQ4dk205Q= +github.com/roadrunner-server/tcplisten v1.5.0/go.mod h1:QSPMB61jFvCcvuRLAMhVIcrJhA6QoJAG+rSkz/vsT04= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/sagikazarmark/locafero v0.6.0 h1:ON7AQg37yzcRPU69mt7gwhFEBwxI6P9T4Qu3N51bwOk= @@ -183,20 +187,20 @@ go.einride.tech/aip v0.67.1 h1:d/4TW92OxXBngkSOwWS2CH5rez869KpKMaN44mdxkFI= go.einride.tech/aip v0.67.1/go.mod h1:ZGX4/zKw8dcgzdLsrvpOOGxfxI2QSk12SlP7d6c0/XI= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0 h1:vS1Ao/R55RNV4O7TA2Qopok8yN+X0LIP6RVWLFkprck= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0/go.mod h1:BMsdeOxN04K0L5FNUBfjFdvwWGNe/rkmSwH4Aelu/X0= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.52.0 h1:9l89oX4ba9kHbBol3Xin3leYJ+252h0zszDtBwyKe2A= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.52.0/go.mod h1:XLZfZboOJWHNKUv7eH0inh0E9VV6eWDFB/9yJyTLPp0= -go.opentelemetry.io/contrib/propagators/jaeger v1.27.0 h1:tJPpZAEsihJgRTnXrPjY3rjED8Av3EJdi1kvKCi1yMc= -go.opentelemetry.io/contrib/propagators/jaeger v1.27.0/go.mod h1:5uPAMHJnlTktQbCCdWSX5PfK8CocD25mycIsZV/iFiU= -go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= -go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= -go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik= -go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak= -go.opentelemetry.io/otel/sdk v1.27.0 h1:mlk+/Y1gLPLn84U4tI8d3GNJmGT/eXe3ZuOXN9kTWmI= -go.opentelemetry.io/otel/sdk v1.27.0/go.mod h1:Ha9vbLwJE6W86YstIywK2xFfPjbWlCuwPtMkKdz/Y4A= -go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= -go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 h1:9G6E0TXzGFVfTnawRzrPl83iHOAV7L8NJiR8RSGYV1g= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0/go.mod h1:azvtTADFQJA8mX80jIH/akaE7h+dbm/sVuaHqN13w74= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 h1:4K4tsIXefpVJtvA/8srF4V4y0akAoPHkIslgAkjixJA= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0/go.mod h1:jjdQuTGVsXV4vSs+CJ2qYDeDPf9yIJV23qlIzBm73Vg= +go.opentelemetry.io/contrib/propagators/jaeger v1.28.0 h1:xQ3ktSVS128JWIaN1DiPGIjcH+GsvkibIAVRWFjS9eM= +go.opentelemetry.io/contrib/propagators/jaeger v1.28.0/go.mod h1:O9HIyI2kVBrFoEwQZ0IN6PHXykGoit4mZV2aEjkTRH4= +go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= +go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= +go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= +go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE= +go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg= +go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= +go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -251,26 +255,26 @@ golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3 golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/api v0.185.0 h1:ENEKk1k4jW8SmmaT6RE+ZasxmxezCrD5Vw4npvr+pAU= -google.golang.org/api v0.185.0/go.mod h1:HNfvIkJGlgrIlrbYkAm9W9IdkmKZjOTVh33YltygGbg= +google.golang.org/api v0.187.0 h1:Mxs7VATVC2v7CY+7Xwm4ndkX71hpElcvx0D1Ji/p1eo= +google.golang.org/api v0.187.0/go.mod h1:KIHlTc4x7N7gKKuVsdmfBXN13yEEWXWFURWY6SBp2gk= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20240617180043-68d350f18fd4 h1:CUiCqkPw1nNrNQzCCG4WA65m0nAmQiwXHpub3dNyruU= -google.golang.org/genproto v0.0.0-20240617180043-68d350f18fd4/go.mod h1:EvuUDCulqGgV80RvP1BHuom+smhX4qtlhnNatHuroGQ= -google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4 h1:MuYw1wJzT+ZkybKfaOXKp5hJiZDn2iHaXRw0mRYdHSc= -google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4/go.mod h1:px9SlOOZBg1wM1zdnr8jEL4CNGUBZ+ZKYtNPApNQc4c= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4 h1:Di6ANFilr+S60a4S61ZM00vLdw0IrQOSMS2/6mrnOU0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/genproto v0.0.0-20240701130421-f6361c86f094 h1:6whtk83KtD3FkGrVb2hFXuQ+ZMbCNdakARIn/aHMmG8= +google.golang.org/genproto v0.0.0-20240701130421-f6361c86f094/go.mod h1:Zs4wYw8z1zr6RNF4cwYb31mvN/EGaKAdQjNCF3DW6K4= +google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 h1:0+ozOGcrp+Y8Aq8TLNN2Aliibms5LEzsq99ZZmAGYm0= +google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094/go.mod h1:fJ/e3If/Q67Mj99hin0hMhiNyCRmt6BQ2aWIJshUSJw= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 h1:BwIjyKYGsK9dMCBOorzRri8MQwmi7mT9rGHsCEinZkA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= -google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= -google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/tests/jobs_test.go b/tests/jobs_test.go index 213320e..893030a 100644 --- a/tests/jobs_test.go +++ b/tests/jobs_test.go @@ -208,11 +208,12 @@ func TestJobsError(t *testing.T) { Prefix: "rr", } + l, oLogger := mocklogger.ZapTestLogger(zap.DebugLevel) err = cont.RegisterAll( cfg, &server.Plugin{}, &rpcPlugin.Plugin{}, - &logger.Plugin{}, + l, &jobs.Plugin{}, &resetter.Plugin{}, &informer.Plugin{}, @@ -279,6 +280,14 @@ func TestJobsError(t *testing.T) { stopCh <- struct{}{} wg.Wait() + require.Equal(t, 1, oLogger.FilterMessageSnippet("job was pushed successfully").Len()) + require.Equal(t, 4, oLogger.FilterMessageSnippet("job processing was started").Len()) + require.Equal(t, 4, oLogger.FilterMessageSnippet("job was processed successfully").Len()) + require.Equal(t, 1, oLogger.FilterMessageSnippet("pipeline was paused").Len()) + require.Equal(t, 1, oLogger.FilterMessageSnippet("pipeline was resumed").Len()) + require.Equal(t, 1, oLogger.FilterMessageSnippet("pipeline was stopped").Len()) + require.Equal(t, 1, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len()) + time.Sleep(time.Second * 5) } diff --git a/tests/php_test_files/jobs/jobs_err.php b/tests/php_test_files/jobs/jobs_err.php index 6f5420c..71ca773 100644 --- a/tests/php_test_files/jobs/jobs_err.php +++ b/tests/php_test_files/jobs/jobs_err.php @@ -5,7 +5,7 @@ use Spiral\RoadRunner\Jobs\Consumer; use Spiral\RoadRunner\Jobs\Serializer\JsonSerializer; -ini_set('display_errors', 'stderr'); +ini_set("display_errors", "stderr"); require dirname(__DIR__) . "/vendor/autoload.php"; $consumer = new Spiral\RoadRunner\Jobs\Consumer(); @@ -13,14 +13,17 @@ while ($task = $consumer->waitTask()) { try { $headers = $task->getHeaders(); - $total_attempts = (int)$task->getHeaderLine("attempts") + 1; + $total_attempts = (int) $task->getHeaderLine("attempts") + 1; if ($total_attempts > 3) { - $task->complete(); + $task->ack(); } else { - $task->withHeader("attempts",$total_attempts)->withDelay(5)->fail("failed", true); + $task + ->withHeader("attempts", $total_attempts) + ->withDelay(5) + ->requeue("failed"); } } catch (\Throwable $e) { - $rr->error((string)$e); + $rr->error((string) $e); } } diff --git a/tests/php_test_files/jobs/jobs_ok.php b/tests/php_test_files/jobs/jobs_ok.php index e51920a..fb9f112 100644 --- a/tests/php_test_files/jobs/jobs_ok.php +++ b/tests/php_test_files/jobs/jobs_ok.php @@ -5,15 +5,15 @@ use Spiral\RoadRunner\Jobs\Consumer; use Spiral\RoadRunner\Jobs\Serializer\JsonSerializer; -ini_set('display_errors', 'stderr'); +ini_set("display_errors", "stderr"); require dirname(__DIR__) . "/vendor/autoload.php"; $consumer = new Spiral\RoadRunner\Jobs\Consumer(); while ($task = $consumer->waitTask()) { try { - $task->complete(); + $task->ack(); } catch (\Throwable $e) { - $rr->error((string)$e); + $rr->error((string) $e); } } diff --git a/tests/php_test_files/jobs/jobs_ok_pq.php b/tests/php_test_files/jobs/jobs_ok_pq.php index 075970e..8ccf02b 100644 --- a/tests/php_test_files/jobs/jobs_ok_pq.php +++ b/tests/php_test_files/jobs/jobs_ok_pq.php @@ -5,16 +5,16 @@ use Spiral\RoadRunner\Jobs\Consumer; use Spiral\RoadRunner\Jobs\Serializer\JsonSerializer; -ini_set('display_errors', 'stderr'); +ini_set("display_errors", "stderr"); require dirname(__DIR__) . "/vendor/autoload.php"; $consumer = new Spiral\RoadRunner\Jobs\Consumer(); while ($task = $consumer->waitTask()) { try { - sleep(15); - $task->complete(); + sleep(15); + $task->ack(); } catch (\Throwable $e) { - $rr->error((string)$e); + $rr->error((string) $e); } } diff --git a/tests/pkgs.txt b/tests/pkgs.txt new file mode 100644 index 0000000..b53e695 --- /dev/null +++ b/tests/pkgs.txt @@ -0,0 +1 @@ +github.com/roadrunner-server/amqp/v5,github.com/roadrunner-server/app-logger/v5,github.com/roadrunner-server/beanstalk/v5,github.com/roadrunner-server/boltdb/v5,github.com/roadrunner-server/centrifuge/v5,github.com/roadrunner-server/config/v5,github.com/roadrunner-server/fileserver/v5,github.com/roadrunner-server/grpc/v5,github.com/roadrunner-server/gzip/v5,github.com/roadrunner-server/headers/v5,github.com/roadrunner-server/http/v5,github.com/roadrunner-server/informer/v5,github.com/roadrunner-server/jobs/v5,github.com/roadrunner-server/kafka/v5,github.com/roadrunner-server/kv/v5,github.com/roadrunner-server/logger/v5,github.com/roadrunner-server/lock/v5,github.com/roadrunner-server/memcached/v5,github.com/roadrunner-server/memory/v5,github.com/roadrunner-server/metrics/v5,github.com/roadrunner-server/nats/v5,github.com/roadrunner-server/otel/v5,github.com/roadrunner-server/prometheus/v5,github.com/roadrunner-server/proxy_ip_parser/v5,github.com/roadrunner-server/redis/v5,github.com/roadrunner-server/resetter/v5,github.com/roadrunner-server/rpc/v5,github.com/roadrunner-server/send/v5,github.com/roadrunner-server/server/v5,github.com/roadrunner-server/service/v5,github.com/roadrunner-server/sqs/v5,github.com/roadrunner-server/static/v5,github.com/roadrunner-server/status/v5,github.com/roadrunner-server/tcp/v5