Skip to content

Commit

Permalink
stop threads (#74)
Browse files Browse the repository at this point in the history
* Interrupt threads before shutdown

* Set logging to debug level on dev.watch()

* Use new version of rlo_12 and rlo_13 with differnt way of handling DEW threads to enable graceful shutdown

* Removes unnecessary logging messages
  • Loading branch information
StrongestNumber9 authored Jul 27, 2023
1 parent 83551b1 commit 4c41020
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 57 deletions.
13 changes: 11 additions & 2 deletions example/combined.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ metadata:
app: first-pod-appname
app-stderr: first-pod-appname-stderr
env: dev
hostname: first-pod
hostname-stderr: first-pod-stderr
name: first-pod
namespace: default
spec:
Expand Down Expand Up @@ -268,7 +270,7 @@ spec:
restartPolicy: Always
serviceAccount: kubelogreader
serviceAccountName: kubelogreader
terminationGracePeriodSeconds: 0
terminationGracePeriodSeconds: 30
volumes:
- configMap:
name: app-config-8dtf4m92md
Expand Down Expand Up @@ -308,6 +310,13 @@ spec:
- /usr/bin/java
image: ghcr.io/teragrep/rlp_07/app:latest
imagePullPolicy: IfNotPresent
lifecycle:
preStop:
exec:
command:
- sh
- -c
- echo presleep; sleep 10;
name: first-pod
ports:
- containerPort: 1601
Expand All @@ -321,7 +330,7 @@ spec:
imagePullSecrets:
- name: ghcr.io
restartPolicy: Always
terminationGracePeriodSeconds: 0
terminationGracePeriodSeconds: 30
volumes:
- configMap:
name: receiver-config-cf8g6bh7tg
Expand Down
2 changes: 2 additions & 0 deletions example/pods/first-pod.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
"name": "first-pod",
"namespace": "default",
"labels": {
"hostname": "first-pod",
"hostname-stderr": "first-pod-stderr",
"app": "first-pod-appname",
"app-stderr": "first-pod-appname-stderr",
"env": "dev"
Expand Down
2 changes: 1 addition & 1 deletion example/pods/k8s_01.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"serviceAccount": "kubelogreader",
"serviceAccountName": "kubelogreader",
"dnsPolicy": "ClusterFirst",
"terminationGracePeriodSeconds": 0,
"terminationGracePeriodSeconds": 30,
"volumes": [
{
"name": "app-config",
Expand Down
13 changes: 12 additions & 1 deletion example/pods/receiver.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"spec": {
"hostname": "receiver",
"dnsPolicy": "ClusterFirst",
"terminationGracePeriodSeconds": 0,
"terminationGracePeriodSeconds": 30,
"volumes": [
{
"name": "receiver-config",
Expand All @@ -39,6 +39,17 @@
"name": "receiver-config"
}
],
"lifecycle": {
"preStop": {
"exec": {
"command": [
"sh",
"-c",
"echo presleep; sleep 10;"
]
}
}
},
"command": [
"/usr/bin/java"
],
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
<dependency>
<groupId>com.teragrep</groupId>
<artifactId>rlo_13</artifactId>
<version>1.2.0</version>
<version>1.2.1</version>
</dependency>

<!-- RELP sending library -->
Expand Down
81 changes: 32 additions & 49 deletions src/main/java/com/teragrep/k8s_01/KubernetesLogReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class KubernetesLogReader {
private static final Logger LOGGER = LoggerFactory.getLogger(KubernetesLogReader.class);
private static final MetricRegistry metricRegistry = new MetricRegistry();
static Gson gson = new Gson();
public static void main(String[] args) throws IOException {
public static void main(String[] args) throws IOException, InterruptedException {
AppConfig appConfig;
try {
try(InputStreamReader isr = new InputStreamReader(Files.newInputStream(Paths.get("etc/config.json")), StandardCharsets.UTF_8)) {
Expand Down Expand Up @@ -127,7 +127,7 @@ public static void main(String[] args) throws IOException {
Arrays.toString(logfiles)
);

List<Thread> threads = new ArrayList<>();
List<DirectoryEventWatcher> dews = new ArrayList<>();
String statesStore = System.getProperty("user.dir") + "/var";
LOGGER.debug(
"Using {} as statestore",
Expand All @@ -143,8 +143,20 @@ public static void main(String[] args) throws IOException {
// Graceful shutdown so Relp sessions are gracefully terminated
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOGGER.info("Shutting down.");
for(DirectoryEventWatcher dew : dews) {
LOGGER.debug("Shutting down dew " + dew);
try {
dew.stop();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
LOGGER.info(
"Disconnecting {} relp threads",
outputThreads
);
for(int i=1; i <= outputThreads; i++) {
LOGGER.info(
LOGGER.debug(
"Disconnecting relp thread #{}/{}",
i,
outputThreads
Expand All @@ -164,52 +176,23 @@ public static void main(String[] args) throws IOException {

// Start a new thread for all logfile watchers
for (String logfile : logfiles) {
Thread thread = new Thread(() -> {
LOGGER.debug(
"Starting new DirectoryEventWatcher thread on directory '{}' with pattern '{}'",
appConfig.getKubernetes().getLogdir(),
logfile
);
try {
DirectoryEventWatcher dew = new DirectoryEventWatcher(
Paths.get(appConfig.getKubernetes().getLogdir()),
false,
Pattern.compile(logfile),
statefulFileReader,
500,
TimeUnit.MILLISECONDS,
appConfig.getKubernetes().getMaxLogReadingThreads()
);
dew.watch();
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
});
thread.setName("DEW-" + threads.size());
thread.start();
threads.add(thread);
}

// FIXME: Is this necessary
for (Thread thread : threads) {
if(LOGGER.isTraceEnabled()) {
LOGGER.trace(
"Waiting for thread {}#{} to finish",
thread.getName(),
thread.getId()
);
}
try {
thread.join();
} catch (InterruptedException e) {
LOGGER.error(
"Failed to stop thread {}#{}:",
thread.getName(),
thread.getId(),
e
);
throw new RuntimeException(e);
}
LOGGER.debug(
"Starting new DirectoryEventWatcher on directory '{}' with pattern '{}'",
appConfig.getKubernetes().getLogdir(),
logfile
);
DirectoryEventWatcher dew = new DirectoryEventWatcher(
Paths.get(appConfig.getKubernetes().getLogdir()),
false,
Pattern.compile(logfile),
statefulFileReader,
500,
TimeUnit.MILLISECONDS,
appConfig.getKubernetes().getMaxLogReadingThreads()
);
dew.start();
dews.add(dew);
}
Thread.sleep(Long.MAX_VALUE);
}
}
4 changes: 1 addition & 3 deletions src/main/java/com/teragrep/k8s_01/RelpOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,11 @@ public void disconnect() {
totalConnections.dec();
relpConnection.disconnect();
} catch (IOException | TimeoutException e) {
LOGGER.debug(
LOGGER.info(
"[#{}] Had to teardown connection",
getId()
);
relpConnection.tearDown();
throughputErrors.mark();
throw new RuntimeException(e);
}
}

Expand Down

0 comments on commit 4c41020

Please sign in to comment.