diff --git a/src/main/java/com/michelin/ns4kafka/controller/ConnectorController.java b/src/main/java/com/michelin/ns4kafka/controller/ConnectorController.java index 24597153..787bc3cb 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/ConnectorController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/ConnectorController.java @@ -2,6 +2,7 @@ import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidOwner; import static com.michelin.ns4kafka.util.enumation.Kind.CONNECTOR; +import static io.micronaut.core.util.StringUtils.EMPTY_STRING; import com.michelin.ns4kafka.controller.generic.NamespacedResourceController; import com.michelin.ns4kafka.model.Namespace; @@ -11,6 +12,7 @@ import com.michelin.ns4kafka.service.ResourceQuotaService; import com.michelin.ns4kafka.util.enumation.ApplyStatus; import com.michelin.ns4kafka.util.exception.ResourceValidationException; +import io.micronaut.core.util.StringUtils; import io.micronaut.http.HttpResponse; import io.micronaut.http.HttpStatus; import io.micronaut.http.MutableHttpResponse; @@ -143,8 +145,13 @@ public Mono> apply(String namespace, @Valid @Body Connec return Mono.just(formatHttpResponse(connector, status)); } - sendEventLog(connector, status, existingConnector.map(Connector::getSpec).orElse(null), - connector.getSpec()); + sendEventLog( + connector, + status, + existingConnector.map(Connector::getSpec).orElse(null), + connector.getSpec(), + EMPTY_STRING + ); return Mono.just(formatHttpResponse(connectorService.createOrUpdate(connector), status)); }); @@ -180,7 +187,14 @@ public Mono> delete(String namespace, String connector, } Connector connectorToDelete = optionalConnector.get(); - sendEventLog(connectorToDelete, ApplyStatus.deleted, connectorToDelete.getSpec(), null); + + sendEventLog( + connectorToDelete, + ApplyStatus.deleted, + connectorToDelete.getSpec(), + null, + EMPTY_STRING + ); return connectorService .delete(ns, optionalConnector.get()) @@ -264,7 +278,13 @@ public Flux importResources(String namespace, @QueryValue(defaultValu return unsynchronizedConnector; } - sendEventLog(unsynchronizedConnector, ApplyStatus.created, null, unsynchronizedConnector.getSpec()); + sendEventLog( + unsynchronizedConnector, + ApplyStatus.created, + null, + unsynchronizedConnector.getSpec(), + EMPTY_STRING + ); return connectorService.createOrUpdate(unsynchronizedConnector); }); diff --git a/src/main/java/com/michelin/ns4kafka/controller/ConsumerGroupController.java b/src/main/java/com/michelin/ns4kafka/controller/ConsumerGroupController.java index dec5960e..d5c06d9a 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/ConsumerGroupController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/ConsumerGroupController.java @@ -3,6 +3,7 @@ import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConsumerGroupOperation; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidOwner; import static com.michelin.ns4kafka.util.enumation.Kind.CONSUMER_GROUP_RESET_OFFSET; +import static io.micronaut.core.util.StringUtils.EMPTY_STRING; import com.michelin.ns4kafka.controller.generic.NamespacedResourceController; import com.michelin.ns4kafka.model.Namespace; @@ -86,7 +87,14 @@ public List resetOffsets(String namespace, St consumerGroupResetOffsets.getSpec().getMethod()); if (!dryrun) { - sendEventLog(consumerGroupResetOffsets, ApplyStatus.changed, null, consumerGroupResetOffsets.getSpec()); + sendEventLog( + consumerGroupResetOffsets, + ApplyStatus.changed, + null, + consumerGroupResetOffsets.getSpec(), + EMPTY_STRING + ); + consumerGroupService.alterConsumerGroupOffsets(ns, consumerGroup, preparedOffsets); } diff --git a/src/main/java/com/michelin/ns4kafka/controller/NamespaceController.java b/src/main/java/com/michelin/ns4kafka/controller/NamespaceController.java index fb8a5141..1ee8be86 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/NamespaceController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/NamespaceController.java @@ -2,6 +2,7 @@ import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidImmutableValue; import static com.michelin.ns4kafka.util.enumation.Kind.NAMESPACE; +import static io.micronaut.core.util.StringUtils.EMPTY_STRING; import com.michelin.ns4kafka.controller.generic.NonNamespacedResourceController; import com.michelin.ns4kafka.model.Namespace; @@ -102,8 +103,13 @@ public HttpResponse apply(@Valid @Body Namespace namespace, return formatHttpResponse(namespace, status); } - sendEventLog(namespace, status, existingNamespace.map(Namespace::getSpec).orElse(null), - namespace.getSpec()); + sendEventLog( + namespace, + status, + existingNamespace.map(Namespace::getSpec).orElse(null), + namespace.getSpec(), + EMPTY_STRING + ); return formatHttpResponse(namespaceService.createOrUpdate(namespace), status); } @@ -136,7 +142,15 @@ public HttpResponse delete(String namespace, @QueryValue(defaultValue = "f } var namespaceToDelete = optionalNamespace.get(); - sendEventLog(namespaceToDelete, ApplyStatus.deleted, namespaceToDelete.getSpec(), null); + + sendEventLog( + namespaceToDelete, + ApplyStatus.deleted, + namespaceToDelete.getSpec(), + null, + EMPTY_STRING + ); + namespaceService.delete(optionalNamespace.get()); return HttpResponse.noContent(); } diff --git a/src/main/java/com/michelin/ns4kafka/controller/RoleBindingController.java b/src/main/java/com/michelin/ns4kafka/controller/RoleBindingController.java index 38abe5bb..de08c4a3 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/RoleBindingController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/RoleBindingController.java @@ -1,5 +1,7 @@ package com.michelin.ns4kafka.controller; +import static io.micronaut.core.util.StringUtils.EMPTY_STRING; + import com.michelin.ns4kafka.controller.generic.NamespacedResourceController; import com.michelin.ns4kafka.model.Namespace; import com.michelin.ns4kafka.model.RoleBinding; @@ -88,8 +90,14 @@ public HttpResponse apply(String namespace, @Valid @Body RoleBindin return formatHttpResponse(roleBinding, status); } - sendEventLog(roleBinding, status, existingRoleBinding.map(RoleBinding::getSpec).orElse(null), - roleBinding.getSpec()); + sendEventLog( + roleBinding, + status, + existingRoleBinding.map(RoleBinding::getSpec).orElse(null), + roleBinding.getSpec(), + EMPTY_STRING + ); + roleBindingService.create(roleBinding); return formatHttpResponse(roleBinding, status); } @@ -116,7 +124,15 @@ public HttpResponse delete(String namespace, String name, } var roleBindingToDelete = roleBinding.get(); - sendEventLog(roleBindingToDelete, ApplyStatus.deleted, roleBindingToDelete.getSpec(), null); + + sendEventLog( + roleBindingToDelete, + ApplyStatus.deleted, + roleBindingToDelete.getSpec(), + null, + EMPTY_STRING + ); + roleBindingService.delete(roleBindingToDelete); return HttpResponse.noContent(); } diff --git a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java index 1d5a9059..5538c264 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java @@ -2,6 +2,7 @@ import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidOwner; import static com.michelin.ns4kafka.util.enumation.Kind.SCHEMA; +import static io.micronaut.core.util.StringUtils.EMPTY_STRING; import com.michelin.ns4kafka.controller.generic.NamespacedResourceController; import com.michelin.ns4kafka.model.Namespace; @@ -72,7 +73,7 @@ public Mono get(String namespace, String subject) { return Mono.empty(); } - return schemaService.getLatestSubject(ns, subject); + return schemaService.getSubjectLatestVersion(ns, subject); } /** @@ -128,11 +129,14 @@ public Mono> apply(String namespace, @Valid @Body Schema sc return schemaService .register(ns, schema) .map(id -> { - sendEventLog(schema, status, + sendEventLog( + schema, + status, oldSchemas.isEmpty() ? null : oldSchemas.stream() - .max(Comparator.comparingInt( - (Schema s) -> s.getSpec().getId())), - schema.getSpec()); + .max(Comparator.comparingInt((Schema s) -> s.getSpec().getId())), + schema.getSpec(), + EMPTY_STRING + ); return formatHttpResponse(schema, status); }); @@ -142,16 +146,19 @@ public Mono> apply(String namespace, @Valid @Body Schema sc } /** - * Delete all schemas under the given subject. + * Delete all schema versions under the given subject, or a specific version of the schema if specified. * - * @param namespace The current namespace - * @param subject The current subject to delete - * @param dryrun Run in dry mode or not + * @param namespace The namespace + * @param subject The subject + * @param versionOptional The version of the schema to delete + * @param dryrun Run in dry mode or not * @return A HTTP response */ @Status(HttpStatus.NO_CONTENT) @Delete("/{subject}") - public Mono> delete(String namespace, @PathVariable String subject, + public Mono> delete(String namespace, + @PathVariable String subject, + @QueryValue("version") Optional versionOptional, @QueryValue(defaultValue = "false") boolean dryrun) { Namespace ns = getNamespace(namespace); @@ -160,11 +167,15 @@ public Mono> delete(String namespace, @PathVariable String su return Mono.error(new ResourceValidationException(SCHEMA, subject, invalidOwner(subject))); } - return schemaService.getLatestSubject(ns, subject) + return versionOptional + // If version is specified, get the schema with the version + .map(version -> schemaService.getSubjectByVersion(ns, subject, version)) + // If version is not specified, get the latest schema + .orElseGet(() -> schemaService.getSubjectLatestVersion(ns, subject)) .map(Optional::of) .defaultIfEmpty(Optional.empty()) - .flatMap(latestSubjectOptional -> { - if (latestSubjectOptional.isEmpty()) { + .flatMap(subjectOptional -> { + if (subjectOptional.isEmpty()) { return Mono.just(HttpResponse.notFound()); } @@ -172,12 +183,22 @@ public Mono> delete(String namespace, @PathVariable String su return Mono.just(HttpResponse.noContent()); } - Schema schemaToDelete = latestSubjectOptional.get(); - sendEventLog(schemaToDelete, ApplyStatus.deleted, schemaToDelete.getSpec(), null); + return (versionOptional.isEmpty() + ? schemaService.deleteAllVersions(ns, subject) : + schemaService.deleteVersion(ns, subject, versionOptional.get())) + .map(deletedVersionIds -> { + Schema deletedSchema = subjectOptional.get(); - return schemaService - .delete(ns, subject) - .map(deletedSchemaIds -> HttpResponse.noContent()); + sendEventLog( + deletedSchema, + ApplyStatus.deleted, + deletedSchema.getSpec(), + null, + versionOptional.map(v -> String.valueOf(deletedVersionIds)).orElse(EMPTY_STRING) + ); + + return HttpResponse.noContent(); + }); }); } @@ -198,7 +219,7 @@ public Mono> config(String namespace, @Pa return Mono.error(new ResourceValidationException(SCHEMA, subject, invalidOwner(subject))); } - return schemaService.getLatestSubject(ns, subject) + return schemaService.getSubjectLatestVersion(ns, subject) .map(Optional::of) .defaultIfEmpty(Optional.empty()) .flatMap(latestSubjectOptional -> { @@ -220,8 +241,13 @@ public Mono> config(String namespace, @Pa return schemaService .updateSubjectCompatibility(ns, latestSubjectOptional.get(), compatibility) .map(schemaCompatibility -> { - sendEventLog(latestSubjectOptional.get(), ApplyStatus.changed, - latestSubjectOptional.get().getSpec().getCompatibility(), compatibility); + sendEventLog( + latestSubjectOptional.get(), + ApplyStatus.changed, + latestSubjectOptional.get().getSpec().getCompatibility(), + compatibility, + EMPTY_STRING + ); return HttpResponse.ok(state); }); diff --git a/src/main/java/com/michelin/ns4kafka/controller/StreamController.java b/src/main/java/com/michelin/ns4kafka/controller/StreamController.java index d8c17001..d19872c0 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/StreamController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/StreamController.java @@ -2,6 +2,7 @@ import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidOwner; import static com.michelin.ns4kafka.util.enumation.Kind.KAFKA_STREAM; +import static io.micronaut.core.util.StringUtils.EMPTY_STRING; import com.michelin.ns4kafka.controller.generic.NamespacedResourceController; import com.michelin.ns4kafka.model.KafkaStream; @@ -93,8 +94,13 @@ HttpResponse apply(String namespace, @Body @Valid KafkaStream strea return formatHttpResponse(stream, status); } - sendEventLog(stream, status, existingStream.map(KafkaStream::getMetadata).orElse(null), - stream.getMetadata()); + sendEventLog( + stream, + status, + existingStream.map(KafkaStream::getMetadata).orElse(null), + stream.getMetadata(), + EMPTY_STRING + ); return formatHttpResponse(streamService.create(stream), status); } @@ -126,7 +132,15 @@ HttpResponse delete(String namespace, String stream, @QueryValue(defaultVa } var streamToDelete = optionalStream.get(); - sendEventLog(streamToDelete, ApplyStatus.deleted, streamToDelete.getMetadata(), null); + + sendEventLog( + streamToDelete, + ApplyStatus.deleted, + streamToDelete.getMetadata(), + null, + EMPTY_STRING + ); + streamService.delete(ns, optionalStream.get()); return HttpResponse.noContent(); } diff --git a/src/main/java/com/michelin/ns4kafka/controller/UserController.java b/src/main/java/com/michelin/ns4kafka/controller/UserController.java index 90b60c4c..f1700158 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/UserController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/UserController.java @@ -2,6 +2,7 @@ import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidKafkaUser; import static com.michelin.ns4kafka.util.enumation.Kind.KAFKA_USER_RESET_PASSWORD; +import static io.micronaut.core.util.StringUtils.EMPTY_STRING; import com.michelin.ns4kafka.controller.generic.NamespacedResourceController; import com.michelin.ns4kafka.model.KafkaUserResetPassword; @@ -61,7 +62,14 @@ public HttpResponse resetPassword(String namespace, Stri .build()) .build(); - sendEventLog(response, ApplyStatus.changed, null, response.getSpec()); + sendEventLog( + response, + ApplyStatus.changed, + null, + response.getSpec(), + EMPTY_STRING + ); + return HttpResponse.ok(response); } } diff --git a/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java b/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java index 7e92a8ef..6b6bb820 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java @@ -4,6 +4,7 @@ import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidImmutableField; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidNotFound; import static com.michelin.ns4kafka.util.enumation.Kind.ACCESS_CONTROL_ENTRY; +import static io.micronaut.core.util.StringUtils.EMPTY_STRING; import com.michelin.ns4kafka.controller.generic.NamespacedResourceController; import com.michelin.ns4kafka.model.AccessControlEntry; @@ -140,8 +141,13 @@ public HttpResponse apply(Authentication authentication, Str return formatHttpResponse(accessControlEntry, status); } - sendEventLog(accessControlEntry, status, existingAcl.map(AccessControlEntry::getSpec).orElse(null), - accessControlEntry.getSpec()); + sendEventLog( + accessControlEntry, + status, + existingAcl.map(AccessControlEntry::getSpec).orElse(null), + accessControlEntry.getSpec(), + EMPTY_STRING + ); return formatHttpResponse(aclService.create(accessControlEntry), status); } @@ -174,7 +180,13 @@ public HttpResponse delete(Authentication authentication, String namespace return HttpResponse.noContent(); } - sendEventLog(accessControlEntry, ApplyStatus.deleted, accessControlEntry.getSpec(), null); + sendEventLog( + accessControlEntry, + ApplyStatus.deleted, + accessControlEntry.getSpec(), + null, + EMPTY_STRING + ); aclService.delete(accessControlEntry); return HttpResponse.noContent(); diff --git a/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectClusterController.java b/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectClusterController.java index 91e5aad6..b0e8d489 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectClusterController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectClusterController.java @@ -4,6 +4,7 @@ import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterNotAllowed; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidOwner; import static com.michelin.ns4kafka.util.enumation.Kind.CONNECT_CLUSTER; +import static io.micronaut.core.util.StringUtils.EMPTY_STRING; import com.michelin.ns4kafka.controller.generic.NamespacedResourceController; import com.michelin.ns4kafka.model.Namespace; @@ -115,8 +116,13 @@ public Mono> apply(String namespace, @Body @Valid C return Mono.just(formatHttpResponse(connectCluster, status)); } - sendEventLog(connectCluster, status, - existingConnectCluster.map(ConnectCluster::getSpec).orElse(null), connectCluster.getSpec()); + sendEventLog( + connectCluster, + status, + existingConnectCluster.map(ConnectCluster::getSpec).orElse(null), + connectCluster.getSpec(), + EMPTY_STRING + ); return Mono.just(formatHttpResponse(connectClusterService.create(connectCluster), status)); }); @@ -161,7 +167,14 @@ public HttpResponse delete(String namespace, String connectCluster, } ConnectCluster connectClusterToDelete = optionalConnectCluster.get(); - sendEventLog(connectClusterToDelete, ApplyStatus.deleted, connectClusterToDelete.getSpec(), null); + + sendEventLog( + connectClusterToDelete, + ApplyStatus.deleted, + connectClusterToDelete.getSpec(), + null, + EMPTY_STRING + ); connectClusterService.delete(connectClusterToDelete); return HttpResponse.noContent(); diff --git a/src/main/java/com/michelin/ns4kafka/controller/generic/ResourceController.java b/src/main/java/com/michelin/ns4kafka/controller/generic/ResourceController.java index d730cf95..7a4eb0e3 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/generic/ResourceController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/generic/ResourceController.java @@ -1,10 +1,13 @@ package com.michelin.ns4kafka.controller.generic; +import static io.micronaut.core.util.StringUtils.EMPTY_STRING; + import com.michelin.ns4kafka.model.AuditLog; import com.michelin.ns4kafka.model.MetadataResource; import com.michelin.ns4kafka.security.ResourceBasedSecurityRule; import com.michelin.ns4kafka.util.enumation.ApplyStatus; import io.micronaut.context.event.ApplicationEventPublisher; +import io.micronaut.core.util.StringUtils; import io.micronaut.http.HttpResponse; import io.micronaut.security.utils.SecurityService; import jakarta.inject.Inject; @@ -35,11 +38,23 @@ public HttpResponse formatHttpResponse(T body, ApplyStatus status) { * @param before the resource before the operation * @param after the resource after the operation */ - public void sendEventLog(MetadataResource resource, ApplyStatus operation, Object before, - Object after) { - AuditLog auditLog = new AuditLog(securityService.username().orElse(""), - securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN), Date.from(Instant.now()), - resource.getKind(), resource.getMetadata(), operation, before, after); + public void sendEventLog(MetadataResource resource, + ApplyStatus operation, + Object before, + Object after, + String version) { + AuditLog auditLog = new AuditLog( + securityService.username().orElse(EMPTY_STRING), + securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN), + Date.from(Instant.now()), + resource.getKind(), + resource.getMetadata(), + operation, + before, + after, + version + ); + applicationEventPublisher.publishEvent(auditLog); } } diff --git a/src/main/java/com/michelin/ns4kafka/controller/quota/ResourceQuotaController.java b/src/main/java/com/michelin/ns4kafka/controller/quota/ResourceQuotaController.java index 647062f5..8f8957d8 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/quota/ResourceQuotaController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/quota/ResourceQuotaController.java @@ -1,5 +1,7 @@ package com.michelin.ns4kafka.controller.quota; +import static io.micronaut.core.util.StringUtils.EMPTY_STRING; + import com.michelin.ns4kafka.controller.generic.NamespacedResourceController; import com.michelin.ns4kafka.model.Namespace; import com.michelin.ns4kafka.model.quota.ResourceQuota; @@ -102,8 +104,13 @@ public HttpResponse apply(String namespace, @Body @Valid Resource return formatHttpResponse(quota, status); } - sendEventLog(quota, status, resourceQuotaOptional.map(ResourceQuota::getSpec).orElse(null), - quota.getSpec()); + sendEventLog( + quota, + status, + resourceQuotaOptional.map(ResourceQuota::getSpec).orElse(null), + quota.getSpec(), + EMPTY_STRING + ); return formatHttpResponse(resourceQuotaService.create(quota), status); } @@ -130,7 +137,15 @@ public HttpResponse delete(String namespace, String name, } ResourceQuota resourceQuotaToDelete = resourceQuota.get(); - sendEventLog(resourceQuotaToDelete, ApplyStatus.deleted, resourceQuotaToDelete.getSpec(), null); + + sendEventLog( + resourceQuotaToDelete, + ApplyStatus.deleted, + resourceQuotaToDelete.getSpec(), + null, + EMPTY_STRING + ); + resourceQuotaService.delete(resourceQuotaToDelete); return HttpResponse.noContent(); } diff --git a/src/main/java/com/michelin/ns4kafka/controller/topic/TopicController.java b/src/main/java/com/michelin/ns4kafka/controller/topic/TopicController.java index 9c644e6a..a8ff2f6c 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/topic/TopicController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/topic/TopicController.java @@ -4,6 +4,7 @@ import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidOwner; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidTopicCollide; import static com.michelin.ns4kafka.util.enumation.Kind.TOPIC; +import static io.micronaut.core.util.StringUtils.EMPTY_STRING; import com.michelin.ns4kafka.controller.generic.NamespacedResourceController; import com.michelin.ns4kafka.model.DeleteRecordsResponse; @@ -144,7 +145,13 @@ public HttpResponse apply(String namespace, @Valid @Body Topic topic, return formatHttpResponse(topic, status); } - sendEventLog(topic, status, existingTopic.map(Topic::getSpec).orElse(null), topic.getSpec()); + sendEventLog( + topic, + status, + existingTopic.map(Topic::getSpec).orElse(null), + topic.getSpec(), + EMPTY_STRING + ); return formatHttpResponse(topicService.create(topic), status); } @@ -178,7 +185,15 @@ public HttpResponse delete(String namespace, String topic, } Topic topicToDelete = optionalTopic.get(); - sendEventLog(topicToDelete, ApplyStatus.deleted, topicToDelete.getSpec(), null); + + sendEventLog( + topicToDelete, + ApplyStatus.deleted, + topicToDelete.getSpec(), + null, + EMPTY_STRING + ); + topicService.delete(optionalTopic.get()); return HttpResponse.noContent(); @@ -214,7 +229,14 @@ public List importResources(String namespace, @QueryValue(defaultValue = return unsynchronizedTopics .stream() .map(topic -> { - sendEventLog(topic, ApplyStatus.created, null, topic.getSpec()); + sendEventLog( + topic, + ApplyStatus.created, + null, + topic.getSpec(), + EMPTY_STRING + ); + return topicService.create(topic); }) .toList(); @@ -256,7 +278,14 @@ public List deleteRecords(String namespace, String topic, if (dryrun) { deletedRecords = recordsToDelete; } else { - sendEventLog(optionalTopic.get(), ApplyStatus.deleted, null, null); + sendEventLog( + optionalTopic.get(), + ApplyStatus.deleted, + null, + null, + EMPTY_STRING + ); + deletedRecords = topicService.deleteRecords(optionalTopic.get(), recordsToDelete); } diff --git a/src/main/java/com/michelin/ns4kafka/log/ConsoleLogListener.java b/src/main/java/com/michelin/ns4kafka/log/ConsoleLogListener.java index a3f569ca..6babe2a9 100644 --- a/src/main/java/com/michelin/ns4kafka/log/ConsoleLogListener.java +++ b/src/main/java/com/michelin/ns4kafka/log/ConsoleLogListener.java @@ -1,5 +1,7 @@ package com.michelin.ns4kafka.log; +import static io.micronaut.core.util.StringUtils.EMPTY_STRING; + import com.michelin.ns4kafka.model.AuditLog; import io.micronaut.context.annotation.Requires; import io.micronaut.context.event.ApplicationEventListener; @@ -17,12 +19,13 @@ public class ConsoleLogListener implements ApplicationEventListener { @Override public void onApplicationEvent(AuditLog event) { - log.info("{} {} {} {} {} in namespace {} on cluster {}.", + log.info("{} {} {} {}{} {} in namespace {} on cluster {}.", event.isAdmin() ? "Admin" : "User", event.getUser(), event.getOperation(), event.getKind(), event.getMetadata().getName(), + StringUtils.isEmpty(event.getVersion()) ? EMPTY_STRING : " version " + event.getVersion(), event.getMetadata().getNamespace(), event.getMetadata().getCluster() ); diff --git a/src/main/java/com/michelin/ns4kafka/model/AuditLog.java b/src/main/java/com/michelin/ns4kafka/model/AuditLog.java index f9174148..c3e3bb75 100644 --- a/src/main/java/com/michelin/ns4kafka/model/AuditLog.java +++ b/src/main/java/com/michelin/ns4kafka/model/AuditLog.java @@ -23,4 +23,5 @@ public class AuditLog { private ApplyStatus operation; private Object before; private Object after; + private String version; } diff --git a/src/main/java/com/michelin/ns4kafka/service/SchemaService.java b/src/main/java/com/michelin/ns4kafka/service/SchemaService.java index b6c716bb..597abc25 100644 --- a/src/main/java/com/michelin/ns4kafka/service/SchemaService.java +++ b/src/main/java/com/michelin/ns4kafka/service/SchemaService.java @@ -115,7 +115,7 @@ public Flux getAllSubjectVersions(Namespace namespace, String subject) { * @param version The version * @return A Subject */ - public Mono getSubject(Namespace namespace, String subject, String version) { + public Mono getSubjectByVersion(Namespace namespace, String subject, String version) { return schemaRegistryClient .getSubject(namespace.getMetadata().getCluster(), subject, version) .flatMap(latestSubjectOptional -> schemaRegistryClient @@ -151,8 +151,8 @@ public Mono getSubject(Namespace namespace, String subject, String versi * @param subject The subject * @return A schema */ - public Mono getLatestSubject(Namespace namespace, String subject) { - return getSubject(namespace, subject, "latest"); + public Mono getSubjectLatestVersion(Namespace namespace, String subject) { + return getSubjectByVersion(namespace, subject, "latest"); } /** @@ -194,7 +194,8 @@ public Mono> validateSchema(Namespace namespace, Schema schema) { */ private Mono> validateReferences(Namespace ns, Schema schema) { return Flux.fromIterable(schema.getSpec().getReferences()) - .flatMap(reference -> getSubject(ns, reference.getSubject(), String.valueOf(reference.getVersion())) + .flatMap(reference -> getSubjectByVersion(ns, reference.getSubject(), + String.valueOf(reference.getVersion())) .map(Optional::of) .defaultIfEmpty(Optional.empty()) .mapNotNull(schemaOptional -> { @@ -225,20 +226,37 @@ public Mono register(Namespace namespace, Schema schema) { } /** - * Delete all schemas under the given subject. + * Delete all the schema versions under the given subject. * - * @param namespace The current namespace - * @param subject The current subject to delete + * @param namespace The namespace + * @param subject The subject to delete * @return The list of deleted versions */ - public Mono delete(Namespace namespace, String subject) { + public Mono deleteAllVersions(Namespace namespace, String subject) { return schemaRegistryClient .deleteSubject(namespace.getMetadata().getCluster(), subject, false) - .flatMap(ids -> schemaRegistryClient + .flatMap(softDeletedVersionIds -> schemaRegistryClient .deleteSubject(namespace.getMetadata().getCluster(), subject, true)); } + /** + * Delete the schema version under the given subject. + * + * @param namespace The namespace + * @param subject The subject + * @param version The version of the schema to delete + * @return The latest subject after deletion + */ + public Mono deleteVersion(Namespace namespace, String subject, String version) { + return schemaRegistryClient + .deleteSubjectVersion(namespace.getMetadata().getCluster(), subject, version, false) + .flatMap(softDeletedVersionIds -> schemaRegistryClient + .deleteSubjectVersion(namespace.getMetadata().getCluster(), + subject, Integer.toString(softDeletedVersionIds), true) + ); + } + /** * Validate the schema compatibility. * @@ -317,7 +335,8 @@ public Mono> getSchemaReferences(Schema schema, Namespace na } return Flux.fromIterable(schema.getSpec().getReferences()) - .flatMap(reference -> getSubject(namespace, reference.getSubject(), String.valueOf(reference.getVersion()))) + .flatMap(reference -> getSubjectByVersion(namespace, reference.getSubject(), + String.valueOf(reference.getVersion()))) .collectMap(s -> s.getMetadata().getName(), s -> s.getSpec().getSchema()); } diff --git a/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java b/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java index 300cae82..0060a20b 100644 --- a/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java +++ b/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java @@ -39,6 +39,7 @@ public class SchemaRegistryClient { private static final String SUBJECTS = "/subjects/"; private static final String CONFIG = "/config/"; + private static final String VERSIONS = "/versions/"; @Inject @Client(id = "schema-registry") @@ -71,7 +72,7 @@ public Flux getSubjects(String kafkaCluster) { public Mono getSubject(String kafkaCluster, String subject, String version) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); HttpRequest request = HttpRequest.GET( - URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + "/versions/" + version))) + URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + VERSIONS + version))) .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); return Mono.from(httpClient.retrieve(request, SchemaResponse.class)) .onErrorResume(HttpClientResponseException.class, @@ -95,7 +96,7 @@ public Flux getAllSubjectVersions(String kafkaCluster, String su .flatMap(ids -> Flux.fromIterable(Arrays.asList(ids)) .flatMap(id -> { HttpRequest requestVersion = HttpRequest.GET( - URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + "/versions/" + id))) + URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + VERSIONS + id))) .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); return httpClient.retrieve(requestVersion, SchemaResponse.class); @@ -137,6 +138,25 @@ public Mono deleteSubject(String kafkaCluster, String subject, boolea return Mono.from(httpClient.retrieve(request, Integer[].class)); } + /** + * Delete a subject. + * + * @param kafkaCluster The Kafka cluster + * @param subject The subject + * @param version The version + * @param hardDelete Should the subject be hard deleted or not + * @return The version of the deleted subject + */ + public Mono deleteSubjectVersion(String kafkaCluster, String subject, String version, + boolean hardDelete) { + ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); + MutableHttpRequest request = HttpRequest.DELETE( + URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + VERSIONS + version + + "?permanent=" + hardDelete))) + .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + return Mono.from(httpClient.retrieve(request, Integer.class)); + } + /** * Validate the schema compatibility. * diff --git a/src/main/java/com/michelin/ns4kafka/validation/ResourceValidator.java b/src/main/java/com/michelin/ns4kafka/validation/ResourceValidator.java index 3658207f..2008e206 100644 --- a/src/main/java/com/michelin/ns4kafka/validation/ResourceValidator.java +++ b/src/main/java/com/michelin/ns4kafka/validation/ResourceValidator.java @@ -6,6 +6,7 @@ import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidFieldValidationNull; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidFieldValidationNumber; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidFieldValidationOneOf; +import static io.micronaut.core.util.StringUtils.EMPTY_STRING; import com.fasterxml.jackson.annotation.JsonSetter; import com.fasterxml.jackson.annotation.JsonSubTypes; @@ -328,7 +329,7 @@ public void ensureValid(String name, Object value) { @Override public String toString() { if (validators == null) { - return ""; + return EMPTY_STRING; } StringBuilder desc = new StringBuilder(); for (ResourceValidator.Validator v : validators) { diff --git a/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java index caa7d191..7c0097e6 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java @@ -319,7 +319,7 @@ void shouldGetSchema() { .thenReturn(Optional.of(namespace)); when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())) .thenReturn(true); - when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName())) + when(schemaService.getSubjectLatestVersion(namespace, schema.getMetadata().getName())) .thenReturn(Mono.just(schema)); StepVerifier.create(schemaController.get("myNamespace", "prefix.subject-value")) @@ -341,7 +341,7 @@ void shouldNotGetSchemaWhenNotOwner() { StepVerifier.create(schemaController.get("myNamespace", "prefix.subject-value")) .verifyComplete(); - verify(schemaService, never()).getLatestSubject(namespace, schema.getMetadata().getName()); + verify(schemaService, never()).getSubjectLatestVersion(namespace, schema.getMetadata().getName()); } @Test @@ -353,7 +353,7 @@ void shouldNotUpdateCompatibilityWhenSubjectNotExist() { .thenReturn(Optional.of(namespace)); when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())) .thenReturn(true); - when(schemaService.getLatestSubject(namespace, "prefix.subject-value")) + when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value")) .thenReturn(Mono.empty()); StepVerifier.create( @@ -373,7 +373,7 @@ void shouldUpdateCompatibility() { .thenReturn(Optional.of(namespace)); when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())) .thenReturn(true); - when(schemaService.getLatestSubject(namespace, "prefix.subject-value")) + when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value")) .thenReturn(Mono.just(schema)); when(schemaService.updateSubjectCompatibility(namespace, schema, Schema.Compatibility.FORWARD)) .thenReturn(Mono.just(SchemaCompatibilityResponse.builder() @@ -401,7 +401,7 @@ void shouldNotChangeCompatibility() { .thenReturn(Optional.of(namespace)); when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())) .thenReturn(true); - when(schemaService.getLatestSubject(namespace, "prefix.subject-value")) + when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value")) .thenReturn(Mono.just(schema)); StepVerifier.create( @@ -442,7 +442,7 @@ void shouldNotUpdateCompatibilityWhenNamespaceNotOwner() { } @Test - void shouldNotDeleteSchemaWhenNotOwner() { + void shouldNotDeleteAllSchemaVersionsWhenNotOwner() { Namespace namespace = buildNamespace(); when(namespaceService.findByName("myNamespace")) @@ -450,7 +450,30 @@ void shouldNotDeleteSchemaWhenNotOwner() { when(schemaService.isNamespaceOwnerOfSubject(namespace, "prefix.subject-value")) .thenReturn(false); - StepVerifier.create(schemaController.delete("myNamespace", "prefix.subject-value", false)) + StepVerifier.create(schemaController.delete("myNamespace", "prefix.subject-value", Optional.empty(), false)) + .consumeErrorWith(error -> { + assertEquals(ResourceValidationException.class, error.getClass()); + assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); + assertEquals("Invalid value \"prefix.subject-value\" for field \"name\": " + + "namespace is not owner of the resource.", + ((ResourceValidationException) error).getValidationErrors().getFirst()); + }) + .verify(); + + verify(schemaService, never()).getSubjectLatestVersion(any(), any()); + verify(schemaService, never()).deleteAllVersions(any(), any()); + } + + @Test + void shouldNotDeleteOneSchemaVersionWhenNotOwner() { + Namespace namespace = buildNamespace(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.isNamespaceOwnerOfSubject(namespace, "prefix.subject-value")) + .thenReturn(false); + + StepVerifier.create(schemaController.delete("myNamespace", "prefix.subject-value", Optional.of("1"), false)) .consumeErrorWith(error -> { assertEquals(ResourceValidationException.class, error.getClass()); assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); @@ -460,11 +483,12 @@ void shouldNotDeleteSchemaWhenNotOwner() { }) .verify(); - verify(schemaService, never()).updateSubjectCompatibility(any(), any(), any()); + verify(schemaService, never()).getSubjectByVersion(any(), any(), any()); + verify(schemaService, never()).deleteVersion(any(), any(), any()); } @Test - void shouldDeleteSchema() { + void shouldDeleteAllSchemaVersions() { Namespace namespace = buildNamespace(); Schema schema = buildSchema(); @@ -472,9 +496,9 @@ void shouldDeleteSchema() { .thenReturn(Optional.of(namespace)); when(schemaService.isNamespaceOwnerOfSubject(namespace, "prefix.subject-value")) .thenReturn(true); - when(schemaService.getLatestSubject(namespace, "prefix.subject-value")) + when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value")) .thenReturn(Mono.just(schema)); - when(schemaService.delete(namespace, "prefix.subject-value")) + when(schemaService.deleteAllVersions(namespace, "prefix.subject-value")) .thenReturn(Mono.just(new Integer[1])); when(securityService.username()) .thenReturn(Optional.of("test-user")); @@ -482,31 +506,91 @@ void shouldDeleteSchema() { .thenReturn(false); doNothing().when(applicationEventPublisher).publishEvent(any()); - StepVerifier.create(schemaController.delete("myNamespace", "prefix.subject-value", false)) + StepVerifier.create(schemaController.delete("myNamespace", "prefix.subject-value", Optional.empty(), false)) .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) .verifyComplete(); - verify(schemaService).delete(namespace, "prefix.subject-value"); + verify(schemaService).deleteAllVersions(namespace, "prefix.subject-value"); } @Test - void shouldNotDeleteSchemaWhenEmpty() { + void shouldDeleteSchemaVersion() { Namespace namespace = buildNamespace(); + Schema schema1 = buildSchema(); when(namespaceService.findByName("myNamespace")) .thenReturn(Optional.of(namespace)); when(schemaService.isNamespaceOwnerOfSubject(namespace, "prefix.subject-value")) .thenReturn(true); - when(schemaService.getLatestSubject(namespace, "prefix.subject-value")) + when(schemaService.getSubjectByVersion(namespace, "prefix.subject-value", "1")) + .thenReturn(Mono.just(schema1)); + when(schemaService.deleteVersion(namespace, "prefix.subject-value", "1")) + .thenReturn(Mono.just(1)); + + StepVerifier.create(schemaController.delete("myNamespace", "prefix.subject-value", Optional.of("1"), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); + + verify(applicationEventPublisher).publishEvent(any()); + } + + @Test + void shouldNotDeleteAllSchemaVersionsWhenEmpty() { + Namespace namespace = buildNamespace(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.isNamespaceOwnerOfSubject(namespace, "prefix.subject-value")) + .thenReturn(true); + when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value")) + .thenReturn(Mono.empty()); + + StepVerifier.create(schemaController.delete("myNamespace", "prefix.subject-value", Optional.empty(), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject-value"); + } + + @Test + void shouldNotDeleteSchemaVersionWhenEmpty() { + Namespace namespace = buildNamespace(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.isNamespaceOwnerOfSubject(namespace, "prefix.subject-value")) + .thenReturn(true); + when(schemaService.getSubjectByVersion(namespace, "prefix.subject-value", "1")) .thenReturn(Mono.empty()); - StepVerifier.create(schemaController.delete("myNamespace", "prefix.subject-value", false)) + StepVerifier.create(schemaController.delete("myNamespace", "prefix.subject-value", Optional.of("1"), false)) .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) .verifyComplete(); + + verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1"); + } + + @Test + void shouldNotDeleteAllSchemaVersionsInDryRunMode() { + Namespace namespace = buildNamespace(); + Schema schema = buildSchema(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.isNamespaceOwnerOfSubject(namespace, "prefix.subject-value")) + .thenReturn(true); + when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value")) + .thenReturn(Mono.just(schema)); + + StepVerifier.create(schemaController.delete("myNamespace", "prefix.subject-value", Optional.empty(), true)) + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject-value"); } @Test - void shouldDeleteSchemaInDryRunMode() { + void shouldNotDeleteSchemaVersionInDryRunMode() { Namespace namespace = buildNamespace(); Schema schema = buildSchema(); @@ -514,14 +598,14 @@ void shouldDeleteSchemaInDryRunMode() { .thenReturn(Optional.of(namespace)); when(schemaService.isNamespaceOwnerOfSubject(namespace, "prefix.subject-value")) .thenReturn(true); - when(schemaService.getLatestSubject(namespace, "prefix.subject-value")) + when(schemaService.getSubjectByVersion(namespace, "prefix.subject-value", "1")) .thenReturn(Mono.just(schema)); - StepVerifier.create(schemaController.delete("myNamespace", "prefix.subject-value", true)) + StepVerifier.create(schemaController.delete("myNamespace", "prefix.subject-value", Optional.of("1"), true)) .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) .verifyComplete(); - verify(schemaService, never()).delete(namespace, "prefix.subject-value"); + verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1"); } private Namespace buildNamespace() { diff --git a/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java index 18ea97f1..f829f208 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java @@ -766,4 +766,156 @@ void shouldRegisterSameSchemaTwice() { assertEquals(2, schemaAfterPostOnRegistry.version()); assertEquals("ns1-subject3-value", schemaAfterPostOnRegistry.subject()); } + + @Test + void shouldDeleteSchema() { + Schema schemaV1 = Schema.builder() + .metadata(Metadata.builder() + .name("ns1-subject4-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\",\"fields\":[{\"name\":\"firstName\",\"type\":[\"null\",\"string\"]," + + "\"default\":null,\"doc\":\"First name of the person\"}," + + "{\"name\":\"lastName\",\"type\":[\"null\",\"string\"],\"default\":null," + + "\"doc\":\"Last name of the person\"},{\"name\":\"dateOfBirth\",\"type\":[\"null\"," + + "{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null," + + "\"doc\":\"Date of birth of the person\"}]}") + .build()) + .build(); + + // Register V1 schema + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schemaV1), Schema.class); + + Schema schemaV2 = Schema.builder() + .metadata(Metadata.builder() + .name("ns1-subject4-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\",\"fields\":[{\"name\":\"firstName\",\"type\":[\"null\",\"string\"]," + + "\"default\":null,\"doc\":\"First name of the person\"}," + + "{\"name\":\"lastName\",\"type\":[\"null\",\"string\"],\"default\":null," + + "\"doc\":\"Last name of the person\"}]}") + .build()) + .build(); + + // Register V2 schema + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schemaV2), Schema.class); + + Schema schemaV3 = Schema.builder() + .metadata(Metadata.builder() + .name("ns1-subject4-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\",\"fields\":[{\"name\":\"firstName\",\"type\":[\"null\",\"string\"]," + + "\"default\":null,\"doc\":\"First name of the person\"}]}") + .build()) + .build(); + + // Register V3 schema + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schemaV3), Schema.class); + + Schema schemaV4 = Schema.builder() + .metadata(Metadata.builder() + .name("ns1-subject4-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\",\"fields\":[{\"name\":\"firstName\",\"type\":[\"null\",\"string\"]," + + "\"default\":null,\"doc\":\"First name of the person\"}," + + "{\"name\":\"age\",\"type\":[\"null\",\"string\"],\"default\":null," + + "\"doc\":\"Age of the person\"}]}") + .build()) + .build(); + + // Register V4 schema + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schemaV4), Schema.class); + + // Delete latest schema version + var deleteLatestVersionResponse = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas/ns1-subject4-value?version=latest") + .bearerAuth(token), Schema.class); + + assertEquals(HttpStatus.NO_CONTENT, deleteLatestVersionResponse.getStatus()); + + // Get all schemas + var getSchemaAfterLatestVersionDeletionResponse = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.GET, "/api/namespaces/ns1/schemas/ns1-subject4-value") + .bearerAuth(token), Schema.class); + + // Expects v3 is returned by ns4kafka + assertTrue(getSchemaAfterLatestVersionDeletionResponse.getBody().isPresent()); + assertEquals(3, getSchemaAfterLatestVersionDeletionResponse.getBody().get().getSpec().getVersion()); + + // Delete old schema version + var deleteOldVersionResponse = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas/ns1-subject4-value?version=1") + .bearerAuth(token), Schema.class); + + assertEquals(HttpStatus.NO_CONTENT, deleteOldVersionResponse.getStatus()); + + // Get all schemas + var getSchemaAfterOldVersionDeletionResponse = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.GET, "/api/namespaces/ns1/schemas/ns1-subject4-value") + .bearerAuth(token), Schema.class); + + // Expects v3 as returned schema + assertTrue(getSchemaAfterOldVersionDeletionResponse.getBody().isPresent()); + assertEquals(3, getSchemaAfterOldVersionDeletionResponse.getBody().get().getSpec().getVersion()); + + // Delete all remaining schema versions + var deleteAllVersionsResponse = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas/ns1-subject4-value") + .bearerAuth(token), Schema.class); + + assertEquals(HttpStatus.NO_CONTENT, deleteAllVersionsResponse.getStatus()); + + // Get all schemas + var getSchemaAfterAllVersionsDeletionResponse = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.GET, "/api/namespaces/ns1/schemas") + .bearerAuth(token), Argument.listOf(SchemaList.class)); + + assertTrue(getSchemaAfterAllVersionsDeletionResponse.getBody().isPresent()); + assertTrue(getSchemaAfterAllVersionsDeletionResponse.getBody().get() + .stream() + .noneMatch(schemaList -> schemaList.getMetadata().getName().equals("ns1-subject4-value"))); + } } diff --git a/src/test/java/com/michelin/ns4kafka/service/SchemaServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/SchemaServiceTest.java index c41d0dab..2bd7c4fc 100644 --- a/src/test/java/com/michelin/ns4kafka/service/SchemaServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/SchemaServiceTest.java @@ -219,7 +219,7 @@ void shouldGetBySubjectAndVersion() { when(schemaRegistryClient.getCurrentCompatibilityBySubject(any(), any())).thenReturn( Mono.just(compatibilityResponse)); - StepVerifier.create(schemaService.getLatestSubject(namespace, "prefix.schema-one")) + StepVerifier.create(schemaService.getSubjectLatestVersion(namespace, "prefix.schema-one")) .consumeNextWith(latestSubject -> { assertEquals("prefix.schema-one", latestSubject.getMetadata().getName()); assertEquals("local", latestSubject.getMetadata().getCluster()); @@ -253,7 +253,7 @@ void shouldGetBySubjectAndVersionWhenEmpty() { when(schemaRegistryClient.getSubject(namespace.getMetadata().getCluster(), "prefix.schema-one", "latest")) .thenReturn(Mono.empty()); - StepVerifier.create(schemaService.getLatestSubject(namespace, "prefix.schema-one")) + StepVerifier.create(schemaService.getSubjectLatestVersion(namespace, "prefix.schema-one")) .verifyComplete(); } @@ -271,7 +271,7 @@ void shouldRegisterSchema() { } @Test - void shouldDeleteSchema() { + void shouldDeleteSchemaAllVersions() { Namespace namespace = buildNamespace(); when(schemaRegistryClient.deleteSubject(namespace.getMetadata().getCluster(), @@ -280,7 +280,7 @@ void shouldDeleteSchema() { when(schemaRegistryClient.deleteSubject(namespace.getMetadata().getCluster(), "prefix.schema-one", true)).thenReturn(Mono.just(new Integer[] {1})); - StepVerifier.create(schemaService.delete(namespace, "prefix.schema-one")) + StepVerifier.create(schemaService.deleteAllVersions(namespace, "prefix.schema-one")) .consumeNextWith(ids -> { assertEquals(1, ids.length); assertEquals(1, ids[0]); @@ -294,6 +294,20 @@ void shouldDeleteSchema() { "prefix.schema-one", true); } + @Test + void shouldDeleteSchemaSpecificVersion() { + Namespace namespace = buildNamespace(); + when(schemaRegistryClient.deleteSubjectVersion(namespace.getMetadata().getCluster(), + "prefix.schema-one-value", "2", false)).thenReturn(Mono.just(2)); + + when(schemaRegistryClient.deleteSubjectVersion(namespace.getMetadata().getCluster(), + "prefix.schema-one-value", "2", true)).thenReturn(Mono.just(2)); + + StepVerifier.create(schemaService.deleteVersion(namespace, "prefix.schema-one-value", "2")) + .consumeNextWith(version -> assertEquals(2, version)) + .verifyComplete(); + } + @Test void shouldValidateSchemaCompatibility() { Namespace namespace = buildNamespace();