Skip to content

Commit

Permalink
Changing DebuggingDirectives to run in a correct sequence.
Browse files Browse the repository at this point in the history
  • Loading branch information
luksow committed Jul 3, 2024
1 parent 1870c5a commit 2abfa19
Showing 1 changed file with 34 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,31 @@ trait DebuggingDirectives {
*/
def logRequest(logHeaders: Boolean = true, logBody: Boolean = true,
redactHeadersWhen: CIString => Boolean = Headers.SensitiveHeaders.contains,
maxLogLength: Int = Int.MaxValue,
logAction: Option[String => IO[Unit]] = None): Directive0 = {
val log = logAction.getOrElse { (s: String) =>
val log = trimLog(maxLogLength).andThen(logAction.getOrElse { (s: String) =>
DebuggingDirectives.logger.info(s)
}
})

Directive { inner => ctx =>
if (logBody) {
if (logBody && !ctx.request.isChunked) {
IO.ref(Vector.empty[Chunk[Byte]])
.flatMap { vec =>
val collectChunks: Pipe[IO, Byte, Nothing] =
_.chunks.flatMap(c => Stream.exec(vec.update(_ :+ c)))
val pipe: Pipe[IO, Byte, Byte] = _.observe(collectChunks)
val bodyForLog = Stream.eval(vec.get).flatMap(v => Stream.emits(v)).unchunks
val logRequest = Logger.logMessage[IO, Request[IO]](ctx.request.withBodyStream(bodyForLog))(logHeaders,
logBody, redactHeadersWhen)(log)
val bodyForRouting = pipe(ctx.request.body)
val newCtx = ctx.copy(request = ctx.request.withBodyStream(bodyForRouting))
inner(())(newCtx).flatMap {
case RouteResult.Complete(response) =>
IO.pure(RouteResult.Complete(response.withBodyStream(response.body.onFinalizeWeak(logRequest))))
case RouteResult.Rejected(rejections) =>
logRequest.as(RouteResult.Rejected(rejections))
}
val newBody = Stream.eval(vec.get)
.flatMap(chunks => Stream.emits(chunks).covary[IO])
.flatMap(chunks => Stream.chunk(chunks).covary[IO])
val newRequest = ctx.request.withBodyStream(
ctx.request.body.observe(_.chunks.flatMap(chunk => Stream.eval(vec.update(_ :+ chunk)).drain)))

val newCtx = ctx.copy(request = ctx.request.withBodyStream(newBody))
Logger.logMessage[IO, Request[IO]](newRequest)(logHeaders, logBody = true, redactHeadersWhen)(log).flatMap(
_ =>
inner(())(newCtx))
}
} else {
inner(())(ctx).flatMap {
case RouteResult.Complete(response) =>
Logger.logMessage[IO, Request[IO]](ctx.request)(logHeaders, logBody, redactHeadersWhen)(log)
.as(RouteResult.Complete(response))
case RouteResult.Rejected(rejections) =>
Logger.logMessage[IO, Request[IO]](ctx.request)(logHeaders, logBody, redactHeadersWhen)(log)
.as(RouteResult.Rejected(rejections))
}
Logger.logMessage[IO, Request[IO]](ctx.request)(logHeaders, logBody = false, redactHeadersWhen)(log).flatMap(
_ =>
inner(())(ctx))
}
}
}
Expand All @@ -60,25 +53,20 @@ trait DebuggingDirectives {
*/
def logResult(logHeaders: Boolean = true, logBody: Boolean = true,
redactHeadersWhen: CIString => Boolean = Headers.SensitiveHeaders.contains,
maxLogLength: Int = Int.MaxValue,
logAction: Option[String => IO[Unit]] = None): Directive0 = {
val log = logAction.getOrElse { (s: String) =>
val log = trimLog(maxLogLength).andThen(logAction.getOrElse { (s: String) =>
DebuggingDirectives.logger.info(s)
}
})

Directive { inner => ctx =>
inner(())(ctx).flatMap {
case RouteResult.Complete(response) =>
if (logBody) {
IO.ref(Vector.empty[Chunk[Byte]]).map { vec =>
val newBody = Stream.eval(vec.get).flatMap(v => Stream.emits(v)).unchunks
// Cannot Be Done Asynchronously - Otherwise All Chunks May Not Be Appended Previous to Finalization
val logPipe: Pipe[IO, Byte, Byte] =
_.observe(_.chunks.flatMap(c => Stream.exec(vec.update(_ :+ c))))
.onFinalizeWeak(Logger.logMessage[IO, Response[IO]](response.withBodyStream(newBody))(logHeaders,
logBody, redactHeadersWhen)(log))
RouteResult.Complete(response.withBodyStream(logPipe(response.body)))
}
if (logBody && !response.isChunked) {
Logger.logMessage[IO, Response[IO]](response)(logHeaders, logBody = true, redactHeadersWhen)(log).as(
RouteResult.Complete(response))
} else {
Logger.logMessage[IO, Response[IO]](response)(logHeaders, logBody, redactHeadersWhen)(log).as(
Logger.logMessage[IO, Response[IO]](response)(logHeaders, logBody = false, redactHeadersWhen)(log).as(
RouteResult.Complete(response))
}
case RouteResult.Rejected(rejections) =>
Expand All @@ -95,10 +83,17 @@ trait DebuggingDirectives {
*/
def logRequestResult(logHeaders: Boolean = true, logBody: Boolean = true,
redactHeadersWhen: CIString => Boolean = Headers.SensitiveHeaders.contains,
maxLogLength: Int = Int.MaxValue,
logAction: Option[String => IO[Unit]] = None): Directive0 = {
logResult(logHeaders, logBody, redactHeadersWhen, logAction) & logRequest(logHeaders, logBody, redactHeadersWhen,
logResult(logHeaders, logBody, redactHeadersWhen, maxLogLength, logAction) & logRequest(logHeaders, logBody,
redactHeadersWhen,
maxLogLength,
logAction)
}

private def trimLog(maxLogLength: Int): String => String = { log =>
if (log.length > maxLogLength) log.take(maxLogLength) + "..." else log
}
}

object DebuggingDirectives extends DebuggingDirectives {
Expand Down

0 comments on commit 2abfa19

Please sign in to comment.