-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathFunction.cs
135 lines (112 loc) · 6.2 KB
/
Function.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
using System;
using System.Collections.Generic;
using System.Threading;
using Amazon.Lambda.SQSEvents;
using Amazon.SQS;
using Amazon.SQS.Model;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using System.Linq;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]
namespace CompetingConsumerLambda
{
/// <summary>
/// AWS Lambda handler for an SQS-triggered "competing consumer" proof of concept.
/// Every incoming message is handed to a simulated, unreliable external call that
/// forwards it to <see cref="OutputQueueUrl"/>. If that call throws or does not finish
/// within <see cref="TimeOutSeconds"/>, the message is re-sent to
/// <see cref="InputQueueUrl"/> with an incremented <c>retryCount</c> attribute and a
/// quadratically growing delivery delay.
/// </summary>
public class Function
{
    private const string InputQueueUrl = "https://sqs.us-east-2.amazonaws.com/628654266155/PoCQueue";
    private const string OutputQueueUrl = "https://sqs.us-east-2.amazonaws.com/628654266155/YetAnotherQueue";

    // Name of the message attribute that carries the number of re-deliveries so far.
    private const string RetryCount = "retryCount";

    // How long the handler waits for the external call before giving up on it.
    private const int TimeOutSeconds = 3;

    // Base delay (in seconds) for re-queued messages; multiplied by retryCount squared.
    private const int RetryDelaySeconds = 20;

    // Largest DelaySeconds value SQS accepts on SendMessage (15 minutes).
    private const int MaxSqsDelaySeconds = 900;

    // Extra time the simulated "frozen" call sleeps beyond the timeout, so that it can
    // never win the race against the timeout and be mistaken for a successful call.
    private const int SimulatedFreezeMarginSeconds = 1;

    // The client is created once (lazily, on first use) and reused for every request,
    // instead of constructing a new AmazonSQSClient on each property access.
    private readonly Lazy<AmazonSQSClient> sqsClient =
        new Lazy<AmazonSQSClient>(() => new AmazonSQSClient());

    private AmazonSQSClient SqSClient => this.sqsClient.Value;

    /// <summary>
    /// Lambda entry point. Processes every record of the SQS event, one after another.
    /// Messages are not deleted explicitly here: acknowledging is left to the
    /// Lambda/SQS trigger integration when the handler returns without throwing.
    /// </summary>
    /// <param name="sqsEvent">The SQS event delivered by the trigger.</param>
    /// <param name="context">Lambda context; used for logging and remaining-time information.</param>
    public async Task FunctionHandler(SQSEvent sqsEvent, ILambdaContext context)
    {
        context.Logger.Log($" ** starting new lambda instance at {DateTime.Now.ToLongTimeString()} ** ");

        var records = sqsEvent?.Records;
        if (records == null || records.Count == 0)
        {
            context.Logger.Log(" ** no SQS records in the event, nothing to process ** ");
            return;
        }

        // The trigger is expected to deliver one record at a time, but every record is
        // handled anyway: anything skipped here would still be acknowledged and lost.
        foreach (var message in records)
        {
            await this.ProcessMessageAsync(message, context);
        }

        context.Logger.Log("____________________ processing complete ____________________");
    }

