From d2e6782de96380f769f345b48d2c8101419c2b54 Mon Sep 17 00:00:00 2001 From: Duc Nguyen Date: Tue, 4 Feb 2025 15:26:22 -0700 Subject: [PATCH 1/6] setting up new consumer config for kafka --- .../routes/ElrProcessStatusComponent.java | 2 +- .../config/DataSourceConfig.java | 42 +++++++-- .../config/KafkaConsumerConfig.java | 93 ++++++++++++++++++- .../config/NbsDataSourceConfig.java | 39 ++++++-- .../service/KafkaConsumerService.java | 21 +++-- .../security/controller/TokenController.java | 2 +- .../src/main/resources/application.yaml | 18 +++- 7 files changed, 190 insertions(+), 27 deletions(-) diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/camel/routes/ElrProcessStatusComponent.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/camel/routes/ElrProcessStatusComponent.java index 889620e7c..0aaad3fb0 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/camel/routes/ElrProcessStatusComponent.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/camel/routes/ElrProcessStatusComponent.java @@ -80,7 +80,7 @@ public String process(String body){ status = body; } } - logger.info("ElrProcessStatusComponent status:{}", status); + logger.debug("ElrProcessStatusComponent status:{}", status); return status; } } \ No newline at end of file diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/config/DataSourceConfig.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/config/DataSourceConfig.java index 087ef60fd..764e7812f 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/config/DataSourceConfig.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/config/DataSourceConfig.java @@ -1,5 +1,7 @@ package gov.cdc.dataingestion.config; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; import jakarta.persistence.EntityManagerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +56,25 @@ 
public class DataSourceConfig { @Value("${spring.datasource.password}") private String password; + + @Value("${spring.datasource.hikari.maximum-pool-size:100}") + private int maximumPoolSize; + + @Value("${spring.datasource.hikari.minimum-idle:50}") + private int minimumIdle; + + @Value("${spring.datasource.hikari.idle-timeout:120000}") + private long idleTimeout; + + @Value("${spring.datasource.hikari.max-lifetime:1200000}") + private long maxLifetime; + + @Value("${spring.datasource.hikari.connection-timeout:300000}") + private long connectionTimeout; + + @Value("${spring.datasource.hikari.pool-name:OdseHikariCP}") + private String poolName; + @Bean() public DataSource dataSource() { String driverClassName = this.className; @@ -61,14 +82,23 @@ public DataSource dataSource() { String dbUserName = this.userName; String dbUserPassword = this.password; - DataSourceBuilder dataSourceBuilder = DataSourceBuilder.create(); - dataSourceBuilder.driverClassName(driverClassName); - dataSourceBuilder.url(dbUrl); - dataSourceBuilder.username(dbUserName); - dataSourceBuilder.password(dbUserPassword); + HikariConfig hikariConfig = new HikariConfig(); + hikariConfig.setDriverClassName(driverClassName); + hikariConfig.setJdbcUrl(dbUrl); + hikariConfig.setUsername(dbUserName); + hikariConfig.setPassword(dbUserPassword); + + // HikariCP-specific settings + hikariConfig.setMaximumPoolSize(maximumPoolSize); + hikariConfig.setMinimumIdle(minimumIdle); + hikariConfig.setIdleTimeout(idleTimeout); + hikariConfig.setMaxLifetime(maxLifetime); + hikariConfig.setConnectionTimeout(connectionTimeout); + hikariConfig.setPoolName(poolName); + + return new HikariDataSource(hikariConfig); - return dataSourceBuilder.build(); } @Bean diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/config/KafkaConsumerConfig.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/config/KafkaConsumerConfig.java index 351357281..ba912c9d5 100644 --- 
a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/config/KafkaConsumerConfig.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/config/KafkaConsumerConfig.java @@ -2,6 +2,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.CooperativeStickyAssignor; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; @@ -25,8 +26,6 @@ * */ @SuppressWarnings({"java:S1118","java:S125", "java:S6126", "java:S1135"}) public class KafkaConsumerConfig { - @Value("${spring.kafka.group-id}") - private String groupId = ""; @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers = ""; @@ -36,11 +35,37 @@ public class KafkaConsumerConfig { @Value("${spring.kafka.consumer.maxPollIntervalMs}") private String maxPollInterval = ""; + + @Value("${spring.kafka.thread}") + private Integer thread; + + @Value("${spring.kafka.group-id-default}") + private String groupIdDefault; + + @Value("${spring.kafka.group-id-raw}") + private String groupIdRaw; + + @Value("${spring.kafka.group-id-raw-xml}") + private String groupIdRawXml; + + @Value("${spring.kafka.group-id-validate}") + private String groupIdValidate; + + @Value("${spring.kafka.group-id-xml}") + private String groupIdXml; + + @Value("${spring.kafka.group-id-ecr-cda}") + private String groupIdEcrCda; + + @Value("${spring.kafka.group-id-dlt-manual}") + private String groupIdDltManual; + + @Bean public ConsumerFactory consumerFactory() { final Map config = new HashMap<>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + config.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdDefault); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval); @@ -56,4 +81,66 @@ public ConsumerFactory consumerFactory() { factory.setConsumerFactory(consumerFactory()); return factory; } + + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactoryRaw() { + return createContainerFactory(groupIdRaw); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactoryRawXml() { + return createContainerFactory(groupIdRawXml); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactoryValidate() { + return createContainerFactory(groupIdValidate); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactoryXml() { + return createContainerFactory(groupIdXml); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactoryEcrCda() { + return createContainerFactory(groupIdEcrCda); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactoryDltManual() { + return createContainerFactory(groupIdDltManual); + } + + + + private Map baseConsumerConfigs(String groupId) { + Map config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName()); + config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024); // Fetch at least 1MB of data + config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // Wait up to 500ms for data + config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10 * 1024 * 1024); // Fetch up to 10MB per partition + 
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // Max 500 records per poll + config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // Allow 10 minutes for processing + config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 30-second session timeout + config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // Heartbeat every 10 seconds + config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // Auto-commit enabled — NOTE(review): prior comment said "Manual commit"; confirm intent (manual would be false + MANUAL ack mode) + return config; + } + + private ConsumerFactory createConsumerFactory(String groupId) { + return new DefaultKafkaConsumerFactory<>(baseConsumerConfigs(groupId)); + } + + private ConcurrentKafkaListenerContainerFactory createContainerFactory(String groupId) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(createConsumerFactory(groupId)); + factory.setConcurrency(thread); + return factory; + } } \ No newline at end of file diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/config/NbsDataSourceConfig.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/config/NbsDataSourceConfig.java index fd53dc8ec..18800cc33 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/config/NbsDataSourceConfig.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/config/NbsDataSourceConfig.java @@ -1,5 +1,7 @@ package gov.cdc.dataingestion.config; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; import jakarta.persistence.EntityManagerFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; @@ -47,16 +49,41 @@ public class NbsDataSourceConfig { @Value("${spring.datasource.password}") private String dbUserPassword; + @Value("${spring.datasource.hikari.maximum-pool-size:100}") + private int maximumPoolSize; + + @Value("${spring.datasource.hikari.minimum-idle:50}") + private int minimumIdle
+ + @Value("${spring.datasource.hikari.idle-timeout:120000}") + private long idleTimeout; + + @Value("${spring.datasource.hikari.max-lifetime:1200000}") + private long maxLifetime; + + @Value("${spring.datasource.hikari.connection-timeout:300000}") + private long connectionTimeout; + + @Value("${spring.datasource.hikari.pool-name:OdseHikariCP}") + private String poolName; + @Bean(name = "nbsDataSource") public DataSource nbsDataSource() { - DataSourceBuilder dataSourceBuilder = DataSourceBuilder.create(); + HikariConfig hikariConfig = new HikariConfig(); + hikariConfig.setDriverClassName(driverClassName); + hikariConfig.setJdbcUrl(dbUrl); + hikariConfig.setUsername(dbUserName); + hikariConfig.setPassword(dbUserPassword); - dataSourceBuilder.driverClassName(driverClassName); - dataSourceBuilder.url(dbUrl); - dataSourceBuilder.username(dbUserName); - dataSourceBuilder.password(dbUserPassword); + // HikariCP-specific settings + hikariConfig.setMaximumPoolSize(maximumPoolSize); + hikariConfig.setMinimumIdle(minimumIdle); + hikariConfig.setIdleTimeout(idleTimeout); + hikariConfig.setMaxLifetime(maxLifetime); + hikariConfig.setConnectionTimeout(connectionTimeout); + hikariConfig.setPoolName(poolName); - return dataSourceBuilder.build(); + return new HikariDataSource(hikariConfig); } @Bean(name = "nbsEntityManagerFactoryBuilder") diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java index 6831f044f..6713f3aed 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java @@ -181,7 +181,8 @@ public KafkaConsumerService( ) @KafkaListener( - topics = "${kafka.raw.topic}" + topics = "${kafka.raw.topic}", + 
containerFactory = "kafkaListenerContainerFactoryRaw" ) public void handleMessageForRawElr(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @@ -231,7 +232,8 @@ public void handleMessageForRawElr(String message, ) @KafkaListener( - topics = "${kafka.raw.xml-topic}" + topics = "${kafka.raw.xml-topic}", + containerFactory = "kafkaListenerContainerFactoryRawXml" ) public void handleMessageForElrXml(String message, @Header(KafkaHeaders.RECEIVED_KEY) String messageId, @@ -300,7 +302,9 @@ public void handleMessageForElrXml(String message, JAXBException.class } ) - @KafkaListener(topics = "${kafka.validation.topic}") + @KafkaListener(topics = "${kafka.validation.topic}", + containerFactory = "kafkaListenerContainerFactoryValidate" + ) public void handleMessageForValidatedElr(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaderValue.DATA_PROCESSING_ENABLE) String dataProcessingEnable) { @@ -318,7 +322,8 @@ public void handleMessageForValidatedElr(String message, /** * XML Conversion * */ - @KafkaListener(topics = "${kafka.xml-conversion-prep.topic}") + @KafkaListener(topics = "${kafka.xml-conversion-prep.topic}", + containerFactory = "kafkaListenerContainerFactoryXml") public void handleMessageForXmlConversionElr(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaderValue.MESSAGE_OPERATION) String operation, @@ -353,7 +358,8 @@ public void handleMessageForXmlConversionElr(String message, ) @KafkaListener( - topics = "ecr_cda" + topics = "ecr_cda", + containerFactory = "kafkaListenerContainerFactoryEcrCda" ) public void handleMessageForPhdcEcrTransformToCda(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) throws EcrCdaXmlException { @@ -365,7 +371,8 @@ public void handleMessageForPhdcEcrTransformToCda(String message, } @KafkaListener( - topics = "xml_prep_dlt_manual" + topics = "xml_prep_dlt_manual", + containerFactory = "kafkaListenerContainerFactoryDltManual" ) public void 
handleDltManual( String message, @@ -509,7 +516,7 @@ public void xmlConversionHandlerProcessing(String message, String operation, Str // Modified from debug ==> info to capture xml for analysis. // Please leave below at "info" level for the time being, before going live, // this will be changed to debug - log.info("rhapsodyXml: {}", rhapsodyXml); + log.debug("rhapsodyXml: {}", rhapsodyXml); boolean dataProcessingApplied = Boolean.parseBoolean(dataProcessingEnable); NbsInterfaceModel nbsInterfaceModel = nbsRepositoryServiceProvider.saveXmlMessage(message, rhapsodyXml, parsedMessage, dataProcessingApplied); diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/security/controller/TokenController.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/security/controller/TokenController.java index 92e4e3fcd..33a741a72 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/security/controller/TokenController.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/security/controller/TokenController.java @@ -46,7 +46,7 @@ public static RestTemplate restTemplate(RestTemplateBuilder builder) { ) @PostMapping("/api/auth/token") public ResponseEntity token(@RequestHeader("clientid") String clientId, @RequestHeader("clientsecret") String clientSecret) { - log.info("Token URL : " + authTokenUri); + log.debug("Token URL : " + authTokenUri); String accessToken = null; String postBody = "grant_type=client_credentials" + "&client_id=" + clientId diff --git a/data-ingestion-service/src/main/resources/application.yaml b/data-ingestion-service/src/main/resources/application.yaml index c5c82e2f9..2495bb21e 100644 --- a/data-ingestion-service/src/main/resources/application.yaml +++ b/data-ingestion-service/src/main/resources/application.yaml @@ -40,8 +40,13 @@ spring: password: ${NBS_DBPASSWORD} driverClassName: com.microsoft.sqlserver.jdbc.SQLServerDriver hikari: - connection-timeout: 60000 - maximum-pool-size: 10 + 
maximum-pool-size: ${HIKARI_POOL_SIZE:40} + connection-timeout: ${HIKARI_CON_TIMEOUT:300000} + minimum-idle: ${HIKARI_MINIMUM_IDLE:20} + idle-timeout: ${HIKARI_IDLE_TIMEOUT:120000} + max-lifetime: ${HIKARI_MAX_LIFETIME:1200000} + pool-name: ${HIKARI_POOL_NAME:SQLServerHikariCP} + leak-detection-threshold: ${LEAK_DETECTION_THRESHOLD:300000} dataingest: url: jdbc:sqlserver://${NBS_DBSERVER};databaseName=NBS_DATAINGEST;encrypt=true;trustServerCertificate=true; msgoute: @@ -54,7 +59,14 @@ spring: ddl-auto: none kafka: bootstrap-servers: ${BOOTSTRAP_SERVERS:localhost:9092} - group-id: data-ingestion-group + thread: ${KAFKA_CONCURRENCY:10} + group-id-default: data-ingestion-group + group-id-raw: data-ingestion-group-raw + group-id-raw-xml: data-ingestion-group-raw-xml + group-id-validate: data-ingestion-group-validate + group-id-xml: data-ingestion-group-xml + group-id-ecr-cda: data-ingestion-group-ecr-cda + group-id-dlt-manual: data-ingestion-group-dlt-manual consumer: maxPollIntervalMs: 30000 From 94b8a591601d27e12f6694152ae15a531399972d Mon Sep 17 00:00:00 2001 From: Duc Nguyen Date: Wed, 12 Feb 2025 13:43:59 -0700 Subject: [PATCH 2/6] update default value --- data-ingestion-service/src/main/resources/application.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-ingestion-service/src/main/resources/application.yaml b/data-ingestion-service/src/main/resources/application.yaml index 2495bb21e..0dc80aa1c 100644 --- a/data-ingestion-service/src/main/resources/application.yaml +++ b/data-ingestion-service/src/main/resources/application.yaml @@ -59,7 +59,7 @@ spring: ddl-auto: none kafka: bootstrap-servers: ${BOOTSTRAP_SERVERS:localhost:9092} - thread: ${KAFKA_CONCURRENCY:10} + thread: ${KAFKA_CONCURRENCY:1} group-id-default: data-ingestion-group group-id-raw: data-ingestion-group-raw group-id-raw-xml: data-ingestion-group-raw-xml From 55af263bc4ea6e1368b8a67843b761187df70189 Mon Sep 17 00:00:00 2001 From: Duc Nguyen Date: Wed, 12 Feb 2025 16:36:42 
-0700 Subject: [PATCH 3/6] added logic to catch kafka rebalancing error --- .../service/KafkaConsumerService.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java index 216705a58..c59620db1 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java @@ -223,7 +223,8 @@ public void handleMessageForRawElr(String message, public void handleMessageForElrXml(String message, @Header(KafkaHeaders.RECEIVED_KEY) String messageId, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, - @Header(KafkaHeaderValue.DATA_PROCESSING_ENABLE) String dataProcessingEnable) { + @Header(KafkaHeaderValue.DATA_PROCESSING_ENABLE) String dataProcessingEnable) + { timeMetricsBuilder.recordElrRawXmlEventTime(() -> { log.debug(topicDebugLog, messageId, topic); @@ -469,6 +470,15 @@ private void preparationForConversionHandler(String message, String dataProcessi public void xmlConversionHandlerProcessing(String message, String operation, String dataProcessingEnable) { String hl7Msg = ""; try { + Optional validatedELRModel = iValidatedELRRepository.findById(message); + if (validatedELRModel.isEmpty()) { + throw new XmlConversionException("Message Not Found in Validated"); + } + var statusCheck = iReportStatusRepository.findByRawMessageId(validatedELRModel.get().getRawId()); + if (statusCheck.isPresent() && statusCheck.get().getNbsInterfaceUid() != null) { + logger.info("Kafka Rebalancing Error Hit"); + return; + } if (operation.equalsIgnoreCase(EnumKafkaOperation.INJECTION.name())) { Optional validatedElrResponse = this.iValidatedELRRepository.findById(message); 
hl7Msg = validatedElrResponse.map(ValidatedELRModel::getRawMessage).orElse(""); @@ -504,7 +514,6 @@ public void xmlConversionHandlerProcessing(String message, String operation, Str else { customMetricsBuilder.incrementXmlConversionRequestedSuccess(); ReportStatusIdData reportStatusIdData = new ReportStatusIdData(); - Optional validatedELRModel = iValidatedELRRepository.findById(message); reportStatusIdData.setRawMessageId(validatedELRModel.get().getRawId()); reportStatusIdData.setNbsInterfaceUid(nbsInterfaceModel.getNbsInterfaceUid()); reportStatusIdData.setCreatedBy(convertedToXmlTopic); @@ -538,17 +547,8 @@ public void xmlConversionHandlerProcessing(String message, String operation, Str } } private void xmlConversionHandler(String message, String operation, String dataProcessingEnable) { - - // Update: changed method to async process, intensive process in this method cause consumer lagging, delay and strange behavior - // TODO: considering breaking down this logic (NOTE) //NOSONAR - // PROCESS as follow: - // - HL7 -> XML - // xml conversion can be broke down into multiple smaller pipeline - // - Saving record to status table can also be broke to downstream pipeline -// CompletableFuture.runAsync(() -> {//Caused the classnotfoundexception.//NOSONAR - log.debug("Received message id will be retrieved from db and associated hl7 will be converted to xml"); - xmlConversionHandlerProcessing(message, operation, dataProcessingEnable); -// });//NOSONAR + log.debug("Received message id will be retrieved from db and associated hl7 will be converted to xml"); + xmlConversionHandlerProcessing(message, operation, dataProcessingEnable); } private void validationHandler(String message, boolean hl7ValidationActivated, String dataProcessingEnable) throws DuplicateHL7FileFoundException, DiHL7Exception { Optional rawElrResponse = this.iRawELRRepository.findById(message); From 978e6918dd07022bde6dbd9c4ae85b6c9e299436 Mon Sep 17 00:00:00 2001 From: Duc Nguyen Date: Thu, 13 Feb 2025 
17:10:44 -0700 Subject: [PATCH 4/6] added perform topic --- .../kafka/integration/service/KafkaConsumerService.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java index c59620db1..4bca8ca11 100644 --- a/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java +++ b/data-ingestion-service/src/main/java/gov/cdc/dataingestion/kafka/integration/service/KafkaConsumerService.java @@ -250,10 +250,12 @@ public void handleMessageForElrXml(String message, iReportStatusRepository.save(reportStatusIdData); if (dataProcessingApplied) { - kafkaProducerService.sendMessageAfterConvertedToXml(String.valueOf(nbsInterfaceModel.getNbsInterfaceUid()), "elr_unprocessed", 0); + kafkaProducerService.sendMessageAfterConvertedToXml( + String.valueOf(nbsInterfaceModel.getNbsInterfaceUid()), "dp_elr_unprocessed", 0); } else { - kafkaProducerService.sendMessageAfterConvertedToXml(nbsInterfaceModel.getNbsInterfaceUid().toString(), convertedToXmlTopic, 0); + kafkaProducerService.sendMessageAfterConvertedToXml( + nbsInterfaceModel.getNbsInterfaceUid().toString(), convertedToXmlTopic, 0); } }); @@ -526,7 +528,7 @@ public void xmlConversionHandlerProcessing(String message, String operation, Str } if (dataProcessingApplied) { - kafkaProducerService.sendMessageAfterConvertedToXml(nbsInterfaceModel.getNbsInterfaceUid().toString(), "elr_unprocessed", 0); //NOSONAR + kafkaProducerService.sendMessageAfterConvertedToXml(nbsInterfaceModel.getNbsInterfaceUid().toString(), "dp_elr_unprocessed", 0); //NOSONAR } else { kafkaProducerService.sendMessageAfterConvertedToXml(nbsInterfaceModel.getNbsInterfaceUid().toString(), convertedToXmlTopic, 0); } From de52cb32c43992a42bf274960fd6214a9a9510a8 Mon Sep 17 00:00:00 
2001 From: Duc Nguyen Date: Mon, 17 Feb 2025 09:54:34 -0700 Subject: [PATCH 5/6] code coverage --- .../config/KafkaConsumerConfigTest.java | 49 ------------------ .../service/KafkaConsumerServiceTest.java | 51 ++++++++++++++++++- 2 files changed, 49 insertions(+), 51 deletions(-) delete mode 100644 data-ingestion-service/src/test/java/gov/cdc/dataingestion/config/KafkaConsumerConfigTest.java diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/config/KafkaConsumerConfigTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/config/KafkaConsumerConfigTest.java deleted file mode 100644 index 5f4e8163d..000000000 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/config/KafkaConsumerConfigTest.java +++ /dev/null @@ -1,49 +0,0 @@ -package gov.cdc.dataingestion.config; - -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.utility.DockerImageName; - -import java.time.Duration; -/** - 1118 - require constructor complaint - 125 - comment complaint - 6126 - String block complaint - 1135 - todos complaint - * */ -@SuppressWarnings({"java:S1118","java:S125", "java:S6126", "java:S1135"}) -class KafkaConsumerConfigTest { - - private static KafkaConsumerConfig kafkaConsumerConfig; - - @Container - public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0")) - .withStartupTimeout(Duration.ofMinutes(5)); - - @BeforeAll - public static void setUp() { - kafkaContainer.start(); - kafkaConsumerConfig = new KafkaConsumerConfig(); - } - - @AfterAll - public static void tearDown() { - kafkaContainer.stop(); - } - - @Test - void 
kafkaListenerContainerFactory_ConfigurationIsValid() { - - // Act - ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory = - kafkaConsumerConfig.kafkaListenerContainerFactory(); - - // Assert - Assertions.assertNotNull(kafkaListenerContainerFactory); - } -} diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaConsumerServiceTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaConsumerServiceTest.java index 312dcad67..e6a20b71f 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaConsumerServiceTest.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaConsumerServiceTest.java @@ -17,6 +17,7 @@ import gov.cdc.dataingestion.nbs.services.EcrMsgQueryService; import gov.cdc.dataingestion.report.repository.IRawELRRepository; import gov.cdc.dataingestion.report.repository.model.RawERLModel; +import gov.cdc.dataingestion.reportstatus.model.ReportStatusIdData; import gov.cdc.dataingestion.reportstatus.repository.IReportStatusRepository; import gov.cdc.dataingestion.validation.integration.validator.interfaces.IHL7DuplicateValidator; import gov.cdc.dataingestion.validation.integration.validator.interfaces.IHL7v2Validator; @@ -53,8 +54,7 @@ import java.util.Properties; import java.util.UUID; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; /** @@ -453,6 +453,53 @@ void xmlPreparationConsumerTestNewFlow() throws XmlConversionException { } + @Test + void xmlPreparationConsumerTestNewFlow_Exception() throws XmlConversionException { + when(iValidatedELRRepository.findById(any())).thenReturn(Optional.empty()); + kafkaConsumerService.xmlConversionHandlerProcessing("123", EnumKafkaOperation.INJECTION.name(), 
"true"); + + verify(iValidatedELRRepository, times(1)).findById(any()); + + } + + @Test + void xmlPreparationConsumerTestNewFlow_KafkaExistResult() { + + + // Produce a test message to the topic + String message = guidForTesting; + produceMessage(xmlPrepTopic, message, EnumKafkaOperation.INJECTION); + + // Consume the message + ConsumerRecords records = consumer.poll(Duration.ofSeconds(5)); + + // Perform assertions + assertEquals(1, records.count()); + + ConsumerRecord firstRecord = records.iterator().next(); + String value = firstRecord.value(); + + ValidatedELRModel model = new ValidatedELRModel(); + model.setId(guidForTesting); + model.setRawMessage(testHL7Message); + + when(iValidatedELRRepository.findById(guidForTesting)).thenReturn(Optional.of(model)); + + var rpt = new ReportStatusIdData(); + rpt.setNbsInterfaceUid(1); + when(iReportStatusRepository. + findByRawMessageId(any())).thenReturn(Optional.of(rpt)); + + + + kafkaConsumerService.xmlConversionHandlerProcessing(value, EnumKafkaOperation.INJECTION.name(), "true"); + + verify(iReportStatusRepository, times(1)).findByRawMessageId(any()); + + } + + + @Test void xmlPreparationConsumerTestReInjection_Exception() { From 2439942b370616c033f1eca9e96ed88972e7f98b Mon Sep 17 00:00:00 2001 From: Duc Nguyen Date: Mon, 17 Feb 2025 10:09:52 -0700 Subject: [PATCH 6/6] clean up --- .../dataingestion/kafka/service/KafkaConsumerServiceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaConsumerServiceTest.java b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaConsumerServiceTest.java index e6a20b71f..04395fd03 100644 --- a/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaConsumerServiceTest.java +++ b/data-ingestion-service/src/test/java/gov/cdc/dataingestion/kafka/service/KafkaConsumerServiceTest.java @@ -454,7 +454,7 @@ void xmlPreparationConsumerTestNewFlow() 
throws XmlConversionException { } @Test - void xmlPreparationConsumerTestNewFlow_Exception() throws XmlConversionException { + void xmlPreparationConsumerTestNewFlow_Exception() { when(iValidatedELRRepository.findById(any())).thenReturn(Optional.empty()); kafkaConsumerService.xmlConversionHandlerProcessing("123", EnumKafkaOperation.INJECTION.name(), "true");