diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 2df8787..6e33100 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -73,11 +73,11 @@ jobs: mkdir ./coverage-ci docker compose -f env/docker-compose-emulator-local.yaml up -d sleep 30 - go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=$(cat pkgs.txt) -coverprofile=./coverage-ci/sqs.out -covermode=atomic jobs_sqs_test.go jobs_sqs_fifo_test.go + go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=$(cat pkgs.txt) -coverprofile=./coverage-ci/pubsub.out -covermode=atomic jobs_test.go - name: Run unit tests with coverage run: | - go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=$(cat ./tests/pkgs.txt) -coverprofile=./tests/coverage-ci/sqs_u.out -covermode=atomic ./... + go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=$(cat ./tests/pkgs.txt) -coverprofile=./tests/coverage-ci/pubsub_u.out -covermode=atomic ./... - name: Archive code coverage results uses: actions/upload-artifact@v4 diff --git a/pubsubjobs/driver.go b/pubsubjobs/driver.go index 18cae17..77ad086 100644 --- a/pubsubjobs/driver.go +++ b/pubsubjobs/driver.go @@ -190,7 +190,7 @@ func (d *Driver) Push(ctx context.Context, jb jobs.Message) error { job := fromJob(jb) - data, err := json.Marshal(job.headers) + data, err := json.Marshal(job.Metadata) if err != nil { return err } diff --git a/pubsubjobs/item.go b/pubsubjobs/item.go index c6784c0..7bf76cd 100644 --- a/pubsubjobs/item.go +++ b/pubsubjobs/item.go @@ -23,7 +23,7 @@ type Item struct { // Payload is string data (usually JSON) passed to Job broker. Payload string `json:"payload"` // Headers with key-values pairs - headers map[string][]string `json:"headers"` + Metadata map[string][]string `json:"headers"` // Options contains set of PipelineOptions specific to job execution. Can be empty. Options *Options `json:"options,omitempty"` } @@ -71,7 +71,7 @@ func (i *Item) Body() []byte { } func (i *Item) Headers() map[string][]string { - return i.headers + return i.Metadata } // Context packs job context (job, id) into binary payload. @@ -89,7 +89,7 @@ func (i *Item) Context() ([]byte, error) { ID: i.Ident, Job: i.Job, Driver: pluginName, - Headers: i.headers, + Headers: i.Metadata, Queue: i.Options.Queue, Pipeline: i.Options.Pipeline, }, @@ -152,7 +152,7 @@ func fromJob(job jobs.Message) *Item { Job: job.Name(), Ident: job.ID(), Payload: string(job.Payload()), - headers: job.Headers(), + Metadata: job.Headers(), Options: &Options{ Priority: job.Priority(), Pipeline: job.GroupID(), @@ -218,7 +218,7 @@ func (c *Driver) unpack(message *pubsub.Message) *Item { Job: rrj, Ident: rrid, Payload: string(message.Data), - headers: h, + Metadata: h, Options: &Options{ AutoAck: autoAck, Delay: int64(dl), diff --git a/pubsubjobs/listener.go b/pubsubjobs/listener.go index ef4f08d..41c3bac 100644 --- a/pubsubjobs/listener.go +++ b/pubsubjobs/listener.go @@ -28,7 +28,7 @@ func (d *Driver) listen(ctx context.Context) { d.log.Debug("receive message", zap.Stringp("ID", &message.ID)) item := d.unpack(message) - ctxspan, span := d.tracer.Tracer(tracerName).Start(d.prop.Extract(context.Background(), propagation.HeaderCarrier(item.headers)), "google_pub_sub_listener") + ctxspan, span := d.tracer.Tracer(tracerName).Start(d.prop.Extract(context.Background(), propagation.HeaderCarrier(item.Metadata)), "google_pub_sub_listener") if item.Options.AutoAck { _, err := message.AckWithResult().Get(ctx) if err != nil { @@ -40,11 +40,11 @@ func (d *Driver) listen(ctx context.Context) { d.log.Debug("auto ack is turned on, message acknowledged") } - if item.headers == nil { - item.headers = make(map[string][]string, 2) + if item.Metadata == nil { + item.Metadata = make(map[string][]string, 2) } - d.prop.Inject(ctxspan, propagation.HeaderCarrier(item.headers)) + d.prop.Inject(ctxspan, propagation.HeaderCarrier(item.Metadata)) d.pq.Insert(item) // increase the current number of messages