Fix broken ConfigMapLock. It used to close the scope prematurely. Add two tests. (#613)

Signed-off-by: aditpras <akprasad@gmail.com>
monktastic authored Jul 17, 2024
1 parent e840e0b commit bcf1627
Showing 6 changed files with 123 additions and 39 deletions.
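
The gist of the fix: previously the lock implementations wrapped `ZIO.acquireRelease` in `ZIO.scoped` inside `acquireLock` itself, so the scope closed (and the release finalizer deleted the lock) as soon as acquisition returned, before the protected effect ever ran. Returning a `ZIO[Scope, ...]` instead registers the finalizer in the caller's scope, which now spans the protected effect. A minimal sketch of the two shapes, with made-up `createLock`/`deleteLock` stand-ins rather than the library's actual code:

```scala
import zio._

object ScopeSketch extends ZIOAppDefault {
  // Illustrative stand-ins for creating and deleting the lock resource.
  def createLock: UIO[Unit] = ZIO.logInfo("lock created")
  def deleteLock: UIO[Unit] = ZIO.logInfo("lock deleted")

  // Broken shape: the scope opened here closes as soon as this effect completes,
  // so deleteLock runs immediately after createLock.
  val acquireBroken: ZIO[Any, Nothing, Unit] =
    ZIO.scoped(ZIO.acquireRelease(createLock)(_ => deleteLock))

  // Fixed shape: the finalizer is registered in the caller's Scope, so the lock
  // is held until whoever called ZIO.scoped finishes its work.
  val acquireFixed: ZIO[Scope, Nothing, Unit] =
    ZIO.acquireRelease(createLock)(_ => deleteLock)

  def run =
    ZIO.scoped {
      acquireFixed *> ZIO.logInfo("doing leader work while holding the lock")
    } // deleteLock runs here, when the outer scope closes
}
```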
@@ -41,7 +41,7 @@ object LeaderExample extends ZIOAppDefault {
}

private def example(): ZIO[
Any with LeaderElection.Service,
LeaderElection.Service,
Nothing,
Option[Nothing]
] =
@@ -2,13 +2,13 @@ package com.coralogix.zio.k8s.operator.leader

import com.coralogix.zio.k8s.client.model.K8sNamespace
import com.coralogix.zio.k8s.model.core.v1.Pod
import zio.ZIO
import zio.{ Scope, ZIO }

/** Common interface for different lock implementations used for leader election.
*/
trait LeaderLock {
def acquireLock(
namespace: K8sNamespace,
pod: Pod
): ZIO[Any, LeaderElectionFailure[Nothing], Unit]
): ZIO[Scope, LeaderElectionFailure[Nothing], Unit]
}
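
Since `LeaderLock.acquireLock` now requires a `Scope`, the caller decides how long the lock is held by choosing where that scope closes. A caller-side sketch (the helper and its names are hypothetical, not part of the library):

```scala
import com.coralogix.zio.k8s.client.model.K8sNamespace
import com.coralogix.zio.k8s.model.core.v1.Pod
import com.coralogix.zio.k8s.operator.leader.{ LeaderElectionFailure, LeaderLock }
import zio._

object LeaderLockUsage {
  // Hypothetical helper: acquire the lock, run `work` while holding it, and let the
  // lock's release finalizer run when the surrounding scope closes.
  def withLeaderLock[A](lock: LeaderLock, namespace: K8sNamespace, pod: Pod)(
    work: UIO[A]
  ): IO[LeaderElectionFailure[Nothing], A] =
    ZIO.scoped {
      lock.acquireLock(namespace, pod) *> work
    } // release (e.g. deleting the lock resource) happens here
}
```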
@@ -9,7 +9,7 @@ import com.coralogix.zio.k8s.model.pkg.apis.meta.v1.{ DeleteOptions, Status }
import com.coralogix.zio.k8s.operator.OperatorFailure.k8sFailureToThrowable
import com.coralogix.zio.k8s.operator.OperatorLogging.logFailure
import com.coralogix.zio.k8s.operator.leader.{ KubernetesError, LeaderElectionFailure, LeaderLock }
import zio.{ Cause, IO, Schedule, ZIO, ZLayer }
import zio.{ Cause, IO, Schedule, Scope, ZIO, ZLayer }

abstract class LeaderForLifeLock[T: K8sObject](
lockName: String,
@@ -47,26 +47,22 @@ abstract class LeaderForLifeLock[T: K8sObject](
def acquireLock(
namespace: K8sNamespace,
self: Pod
): ZIO[Any, LeaderElectionFailure[Nothing], Unit] =
ZIO.scoped {
for {
alreadyOwned <- checkIfAlreadyOwned(namespace, self)
lock <-
if (alreadyOwned)
ZIO
.logAnnotate("name", "Leader") {
ZIO.logInfo(
s"Lock '$lockName' in namespace '${namespace.value}' is already owned by the current pod"
)
} flatMap { _ =>
ZIO.acquireRelease(ZIO.unit)(_ => deleteLock(lockName, namespace))
): ZIO[Scope, LeaderElectionFailure[Nothing], Unit] =
for {
alreadyOwned <- checkIfAlreadyOwned(namespace, self)
lock <-
if (alreadyOwned)
ZIO
.logAnnotate("name", "Leader") {
ZIO.logInfo(
s"Lock '$lockName' in namespace '${namespace.value}' is already owned by the current pod"
)
}
else
ZIO.acquireRelease(
tryCreateLock(namespace, self)
)(_ => deleteLock(lockName, namespace))
} yield lock
}
else
ZIO.acquireRelease(
tryCreateLock(namespace, self)
)(_ => deleteLock(lockName, namespace))
} yield lock

private def checkIfAlreadyOwned(
namespace: K8sNamespace,
@@ -91,20 +87,25 @@ abstract class LeaderForLifeLock[T: K8sObject](
self: Pod
): ZIO[Any, LeaderElectionFailure[Nothing], Unit] =
ZIO.logAnnotate("name", "Leader") {
val podName = self.metadata.flatMap(_.name).getOrElse("(unknown name)")
for {
_ <- ZIO.logInfo(s"Acquiring lock '$lockName' in namespace '${namespace.value}'")
_ <-
ZIO.logInfo(s"Pod $podName acquiring lock '$lockName' in namespace '${namespace.value}'")
lock <- makeLock(lockName, namespace, self)
finalRetryPolicy = retryPolicy && Schedule.recurWhileZIO[Any, K8sFailure] {
case DecodedFailure(_, status, code)
if status.reason.contains("AlreadyExists") =>
ZIO.logInfo(s"Lock is already taken, retrying...").as(true)
case _ =>
ZIO.succeed(false)
ZIO
.logInfo(s"Pod $podName failed to acquire lock '$lockName''")
.as(false)
}
_ <- client
.create(lock, Some(namespace))
.retry(finalRetryPolicy)
.mapError(KubernetesError.apply)
_ <- ZIO.logInfo(s"Pod $podName successfully acquired lock '$lockName'")
} yield ()
}
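
The `finalRetryPolicy` above intersects the caller-supplied `retryPolicy` with a check that only treats `AlreadyExists` failures as retryable, so any other Kubernetes error aborts the retry loop. A hypothetical wiring example, assuming the `configMapLock` constructor used in the test below (the lock name and interval are made up):

```scala
import com.coralogix.zio.k8s.operator.leader.LeaderElection
import zio.{ durationInt, Schedule }

object PatientLeaderElection {
  // Keep polling every 5 seconds while the previous leader's lock still exists;
  // the AlreadyExists check above returns false for other failures, which stops retrying.
  val layer =
    LeaderElection.configMapLock(
      "my-operator-lock",                           // illustrative lock name
      retryPolicy = Schedule.spaced(5.seconds).unit // intersected (&&) with the check above
    )
}
```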

@@ -29,14 +29,14 @@ class LeaseLock(
override def acquireLock(
namespace: K8sNamespace,
pod: Pod
): ZIO[Any, leader.LeaderElectionFailure[Nothing], Unit] =
ZIO.scoped(for {
): ZIO[Scope, leader.LeaderElectionFailure[Nothing], Unit] =
for {
store <- Ref.make(Option.empty[VersionedRecord])
name <- pod.getName.mapError(KubernetesError.apply)
impl = new Impl(store, namespace, name)
_ <- ZIO.acquireReleaseInterruptible(impl.acquire())(impl.release())
_ <- impl.renew().fork
} yield ())
} yield ()

class Impl(
store: Ref[Option[VersionedRecord]],
@@ -9,6 +9,7 @@ import com.coralogix.zio.k8s.operator.OperatorLogging.logFailure
import com.coralogix.zio.k8s.operator.contextinfo.{ ContextInfo, ContextInfoFailure }
import com.coralogix.zio.k8s.operator.leader.locks.leaderlockresources.LeaderLockResources
import com.coralogix.zio.k8s.operator.leader.locks.{ ConfigMapLock, CustomLeaderLock, LeaseLock }
import zio.ZIO.logInfo
import zio._

package object leader {
@@ -24,27 +25,26 @@ package object leader {
* @param f
* Inner effect to protect
*/
def runAsLeader[R, E, A](f: ZIO[R, E, A]): ZIO[R with Any, E, Option[A]] =
def runAsLeader[R, E, A](f: ZIO[R, E, A]): ZIO[R, E, Option[A]] = ZIO.scoped[R] {
lease
.zipRight(f.mapBoth(ApplicationError.apply, Some.apply))
.catchAll((failure: LeaderElectionFailure[E]) =>
logLeaderElectionFailure(failure).as(None)
)
}

/** Creates a managed lock implementing the leader election algorithm
*/
def lease: ZIO[Any, LeaderElectionFailure[Nothing], Unit]
def lease: ZIO[Scope, LeaderElectionFailure[Nothing], Unit]
}

class Live(contextInfo: ContextInfo.Service, lock: LeaderLock) extends Service {
override def lease: ZIO[Any, LeaderElectionFailure[Nothing], Unit] =
ZIO.scoped {
for {
namespace <- contextInfo.namespace.mapError(ContextInfoError.apply)
pod <- contextInfo.pod.mapError(ContextInfoError.apply)
managedLock <- lock.acquireLock(namespace, pod)
} yield managedLock
}
override def lease: ZIO[Scope, LeaderElectionFailure[Nothing], Unit] =
for {
namespace <- contextInfo.namespace.mapError(ContextInfoError.apply)
pod <- contextInfo.pod.mapError(ContextInfoError.apply)
managedLock <- lock.acquireLock(namespace, pod)
} yield managedLock
}

class LiveTemporary(
@@ -229,7 +229,7 @@ package object leader {
*/
def runAsLeader[R, E, A](
f: ZIO[R, E, A]
): ZIO[R with Any with LeaderElection, E, Option[A]] =
): ZIO[R with LeaderElection, E, Option[A]] =
ZIO.serviceWithZIO[LeaderElection](_.runAsLeader(f))

/** Creates a managed lock implementing the leader election algorithm
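
With `lease` now executed inside `ZIO.scoped` in `runAsLeader`, the lock stays held for the full duration of the protected effect and is released when it completes or the fiber is interrupted. A usage sketch (the reconcile effect and wiring comments are illustrative, not from this repository):

```scala
import com.coralogix.zio.k8s.operator.leader
import zio._

object RunAsLeaderUsage {
  // Stand-in for a long-running effect that must only run on the elected leader.
  val reconcile: ZIO[Any, Nothing, Unit] =
    ZIO.logInfo("reconciling as leader") *> ZIO.never

  // Acquires the lock, runs `reconcile` in the same scope, then releases the lock;
  // yields None if the lock could not be obtained.
  val program = leader.runAsLeader(reconcile)

  // Before running, `program` still needs a LeaderElection layer (for example
  // LeaderElection.configMapLock(...)) plus its ContextInfo / ConfigMaps / Pods dependencies.
}
```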
@@ -0,0 +1,83 @@
package com.coralogix.zio.k8s.operator.leader.locks

import com.coralogix.zio.k8s.client.model.K8sNamespace
import com.coralogix.zio.k8s.client.v1.configmaps.ConfigMaps
import com.coralogix.zio.k8s.client.v1.pods.Pods
import com.coralogix.zio.k8s.model.core.v1.Pod
import com.coralogix.zio.k8s.model.pkg.apis.meta.v1.ObjectMeta
import com.coralogix.zio.k8s.operator.contextinfo.ContextInfo
import com.coralogix.zio.k8s.operator.leader
import com.coralogix.zio.k8s.operator.leader.LeaderElection
import zio.ZIO.logInfo
import zio.test.TestAspect.timeout
import zio.test.{ assertTrue, Spec, TestEnvironment, ZIOSpecDefault }
import zio.{ durationInt, Promise, Schedule, ZIO, ZLayer }

object ConfigMapLockSpec extends ZIOSpecDefault {
private val sharedLayers = ZLayer.make[Pods with ConfigMaps](
// The function here is only for deletes and should never be called.
Pods.test(() => ???),
ConfigMaps.test
)

private def makePod(name: String) =
Pod(metadata = Some(ObjectMeta(name = name, uid = Some(name))))

// Schedule.stop is used so that if acquisition fails, it does not retry.
private val leaderElectionLayer = LeaderElection.configMapLock("abc", retryPolicy = Schedule.stop)

override def spec: Spec[TestEnvironment, Any] =
suite("ConfigMap based leader election")(
singleLeaderTest,
reentranceTest
).provide(sharedLayers) @@ timeout(5.seconds)

// A ZIO that hogs the lock and never completes. The promise is there to let the caller know that it has started.
def lockHogger(promise: Promise[Nothing, Unit]): ZIO[Any, Nothing, Unit] = for {
_ <- promise.succeed(())
_ <- logInfo("Running locked code")
_ <- ZIO.never
} yield ()

val singleLeaderTest: Spec[Pods with ConfigMaps, Any] =
test("second fiber should fail to acquire lock") {
// We create two ContextInfos so that the two ZIOs trying to get locks can pretend to be different Pods competing
// for the same lock.
val ciLayer1 = ContextInfo.test(makePod("pod1"), K8sNamespace.default)
val ciLayer2 = ContextInfo.test(makePod("pod2"), K8sNamespace.default)
for {
promise <- Promise.make[Nothing, Unit]
fiber <- leader
.runAsLeader(lockHogger(promise))
.provideSome[ConfigMaps with Pods](ciLayer1, leaderElectionLayer)
.fork
// Ensure that fiber has acquired the lock.
_ <- promise.await
// This will return None since it cannot get the lock.
result <- leader
.runAsLeader(ZIO.unit)
.provideSome[ConfigMaps with Pods](ciLayer2, leaderElectionLayer)
_ <- fiber.interrupt
} yield assertTrue(result.isEmpty)
}

// The lock should be reentrant.
val reentranceTest: Spec[Pods with ConfigMaps, Any] =
test("a pod that already has the lock should be able to reuse it") {
val ciLayer = ContextInfo.test(makePod("pod1"), K8sNamespace.default)
for {
promise <- Promise.make[Nothing, Unit]
fiber <- leader
.runAsLeader(lockHogger(promise))
.provideSome[ConfigMaps with Pods](ciLayer, leaderElectionLayer)
.fork
// Ensure that fiber1 has acquired the lock.
_ <- promise.await
// Now reusing the ContextInfo, so it's the same Pod. It already has the lock, so should run successfully.
result <- leader
.runAsLeader(ZIO.unit)
.provideSome[ConfigMaps with Pods](ciLayer, leaderElectionLayer)
_ <- fiber.interrupt
} yield assertTrue(result.nonEmpty)
}
}