    /// <summary>
    /// Runs the external call for one message under a timeout and re-queues the message
    /// to the input queue when the call fails or times out.
    /// </summary>
    private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context)
    {
        var logger = context.Logger;

        // Attributes may be absent; log empty lists instead of failing on null.
        var attributes = message.MessageAttributes;
        var attributeKeys = attributes == null
            ? string.Empty
            : string.Join(", ", attributes.Keys);
        var attributeValues = attributes == null
            ? string.Empty
            : string.Join(", ", attributes.Values.Select(v => v?.StringValue));

        logger.Log($"trying to handle message id = {message.MessageId} , body - {message.Body}");
        logger.Log($"** message attributes keys {attributeKeys}");
        logger.Log($"** message attributes' values {attributeValues} ");

        var requeue = false;
        try
        {
            // Race the external call against a timer without blocking a thread.
            var externalCall = Task.Run(() => this.CallUnpredictableExternalAPI(message, logger));
            var timeout = Task.Delay(TimeSpan.FromSeconds(TimeOutSeconds));
            var firstToFinish = await Task.WhenAny(externalCall, timeout);

            if (firstToFinish == externalCall)
            {
                // Awaiting the finished task rethrows its exception, if it failed.
                await externalCall;
                logger.Log($"**** happy path, message {message.Body} was resent to another queue ******");
            }
            else
            {
                // The external call cannot be cancelled and keeps running in the
                // background; the handler just stops waiting for it.
                logger.Log("**** external call thread timed out ******");
                requeue = true;
            }
        }
        catch (Exception e)
        {
            // We are here if the external call threw an exception.
            logger.Log($"**** exception details {e} ******");
            requeue = true;
        }

        // The re-queue happens outside the try block on purpose: if it fails, the
        // exception propagates and fails the invocation instead of being retried here.
        if (requeue)
        {
            logger.Log($"**** seconds left in lambda {(int)context.RemainingTime.TotalSeconds} ******");
            await this.ResendMessageToAnotherQueue(InputQueueUrl, message, logger, RetryDelaySeconds);
        }
    }

    /// <summary>
    /// Sends a copy of <paramref name="message"/> (body only) to <paramref name="queue"/>,
    /// stamping it with the next <c>retryCount</c> attribute value.
    /// </summary>
    /// <param name="queue">URL of the destination queue.</param>
    /// <param name="message">The message whose body is forwarded.</param>
    /// <param name="logger">Lambda logger.</param>
    /// <param name="delaySeconds">
    /// Base delay in seconds. The effective delivery delay is
    /// <c>retryCount * retryCount * delaySeconds</c>, limited to the 0..900 range SQS accepts.
    /// </param>
    /// <returns>The SQS response for the send operation.</returns>
    private async Task<SendMessageResponse> ResendMessageToAnotherQueue(string queue, SQSEvent.SQSMessage message, ILambdaLogger logger, int delaySeconds = 0)
    {
        logger.Log($" ** sending message {message.Body} to queue {queue} ** ");

        var retryCounter = NextRetryCount(message, logger);
        var retryCounterText = retryCounter.ToString(System.Globalization.CultureInfo.InvariantCulture);

        var writeMessageRequest = new SendMessageRequest(queue, message.Body)
        {
            MessageAttributes = new Dictionary<string, MessageAttributeValue>
            {
                { RetryCount, new MessageAttributeValue { DataType = "String", StringValue = retryCounterText } }
            },
            // Spread re-deliveries out in time: the delay grows with the square of the retry count.
            DelaySeconds = ComputeDelaySeconds(retryCounter, delaySeconds)
        };

        return await this.SqSClient.SendMessageAsync(writeMessageRequest);
    }

    /// <summary>
    /// Determines the retry counter to stamp on an outgoing copy of <paramref name="message"/>:
    /// 0 when the message carries no <c>retryCount</c> attribute, otherwise the stored value plus one.
    /// A missing, malformed or negative stored value is treated as 0 instead of failing the send.
    /// </summary>
    private static int NextRetryCount(SQSEvent.SQSMessage message, ILambdaLogger logger)
    {
        var attributes = message.MessageAttributes;
        if (attributes == null || !attributes.ContainsKey(RetryCount))
        {
            return 0;
        }

        var rawValue = attributes[RetryCount]?.StringValue;
        int previous;
        var parsed = int.TryParse(
            rawValue,
            System.Globalization.NumberStyles.Integer,
            System.Globalization.CultureInfo.InvariantCulture,
            out previous);
        if (!parsed || previous < 0)
        {
            logger.Log($" ** ignoring invalid {RetryCount} attribute value '{rawValue}', assuming 0 ** ");
            previous = 0;
        }

        // Do not wrap around on a (pathological) counter that is already at the maximum.
        return previous == int.MaxValue ? previous : previous + 1;
    }

