Skip to content

Commit

Permalink
chore: improve ask methods
Browse files Browse the repository at this point in the history
  • Loading branch information
niltonheck committed Sep 9, 2024
1 parent cfb495e commit 5b0d632
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 22 deletions.
8 changes: 3 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,15 @@ val answerSession = AnswerSession(
override fun onStateChange(interactions: MutableList<Interaction>) {
// Process list of interaction post-response
}

override fun onMessageChange(data: String) {
// Process the incoming streaming
}
}
)

runBlocking {
answerSession.ask(AskParams(
val answer = answerSession.ask(AskParams(
query = "What's the best movie to watch with the family?"
))

println(answer)
}
```

Expand Down
49 changes: 32 additions & 17 deletions client/src/commonMain/kotlin/com/orama/endpoint/AnswerSession.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ import io.ktor.client.request.*
import io.ktor.http.*
import io.ktor.sse.*
import io.ktor.utils.io.core.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.serialization.builtins.ListSerializer
import kotlinx.serialization.builtins.MapSerializer
import kotlinx.serialization.builtins.serializer
Expand Down Expand Up @@ -60,7 +66,7 @@ class AnswerSession<T>(
close()
}

private fun handleServerSentEvent(event: ServerSentEvent, sseSerializer: SSESerializer<T>) {
private fun handleServerSentEvent(event: ServerSentEvent, sseSerializer: SSESerializer<T>): SSEEvent<T> {

val chunkData = sseSerializer.process(event)
val currentState = sseSerializer.getState()
Expand All @@ -72,6 +78,8 @@ class AnswerSession<T>(
else -> handleMessageContent(currentState.message)
}
}

return chunkData
}

private fun handleSources(sourcesData: List<Hit<T>>) {
Expand All @@ -94,21 +102,7 @@ class AnswerSession<T>(
)
}

/**
* This function is just to allow better readability.
*
* For our Kotlin SDK streaming is always available.
* Developers can device to use it or not by subscribing
* to the AnswerEventListener.onMessageChange event.
*
* Alias for: ask(params: AskParams)
*/
suspend fun askStream(params: AskParams) {
this.ask(params)
return
}

suspend fun ask(params: AskParams) : String {
private suspend fun processAnswer(params: AskParams, onSSEEvent: ((event: String) -> Unit)? = null): String {
var eventResult = emptyEventResult<T>()

try {
Expand All @@ -130,8 +124,13 @@ class AnswerSession<T>(

val serializerClass = SSESerializer(sourcesSerializer = answerParams.serializer)


incoming.collect { item ->
handleServerSentEvent(item, serializerClass)
val sseEvent = handleServerSentEvent(item, serializerClass)

if(sseEvent.type == EventType.TEXT) {
onSSEEvent?.invoke((sseEvent.message as StringMessage).content)
}
}

eventResult = serializerClass.getState()
Expand All @@ -146,6 +145,22 @@ class AnswerSession<T>(
}
}

suspend fun ask(params: AskParams) : String {
return this.processAnswer(params)
}

suspend fun askStream(params: AskParams, coroutineScope: CoroutineScope): Flow<String> = callbackFlow {
this@AnswerSession.processAnswer(params) { sseEvent: String ->
coroutineScope.launch {
send(sseEvent)
}
}

close()
awaitClose {}
}


private fun buildRequestBody(question: String, messages: List<Message>?, searchParams: Map<String, String>): String {

var encodedMessages = "[]"
Expand Down

0 comments on commit 5b0d632

Please sign in to comment.