Skip to content

Commit

Permalink
Merge pull request #7 from NHSDigital/AMB-0001-creating-batch-skeleton
Browse files Browse the repository at this point in the history
Creating batch skeleton
  • Loading branch information
CLJ2006 authored Jul 25, 2024
2 parents 702d2b0 + 3bd0fc5 commit 15e87a3
Show file tree
Hide file tree
Showing 15 changed files with 578 additions and 6 deletions.
11 changes: 11 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"python.testing.unittestArgs": [
"-v",
"-s",
"./batch",
"-p",
"test_*.py"
],
"python.testing.pytestEnabled": false,
"python.testing.unittestEnabled": true
}
11 changes: 11 additions & 0 deletions batch/.vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"python.testing.unittestArgs": [
"-v",
"-s",
".",
"-p",
"test_*.py"
],
"python.testing.pytestEnabled": false,
"python.testing.unittestEnabled": true
}
18 changes: 18 additions & 0 deletions batch/.vscode/settings.json.default
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"python.analysis.extraPaths": [
"./src"
],
"python.testing.unittestArgs": [
"-v",
"-s",
"./",
"-p",
"test_*.py"
],
"python.testing.pytestEnabled": false,
"python.testing.unittestEnabled": true,
"pylint.args": [
"--init-hook",
"import sys; sys.path.append('./src')"
]
}
11 changes: 11 additions & 0 deletions batch/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Build the Lambda packaging image.
# NOTE(review): the Dockerfile added in this commit is named "lamda.Dockerfile"
# (missing "b"), so `-f lambda.Dockerfile` will not find it — confirm the
# intended filename and make the two agree.
build:
	docker build -t imms-lambda-build -f lambda.Dockerfile .

# Produce the deployable artifact: run the build image with ./build mounted
# so the container writes its output there. Depends on `build`.
package:build
	mkdir -p build
	docker run --rm -v $(shell pwd)/build:/build imms-lambda-build

# Run the unittest suite via test discovery.
test:
	python -m unittest

.PHONY: build package test
1 change: 0 additions & 1 deletion batch/batch.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ RUN pip install "poetry~=1.5.0"
COPY poetry.lock pyproject.toml README.md ./
RUN poetry config virtualenvs.create false && poetry install --no-interaction --no-ansi --no-root --only main


# -----------------------------
FROM base as test

Expand Down
13 changes: 13 additions & 0 deletions batch/lamda.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# AWS Lambda Python base image.
# NOTE(review): pinned to 3.10 — confirm this matches the project's target
# Lambda runtime (the original comment was unsure between 3.10 and 3.8).
FROM public.ecr.aws/lambda/python:3.10 as base

RUN pip install "poetry~=1.5.0"

# Install only the main dependency group directly into the image
# (no virtualenv, project itself not installed).
COPY poetry.lock pyproject.toml README.md ./
RUN poetry config virtualenvs.create false && poetry install --no-interaction --no-ansi --no-root --only main

# Copy the handler module into the Lambda task root.
COPY router_lambda_function.py ${LAMBDA_TASK_ROOT}

# Lambda entry point: <module>.<function>.
CMD [ "router_lambda_function.lambda_handler" ]
8 changes: 4 additions & 4 deletions batch/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion batch/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ structlog = "^24.1.0"

[build-system]
requires = ["poetry-core ~= 1.5.0"]
build-backend = "poetry.core.masonry.api"

build-backend = "poetry.core.masonry.api"
8 changes: 8 additions & 0 deletions batch/src/ods_patterns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Maps an ODS code (extracted from the incoming file name) to the supplier
# system it belongs to; used to pick the supplier's metadata queue.
ODS_PATTERNS = {
    "YGM41": "EMIS",
    "8J1100001": "Pinnacle",
    "8HK48": "Sonar",
    "YGA": "TPP",
    "0DE": "AGEM-NIVS",
    "0DF": "NIMS",
    "8HA94": "EVA",
    "X26": "RAVS",
}
150 changes: 150 additions & 0 deletions batch/src/router_lambda_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import json
import boto3
import re
import csv
import os
import logging
from io import BytesIO, StringIO
from ods_patterns import ODS_PATTERNS
# Incoming file format DISEASETYPE_Vaccinations_version_ODSCODE_DATETIME.csv
# for example: Flu_Vaccinations_v5_YYY78_20240708T12130100.csv - ODS code has multiple lengths
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3_client = boto3.client('s3', region_name='eu-west-2')
sqs_client = boto3.client('sqs', region_name='eu-west-2')


