diff --git a/core/src/main/kotlin/endpoints/agent/SessionStorage.kt b/core/src/main/kotlin/endpoints/agent/SessionStorage.kt index 4f07efe43..79d47bfff 100644 --- a/core/src/main/kotlin/endpoints/agent/SessionStorage.kt +++ b/core/src/main/kotlin/endpoints/agent/SessionStorage.kt @@ -2,6 +2,8 @@ package com.epam.drill.admin.endpoints.agent import com.epam.drill.admin.common.* import com.epam.drill.admin.endpoints.* +import com.epam.drill.admin.endpoints.plugin.* +import com.epam.drill.admin.plugin.* import com.epam.drill.admin.util.* import io.ktor.http.cio.websocket.* import kotlinx.atomicfu.* @@ -14,14 +16,16 @@ private val logger = KotlinLogging.logger {} class SessionStorage { internal val sessions get() = _sessions.value + internal val subscriptions get() = _subscriptions.value private val _sessions = atomic(emptyBiSetMap()) + private val _subscriptions = atomic(mutableMapOf()) operator fun contains(destination: String): Boolean = destination in sessions.first suspend fun sendTo( destination: String, - message: Any, + message: FrontMessage, type: WsMessageType = WsMessageType.MESSAGE ): Set = sendTo( destination = destination, @@ -30,11 +34,10 @@ class SessionStorage { suspend fun sendTo( destination: String, - messageProvider: () -> String + messageProvider: (Subscription?) -> String ): Set = sessions.first[destination].apply { if (any()) { - val messageStr = messageProvider() - forEach { it.send(destination, messageStr) } + forEach { it.send(destination, messageProvider(subscriptions[it])) } } } diff --git a/core/src/main/kotlin/endpoints/plugin/DrillPluginWs.kt b/core/src/main/kotlin/endpoints/plugin/DrillPluginWs.kt index fa86495c2..3d785d8a1 100644 --- a/core/src/main/kotlin/endpoints/plugin/DrillPluginWs.kt +++ b/core/src/main/kotlin/endpoints/plugin/DrillPluginWs.kt @@ -76,9 +76,12 @@ class DrillPluginWs(override val kodein: Kodein) : KodeinAware { val destination = event.destination val subscriptionKey = destination.toKey(subscription) sessionCache.subscribe(subscriptionKey, this) + sessionCache.subscriptions[this] = subscription val pluginCache = pluginCaches[pluginId] val message = pluginCache[subscriptionKey] ?: "" - val messageToSend = message.toWsMessageAsString(destination, WsMessageType.MESSAGE, subscription) + val messageToSend = message + .processWithSubscription(subscription) + .toWsMessageAsString(destination, WsMessageType.MESSAGE, subscription) send(messageToSend) logger.trace { "Subscribed to $subscriptionKey, ${toDebugString()}" } } diff --git a/core/src/main/kotlin/endpoints/plugin/Filters.kt b/core/src/main/kotlin/endpoints/plugin/Filters.kt new file mode 100644 index 000000000..f34d28017 --- /dev/null +++ b/core/src/main/kotlin/endpoints/plugin/Filters.kt @@ -0,0 +1,46 @@ +package com.epam.drill.admin.endpoints.plugin + +import com.epam.drill.admin.plugin.* +import kotlinx.serialization.* +import kotlin.reflect.* +import kotlin.reflect.full.* + + +@Serializable +data class SearchStatement(val fieldName: String, val value: String) + +@Serializable +data class SortStatement(val fieldName: String, val order: String) + +internal fun FrontMessage.processWithSubscription(subscription: Subscription?): Any = + when (subscription) { + is AgentSubscription -> + applyFilter(subscription.searchStatement) + .applySort(subscription.sortStatement) + else -> this + } + + +private fun FrontMessage.applyFilter(statement: SearchStatement?): FrontMessage = statement?.let { + runCatching { + @Suppress("UNCHECKED_CAST") + (this as? Iterable)?.filter { fMessage -> + val propertyValue = fMessage::class.getProperty(statement.fieldName).call(fMessage) + statement.value in propertyValue.toString() + } + }.getOrNull() +} ?: this + +@Suppress("UNCHECKED_CAST") +internal fun FrontMessage.applySort(statement: SortStatement?): Any = statement?.let { + runCatching { + val selector: (Any) -> Comparable = { + it::class.getProperty(statement.fieldName).call(it) as Comparable + } + val data = this as Iterable + if (statement.order == "ASC") data.sortedBy(selector) else data.sortedByDescending(selector) + }.getOrNull() +} ?: this + +private fun KClass.getProperty(name: String) = + memberProperties.first { it.name == name } diff --git a/core/src/main/kotlin/endpoints/plugin/Subscriptions.kt b/core/src/main/kotlin/endpoints/plugin/Subscriptions.kt index 9a31a0b60..3284ebfa7 100644 --- a/core/src/main/kotlin/endpoints/plugin/Subscriptions.kt +++ b/core/src/main/kotlin/endpoints/plugin/Subscriptions.kt @@ -12,7 +12,9 @@ sealed class Subscription { @SerialName("AGENT") data class AgentSubscription( val agentId: String, - val buildVersion: String? = null + val buildVersion: String? = null, + val searchStatement: SearchStatement? = null, + val sortStatement: SortStatement? = null ) : Subscription() { override fun toKey(destination: String) = "agent::$agentId:$buildVersion$destination" } diff --git a/core/src/main/kotlin/plugin/Cache.kt b/core/src/main/kotlin/plugin/Cache.kt index c27025cab..eb086c4ec 100644 --- a/core/src/main/kotlin/plugin/Cache.kt +++ b/core/src/main/kotlin/plugin/Cache.kt @@ -7,12 +7,14 @@ import com.epam.drill.admin.endpoints.plugin.* import com.epam.drill.admin.plugins.* +typealias FrontMessage = Any + class PluginCaches( private val cacheService: CacheService, private val plugins: Plugins ) { - internal operator fun get(pluginId: String): Cache = cacheService.getOrCreate(pluginId) + internal operator fun get(pluginId: String): Cache = cacheService.getOrCreate(pluginId) //TODO aggregate plugin data internal fun getData( diff --git a/core/src/main/kotlin/plugin/Senders.kt b/core/src/main/kotlin/plugin/Senders.kt index 35373c4b1..c25ad49e9 100644 --- a/core/src/main/kotlin/plugin/Senders.kt +++ b/core/src/main/kotlin/plugin/Senders.kt @@ -28,13 +28,16 @@ class PluginSenders(override val kodein: Kodein) : KodeinAware { logger.trace { "Removed message by key $subscriptionKey" } pluginCache.remove(subscriptionKey) } else { - val messageForSend = message.toWsMessageAsString(dest, WsMessageType.MESSAGE, subscription) logger.trace { "Sending message to $subscriptionKey" } pluginCache[subscriptionKey] = message val sessionCache = pluginSessions[pluginId] sessionCache.sendTo( destination = subscriptionKey, - messageProvider = { messageForSend } + messageProvider = { sessionSubscription -> + message + .processWithSubscription(sessionSubscription) + .toWsMessageAsString(dest, WsMessageType.MESSAGE, sessionSubscription) + } ) } } diff --git a/tests/src/test/kotlin/endpoints/DrillPluginWsTest.kt b/tests/src/test/kotlin/endpoints/DrillPluginWsTest.kt index cf8610576..61c7289fc 100644 --- a/tests/src/test/kotlin/endpoints/DrillPluginWsTest.kt +++ b/tests/src/test/kotlin/endpoints/DrillPluginWsTest.kt @@ -19,6 +19,8 @@ import io.ktor.locations.* import io.ktor.serialization.* import io.ktor.server.testing.* import io.ktor.websocket.* +import kotlinx.coroutines.channels.* +import kotlinx.serialization.Serializable import kotlinx.serialization.json.* import org.kodein.di.* import org.kodein.di.generic.* @@ -118,6 +120,94 @@ class PluginWsTest { } } + @Serializable + data class Data(val field1: String, val field2: Int) + + @Test + fun `should apply filters to list topics`() { + withTestApplication(testApp) { + val fieldName = Data::field1.name + handleWebSocketConversation(socketUrl()) { incoming, outgoing -> + + val destination = "/ws/plugins/test-plugin" + val message = listOf( + Data("x1", 10), + Data("x2", 10), + Data("x2", 10), + Data("x3", 10) + ) + + + subscribeWithFilter(outgoing, destination) + assertEquals("", readMessage(incoming)?.content, "first subscription should be empty") + + + sendListData(destination, message) + assertEquals(message.size, (readMessage(incoming) as JsonArray).size) + + + subscribeWithFilter(outgoing, destination, SearchStatement(fieldName, "x1")) + assertEquals(1, (readMessage(incoming) as JsonArray).size) + + unsubscribe(outgoing, destination) + + subscribeWithFilter(outgoing, destination, SearchStatement(fieldName, "x2")) + assertEquals(2, (readMessage(incoming) as JsonArray).size) + + unsubscribe(outgoing, destination) + + //cache value + subscribeWithFilter(outgoing, destination, SearchStatement(fieldName, "x2")) + assertEquals(2, (readMessage(incoming) as JsonArray).size) + + } + + } + } + + private suspend fun unsubscribe( + outgoing: SendChannel, + destination: String + ) { + outgoing.send( + uiMessage( + Unsubscribe( + destination, + AgentSubscription.serializer() stringify AgentSubscription(agentId, buildVersion) + ) + + ) + ) + } + + private suspend fun subscribeWithFilter( + outgoing: SendChannel, + destination: String, + filter: SearchStatement? = null + ) { + outgoing.send( + uiMessage( + Subscribe( + destination, + AgentSubscription.serializer() stringify AgentSubscription(agentId, buildVersion, searchStatement = filter) + ) + ) + ) + } + + private suspend fun sendListData(destination: String, message: List) { + val ps by kodeinApplication.kodein.instance() + val sender = ps.sender("test-plugin") + sender.send(AgentSendContext(agentId, buildVersion), destination, message) + } + + private suspend fun readMessage(incoming: ReceiveChannel): JsonElement? { + val receive = incoming.receive() as? Frame.Text ?: fail() + val readText = receive.readText() + val fromJson = json.parseJson(readText) as JsonObject + return fromJson[WsSendMessage::message.name] + } + @Test fun `should return data from storage which was sent before via send()`() { withTestApplication(testApp) {