DI - Setting up support for multi partition and Kafka concurrency #300

Merged · 7 commits · Feb 17, 2025
@@ -80,7 +80,7 @@ public String process(String body){
                 status = body;
             }
         }
-        logger.info("ElrProcessStatusComponent status:{}", status);
+        logger.debug("ElrProcessStatusComponent status:{}", status);
         return status;
     }
 }
@@ -1,5 +1,7 @@
 package gov.cdc.dataingestion.config;
 
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
 import jakarta.persistence.EntityManagerFactory;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
@@ -43,21 +45,48 @@ public class DataSourceConfig {
     @Value("${spring.datasource.password}")
     private String password;
 
+    @Value("${spring.datasource.hikari.maximum-pool-size:100}")
+    private int maximumPoolSize;
+
+    @Value("${spring.datasource.hikari.minimum-idle:50}")
+    private int minimumIdle;
+
+    @Value("${spring.datasource.hikari.idle-timeout:120000}")
+    private long idleTimeout;
+
+    @Value("${spring.datasource.hikari.max-lifetime:1200000}")
+    private long maxLifetime;
+
+    @Value("${spring.datasource.hikari.connection-timeout:300000}")
+    private long connectionTimeout;
+
+    @Value("${spring.datasource.hikari.pool-name:OdseHikariCP}")
+    private String poolName;
+
     @Bean()
     public DataSource dataSource() {
         String driverClassName = this.className;
         String url = this.dbUrl;
         String dbUserName = this.userName;
         String dbUserPassword = this.password;
 
-        DataSourceBuilder<?> dataSourceBuilder = DataSourceBuilder.create();
+        HikariConfig hikariConfig = new HikariConfig();
+        hikariConfig.setDriverClassName(driverClassName);
+        hikariConfig.setJdbcUrl(dbUrl);
+        hikariConfig.setUsername(dbUserName);
+        hikariConfig.setPassword(dbUserPassword);
+
+        // HikariCP-specific settings
+        hikariConfig.setMaximumPoolSize(maximumPoolSize);
+        hikariConfig.setMinimumIdle(minimumIdle);
+        hikariConfig.setIdleTimeout(idleTimeout);
+        hikariConfig.setMaxLifetime(maxLifetime);
+        hikariConfig.setConnectionTimeout(connectionTimeout);
+        hikariConfig.setPoolName(poolName);
 
-        dataSourceBuilder.driverClassName(driverClassName);
-        dataSourceBuilder.url(url);
-        dataSourceBuilder.username(dbUserName);
-        dataSourceBuilder.password(dbUserPassword);
+        return new HikariDataSource(hikariConfig);
 
-        return dataSourceBuilder.build();
     }
 
     @Bean
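For reference, a minimal `application.yml` sketch exercising the new Hikari bindings; the keys match the `@Value` placeholders above, and the values shown are simply the defaults each placeholder already carries:

```yaml
spring:
  datasource:
    hikari:
      maximum-pool-size: 100      # upper bound on pooled connections
      minimum-idle: 50            # idle connections kept ready
      idle-timeout: 120000        # ms before an idle connection is retired
      max-lifetime: 1200000       # ms before any connection is recycled
      connection-timeout: 300000  # ms a caller waits for a free connection
      pool-name: OdseHikariCP
```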
@@ -2,6 +2,7 @@
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
@@ -25,8 +26,6 @@
  * */
 @SuppressWarnings({"java:S1118","java:S125", "java:S6126", "java:S1135"})
 public class KafkaConsumerConfig {
-    @Value("${spring.kafka.group-id}")
-    private String groupId = "";
 
     @Value("${spring.kafka.bootstrap-servers}")
     private String bootstrapServers = "";
@@ -36,11 +35,37 @@ public class KafkaConsumerConfig {
     @Value("${spring.kafka.consumer.maxPollIntervalMs}")
     private String maxPollInterval = "";
 
+    @Value("${spring.kafka.thread}")
+    private Integer thread;
+
+    @Value("${spring.kafka.group-id-default}")
+    private String groupIdDefault;
+
+    @Value("${spring.kafka.group-id-raw}")
+    private String groupIdRaw;
+
+    @Value("${spring.kafka.group-id-raw-xml}")
+    private String groupIdRawXml;
+
+    @Value("${spring.kafka.group-id-validate}")
+    private String groupIdValidate;
+
+    @Value("${spring.kafka.group-id-xml}")
+    private String groupIdXml;
+
+    @Value("${spring.kafka.group-id-ecr-cda}")
+    private String groupIdEcrCda;
+
+    @Value("${spring.kafka.group-id-dlt-manual}")
+    private String groupIdDltManual;
+
     @Bean
     public ConsumerFactory<String, String> consumerFactory() {
         final Map<String, Object> config = new HashMap<>();
         config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdDefault);
         config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
@@ -56,4 +81,66 @@ public ConsumerFactory<String, String> consumerFactory() {
         factory.setConsumerFactory(consumerFactory());
         return factory;
     }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryRaw() {
+        return createContainerFactory(groupIdRaw);
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryRawXml() {
+        return createContainerFactory(groupIdRawXml);
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryValidate() {
+        return createContainerFactory(groupIdValidate);
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryXml() {
+        return createContainerFactory(groupIdXml);
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryEcrCda() {
+        return createContainerFactory(groupIdEcrCda);
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryDltManual() {
+        return createContainerFactory(groupIdDltManual);
+    }
+
+    private Map<String, Object> baseConsumerConfigs(String groupId) {
+        Map<String, Object> config = new HashMap<>();
+        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
+        config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024); // Fetch at least 1MB of data
+        config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // Wait up to 500ms for data
+        config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10 * 1024 * 1024); // Fetch up to 10MB per partition
+        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // Max 500 records per poll
+        config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // Allow up to 10 minutes (600,000 ms) between polls
+        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 30-second session timeout
+        config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // Heartbeat every 10 seconds
+        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // Auto-commit offsets enabled
+        return config;
+    }
+
+    private ConsumerFactory<String, String> createConsumerFactory(String groupId) {
+        return new DefaultKafkaConsumerFactory<>(baseConsumerConfigs(groupId));
+    }
+
+    private ConcurrentKafkaListenerContainerFactory<String, String> createContainerFactory(String groupId) {
+        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(createConsumerFactory(groupId));
+        factory.setConcurrency(thread);
+        return factory;
+    }
 }
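A sketch of the consumer-side keys these new `@Value` bindings expect; the property names come straight from the placeholders above, while the broker address and group-id values are illustrative placeholders only:

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092     # illustrative
    thread: 4                             # container concurrency per listener factory
    consumer:
      maxPollIntervalMs: "600000"         # used by the default consumerFactory above
    group-id-default: di-group-default    # illustrative ids; any unique values work
    group-id-raw: di-group-raw
    group-id-raw-xml: di-group-raw-xml
    group-id-validate: di-group-validate
    group-id-xml: di-group-xml
    group-id-ecr-cda: di-group-ecr-cda
    group-id-dlt-manual: di-group-dlt-manual
```

Since `createContainerFactory` passes `thread` to `setConcurrency`, consumer threads beyond a topic's partition count sit idle, so partition counts should be raised in step with this value.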
@@ -1,5 +1,7 @@
 package gov.cdc.dataingestion.config;
 
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
 import jakarta.persistence.EntityManagerFactory;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
@@ -40,16 +42,42 @@ public class NbsDataSourceConfig {
     @Value("${spring.datasource.password}")
     private String dbUserPassword;
 
+    @Value("${spring.datasource.hikari.maximum-pool-size:100}")
+    private int maximumPoolSize;
+
+    @Value("${spring.datasource.hikari.minimum-idle:50}")
+    private int minimumIdle;
+
+    @Value("${spring.datasource.hikari.idle-timeout:120000}")
+    private long idleTimeout;
+
+    @Value("${spring.datasource.hikari.max-lifetime:1200000}")
+    private long maxLifetime;
+
+    @Value("${spring.datasource.hikari.connection-timeout:300000}")
+    private long connectionTimeout;
+
+    @Value("${spring.datasource.hikari.pool-name:OdseHikariCP}")
+    private String poolName;
+
     @Bean(name = "nbsDataSource")
     public DataSource nbsDataSource() {
-        DataSourceBuilder<?> dataSourceBuilder = DataSourceBuilder.create();
+        HikariConfig hikariConfig = new HikariConfig();
+        hikariConfig.setDriverClassName(driverClassName);
+        hikariConfig.setJdbcUrl(dbUrl);
+        hikariConfig.setUsername(dbUserName);
+        hikariConfig.setPassword(dbUserPassword);
 
-        dataSourceBuilder.driverClassName(driverClassName);
-        dataSourceBuilder.url(dbUrl);
-        dataSourceBuilder.username(dbUserName);
-        dataSourceBuilder.password(dbUserPassword);
+        // HikariCP-specific settings
+        hikariConfig.setMaximumPoolSize(maximumPoolSize);
+        hikariConfig.setMinimumIdle(minimumIdle);
+        hikariConfig.setIdleTimeout(idleTimeout);
+        hikariConfig.setMaxLifetime(maxLifetime);
+        hikariConfig.setConnectionTimeout(connectionTimeout);
+        hikariConfig.setPoolName(poolName);
 
-        return dataSourceBuilder.build();
+        return new HikariDataSource(hikariConfig);
     }
 
     @Bean(name = "nbsTemplate")
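Worth noting: this class binds the same `spring.datasource.hikari.*` keys, including the `OdseHikariCP` pool-name default, as `DataSourceConfig`, so one property block tunes both pools and they report under the same name in pool metrics. If per-pool tuning is wanted later, a dedicated prefix would need to be introduced and the `@Value` placeholders here updated to match; a hypothetical sketch, not part of this PR:

```yaml
spring:
  datasource:
    nbs:
      hikari:
        pool-name: NbsHikariCP   # hypothetical key; requires matching @Value changes in NbsDataSourceConfig
```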
@@ -166,7 +166,8 @@ public KafkaConsumerService(
 
     )
     @KafkaListener(
-            topics = "${kafka.raw.topic}"
+            topics = "${kafka.raw.topic}",
+            containerFactory = "kafkaListenerContainerFactoryRaw"
     )
     public void handleMessageForRawElr(String message,
                                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@@ -216,12 +217,14 @@ public void handleMessageForRawElr(String message,
 
     )
     @KafkaListener(
-            topics = "${kafka.raw.xml-topic}"
+            topics = "${kafka.raw.xml-topic}",
+            containerFactory = "kafkaListenerContainerFactoryRawXml"
     )
     public void handleMessageForElrXml(String message,
                                        @Header(KafkaHeaders.RECEIVED_KEY) String messageId,
                                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
-                                       @Header(KafkaHeaderValue.DATA_PROCESSING_ENABLE) String dataProcessingEnable) {
+                                       @Header(KafkaHeaderValue.DATA_PROCESSING_ENABLE) String dataProcessingEnable)
+    {
 
         timeMetricsBuilder.recordElrRawXmlEventTime(() -> {
             log.debug(topicDebugLog, messageId, topic);
@@ -247,10 +250,12 @@ public void handleMessageForElrXml(String message,
             iReportStatusRepository.save(reportStatusIdData);
 
             if (dataProcessingApplied) {
-                kafkaProducerService.sendMessageAfterConvertedToXml(String.valueOf(nbsInterfaceModel.getNbsInterfaceUid()), "elr_unprocessed", 0);
+                kafkaProducerService.sendMessageAfterConvertedToXml(
+                        String.valueOf(nbsInterfaceModel.getNbsInterfaceUid()), "dp_elr_unprocessed", 0);
             }
             else {
-                kafkaProducerService.sendMessageAfterConvertedToXml(nbsInterfaceModel.getNbsInterfaceUid().toString(), convertedToXmlTopic, 0);
+                kafkaProducerService.sendMessageAfterConvertedToXml(
+                        nbsInterfaceModel.getNbsInterfaceUid().toString(), convertedToXmlTopic, 0);
             }
         });
 
@@ -285,7 +290,9 @@ public void handleMessageForElrXml(String message,
                     JAXBException.class
             }
     )
-    @KafkaListener(topics = "${kafka.validation.topic}")
+    @KafkaListener(topics = "${kafka.validation.topic}",
+            containerFactory = "kafkaListenerContainerFactoryValidate"
+    )
     public void handleMessageForValidatedElr(String message,
                                              @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                              @Header(KafkaHeaderValue.DATA_PROCESSING_ENABLE) String dataProcessingEnable) {
@@ -303,7 +310,8 @@ public void handleMessageForValidatedElr(String message,
     /**
      * XML Conversion
      * */
-    @KafkaListener(topics = "${kafka.xml-conversion-prep.topic}")
+    @KafkaListener(topics = "${kafka.xml-conversion-prep.topic}",
+            containerFactory = "kafkaListenerContainerFactoryXml")
     public void handleMessageForXmlConversionElr(String message,
                                                  @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                                  @Header(KafkaHeaderValue.MESSAGE_OPERATION) String operation,
@@ -337,9 +345,9 @@ public void handleMessageForXmlConversionElr(String message,
     }
 
     )
-
     @KafkaListener(
-            topics = "xml_prep_dlt_manual"
+            topics = "xml_prep_dlt_manual",
+            containerFactory = "kafkaListenerContainerFactoryDltManual"
     )
     public void handleDltManual(
             String message,
@@ -464,6 +472,15 @@ private void preparationForConversionHandler(String message, String dataProcessingEnable)
     public void xmlConversionHandlerProcessing(String message, String operation, String dataProcessingEnable) {
         String hl7Msg = "";
         try {
+            Optional<ValidatedELRModel> validatedELRModel = iValidatedELRRepository.findById(message);
+            if (validatedELRModel.isEmpty()) {
+                throw new XmlConversionException("Message Not Found in Validated");
+            }
+            var statusCheck = iReportStatusRepository.findByRawMessageId(validatedELRModel.get().getRawId());
+            if (statusCheck.isPresent() && statusCheck.get().getNbsInterfaceUid() != null) {
+                logger.info("Kafka Rebalancing Error Hit");
+                return;
+            }
             if (operation.equalsIgnoreCase(EnumKafkaOperation.INJECTION.name())) {
                 Optional<ValidatedELRModel> validatedElrResponse = this.iValidatedELRRepository.findById(message);
                 hl7Msg = validatedElrResponse.map(ValidatedELRModel::getRawMessage).orElse("");
@@ -483,7 +500,7 @@ public void xmlConversionHandlerProcessing(String message, String operation, String dataProcessingEnable) {
                     // Modified from debug ==> info to capture xml for analysis.
                     // Please leave below at "info" level for the time being, before going live,
                     // this will be changed to debug
-                    log.info("rhapsodyXml: {}", rhapsodyXml);
+                    log.debug("rhapsodyXml: {}", rhapsodyXml);
 
                     boolean dataProcessingApplied = Boolean.parseBoolean(dataProcessingEnable);
                     NbsInterfaceModel nbsInterfaceModel = nbsRepositoryServiceProvider.saveXmlMessage(message, rhapsodyXml, parsedMessage, dataProcessingApplied);
NbsInterfaceModel nbsInterfaceModel = nbsRepositoryServiceProvider.saveXmlMessage(message, rhapsodyXml, parsedMessage, dataProcessingApplied);
Expand All @@ -499,7 +516,6 @@ public void xmlConversionHandlerProcessing(String message, String operation, Str
else {
customMetricsBuilder.incrementXmlConversionRequestedSuccess();
ReportStatusIdData reportStatusIdData = new ReportStatusIdData();
Optional<ValidatedELRModel> validatedELRModel = iValidatedELRRepository.findById(message);
reportStatusIdData.setRawMessageId(validatedELRModel.get().getRawId());
reportStatusIdData.setNbsInterfaceUid(nbsInterfaceModel.getNbsInterfaceUid());
reportStatusIdData.setCreatedBy(convertedToXmlTopic);
@@ -512,7 +528,7 @@ public void xmlConversionHandlerProcessing(String message, String operation, String dataProcessingEnable) {
                     }
 
                     if (dataProcessingApplied) {
-                        kafkaProducerService.sendMessageAfterConvertedToXml(nbsInterfaceModel.getNbsInterfaceUid().toString(), "elr_unprocessed", 0); //NOSONAR
+                        kafkaProducerService.sendMessageAfterConvertedToXml(nbsInterfaceModel.getNbsInterfaceUid().toString(), "dp_elr_unprocessed", 0); //NOSONAR
                     } else {
                         kafkaProducerService.sendMessageAfterConvertedToXml(nbsInterfaceModel.getNbsInterfaceUid().toString(), convertedToXmlTopic, 0);
                     }
@@ -533,17 +549,8 @@ public void xmlConversionHandlerProcessing(String message, String operation, String dataProcessingEnable) {
         }
     }
     private void xmlConversionHandler(String message, String operation, String dataProcessingEnable) {
-
-        // Update: changed method to async process, intensive process in this method cause consumer lagging, delay and strange behavior
-        // TODO: considering breaking down this logic (NOTE) //NOSONAR
-        // PROCESS as follow:
-        // - HL7 -> XML
-        //      xml conversion can be broke down into multiple smaller pipeline
-        // - Saving record to status table can also be broke to downstream pipeline
-        // CompletableFuture.runAsync(() -> {//Caused the classnotfoundexception.//NOSONAR
-        log.debug("Received message id will be retrieved from db and associated hl7 will be converted to xml");
-        xmlConversionHandlerProcessing(message, operation, dataProcessingEnable);
-        // });//NOSONAR
+        log.debug("Received message id will be retrieved from db and associated hl7 will be converted to xml");
+        xmlConversionHandlerProcessing(message, operation, dataProcessingEnable);
     }
     private void validationHandler(String message, boolean hl7ValidationActivated, String dataProcessingEnable) throws DuplicateHL7FileFoundException, DiHL7Exception {
         Optional<RawERLModel> rawElrResponse = this.iRawELRRepository.findById(message);
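Of the six factories defined in KafkaConsumerConfig, `kafkaListenerContainerFactoryEcrCda` is not referenced by any hunk shown in this diff; wiring a listener to it would follow the same pattern as the listeners above. A hypothetical sketch, with the topic property name assumed for illustration only:

```java
// Hypothetical listener; "${kafka.ecr-cda.topic}" is an assumed property name,
// not part of this PR. The containerFactory attribute selects the named bean
// from KafkaConsumerConfig, giving this listener its own group id and
// `spring.kafka.thread` consumer threads.
@KafkaListener(
        topics = "${kafka.ecr-cda.topic}",
        containerFactory = "kafkaListenerContainerFactoryEcrCda"
)
public void handleMessageForEcrCda(String message,
                                   @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    log.debug("Received message from topic {}", topic);
}
```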
@@ -46,7 +46,7 @@ public static RestTemplate restTemplate(RestTemplateBuilder builder) {
     )
     @PostMapping("/api/auth/token")
     public ResponseEntity<String> token(@RequestHeader("clientid") String clientId, @RequestHeader("clientsecret") String clientSecret) {
-        log.info("Token URL : " + authTokenUri);
+        log.debug("Token URL : " + authTokenUri);
         String accessToken = null;
         String postBody = "grant_type=client_credentials" +
                 "&client_id=" + clientId