Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MODSOURCE-840] Extend domain event with source MARC #664

Merged
merged 18 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mod-source-record-storage-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@
</dependencies>

<properties>
<testcontainers.version>1.18.3</testcontainers.version>
<testcontainers.version>1.20.4</testcontainers.version>
<basedir>${project.parent.basedir}</basedir>
<ramlfiles_path>${project.parent.basedir}/ramls</ramlfiles_path>
<jooq.version>3.16.19</jooq.version>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package org.folio.consumers;

import static org.folio.dao.util.RecordDaoUtil.filterRecordByExternalId;
import static org.folio.services.util.EventHandlingUtil.toOkapiHeaders;

import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.util.Collections;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.dao.util.RecordType;
Expand Down Expand Up @@ -34,6 +37,10 @@ public AuthorityDomainKafkaHandler(RecordService recordService) {
@Override
public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord) {
log.trace("handle:: Handling kafka record: '{}'", consumerRecord);

var kafkaHeaders = consumerRecord.headers();
var okapiHeaders = toOkapiHeaders(kafkaHeaders);

String authorityId = consumerRecord.key();
if (isUnexpectedDomainEvent(consumerRecord)) {
log.trace("handle:: Expected only {} domain type. Skipping authority domain kafka record [ID: '{}']",
Expand All @@ -48,7 +55,7 @@ public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord)
logInput(authorityId, eventSubType, tenantId);
return (switch (eventSubType) {
case SOFT_DELETE -> performSoftDelete(authorityId, tenantId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be a delete event on soft delete?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far, we decided to send an update on the soft delete of Marc's records, but during further refinements, requirements could be changes

case HARD_DELETE -> performHardDelete(authorityId, tenantId);
case HARD_DELETE -> performHardDelete(authorityId, okapiHeaders);
}).onFailure(throwable -> logError(authorityId, eventSubType, tenantId));
}

Expand All @@ -66,8 +73,8 @@ private Future<String> performSoftDelete(String authorityId, String tenantId) {
}).map(authorityId);
}

