Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: append more context to errors from the consumer #559

Merged
merged 4 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ export class Consumer extends TypedEventEmitter {
err,
`SQS receive message failed: ${err.message}`,
this.extendedAWSErrors,
this.queueUrl,
);
}
}
Expand Down Expand Up @@ -479,6 +480,8 @@ export class Consumer extends TypedEventEmitter {
err,
`Error changing visibility timeout: ${err.message}`,
this.extendedAWSErrors,
this.queueUrl,
message,
),
message,
);
Expand Down Expand Up @@ -514,6 +517,8 @@ export class Consumer extends TypedEventEmitter {
err,
`Error changing visibility timeout: ${err.message}`,
this.extendedAWSErrors,
this.queueUrl,
messages,
),
messages,
);
Expand Down Expand Up @@ -549,12 +554,14 @@ export class Consumer extends TypedEventEmitter {
throw toTimeoutError(
err,
`Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`,
message,
);
}
if (err instanceof Error) {
throw toStandardError(
err,
`Unexpected message handler failure: ${err.message}`,
message,
);
}
throw err;
Expand All @@ -581,6 +588,7 @@ export class Consumer extends TypedEventEmitter {
throw toStandardError(
err,
`Unexpected message handler failure: ${err.message}`,
messages,
);
}
throw err;
Expand Down Expand Up @@ -616,6 +624,8 @@ export class Consumer extends TypedEventEmitter {
err,
`SQS delete message failed: ${err.message}`,
this.extendedAWSErrors,
this.queueUrl,
message,
);
}
}
Expand Down Expand Up @@ -654,6 +664,8 @@ export class Consumer extends TypedEventEmitter {
err,
`SQS delete message failed: ${err.message}`,
this.extendedAWSErrors,
this.queueUrl,
messages,
);
}
}
Expand Down
45 changes: 43 additions & 2 deletions src/errors.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { Message } from "@aws-sdk/client-sqs";

import { AWSError } from "./types.js";

