Skip to content

Commit

Permalink
Support for sendInitialEvents flag on watches. (#598)
Browse files Browse the repository at this point in the history
* - Allow user to set sendInitialEvents (supported in k8s 1.28+).
- Change HTTP read timeout to infinity, to reduce the chances that the stream gets closed and reopened (possibly with a too-old resourceVersion).
- Add a test for ResourceClient.

* - Running scalafmt.
- Adding a missing case match for ErrorEvent.
- Passing parameters that were missed in Resource.scala.

* Must set resourceVersionMatch=NotOlderThan when sendInitialEvents=true
  • Loading branch information
monktastic authored May 31, 2024
1 parent 04a5b60 commit ed3b7ba
Show file tree
Hide file tree
Showing 12 changed files with 393 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package com.coralogix.zio.k8s.client

import _root_.io.circe
import cats.data.NonEmptyList
import com.coralogix.zio.k8s.client.model.ParsedWatchEvent
import com.coralogix.zio.k8s.model.pkg.apis.meta.v1.Status
import io.circe.Decoder
import io.circe.generic.semiauto.deriveDecoder
import sttp.model.StatusCode
import zio.ZIO

Expand Down Expand Up @@ -90,6 +93,13 @@ case object Gone extends K8sFailure
*/
final case class InvalidEvent(requestInfo: K8sRequestInfo, eventType: String) extends K8sFailure

final case class ErrorEvent(status: String, message: String, reason: String, code: Int)
extends K8sFailure

object ErrorEvent {
implicit val errorDecoder: Decoder[ErrorEvent] = deriveDecoder[ErrorEvent]
}

/** Error produced by the generated getter methods on Kubernetes data structures.
*
* Indicates that the requested field is not present.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,20 @@ trait Resource[T] {
* Constrain the returned items by field selectors. Not all fields are supported by the server.
* @param labelSelector
* Constrain the returned items by label selectors.
* @param sendInitialEvents
* Whether to set sendInitialEvents=true in the k8s watch request. Only has an effect in k8s
* 1.28+. If set, k8s returns all existing resources as synthetic Added events before sending
* updates.
* @return
* A stream of watch events
*/
def watch(
namespace: Option[K8sNamespace],
resourceVersion: Option[String],
fieldSelector: Option[FieldSelector] = None,
labelSelector: Option[LabelSelector] = None
labelSelector: Option[LabelSelector] = None,
sendInitialEvents: Boolean = false,
readTimeout: Duration = Duration.Infinity
): Stream[K8sFailure, TypedWatchEvent[T]]

/** Infinite watch stream of resource change events of type
Expand All @@ -88,17 +94,31 @@ trait Resource[T] {
* Constrain the returned items by field selectors. Not all fields are supported by the server.
* @param labelSelector
* Constrain the returned items by label selectors.
* @param sendInitialEvents
* Whether to set sendInitialEvents=true in the k8s watch request. Only has an effect in k8s
* 1.28+. If set, k8s returns all existing resources as synthetic Added events before sending
* updates.
* @return
* A stream of watch events
*/
def watchForever(
namespace: Option[K8sNamespace],
resourceVersion: Option[String] = None,
fieldSelector: Option[FieldSelector] = None,
labelSelector: Option[LabelSelector] = None
labelSelector: Option[LabelSelector] = None,
sendInitialEvents: Boolean = false,
readTimeout: Duration = Duration.Infinity
): ZStream[Any, K8sFailure, TypedWatchEvent[T]] =
ZStream.succeed(Reseted[T]()) ++ watch(namespace, resourceVersion, fieldSelector, labelSelector)
.retry(Schedule.recurWhileEquals(Gone))
ZStream.succeed(Reseted[T]()) ++
watch(
namespace,
resourceVersion,
fieldSelector,
labelSelector,
sendInitialEvents,
readTimeout
)
.retry(Schedule.recurWhileEquals(Gone))

/** Get a resource by its name
* @param name
Expand Down Expand Up @@ -199,16 +219,29 @@ trait NamespacedResource[T] {
* Constrain the returned items by field selectors. Not all fields are supported by the server.
* @param labelSelector
* Constrain the returned items by label selectors.
* @param sendInitialEvents
* Whether to set sendInitialEvents=true in the k8s watch request. Only has an effect in k8s
* 1.28+. If set, k8s returns all existing resources as synthetic Added events before sending
* updates.
* @return
* A stream of watch events
*/
def watch(
namespace: Option[K8sNamespace],
resourceVersion: Option[String],
fieldSelector: Option[FieldSelector] = None,
labelSelector: Option[LabelSelector] = None
labelSelector: Option[LabelSelector] = None,
sendInitialEvents: Boolean = false,
readTimeout: Duration = Duration.Infinity
): Stream[K8sFailure, TypedWatchEvent[T]] =
asGenericResource.watch(namespace, resourceVersion, fieldSelector, labelSelector)
asGenericResource.watch(
namespace,
resourceVersion,
fieldSelector,
labelSelector,
sendInitialEvents,
readTimeout
)

/** Infinite watch stream of resource change events of type
* [[com.coralogix.zio.k8s.client.model.TypedWatchEvent]]
Expand All @@ -222,16 +255,30 @@ trait NamespacedResource[T] {
* Constrain the returned items by field selectors. Not all fields are supported by the server.
* @param labelSelector
* Constrain the returned items by label selectors.
* @param sendInitialEvents
* Whether to set sendInitialEvents=true in the k8s watch request. Only has an effect in k8s
* 1.28+. If set, k8s returns all existing resources as synthetic Added events before sending
* updates.
* @return
* A stream of watch events
*/
def watchForever(
namespace: Option[K8sNamespace],
resourceVersion: Option[String] = None,
fieldSelector: Option[FieldSelector] = None,
labelSelector: Option[LabelSelector] = None
labelSelector: Option[LabelSelector] = None,
sendInitialEvents: Boolean = false,
readTimeout: Duration = Duration.Infinity
): ZStream[Any, K8sFailure, TypedWatchEvent[T]] =
asGenericResource.watchForever(namespace, resourceVersion, fieldSelector, labelSelector)
asGenericResource
.watchForever(
namespace,
resourceVersion,
fieldSelector,
labelSelector,
sendInitialEvents,
readTimeout
)

/** Get a resource by its name
* @param name
Expand Down Expand Up @@ -331,9 +378,18 @@ trait ClusterResource[T] {
def watch(
resourceVersion: Option[String],
fieldSelector: Option[FieldSelector] = None,
labelSelector: Option[LabelSelector] = None
labelSelector: Option[LabelSelector] = None,
sendInitialEvents: Boolean = false,
readTimeout: Duration = Duration.Infinity
): Stream[K8sFailure, TypedWatchEvent[T]] =
asGenericResource.watch(None, resourceVersion, fieldSelector, labelSelector)
asGenericResource.watch(
None,
resourceVersion,
fieldSelector,
labelSelector,
sendInitialEvents,
readTimeout
)

/** Infinite watch stream of resource change events of type
* [[com.coralogix.zio.k8s.client.model.TypedWatchEvent]]
Expand All @@ -350,9 +406,18 @@ trait ClusterResource[T] {
def watchForever(
resourceVersion: Option[String] = None,
fieldSelector: Option[FieldSelector] = None,
labelSelector: Option[LabelSelector] = None
labelSelector: Option[LabelSelector] = None,
sendInitialEvents: Boolean = false,
readTimeout: Duration = Duration.Infinity
): ZStream[Any, K8sFailure, TypedWatchEvent[T]] =
asGenericResource.watchForever(None, resourceVersion, fieldSelector, labelSelector)
asGenericResource.watchForever(
None,
resourceVersion,
fieldSelector,
labelSelector,
sendInitialEvents,
readTimeout
)

/** Get a resource by its name
* @param name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,24 +119,28 @@ final class ResourceClient[
namespace: Option[K8sNamespace],
fieldSelector: Option[FieldSelector],
labelSelector: Option[LabelSelector],
resourceVersion: Option[String]
resourceVersion: Option[String],
sendInitialEvents: Boolean = false,
readTimeout: Duration = Duration.Infinity
): Stream[K8sFailure, ParsedWatchEvent[T]] = {
val reqInfo =
K8sRequestInfo(resourceType, "watch", namespace, fieldSelector, labelSelector, None)
ZStream
.unwrap {
handleFailures("watch", namespace, fieldSelector, labelSelector, None) {
k8sRequest
.get(watching(namespace, resourceVersion, fieldSelector, labelSelector))
.get(
watching(namespace, resourceVersion, fieldSelector, labelSelector, sendInitialEvents)
)
.response(asStreamUnsafeWithError)
.readTimeout(10.minutes.asScala)
.readTimeout(readTimeout.asScala)
.send(backend.value)
}.map(_.mapError(RequestFailure(reqInfo, _)))
}
.via(
ZPipeline.fromChannel(
ZPipeline.utf8Decode.channel.mapError(CodingFailure(reqInfo, _))
) >>> ZPipeline.splitLines
) andThen ZPipeline.splitLines
)
.mapZIO { line =>
for {
Expand All @@ -153,22 +157,39 @@ final class ResourceClient[
namespace: Option[K8sNamespace],
resourceVersion: Option[String],
fieldSelector: Option[FieldSelector] = None,
labelSelector: Option[LabelSelector] = None
labelSelector: Option[LabelSelector] = None,
sendInitialEvents: Boolean = false,
readTimeout: Duration = Duration.Infinity
): ZStream[Any, K8sFailure, TypedWatchEvent[T]] =
ZStream.unwrap {
Ref.make(resourceVersion).map { lastResourceVersion =>
ZStream
.fromZIO(lastResourceVersion.get)
.flatMap(watchStream(namespace, fieldSelector, labelSelector, _))
.tap {
case ParsedTypedWatchEvent(event) => lastResourceVersion.set(event.resourceVersion)
case ParsedBookmark(resourceVersion) => lastResourceVersion.set(Some(resourceVersion))
}
.collect { case ParsedTypedWatchEvent(event) =>
event
}
.forever
}
for {
lastResourceVersion <- Ref.make(resourceVersion)
sendInitialEvents <- Ref.make(sendInitialEvents)
} yield ZStream
.fromZIO(lastResourceVersion.get.zip(sendInitialEvents.get))
.flatMap { case (lastResourceVersion, sendInitialEvents) =>
watchStream(
namespace,
fieldSelector,
labelSelector,
lastResourceVersion,
sendInitialEvents,
readTimeout
)
}
.tap {
case ParsedTypedWatchEvent(event) =>
lastResourceVersion.set(event.resourceVersion)
case ParsedBookmark(resourceVersion) =>
// If we receive a Bookmark, it means that all initial events have been sent. If we reopen the stream,
// we should not fetch all events again.
lastResourceVersion.set(Some(resourceVersion)) *>
sendInitialEvents.set(false)
}
.collect { case ParsedTypedWatchEvent(event) =>
event
}
.forever
}

def get(name: String, namespace: Option[K8sNamespace]): IO[K8sFailure, T] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,17 @@ trait ResourceClientBase {
namespace: Option[K8sNamespace],
resourceVersion: Option[String],
fieldSelector: Option[FieldSelector],
labelSelector: Option[LabelSelector]
labelSelector: Option[LabelSelector],
sendInitialEvents: Boolean
): Uri =
K8sWatchUri(
resourceType,
namespace,
resourceVersion,
allowBookmarks = true,
fieldSelector,
labelSelector
labelSelector,
sendInitialEvents
).toUri(
cluster
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package com.coralogix.zio.k8s.client.model

import com.coralogix.zio.k8s.client.{
DeserializationFailure,
ErrorEvent,
InvalidEvent,
K8sFailure,
K8sRequestInfo
}
import com.coralogix.zio.k8s.client.ErrorEvent.errorDecoder
import com.coralogix.zio.k8s.model.pkg.apis.meta.v1.WatchEvent
import io.circe.{ Decoder, Json }
import io.circe.{ parser, Decoder, Json }
import zio.{ IO, ZIO }
import io.circe.generic.semiauto.deriveDecoder
import io.circe.parser._

/** Watch event with parsed payload
*
Expand Down Expand Up @@ -75,6 +79,12 @@ object ParsedWatchEvent {
case "BOOKMARK" =>
parseOrFail(requestInfo, event.`object`.value)(bookmarkedResourceVersion)
.map(ParsedBookmark.apply)
case "ERROR" =>
val json = event.`object`.value
ZIO
.fromEither(errorDecoder.decodeAccumulating(json.hcursor).toEither)
.mapError(DeserializationFailure(requestInfo, _))
.flatMap(ZIO.fail(_))
case _ =>
ZIO.fail(InvalidEvent(requestInfo, event.`type`))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ package object model extends LabelSelector.Syntax with FieldSelector.Syntax {
resourceVersion: Option[String],
allowBookmarks: Boolean,
fieldSelector: Option[FieldSelector] = None,
labelSelector: Option[LabelSelector] = None
labelSelector: Option[LabelSelector] = None,
sendInitialEvents: Boolean = false
) extends K8sUri {
override def toUri(cluster: K8sCluster): Uri =
K8sSimpleUri(resource, None, None, namespace)
Expand All @@ -189,6 +190,11 @@ package object model extends LabelSelector.Syntax with FieldSelector.Syntax {
.addParam("fieldSelector", fieldSelector.map(_.asQuery))
.addParam("labelSelector", labelSelector.map(_.asQuery))
.addParam("allowWatchBookmarks", if (allowBookmarks) Some("true") else None)
.addParam("sendInitialEvents", if (sendInitialEvents) Some("true") else None)
// Per the K8s doc (https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists):
// "When you set sendInitialEvents=true in the query string, Kubernetes also requires that you set
// resourceVersionMatch to NotOlderThan value."
.addParam("resourceVersionMatch", if (sendInitialEvents) Some("NotOlderThan") else None)
}

val k8sDateTimeFormatter: DateTimeFormatter = DateTimeFormatter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ final class TestResourceClient[T: K8sObject: Encoder, DeleteResult] private (
namespace: Option[K8sNamespace],
resourceVersion: Option[String],
fieldSelector: Option[FieldSelector] = None,
labelSelector: Option[LabelSelector] = None
labelSelector: Option[LabelSelector] = None,
sendInitialEvents: Boolean = false,
readTimeout: Duration = Duration.Infinity
): Stream[K8sFailure, TypedWatchEvent[T]] =
ZStream.fromTQueue(events).filter {
case Reseted() => true
Expand Down
Loading

0 comments on commit ed3b7ba

Please sign in to comment.