Skip to content

Commit

Permalink
Integration tests: agent loading fix
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorbll committed May 12, 2020
1 parent df51390 commit 9cf2c51
Showing 1 changed file with 23 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.epam.drill.e2e
import com.epam.drill.admin.agent.*
import com.epam.drill.admin.build.*
import com.epam.drill.admin.common.*
import com.epam.drill.admin.common.serialization.*
import com.epam.drill.admin.endpoints.*
import com.epam.drill.admin.endpoints.plugin.*
import com.epam.drill.admin.notification.*
Expand All @@ -11,7 +12,6 @@ import com.epam.drill.admin.router.*
import com.epam.drill.admin.servicegroup.*
import com.epam.drill.api.*
import com.epam.drill.common.*
import com.epam.drill.admin.common.serialization.*
import com.epam.drill.plugin.api.message.*
import com.epam.drill.plugin.api.processing.*
import io.ktor.application.*
Expand All @@ -35,7 +35,7 @@ abstract class PluginStreams {


class AdminUiChannels {
val agentChannel = Channel<AgentInfoDto?>()
val agentChannel = Channel<AgentInfoDto?>(3)
val buildsChannel = Channel<List<BuildSummaryDto>?>()
val agentsChannel = Channel<GroupedAgentsDto?>()
val allPluginsChannel = Channel<Set<PluginDto>?>()
Expand All @@ -57,51 +57,37 @@ class UIEVENTLOOP(
val glob: Channel<GroupedAgentsDto> = Channel()
) {

fun Application.queued(wsTopic: WsTopic, incoming: ReceiveChannel<Frame>) = this.launch {
incoming.consumeEach {
when (it) {
fun Application.queued(wsTopic: WsTopic, incoming: ReceiveChannel<Frame>) = launch {
incoming.consumeEach { frame ->
when (frame) {
is Frame.Text -> {
val parsedJson = json.parseJson(it.readText()) as JsonObject
if (uiStreamDebug)
val parsedJson = json.parseJson(frame.readText()) as JsonObject
if (uiStreamDebug) {
println("UI: $parsedJson")
}
val messageType = WsMessageType.valueOf(parsedJson["type"]!!.content)
val url = parsedJson[WsSendMessage::destination.name]!!.content
val content = parsedJson[WsSendMessage::message.name]!!.toString()
val (_, type) = wsTopic.getParams(url)
val notEmptyResponse = content != "\"\""
val response = content.takeIf { it != "\"\"" }
when (messageType) {
WsMessageType.MESSAGE, WsMessageType.DELETE ->
this@queued.launch {
when (type) {
is WsRoutes.Agents -> {
glob.send(GroupedAgentsDto.serializer() parse content)
}
is WsRoutes.Agent -> {
if (notEmptyResponse) {
cs[type.agentId]!!.agentChannel.send(AgentInfoDto.serializer() parse content)
} else {
cs[type.agentId]!!.agentChannel.send(null)
}
}
is WsRoutes.AgentBuilds -> {
if (notEmptyResponse) {
cs.getValue(type.agentId)
.buildsChannel.send((BuildSummaryDto.serializer().list parse content))
} else {
cs.getValue(type.agentId).buildsChannel.send(null)
}
}
is WsRoutes.AgentPlugins -> {
if (notEmptyResponse) {
cs[type.agentId]!!.agentPluginInfoChannel.send(PluginDto.serializer().set parse content)
} else {
cs[type.agentId]!!.agentPluginInfoChannel.send(null)
}
}
WsMessageType.MESSAGE, WsMessageType.DELETE -> launch {
when (type) {
is WsRoutes.Agents -> glob.run {
send(GroupedAgentsDto.serializer() parse content)
}
is WsRoutes.Agent -> cs.getValue(type.agentId).agentChannel.run {
send(response?.run { AgentInfoDto.serializer() parse content })
}
is WsRoutes.AgentBuilds -> cs.getValue(type.agentId).buildsChannel.run {
send(response?.run { BuildSummaryDto.serializer().list parse content })
}
is WsRoutes.AgentPlugins -> cs.getValue(type.agentId).agentPluginInfoChannel.run {
send(response?.run { PluginDto.serializer().set parse content })
}
}
else -> {
}
else -> Unit
}
}
is Frame.Close -> {
Expand All @@ -113,7 +99,6 @@ class UIEVENTLOOP(
}
}


class Agent(
val app: Application,
val agentId: String,
Expand Down

0 comments on commit 9cf2c51

Please sign in to comment.