Skip to content

Commit

Permalink
EPMDJ-3093 implemented filtering and sorting core functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
IgorKey committed Sep 7, 2020
1 parent 9df3c34 commit 6fe01e4
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 9 deletions.
11 changes: 7 additions & 4 deletions core/src/main/kotlin/endpoints/agent/SessionStorage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package com.epam.drill.admin.endpoints.agent

import com.epam.drill.admin.common.*
import com.epam.drill.admin.endpoints.*
import com.epam.drill.admin.endpoints.plugin.*
import com.epam.drill.admin.plugin.*
import com.epam.drill.admin.util.*
import io.ktor.http.cio.websocket.*
import kotlinx.atomicfu.*
Expand All @@ -14,14 +16,16 @@ private val logger = KotlinLogging.logger {}
class SessionStorage {

internal val sessions get() = _sessions.value
internal val subscriptions get() = _subscriptions.value

private val _sessions = atomic(emptyBiSetMap<String, WebSocketSession>())
private val _subscriptions = atomic(mutableMapOf<WebSocketSession, Subscription?>())

operator fun contains(destination: String): Boolean = destination in sessions.first

suspend fun sendTo(
destination: String,
message: Any,
message: FrontMessage,
type: WsMessageType = WsMessageType.MESSAGE
): Set<WebSocketSession> = sendTo(
destination = destination,
Expand All @@ -30,11 +34,10 @@ class SessionStorage {

suspend fun sendTo(
destination: String,
messageProvider: () -> String
messageProvider: (Subscription?) -> String
): Set<WebSocketSession> = sessions.first[destination].apply {
if (any()) {
val messageStr = messageProvider()
forEach { it.send(destination, messageStr) }
forEach { it.send(destination, messageProvider(subscriptions[it])) }
}
}

Expand Down
5 changes: 4 additions & 1 deletion core/src/main/kotlin/endpoints/plugin/DrillPluginWs.kt
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,12 @@ class DrillPluginWs(override val kodein: Kodein) : KodeinAware {
val destination = event.destination
val subscriptionKey = destination.toKey(subscription)
sessionCache.subscribe(subscriptionKey, this)
sessionCache.subscriptions[this] = subscription
val pluginCache = pluginCaches[pluginId]
val message = pluginCache[subscriptionKey] ?: ""
val messageToSend = message.toWsMessageAsString(destination, WsMessageType.MESSAGE, subscription)
val messageToSend = message
.processWithSubscription(subscription)
.toWsMessageAsString(destination, WsMessageType.MESSAGE, subscription)
send(messageToSend)
logger.trace { "Subscribed to $subscriptionKey, ${toDebugString()}" }
}
Expand Down
46 changes: 46 additions & 0 deletions core/src/main/kotlin/endpoints/plugin/Filters.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.epam.drill.admin.endpoints.plugin

import com.epam.drill.admin.plugin.*
import kotlinx.serialization.*
import kotlin.reflect.*
import kotlin.reflect.full.*


@Serializable
data class SearchStatement(val fieldName: String, val value: String)

@Serializable
data class SortStatement(val fieldName: String, val order: String)

internal fun FrontMessage.processWithSubscription(subscription: Subscription?): Any =
when (subscription) {
is AgentSubscription ->
applyFilter(subscription.searchStatement)
.applySort(subscription.sortStatement)
else -> this
}


private fun FrontMessage.applyFilter(statement: SearchStatement?): FrontMessage = statement?.let {
runCatching {
@Suppress("UNCHECKED_CAST")
(this as? Iterable<Any>)?.filter { fMessage ->
val propertyValue = fMessage::class.getProperty(statement.fieldName).call(fMessage)
statement.value in propertyValue.toString()
}
}.getOrNull()
} ?: this

@Suppress("UNCHECKED_CAST")
internal fun FrontMessage.applySort(statement: SortStatement?): Any = statement?.let {
runCatching {
val selector: (Any) -> Comparable<Any> = {
it::class.getProperty(statement.fieldName).call(it) as Comparable<Any>
}
val data = this as Iterable<Any>
if (statement.order == "ASC") data.sortedBy(selector) else data.sortedByDescending(selector)
}.getOrNull()
} ?: this

private fun KClass<out Any>.getProperty(name: String) =
memberProperties.first { it.name == name }
4 changes: 3 additions & 1 deletion core/src/main/kotlin/endpoints/plugin/Subscriptions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ sealed class Subscription {
@SerialName("AGENT")
data class AgentSubscription(
val agentId: String,
val buildVersion: String? = null
val buildVersion: String? = null,
val searchStatement: SearchStatement? = null,
val sortStatement: SortStatement? = null
) : Subscription() {
override fun toKey(destination: String) = "agent::$agentId:$buildVersion$destination"
}
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/kotlin/plugin/Cache.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import com.epam.drill.admin.endpoints.plugin.*
import com.epam.drill.admin.plugins.*


typealias FrontMessage = Any

class PluginCaches(
private val cacheService: CacheService,
private val plugins: Plugins
) {

internal operator fun get(pluginId: String): Cache<Any, Any> = cacheService.getOrCreate(pluginId)
internal operator fun get(pluginId: String): Cache<Any, FrontMessage> = cacheService.getOrCreate(pluginId)

//TODO aggregate plugin data
internal fun getData(
Expand Down
7 changes: 5 additions & 2 deletions core/src/main/kotlin/plugin/Senders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ class PluginSenders(override val kodein: Kodein) : KodeinAware {
logger.trace { "Removed message by key $subscriptionKey" }
pluginCache.remove(subscriptionKey)
} else {
val messageForSend = message.toWsMessageAsString(dest, WsMessageType.MESSAGE, subscription)
logger.trace { "Sending message to $subscriptionKey" }
pluginCache[subscriptionKey] = message
val sessionCache = pluginSessions[pluginId]
sessionCache.sendTo(
destination = subscriptionKey,
messageProvider = { messageForSend }
messageProvider = { sessionSubscription ->
message
.processWithSubscription(sessionSubscription)
.toWsMessageAsString(dest, WsMessageType.MESSAGE, sessionSubscription)
}
)
}
}
Expand Down
90 changes: 90 additions & 0 deletions tests/src/test/kotlin/endpoints/DrillPluginWsTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import io.ktor.locations.*
import io.ktor.serialization.*
import io.ktor.server.testing.*
import io.ktor.websocket.*
import kotlinx.coroutines.channels.*
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.*
import org.kodein.di.*
import org.kodein.di.generic.*
Expand Down Expand Up @@ -118,6 +120,94 @@ class PluginWsTest {
}
}

@Serializable
data class Data(val field1: String, val field2: Int)

@Test
fun `should apply filters to list topics`() {
withTestApplication(testApp) {
val fieldName = Data::field1.name
handleWebSocketConversation(socketUrl()) { incoming, outgoing ->

val destination = "/ws/plugins/test-plugin"
val message = listOf(
Data("x1", 10),
Data("x2", 10),
Data("x2", 10),
Data("x3", 10)
)


subscribeWithFilter(outgoing, destination)
assertEquals("", readMessage(incoming)?.content, "first subscription should be empty")


sendListData(destination, message)
assertEquals(message.size, (readMessage(incoming) as JsonArray).size)


subscribeWithFilter(outgoing, destination, SearchStatement(fieldName, "x1"))
assertEquals(1, (readMessage(incoming) as JsonArray).size)

unsubscribe(outgoing, destination)

subscribeWithFilter(outgoing, destination, SearchStatement(fieldName, "x2"))
assertEquals(2, (readMessage(incoming) as JsonArray).size)

unsubscribe(outgoing, destination)

//cache value
subscribeWithFilter(outgoing, destination, SearchStatement(fieldName, "x2"))
assertEquals(2, (readMessage(incoming) as JsonArray).size)

}

}
}

private suspend fun unsubscribe(
outgoing: SendChannel<Frame>,
destination: String
) {
outgoing.send(
uiMessage(
Unsubscribe(
destination,
AgentSubscription.serializer() stringify AgentSubscription(agentId, buildVersion)
)

)
)
}

private suspend fun subscribeWithFilter(
outgoing: SendChannel<Frame>,
destination: String,
filter: SearchStatement? = null
) {
outgoing.send(
uiMessage(
Subscribe(
destination,
AgentSubscription.serializer() stringify AgentSubscription(agentId, buildVersion, searchStatement = filter)
)
)
)
}

private suspend fun sendListData(destination: String, message: List<Data>) {
val ps by kodeinApplication.kodein.instance<PluginSenders>()
val sender = ps.sender("test-plugin")
sender.send(AgentSendContext(agentId, buildVersion), destination, message)
}

private suspend fun readMessage(incoming: ReceiveChannel<Frame>): JsonElement? {
val receive = incoming.receive() as? Frame.Text ?: fail()
val readText = receive.readText()
val fromJson = json.parseJson(readText) as JsonObject
return fromJson[WsSendMessage::message.name]
}

@Test
fun `should return data from storage which was sent before via send()`() {
withTestApplication(testApp) {
Expand Down

0 comments on commit 6fe01e4

Please sign in to comment.