How to implement ndjson Response converter #756
Unanswered
ag2s20150909
asked this question in
Q&A
Replies: 2 comments
-
The current solution is to write it manually, it would be nice to generate it automatically. interface TestService{
@Streaming
@GET("/api/test")
suspend fun _test() : HttpStatement
}
fun TestService.testFlow():Flow<FooResponse> = callbackFlow {
val channel: ByteReadChannel=_test().body()
while (!channel.isClosedForRead) {
val line = channel.readUTF8Line()
if (!line.isNullOrBlank()) {
try {
val obj=if (line.startsWith("data:")){
//for SSE text/event-stream
ktjson.decodeFromString(FooResponse.serializer(),line.substringAfter("data:"))
}else{
//for NdJson application/x-ndjson
ktjson.decodeFromString(FooResponse.serializer(),line)
}
trySendBlocking(obj)
}catch (e:Exception){
throw e
}
}
}
awaitClose {
}
}
or
inline fun <reified T> HttpStatement.asFlow(format: StringFormat=ktjson):Flow<T> = callbackFlow {
val content:ByteReadChannel=this@asFlow.body()
while (!content.isClosedForRead) {
val line = content.readUTF8Line()
try {
if (!line.isNullOrEmpty()) {
val obj=if (line.startsWith("data:")){
//for SSE text/event-stream
format.decodeFromString<T>(line.substringAfter("data:"))
}else{
//for NdJson application/x-ndjson
format.decodeFromString<T>(line)
}
trySendBlocking(obj)
}
}catch (e:Exception){
throw e
}
}
awaitClose {
}
}
|
Beta Was this translation helpful? Give feedback.
0 replies
-
Another attempt. It works, but it's not streaming. .......
install(ContentNegotiation){
json(ktjson)
register(NDJson, NdjsonContentConverter(ktjson))
register(SSE, NdjsonContentConverter(ktjson))
}
.....
private val NDJson = ContentType("application","x-ndjson")
private val SSE =ContentType("text","event-stream")
class NdjsonContentConverter(private val format: StringFormat): ContentConverter{
override suspend fun deserialize(
charset: Charset,
typeInfo: TypeInfo,
content: ByteReadChannel
): Any? {
check(typeInfo.type==Flow::class){"For NdjsonContentConverter the return type must be kotlinx.coroutines.flow.Flow ,like Flow<Foo> or Flow<out Foo>"}
val responseType = typeInfo.upperBoundType(0)!!.type.javaObjectType
val loader =format.serializersModule.serializer(responseType)
return callbackFlow {
while (!content.isClosedForRead) {
val line = content.readUTF8Line()
try {
if (!line.isNullOrEmpty()) {
val obj=if (line.startsWith("data:")){
//for SSE text/event-stream
ktjson.decodeFromString(loader,line.substringAfter("data:"))
}else{
//for NdJson application/x-ndjson
ktjson.decodeFromString(loader,line)
}
trySendBlocking(obj)
}
}catch (e:Exception){
throw e
}
}
awaitClose { }
}
} |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
When using retrofit it can do something like this
Beta Was this translation helpful? Give feedback.
All reactions