Skip to content

Commit

Permalink
Add coroutine support for appenders
Browse files Browse the repository at this point in the history
Refactored the Appender interface to support suspend functions for asynchronous operations. Introduced CoroutineScope for handling concurrency in various components, ensuring non-blocking behavior. Adjusted all derived classes and usage contexts accordingly.
  • Loading branch information
smyrgeorge committed Oct 22, 2024
1 parent e2f8f4e commit 50d90e7
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 30 deletions.
2 changes: 1 addition & 1 deletion examples/src/nativeMain/kotlin/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import kotlin.coroutines.CoroutineContext

class Main {
class MyBatchAppender(size: Int) : BatchAppender<LoggingEvent>(size) {
override suspend fun append(event: List<LoggingEvent>) {
override suspend fun handle(event: List<LoggingEvent>) {
// E.g. send batch over http.
println(event.joinToString { it.message })
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package io.github.smyrgeorge.log4k

interface Appender<T> {
val name: String
fun append(event: T)
suspend fun append(event: T)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import kotlinx.coroutines.IO
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.math.absoluteValue
Expand Down Expand Up @@ -47,8 +46,8 @@ object RootLogger {
}
}

fun log(event: LoggingEvent) = runBlocking { logs.send(event) }
fun trace(event: TracingEvent) = runBlocking { traces.send(event) }
fun log(event: LoggingEvent): Unit = send(LoggerScope) { logs.send(event) }
fun trace(event: TracingEvent): Unit = send(TracerScope) { traces.send(event) }

object Logging {
private var idx: Long = 0
Expand All @@ -69,6 +68,10 @@ object RootLogger {
fun register(appender: Appender<TracingEvent>) = appenders.register(appender)
}

private inline fun send(scope: CoroutineScope, crossinline f: suspend () -> Unit) {
scope.launch { f() }
}

private object LoggerScope : CoroutineScope {
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.smyrgeorge.log4k

import io.github.smyrgeorge.log4k.impl.OpenTelemetry
import io.github.smyrgeorge.log4k.impl.extensions.toName
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant

Expand All @@ -12,8 +13,8 @@ interface TracingEvent {
val level: Level,
val context: Context,
val parent: Span?,
var start: Instant?,
var end: Instant?,
var startAt: Instant?,
var endAt: Instant?,
val attributes: MutableMap<String, Any?>,
val events: MutableList<Event>,
var status: Status,
Expand All @@ -31,7 +32,7 @@ interface TracingEvent {
fun start(): Span {
if (!shouldStart()) return this
if (started) return this
start = Clock.System.now()
startAt = Clock.System.now()
started = true
return this
}
Expand Down Expand Up @@ -61,7 +62,7 @@ interface TracingEvent {
name = OpenTelemetry.EXCEPTION,
timestamp = Clock.System.now(),
attributes = attrs + mapOf(
OpenTelemetry.EXCEPTION_TYPE to error::class.qualifiedName,
OpenTelemetry.EXCEPTION_TYPE to error::class.toName(),
OpenTelemetry.EXCEPTION_ESCAPED to escaped,
OpenTelemetry.EXCEPTION_MESSAGE to error.message,
OpenTelemetry.EXCEPTION_STACKTRACE to error.stackTraceToString(),
Expand All @@ -79,7 +80,7 @@ interface TracingEvent {
fun end(error: Throwable? = null) {
if (!shouldStart()) return
if (closed || !started) return
end = Clock.System.now()
endAt = Clock.System.now()
closed = true
status = Status(
code = error?.let { Status.Code.ERROR } ?: Status.Code.OK,
Expand Down Expand Up @@ -151,8 +152,8 @@ interface TracingEvent {
level = level,
context = Context(traceId, id, isRemote, Context.Tracer(tracer.name, tracer.level)),
parent = parent,
start = null,
end = null,
startAt = null,
endAt = null,
attributes = mutableMapOf(),
events = mutableListOf(),
status = Status(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.github.smyrgeorge.log4k.impl.appenders

import io.github.smyrgeorge.log4k.Appender
import io.github.smyrgeorge.log4k.Logger
import io.github.smyrgeorge.log4k.impl.extensions.toName
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
Expand All @@ -13,33 +13,31 @@ import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

@Suppress("unused")
abstract class FlowAppender<T, E> : Appender<E> {
private val log = Logger.of(this::class)
private val scope = FlowAppenderScope()
private val dispatcher: CoroutineDispatcher = Dispatchers.IO
private val logs: Channel<E> = Channel(capacity = Channel.UNLIMITED)

@Suppress("UNCHECKED_CAST")
private var flow: Flow<T> = logs.receiveAsFlow().flowOn(Dispatchers.IO) as Flow<T>
private var flow: Flow<T> = logs.receiveAsFlow().flowOn(dispatcher) as Flow<T>

init {
FlowAppenderScope().launch(Dispatchers.IO) {
FlowAppenderScope().launch(dispatcher) {
@Suppress("UNCHECKED_CAST")
flow = setup(this@FlowAppender.flow as Flow<E>)
flow.onEach { event: T -> runCatching { append(event) } }.launchIn(this)
flow.onEach { event: T -> runCatching { handle(event) } }.launchIn(this)
}
}

final override val name: String = this::class.toName()
final override fun append(event: E) {
runBlocking { logs.send(event) }
}

abstract fun setup(flow: Flow<E>): Flow<T>
abstract suspend fun append(event: T)
abstract suspend fun handle(event: T)

final override val name: String = this::class.toName()
final override suspend fun append(event: E): Unit = logs.send(event)

private class FlowAppenderScope : CoroutineScope {
override val coroutineContext: CoroutineContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import io.github.smyrgeorge.log4k.impl.extensions.toName

class SimpleConsoleLoggingAppender : Appender<LoggingEvent> {
override val name: String = this::class.toName()
override fun append(event: LoggingEvent) {
override suspend fun append(event: LoggingEvent) {
print(event.format())
event.throwable?.printStackTrace()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package io.github.smyrgeorge.log4k.impl.appenders

import io.github.smyrgeorge.log4k.Appender
import io.github.smyrgeorge.log4k.LoggingEvent
import io.github.smyrgeorge.log4k.TracingEvent
import io.github.smyrgeorge.log4k.impl.extensions.format
import io.github.smyrgeorge.log4k.impl.extensions.toName

class SimpleConsoleTracingAppender : Appender<TracingEvent> {
override val name: String = this::class.toName()
override fun append(event: TracingEvent) {
override suspend fun append(event: TracingEvent) {
println(event.toString())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

fun <T> Mutex.withLockBlocking(f: () -> T): T = runBlocking { this@withLockBlocking.withLock { f() } }
fun <T> Mutex.withLockBlocking(f: () -> T): T = runBlocking { withLock { f() } }
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,30 @@ package io.github.smyrgeorge.log4k.impl.registry

import io.github.smyrgeorge.log4k.Appender
import io.github.smyrgeorge.log4k.impl.extensions.withLockBlocking
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

@Suppress("unused")
class AppenderRegistry<T> {
private val mutex = Mutex()
private val scope = AppenderRegistryScope()
private val appenders = mutableListOf<Appender<T>>()

fun all(): List<Appender<T>> = mutex.withLockBlocking { appenders.toList() }
fun get(name: String): Appender<T>? = mutex.withLockBlocking { appenders.find { it.name == name } }
fun register(appender: Appender<T>) = mutex.withLockBlocking { appenders.add(appender) }
fun unregister(name: String) = mutex.withLockBlocking { appenders.removeAll { it.name == name } }
fun unregisterAll() = mutex.withLockBlocking { appenders.clear() }

private inline fun send(crossinline f: suspend () -> Unit) {
scope.launch { f() }
}

private class AppenderRegistryScope : CoroutineScope {
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import kotlin.test.Test
class MainTests {

class MyBatchAppender(size: Int) : BatchAppender<LoggingEvent>(size) {
override suspend fun append(event: List<LoggingEvent>) {
override suspend fun handle(event: List<LoggingEvent>) {
// E.g. send batch over http.
println(event.joinToString { it.message })
}
Expand Down

0 comments on commit 50d90e7

Please sign in to comment.