diff --git a/src/main/java/otter/jet/reader/ReaderConfiguration.java b/src/main/java/otter/jet/reader/ReaderConfiguration.java index 5f59ab6..96e936d 100644 --- a/src/main/java/otter/jet/reader/ReaderConfiguration.java +++ b/src/main/java/otter/jet/reader/ReaderConfiguration.java @@ -26,6 +26,7 @@ public ReaderService readerService( createNatsServerUrl(natsServerHost, natsServerPort), messageDeserializer, readerConfigurationProperties.getSubject(), + readerConfigurationProperties.getStartDate(), messageStore); } diff --git a/src/main/java/otter/jet/reader/ReaderConfigurationProperties.java b/src/main/java/otter/jet/reader/ReaderConfigurationProperties.java index 6aa245e..231074b 100644 --- a/src/main/java/otter/jet/reader/ReaderConfigurationProperties.java +++ b/src/main/java/otter/jet/reader/ReaderConfigurationProperties.java @@ -6,6 +6,11 @@ @ConfigurationProperties(prefix = "read") public class ReaderConfigurationProperties { private String subject = "*"; + private String startDate = ""; + + public String getStartDate() { + return startDate; + } public String getSubject() { return subject; @@ -15,6 +20,10 @@ public void setSubject(String subject) { this.subject = subject; } + public void setStartDate(String startDate) { + this.startDate = startDate; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/src/main/java/otter/jet/reader/ReaderService.java b/src/main/java/otter/jet/reader/ReaderService.java index 309e1f6..e567ab9 100644 --- a/src/main/java/otter/jet/reader/ReaderService.java +++ b/src/main/java/otter/jet/reader/ReaderService.java @@ -5,13 +5,17 @@ import io.nats.client.JetStreamApiException; import io.nats.client.Message; import io.nats.client.Nats; +import io.nats.client.PushSubscribeOptions; import io.nats.client.Subscription; - +import io.nats.client.api.ConsumerConfiguration; +import io.nats.client.api.DeliverPolicy; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; import java.util.concurrent.Executor; import java.util.concurrent.Executors; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.context.event.ApplicationReadyEvent; @@ -22,18 +26,22 @@ public class ReaderService { private static final Logger LOG = LoggerFactory.getLogger(ReaderService.class); private static final String NO_MATCHING_STREAM_CODE = "SUB-90007"; + private static final ZonedDateTime LOWEST_DATE = ZonedDateTime.of(1000, 1, 1, 0, 0, 0, 0, ZoneId.of("UTC")); private final String natsServerUrl; private final MessageDeserializer messageDeserializer; private final String subject; private final MessageStore messageStore; + private final ZonedDateTime startDate; private final Executor executorService = Executors.newSingleThreadExecutor(); public ReaderService(String natsServerUrl, MessageDeserializer messageDeserializer, String subject, + String startDate, MessageStore messageStore) { + this.startDate = parseStartDate(startDate); this.natsServerUrl = natsServerUrl; this.messageDeserializer = messageDeserializer; this.subject = subject; @@ -46,6 +54,13 @@ public void startReadingMessages() { startMessageListener(); } + private static ZonedDateTime parseStartDate(String startDate) { + if(startDate.isBlank()){ + return LOWEST_DATE; + } + return ZonedDateTime.parse(startDate, DateTimeFormatter.ISO_DATE_TIME); + } + private void startMessageListener() { executorService.execute( () -> { @@ -71,7 +86,11 @@ private Subscription tryToSubscribe(JetStream jetStream) throws IOException, JetStreamApiException, InterruptedException { try { - return jetStream.subscribe(subject); + var options = PushSubscribeOptions.builder() + .configuration(getConsumerConfiguration(startDate)) + .build(); + return jetStream.subscribe(subject, options); + } catch (IllegalStateException e) { if (e.getMessage().contains(NO_MATCHING_STREAM_CODE)) { // No matching streams for subject // try again after 5 seconds @@ -86,6 +105,10 @@ private Subscription tryToSubscribe(JetStream jetStream) } } + private ConsumerConfiguration getConsumerConfiguration(ZonedDateTime startDate) { + return ConsumerConfiguration.builder().startTime(startDate).deliverPolicy(DeliverPolicy.ByStartTime).build(); + } + private void continuouslyReadMessages( Subscription subscription, MessageDeserializer messageDeserializer) throws InterruptedException { while (true) { diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index a3aefe8..5fc6dd9 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -5,6 +5,7 @@ read: subject: "*" store: limit: 1000 +# startDate: "2020-01-01T00:00:00Z" # format is "yyyy-MM-dd'T'HH:mm:ss'Z'" server: port: 1111 diff --git a/src/main/resources/templates/msgs-page.ftlh b/src/main/resources/templates/msgs-page.ftlh index 3624277..6b15aac 100644 --- a/src/main/resources/templates/msgs-page.ftlh +++ b/src/main/resources/templates/msgs-page.ftlh @@ -57,6 +57,11 @@ color: #333; } + .message__content.collapsed { + max-height: 0; + padding: 0; + } + /* Styles for the collapsible content */ .message__content { background: white; @@ -147,13 +152,13 @@ } @@ -223,19 +228,19 @@