private Future<String> performHardDelete(String authorityId, String tenantId) {
return recordService.deleteRecordsByExternalId(authorityId, tenantId).map(authorityId);
private Future<String> performHardDelete(String authorityId, Map<String, String> okapiHeaders) {
return recordService.deleteRecordsByExternalId(authorityId, okapiHeaders).map(authorityId);
}

private void logError(String authorityId, EventSubType subType, String tenantId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,10 @@ Future<RecordsBatchResponse> saveRecordsByExternalIds(List<String> externalIds,
* Deletes in transaction all records associated with externalId
*
* @param externalId external id
* @param tenantId tenant id
* @param okapiHeaders okapi headers
* @return future with true if succeeded
*/
Future<Boolean> deleteRecordsByExternalId(String externalId, String tenantId);
Future<Boolean> deleteRecordsByExternalId(String externalId, Map<String, String> okapiHeaders);

/**
* Performs purge the 'DELETED' records.
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.folio.services.util.AdditionalFieldsUtil.TAG_005;
import static org.folio.services.util.AdditionalFieldsUtil.TAG_00X_PREFIX;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
Expand All @@ -20,6 +21,7 @@
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.dbschema.ObjectMapperTool;
import org.marc4j.MarcException;
import org.marc4j.MarcJsonReader;
import org.marc4j.MarcJsonWriter;
Expand Down Expand Up @@ -254,4 +256,13 @@ private static List<String> getSourceFields(String source) {
}
return sourceFields;
}

public static <T> T clone(T obj, Class<T> type) {
try {
final ObjectMapper jsonMapper = ObjectMapperTool.getMapper();
return jsonMapper.readValue(jsonMapper.writeValueAsString(obj), type);
} catch (JsonProcessingException ex) {
throw new IllegalArgumentException(ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,10 @@ Future<RecordsBatchResponse> saveRecordsByExternalIds(List<String> externalIds,
* Deletes records by external id
*
* @param externalId external id
* @param tenantId tenant id
* @param okapiHeaders okapi headers
* @return future with true if succeeded
*/
Future<Void> deleteRecordsByExternalId(String externalId, String tenantId);
Future<Void> deleteRecordsByExternalId(String externalId, Map<String, String> okapiHeaders);

/**
* Creates new updated Record with incremented generation linked to a new Snapshot, and sets OLD status to the "old" Record,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,8 @@ public Future<Boolean> deleteRecordsBySnapshotId(String snapshotId, String tenan
}

@Override
public Future<Void> deleteRecordsByExternalId(String externalId, String tenantId) {
return recordDao.deleteRecordsByExternalId(externalId, tenantId).map(b -> null);
public Future<Void> deleteRecordsByExternalId(String externalId, Map<String, String> okapiHeaders) {
return recordDao.deleteRecordsByExternalId(externalId, okapiHeaders).map(b -> null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_URL_HEADER;
import static org.folio.services.domainevent.SourceRecordDomainEventType.SOURCE_RECORD_CREATED;
import static org.folio.services.domainevent.SourceRecordDomainEventType.SOURCE_RECORD_DELETED;
import static org.folio.services.domainevent.SourceRecordDomainEventType.SOURCE_RECORD_UPDATED;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.producer.KafkaHeader;
import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.dao.util.MarcUtil;
import org.folio.rest.jaxrs.model.Record;
import org.folio.services.kafka.KafkaSender;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -31,39 +35,59 @@ public class RecordDomainEventPublisher {
private KafkaSender kafkaSender;

public void publishRecordCreated(Record created, Map<String, String> okapiHeaders) {
publishRecord(created, okapiHeaders, SOURCE_RECORD_CREATED);
publishRecord(new DomainEventPayload(null, simplifyRecord(created)), okapiHeaders, SOURCE_RECORD_CREATED);
}

public void publishRecordUpdated(Record updated, Map<String, String> okapiHeaders) {
publishRecord(updated, okapiHeaders, SOURCE_RECORD_UPDATED);
public void publishRecordUpdated(Record old, Record updated, Map<String, String> okapiHeaders) {
publishRecord(new DomainEventPayload(simplifyRecord(old), simplifyRecord(updated)), okapiHeaders, SOURCE_RECORD_UPDATED);
}

private void publishRecord(Record aRecord, Map<String, String> okapiHeaders, SourceRecordDomainEventType eventType) {
if (!domainEventsEnabled || notValidForPublishing(aRecord)) {
public void publishRecordDeleted(Record deleted, Map<String, String> okapiHeaders) {
publishRecord(new DomainEventPayload(simplifyRecord(deleted), null), okapiHeaders, SOURCE_RECORD_DELETED);
}

private void publishRecord(DomainEventPayload domainEventPayload, Map<String, String> okapiHeaders, SourceRecordDomainEventType eventType) {
if (!domainEventsEnabled || notValidForPublishing(domainEventPayload)) {
return;
}
try {
Record aRecord = domainEventPayload.newRecord() != null ? domainEventPayload.newRecord() : domainEventPayload.oldRecord();
var kafkaHeaders = getKafkaHeaders(okapiHeaders, aRecord.getRecordType());
var key = aRecord.getId();
var jsonContent = JsonObject.mapFrom(aRecord);
var jsonContent = JsonObject.mapFrom(domainEventPayload);
kafkaSender.sendEventToKafka(okapiHeaders.get(OKAPI_TENANT_HEADER), jsonContent.encode(),
eventType.name(), kafkaHeaders, key);
} catch (Exception e) {
LOG.error("Exception during Record domain event sending", e);
LOG.warn("Exception during Record domain event sending", e);
}
}

private boolean notValidForPublishing(Record aRecord) {
private boolean notValidForPublishing(DomainEventPayload domainEventPayload) {
if (domainEventPayload.newRecord() == null && domainEventPayload.oldRecord() == null) {
LOG.warn("Old and new records are null and won't be sent as domain event");
return true;
}
if (domainEventPayload.newRecord() != null && notValidRecord(domainEventPayload.newRecord())) {
return true;
}
return domainEventPayload.oldRecord() != null && notValidRecord(domainEventPayload.oldRecord());
}

private static boolean notValidRecord(Record aRecord) {
if (isNull(aRecord)) {
LOG.warn("Record is null and won't be sent as domain event");
return true;
}
if (isNull(aRecord.getRecordType())) {
LOG.error("Record [with id {}] contains no type information and won't be sent as domain event", aRecord.getId());
LOG.warn("Record [with id {}] contains no type information and won't be sent as domain event", aRecord.getId());
return true;
}
if (isNull(aRecord.getParsedRecord())) {
LOG.error("Record [with id {}] contains no parsed record and won't be sent as domain event", aRecord.getId());
LOG.warn("Record [with id {}] contains no parsed record and won't be sent as domain event", aRecord.getId());
return true;
}
if (isNull(aRecord.getParsedRecord().getContent())) {
LOG.error("Record [with id {}] contains no parsed record content and won't be sent as domain event",
LOG.warn("Record [with id {}] contains no parsed record content and won't be sent as domain event",
aRecord.getId());
return true;
}
Expand All @@ -79,4 +103,15 @@ private List<KafkaHeader> getKafkaHeaders(Map<String, String> okapiHeaders, Reco
);
}

@JsonInclude(JsonInclude.Include.NON_NULL)
private record DomainEventPayload(@JsonProperty("old") Record oldRecord, @JsonProperty("new") Record newRecord) {}

private Record simplifyRecord(Record aRecord) {
if (aRecord != null) {
return MarcUtil.clone(aRecord, Record.class)
.withErrorRecord(null)
.withRawRecord(null);
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package org.folio.services.domainevent;

public enum SourceRecordDomainEventType {
SOURCE_RECORD_CREATED, SOURCE_RECORD_UPDATED
SOURCE_RECORD_CREATED, SOURCE_RECORD_UPDATED, SOURCE_RECORD_DELETED
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package org.folio;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.folio.dbschema.ObjectMapperTool;

import java.io.File;
import java.io.IOException;
Expand All @@ -25,4 +28,13 @@ public static String resourceToString(String resource) {
throw new UncheckedIOException(e);
}
}

public static <T> T clone(T obj, Class<T> type) {
try {
final ObjectMapper jsonMapper = ObjectMapperTool.getMapper();
return jsonMapper.readValue(jsonMapper.writeValueAsString(obj), type);
} catch (JsonProcessingException ex) {
throw new IllegalArgumentException(ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.folio.rest.jaxrs.model.Snapshot;
import org.folio.rest.jaxrs.model.SourceRecord;
import org.folio.rest.jooq.enums.RecordState;
import org.folio.rest.util.OkapiConnectionParams;
import org.folio.services.AbstractLBServiceTest;
import org.folio.services.RecordService;
import org.folio.services.RecordServiceImpl;
Expand Down Expand Up @@ -156,6 +157,9 @@ public void shouldHardDeleteMarcAuthorityRecordOnHardDeleteDomainEvent(TestConte
private ConsumerRecord<String, String> getConsumerRecord(HashMap<String, String> payload) {
ConsumerRecord<String, String> consumerRecord = new ConsumerRecord<>("topic", 1, 1, recordId, Json.encode(payload));
consumerRecord.headers().add(new RecordHeader("domain-event-type", "DELETE".getBytes(StandardCharsets.UTF_8)));
consumerRecord.headers().add(new RecordHeader(OkapiConnectionParams.OKAPI_URL_HEADER, OKAPI_URL.getBytes(StandardCharsets.UTF_8)));
consumerRecord.headers().add(new RecordHeader(OkapiConnectionParams.OKAPI_TENANT_HEADER, TENANT_ID.getBytes(StandardCharsets.UTF_8)));
consumerRecord.headers().add(new RecordHeader(OkapiConnectionParams.OKAPI_TOKEN_HEADER, TOKEN.getBytes(StandardCharsets.UTF_8)));
return consumerRecord;
}

Expand Down
Loading