Skip to content

Commit

Permalink
CI fix
Browse files Browse the repository at this point in the history
  • Loading branch information
cv65kr committed Feb 10, 2024
1 parent 148f9b0 commit c1dda19
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 12 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ jobs:
mkdir ./coverage-ci
docker compose -f env/docker-compose-emulator-local.yaml up -d
sleep 30
go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=$(cat pkgs.txt) -coverprofile=./coverage-ci/sqs.out -covermode=atomic jobs_sqs_test.go jobs_sqs_fifo_test.go
go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=$(cat pkgs.txt) -coverprofile=./coverage-ci/pubsub.out -covermode=atomic jobs_test.go
- name: Run unit tests with coverage
run: |
go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=$(cat ./tests/pkgs.txt) -coverprofile=./tests/coverage-ci/sqs_u.out -covermode=atomic ./...
go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=$(cat ./tests/pkgs.txt) -coverprofile=./tests/coverage-ci/pubsub_u.out -covermode=atomic ./...
- name: Archive code coverage results
uses: actions/upload-artifact@v4
Expand Down
2 changes: 1 addition & 1 deletion pubsubjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (d *Driver) Push(ctx context.Context, jb jobs.Message) error {

job := fromJob(jb)

data, err := json.Marshal(job.headers)
data, err := json.Marshal(job.Metadata)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions pubsubjobs/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Item struct {
// Payload is string data (usually JSON) passed to Job broker.
Payload string `json:"payload"`
// Headers with key-values pairs
headers map[string][]string `json:"headers"`
Metadata map[string][]string `json:"headers"`
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`
}
Expand Down Expand Up @@ -71,7 +71,7 @@ func (i *Item) Body() []byte {
}

func (i *Item) Headers() map[string][]string {
return i.headers
return i.Metadata
}

// Context packs job context (job, id) into binary payload.
Expand All @@ -89,7 +89,7 @@ func (i *Item) Context() ([]byte, error) {
ID: i.Ident,
Job: i.Job,
Driver: pluginName,
Headers: i.headers,
Headers: i.Metadata,
Queue: i.Options.Queue,
Pipeline: i.Options.Pipeline,
},
Expand Down Expand Up @@ -152,7 +152,7 @@ func fromJob(job jobs.Message) *Item {
Job: job.Name(),
Ident: job.ID(),
Payload: string(job.Payload()),
headers: job.Headers(),
Metadata: job.Headers(),
Options: &Options{
Priority: job.Priority(),
Pipeline: job.GroupID(),
Expand Down Expand Up @@ -218,7 +218,7 @@ func (c *Driver) unpack(message *pubsub.Message) *Item {
Job: rrj,
Ident: rrid,
Payload: string(message.Data),
headers: h,
Metadata: h,
Options: &Options{
AutoAck: autoAck,
Delay: int64(dl),
Expand Down
8 changes: 4 additions & 4 deletions pubsubjobs/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (d *Driver) listen(ctx context.Context) {
d.log.Debug("receive message", zap.Stringp("ID", &message.ID))
item := d.unpack(message)

ctxspan, span := d.tracer.Tracer(tracerName).Start(d.prop.Extract(context.Background(), propagation.HeaderCarrier(item.headers)), "google_pub_sub_listener")
ctxspan, span := d.tracer.Tracer(tracerName).Start(d.prop.Extract(context.Background(), propagation.HeaderCarrier(item.Metadata)), "google_pub_sub_listener")
if item.Options.AutoAck {
_, err := message.AckWithResult().Get(ctx)
if err != nil {
Expand All @@ -40,11 +40,11 @@ func (d *Driver) listen(ctx context.Context) {
d.log.Debug("auto ack is turned on, message acknowledged")
}

if item.headers == nil {
item.headers = make(map[string][]string, 2)
if item.Metadata == nil {
item.Metadata = make(map[string][]string, 2)
}

d.prop.Inject(ctxspan, propagation.HeaderCarrier(item.headers))
d.prop.Inject(ctxspan, propagation.HeaderCarrier(item.Metadata))

d.pq.Insert(item)
// increase the current number of messages
Expand Down

0 comments on commit c1dda19

Please sign in to comment.