From ed3b7ba37a6aaf342990bb1f295596c8cf7d3cf0 Mon Sep 17 00:00:00 2001 From: Aditya Prasad Date: Fri, 31 May 2024 01:19:49 -0500 Subject: [PATCH] Support for sendInitialEvents flag on watches. (#598) * - Allow user to set sendInitialEvents (supported in k8s 1.28+). - Change HTTP read timeout to infinity, to reduce the chances that the stream gets closed and reopened (possibly with a too-old resourceVersion). - Add a test for ResourceClient. * - Running scalafmt. - Adding a missing case match for ErrorEvent. - Passing parameters that were missed in Resource.scala. * Must set resourceVersionMatch=NotOlderThan when sendInitialEvents=true --- .../coralogix/zio/k8s/client/K8sFailure.scala | 10 + .../coralogix/zio/k8s/client/Resource.scala | 89 +++++++-- .../zio/k8s/client/impl/ResourceClient.scala | 57 ++++-- .../k8s/client/impl/ResourceClientBase.scala | 6 +- .../k8s/client/model/ParsedWatchEvent.scala | 12 +- .../zio/k8s/client/model/package.scala | 8 +- .../k8s/client/test/TestResourceClient.scala | 4 +- .../k8s/client/impl/ResourceClientSpec.scala | 171 ++++++++++++++++++ .../client/model/ParsedWatchEventSpec.scala | 53 ++++++ .../client/test/TestResourceClientSpec.scala | 3 +- .../zio/k8s/operator/OperatorFailure.scala | 24 ++- .../operator/leader/locks/LeaseLockSpec.scala | 4 +- 12 files changed, 393 insertions(+), 48 deletions(-) create mode 100644 zio-k8s-client/src/test/scala/com/coralogix/zio/k8s/client/impl/ResourceClientSpec.scala create mode 100644 zio-k8s-client/src/test/scala/com/coralogix/zio/k8s/client/model/ParsedWatchEventSpec.scala diff --git a/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/K8sFailure.scala b/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/K8sFailure.scala index a261043da..1cbcdc945 100644 --- a/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/K8sFailure.scala +++ b/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/K8sFailure.scala @@ -2,7 +2,10 @@ package com.coralogix.zio.k8s.client import _root_.io.circe import cats.data.NonEmptyList +import com.coralogix.zio.k8s.client.model.ParsedWatchEvent import com.coralogix.zio.k8s.model.pkg.apis.meta.v1.Status +import io.circe.Decoder +import io.circe.generic.semiauto.deriveDecoder import sttp.model.StatusCode import zio.ZIO @@ -90,6 +93,13 @@ case object Gone extends K8sFailure */ final case class InvalidEvent(requestInfo: K8sRequestInfo, eventType: String) extends K8sFailure +final case class ErrorEvent(status: String, message: String, reason: String, code: Int) + extends K8sFailure + +object ErrorEvent { + implicit val errorDecoder: Decoder[ErrorEvent] = deriveDecoder[ErrorEvent] +} + /** Error produced by the generated getter methods on Kubernetes data structures. * * Indicates that the requested field is not present. diff --git a/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/Resource.scala b/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/Resource.scala index 0263d5ad8..fefc96093 100644 --- a/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/Resource.scala +++ b/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/Resource.scala @@ -64,6 +64,10 @@ trait Resource[T] { * Constrain the returned items by field selectors. Not all fields are supported by the server. * @param labelSelector * Constrain the returned items by label selectors. + * @param sendInitialEvents + * Whether to set sendInitialEvents=true in the k8s watch request. Only has an effect in k8s + * 1.28+. If set, k8s returns all existing resources as synthetic Added events before sending + * updates. * @return * A stream of watch events */ @@ -71,7 +75,9 @@ trait Resource[T] { namespace: Option[K8sNamespace], resourceVersion: Option[String], fieldSelector: Option[FieldSelector] = None, - labelSelector: Option[LabelSelector] = None + labelSelector: Option[LabelSelector] = None, + sendInitialEvents: Boolean = false, + readTimeout: Duration = Duration.Infinity ): Stream[K8sFailure, TypedWatchEvent[T]] /** Infinite watch stream of resource change events of type @@ -88,6 +94,10 @@ trait Resource[T] { * Constrain the returned items by field selectors. Not all fields are supported by the server. * @param labelSelector * Constrain the returned items by label selectors. + * @param sendInitialEvents + * Whether to set sendInitialEvents=true in the k8s watch request. Only has an effect in k8s + * 1.28+. If set, k8s returns all existing resources as synthetic Added events before sending + * updates. * @return * A stream of watch events */ @@ -95,10 +105,20 @@ trait Resource[T] { namespace: Option[K8sNamespace], resourceVersion: Option[String] = None, fieldSelector: Option[FieldSelector] = None, - labelSelector: Option[LabelSelector] = None + labelSelector: Option[LabelSelector] = None, + sendInitialEvents: Boolean = false, + readTimeout: Duration = Duration.Infinity ): ZStream[Any, K8sFailure, TypedWatchEvent[T]] = - ZStream.succeed(Reseted[T]()) ++ watch(namespace, resourceVersion, fieldSelector, labelSelector) - .retry(Schedule.recurWhileEquals(Gone)) + ZStream.succeed(Reseted[T]()) ++ + watch( + namespace, + resourceVersion, + fieldSelector, + labelSelector, + sendInitialEvents, + readTimeout + ) + .retry(Schedule.recurWhileEquals(Gone)) /** Get a resource by its name * @param name @@ -199,6 +219,10 @@ trait NamespacedResource[T] { * Constrain the returned items by field selectors. Not all fields are supported by the server. * @param labelSelector * Constrain the returned items by label selectors. + * @param sendInitialEvents + * Whether to set sendInitialEvents=true in the k8s watch request. Only has an effect in k8s + * 1.28+. If set, k8s returns all existing resources as synthetic Added events before sending + * updates. * @return * A stream of watch events */ @@ -206,9 +230,18 @@ trait NamespacedResource[T] { namespace: Option[K8sNamespace], resourceVersion: Option[String], fieldSelector: Option[FieldSelector] = None, - labelSelector: Option[LabelSelector] = None + labelSelector: Option[LabelSelector] = None, + sendInitialEvents: Boolean = false, + readTimeout: Duration = Duration.Infinity ): Stream[K8sFailure, TypedWatchEvent[T]] = - asGenericResource.watch(namespace, resourceVersion, fieldSelector, labelSelector) + asGenericResource.watch( + namespace, + resourceVersion, + fieldSelector, + labelSelector, + sendInitialEvents, + readTimeout + ) /** Infinite watch stream of resource change events of type * [[com.coralogix.zio.k8s.client.model.TypedWatchEvent]] @@ -222,6 +255,10 @@ trait NamespacedResource[T] { * Constrain the returned items by field selectors. Not all fields are supported by the server. * @param labelSelector * Constrain the returned items by label selectors. + * @param sendInitialEvents + * Whether to set sendInitialEvents=true in the k8s watch request. Only has an effect in k8s + * 1.28+. If set, k8s returns all existing resources as synthetic Added events before sending + * updates. * @return * A stream of watch events */ @@ -229,9 +266,19 @@ trait NamespacedResource[T] { namespace: Option[K8sNamespace], resourceVersion: Option[String] = None, fieldSelector: Option[FieldSelector] = None, - labelSelector: Option[LabelSelector] = None + labelSelector: Option[LabelSelector] = None, + sendInitialEvents: Boolean = false, + readTimeout: Duration = Duration.Infinity ): ZStream[Any, K8sFailure, TypedWatchEvent[T]] = - asGenericResource.watchForever(namespace, resourceVersion, fieldSelector, labelSelector) + asGenericResource + .watchForever( + namespace, + resourceVersion, + fieldSelector, + labelSelector, + sendInitialEvents, + readTimeout + ) /** Get a resource by its name * @param name @@ -331,9 +378,18 @@ trait ClusterResource[T] { def watch( resourceVersion: Option[String], fieldSelector: Option[FieldSelector] = None, - labelSelector: Option[LabelSelector] = None + labelSelector: Option[LabelSelector] = None, + sendInitialEvents: Boolean = false, + readTimeout: Duration = Duration.Infinity ): Stream[K8sFailure, TypedWatchEvent[T]] = - asGenericResource.watch(None, resourceVersion, fieldSelector, labelSelector) + asGenericResource.watch( + None, + resourceVersion, + fieldSelector, + labelSelector, + sendInitialEvents, + readTimeout + ) /** Infinite watch stream of resource change events of type * [[com.coralogix.zio.k8s.client.model.TypedWatchEvent]] @@ -350,9 +406,18 @@ trait ClusterResource[T] { def watchForever( resourceVersion: Option[String] = None, fieldSelector: Option[FieldSelector] = None, - labelSelector: Option[LabelSelector] = None + labelSelector: Option[LabelSelector] = None, + sendInitialEvents: Boolean = false, + readTimeout: Duration = Duration.Infinity ): ZStream[Any, K8sFailure, TypedWatchEvent[T]] = - asGenericResource.watchForever(None, resourceVersion, fieldSelector, labelSelector) + asGenericResource.watchForever( + None, + resourceVersion, + fieldSelector, + labelSelector, + sendInitialEvents, + readTimeout + ) /** Get a resource by its name * @param name diff --git a/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/impl/ResourceClient.scala b/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/impl/ResourceClient.scala index 671e7ef18..75750bbd5 100644 --- a/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/impl/ResourceClient.scala +++ b/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/impl/ResourceClient.scala @@ -119,7 +119,9 @@ final class ResourceClient[ namespace: Option[K8sNamespace], fieldSelector: Option[FieldSelector], labelSelector: Option[LabelSelector], - resourceVersion: Option[String] + resourceVersion: Option[String], + sendInitialEvents: Boolean = false, + readTimeout: Duration = Duration.Infinity ): Stream[K8sFailure, ParsedWatchEvent[T]] = { val reqInfo = K8sRequestInfo(resourceType, "watch", namespace, fieldSelector, labelSelector, None) @@ -127,16 +129,18 @@ final class ResourceClient[ .unwrap { handleFailures("watch", namespace, fieldSelector, labelSelector, None) { k8sRequest - .get(watching(namespace, resourceVersion, fieldSelector, labelSelector)) + .get( + watching(namespace, resourceVersion, fieldSelector, labelSelector, sendInitialEvents) + ) .response(asStreamUnsafeWithError) - .readTimeout(10.minutes.asScala) + .readTimeout(readTimeout.asScala) .send(backend.value) }.map(_.mapError(RequestFailure(reqInfo, _))) } .via( ZPipeline.fromChannel( ZPipeline.utf8Decode.channel.mapError(CodingFailure(reqInfo, _)) - ) >>> ZPipeline.splitLines + ) andThen ZPipeline.splitLines ) .mapZIO { line => for { @@ -153,22 +157,39 @@ final class ResourceClient[ namespace: Option[K8sNamespace], resourceVersion: Option[String], fieldSelector: Option[FieldSelector] = None, - labelSelector: Option[LabelSelector] = None + labelSelector: Option[LabelSelector] = None, + sendInitialEvents: Boolean = false, + readTimeout: Duration = Duration.Infinity ): ZStream[Any, K8sFailure, TypedWatchEvent[T]] = ZStream.unwrap { - Ref.make(resourceVersion).map { lastResourceVersion => - ZStream - .fromZIO(lastResourceVersion.get) - .flatMap(watchStream(namespace, fieldSelector, labelSelector, _)) - .tap { - case ParsedTypedWatchEvent(event) => lastResourceVersion.set(event.resourceVersion) - case ParsedBookmark(resourceVersion) => lastResourceVersion.set(Some(resourceVersion)) - } - .collect { case ParsedTypedWatchEvent(event) => - event - } - .forever - } + for { + lastResourceVersion <- Ref.make(resourceVersion) + sendInitialEvents <- Ref.make(sendInitialEvents) + } yield ZStream + .fromZIO(lastResourceVersion.get.zip(sendInitialEvents.get)) + .flatMap { case (lastResourceVersion, sendInitialEvents) => + watchStream( + namespace, + fieldSelector, + labelSelector, + lastResourceVersion, + sendInitialEvents, + readTimeout + ) + } + .tap { + case ParsedTypedWatchEvent(event) => + lastResourceVersion.set(event.resourceVersion) + case ParsedBookmark(resourceVersion) => + // If we receive a Bookmark, it means that all initial events have been sent. If we reopen the stream, + // we should not fetch all events again. + lastResourceVersion.set(Some(resourceVersion)) *> + sendInitialEvents.set(false) + } + .collect { case ParsedTypedWatchEvent(event) => + event + } + .forever } def get(name: String, namespace: Option[K8sNamespace]): IO[K8sFailure, T] = diff --git a/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/impl/ResourceClientBase.scala b/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/impl/ResourceClientBase.scala index a1f5382a0..d45385b39 100644 --- a/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/impl/ResourceClientBase.scala +++ b/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/impl/ResourceClientBase.scala @@ -115,7 +115,8 @@ trait ResourceClientBase { namespace: Option[K8sNamespace], resourceVersion: Option[String], fieldSelector: Option[FieldSelector], - labelSelector: Option[LabelSelector] + labelSelector: Option[LabelSelector], + sendInitialEvents: Boolean ): Uri = K8sWatchUri( resourceType, @@ -123,7 +124,8 @@ trait ResourceClientBase { resourceVersion, allowBookmarks = true, fieldSelector, - labelSelector + labelSelector, + sendInitialEvents ).toUri( cluster ) diff --git a/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/model/ParsedWatchEvent.scala b/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/model/ParsedWatchEvent.scala index 4baf35301..db5e33acb 100644 --- a/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/model/ParsedWatchEvent.scala +++ b/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/model/ParsedWatchEvent.scala @@ -2,13 +2,17 @@ package com.coralogix.zio.k8s.client.model import com.coralogix.zio.k8s.client.{ DeserializationFailure, + ErrorEvent, InvalidEvent, K8sFailure, K8sRequestInfo } +import com.coralogix.zio.k8s.client.ErrorEvent.errorDecoder import com.coralogix.zio.k8s.model.pkg.apis.meta.v1.WatchEvent -import io.circe.{ Decoder, Json } +import io.circe.{ parser, Decoder, Json } import zio.{ IO, ZIO } +import io.circe.generic.semiauto.deriveDecoder +import io.circe.parser._ /** Watch event with parsed payload * @@ -75,6 +79,12 @@ object ParsedWatchEvent { case "BOOKMARK" => parseOrFail(requestInfo, event.`object`.value)(bookmarkedResourceVersion) .map(ParsedBookmark.apply) + case "ERROR" => + val json = event.`object`.value + ZIO + .fromEither(errorDecoder.decodeAccumulating(json.hcursor).toEither) + .mapError(DeserializationFailure(requestInfo, _)) + .flatMap(ZIO.fail(_)) case _ => ZIO.fail(InvalidEvent(requestInfo, event.`type`)) } diff --git a/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/model/package.scala b/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/model/package.scala index 8f35082d9..800bc7681 100644 --- a/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/model/package.scala +++ b/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/model/package.scala @@ -179,7 +179,8 @@ package object model extends LabelSelector.Syntax with FieldSelector.Syntax { resourceVersion: Option[String], allowBookmarks: Boolean, fieldSelector: Option[FieldSelector] = None, - labelSelector: Option[LabelSelector] = None + labelSelector: Option[LabelSelector] = None, + sendInitialEvents: Boolean = false ) extends K8sUri { override def toUri(cluster: K8sCluster): Uri = K8sSimpleUri(resource, None, None, namespace) @@ -189,6 +190,11 @@ package object model extends LabelSelector.Syntax with FieldSelector.Syntax { .addParam("fieldSelector", fieldSelector.map(_.asQuery)) .addParam("labelSelector", labelSelector.map(_.asQuery)) .addParam("allowWatchBookmarks", if (allowBookmarks) Some("true") else None) + .addParam("sendInitialEvents", if (sendInitialEvents) Some("true") else None) + // Per the K8s doc (https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists): + // "When you set sendInitialEvents=true in the query string, Kubernetes also requires that you set + // resourceVersionMatch to NotOlderThan value." + .addParam("resourceVersionMatch", if (sendInitialEvents) Some("NotOlderThan") else None) } val k8sDateTimeFormatter: DateTimeFormatter = DateTimeFormatter diff --git a/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/test/TestResourceClient.scala b/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/test/TestResourceClient.scala index 0a5e9e7cf..54bd709bb 100644 --- a/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/test/TestResourceClient.scala +++ b/zio-k8s-client/src/main/scala/com/coralogix/zio/k8s/client/test/TestResourceClient.scala @@ -66,7 +66,9 @@ final class TestResourceClient[T: K8sObject: Encoder, DeleteResult] private ( namespace: Option[K8sNamespace], resourceVersion: Option[String], fieldSelector: Option[FieldSelector] = None, - labelSelector: Option[LabelSelector] = None + labelSelector: Option[LabelSelector] = None, + sendInitialEvents: Boolean = false, + readTimeout: Duration = Duration.Infinity ): Stream[K8sFailure, TypedWatchEvent[T]] = ZStream.fromTQueue(events).filter { case Reseted() => true diff --git a/zio-k8s-client/src/test/scala/com/coralogix/zio/k8s/client/impl/ResourceClientSpec.scala b/zio-k8s-client/src/test/scala/com/coralogix/zio/k8s/client/impl/ResourceClientSpec.scala new file mode 100644 index 000000000..2b5a85bee --- /dev/null +++ b/zio-k8s-client/src/test/scala/com/coralogix/zio/k8s/client/impl/ResourceClientSpec.scala @@ -0,0 +1,171 @@ +package com.coralogix.zio.k8s.client.impl + +import com.coralogix.zio.k8s.client.config.backend.K8sBackend +import com.coralogix.zio.k8s.client.model._ +import com.coralogix.zio.k8s.client.{ ErrorEvent, Gone, K8sFailure } +import com.coralogix.zio.k8s.model.core.v1.Node +import com.coralogix.zio.k8s.model.pkg.apis.meta.v1.{ ObjectMeta, Status, WatchEvent } +import com.coralogix.zio.k8s.model.pkg.runtime.RawExtension +import io.circe.syntax._ +import sttp.capabilities.WebSockets +import sttp.capabilities.zio.ZioStreams +import sttp.client3.HttpError +import sttp.client3.httpclient.zio.HttpClientZioBackend +import sttp.client3.testing.RecordingSttpBackend +import sttp.model.Uri.QuerySegment.KeyValue +import sttp.model._ +import zio.Task +import zio.stream.ZStream +import zio.test.{ Spec, _ } + +object ResourceClientSpec extends ZIOSpecDefault { + override def spec: Spec[TestEnvironment, Any] = + suite("ResourceClient")( + parseError, + watchResources + ) + + private def nodeToRawExtension(node: Node): RawExtension = RawExtension(node.asJson) + private val ResourceType = K8sResourceType("test", "group", "version") + + /** Simulate k8s sending an ADDED event followed by an ERROR. Tests that: + * + * 1. The watch ends. (If it didn't, the test would never finish.) 2. The ERROR event is parsed + * into the right class. + */ + val parseError: Spec[TestEnvironment, Any] = test("Ensure errors are parsed correctly") { + val addedEvt = WatchEvent( + nodeToRawExtension(Node(Some(ObjectMeta(name = "node1", resourceVersion = "1")))), + "ADDED" + ) + + val status = "Failure" + val message = "too old resource version: 33948840 (34026715)" + val reason = "Expired" + val code = 410 + val errorStr = s""" + |{ + | "type": "ERROR", + | "object": { + | "kind": "Status", + | "apiVersion": "v1", + | "metadata": {}, + | "status": "$status", + | "message": "$message", + | "reason": "$reason", + | "code": $code + | } + |}""".stripMargin.replace("\n", "") + + val resp = addedEvt.asJson.noSpaces + "\n" + errorStr + val testingBackend = new RecordingSttpBackend[Task, ZioStreams with WebSockets]( + HttpClientZioBackend.stub.whenAnyRequest + .thenRespond(Right(ZStream.fromIterable(resp.getBytes))) + ) + + val cluster = K8sCluster(Uri("http://foo/bar"), None) + val rc = new ResourceClient[Node, Status](ResourceType, cluster, K8sBackend(testingBackend)) + + for { + eventsAndFailures <- rc.watch(None, None, sendInitialEvents = true).either.runCollect + } yield { + val interactions = testingBackend.allInteractions + val failures: List[K8sFailure] = eventsAndFailures.toList.collect { case Left(x) => x } + assertTrue( + interactions.size == 1, + eventsAndFailures.length == 2, + failures.length == 1, + failures.head == ErrorEvent(status, message, reason, code) + ) + } + } + + /** Simulates the k8s server responding three times, with: + * + * 1. Some events but no bookmark. 2. Some events and a bookmark. 3. Some events and then an + * error. + * + * We test that: + * + * 1. The watch is continued when there is no error (but the stream is closed). 2. + * sendInitialEvents=true the first and second times, but not the third (because we got a + * bookmark). 3. resourceVersion is empty the first time, but increments on subsequent calls + * based on events / bookmarks. + * + * Note that this is not how a real k8s server would behave. In particular, it's not honoring the + * resourceVersions we request. It's important that the client behave properly anyway, so we + * don't artificially add that constraint. + */ + val watchResources: Spec[TestEnvironment, Any] = + test("Ensure that sendInitialEvents and resourceVersion are set correctly") { + val addedEvt = WatchEvent( + nodeToRawExtension(Node(Some(ObjectMeta(name = "node1", resourceVersion = "1")))), + "ADDED" + ) + val modifiedEvt = WatchEvent( + nodeToRawExtension(Node(Some(ObjectMeta(name = "node1", resourceVersion = "2")))), + "MODIFIED" + ) + val bookmarkEvt = WatchEvent( + nodeToRawExtension(Node(Some(ObjectMeta(name = "node2", resourceVersion = "3")))), + "BOOKMARK" + ) + + val resp1 = Seq(addedEvt, modifiedEvt) + .map(_.asJson.noSpaces) + .mkString("\n") + + val resp2 = Seq(addedEvt, modifiedEvt, bookmarkEvt) + .map(_.asJson.noSpaces) + .mkString("\n") + + val testingBackend = new RecordingSttpBackend[Task, ZioStreams with WebSockets]( + HttpClientZioBackend.stub.whenAnyRequest + .thenRespondCyclic( + Right(ZStream.fromIterable(resp1.getBytes)), + Right(ZStream.fromIterable(resp2.getBytes)), + Left(HttpError("Gone for some reason", StatusCode.Gone)) + ) + ) + + val cluster = K8sCluster(Uri("http://foo/bar"), None) + val rc = new ResourceClient[Node, Status](ResourceType, cluster, K8sBackend(testingBackend)) + + for { + eventsAndFailures <- rc.watch(None, None, sendInitialEvents = true).either.runCollect + } yield { + val interactions = testingBackend.allInteractions + val events: List[TypedWatchEvent[Node]] = eventsAndFailures.toList.collect { + case Right(x) => x + } + val failures: List[K8sFailure] = eventsAndFailures.toList.collect { case Left(x) => x } + val sendInitialEvents = interactions.map( + _._1.uri.querySegments + .exists { + case KeyValue(k, v, _, _) if k == "sendInitialEvents" && v == "true" => true + case _ => false + } + ) + val resourceVersions = interactions.map( + _._1.uri.querySegments + .collect { + case KeyValue(k, v, _, _) if k == "resourceVersion" => Integer.parseInt(v) + } + ) + assertTrue( + // The HTTP server was called 3 times. + interactions.size == 3, + // Added, Modified, Added, Modified, Gone. + eventsAndFailures.length == 5, + events.length == 4, + events.map(_.resourceVersion.get).map(Integer.parseInt) == List(1, 2, 1, 2), + failures.length == 1, + failures.head == Gone, + // sendInitialEvents should only be disabled after the first bookmark (after the second call). + sendInitialEvents == List(true, true, false), + // On the first call, unset. Then we get as far as rv 2, and finally the bookmark with 3. + resourceVersions == List(List.empty, List(2), List(3)) + ) + } + } +} diff --git a/zio-k8s-client/src/test/scala/com/coralogix/zio/k8s/client/model/ParsedWatchEventSpec.scala b/zio-k8s-client/src/test/scala/com/coralogix/zio/k8s/client/model/ParsedWatchEventSpec.scala new file mode 100644 index 000000000..008da52e9 --- /dev/null +++ b/zio-k8s-client/src/test/scala/com/coralogix/zio/k8s/client/model/ParsedWatchEventSpec.scala @@ -0,0 +1,53 @@ +package com.coralogix.zio.k8s.client.model + +import com.coralogix.zio.k8s.client.{ ErrorEvent, K8sRequestInfo } +import com.coralogix.zio.k8s.model.pkg.apis.meta.v1.{ ObjectMeta, WatchEvent } +import io.circe.Decoder +import io.circe.parser.decode +import zio.prelude.data.Optional +import zio.test.Assertion.{ equalTo, isLeft } +import zio.test.{ assert, Spec, TestEnvironment, ZIOSpecDefault } + +object ParsedWatchEventSpec extends ZIOSpecDefault { + override def spec: Spec[TestEnvironment, Any] = + suite("Parse events")( + test("Ensure that error events are handled properly") { + val status = "Failure" + val message = "too old resource version: 33948840 (34026715)" + val reason = "Expired" + val code = 410 + + val errorStr = s""" + |{ + | "type": "ERROR", + | "object": { + | "kind": "Status", + | "apiVersion": "v1", + | "metadata": {}, + | "status": "$status", + | "message": "$message", + | "reason": "$reason", + | "code": $code + | } + |}""".stripMargin + + val expectedErrEvt = ErrorEvent(status, message, reason, code) + + val event: WatchEvent = decode[WatchEvent](errorStr).fold( + error => throw new RuntimeException(s"Could not decode watch event: $error"), + identity + ) + + implicit val WatchEventDecoder: Decoder[None.type] = Decoder.decodeNone + implicit val k8sObject: K8sObject[None.type] = new K8sObject[None.type] { + override def metadata(obj: None.type): Optional[ObjectMeta] = ??? + override def mapMetadata(f: ObjectMeta => ObjectMeta)(r: None.type): None.type = None + } + + val reqInfo: K8sRequestInfo = K8sRequestInfo(K8sResourceType("foo", "bar", "baz"), "get") + for { + result <- ParsedWatchEvent.from(reqInfo, event).either + } yield assert(result)(isLeft(equalTo(expectedErrEvt))) + } + ) +} diff --git a/zio-k8s-client/src/test/scala/com/coralogix/zio/k8s/client/test/TestResourceClientSpec.scala b/zio-k8s-client/src/test/scala/com/coralogix/zio/k8s/client/test/TestResourceClientSpec.scala index 32c6398b6..952ceb86c 100644 --- a/zio-k8s-client/src/test/scala/com/coralogix/zio/k8s/client/test/TestResourceClientSpec.scala +++ b/zio-k8s-client/src/test/scala/com/coralogix/zio/k8s/client/test/TestResourceClientSpec.scala @@ -313,8 +313,7 @@ object TestResourceClientSpec extends ZIOSpecDefault { val fieldSelector = FieldSelector.FieldEquals(Chunk("metadata", "name"), name) for { - client <- - TestResourceClient.make[Node, Status](() => Status()) + client <- TestResourceClient.make[Node, Status](() => Status()) created <- client.create(node, None) _ <- client.create(node2, None) // bump version and change label diff --git a/zio-k8s-operator/src/main/scala/com/coralogix/zio/k8s/operator/OperatorFailure.scala b/zio-k8s-operator/src/main/scala/com/coralogix/zio/k8s/operator/OperatorFailure.scala index 2b0870d1d..0c2ff0ff5 100644 --- a/zio-k8s-operator/src/main/scala/com/coralogix/zio/k8s/operator/OperatorFailure.scala +++ b/zio-k8s-operator/src/main/scala/com/coralogix/zio/k8s/operator/OperatorFailure.scala @@ -22,35 +22,39 @@ case class OperatorError[E](error: E) extends OperatorFailure[E] object OperatorFailure { implicit val k8sFailureToThrowable: ConvertableToThrowable[K8sFailure] = { - case Unauthorized(reqInfo, message) => + case Unauthorized(reqInfo, message) => new RuntimeException(formatErrorWithK8sContext(reqInfo, s"K8s authorization error: $message")) - case HttpFailure(reqInfo, message, code) => + case HttpFailure(reqInfo, message, code) => new RuntimeException( formatErrorWithK8sContext(reqInfo, s"K8s HTTP error: $message with code $code") ) - case CodingFailure(reqInfo, failure) => + case CodingFailure(reqInfo, failure) => new RuntimeException( formatErrorWithK8sContext(reqInfo, "K8s character coding error"), failure ) - case DecodedFailure(reqInfo, status, code) => + case DecodedFailure(reqInfo, status, code) => new RuntimeException( formatErrorWithK8sContext(reqInfo, s"K8s error: ${status.message} with code $code") ) - case DeserializationFailure(reqInfo, error) => + case DeserializationFailure(reqInfo, error) => val prettyPrintedError = error.toList.map(CircePrettyFailure.prettyPrint).mkString("\n") new RuntimeException( formatErrorWithK8sContext(reqInfo, s"K8s deserialization failure: $prettyPrintedError") ) - case RequestFailure(reqInfo, reason) => + case RequestFailure(reqInfo, reason) => new RuntimeException(formatErrorWithK8sContext(reqInfo, "K8s request error"), reason) - case Gone => + case Gone => new RuntimeException(s"Gone") - case InvalidEvent(reqInfo, eventType) => + case InvalidEvent(reqInfo, eventType) => new RuntimeException(formatErrorWithK8sContext(reqInfo, s"Invalid event type: $eventType")) - case UndefinedField(fieldName) => + case UndefinedField(fieldName) => new RuntimeException(s"Undefined field $fieldName") - case NotFound => + case ErrorEvent(status, message, reason, code) => + new RuntimeException( + s"Received error event. Status $status, message: $message, reason: $reason, code: $code" + ) + case NotFound => new RuntimeException(s"Not found") } diff --git a/zio-k8s-operator/src/test/scala/com/coralogix/zio/k8s/operator/leader/locks/LeaseLockSpec.scala b/zio-k8s-operator/src/test/scala/com/coralogix/zio/k8s/operator/leader/locks/LeaseLockSpec.scala index f5fc7278f..7f1123d6c 100644 --- a/zio-k8s-operator/src/test/scala/com/coralogix/zio/k8s/operator/leader/locks/LeaseLockSpec.scala +++ b/zio-k8s-operator/src/test/scala/com/coralogix/zio/k8s/operator/leader/locks/LeaseLockSpec.scala @@ -93,7 +93,9 @@ object LeaseLockSpec extends ZIOSpecDefault { namespace: Option[K8sNamespace], resourceVersion: Option[String], fieldSelector: Option[FieldSelector], - labelSelector: Option[LabelSelector] + labelSelector: Option[LabelSelector], + sendInitialEvents: Boolean = false, + readTimeout: Duration = Duration.Infinity ): stream.Stream[K8sFailure, TypedWatchEvent[Lease]] = ZStream.unwrap { ifZIO(failSwitch.get)(