CNDIT-1027 (#130)
* added patient

* updated person

* updated

* added part of the migration for patient

* moved things around

* rename

* more renaming

* added a mass repository for person

* dumped patient code, but there are a lot of things that still need testing

* moved patient code around

* cleaned up patient

* more patient code

* more minor changes

* more core code, and tested the flow

* simplified the person insertion process

* cleanup

* added transactional support for ODSE

* updated DI to direct the flow to data processing

* cleanup

* updated the flow to send the XML payload to DP

* ignore new code in DI

* cleanup for SonarQube

* moved things around for easier unit testing
ndduc01 authored Feb 17, 2024
1 parent 3e4cc66 commit 6e51805
Showing 92 changed files with 3,733 additions and 393 deletions.
@@ -23,6 +23,7 @@ public ProducerFactory<String, String> producerFactory() {
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

return new DefaultKafkaProducerFactory<>(config);
}
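For context, a minimal sketch of how a KafkaTemplate is typically wired on top of a producer factory like the one above. The bean shape and the bootstrap address are illustrative assumptions, not part of this commit:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

public class ProducerWiringSketch {
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> config = new HashMap<>();
        // Assumed local broker address; the real value comes from configuration.
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        ProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config);
        return new KafkaTemplate<>(factory);
    }
}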

@@ -14,4 +14,6 @@ private KafkaHeaderValue() {
public static final String MESSAGE_VERSION = "MESSAGE_VERSION";

public static final String MESSAGE_OPERATION = "MESSAGE_OPERATION";
public static final String DATA_PROCESSING_ENABLE = "DATA_PROCESSING_ENABLE";
public static final String DATA_TYPE = "DATA_TYPE";
}
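The two new constants carry routing metadata between the ingestion and data-processing services. A hedged sketch of how a producer might attach them to a record; the KafkaTemplate field, topic, key, and the "HL7" value are assumptions:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;

public class HeaderAttachSketch {
    private final KafkaTemplate<String, String> template;

    public HeaderAttachSketch(KafkaTemplate<String, String> template) {
        this.template = template;
    }

    public void send(String topic, String key, String payload, boolean dataProcessingEnable) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, payload);
        // Headers travel as raw bytes; downstream @Header bindings read them back as strings.
        // KafkaHeaderValue is the constants class shown above.
        record.headers().add(KafkaHeaderValue.DATA_PROCESSING_ENABLE,
                String.valueOf(dataProcessingEnable).getBytes(StandardCharsets.UTF_8));
        record.headers().add(KafkaHeaderValue.DATA_TYPE,
                "HL7".getBytes(StandardCharsets.UTF_8)); // assumed data-type value
        template.send(record);
    }
}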
@@ -34,6 +34,7 @@

import jakarta.xml.bind.JAXBException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
import org.apache.kafka.common.errors.SerializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -185,14 +186,15 @@ public KafkaConsumerService(
)
public void handleMessageForRawElr(String message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaderValue.MESSAGE_VALIDATION_ACTIVE) String messageValidationActive) throws DuplicateHL7FileFoundException, DiHL7Exception {
@Header(KafkaHeaderValue.MESSAGE_VALIDATION_ACTIVE) String messageValidationActive,
@Header(KafkaHeaderValue.DATA_PROCESSING_ENABLE) String dataProcessingEnable) throws DuplicateHL7FileFoundException, DiHL7Exception {
log.debug(topicDebugLog, message, topic);
boolean hl7ValidationActivated = false;

if (messageValidationActive != null && messageValidationActive.equalsIgnoreCase("true")) {
hl7ValidationActivated = true;
}
validationHandler(message, hl7ValidationActivated);
validationHandler(message, hl7ValidationActivated, dataProcessingEnable);
}

/**
@@ -226,9 +228,10 @@ public void handleMessageForRawElr(String message,
)
@KafkaListener(topics = "${kafka.validation.topic}")
public void handleMessageForValidatedElr(String message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) throws ConversionPrepareException {
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaderValue.DATA_PROCESSING_ENABLE) String dataProcessingEnable) throws ConversionPrepareException {
log.debug(topicDebugLog, message, topic);
preparationForConversionHandler(message);
preparationForConversionHandler(message, dataProcessingEnable);
}

/**
@@ -237,9 +240,10 @@ public void handleMessageForValidatedElr(String message,
@KafkaListener(topics = "${kafka.xml-conversion-prep.topic}")
public void handleMessageForXmlConversionElr(String message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaderValue.MESSAGE_OPERATION) String operation) {
@Header(KafkaHeaderValue.MESSAGE_OPERATION) String operation,
@Header(KafkaHeaderValue.DATA_PROCESSING_ENABLE) String dataProcessingEnable) {
log.debug(topicDebugLog, message, topic);
xmlConversionHandler(message, operation);
xmlConversionHandler(message, operation, dataProcessingEnable);
}
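Each listener above now declares DATA_PROCESSING_ENABLE as a required header, so records published without it will fail binding. If tolerance for older producers were wanted, Spring's optional header binding could default the flag off. A sketch under that assumption, not what this commit does:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Header;

public class TolerantListenerSketch {
    @KafkaListener(topics = "${kafka.xml-conversion-prep.topic}")
    public void handle(String message,
                       @Header(value = KafkaHeaderValue.DATA_PROCESSING_ENABLE, required = false)
                       String dataProcessingEnable) {
        // Boolean.parseBoolean(null) is false, so a missing header keeps the legacy flow.
        boolean enabled = Boolean.parseBoolean(dataProcessingEnable);
        // ... route to data processing or the legacy path based on the flag
    }
}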

/**
@@ -326,7 +330,7 @@ public void handleDltManual(
}

//region DLT HANDLER
@DltHandler
@DltHandler()
public void handleDlt(
String message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@@ -419,90 +423,105 @@ private String getDltErrorSource(String incomingTopic) {
return erroredSource;
}
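The DLT methods above sit on Spring Kafka's retryable-topic machinery. A self-contained sketch of that pattern, with the attempt count, backoff, and topic name as assumptions:

import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.annotation.Backoff;

public class RetryDltSketch {
    @RetryableTopic(attempts = "3", backoff = @Backoff(delay = 1000))
    @KafkaListener(topics = "example-topic") // assumed topic name
    public void listen(String message) {
        // A thrown exception routes the record through the retry topics
        // and, once attempts are exhausted, into the dead-letter topic.
        throw new IllegalStateException("forcing retry for the sketch");
    }

    @DltHandler
    public void handleDlt(String message,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        // Terminal handling: log, persist, or alert.
        System.err.printf("DLT record from %s: %s%n", topic, message);
    }
}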

private void preparationForConversionHandler(String message) throws ConversionPrepareException {
private void preparationForConversionHandler(String message, String dataProcessingEnable) throws ConversionPrepareException {
Optional<ValidatedELRModel> validatedElrResponse = this.iValidatedELRRepository.findById(message);
if(validatedElrResponse.isPresent()) {
kafkaProducerService.sendMessagePreparationTopic(validatedElrResponse.get(), prepXmlTopic, TopicPreparationType.XML, 0);
kafkaProducerService.sendMessagePreparationTopic(validatedElrResponse.get(), prepFhirTopic, TopicPreparationType.FHIR, 0);
kafkaProducerService.sendMessagePreparationTopic(validatedElrResponse.get(), prepXmlTopic, TopicPreparationType.XML, 0, dataProcessingEnable);
kafkaProducerService.sendMessagePreparationTopic(validatedElrResponse.get(), prepFhirTopic, TopicPreparationType.FHIR, 0, dataProcessingEnable);
} else {
throw new ConversionPrepareException("Validation ELR Record Not Found");
}
}
private void xmlConversionHandler(String message, String operation) {

// Update: changed this method to an async process; the intensive work here caused consumer lag, delays, and strange behavior
// TODO: consider breaking down this logic (NOTE) //NOSONAR
// PROCESS is as follows:
// - HL7 -> XML
// xml conversion can be broken down into multiple smaller pipelines
// - Saving the record to the status table can also be moved to a downstream pipeline
CompletableFuture.runAsync(() -> {
log.debug("Received message id will be retrieved from db and associated hl7 will be converted to xml");

String hl7Msg = "";
try {
if (operation.equalsIgnoreCase(EnumKafkaOperation.INJECTION.name())) {
Optional<ValidatedELRModel> validatedElrResponse = this.iValidatedELRRepository.findById(message);
hl7Msg = validatedElrResponse.map(ValidatedELRModel::getRawMessage).orElse("");
/**
 * Made this public so we can add unit tests for now.
 * We need to implement an interface pattern for the NBS convert and transformation classes; it is better for unit testing.
 */
public void xmlConversionHandlerProcessing(String message, String operation, String dataProcessingEnable) {
String hl7Msg = "";
try {
if (operation.equalsIgnoreCase(EnumKafkaOperation.INJECTION.name())) {
Optional<ValidatedELRModel> validatedElrResponse = this.iValidatedELRRepository.findById(message);
hl7Msg = validatedElrResponse.map(ValidatedELRModel::getRawMessage).orElse("");
} else {
Optional<ElrDeadLetterModel> response = this.elrDeadLetterRepository.findById(message);
if (response.isPresent()) {
var validMessage = iHl7v2Validator.messageStringValidation(response.get().getMessage());
validMessage = iHl7v2Validator.processFhsMessage(validMessage);
hl7Msg = validMessage;
} else {
Optional<ElrDeadLetterModel> response = this.elrDeadLetterRepository.findById(message);
if (response.isPresent()) {
var validMessage = iHl7v2Validator.messageStringValidation(response.get().getMessage());
validMessage = iHl7v2Validator.processFhsMessage(validMessage);
hl7Msg = validMessage;
} else {
throw new XmlConversionException(errorDltMessage);
}
throw new XmlConversionException(errorDltMessage);
}
HL7ParsedMessage<OruR1> parsedMessage = Hl7ToRhapsodysXmlConverter.getInstance().parsedStringToHL7(hl7Msg);
String rhapsodyXml = Hl7ToRhapsodysXmlConverter.getInstance().convert(message, parsedMessage);
}
HL7ParsedMessage<OruR1> parsedMessage = Hl7ToRhapsodysXmlConverter.getInstance().parsedStringToHL7(hl7Msg);
String rhapsodyXml = Hl7ToRhapsodysXmlConverter.getInstance().convert(message, parsedMessage);

// Modified from debug ==> info to capture xml for analysis.
// Please leave the line below at "info" level for the time being;
// before going live, it will be changed to debug.
log.info("rhapsodyXml: {}", rhapsodyXml);
// Modified from debug ==> info to capture xml for analysis.
// Please leave the line below at "info" level for the time being;
// before going live, it will be changed to debug.
log.info("rhapsodyXml: {}", rhapsodyXml);

NbsInterfaceModel nbsInterfaceModel = nbsRepositoryServiceProvider.saveXmlMessage(message, rhapsodyXml, parsedMessage);
boolean dataProcessingApplied = Boolean.parseBoolean(dataProcessingEnable);
NbsInterfaceModel nbsInterfaceModel = nbsRepositoryServiceProvider.saveXmlMessage(message, rhapsodyXml, parsedMessage, dataProcessingApplied);

customMetricsBuilder.incrementXmlConversionRequested();
// Once the XML is saved to the NBS_Interface table, we get the ID to save it
// in the Data Ingestion elr_record_status_id table, so that we can get the status
// of the record straight-forward from the NBS_Interface table.
customMetricsBuilder.incrementXmlConversionRequested();
// Once the XML is saved to the NBS_Interface table, we get the ID to save it
// in the Data Ingestion elr_record_status_id table, so that we can get the status
// of the record straight-forward from the NBS_Interface table.

if(nbsInterfaceModel == null) {
customMetricsBuilder.incrementXmlConversionRequestedFailure();
}
else {
customMetricsBuilder.incrementXmlConversionRequestedSuccess();
ReportStatusIdData reportStatusIdData = new ReportStatusIdData();
Optional<ValidatedELRModel> validatedELRModel = iValidatedELRRepository.findById(message);
reportStatusIdData.setRawMessageId(validatedELRModel.get().getRawId());
reportStatusIdData.setNbsInterfaceUid(nbsInterfaceModel.getNbsInterfaceUid());
reportStatusIdData.setCreatedBy(convertedToXmlTopic);
reportStatusIdData.setUpdatedBy(convertedToXmlTopic);

var timestamp = getCurrentTimeStamp();
reportStatusIdData.setCreatedOn(timestamp);
reportStatusIdData.setUpdatedOn(timestamp);
iReportStatusRepository.save(reportStatusIdData);
}
if(nbsInterfaceModel == null) {
customMetricsBuilder.incrementXmlConversionRequestedFailure();
}
else {
customMetricsBuilder.incrementXmlConversionRequestedSuccess();
ReportStatusIdData reportStatusIdData = new ReportStatusIdData();
Optional<ValidatedELRModel> validatedELRModel = iValidatedELRRepository.findById(message);
reportStatusIdData.setRawMessageId(validatedELRModel.get().getRawId());
reportStatusIdData.setNbsInterfaceUid(nbsInterfaceModel.getNbsInterfaceUid());
reportStatusIdData.setCreatedBy(convertedToXmlTopic);
reportStatusIdData.setUpdatedBy(convertedToXmlTopic);

var timestamp = getCurrentTimeStamp();
reportStatusIdData.setCreatedOn(timestamp);
reportStatusIdData.setUpdatedOn(timestamp);
iReportStatusRepository.save(reportStatusIdData);
}

if (dataProcessingApplied) {
Gson gson = new Gson();
String strGson = gson.toJson(nbsInterfaceModel);

kafkaProducerService.sendMessageAfterConvertedToXml(rhapsodyXml, convertedToXmlTopic, 0);

} catch (Exception e) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
String stackTrace = sw.toString();
// Handle any exceptions here
kafkaProducerService.sendMessageDlt(
message, "xml_prep_dlt_manual", 0 ,
stackTrace,prepXmlTopic
);
kafkaProducerService.sendMessageAfterConvertedToXml(strGson, "elr_processing_micro", 0); //NOSONAR
} else {
kafkaProducerService.sendMessageAfterConvertedToXml(nbsInterfaceModel.getNbsInterfaceUid().toString(), convertedToXmlTopic, 0);
}

} catch (Exception e) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
String stackTrace = sw.toString();
// Handle any exceptions here
kafkaProducerService.sendMessageDlt(
message, "xml_prep_dlt_manual", 0 ,
stackTrace,prepXmlTopic
);
}
}
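When the flag is on, the whole NbsInterfaceModel is serialized with Gson and published to the data-processing topic instead of just the interface UID. A sketch of that payload's round trip; it assumes the model is a plain bean with a no-arg constructor that Gson can instantiate:

import com.google.gson.Gson;

public class GsonPayloadSketch {
    public static void main(String[] args) {
        Gson gson = new Gson();
        NbsInterfaceModel model = new NbsInterfaceModel(); // assumes a no-arg constructor
        // ... populate the model as saveXmlMessage would
        String json = gson.toJson(model);                  // what the producer publishes
        NbsInterfaceModel back = gson.fromJson(json, NbsInterfaceModel.class); // what DP would read
        System.out.println(back != null);
    }
}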
private void xmlConversionHandler(String message, String operation, String dataProcessingEnable) {

// Update: changed this method to an async process; the intensive work here caused consumer lag, delays, and strange behavior
// TODO: consider breaking down this logic (NOTE) //NOSONAR
// PROCESS is as follows:
// - HL7 -> XML
// xml conversion can be broken down into multiple smaller pipelines
// - Saving the record to the status table can also be moved to a downstream pipeline
CompletableFuture.runAsync(() -> {
log.debug("Received message id will be retrieved from db and associated hl7 will be converted to xml");
xmlConversionHandlerProcessing(message, operation, dataProcessingEnable);
});
}
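runAsync with no executor argument schedules work on the JVM-wide ForkJoinPool.commonPool(). If the conversion stays CPU-heavy, handing it a dedicated pool keeps it from starving other common-pool users; a sketch, with the pool size as an assumption:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncExecutorSketch {
    // Dedicated pool so heavy HL7-to-XML conversion does not contend
    // with other tasks on ForkJoinPool.commonPool(); the size is an assumption.
    private static final ExecutorService CONVERSION_POOL = Executors.newFixedThreadPool(4);

    public void convertAsync(Runnable work) {
        CompletableFuture.runAsync(work, CONVERSION_POOL);
    }
}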
private void validationHandler(String message, boolean hl7ValidationActivated) throws DuplicateHL7FileFoundException, DiHL7Exception {
private void validationHandler(String message, boolean hl7ValidationActivated, String dataProcessingEnable) throws DuplicateHL7FileFoundException, DiHL7Exception {
Optional<RawERLModel> rawElrResponse = this.iRawELRRepository.findById(message);
RawERLModel elrModel;
if (!rawElrResponse.isEmpty()) {
@@ -525,7 +544,7 @@ private void validationHandler(String message, boolean hl7ValidationActivated) t
// Duplication check
iHL7DuplicateValidator.validateHL7Document(hl7ValidatedModel);
saveValidatedELRMessage(hl7ValidatedModel);
kafkaProducerService.sendMessageAfterValidatingMessage(hl7ValidatedModel, validatedTopic, 0);
kafkaProducerService.sendMessageAfterValidatingMessage(hl7ValidatedModel, validatedTopic, 0, dataProcessingEnable);
break;
case KafkaHeaderValue.MESSAGE_TYPE_CSV:
// TODO: implement csv validation; this is not in scope for data ingestion at the moment
