Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EM-933 Throttling errors withing send-message state machine #159

Draft
wants to merge 29 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
151abdf
EM-933 - dep updates
johnmarston-nhs Sep 16, 2024
a745c0d
EM-933: resolved warning about multiple event endpoints ebing set.
johnmarston-nhs Sep 17, 2024
da87b5b
EM-933. Added a lock table, locking mechanism and a test for a simple…
johnmarston-nhs Sep 18, 2024
39d0aa0
EM-933. Added failure to lock and associated test.
johnmarston-nhs Sep 19, 2024
deb3ffb
EM-933. Implemented lock release and tests.
johnmarston-nhs Sep 20, 2024
3ae251a
EM-933. Fixed linter issues.
johnmarston-nhs Sep 20, 2024
c2a5d6d
EM-933. TFSec pacification.
johnmarston-nhs Sep 20, 2024
8342d4d
EM-933. TFLint pacification.
johnmarston-nhs Sep 20, 2024
ff7491d
EM-933. Fixing existing tests to work with the new lock table and fie…
johnmarston-nhs Sep 20, 2024
4f69672
EM-933. Fixing existing tests to work with the new lock table and fie…
johnmarston-nhs Sep 20, 2024
8c92024
EM-933: version bumps.
johnmarston-nhs Oct 1, 2024
a07b466
EM-933: version bumps.
johnmarston-nhs Oct 1, 2024
cad5a86
EM-933. Remove obsolete tests around invocation directly fro EventBri…
johnmarston-nhs Oct 1, 2024
0a8e7b5
EM-933. Merged in changes from other dev branch
johnmarston-nhs Oct 1, 2024
09d13dc
EM-933: ruff fix
johnmarston-nhs Oct 1, 2024
3936ccb
EM-933. Proper release handling and mocked tests.
johnmarston-nhs Oct 2, 2024
849f5df
EM-933. Fixed some import ordering.
johnmarston-nhs Oct 2, 2024
9ba6520
EM-933. Comment rewording.
johnmarston-nhs Oct 2, 2024
ace6aee
EM-933. Build fixes.
johnmarston-nhs Oct 3, 2024
97b8c45
EM-933. handling missing lock details when releasing... + tests
johnmarston-nhs Oct 3, 2024
c68e057
EM-933. Linting.
johnmarston-nhs Oct 3, 2024
ff66289
EM-933. Added print statemente for test run.
johnmarston-nhs Oct 3, 2024
beab61a
EM-933. Added print statemente for test run.
johnmarston-nhs Oct 3, 2024
bc2fb2c
EM-933. Accepting that pulling out the lock row is the only way to fi…
johnmarston-nhs Oct 4, 2024
f5eb951
EM-993. Implemented fetch locking, renamed execution_id to owner_id i…
johnmarston-nhs Oct 4, 2024
3b5c725
EM-933 reverted in advertent change
johnmarston-nhs Oct 4, 2024
2188abc
EM-933. Fix typo in log call.
johnmarston-nhs Oct 9, 2024
c18ee39
EM-933. Fix test log exytractor to use new wording.
johnmarston-nhs Oct 11, 2024
1fe1e20
EM-933. Handle non-step function invocations for the poll application…
johnmarston-nhs Oct 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions module/dynamodb_table_locktable.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
locals {
locktable_name = "${local.name}-lock-table"
}