    /// <summary>
    /// Computes <c>retryCounter * retryCounter * delaySeconds</c>, limited to the
    /// 0..<see cref="MaxSqsDelaySeconds"/> range that SQS accepts for DelaySeconds.
    /// </summary>
    private static int ComputeDelaySeconds(int retryCounter, int delaySeconds)
    {
        if (retryCounter <= 0 || delaySeconds <= 0)
        {
            return 0;
        }

        // Any counter above the cap already saturates the result, so capping it first
        // keeps the 64-bit product far away from overflow.
        long cappedRetries = Math.Min(retryCounter, MaxSqsDelaySeconds);
        long delay = cappedRetries * cappedRetries * delaySeconds;
        return (int)Math.Min(delay, MaxSqsDelaySeconds);
    }

    /// <summary>
    /// Simulates an unreliable external service: about 10% of calls throw, about 10% of
    /// the remaining calls freeze for longer than the handler's timeout (roughly 19%
    /// failures overall), and all other calls forward the message to the output queue.
    /// </summary>
    /// <returns>A task that completes when the simulated call has finished.</returns>
    /// <exception cref="ApplicationException">Thrown for the simulated failure case.</exception>
    private Task CallUnpredictableExternalAPI(SQSEvent.SQSMessage message, ILambdaLogger contextLogger)
    {
        contextLogger.Log(" ** inside CallExternalService **");

        var seed = Guid.NewGuid().GetHashCode();
        contextLogger.Log($" ** checking RND seed {seed} **");
        var rnd = new Random(seed);

        // Next(1, 11) yields 1..10, so matching a single value is a 10% chance.
        // About 10% of requests should explode.
        int failureChance = rnd.Next(1, 11);
        if (failureChance == 5)
        {
            contextLogger.Log($" ** about to throw exception, failureChance = {failureChance} **");
            throw new ApplicationException("Simulated external API failure.");
        }

        // About 10% of the remaining requests should time out.
        failureChance = rnd.Next(1, 11);
        if (failureChance == 10)
        {
            contextLogger.Log($" ** about to freeze, failureChance = {failureChance} **");
            // Block strictly longer than the handler waits. Nothing is sent on this path,
            // so finishing "in time" would be reported as success and lose the message.
            Thread.Sleep(TimeSpan.FromSeconds(TimeOutSeconds + SimulatedFreezeMarginSeconds));
            return Task.CompletedTask;
        }

        // This is the happy path.
        return this.ResendMessageToAnotherQueue(OutputQueueUrl, message, contextLogger);
    }

    /// <summary>
    /// Changes the visibility timeout of <paramref name="message"/> on the input queue
    /// and logs the HTTP status code of the response. Not called anywhere else in this class.
    /// </summary>
    /// <param name="message">The message whose receipt handle identifies it to SQS.</param>
    /// <param name="contextLogger">Lambda logger.</param>
    /// <param name="timeoutSeconds">The new visibility timeout, in seconds.</param>
    private async Task ChangeVisibilityTimeOutAsyncTask(SQSEvent.SQSMessage message, ILambdaLogger contextLogger, int timeoutSeconds)
    {
        contextLogger.Log($" *** about to change visibility timeout to {timeoutSeconds} seconds***");
        var batRequest = new ChangeMessageVisibilityRequest
        {
            ReceiptHandle = message.ReceiptHandle,
            VisibilityTimeout = timeoutSeconds,
            QueueUrl = InputQueueUrl
        };
        var response = await this.SqSClient.ChangeMessageVisibilityAsync(batRequest);
        contextLogger.Log($" *** visibility timeout result status code is {response.HttpStatusCode} ***");
    }
}
}