diff --git a/pubsubjobs/config.go b/pubsubjobs/config.go
index 666fb4d..ffe0ff5 100644
--- a/pubsubjobs/config.go
+++ b/pubsubjobs/config.go
@@ -4,18 +4,20 @@ import "os"
 
 // pipeline rabbitmq info
 const (
-	exchangeKey string = "exchange"
+	pref                 string = "prefetch"
+	skipTopicDeclaration string = "skip_topic_declaration"
+	topic                string = "topic"
 )
 
 // config is used to parse pipeline configuration
 type config struct {
 	// global
-	ProjectID            string `mapstructure:"project_id"`
-	Topic                string `mapstructure:"topic"`
-	SkipTopicDeclaration bool   `mapstructure:"skip_topic_declaration"`
+	ProjectID            string `mapstructure:"project_id"`
+	Topic                string `mapstructure:"topic"`
+	SkipTopicDeclaration bool   `mapstructure:"skip_topic_declaration"`
 
 	// local
-	Prefetch int    `mapstructure:"prefetch"`
+	Prefetch int32  `mapstructure:"prefetch"`
 	Priority int64  `mapstructure:"priority"`
 	Host     string `mapstructure:"host"`
 }
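A note on the Prefetch change above (int to int32): the value now acts as an upper bound on an atomic in-flight counter shared by the listener and Ack/Nack. The snippet below is a minimal, self-contained sketch of that gating pattern under made-up names (take/release); it is not the driver's actual code.

// Illustrative sketch of the prefetch gate: intake blocks on a sync.Cond while the
// in-flight counter is at the limit; an Ack-like release decrements it and signals.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	const limit = int32(1) // think: prefetch: 1
	inFlight := int64(0)   // think: msgInFlight
	cond := sync.Cond{L: &sync.Mutex{}}

	// take claims a slot, blocking while the limit is reached.
	take := func(id int) {
		cond.L.Lock()
		for atomic.LoadInt64(&inFlight) >= int64(limit) {
			cond.Wait()
		}
		atomic.AddInt64(&inFlight, 1)
		cond.L.Unlock()
		fmt.Println("message", id, "is in flight")
	}

	// release frees a slot, the way Item.Ack/Nack do in the driver.
	release := func() {
		cond.L.Lock()                         // serialize with the check-then-Wait above so the signal is not lost
		atomic.AddInt64(&inFlight, ^int64(0)) // i.e. add -1
		cond.L.Unlock()
		cond.Signal()
	}

	take(1) // fills the only slot

	done := make(chan struct{})
	go func() {
		take(2) // blocks until release() runs
		close(done)
	}()

	time.Sleep(100 * time.Millisecond) // give the goroutine time to park
	release()
	<-done
}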
diff --git a/pubsubjobs/driver.go b/pubsubjobs/driver.go
index 8925478..0504540 100644
--- a/pubsubjobs/driver.go
+++ b/pubsubjobs/driver.go
@@ -2,12 +2,14 @@ package pubsubjobs
 
 import (
 	"context"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
 
 	"cloud.google.com/go/pubsub"
+	"github.com/goccy/go-json"
 	"github.com/roadrunner-server/api/v4/plugins/v3/jobs"
 	"github.com/roadrunner-server/errors"
 	jprop "go.opentelemetry.io/contrib/propagators/jaeger"
@@ -36,14 +38,16 @@ type Driver struct {
 	mu   sync.Mutex
 	cond sync.Cond
 
-	log         *zap.Logger
-	pq          jobs.Queue
-	pipeline    atomic.Pointer[jobs.Pipeline]
-	tracer      *sdktrace.TracerProvider
-	prop        propagation.TextMapPropagator
-	consumeAll  bool
-	skipDeclare bool
-	topic       string
+	log              *zap.Logger
+	pq               jobs.Queue
+	pipeline         atomic.Pointer[jobs.Pipeline]
+	tracer           *sdktrace.TracerProvider
+	prop             propagation.TextMapPropagator
+	consumeAll       bool
+	skipDeclare      bool
+	topic            string
+	msgInFlight      *int64
+	msgInFlightLimit *int32
 
 	// if user invoke several resume operations
 	listeners uint32
@@ -93,14 +97,16 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip
 	// PARSE CONFIGURATION END -------
 
 	jb := &Driver{
-		tracer:      tracer,
-		prop:        prop,
-		log:         log,
-		skipDeclare: conf.SkipTopicDeclaration,
-		topic:       conf.Topic,
-		pq:          pq,
-		pauseCh:     make(chan struct{}, 1),
-		cond:        sync.Cond{L: &sync.Mutex{}},
+		tracer:           tracer,
+		prop:             prop,
+		log:              log,
+		skipDeclare:      conf.SkipTopicDeclaration,
+		topic:            conf.Topic,
+		pq:               pq,
+		pauseCh:          make(chan struct{}, 1),
+		cond:             sync.Cond{L: &sync.Mutex{}},
+		msgInFlightLimit: ptr(conf.Prefetch),
+		msgInFlight:      ptr(int64(0)),
 	}
 
 	ctx := context.Background()
@@ -145,14 +151,22 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.
 	// PARSE CONFIGURATION -------
 
 	jb := &Driver{
-		prop:        prop,
-		tracer:      tracer,
-		log:         log,
-		pq:          pq,
-		pauseCh:     make(chan struct{}, 1),
-		skipDeclare: conf.SkipTopicDeclaration,
-		topic:       conf.Topic,
-		cond:        sync.Cond{L: &sync.Mutex{}},
+		prop:             prop,
+		tracer:           tracer,
+		log:              log,
+		pq:               pq,
+		pauseCh:          make(chan struct{}, 1),
+		skipDeclare:      pipe.Bool(skipTopicDeclaration, false),
+		topic:            pipe.String(topic, "default"),
+		cond:             sync.Cond{L: &sync.Mutex{}},
+		msgInFlightLimit: ptr(int32(pipe.Int(pref, 10))),
+		msgInFlight:      ptr(int64(0)),
+	}
+
+	ctx := context.Background()
+	jb.client, err = pubsub.NewClient(ctx, conf.ProjectID)
+	if err != nil {
+		return nil, err
 	}
 
 	err = jb.manageTopic(context.Background())
@@ -174,7 +188,24 @@ func (d *Driver) Push(ctx context.Context, jb jobs.Message) error {
 	ctx, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(tracerName).Start(ctx, "google_pub_sub_push")
 	defer span.End()
 
-	result := d.client.Topic(d.topic).Publish(ctx, &pubsub.Message{Data: jb.Payload()})
+	job := fromJob(jb)
+
+	data, err := json.Marshal(job.headers)
+	if err != nil {
+		return err
+	}
+
+	result := d.client.Topic(d.topic).Publish(ctx, &pubsub.Message{
+		Data: jb.Payload(),
+		Attributes: map[string]string{
+			jobs.RRID:       job.Ident,
+			jobs.RRJob:      job.Job,
+			jobs.RRDelay:    strconv.Itoa(int(job.Options.Delay)),
+			jobs.RRHeaders:  string(data),
+			jobs.RRPriority: strconv.Itoa(int(job.Options.Priority)),
+			jobs.RRAutoAck:  btos(job.Options.AutoAck),
+		},
+	})
 	id, err := result.Get(ctx)
 	if err != nil {
 		return err
@@ -325,3 +356,7 @@ func (d *Driver) manageTopic(ctx context.Context) error {
 
 	return nil
 }
+
+func ptr[T any](val T) *T {
+	return &val
+}
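For the rewritten Push above: all job metadata travels as strings in pubsub.Message.Attributes — headers are JSON-encoded, delay and priority are stringified integers, and auto-ack is encoded as "true"/"false" by btos. Below is a standalone sketch of that round trip; the literal attribute keys are illustrative placeholders, while the driver itself uses the jobs.RR* constants from the roadrunner API package.

// Round-trip sketch (not the driver's code) for the attribute encoding used by Push
// and read back by unpack.
package main

import (
	"fmt"
	"strconv"

	"github.com/goccy/go-json" // same drop-in json package the driver uses
)

func main() {
	headers := map[string][]string{"trace": {"abc"}}

	// Encode: every piece of metadata becomes one string attribute.
	raw, err := json.Marshal(headers)
	if err != nil {
		panic(err)
	}
	attrs := map[string]string{
		"id":       "job-1",
		"job":      "ping",
		"delay":    strconv.Itoa(10),
		"headers":  string(raw),
		"priority": strconv.Itoa(3),
		"auto_ack": "false",
	}

	// Decode: malformed values fall back to defaults, mirroring the driver's
	// debug-log-and-continue behaviour in unpack.
	decoded := make(map[string][]string)
	if err := json.Unmarshal([]byte(attrs["headers"]), &decoded); err != nil {
		decoded = map[string][]string{}
	}
	delay, _ := strconv.Atoi(attrs["delay"])
	autoAck := attrs["auto_ack"] == "true"

	fmt.Println(decoded, delay, autoAck)
}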
diff --git a/pubsubjobs/item.go b/pubsubjobs/item.go
index 10db8c9..88065e8 100644
--- a/pubsubjobs/item.go
+++ b/pubsubjobs/item.go
@@ -1,18 +1,18 @@
 package pubsubjobs
 
 import (
+	"strconv"
+	"sync"
+	"sync/atomic"
 	"time"
 	"unsafe"
 
+	"cloud.google.com/go/pubsub"
 	"github.com/goccy/go-json"
+	"go.uber.org/zap"
 
-	"github.com/roadrunner-server/api/v4/plugins/v1/jobs"
-)
-
-var _ jobs.Acknowledger = (*Item)(nil)
-
-const (
-	auto string = "deduced_by_rr"
+	"github.com/roadrunner-server/api/v4/plugins/v3/jobs"
+	"github.com/roadrunner-server/errors"
 )
 
 type Item struct {
@@ -23,7 +23,7 @@ type Item struct {
 	// Payload is string data (usually JSON) passed to Job broker.
 	Payload string `json:"payload"`
 	// Headers with key-values pairs
-	Headers map[string][]string `json:"headers"`
+	headers map[string][]string `json:"headers"`
 	// Options contains set of PipelineOptions specific to job execution. Can be empty.
 	Options *Options `json:"options,omitempty"`
 }
@@ -41,6 +41,11 @@ type Options struct {
 	AutoAck bool `json:"auto_ack"`
 	// AMQP Queue
 	Queue string `json:"queue,omitempty"`
+	// Private ================
+	cond        *sync.Cond
+	message     *pubsub.Message
+	msgInFlight *int64
+	stopped     *uint64
 }
 
 // DelayDuration returns delay duration in a form of time.Duration.
@@ -56,13 +61,17 @@ func (i *Item) Priority() int64 {
 	return i.Options.Priority
 }
 
+func (i *Item) GroupID() string {
+	return i.Options.Pipeline
+}
+
 // Body packs job payload into binary payload.
 func (i *Item) Body() []byte {
 	return strToBytes(i.Payload)
 }
 
-func (i *Item) Metadata() map[string][]string {
-	return i.Headers
+func (i *Item) Headers() map[string][]string {
+	return i.headers
 }
 
 // Context packs job context (job, id) into binary payload.
@@ -80,7 +89,7 @@ func (i *Item) Context() ([]byte, error) {
 			ID:       i.Ident,
 			Job:      i.Job,
 			Driver:   pluginName,
-			Headers:  i.Headers,
+			Headers:  i.headers,
 			Queue:    i.Options.Queue,
 			Pipeline: i.Options.Pipeline,
 		},
@@ -94,10 +103,37 @@ func (i *Item) Context() ([]byte, error) {
 }
 
 func (i *Item) Ack() error {
+	if atomic.LoadUint64(i.Options.stopped) == 1 {
+		return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
+	}
+	defer func() {
+		i.Options.cond.Signal()
+		atomic.AddInt64(i.Options.msgInFlight, ^int64(0))
+	}()
+	// just return in case of auto-ack
+	if i.Options.AutoAck {
+		return nil
+	}
+
+	i.Options.message.Ack()
 	return nil
 }
 
 func (i *Item) Nack() error {
+	if atomic.LoadUint64(i.Options.stopped) == 1 {
+		return errors.Str("failed to negatively acknowledge the JOB, the pipeline is probably stopped")
+	}
+	defer func() {
+		i.Options.cond.Signal()
+		atomic.AddInt64(i.Options.msgInFlight, ^int64(0))
+	}()
+	// message already deleted
+	if i.Options.AutoAck {
+		return nil
+	}
+
+	i.Options.message.Nack()
+
 	return nil
 }
 
@@ -110,21 +146,20 @@ func (i *Item) Respond(_ []byte, _ string) error {
 	return nil
 }
 
-func fromJob(job jobs.Job) *Item {
+func fromJob(job jobs.Message) *Item {
 	return &Item{
 		Job:     job.Name(),
 		Ident:   job.ID(),
-		Payload: job.Payload(),
-		Headers: job.Headers(),
+		Payload: string(job.Payload()),
+		headers: job.Headers(),
 		Options: &Options{
 			Priority: job.Priority(),
-			Pipeline: job.Pipeline(),
+			Pipeline: job.GroupID(),
 			Delay:    job.Delay(),
 			AutoAck:  job.AutoAck(),
 		},
 	}
 }
-
 func bytesToStr(data []byte) string {
 	if len(data) == 0 {
 		return ""
@@ -140,3 +175,82 @@ func strToBytes(data string) []byte {
 
 	return unsafe.Slice(unsafe.StringData(data), len(data))
 }
+
+func (c *Driver) unpack(message *pubsub.Message) *Item {
+	attributes := message.Attributes
+
+	var rrid string
+	if val, ok := attributes[jobs.RRID]; ok {
+		rrid = val
+	}
+
+	var rrj string
+	if val, ok := attributes[jobs.RRJob]; ok {
+		rrj = val
+	}
+
+	h := make(map[string][]string)
+	if val, ok := attributes[jobs.RRHeaders]; ok {
+		err := json.Unmarshal([]byte(val), &h)
+		if err != nil {
+			c.log.Debug("failed to unpack the headers, not a JSON", zap.Error(err))
+		}
+	}
+
+	var autoAck bool
+	if val, ok := attributes[jobs.RRAutoAck]; ok {
+		autoAck = stob(val)
+	}
+
+	var dl int
+	var err error
+	if val, ok := attributes[jobs.RRDelay]; ok {
+		dl, err = strconv.Atoi(val)
+		if err != nil {
+			c.log.Debug("failed to unpack the delay, not a number", zap.Error(err))
+		}
+	}
+
+	var priority int
+	if val, ok := attributes[jobs.RRPriority]; ok {
+		priority, err = strconv.Atoi(val)
+		if err != nil {
+			priority = int((*c.pipeline.Load()).Priority())
+			c.log.Debug("failed to unpack the priority; inheriting the pipeline's default priority", zap.Error(err))
+		}
+	}
+
+	return &Item{
+		Job:     rrj,
+		Ident:   rrid,
+		Payload: string(message.Data),
+		headers: h,
+		Options: &Options{
+			AutoAck:  autoAck,
+			Delay:    int64(dl),
+			Priority: int64(priority),
+			Pipeline: (*c.pipeline.Load()).Name(),
+			// private
+			message:     message,
+			msgInFlight: c.msgInFlight,
+			cond:        &c.cond,
+			stopped:     &c.stopped,
+		},
+	}
+}
+
+func btos(b bool) string {
+	if b {
+		return "true"
+	}
+
+	return "false"
+}
+
+func stob(s string) bool {
+	if s != "" {
+		return s == "true"
+	}
+
+	return false
+}
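One easy-to-misread detail in Ack/Nack above: atomic.AddInt64(p, ^int64(0)) decrements the in-flight counter by one, because ^int64(0) is -1 in two's complement, and the accompanying cond.Signal() wakes the listener if it is parked on the prefetch limit. A two-line illustration:

// ^int64(0) == -1, so both calls below subtract one.
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	n := int64(5)
	atomic.AddInt64(&n, ^int64(0)) // same effect as the next line
	atomic.AddInt64(&n, -1)
	fmt.Println(n) // 3
}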
"cloud.google.com/go/pubsub" + "go.opentelemetry.io/otel/propagation" + "go.uber.org/zap" +) func (d *Driver) listen(ctx context.Context) { go func() { @@ -10,6 +17,43 @@ func (d *Driver) listen(ctx context.Context) { d.log.Debug("listener was stopped") return default: + s, err := d.client.Topic(d.topic).Subscriptions(ctx).Next() + if err != nil { + d.log.Error("subscription iteration", zap.Error(err)) + continue + } + + s.Receive(context.Background(), func(ctx context.Context, message *pubsub.Message) { + d.cond.L.Lock() + // lock when we hit the limit + for atomic.LoadInt64(d.msgInFlight) >= int64(atomic.LoadInt32(d.msgInFlightLimit)) { + d.log.Debug("prefetch limit was reached, waiting for the jobs to be processed", zap.Int64("current", atomic.LoadInt64(d.msgInFlight)), zap.Int32("limit", atomic.LoadInt32(d.msgInFlightLimit))) + d.cond.Wait() + } + + d.log.Debug("receive message", zap.Stringp("ID", &message.ID)) + item := d.unpack(message) + + ctxspan, span := d.tracer.Tracer(tracerName).Start(d.prop.Extract(context.Background(), propagation.HeaderCarrier(item.headers)), "google_pub_sub_listener") + if item.Options.AutoAck { + item.Ack() + d.log.Debug("auto ack is turned on, message acknowledged") + span.End() + } + + if item.headers == nil { + item.headers = make(map[string][]string, 2) + } + + d.prop.Inject(ctxspan, propagation.HeaderCarrier(item.headers)) + + d.pq.Insert(item) + // increase the current number of messages + atomic.AddInt64(d.msgInFlight, 1) + d.log.Debug("message pushed to the priority queue", zap.Int64("current", atomic.LoadInt64(d.msgInFlight)), zap.Int32("limit", atomic.LoadInt32(d.msgInFlightLimit))) + d.cond.L.Unlock() + span.End() + }) } } }() diff --git a/tests/configs/.rr-declare.yaml b/tests/configs/.rr-declare.yaml new file mode 100644 index 0000000..0663e4c --- /dev/null +++ b/tests/configs/.rr-declare.yaml @@ -0,0 +1,28 @@ +version: '3' + +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php php_test_files/jobs/jobs_ok.php" + relay: "pipes" + relay_timeout: "20s" + +google-pub-sub: + project_id: test + topic: rrTopic + skip_topic_declaration: false + host: 127.0.0.1:8085 + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 1 + pipeline_size: 100000 + pool: + num_workers: 10 + allocate_timeout: 60s + destroy_timeout: 60s \ No newline at end of file diff --git a/tests/configs/.rr-jobs-err.yaml b/tests/configs/.rr-jobs-err.yaml new file mode 100644 index 0000000..4eb37e1 --- /dev/null +++ b/tests/configs/.rr-jobs-err.yaml @@ -0,0 +1,30 @@ +version: '3' + +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php php_test_files/jobs/jobs_err.php" + relay: "pipes" + relay_timeout: "20s" + +google-pub-sub: + project_id: test + topic: rrTopic + skip_topic_declaration: false + host: 127.0.0.1:8085 + +logs: + level: debug + encoding: console + mode: development + +jobs: + num_pollers: 10 + timeout: 60 + pipeline_size: 100000 + pool: + num_workers: 10 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s \ No newline at end of file diff --git a/tests/configs/.rr-pq.yaml b/tests/configs/.rr-pq.yaml new file mode 100644 index 0000000..4318edb --- /dev/null +++ b/tests/configs/.rr-pq.yaml @@ -0,0 +1,55 @@ +version: '3' + +rpc: + listen: tcp://127.0.0.1:6601 + +server: + command: "php php_test_files/jobs/jobs_ok_pq.php" + relay: "pipes" + +google-pub-sub: + project_id: test + topic: rrTopic + skip_topic_declaration: false + host: 127.0.0.1:8085 + +logs: + level: debug + encoding: console + mode: development + 
diff --git a/tests/configs/.rr-declare.yaml b/tests/configs/.rr-declare.yaml
new file mode 100644
index 0000000..0663e4c
--- /dev/null
+++ b/tests/configs/.rr-declare.yaml
@@ -0,0 +1,28 @@
+version: '3'
+
+rpc:
+  listen: tcp://127.0.0.1:6001
+
+server:
+  command: "php php_test_files/jobs/jobs_ok.php"
+  relay: "pipes"
+  relay_timeout: "20s"
+
+google-pub-sub:
+  project_id: test
+  topic: rrTopic
+  skip_topic_declaration: false
+  host: 127.0.0.1:8085
+
+logs:
+  level: debug
+  encoding: console
+  mode: development
+
+jobs:
+  num_pollers: 1
+  pipeline_size: 100000
+  pool:
+    num_workers: 10
+    allocate_timeout: 60s
+    destroy_timeout: 60s
\ No newline at end of file
diff --git a/tests/configs/.rr-jobs-err.yaml b/tests/configs/.rr-jobs-err.yaml
new file mode 100644
index 0000000..4eb37e1
--- /dev/null
+++ b/tests/configs/.rr-jobs-err.yaml
@@ -0,0 +1,30 @@
+version: '3'
+
+rpc:
+  listen: tcp://127.0.0.1:6001
+
+server:
+  command: "php php_test_files/jobs/jobs_err.php"
+  relay: "pipes"
+  relay_timeout: "20s"
+
+google-pub-sub:
+  project_id: test
+  topic: rrTopic
+  skip_topic_declaration: false
+  host: 127.0.0.1:8085
+
+logs:
+  level: debug
+  encoding: console
+  mode: development
+
+jobs:
+  num_pollers: 10
+  timeout: 60
+  pipeline_size: 100000
+  pool:
+    num_workers: 10
+    max_jobs: 0
+    allocate_timeout: 60s
+    destroy_timeout: 60s
\ No newline at end of file
diff --git a/tests/configs/.rr-pq.yaml b/tests/configs/.rr-pq.yaml
new file mode 100644
index 0000000..4318edb
--- /dev/null
+++ b/tests/configs/.rr-pq.yaml
@@ -0,0 +1,55 @@
+version: '3'
+
+rpc:
+  listen: tcp://127.0.0.1:6601
+
+server:
+  command: "php php_test_files/jobs/jobs_ok_pq.php"
+  relay: "pipes"
+
+google-pub-sub:
+  project_id: test
+  topic: rrTopic
+  skip_topic_declaration: false
+  host: 127.0.0.1:8085
+
+logs:
+  level: debug
+  encoding: console
+  mode: development
+
+jobs:
+  num_pollers: 2
+  pipeline_size: 100000
+  pool:
+    num_workers: 2
+    allocate_timeout: 60s
+    destroy_timeout: 60s
+
+  pipelines:
+    test-1-pq:
+      driver: google-pub-sub
+      config:
+        prefetch: 1000
+        visibility_timeout: 0
+        wait_time_seconds: 0
+        queue: default-1-pq
+        attributes:
+          DelaySeconds: 0
+          MaximumMessageSize: 262144
+          MessageRetentionPeriod: 345600
+          ReceiveMessageWaitTimeSeconds: 0
+          VisibilityTimeout: 30
+        tags:
+          test: "tag-pq"
+
+    test-2-pq:
+      driver: google-pub-sub
+      config:
+        prefetch: 1000
+        queue: default-2-pq
+        attributes:
+          MessageRetentionPeriod: 86400
+        tags:
+          test: "tag"
+  consume: [ "test-1-pq", "test-2-pq" ]
"configs/.rr-pq.yaml", + Prefix: "rr", + } + + l, oLogger := mocklogger.ZapTestLogger(zap.DebugLevel) + err := cont.RegisterAll( + cfg, + &server.Plugin{}, + &rpcPlugin.Plugin{}, + l, + &jobs.Plugin{}, + &resetter.Plugin{}, + &informer.Plugin{}, + &googlePubSub.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 3) + + for i := 0; i < 10; i++ { + t.Run("PushPipeline", helpers.PushToPipe("test-1-pq", false, "127.0.0.1:6601")) + t.Run("PushPipeline", helpers.PushToPipe("test-2-pq", false, "127.0.0.1:6601")) + } + time.Sleep(time.Second * 3) + + t.Run("DestroyPipeline", helpers.DestroyPipelines("127.0.0.1:6601", "test-1-pq", "test-2-pq")) + + stopCh <- struct{}{} + wg.Wait() + + assert.Equal(t, 0, oLogger.FilterMessageSnippet("job was processed successfully").Len()) + assert.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was started").Len()) + assert.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was stopped").Len()) + assert.Equal(t, 20, oLogger.FilterMessageSnippet("job was pushed successfully").Len()) + assert.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len()) + assert.Equal(t, 2, oLogger.FilterMessageSnippet("listener was stopped").Len()) +} diff --git a/tests/mock/logger.go b/tests/mock/logger.go new file mode 100644 index 0000000..9bc251f --- /dev/null +++ b/tests/mock/logger.go @@ -0,0 +1,64 @@ +package mocklogger + +import ( + "github.com/roadrunner-server/endure/v2/dep" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +type ZapLoggerMock struct { + l *zap.Logger +} + +type Logger interface { + NamedLogger(string) *zap.Logger +} + +func ZapTestLogger(enab zapcore.LevelEnabler) (*ZapLoggerMock, *ObservedLogs) { + core, logs := New(enab) + obsLog := zap.New(core, zap.Development()) + + return &ZapLoggerMock{ + l: obsLog, + }, logs +} + +func (z *ZapLoggerMock) Init() error { + return nil +} + +func (z *ZapLoggerMock) Serve() chan error { + return make(chan error, 1) +} + +func (z *ZapLoggerMock) Stop() error { + return z.l.Sync() +} + +func (z *ZapLoggerMock) Provides() []*dep.Out { + return []*dep.Out{ + dep.Bind((*Logger)(nil), z.ProvideLogger), + } +} + +func (z *ZapLoggerMock) Weight() uint { + return 100 +} + +func (z *ZapLoggerMock) ProvideLogger() *Log { + return NewLogger(z.l) +} + +type Log struct { + base *zap.Logger +} + +func NewLogger(log *zap.Logger) *Log { + return &Log{ + base: log, + } +} + +func (l *Log) NamedLogger(string) *zap.Logger { + return l.base +} diff --git a/tests/mock/observer.go b/tests/mock/observer.go new file mode 100644 index 0000000..061cec7 --- /dev/null +++ b/tests/mock/observer.go @@ -0,0 +1,199 @@ +package mocklogger + +// Copyright (c) 2017 Uber Technologies, Inc. 
diff --git a/tests/jobs_test.go b/tests/jobs_test.go
index d2c95fa..66e8e13 100644
--- a/tests/jobs_test.go
+++ b/tests/jobs_test.go
@@ -10,17 +10,19 @@ import (
 	"time"
 
 	"tests/helpers"
+	mocklogger "tests/mock"
 
 	"github.com/roadrunner-server/config/v4"
 	"github.com/roadrunner-server/endure/v2"
+	googlePubSub "github.com/roadrunner-server/google-pub-sub/v4"
 	"github.com/roadrunner-server/informer/v4"
 	"github.com/roadrunner-server/jobs/v4"
 	"github.com/roadrunner-server/logger/v4"
 	"github.com/roadrunner-server/resetter/v4"
 	rpcPlugin "github.com/roadrunner-server/rpc/v4"
 	"github.com/roadrunner-server/server/v4"
-	googlePubSub "github.com/roadrunner-server/google-pub-sub/v4"
 	"github.com/stretchr/testify/assert"
+	"go.uber.org/zap"
 )
@@ -99,3 +101,255 @@ func TestInit(t *testing.T) {
 	stopCh <- struct{}{}
 	wg.Wait()
 }
+
+func TestDeclare(t *testing.T) {
+	cont := endure.New(slog.LevelDebug)
+
+	cfg := &config.Plugin{
+		Version: "2023.3.0",
+		Path:    "configs/.rr-declare.yaml",
+		Prefix:  "rr",
+	}
+
+	err := cont.RegisterAll(
+		cfg,
+		&server.Plugin{},
+		&rpcPlugin.Plugin{},
+		&logger.Plugin{},
+		&jobs.Plugin{},
+		&resetter.Plugin{},
+		&informer.Plugin{},
+		&googlePubSub.Plugin{},
+	)
+	assert.NoError(t, err)
+
+	err = cont.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ch, err := cont.Serve()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	sig := make(chan os.Signal, 1)
+	signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+
+	stopCh := make(chan struct{}, 1)
+
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case e := <-ch:
+				assert.Fail(t, "error", e.Error.Error())
+				err = cont.Stop()
+				if err != nil {
+					assert.FailNow(t, "error", err.Error())
+				}
+			case <-sig:
+				err = cont.Stop()
+				if err != nil {
+					assert.FailNow(t, "error", err.Error())
+				}
+				return
+			case <-stopCh:
+				// timeout
+				err = cont.Stop()
+				if err != nil {
+					assert.FailNow(t, "error", err.Error())
+				}
+				return
+			}
+		}
+	}()
+
+	time.Sleep(time.Second * 3)
+
+	t.Run("DeclarePipeline", helpers.DeclarePipe("default", "127.0.0.1:6001", "test-3"))
+	t.Run("ConsumePipeline", helpers.ResumePipes("127.0.0.1:6001", "test-3"))
+	t.Run("PushPipeline", helpers.PushToPipe("test-3", false, "127.0.0.1:6001"))
+	time.Sleep(time.Second)
+	t.Run("PausePipeline", helpers.PausePipelines("127.0.0.1:6001", "test-3"))
+	time.Sleep(time.Second)
+	t.Run("DestroyPipeline", helpers.DestroyPipelines("127.0.0.1:6001", "test-3"))
+
+	time.Sleep(time.Second * 5)
+	stopCh <- struct{}{}
+	wg.Wait()
+}
+
+func TestJobsError(t *testing.T) {
+	cont := endure.New(slog.LevelDebug)
+
+	cfg := &config.Plugin{
+		Version: "2023.3.0",
+		Path:    "configs/.rr-jobs-err.yaml",
+		Prefix:  "rr",
+	}
+
+	err := cont.RegisterAll(
+		cfg,
+		&server.Plugin{},
+		&rpcPlugin.Plugin{},
+		&logger.Plugin{},
+		&jobs.Plugin{},
+		&resetter.Plugin{},
+		&informer.Plugin{},
+		&googlePubSub.Plugin{},
+	)
+	assert.NoError(t, err)
+
+	err = cont.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ch, err := cont.Serve()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	sig := make(chan os.Signal, 1)
+	signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+
+	stopCh := make(chan struct{}, 1)
+
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case e := <-ch:
+				assert.Fail(t, "error", e.Error.Error())
+				err = cont.Stop()
+				if err != nil {
+					assert.FailNow(t, "error", err.Error())
+				}
+			case <-sig:
+				err = cont.Stop()
+				if err != nil {
+					assert.FailNow(t, "error", err.Error())
+				}
+				return
+			case <-stopCh:
+				// timeout
+				err = cont.Stop()
+				if err != nil {
+					assert.FailNow(t, "error", err.Error())
+				}
+				return
+			}
+		}
+	}()
+
+	time.Sleep(time.Second * 3)
+
+	t.Run("DeclarePipeline", helpers.DeclarePipe("default", "127.0.0.1:6001", "test-3"))
+	t.Run("ConsumePipeline", helpers.ResumePipes("127.0.0.1:6001", "test-3"))
+	t.Run("PushPipeline", helpers.PushToPipe("test-3", false, "127.0.0.1:6001"))
+	time.Sleep(time.Second * 25)
+	t.Run("PausePipeline", helpers.PausePipelines("127.0.0.1:6001", "test-3"))
+	time.Sleep(time.Second)
+	t.Run("DestroyPipeline", helpers.DestroyPipelines("127.0.0.1:6001", "test-3"))
+
+	time.Sleep(time.Second * 5)
+	stopCh <- struct{}{}
+	wg.Wait()
+
+	time.Sleep(time.Second * 5)
+}
+
+func TestRemovePQ(t *testing.T) {
+	cont := endure.New(slog.LevelDebug)
+
+	cfg := &config.Plugin{
+		Version: "2023.2.0",
+		Path:    "configs/.rr-pq.yaml",
+		Prefix:  "rr",
+	}
+
+	l, oLogger := mocklogger.ZapTestLogger(zap.DebugLevel)
+	err := cont.RegisterAll(
+		cfg,
+		&server.Plugin{},
+		&rpcPlugin.Plugin{},
+		l,
+		&jobs.Plugin{},
+		&resetter.Plugin{},
+		&informer.Plugin{},
+		&googlePubSub.Plugin{},
+	)
+	assert.NoError(t, err)
+
+	err = cont.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ch, err := cont.Serve()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	sig := make(chan os.Signal, 1)
+	signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+
+	stopCh := make(chan struct{}, 1)
+
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case e := <-ch:
+				assert.Fail(t, "error", e.Error.Error())
+				err = cont.Stop()
+				if err != nil {
+					assert.FailNow(t, "error", err.Error())
+				}
+			case <-sig:
+				err = cont.Stop()
+				if err != nil {
+					assert.FailNow(t, "error", err.Error())
+				}
+				return
+			case <-stopCh:
+				// timeout
+				err = cont.Stop()
+				if err != nil {
+					assert.FailNow(t, "error", err.Error())
+				}
+				return
+			}
+		}
+	}()
+
+	time.Sleep(time.Second * 3)
+
+	for i := 0; i < 10; i++ {
+		t.Run("PushPipeline", helpers.PushToPipe("test-1-pq", false, "127.0.0.1:6601"))
+		t.Run("PushPipeline", helpers.PushToPipe("test-2-pq", false, "127.0.0.1:6601"))
+	}
+	time.Sleep(time.Second * 3)
+
+	t.Run("DestroyPipeline", helpers.DestroyPipelines("127.0.0.1:6601", "test-1-pq", "test-2-pq"))
+
+	stopCh <- struct{}{}
+	wg.Wait()
+
+	assert.Equal(t, 0, oLogger.FilterMessageSnippet("job was processed successfully").Len())
+	assert.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was started").Len())
+	assert.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was stopped").Len())
+	assert.Equal(t, 20, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
+	assert.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len())
+	assert.Equal(t, 2, oLogger.FilterMessageSnippet("listener was stopped").Len())
+}
diff --git a/tests/mock/logger.go b/tests/mock/logger.go
new file mode 100644
index 0000000..9bc251f
--- /dev/null
+++ b/tests/mock/logger.go
@@ -0,0 +1,64 @@
+package mocklogger
+
+import (
+	"github.com/roadrunner-server/endure/v2/dep"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+)
+
+type ZapLoggerMock struct {
+	l *zap.Logger
+}
+
+type Logger interface {
+	NamedLogger(string) *zap.Logger
+}
+
+func ZapTestLogger(enab zapcore.LevelEnabler) (*ZapLoggerMock, *ObservedLogs) {
+	core, logs := New(enab)
+	obsLog := zap.New(core, zap.Development())
+
+	return &ZapLoggerMock{
+		l: obsLog,
+	}, logs
+}
+
+func (z *ZapLoggerMock) Init() error {
+	return nil
+}
+
+func (z *ZapLoggerMock) Serve() chan error {
+	return make(chan error, 1)
+}
+
+func (z *ZapLoggerMock) Stop() error {
+	return z.l.Sync()
+}
+
+func (z *ZapLoggerMock) Provides() []*dep.Out {
+	return []*dep.Out{
+		dep.Bind((*Logger)(nil), z.ProvideLogger),
+	}
+}
+
+func (z *ZapLoggerMock) Weight() uint {
+	return 100
+}
+
+func (z *ZapLoggerMock) ProvideLogger() *Log {
+	return NewLogger(z.l)
+}
+
+type Log struct {
+	base *zap.Logger
+}
+
+func NewLogger(log *zap.Logger) *Log {
+	return &Log{
+		base: log,
+	}
+}
+
+func (l *Log) NamedLogger(string) *zap.Logger {
+	return l.base
+}
diff --git a/tests/mock/observer.go b/tests/mock/observer.go
new file mode 100644
index 0000000..061cec7
--- /dev/null
+++ b/tests/mock/observer.go
@@ -0,0 +1,199 @@
+package mocklogger
+
+// Copyright (c) 2017 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+import (
+	"strings"
+	"sync"
+	"time"
+
+	"go.uber.org/zap/zapcore"
+)
+
+// A LoggedEntry is an encoding-agnostic representation of a log message.
+// Field availability is context dependent.
+type LoggedEntry struct {
+	zapcore.Entry
+	Context []zapcore.Field
+}
+
+// ContextMap returns a map for all fields in Context.
+func (e LoggedEntry) ContextMap() map[string]any {
+	encoder := zapcore.NewMapObjectEncoder()
+	for _, f := range e.Context {
+		f.AddTo(encoder)
+	}
+	return encoder.Fields
+}
+
+// ObservedLogs is a concurrency-safe, ordered collection of observed logs.
+type ObservedLogs struct {
+	mu   sync.RWMutex
+	logs []LoggedEntry
+}
+
+// Len returns the number of items in the collection.
+func (o *ObservedLogs) Len() int {
+	o.mu.RLock()
+	n := len(o.logs)
+	o.mu.RUnlock()
+	return n
+}
+
+// All returns a copy of all the observed logs.
+func (o *ObservedLogs) All() []LoggedEntry {
+	o.mu.RLock()
+	ret := make([]LoggedEntry, len(o.logs))
+	copy(ret, o.logs)
+	o.mu.RUnlock()
+	return ret
+}
+
+// TakeAll returns a copy of all the observed logs, and truncates the observed
+// slice.
+func (o *ObservedLogs) TakeAll() []LoggedEntry {
+	o.mu.Lock()
+	ret := o.logs
+	o.logs = nil
+	o.mu.Unlock()
+	return ret
+}
+
+// AllUntimed returns a copy of all the observed logs, but overwrites the
+// observed timestamps with time.Time's zero value. This is useful when making
+// assertions in tests.
+func (o *ObservedLogs) AllUntimed() []LoggedEntry {
+	ret := o.All()
+	for i := range ret {
+		ret[i].Time = time.Time{}
+	}
+	return ret
+}
+
+// FilterLevelExact filters entries to those logged at exactly the given level.
+func (o *ObservedLogs) FilterLevelExact(level zapcore.Level) *ObservedLogs {
+	return o.Filter(func(e LoggedEntry) bool {
+		return e.Level == level
+	})
+}
+
+// FilterMessage filters entries to those that have the specified message.
+func (o *ObservedLogs) FilterMessage(msg string) *ObservedLogs {
+	return o.Filter(func(e LoggedEntry) bool {
+		return e.Message == msg
+	})
+}
+
+// FilterMessageSnippet filters entries to those that have a message containing the specified snippet.
+func (o *ObservedLogs) FilterMessageSnippet(snippet string) *ObservedLogs {
+	return o.Filter(func(e LoggedEntry) bool {
+		return strings.Contains(e.Message, snippet)
+	})
+}
+
+// FilterField filters entries to those that have the specified field.
+func (o *ObservedLogs) FilterField(field zapcore.Field) *ObservedLogs {
+	return o.Filter(func(e LoggedEntry) bool {
+		for _, ctxField := range e.Context {
+			if ctxField.Equals(field) {
+				return true
+			}
+		}
+		return false
+	})
+}
+
+// FilterFieldKey filters entries to those that have the specified key.
+func (o *ObservedLogs) FilterFieldKey(key string) *ObservedLogs {
+	return o.Filter(func(e LoggedEntry) bool {
+		for _, ctxField := range e.Context {
+			if ctxField.Key == key {
+				return true
+			}
+		}
+		return false
+	})
+}
+
+// Filter returns a copy of this ObservedLogs containing only those entries
+// for which the provided function returns true.
+func (o *ObservedLogs) Filter(keep func(LoggedEntry) bool) *ObservedLogs {
+	o.mu.RLock()
+	defer o.mu.RUnlock()
+
+	var filtered []LoggedEntry
+	for _, entry := range o.logs {
+		if keep(entry) {
+			filtered = append(filtered, entry)
+		}
+	}
+	return &ObservedLogs{logs: filtered}
+}
+
+func (o *ObservedLogs) add(log LoggedEntry) {
+	o.mu.Lock()
+	o.logs = append(o.logs, log)
+	o.mu.Unlock()
+}
+
+// New creates a new Core that buffers logs in memory (without any encoding).
+// It's particularly useful in tests.
+func New(enab zapcore.LevelEnabler) (zapcore.Core, *ObservedLogs) {
+	ol := &ObservedLogs{}
+	return &contextObserver{
+		LevelEnabler: enab,
+		logs:         ol,
+	}, ol
+}
+
+type contextObserver struct {
+	zapcore.LevelEnabler
+	logs    *ObservedLogs
+	context []zapcore.Field
+}
+
+func (co *contextObserver) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
+	if co.Enabled(ent.Level) {
+		return ce.AddCore(ent, co)
+	}
+	return ce
+}
+
+func (co *contextObserver) With(fields []zapcore.Field) zapcore.Core {
+	return &contextObserver{
+		LevelEnabler: co.LevelEnabler,
+		logs:         co.logs,
+		context:      append(co.context[:len(co.context):len(co.context)], fields...),
+	}
+}
+
+func (co *contextObserver) Write(ent zapcore.Entry, fields []zapcore.Field) error {
+	all := make([]zapcore.Field, 0, len(fields)+len(co.context))
+	all = append(all, co.context...)
+	all = append(all, fields...)
+	co.logs.add(LoggedEntry{ent, all})
+
+	return nil
+}
+
+func (co *contextObserver) Sync() error {
+	return nil
+}
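The mock logger and observer above are consumed the way TestRemovePQ shows. A compact sketch of driving them directly follows; it assumes it sits next to jobs_test.go inside the tests module (package name adjusted to match the existing test package) and uses only the API declared in the two files above.

// Sketch: capture entries through the observer core and assert on them.
package tests

import (
	"testing"

	mocklogger "tests/mock"

	"github.com/stretchr/testify/assert"
	"go.uber.org/zap"
)

func TestObserverSketch(t *testing.T) {
	// Build the mock plugin and the observed-log handle it exposes.
	l, logs := mocklogger.ZapTestLogger(zap.DebugLevel)
	log := l.ProvideLogger().NamedLogger("test")

	log.Debug("job was pushed successfully", zap.String("ID", "1"))
	log.Info("pipeline was started")

	// Filters return new ObservedLogs, so assertions compose naturally.
	assert.Equal(t, 1, logs.FilterMessageSnippet("pushed").Len())
	assert.Equal(t, 1, logs.FilterLevelExact(zap.InfoLevel).Len())
}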