Skip to content

Commit

Permalink
Delete a specific version of a schema (#428)
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomasCAI-mlv authored Aug 28, 2024
1 parent b42607c commit 119d1b4
Show file tree
Hide file tree
Showing 20 changed files with 576 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidOwner;
import static com.michelin.ns4kafka.util.enumation.Kind.CONNECTOR;
import static io.micronaut.core.util.StringUtils.EMPTY_STRING;

import com.michelin.ns4kafka.controller.generic.NamespacedResourceController;
import com.michelin.ns4kafka.model.Namespace;
Expand All @@ -11,6 +12,7 @@
import com.michelin.ns4kafka.service.ResourceQuotaService;
import com.michelin.ns4kafka.util.enumation.ApplyStatus;
import com.michelin.ns4kafka.util.exception.ResourceValidationException;
import io.micronaut.core.util.StringUtils;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MutableHttpResponse;
Expand Down Expand Up @@ -143,8 +145,13 @@ public Mono<HttpResponse<Connector>> apply(String namespace, @Valid @Body Connec
return Mono.just(formatHttpResponse(connector, status));
}

sendEventLog(connector, status, existingConnector.<Object>map(Connector::getSpec).orElse(null),
connector.getSpec());
sendEventLog(
connector,
status,
existingConnector.<Object>map(Connector::getSpec).orElse(null),
connector.getSpec(),
EMPTY_STRING
);

return Mono.just(formatHttpResponse(connectorService.createOrUpdate(connector), status));
});
Expand Down Expand Up @@ -180,7 +187,14 @@ public Mono<HttpResponse<Void>> delete(String namespace, String connector,
}

Connector connectorToDelete = optionalConnector.get();
sendEventLog(connectorToDelete, ApplyStatus.deleted, connectorToDelete.getSpec(), null);

sendEventLog(
connectorToDelete,
ApplyStatus.deleted,
connectorToDelete.getSpec(),
null,
EMPTY_STRING
);

return connectorService
.delete(ns, optionalConnector.get())
Expand Down Expand Up @@ -264,7 +278,13 @@ public Flux<Connector> importResources(String namespace, @QueryValue(defaultValu
return unsynchronizedConnector;
}

sendEventLog(unsynchronizedConnector, ApplyStatus.created, null, unsynchronizedConnector.getSpec());
sendEventLog(
unsynchronizedConnector,
ApplyStatus.created,
null,
unsynchronizedConnector.getSpec(),
EMPTY_STRING
);

return connectorService.createOrUpdate(unsynchronizedConnector);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConsumerGroupOperation;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidOwner;
import static com.michelin.ns4kafka.util.enumation.Kind.CONSUMER_GROUP_RESET_OFFSET;
import static io.micronaut.core.util.StringUtils.EMPTY_STRING;

import com.michelin.ns4kafka.controller.generic.NamespacedResourceController;
import com.michelin.ns4kafka.model.Namespace;
Expand Down Expand Up @@ -86,7 +87,14 @@ public List<ConsumerGroupResetOffsetsResponse> resetOffsets(String namespace, St
consumerGroupResetOffsets.getSpec().getMethod());

if (!dryrun) {
sendEventLog(consumerGroupResetOffsets, ApplyStatus.changed, null, consumerGroupResetOffsets.getSpec());
sendEventLog(
consumerGroupResetOffsets,
ApplyStatus.changed,
null,
consumerGroupResetOffsets.getSpec(),
EMPTY_STRING
);

consumerGroupService.alterConsumerGroupOffsets(ns, consumerGroup, preparedOffsets);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidImmutableValue;
import static com.michelin.ns4kafka.util.enumation.Kind.NAMESPACE;
import static io.micronaut.core.util.StringUtils.EMPTY_STRING;

import com.michelin.ns4kafka.controller.generic.NonNamespacedResourceController;
import com.michelin.ns4kafka.model.Namespace;
Expand Down Expand Up @@ -102,8 +103,13 @@ public HttpResponse<Namespace> apply(@Valid @Body Namespace namespace,
return formatHttpResponse(namespace, status);
}

sendEventLog(namespace, status, existingNamespace.<Object>map(Namespace::getSpec).orElse(null),
namespace.getSpec());
sendEventLog(
namespace,
status,
existingNamespace.<Object>map(Namespace::getSpec).orElse(null),
namespace.getSpec(),
EMPTY_STRING
);

return formatHttpResponse(namespaceService.createOrUpdate(namespace), status);
}
Expand Down Expand Up @@ -136,7 +142,15 @@ public HttpResponse<Void> delete(String namespace, @QueryValue(defaultValue = "f
}

var namespaceToDelete = optionalNamespace.get();
sendEventLog(namespaceToDelete, ApplyStatus.deleted, namespaceToDelete.getSpec(), null);

sendEventLog(
namespaceToDelete,
ApplyStatus.deleted,
namespaceToDelete.getSpec(),
null,
EMPTY_STRING
);

namespaceService.delete(optionalNamespace.get());
return HttpResponse.noContent();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.michelin.ns4kafka.controller;

import static io.micronaut.core.util.StringUtils.EMPTY_STRING;

import com.michelin.ns4kafka.controller.generic.NamespacedResourceController;
import com.michelin.ns4kafka.model.Namespace;
import com.michelin.ns4kafka.model.RoleBinding;
Expand Down Expand Up @@ -88,8 +90,14 @@ public HttpResponse<RoleBinding> apply(String namespace, @Valid @Body RoleBindin
return formatHttpResponse(roleBinding, status);
}

sendEventLog(roleBinding, status, existingRoleBinding.<Object>map(RoleBinding::getSpec).orElse(null),
roleBinding.getSpec());
sendEventLog(
roleBinding,
status,
existingRoleBinding.<Object>map(RoleBinding::getSpec).orElse(null),
roleBinding.getSpec(),
EMPTY_STRING
);

roleBindingService.create(roleBinding);
return formatHttpResponse(roleBinding, status);
}
Expand All @@ -116,7 +124,15 @@ public HttpResponse<Void> delete(String namespace, String name,
}

var roleBindingToDelete = roleBinding.get();
sendEventLog(roleBindingToDelete, ApplyStatus.deleted, roleBindingToDelete.getSpec(), null);

sendEventLog(
roleBindingToDelete,
ApplyStatus.deleted,
roleBindingToDelete.getSpec(),
null,
EMPTY_STRING
);

roleBindingService.delete(roleBindingToDelete);
return HttpResponse.noContent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidOwner;
import static com.michelin.ns4kafka.util.enumation.Kind.SCHEMA;
import static io.micronaut.core.util.StringUtils.EMPTY_STRING;

import com.michelin.ns4kafka.controller.generic.NamespacedResourceController;
import com.michelin.ns4kafka.model.Namespace;
Expand Down Expand Up @@ -72,7 +73,7 @@ public Mono<Schema> get(String namespace, String subject) {
return Mono.empty();
}

return schemaService.getLatestSubject(ns, subject);
return schemaService.getSubjectLatestVersion(ns, subject);
}