class SQSError extends Error {
Expand All @@ -9,6 +11,8 @@ class SQSError extends Error {
fault: AWSError["$fault"];
response?: AWSError["$response"];
metadata?: AWSError["$metadata"];
queueUrl?: string;
messageIds?: string[];

constructor(message: string) {
super(message);
Expand All @@ -17,24 +21,28 @@ class SQSError extends Error {
}

class TimeoutError extends Error {
messageIds: string[];
cause: Error;
time: Date;

constructor(message = "Operation timed out.") {
super(message);
this.message = message;
this.name = "TimeoutError";
this.messageIds = [];
}
}

class StandardError extends Error {
messageIds: string[];
cause: Error;
time: Date;

constructor(message = "An unexpected error occurred:") {
super(message);
this.message = message;
this.name = "StandardError";
this.messageIds = [];
}
}

Expand Down Expand Up @@ -64,6 +72,17 @@ function isConnectionError(err: Error): boolean {
return false;
}

/**
* Gets the message IDs from the message.
* @param message The message that was received from SQS.
*/
function getMessageIds(message: Message | Message[]): string[] {
if (Array.isArray(message)) {
return message.map((m) => m.MessageId);
}
return [message.MessageId];
}

/**
* Formats an AWSError the the SQSError type.
* @param err The error object that was received.
Expand All @@ -73,6 +92,8 @@ function toSQSError(
err: AWSError,
message: string,
extendedAWSErrors: boolean,
queueUrl?: string,
sqsMessage?: Message | Message[],
): SQSError {
const sqsError = new SQSError(message);
sqsError.code = err.name;
Expand All @@ -87,18 +108,32 @@ function toSQSError(
sqsError.metadata = err.$metadata;
}

if (queueUrl) {
sqsError.queueUrl = queueUrl;
}

if (sqsMessage) {
sqsError.messageIds = getMessageIds(sqsMessage);
}

return sqsError;
}

/**
* Formats an Error to the StandardError type.
* @param err The error object that was received.
* @param message The message to send with the error.
* @param sqsMessage The message that was received from SQS.
*/
function toStandardError(err: Error, message: string): StandardError {
function toStandardError(
err: Error,
message: string,
sqsMessage: Message | Message[],
): StandardError {
const error = new StandardError(message);
error.cause = err;
error.time = new Date();
error.messageIds = getMessageIds(sqsMessage);

return error;
}
Expand All @@ -107,11 +142,17 @@ function toStandardError(err: Error, message: string): StandardError {
* Formats an Error to the TimeoutError type.
* @param err The error object that was received.
* @param message The message to send with the error.
* @param sqsMessage The message that was received from SQS.
*/
function toTimeoutError(err: TimeoutError, message: string): TimeoutError {
function toTimeoutError(
err: TimeoutError,
message: string,
sqsMessage: Message | Message[],
): TimeoutError {
const error = new TimeoutError(message);
error.cause = err;
error.time = new Date();
error.messageIds = getMessageIds(sqsMessage);

return error;
}
Expand Down
153 changes: 153 additions & 0 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ describe("Consumer", () => {
"Unexpected message handler failure: Processing error",
);
assert.equal(message.MessageId, "123");
assert.deepEqual((err as any).messageIds, ["123"]);
});

it("fires an `error` event when an `SQSError` occurs processing a message", async () => {
Expand Down Expand Up @@ -1674,6 +1675,8 @@ describe("Consumer", () => {

assert.ok(err);
assert.equal(err.message, "Error changing visibility timeout: failed");
assert.equal(err.queueUrl, QUEUE_URL);
assert.deepEqual(err.messageIds, ["1"]);
});

it("emit error when changing visibility timeout fails for batch handler functions", async () => {
Expand Down Expand Up @@ -1706,6 +1709,156 @@ describe("Consumer", () => {

assert.ok(err);
assert.equal(err.message, "Error changing visibility timeout: failed");
assert.equal(err.queueUrl, QUEUE_URL);
assert.deepEqual(err.messageIds, ["1", "2"]);
});

it("includes messageIds in timeout errors", async () => {
const handleMessageTimeout = 500;
consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessage: () =>
new Promise((resolve) => setTimeout(resolve, 1000)),
handleMessageTimeout,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
});

consumer.start();
const [err]: any = await Promise.all([
pEvent(consumer, "timeout_error"),
clock.tickAsync(handleMessageTimeout),
]);
consumer.stop();

assert.ok(err);
assert.equal(
err.message,
`Message handler timed out after ${handleMessageTimeout}ms: Operation timed out.`,
);
assert.deepEqual(err.messageIds, ["123"]);
});

it("includes messageIds in batch processing errors", async () => {
sqs.send.withArgs(mockReceiveMessage).resolves({
Messages: [
{ MessageId: "1", ReceiptHandle: "receipt-handle-1", Body: "body-1" },
{ MessageId: "2", ReceiptHandle: "receipt-handle-2", Body: "body-2" },
],
});

consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessageBatch: () => {
throw new Error("Batch processing error");
},
batchSize: 2,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
});

consumer.start();
const [err]: any = await Promise.all([
pEvent(consumer, "error"),
clock.tickAsync(100),
]);
consumer.stop();

assert.ok(err);
assert.equal(
err.message,
"Unexpected message handler failure: Batch processing error",
);
assert.deepEqual(err.messageIds, ["1", "2"]);
});

it("includes queueUrl and messageIds in SQS errors when deleting message", async () => {
const deleteErr = new Error("Delete error");
deleteErr.name = "SQSError";

handleMessage.resolves(null);
sqs.send.withArgs(mockDeleteMessage).rejects(deleteErr);

consumer.start();
const [err]: any = await Promise.all([
pEvent(consumer, "error"),
clock.tickAsync(100),
]);
consumer.stop();

assert.ok(err);
assert.equal(err.message, "SQS delete message failed: Delete error");
assert.equal(err.queueUrl, QUEUE_URL);
assert.deepEqual(err.messageIds, ["123"]);
});

it("includes queueUrl and messageIds in SQS errors when changing visibility timeout", async () => {
sqs.send.withArgs(mockReceiveMessage).resolves({
Messages: [
{ MessageId: "1", ReceiptHandle: "receipt-handle-1", Body: "body-1" },
],
});
consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessage: () =>
new Promise((resolve) => setTimeout(resolve, 75000)),
sqs,
visibilityTimeout: 40,
heartbeatInterval: 30,
});

const receiveErr = new MockSQSError("failed");
sqs.send.withArgs(mockChangeMessageVisibility).rejects(receiveErr);

consumer.start();
const [err]: any = await Promise.all([
pEvent(consumer, "error"),
clock.tickAsync(75000),
]);
consumer.stop();

assert.ok(err);
assert.equal(err.message, "Error changing visibility timeout: failed");
assert.equal(err.queueUrl, QUEUE_URL);
assert.deepEqual(err.messageIds, ["1"]);
});

it("includes queueUrl and messageIds in batch SQS errors", async () => {
sqs.send.withArgs(mockReceiveMessage).resolves({
Messages: [
{ MessageId: "1", ReceiptHandle: "receipt-handle-1", Body: "body-1" },
{ MessageId: "2", ReceiptHandle: "receipt-handle-2", Body: "body-2" },
],
});

consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessageBatch: () =>
new Promise((resolve) => setTimeout(resolve, 75000)),
sqs,
batchSize: 2,
visibilityTimeout: 40,
heartbeatInterval: 30,
});

const receiveErr = new MockSQSError("failed");
sqs.send.withArgs(mockChangeMessageVisibilityBatch).rejects(receiveErr);

consumer.start();
const [err]: any = await Promise.all([
pEvent(consumer, "error"),
clock.tickAsync(75000),
]);
consumer.stop();

assert.ok(err);
assert.equal(err.message, "Error changing visibility timeout: failed");
assert.equal(err.queueUrl, QUEUE_URL);
assert.deepEqual(err.messageIds, ["1", "2"]);
});
});

Expand Down