def get_environment():
    """Return the deployment environment name from ENVIRONMENT, or None if unset."""
    return os.environ.get("ENVIRONMENT")


def extract_ods_code(file_key):
    """Extract the ODS code from a vaccination file name, or None if it does not match.

    Expected name shape: <disease>_Vaccinations_v<N>_<ODSCODE>_<datetime>.csv
    """
    match = re.search(r'_Vaccinations_v\d+_(\w+)_\d+T\d+\.csv$', file_key)
    if match is None:
        return None
    return match.group(1)


def identify_supplier(ods_code):
    """Return the supplier system for an ODS code, or None when unrecognised."""
    # dict.get already returns None for a missing key; the explicit
    # `, None` default in the original was redundant.
    return ODS_PATTERNS.get(ods_code)


def identify_disease_type(file_key):
    """Return the disease-type prefix of the file name (e.g. "Flu"), or None."""
    found = re.match(r'(\w+)_Vaccinations_', file_key)
    return found.group(1) if found else None


def identify_timestamp(file_key):
    """Return the <date>T<time> stamp from the file name, or None if absent."""
    found = re.search(r'_(\d+T\d+)\.csv$', file_key)
    if not found:
        return None
    return found.group(1)


def initial_file_validation(file_key, bucket_name):
    """Placeholder file validation; returns (passed, errors).

    TO DO - real validation logic; currently keys off marker substrings in
    the file name only. `bucket_name` is accepted for the future real
    implementation and is not used yet.
    """
    # Order matters: "invalid" is checked before "missing".
    if "invalid" in file_key:
        return False, ["Invalid content detected"]
    if "missing" in file_key:
        return False, ["Missing required fields"]
    # Temporary placeholder for validation success.
    return True, []
# Temporary placeholder for validation success


def send_to_supplier_queue(supplier, message_body):
    """Send file metadata to the supplier's SQS metadata queue.

    Returns True on success, False when the queue does not exist.
    Other SQS errors propagate to the caller.
    """
    # TO DO - will not send as no queue exists, only logs the error for now
    account_id = os.getenv("ACCOUNT_ID")
    queue_url = f"https://sqs.eu-west-2.amazonaws.com/{account_id}/{supplier}_metadata_queue"

    try:
        sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(message_body))
    except sqs_client.exceptions.QueueDoesNotExist:
        # Lazy %-formatting so the message is only built when the record is emitted;
        # the stray debug print(queue_url) from the original is removed.
        logger.error("queue %s does not exist", queue_url)
        return False
    logger.info("Message sent to SQS queue for supplier %s", supplier)
    return True


def create_ack_file(bucket_name, file_key, ack_bucket_name, validation_passed, validation_errors):
    """Build a pipe-delimited acknowledgement CSV and upload it to the ack bucket.

    The ack file is named "<file_key stem>_response.csv". `bucket_name` and
    `validation_errors` are accepted but unused placeholders until the real
    acknowledgement content is defined.
    """
    # TO DO - Populate acknowledgement file with correct values once known
    headers = ['MESSAGE_HEADER_ID', 'HEADER_RESPONSE_CODE', 'ISSUE_SEVERITY', 'ISSUE_CODE', 'RESPONSE_TYPE',
               'RESPONSE_CODE', 'RESPONSE_DISPLAY', 'RECEIVED_TIME', 'MAILBOX_FROM', 'LOCAL_ID']
    # Placeholder rows: success and error variants differ only in two columns.
    if validation_passed:
        data_rows = [['Value1', 'Value2', 'Value3', 'Value4', 'Value5',
                      'Value6', 'Value7', 'Value8', 'Value9', 'Value10']]
    else:
        data_rows = [['Value1', 'Error2', 'Value3', 'Error4', 'Value5',
                      'Value6', 'Value7', 'Value8', 'Value9', 'Value10']]
    # Ack filename was computed identically in both branches of the original;
    # hoisted here once. e.g. "x.csv" -> "x_response.csv".
    ack_filename = f"{file_key.split('.')[0]}_response.csv"
    # Create CSV content with | delimiter, filetype .csv
    csv_buffer = StringIO()
    csv_writer = csv.writer(csv_buffer, delimiter='|')
    csv_writer.writerow(headers)
    csv_writer.writerows(data_rows)
    # Upload the CSV file to S3 (debug prints from the original removed).
    csv_buffer.seek(0)
    csv_bytes = BytesIO(csv_buffer.getvalue().encode('utf-8'))
    s3_client.upload_fileobj(csv_bytes, ack_bucket_name, ack_filename)
    logger.info("Uploaded acknowledgement file to %s", ack_bucket_name)