/**
Expand Down Expand Up @@ -128,11 +129,14 @@ public Mono<HttpResponse<Schema>> apply(String namespace, @Valid @Body Schema sc
return schemaService
.register(ns, schema)
.map(id -> {
sendEventLog(schema, status,
sendEventLog(
schema,
status,
oldSchemas.isEmpty() ? null : oldSchemas.stream()
.max(Comparator.comparingInt(
(Schema s) -> s.getSpec().getId())),
schema.getSpec());
.max(Comparator.comparingInt((Schema s) -> s.getSpec().getId())),
schema.getSpec(),
EMPTY_STRING
);

return formatHttpResponse(schema, status);
});
Expand All @@ -142,16 +146,19 @@ public Mono<HttpResponse<Schema>> apply(String namespace, @Valid @Body Schema sc
}

/**
* Delete all schemas under the given subject.
* Delete all schema versions under the given subject, or a specific version of the schema if specified.
*
* @param namespace The current namespace
* @param subject The current subject to delete
* @param dryrun Run in dry mode or not
* @param namespace The namespace
* @param subject The subject
* @param versionOptional The version of the schema to delete
* @param dryrun Run in dry mode or not
* @return A HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Delete("/{subject}")
public Mono<HttpResponse<Void>> delete(String namespace, @PathVariable String subject,
public Mono<HttpResponse<Void>> delete(String namespace,
@PathVariable String subject,
@QueryValue("version") Optional<String> versionOptional,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

Expand All @@ -160,24 +167,38 @@ public Mono<HttpResponse<Void>> delete(String namespace, @PathVariable String su
return Mono.error(new ResourceValidationException(SCHEMA, subject, invalidOwner(subject)));
}

return schemaService.getLatestSubject(ns, subject)
return versionOptional
// If version is specified, get the schema with the version
.map(version -> schemaService.getSubjectByVersion(ns, subject, version))
// If version is not specified, get the latest schema
.orElseGet(() -> schemaService.getSubjectLatestVersion(ns, subject))
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.flatMap(latestSubjectOptional -> {
if (latestSubjectOptional.isEmpty()) {
.flatMap(subjectOptional -> {
if (subjectOptional.isEmpty()) {
return Mono.just(HttpResponse.notFound());
}

if (dryrun) {
return Mono.just(HttpResponse.noContent());
}

Schema schemaToDelete = latestSubjectOptional.get();
sendEventLog(schemaToDelete, ApplyStatus.deleted, schemaToDelete.getSpec(), null);
return (versionOptional.isEmpty()
? schemaService.deleteAllVersions(ns, subject) :
schemaService.deleteVersion(ns, subject, versionOptional.get()))
.map(deletedVersionIds -> {
Schema deletedSchema = subjectOptional.get();

return schemaService
.delete(ns, subject)
.map(deletedSchemaIds -> HttpResponse.noContent());
sendEventLog(
deletedSchema,
ApplyStatus.deleted,
deletedSchema.getSpec(),
null,
versionOptional.map(v -> String.valueOf(deletedVersionIds)).orElse(EMPTY_STRING)
);

return HttpResponse.noContent();
});
});
}

Expand All @@ -198,7 +219,7 @@ public Mono<HttpResponse<SchemaCompatibilityState>> config(String namespace, @Pa
return Mono.error(new ResourceValidationException(SCHEMA, subject, invalidOwner(subject)));
}

return schemaService.getLatestSubject(ns, subject)
return schemaService.getSubjectLatestVersion(ns, subject)
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.flatMap(latestSubjectOptional -> {
Expand All @@ -220,8 +241,13 @@ public Mono<HttpResponse<SchemaCompatibilityState>> config(String namespace, @Pa
return schemaService
.updateSubjectCompatibility(ns, latestSubjectOptional.get(), compatibility)
.map(schemaCompatibility -> {
sendEventLog(latestSubjectOptional.get(), ApplyStatus.changed,
latestSubjectOptional.get().getSpec().getCompatibility(), compatibility);
sendEventLog(
latestSubjectOptional.get(),
ApplyStatus.changed,
latestSubjectOptional.get().getSpec().getCompatibility(),
compatibility,
EMPTY_STRING
);

return HttpResponse.ok(state);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidOwner;
import static com.michelin.ns4kafka.util.enumation.Kind.KAFKA_STREAM;
import static io.micronaut.core.util.StringUtils.EMPTY_STRING;

import com.michelin.ns4kafka.controller.generic.NamespacedResourceController;
import com.michelin.ns4kafka.model.KafkaStream;
Expand Down Expand Up @@ -93,8 +94,13 @@ HttpResponse<KafkaStream> apply(String namespace, @Body @Valid KafkaStream strea
return formatHttpResponse(stream, status);
}

sendEventLog(stream, status, existingStream.<Object>map(KafkaStream::getMetadata).orElse(null),
stream.getMetadata());
sendEventLog(
stream,
status,
existingStream.<Object>map(KafkaStream::getMetadata).orElse(null),
stream.getMetadata(),
EMPTY_STRING
);

return formatHttpResponse(streamService.create(stream), status);
}
Expand Down Expand Up @@ -126,7 +132,15 @@ HttpResponse<Void> delete(String namespace, String stream, @QueryValue(defaultVa
}

var streamToDelete = optionalStream.get();
sendEventLog(streamToDelete, ApplyStatus.deleted, streamToDelete.getMetadata(), null);

sendEventLog(
streamToDelete,
ApplyStatus.deleted,
streamToDelete.getMetadata(),
null,
EMPTY_STRING
);

streamService.delete(ns, optionalStream.get());
return HttpResponse.noContent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidKafkaUser;
import static com.michelin.ns4kafka.util.enumation.Kind.KAFKA_USER_RESET_PASSWORD;
import static io.micronaut.core.util.StringUtils.EMPTY_STRING;

import com.michelin.ns4kafka.controller.generic.NamespacedResourceController;
import com.michelin.ns4kafka.model.KafkaUserResetPassword;
Expand Down Expand Up @@ -61,7 +62,14 @@ public HttpResponse<KafkaUserResetPassword> resetPassword(String namespace, Stri
.build())
.build();

sendEventLog(response, ApplyStatus.changed, null, response.getSpec());
sendEventLog(
response,
ApplyStatus.changed,
null,
response.getSpec(),
EMPTY_STRING
);

return HttpResponse.ok(response);
}
}
Loading

0 comments on commit 119d1b4

Please sign in to comment.