diff --git a/.github/workflows/cc.yaml b/.github/workflows/cc.yaml new file mode 100644 index 0000000..58847dc --- /dev/null +++ b/.github/workflows/cc.yaml @@ -0,0 +1,17 @@ +name: Continuous Check + +on: [push] + +jobs: + build: + name: Continuous Check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Java 21 + uses: actions/setup-java@v4 + with: + java-version: 21 + distribution: temurin + - name: Build with Gradle + run: ./gradlew check --info \ No newline at end of file diff --git a/.gitignore b/.gitignore index 524f096..81dd035 100644 --- a/.gitignore +++ b/.gitignore @@ -1,24 +1,5 @@ -# Compiled class file -*.class - -# Log file -*.log - -# BlueJ files -*.ctxt - -# Mobile Tools for Java (J2ME) -.mtj.tmp/ - -# Package Files # -*.jar -*.war -*.nar -*.ear -*.zip -*.tar.gz -*.rar - -# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml -hs_err_pid* -replay_pid* +build/** +out/** +bin/** +.idea +.gradle \ No newline at end of file diff --git a/README.md b/README.md index c2f4773..980ee92 100644 --- a/README.md +++ b/README.md @@ -1 +1,39 @@ -# s2324v-se3 \ No newline at end of file +# s2324v-se3 + +Reference implementation for the Concurrent Programming final project, Spring 2024. + +## Design notes + +The code in this repository implements a single-node _publish-subscriber_ system (pub-sub), with a TCP/IP interface. + +Client systems interact with the pub-sub system via the establishment of TCP/IP connections and the sending of +_requests_ through these connections. +The available _request_ types are defined in [ClientRequest](src/main/kotlin/pt/isel/pc/protocol/ClientRequest.kt). +For each _request_, the server must reply with a _response_ - +[ClientResponse](src/main/kotlin/pt/isel/pc/protocol/ClientResponse.kt). + +The server can also take the initiative to send messages to clients, without them being responses to requests. +These messages are called _server pushes_ and are defined in [ServerPush](src/main/kotlin/pt/isel/pc/protocol/ServerPush.kt). + +The way strings are transformed into _requests_ (i.e. parsing) +and how _responses_ and _server pushes_ are transformed into strings (i.e. serialization) is defined in +[ParseAndSerialize](src/main/kotlin/pt/isel/pc/protocol/ParseAndSerialize.kt). + +The design for some components of this system is inspired in the [actor model](https://en.wikipedia.org/wiki/Actor_model). + +The [Server](src/main/kotlin/pt/isel/pc/Server.kt) is responsible for: +- Creating a server socket and binding it to an IP address and a port. +- Manage the association between topics and subscribers. +- Accept connections from remote clients and create the data structures and threads required to deal with those clients. +- The server is composed by two threads: + - a _main_ thread, implementing the main server domain logic and state, namely the list of remote clients and topics. + - a _accept_ thread, responsible for accepting new connections. +- The _main_ thread runs a loop that + - retrieves _control messages_ from a _control queue_. + - changes internal state based on those control messages. + - produces side effects based on those control message. +- With few exceptions (e.g. the server socket), all server state is exclusively managed by the _main_ thread. +- All interaction with the server is done by sending control messages to the server's _control queue_. +- +The [RemoteClient] [RemoteClient](src/main/kotlin/pt/isel/pc/RemoteClient.kt) is responsible to interact with a +specific connected client, and follows a similar design. diff --git a/build.gradle.kts b/build.gradle.kts new file mode 100644 index 0000000..4014dee --- /dev/null +++ b/build.gradle.kts @@ -0,0 +1,61 @@ + + +plugins { + kotlin("jvm") version "1.9.21" +} + +group = "pt.isel" +version = "1.0-SNAPSHOT" + +repositories { + mavenCentral() +} + +val ktlint: Configuration by configurations.creating + +dependencies { + implementation("org.slf4j:slf4j-api:1.7.36") + implementation("org.slf4j:slf4j-simple:2.0.0-alpha7") + + testImplementation("org.jetbrains.kotlin:kotlin-test") + ktlint("com.pinterest:ktlint:0.48.2") { + attributes { + attribute(Bundling.BUNDLING_ATTRIBUTE, objects.named(Bundling.EXTERNAL)) + } + } +} + +tasks.test { + useJUnitPlatform() + // To access the *non-public* Continuation API + // ONLY for learning purposes + jvmArgs(listOf( + "--add-exports", "java.base/jdk.internal.vm=ALL-UNNAMED" + )) +} +kotlin { + jvmToolchain(21) +} + +val outputDir = "${layout.buildDirectory}/reports/ktlint/" +val inputFiles = project.fileTree(mapOf("dir" to "src", "include" to "**/*.kt")) +val ktlintCheck by tasks.creating(JavaExec::class) { + inputs.files(inputFiles) + outputs.dir(outputDir) + + description = "Check Kotlin code style." + classpath = ktlint + mainClass.set("com.pinterest.ktlint.Main") + args = listOf("src/**/*.kt") +} + +tasks.named("check") { + dependsOn("ktlintCheck") +} + +// Use the following to run: +// java -cp "build/libs/*:build/classes/kotlin/main" .Kt +tasks.register("packLibs") { + from(configurations.runtimeClasspath) + into("build/libs") +} \ No newline at end of file diff --git a/gradle.properties b/gradle.properties new file mode 100644 index 0000000..7fc6f1f --- /dev/null +++ b/gradle.properties @@ -0,0 +1 @@ +kotlin.code.style=official diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..249e583 Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..50a0971 --- /dev/null +++ b/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +#Mon Feb 19 19:18:35 WET 2024 +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew new file mode 100755 index 0000000..1b6c787 --- /dev/null +++ b/gradlew @@ -0,0 +1,234 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit + +APP_NAME="Gradle" +APP_BASE_NAME=${0##*/} + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat new file mode 100644 index 0000000..ac1b06f --- /dev/null +++ b/gradlew.bat @@ -0,0 +1,89 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/settings.gradle.kts b/settings.gradle.kts new file mode 100644 index 0000000..5ae9dad --- /dev/null +++ b/settings.gradle.kts @@ -0,0 +1,5 @@ +plugins { + id("org.gradle.toolchains.foojay-resolver-convention") version "0.5.0" +} +rootProject.name = "pc" + diff --git a/src/main/kotlin/pt/isel/pc/PublishedMessage.kt b/src/main/kotlin/pt/isel/pc/PublishedMessage.kt new file mode 100644 index 0000000..1fec0df --- /dev/null +++ b/src/main/kotlin/pt/isel/pc/PublishedMessage.kt @@ -0,0 +1,9 @@ +package pt.isel.pc + +/** + * Represents a message published to a topic. + */ +data class PublishedMessage( + val topicName: TopicName, + val content: String, +) diff --git a/src/main/kotlin/pt/isel/pc/RemoteClient.kt b/src/main/kotlin/pt/isel/pc/RemoteClient.kt new file mode 100644 index 0000000..9e1bb3e --- /dev/null +++ b/src/main/kotlin/pt/isel/pc/RemoteClient.kt @@ -0,0 +1,186 @@ +package pt.isel.pc + +import org.slf4j.LoggerFactory +import pt.isel.pc.protocol.ClientRequest +import pt.isel.pc.protocol.ClientResponse +import pt.isel.pc.protocol.ServerPush +import pt.isel.pc.protocol.parseClientRequest +import pt.isel.pc.protocol.serialize +import pt.isel.pc.utils.SuccessOrError +import pt.isel.pc.utils.sendLine +import java.io.BufferedWriter +import java.io.Writer +import java.net.Socket +import java.util.concurrent.LinkedBlockingQueue +import kotlin.concurrent.thread + +/** + * The component responsible to interact with a remote client, via a [Socket]. + */ +class RemoteClient private constructor( + private val server: Server, + val clientId: String, + private val clientSocket: Socket, +) : Subscriber { + private val controlQueue = LinkedBlockingQueue() + private val controlThread: Thread + private val readThread: Thread + private var state = State.RUNNING + + init { + controlThread = thread(isDaemon = true) { + logger.info("[{}] Remote client started main thread", clientId) + controlLoop() + } + readThread = thread(isDaemon = true) { + logger.info("[{}] Remote client started read thread", clientId) + readLoop() + } + } + + fun shutdown() { + controlQueue.put(ControlMessage.Shutdown) + } + + override fun send(message: PublishedMessage) { + controlQueue.put(ControlMessage.Message(message)) + } + + private fun handleShutdown(writer: Writer) { + if (state != State.RUNNING) { + return + } + writer.sendLine(serialize(ServerPush.Bye)) + clientSocket.close() + state = State.SHUTTING_DOWN + } + + private fun handleMessage(writer: BufferedWriter, message: PublishedMessage) { + if (state != State.RUNNING) { + return + } + writer.sendLine(serialize(ServerPush.PublishedMessage(message))) + } + + private fun handleClientSocketLine(writer: BufferedWriter, line: String) { + if (state != State.RUNNING) { + return + } + val response = when (val res = parseClientRequest(line)) { + is SuccessOrError.Success -> when (val request = res.value) { + is ClientRequest.Publish -> { + server.publish(PublishedMessage(request.topic, request.message)) + ClientResponse.OkPublish + } + + is ClientRequest.Subscribe -> { + request.topics.forEach { + server.subscribe(it, this) + } + ClientResponse.OkSubscribe + } + + is ClientRequest.Unsubscribe -> { + request.topics.forEach { + server.unsubscribe(it, this) + } + ClientResponse.OkUnsubscribe + } + } + + is SuccessOrError.Error -> { + ClientResponse.Error(res.error) + } + } + writer.sendLine(serialize(response)) + } + + private fun handleClientSocketError(throwable: Throwable) { + logger.info("Client socket operation thrown: {}", throwable.message) + } + + private fun handleClientSocketEnded() { + if (state != State.RUNNING) { + return + } + state = State.SHUTTING_DOWN + } + + private fun handleReadLoopEnded() { + state = State.SHUTDOWN + } + + private fun controlLoop() { + try { + clientSocket.getOutputStream().bufferedWriter().use { writer -> + writer.sendLine(serialize(ServerPush.Hi)) + while (state != State.SHUTDOWN) { + val controlMessage = controlQueue.take() + logger.info("[{}] main thread received {}", clientId, controlMessage) + when (controlMessage) { + ControlMessage.Shutdown -> handleShutdown(writer) + is ControlMessage.Message -> handleMessage(writer, controlMessage.value) + is ControlMessage.ClientSocketLine -> handleClientSocketLine(writer, controlMessage.value) + ControlMessage.ClientSocketEnded -> handleClientSocketEnded() + + is ControlMessage.ClientSocketError -> handleClientSocketError(controlMessage.throwable) + + ControlMessage.ReadLoopEnded -> handleReadLoopEnded() + } + } + } + } finally { + logger.info("[{}] remote client ending", clientId) + server.remoteClientEnded(this) + } + } + + private fun readLoop() { + clientSocket.getInputStream().bufferedReader().use { reader -> + try { + while (true) { + val line: String? = reader.readLine() + if (line == null) { + logger.info("[{}] end of input stream reached", clientId) + controlQueue.put(ControlMessage.ClientSocketEnded) + return + } + logger.info("[{}] line received: {}", clientId, line) + controlQueue.put(ControlMessage.ClientSocketLine(line)) + } + } catch (ex: Throwable) { + logger.info("[{}]Exception on read loop: {}, {}", clientId, ex.javaClass.name, ex.message) + controlQueue.put(ControlMessage.ClientSocketError(ex)) + } finally { + logger.info("[{}] client loop ending", clientId) + controlQueue.put(ControlMessage.ReadLoopEnded) + } + } + } + + companion object { + private val logger = LoggerFactory.getLogger(RemoteClient::class.java) + fun start(server: Server, clientId: String, socket: Socket): RemoteClient { + return RemoteClient( + server, + clientId, + socket, + ) + } + } + + private sealed interface ControlMessage { + data class Message(val value: PublishedMessage) : ControlMessage + data object Shutdown : ControlMessage + data object ClientSocketEnded : ControlMessage + data class ClientSocketError(val throwable: Throwable) : ControlMessage + data class ClientSocketLine(val value: String) : ControlMessage + data object ReadLoopEnded : ControlMessage + } + + private enum class State { + RUNNING, + SHUTTING_DOWN, + SHUTDOWN, + } +} diff --git a/src/main/kotlin/pt/isel/pc/Server.kt b/src/main/kotlin/pt/isel/pc/Server.kt new file mode 100644 index 0000000..1d5f227 --- /dev/null +++ b/src/main/kotlin/pt/isel/pc/Server.kt @@ -0,0 +1,192 @@ +package pt.isel.pc + +import org.slf4j.LoggerFactory +import java.net.ServerSocket +import java.net.Socket +import java.net.SocketAddress +import java.util.concurrent.LinkedBlockingQueue +import kotlin.concurrent.thread + +/** + * The server component. + */ +class Server private constructor( + private val serverSocket: ServerSocket, + private val controlQueue: LinkedBlockingQueue, +) { + + private val controlThread: Thread + private val acceptThread: Thread + private val clientSet = mutableSetOf() + private val topicSet = TopicSet() + + private var currentClientId = 0 + private var state = State.RUNNING + private var acceptThreadEnded = false + + init { + controlThread = thread(isDaemon = true) { + controlLoop() + } + acceptThread = thread(isDaemon = true) { + acceptLoop() + } + } + + fun shutdown() { + controlQueue.put(ControlMessage.Shutdowm) + } + + fun publish(message: PublishedMessage) { + controlQueue.put(ControlMessage.Publish(message)) + } + + fun subscribe(topicName: TopicName, subscriber: Subscriber) { + controlQueue.put(ControlMessage.Subscribe(topicName, subscriber)) + } + + fun unsubscribe(topicName: TopicName, subscriber: Subscriber) { + controlQueue.put(ControlMessage.Unsubscribe(topicName, subscriber)) + } + + fun remoteClientEnded(client: RemoteClient) { + controlQueue.put(ControlMessage.RemoteClientEnded(client)) + } + + fun join() { + controlThread.join() + } + + private fun handleNewClientSocket(clientSocket: Socket) { + if (state != State.RUNNING) { + clientSocket.close() + return + } + val newId = currentClientId++ + val remoteClient = RemoteClient.start(this, newId.toString(), clientSocket) + clientSet.add(remoteClient) + logger.info("Server: started new remote client") + } + + private fun handleRemoteClientEnded(remoteClient: RemoteClient) { + clientSet.remove(remoteClient) + topicSet.unsubscribe(remoteClient) + logger.info("Server: remote client ended {}", remoteClient.clientId) + if (state == State.SHUTTING_DOWN) { + if (clientSet.isEmpty() && acceptThreadEnded) { + state = State.SHUTDOWN + } + } + } + + private fun handlePublish(message: PublishedMessage) { + topicSet.getSubscribersFor(message.topicName).forEach { + it.send(message) + } + } + + private fun handleSubscribe(topicName: TopicName, subscriber: Subscriber) { + topicSet.subscribe(topicName, subscriber) + } + + private fun handleUnsubscribe(topicName: TopicName, subscriber: Subscriber) { + topicSet.unsubscribe(topicName, subscriber) + } + + private fun handleShutdown() { + if (state != State.RUNNING) { + return + } + startShutdown() + } + + private fun startShutdown() { + serverSocket.close() + clientSet.forEach { + it.shutdown() + } + state = State.SHUTTING_DOWN + } + + private fun handleAcceptLoopEnded() { + acceptThreadEnded = true + if (state != State.SHUTTING_DOWN) { + logger.info("Accept loop ended unexpectedly") + startShutdown() + } + if (clientSet.isEmpty()) { + state = State.SHUTDOWN + } + } + + private fun controlLoop() { + try { + while (state != State.SHUTDOWN) { + try { + when (val controlMessage = controlQueue.take()) { + is ControlMessage.NewClientSocket -> handleNewClientSocket(controlMessage.clientSocket) + is ControlMessage.RemoteClientEnded -> handleRemoteClientEnded(controlMessage.remoteClient) + is ControlMessage.Publish -> handlePublish(controlMessage.message) + is ControlMessage.Subscribe -> handleSubscribe( + controlMessage.topicName, + controlMessage.subscriber, + ) + is ControlMessage.Unsubscribe -> handleUnsubscribe( + controlMessage.topicName, + controlMessage.subscriber, + ) + ControlMessage.Shutdowm -> handleShutdown() + ControlMessage.AcceptLoopEnded -> handleAcceptLoopEnded() + } + } catch (ex: Throwable) { + logger.info("Unexpected exception, ignoring it", ex) + } + } + } finally { + logger.info("server ending") + } + } + + private fun acceptLoop() { + try { + while (true) { + // TODO add limitation to the number of active sockets + val clientSocket = serverSocket.accept() + logger.info("New client socket accepted") + controlQueue.put(ControlMessage.NewClientSocket(clientSocket)) + } + } catch (ex: Exception) { + logger.info("Exception on accept loop: {}", ex.message) + // continue + } finally { + controlQueue.put(ControlMessage.AcceptLoopEnded) + } + } + + companion object { + private val logger = LoggerFactory.getLogger(Server::class.java) + fun start(address: SocketAddress): Server { + val serverSocket = ServerSocket() + serverSocket.bind(address) + val controlQueue = LinkedBlockingQueue() + return Server(serverSocket, controlQueue) + } + } + + private sealed interface ControlMessage { + data class NewClientSocket(val clientSocket: Socket) : ControlMessage + data class RemoteClientEnded(val remoteClient: RemoteClient) : ControlMessage + data class Publish(val message: PublishedMessage) : ControlMessage + data class Subscribe(val topicName: TopicName, val subscriber: Subscriber) : ControlMessage + data class Unsubscribe(val topicName: TopicName, val subscriber: Subscriber) : ControlMessage + + data object Shutdowm : ControlMessage + data object AcceptLoopEnded : ControlMessage + } + + private enum class State { + RUNNING, + SHUTTING_DOWN, + SHUTDOWN, + } +} diff --git a/src/main/kotlin/pt/isel/pc/Subscriber.kt b/src/main/kotlin/pt/isel/pc/Subscriber.kt new file mode 100644 index 0000000..91b9c65 --- /dev/null +++ b/src/main/kotlin/pt/isel/pc/Subscriber.kt @@ -0,0 +1,8 @@ +package pt.isel.pc + +/** + * A subscriber of messages sent to topics. + */ +interface Subscriber { + fun send(message: PublishedMessage) +} diff --git a/src/main/kotlin/pt/isel/pc/Topic.kt b/src/main/kotlin/pt/isel/pc/Topic.kt new file mode 100644 index 0000000..957cc2f --- /dev/null +++ b/src/main/kotlin/pt/isel/pc/Topic.kt @@ -0,0 +1,11 @@ +package pt.isel.pc + +/** + * Represents a topic and the subscribers to those topics. + * + */ +class Topic( + val name: TopicName, +) { + val subscribers = mutableSetOf() +} diff --git a/src/main/kotlin/pt/isel/pc/TopicName.kt b/src/main/kotlin/pt/isel/pc/TopicName.kt new file mode 100644 index 0000000..f8ae70c --- /dev/null +++ b/src/main/kotlin/pt/isel/pc/TopicName.kt @@ -0,0 +1,7 @@ +package pt.isel.pc + +/** + * A topic name. + */ +@JvmInline +value class TopicName(val value: String) diff --git a/src/main/kotlin/pt/isel/pc/TopicSet.kt b/src/main/kotlin/pt/isel/pc/TopicSet.kt new file mode 100644 index 0000000..cf0595e --- /dev/null +++ b/src/main/kotlin/pt/isel/pc/TopicSet.kt @@ -0,0 +1,41 @@ +package pt.isel.pc + +/** + * Represents a set of topics, as well as the subscribers to those topics. + */ +class TopicSet { + + private val topicsByName = mutableMapOf() + private val topicsBySubscriber = mutableMapOf>() + + fun subscribe(name: TopicName, subscriber: Subscriber) { + val topic = topicsByName.computeIfAbsent(name) { Topic(name) } + topic.subscribers.add(subscriber) + topicsBySubscriber.computeIfAbsent(subscriber) { mutableSetOf() }.add(topic) + } + + fun unsubscribe(name: TopicName, subscriber: Subscriber) { + val topic = topicsByName[name] ?: return + topic.subscribers.remove(subscriber) + if (topic.subscribers.isEmpty()) { + topicsByName.remove(name) + } + val topicSet = topicsBySubscriber[subscriber] ?: return + topicSet.remove(topic) + if (topicSet.isEmpty()) { + topicsBySubscriber.remove(subscriber) + } + } + + fun unsubscribe(subscriber: Subscriber) { + val topicSet = topicsBySubscriber[subscriber] ?: return + topicSet.toList().forEach { + unsubscribe(it.name, subscriber) + } + } + + fun getSubscribersFor(name: TopicName): Set = + topicsByName[name]?.subscribers?.toSet() ?: setOf() + + fun getTopicWithName(topicName: TopicName): Topic? = topicsByName[topicName] +} diff --git a/src/main/kotlin/pt/isel/pc/app/App.kt b/src/main/kotlin/pt/isel/pc/app/App.kt new file mode 100644 index 0000000..3a245b5 --- /dev/null +++ b/src/main/kotlin/pt/isel/pc/app/App.kt @@ -0,0 +1,26 @@ +package pt.isel.pc.app + +import org.slf4j.LoggerFactory +import pt.isel.pc.Server +import java.net.InetSocketAddress + +fun main() { + // start server + val server = Server.start(InetSocketAddress("0.0.0.0", 8080)) + logger.info("Started server") + + // register shutdown hook + val shutdownThread = Thread { + logger.info("Starting shutdown process") + server.shutdown() + server.join() + } + Runtime.getRuntime().addShutdownHook(shutdownThread) + + // wait for server to end + logger.info("Waiting for server to end") + server.join() + logger.info("main ending") +} + +private val logger = LoggerFactory.getLogger("App") diff --git a/src/main/kotlin/pt/isel/pc/protocol/ClientRequest.kt b/src/main/kotlin/pt/isel/pc/protocol/ClientRequest.kt new file mode 100644 index 0000000..29ced87 --- /dev/null +++ b/src/main/kotlin/pt/isel/pc/protocol/ClientRequest.kt @@ -0,0 +1,12 @@ +package pt.isel.pc.protocol + +import pt.isel.pc.TopicName + +/** + * Sealed hierarchy (i.e. union type) to represent client requests. + */ +sealed interface ClientRequest { + data class Publish(val topic: TopicName, val message: String) : ClientRequest + data class Subscribe(val topics: List) : ClientRequest + data class Unsubscribe(val topics: List) : ClientRequest +} diff --git a/src/main/kotlin/pt/isel/pc/protocol/ClientRequestError.kt b/src/main/kotlin/pt/isel/pc/protocol/ClientRequestError.kt new file mode 100644 index 0000000..b3d0340 --- /dev/null +++ b/src/main/kotlin/pt/isel/pc/protocol/ClientRequestError.kt @@ -0,0 +1,10 @@ +package pt.isel.pc.protocol + +/** + * Sealed hierarchy to represent error responses to client requests. + */ +sealed interface ClientRequestError { + data object MissingCommandName : ClientRequestError + data object UnknownCommandName : ClientRequestError + data object InvalidArguments : ClientRequestError +} diff --git a/src/main/kotlin/pt/isel/pc/protocol/ClientResponse.kt b/src/main/kotlin/pt/isel/pc/protocol/ClientResponse.kt new file mode 100644 index 0000000..12d2fb1 --- /dev/null +++ b/src/main/kotlin/pt/isel/pc/protocol/ClientResponse.kt @@ -0,0 +1,11 @@ +package pt.isel.pc.protocol + +/** + * Sealed hierarchy to represent success or error responses to client requests + */ +sealed interface ClientResponse { + data object OkPublish : ClientResponse + data object OkSubscribe : ClientResponse + data object OkUnsubscribe : ClientResponse + data class Error(val error: ClientRequestError) : ClientResponse +} diff --git a/src/main/kotlin/pt/isel/pc/protocol/ParseAndSerialize.kt b/src/main/kotlin/pt/isel/pc/protocol/ParseAndSerialize.kt new file mode 100644 index 0000000..eae9600 --- /dev/null +++ b/src/main/kotlin/pt/isel/pc/protocol/ParseAndSerialize.kt @@ -0,0 +1,93 @@ +package pt.isel.pc.protocol + +import pt.isel.pc.TopicName +import pt.isel.pc.utils.SuccessOrError + +interface ClientRequestParser { + val name: String + fun parse(args: List, line: String): SuccessOrError +} + +object PublishParser : ClientRequestParser { + override val name = "PUBLISH" + override fun parse(args: List, line: String): SuccessOrError { + if (args.isEmpty()) { + return SuccessOrError.error(ClientRequestError.InvalidArguments) + } + val topicName = args[0] + var index = name.length + // skip spaces + while (line[index] == ' ') { + index += 1 + } + // skip topic name + index += topicName.length + val message = if (index + 1 >= line.length) { + "" + } else { + line.substring(index + 1) + } + return SuccessOrError.success(ClientRequest.Publish(TopicName(topicName), message)) + } +} + +object SubscribeParser : ClientRequestParser { + override val name = "SUBSCRIBE" + + override fun parse(args: List, line: String): SuccessOrError { + if (args.isEmpty()) { + return SuccessOrError.error(ClientRequestError.InvalidArguments) + } + return SuccessOrError.success(ClientRequest.Subscribe(args.map { TopicName(it) })) + } +} + +object UnsubscribeParser : ClientRequestParser { + override val name = "UNSUBSCRIBE" + override fun parse(args: List, line: String): SuccessOrError { + if (args.isEmpty()) { + return SuccessOrError.error(ClientRequestError.InvalidArguments) + } + return SuccessOrError.success(ClientRequest.Unsubscribe(args.map { TopicName(it) })) + } +} + +private val clientRequestParsers = listOf( + UnsubscribeParser, + SubscribeParser, + PublishParser, +).associateBy { it.name } + +fun parseClientRequest(line: String): SuccessOrError { + val trimmedLine = line.trimStart() + val parts = trimmedLine.split(" ") + if (parts.isEmpty() || parts[0].isEmpty()) { + return SuccessOrError.error(ClientRequestError.MissingCommandName) + } + val commandName = parts[0] + val args = parts.subList(1, parts.size) + return clientRequestParsers[commandName]?.parse(args, trimmedLine) + ?: SuccessOrError.error(ClientRequestError.UnknownCommandName) +} + +fun serialize(error: ClientRequestError): String = + when (error) { + is ClientRequestError.InvalidArguments -> "INVALID_ARGUMENTS" + ClientRequestError.MissingCommandName -> "MISSING_COMMAND_NAME" + ClientRequestError.UnknownCommandName -> "UNKNOWN_COMMAND_NAME" + } + +fun serialize(response: ClientResponse): String = + when (response) { + is ClientResponse.Error -> "-${serialize(response.error)}" + is ClientResponse.OkPublish -> "+" + ClientResponse.OkSubscribe -> "+" + ClientResponse.OkUnsubscribe -> "+" + } + +fun serialize(serverPush: ServerPush): String = + when (serverPush) { + is ServerPush.PublishedMessage -> ">${serverPush.message.topicName.value} ${serverPush.message.content}" + is ServerPush.Hi -> "!hi" + is ServerPush.Bye -> "!bye" + } diff --git a/src/main/kotlin/pt/isel/pc/protocol/ServerPush.kt b/src/main/kotlin/pt/isel/pc/protocol/ServerPush.kt new file mode 100644 index 0000000..9227a5f --- /dev/null +++ b/src/main/kotlin/pt/isel/pc/protocol/ServerPush.kt @@ -0,0 +1,10 @@ +package pt.isel.pc.protocol + +/** + * Sealed hierarchy to represent messages sent by the server that are not responses to requests. + */ +sealed interface ServerPush { + data class PublishedMessage(val message: pt.isel.pc.PublishedMessage) : ServerPush + data object Hi : ServerPush + data object Bye : ServerPush +} diff --git a/src/main/kotlin/pt/isel/pc/utils/SuccessOrError.kt b/src/main/kotlin/pt/isel/pc/utils/SuccessOrError.kt new file mode 100644 index 0000000..c0eb163 --- /dev/null +++ b/src/main/kotlin/pt/isel/pc/utils/SuccessOrError.kt @@ -0,0 +1,29 @@ +package pt.isel.pc.utils + +/** + * Sealed hierarchy to represent success or error results. + */ +sealed interface SuccessOrError { + + val successOrThrow: TSuccess + val errorOrThrow: TError + + data class Success(val value: TSuccess) : SuccessOrError { + override val successOrThrow: TSuccess + get() = value + override val errorOrThrow: Nothing + get() = throw IllegalStateException() + } + + data class Error(val error: TError) : SuccessOrError { + override val successOrThrow: Nothing + get() = throw IllegalStateException() + override val errorOrThrow: TError + get() = error + } + + companion object { + fun success(value: TSuccess) = Success(value) + fun error(error: TError) = Error(error) + } +} diff --git a/src/main/kotlin/pt/isel/pc/utils/WriterUtils.kt b/src/main/kotlin/pt/isel/pc/utils/WriterUtils.kt new file mode 100644 index 0000000..c13a5b5 --- /dev/null +++ b/src/main/kotlin/pt/isel/pc/utils/WriterUtils.kt @@ -0,0 +1,8 @@ +package pt.isel.pc.utils + +import java.io.Writer + +fun Writer.sendLine(line: String) { + appendLine(line) + flush() +} diff --git a/src/test/kotlin/pt/isel/pc/ParseTests.kt b/src/test/kotlin/pt/isel/pc/ParseTests.kt new file mode 100644 index 0000000..ebc7650 --- /dev/null +++ b/src/test/kotlin/pt/isel/pc/ParseTests.kt @@ -0,0 +1,100 @@ +package pt.isel.pc + +import pt.isel.pc.protocol.ClientRequest +import pt.isel.pc.protocol.ClientRequestError +import pt.isel.pc.protocol.ClientResponse +import pt.isel.pc.protocol.ServerPush +import pt.isel.pc.protocol.parseClientRequest +import kotlin.test.Test +import kotlin.test.assertEquals + +class ParseTests { + + @Test + fun `test success parse cases`() { + assertEquals( + ClientRequest.Publish(TopicName("t1"), "hello world"), + parseClientRequest("PUBLISH t1 hello world").successOrThrow, + ) + assertEquals( + ClientRequest.Publish(TopicName("t1"), "hello world"), + parseClientRequest(" PUBLISH t1 hello world").successOrThrow, + ) + assertEquals( + ClientRequest.Publish(TopicName("t1"), " hello world"), + parseClientRequest("PUBLISH t1 hello world").successOrThrow, + ) + assertEquals( + ClientRequest.Publish(TopicName("t1"), " hello world "), + parseClientRequest("PUBLISH t1 hello world ").successOrThrow, + ) + assertEquals( + ClientRequest.Publish(TopicName("t1"), ""), + parseClientRequest("PUBLISH t1").successOrThrow, + ) + assertEquals( + ClientRequest.Publish(TopicName("t1"), ""), + parseClientRequest("PUBLISH t1 ").successOrThrow, + ) + assertEquals( + ClientRequest.Publish(TopicName("t1"), ""), + parseClientRequest(" PUBLISH t1 ").successOrThrow, + ) + } + + @Test + fun `test error parse cases`() { + assertEquals( + ClientRequestError.MissingCommandName, + parseClientRequest("").errorOrThrow, + ) + assertEquals( + ClientRequestError.MissingCommandName, + parseClientRequest(" ").errorOrThrow, + ) + assertEquals( + ClientRequestError.UnknownCommandName, + parseClientRequest("NOT-A-COMMAND").errorOrThrow, + ) + assertEquals( + ClientRequestError.InvalidArguments, + parseClientRequest("PUBLISH").errorOrThrow, + ) + assertEquals( + ClientRequestError.InvalidArguments, + parseClientRequest("SUBSCRIBE").errorOrThrow, + ) + } + + @Test + fun `test toString`() { + assertEquals( + "+", + pt.isel.pc.protocol.serialize(ClientResponse.OkPublish), + ) + + assertEquals( + "+", + pt.isel.pc.protocol.serialize(ClientResponse.OkSubscribe), + ) + + assertEquals( + "+", + pt.isel.pc.protocol.serialize(ClientResponse.OkUnsubscribe), + ) + + assertEquals( + "-INVALID_ARGUMENTS", + pt.isel.pc.protocol.serialize(ClientResponse.Error(ClientRequestError.InvalidArguments)), + ) + + assertEquals( + ">the-topic the content", + pt.isel.pc.protocol.serialize( + ServerPush.PublishedMessage( + PublishedMessage(TopicName("the-topic"), "the content"), + ), + ), + ) + } +} diff --git a/src/test/kotlin/pt/isel/pc/TopicSetTests.kt b/src/test/kotlin/pt/isel/pc/TopicSetTests.kt new file mode 100644 index 0000000..2046529 --- /dev/null +++ b/src/test/kotlin/pt/isel/pc/TopicSetTests.kt @@ -0,0 +1,55 @@ +package pt.isel.pc + +import org.junit.jupiter.api.Assertions.assertEquals +import kotlin.test.Test + +class TopicSetTests { + + @Test + fun `basic functionality tests`() { + // given: a topic set + val topicSet = TopicSet() + + // and: arrays of topic names and subscriber + val topicNames = Array(3) { + TopicName(it.toString()) + } + val subscribers = Array(3) { + TestSubscriber() + } + + // when: adding some subscribers to topics + repeat(3) { + topicSet.subscribe(topicNames[0], subscribers[it]) + } + repeat(2) { + topicSet.subscribe(topicNames[1], subscribers[it]) + } + repeat(1) { + topicSet.subscribe(topicNames[2], subscribers[it]) + } + + // then: + assertEquals(3, topicSet.getSubscribersFor(topicNames[0]).size) + assertEquals(2, topicSet.getSubscribersFor(topicNames[1]).size) + assertEquals(1, topicSet.getSubscribersFor(topicNames[2]).size) + + // when: removing a subscriber from a topic 0 + topicSet.unsubscribe(topicNames[0], subscribers[2]) + + // then: topic 0 has only two subscribers + assertEquals(2, topicSet.getSubscribersFor(topicNames[0]).size) + + // when: removing a subscriber 2 + topicSet.unsubscribe(subscribers[2]) + + // then: topic 2 does not exist anymore + topicSet.getTopicWithName(topicNames[2]) + } + + private class TestSubscriber : Subscriber { + override fun send(message: PublishedMessage) { + // Nothing, for testing purposes only + } + } +}