def lambda_handler(event, context):
    """Process S3 event records for incoming vaccination batch files.

    For each record: extract ODS code / disease type / timestamp from the
    file name, run the (placeholder) validation, write an acknowledgement
    file, and on success send the metadata to the supplier's SQS queue.
    Always returns a 200 response; per-file failures are logged and
    collected in `error_files`.
    """
    error_files = []
    # Resolved once per invocation; was previously assigned inside the
    # per-record try, which left it unbound in the except handlers when an
    # earlier statement raised.
    ack_bucket_name = os.getenv("ACK_BUCKET_NAME")

    for record in event['Records']:
        # Pre-initialise so the except handlers below never hit a NameError
        # if extracting the record fields is itself what raised.
        bucket_name = None
        file_key = None
        try:
            bucket_name = record['s3']['bucket']['name']
            file_key = record['s3']['object']['key']
            ods_code = extract_ods_code(file_key)
            disease_type = identify_disease_type(file_key)
            timestamp = identify_timestamp(file_key)
            supplier = identify_supplier(ods_code)
            if not supplier and ods_code:
                logger.error("Supplier not found for ods code %s", ods_code)
            # TO DO - Perform initial file validation
            validation_passed, validation_errors = initial_file_validation(file_key, bucket_name)
            # Create acknowledgment file
            create_ack_file(bucket_name, file_key, ack_bucket_name, validation_passed, validation_errors)
            # If validation passed, send message to the supplier SQS queue
            if validation_passed:
                message_body = {
                    'disease_type': disease_type,
                    'supplier': supplier,
                    'timestamp': timestamp
                }
                try:
                    send_to_supplier_queue(supplier, message_body)
                except Exception:
                    logger.error("failed to send message to %s_queue", supplier)

            logger.info("File Metadata processed successfully for - %s", file_key)

        # Error handling for file processing. Both branches now use the module
        # logger (the original mixed root `logging` and `logger`) and both
        # record the failure in error_files (ValueError previously did not).
        except ValueError as ve:
            logger.error("Error in initial_file_validation'%s': %s", file_key, ve)
            if file_key is not None:
                create_ack_file(bucket_name, file_key, ack_bucket_name, False, [str(ve)])
            error_files.append(file_key)
        except Exception as e:
            logger.error("Error processing file'%s': %s", file_key, e)
            if file_key is not None:
                create_ack_file(bucket_name, file_key, ack_bucket_name, False, [str(e)])
            error_files.append(file_key)

    if error_files:
        logger.error("Processing errors occurred for the following files: %s",
                     ', '.join(str(f) for f in error_files))

    logger.info("Completed processing all file metadata in current batch")
    return {
        'statusCode': 200,
        'body': json.dumps('File processing for S3 bucket completed')
    }
4 changes: 4 additions & 0 deletions batch/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import os
import sys

# Make the sibling "src" directory importable so the tests can do plain
# `import router_lambda_function` / `import ods_patterns` without packaging.
sys.path.append(f"{os.path.dirname(os.path.abspath(__file__))}/../src")
Loading

0 comments on commit 15e87a3

Please sign in to comment.