Skip to content

Commit

Permalink
Add start date for reading sms
Browse files Browse the repository at this point in the history
  • Loading branch information
Sebastian Rabiej committed Oct 11, 2024
1 parent ade75a7 commit 0c87c00
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 18 deletions.
1 change: 1 addition & 0 deletions src/main/java/otter/jet/reader/ReaderConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public ReaderService readerService(
createNatsServerUrl(natsServerHost, natsServerPort),
messageDeserializer,
readerConfigurationProperties.getSubject(),
readerConfigurationProperties.getStartDate(),
messageStore);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
@ConfigurationProperties(prefix = "read")
public class ReaderConfigurationProperties {
private String subject = "*";
private String startDate = "";

public String getStartDate() {
return startDate;
}

public String getSubject() {
return subject;
Expand All @@ -15,6 +20,10 @@ public void setSubject(String subject) {
this.subject = subject;
}

public void setStartDate(String startDate) {
this.startDate = startDate;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
29 changes: 26 additions & 3 deletions src/main/java/otter/jet/reader/ReaderService.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
import io.nats.client.JetStreamApiException;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.Subscription;

import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.DeliverPolicy;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
Expand All @@ -22,18 +26,22 @@ public class ReaderService {

private static final Logger LOG = LoggerFactory.getLogger(ReaderService.class);
private static final String NO_MATCHING_STREAM_CODE = "SUB-90007";
private static final ZonedDateTime LOWEST_DATE = ZonedDateTime.of(1000, 1, 1, 0, 0, 0, 0, ZoneId.of("UTC"));

private final String natsServerUrl;
private final MessageDeserializer messageDeserializer;
private final String subject;
private final MessageStore messageStore;
private final ZonedDateTime startDate;

private final Executor executorService = Executors.newSingleThreadExecutor();

public ReaderService(String natsServerUrl,
MessageDeserializer messageDeserializer,
String subject,
String startDate,
MessageStore messageStore) {
this.startDate = parseStartDate(startDate);
this.natsServerUrl = natsServerUrl;
this.messageDeserializer = messageDeserializer;
this.subject = subject;
Expand All @@ -46,6 +54,13 @@ public void startReadingMessages() {
startMessageListener();
}

private static ZonedDateTime parseStartDate(String startDate) {
if(startDate.isBlank()){
return LOWEST_DATE;
}
return ZonedDateTime.parse(startDate, DateTimeFormatter.ISO_DATE_TIME);
}

private void startMessageListener() {
executorService.execute(
() -> {
Expand All @@ -71,7 +86,11 @@ private Subscription tryToSubscribe(JetStream jetStream)
throws IOException, JetStreamApiException, InterruptedException {

try {
return jetStream.subscribe(subject);
var options = PushSubscribeOptions.builder()
.configuration(getConsumerConfiguration(startDate))
.build();
return jetStream.subscribe(subject, options);

} catch (IllegalStateException e) {
if (e.getMessage().contains(NO_MATCHING_STREAM_CODE)) { // No matching streams for subject
// try again after 5 seconds
Expand All @@ -86,6 +105,10 @@ private Subscription tryToSubscribe(JetStream jetStream)
}
}

private ConsumerConfiguration getConsumerConfiguration(ZonedDateTime startDate) {
return ConsumerConfiguration.builder().startTime(startDate).deliverPolicy(DeliverPolicy.ByStartTime).build();
}

private void continuouslyReadMessages(
Subscription subscription, MessageDeserializer messageDeserializer) throws InterruptedException {
while (true) {
Expand Down
1 change: 1 addition & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ read:
subject: "*"
store:
limit: 1000
# startDate: "2020-01-01T00:00:00Z" # format is "yyyy-MM-dd'T'HH:mm:ss'Z'"

server:
port: 1111
Expand Down
35 changes: 20 additions & 15 deletions src/main/resources/templates/msgs-page.ftlh
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@
color: #333;
}

.message__content.collapsed {
max-height: 0;
padding: 0;
}

/* Styles for the collapsible content */
.message__content {
background: white;
Expand Down Expand Up @@ -147,13 +152,13 @@
}
</style>
<script>
const htmlDecode = (input) => {
const doc = new DOMParser().parseFromString(input, "text/html");
function htmlDecode(input) {
var doc = new DOMParser().parseFromString(input, "text/html");
return doc.documentElement.textContent;
}

const toggleCollapse = (contentId, button) => {
const element = document.getElementById(contentId);
function toggleCollapse(contentId, button) {
var element = document.getElementById(contentId);
element.classList.toggle("collapsed");

// Update button text based on the state
Expand All @@ -164,9 +169,9 @@
}
}

const formatTimestamp = (timestamp) => {
const date = new Date(timestamp);
const options = {year: 'numeric', month: 'short', day: 'numeric', hour: '2-digit', minute: '2-digit'};
function formatTimestamp(timestamp) {
var date = new Date(timestamp);
var options = {year: 'numeric', month: 'short', day: 'numeric', hour: '2-digit', minute: '2-digit'};
return date.toLocaleDateString(undefined, options);
}
</script>
Expand Down Expand Up @@ -223,19 +228,19 @@

<script>
try {
const decodedMessage = htmlDecode('${message.body()?json_string}');
const messageData = JSON.parse(decodedMessage);
document.getElementById('collapsible-${message_index}').textContent = JSON.stringify(messageData, null, 4);
var decodedMessage = htmlDecode('${message.body()?json_string}');
var messageData = JSON.parse(decodedMessage);
var formattedJson = JSON.stringify(messageData, null, 4);
document.getElementById('collapsible-${message_index}').textContent = formattedJson;
} catch (error) {
document.getElementById('collapsible-${message_index}').textContent = '${message.body()}';
}

document.addEventListener('DOMContentLoaded', (event) => {
document
.querySelectorAll('.message__timestamp')
.forEach((elem) => {
const timestamp = elem.getAttribute('data-timestamp');
elem.textContent = formatTimestamp(timestamp);
var timestamps = document.querySelectorAll('.message__timestamp');
timestamps.forEach(function (elem) {
var timestamp = elem.getAttribute('data-timestamp');
elem.textContent = formatTimestamp(timestamp);
});
});
</script>
Expand Down

0 comments on commit 0c87c00

Please sign in to comment.