#tfsec:ignore:aws-dynamodb-enable-at-rest-encryption
resource "aws_dynamodb_table" "lock_table" {
name = local.locktable_name
billing_mode = "PROVISIONED"
read_capacity = 20
write_capacity = 20
hash_key = "LockName"
stream_enabled = false
point_in_time_recovery {
enabled = true
}

server_side_encryption {
enabled = true
}

attribute {
name = "LockName"
type = "S"
}

attribute {
name = "LockType"
type = "S"
}

attribute {
name = "LockOwner"
type = "S"
}

global_secondary_index {
name = "LockTypeOwnerTableIndex"
hash_key = "LockType"
range_key = "LockOwner"
write_capacity = 5
read_capacity = 5
projection_type = "KEYS_ONLY"
}
}
2 changes: 2 additions & 0 deletions module/locals.tf
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ locals {
MESH_URL = local.mesh_url[var.mesh_env]
MESH_BUCKET = aws_s3_bucket.mesh.bucket

DDB_LOCK_TABLE_NAME = aws_dynamodb_table.lock_table.name

CHUNK_SIZE = var.chunk_size
CRUMB_SIZE = var.crumb_size == null ? var.chunk_size : var.crumb_size
NEVER_COMPRESS = var.never_compress
Expand Down
5 changes: 4 additions & 1 deletion module/stepfunctions_get_messages.tf
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ resource "aws_sfn_state_machine" "get_messages" {
OutputPath = "$.Payload"
Parameters = {
FunctionName = "${aws_lambda_function.poll_mailbox.arn}:${aws_lambda_function.poll_mailbox.version}"
"Payload.$" = "$"
Payload = {
"EventDetail.$" = "$"
"ExecutionId.$" = "$$.Execution.Id"
}
}
Resource = "arn:aws:states:::lambda:invoke"
Retry = [
Expand Down
5 changes: 4 additions & 1 deletion module/stepfunctions_send_message.tf
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ resource "aws_sfn_state_machine" "send_message" {
OutputPath = "$.Payload"
Parameters = {
FunctionName = "${aws_lambda_function.check_send_parameters.arn}:${aws_lambda_function.check_send_parameters.version}"
"Payload.$" = "$"
Payload = {
"EventDetail.$" = "$"
"ExecutionId.$" = "$$.Execution.Id"
}
}
Resource = "arn:aws:states:::lambda:invoke"
Retry = [
Expand Down
1,743 changes: 925 additions & 818 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ mypy = "^1.4.0"
coverage = "^7.2.7"
pytest = "^8.1.1"
pytest-asyncio = "^0.23.6"
moto = {extras = ["s3", "ssm", "stepfunctions", "secretsmanager"], version = "^5.0.5"}
moto = {extras = ["s3", "ssm", "stepfunctions", "secretsmanager", "dynamodb"], version = "^5.0.14"}
boto3-stubs = {extras = ["s3", "ssm", "secretsmanager", "dynamodb", "stepfunctions", "sqs", "lambda", "logs",], version = "^1.34.32"}
ruff = "^0"
petname = "^2.6"
black = "^24"
tox = "^4.18"
freezegun = "^1.5.1"

[tool.poetry.group.local.dependencies]

Expand Down Expand Up @@ -169,6 +170,7 @@ legacy_tox_ini = """
moto
boto3
petname
freezegun
commands =
pytest
"""
Expand Down
36 changes: 34 additions & 2 deletions src/cloudlogbase.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ Log Text = Will send file='{file}' filesize="{file_size}" from bucket='{bucket}'
Log Level = INFO
Log Text = For send for mapping destMailbox='{dest_mailbox}' from srcMailbox='{src_mailbox}' with workflowId='{workflow_id}'

[MESHSEND0004b]
Log Level = INFO
Log Text = Debug info: debug_str='{debug_str}'

[MESHSEND0005]
Log Level = INFO
Log Text = Send chunk started for file='{file}' from bucket='{bucket}' chunk='{chunk_num}' of max_chunk='{max_chunk}'
Expand All @@ -122,6 +126,22 @@ Log Text = Sent chunk='{chunk_num}' of max_chunk='{max_chunk}' for file='{file}'
Log Level = INFO
Log Text = chunk='{chunk_num}' is the final chunk out of max_chunk='{max_chunk}' for file='{file}' from bucket='{bucket}' and has been sent

[MESHSEND0009]
Log Level = INFO
Log Text = Acquiring lock_name='{lock_name}' with owner_id='{owner_id}'

[MESHSEND0010]
Log Level = INFO
Log Text = Released lock_name='{lock_name}' with owner_id='{owner_id}'

[MESHSEND0011]
Log Level = ERROR
Log Text = Failed to release lock_name='{lock_name}' as owner_id='{owner_id}' because it was owned by lock_owner='{lock_owner}'

[MESHSEND0012]
Log Level = INFO
Log Text = Skipping lock release due to missing lock_name='{lock_name}' or owner_id='{owner_id}'

[MESHPOLL0001]
Log Level = INFO
Log Text = mailbox='{mailbox}' has polled message_count='{message_count}' many messages
Expand All @@ -132,7 +152,11 @@ Log Text = msg='{error}' raised when polling mailbox='{mailbox}'

[MESHPOLL0002a]
Log Level = WARN
Log Text = Performing sending singleton check for mailbox='{mailbox) sf_mailbox='{sf_mailbox}' missing a mailbox
Log Text = Performing sending singleton check for mailbox='{mailbox)' sf_mailbox='{sf_mailbox}' missing a mailbox

[MESHPOLL0003]
Log Level = INFO
Log Text = Acquiring lock_name='{lock_name}' for owner_id='{owner_id}'

[MESHFETCH0001]
Log Level = INFO
Expand Down Expand Up @@ -220,4 +244,12 @@ Log Text = Message with message_id='{message_id}' acknowledged and successfully

[MESHFETCH0013]
Log Level = INFO
Log Text = File with multiple chunks received message_id='{message_id}'
Log Text = File with multiple chunks received message_id='{message_id}'

[MESHFETCH0014]
Log Level = INFO
Log Text = Released lock_name='{lock_name}' with owner_id='{owner_id}'

[MESHSEND0015]
Log Level = INFO
Log Text = Skipping lock release due to missing lock_name='{lock_name}' or owner_id='{owner_id}'
43 changes: 34 additions & 9 deletions src/mesh_check_send_parameters_application.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from dataclasses import asdict
from functools import partial
from http import HTTPStatus
from typing import Any

from shared.application import MESHLambdaApplication
from shared.common import SingletonCheckFailure, return_failure, singleton_check
from shared.common import (
SingletonCheckFailure,
acquire_lock,
return_failure,
)
from shared.send_parameters import get_send_parameters
from spine_aws_common.utilities import human_readable_bytes

Expand All @@ -22,11 +25,23 @@ def _get_internal_id(self):
return self._create_new_internal_id()

def start(self):

self.log_object.write_log(
"MESHSEND0004b",
None,
{"debug_str": str(self.event.raw_event)},
)

event_details = self.event["EventDetail"]["detail"]
owner_id = (
self.event.get("ExecutionId") or f"intenalID_{self._get_internal_id()}"
)

# in case of crash, set to internal server error so next stage fails
self.response = {"statusCode": int(HTTPStatus.INTERNAL_SERVER_ERROR)}

bucket = self.event["detail"]["requestParameters"]["bucketName"]
key = self.event["detail"]["requestParameters"]["key"]
bucket = event_details["requestParameters"]["bucketName"]
key = event_details["requestParameters"]["key"]

self.log_object.write_log("MESHSEND0001", None, {"bucket": bucket, "file": key})

Expand Down Expand Up @@ -54,11 +69,19 @@ def start(self):
},
)
try:
check = partial(self.is_send_for_same_file, send_params=send_params)
singleton_check(
self.config.send_message_step_function_arn,
check,
self.sfn,

lock_name = f"SendLock_{send_params.s3_bucket}_{send_params.s3_key}"

self.log_object.write_log(
"MESHSEND0009",
None,
{"lock_name": lock_name, "owner_id": owner_id},
)

acquire_lock(
self.ddb_client,
lock_name,
owner_id,
)

except SingletonCheckFailure as e:
Expand Down Expand Up @@ -104,6 +127,8 @@ def start(self):
"message_id": None,
"current_byte_position": 0,
"send_params": asdict(send_params),
"lock_name": lock_name,
"owner_id": owner_id,
},
}

Expand Down
22 changes: 18 additions & 4 deletions src/mesh_fetch_message_chunk_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from requests import Response
from requests.structures import CaseInsensitiveDict
from shared.application import INBOUND_BUCKET, INBOUND_FOLDER, MESHLambdaApplication
from shared.common import nullsafe_quote
from shared.common import nullsafe_quote, release_lock
from shared.config import MiB

_METADATA_HEADERS = {
Expand Down Expand Up @@ -87,6 +87,8 @@ def initialise(self):
self.log_object.internal_id = self.internal_id
self.s3_bucket = self.input.get("s3_bucket", "") # will be empty on first chunk
self.s3_key = self.input.get("s3_key", "") # will be empty on first chunk
self.lock_name = self.response.get("lock_name")
self.owner_id = self.response.get("owner_id")

@property
def http_response(self) -> Response:
Expand Down Expand Up @@ -117,9 +119,21 @@ def start(self):
self._ensure_s3_bucket_and_key(is_report)
if is_report or self.number_of_chunks < 2:
self._handle_un_chunked_message(is_report)
return

self._handle_multiple_chunk_message()
else:
self._handle_multiple_chunk_message()

if self.owner_id and self.lock_name:
release_lock(
self.ddb_client,
self.lock_name,
self.owner_id,
)
else:
self.log_object.write_log(
"MESHSEND0015",
None,
{"lock_name": self.lock_name, "owner_id": self.owner_id},
)

def _retrieve_current_chunk(self):
self._http_response = self.get_chunk(
Expand Down
38 changes: 33 additions & 5 deletions src/mesh_poll_mailbox_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
from aws_lambda_powertools.shared.functions import strtobool
from requests import HTTPError
from shared.application import MESHLambdaApplication
from shared.common import SingletonCheckFailure, return_failure, singleton_check
from shared.common import (
SingletonCheckFailure,
acquire_lock,
return_failure,
)


class HandshakeFailure(Exception):
Expand All @@ -28,13 +32,25 @@ def __init__(self, additional_log_config=None, load_ssm_params=False):

self.handshake: bool = False
self.response: dict[str, Any] = {}
self.execution_id = None

def initialise(self):
# initialise
self.mailbox_id = self.event["mailbox"]
self.handshake = bool(strtobool(self.event.get("handshake", "false")))
self.response = {}

print("POLL_EVENT", self.event.raw_event)
print("EXECUTION_ID", self.execution_id)

def process_event(self, event):
event_detail = event.get("EventDetail", {})
if event_detail:
# Enhanced event detail format from the step function
self.execution_id = event.get("ExecutionId")

return self.EVENT_TYPE(event_detail or event)

def start(self):
# in case of crash
self.response = {"statusCode": int(HTTPStatus.INTERNAL_SERVER_ERROR)}
Expand All @@ -48,10 +64,20 @@ def start(self):
return

try:
singleton_check(
self.config.get_messages_step_function_arn,
self.is_same_mailbox_check,
self.sfn,
lock_name = f"FetchLock_{self.mailbox_id}"

owner_id = self.execution_id or f"internalID_{self._get_internal_id()}"

self.log_object.write_log(
"MESHPOLL0003",
None,
{"lock_name": lock_name, "owner_id": owner_id},
)

acquire_lock(
self.ddb_client,
lock_name,
owner_id,
)

except SingletonCheckFailure as e:
Expand Down Expand Up @@ -104,6 +130,8 @@ def start(self):
"internal_id": self.log_object.internal_id,
"message_count": message_count,
"message_list": output_list,
"lock_name": lock_name,
"execution_id": self.execution_id,
},
# Parameters for a follow-up iteration through the messages in this execution
"mailbox": self.mailbox_id,
Expand Down
Loading
Loading