diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b70566..cf9acce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +- Add `kubernetes_use_cached_resources` option to Kubernetes strategy + +## 3.4.1 + - Use new cypher names - Allow Epmd strategy to reconnect after connection failures - Detect Self Signed Certificate Authority for Kubernetes Strategy diff --git a/lib/strategy/kubernetes.ex b/lib/strategy/kubernetes.ex index e365699..ae83791 100644 --- a/lib/strategy/kubernetes.ex +++ b/lib/strategy/kubernetes.ex @@ -25,6 +25,7 @@ defmodule Cluster.Strategy.Kubernetes do - `:kubernetes_selector` - `:kubernetes_service_name` - `:kubernetes_ip_lookup_mode` + - `:kubernetes_use_cached_resources` - `:mode` ## Getting `` @@ -70,6 +71,11 @@ defmodule Cluster.Strategy.Kubernetes do Then, this strategy will fetch the IP of all pods with that label and attempt to connect. + ### `kubernetes_use_cached_resources` option + + When setting this value, this strategy will use cached resource version value to fetch k8s resources. + In k8s resources are incremented by 1 on every change, this version will set requested resourceVersion + to 0, that will use cached versions of resources, take in mind that this may be outdated or unavailable. ### `:mode` option @@ -362,6 +368,9 @@ defmodule Cluster.Strategy.Kubernetes do selector = Keyword.fetch!(config, :kubernetes_selector) ip_lookup_mode = Keyword.get(config, :kubernetes_ip_lookup_mode, :endpoints) + use_cache = Keyword.get(config, :kubernetes_use_cached_resources, false) + resource_version = if use_cache, do: 0, else: nil + master_name = Keyword.get(config, :kubernetes_master, @kubernetes_master) cluster_domain = System.get_env("CLUSTER_DOMAIN", "#{cluster_name}.local") @@ -380,12 +389,19 @@ defmodule Cluster.Strategy.Kubernetes do cond do app_name != nil and selector != nil -> - selector = URI.encode(selector) + query_params = + [] + |> apply_param(:labelSelector, selector) + |> apply_param(:resourceVersion, resource_version) + |> URI.encode_query(:rfc3986) path = case ip_lookup_mode do - :endpoints -> "api/v1/namespaces/#{namespace}/endpoints?labelSelector=#{selector}" - :pods -> "api/v1/namespaces/#{namespace}/pods?labelSelector=#{selector}" + :endpoints -> + "api/v1/namespaces/#{namespace}/endpoints?#{query_params}" + + :pods -> + "api/v1/namespaces/#{namespace}/pods?#{query_params}" end headers = [{~c"authorization", ~c"Bearer #{token}"}] @@ -440,6 +456,12 @@ defmodule Cluster.Strategy.Kubernetes do end end + defp apply_param(params, key, value) when value != nil do + [{key, value} | params] + end + + defp apply_param(params, _key, _value), do: params + defp parse_response(:endpoints, resp) do case resp do %{"items" => items} when is_list(items) -> diff --git a/test/fixtures/vcr_cassettes/kubernetes.json b/test/fixtures/vcr_cassettes/kubernetes.json index b92ea50..5490851 100644 --- a/test/fixtures/vcr_cassettes/kubernetes.json +++ b/test/fixtures/vcr_cassettes/kubernetes.json @@ -31,6 +31,38 @@ "type": "ok" } }, + { + "request": { + "body": "", + "headers": { + "authorization": "***" + }, + "method": "get", + "options": { + "httpc_options": [], + "http_options": { + "ssl": "[verify: :verify_none]" + } + }, + "request_body": "", + "url": "https://cluster.localhost./api/v1/namespaces/__libcluster_test/endpoints?labelSelector=app=test_selector&resourceVersion=0" + }, + "response": { + "binary": false, + "body": "{\"kind\":\"EndpointsList\",\"apiVersion\":\"v1\",\"metadata\":{\"selfLink\":\"SELFLINK_PLACEHOLDER\",\"resourceVersion\":\"17042410\"},\"items\":[{\"metadata\":{\"name\":\"development-development\",\"namespace\":\"airatel-service-localization\",\"selfLink\":\"SELFLINK_PLACEHOLDER\",\"uid\":\"7e3faf1e-0294-11e8-bcad-42010a9c01cc\",\"resourceVersion\":\"17037787\",\"creationTimestamp\":\"2018-01-26T12:29:03Z\",\"labels\":{\"app\":\"development\",\"chart\":\"CHART_PLACEHOLDER\"}},\"subsets\":[{\"addresses\":[{\"hostname\":\"my-hostname-0\",\"ip\":\"10.48.33.136\",\"nodeName\":\"gke-jshmrtn-cluster-default-pool-a61da41f-db9x\",\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"airatel-service-localization\",\"name\":\"development-4292695165-mgq9f\",\"uid\":\"eb0f3e80-0295-11e8-bcad-42010a9c01cc\",\"resourceVersion\":\"17037783\"}}],\"ports\":[{\"name\":\"web\",\"port\":8443,\"protocol\":\"TCP\"}]}]}]}\n", + "headers": { + "date": "Fri, 26 Jan 2018 13:18:46 GMT", + "content-length": "877", + "content-type": "application/json" + }, + "status_code": [ + "HTTP/1.1", + 200, + "OK" + ], + "type": "ok" + } + }, { "request": { "body": "", diff --git a/test/fixtures/vcr_cassettes/kubernetes_pods.json b/test/fixtures/vcr_cassettes/kubernetes_pods.json index 222095f..6dcebe9 100644 --- a/test/fixtures/vcr_cassettes/kubernetes_pods.json +++ b/test/fixtures/vcr_cassettes/kubernetes_pods.json @@ -30,5 +30,37 @@ ], "type": "ok" } + }, + { + "request": { + "body": "", + "headers": { + "authorization": "***" + }, + "method": "get", + "options": { + "httpc_options": [], + "http_options": { + "ssl": "[verify: :verify_none]" + } + }, + "request_body": "", + "url": "https://cluster.localhost./api/v1/namespaces/__libcluster_test/pods?labelSelector=app=test_selector&resourceVersion=0" + }, + "response": { + "binary": false, + "body": "{\"kind\":\"PodList\",\"apiVersion\":\"v1\",\"metadata\":{\"selfLink\":\"SELFLINK_PLACEHOLDER\",\"resourceVersion\":\"17042410\"},\"items\":[{\"metadata\":{\"name\":\"development-development\",\"namespace\":\"airatel-service-localization\",\"selfLink\":\"SELFLINK_PLACEHOLDER\",\"uid\":\"7e3faf1e-0294-11e8-bcad-42010a9c01cc\",\"resourceVersion\":\"17037787\",\"creationTimestamp\":\"2018-01-26T12:29:03Z\",\"labels\":{\"app\":\"development\",\"chart\":\"CHART_PLACEHOLDER\"}},\"spec\": { \"hostname\": \"my-hostname-0\" },\"status\":{\"podIP\": \"10.48.33.136\"}}]}\n", + "headers": { + "date": "Fri, 26 Jan 2018 13:18:46 GMT", + "content-length": "877", + "content-type": "application/json" + }, + "status_code": [ + "HTTP/1.1", + 200, + "OK" + ], + "type": "ok" + } } ] diff --git a/test/kubernetes_test.exs b/test/kubernetes_test.exs index 2a86b70..512ad6d 100644 --- a/test/kubernetes_test.exs +++ b/test/kubernetes_test.exs @@ -81,6 +81,60 @@ defmodule Cluster.Strategy.KubernetesTest do end end + test "works with cached resources" do + use_cassette "kubernetes", custom: true do + capture_log(fn -> + start_supervised!({Kubernetes, + [ + %Cluster.Strategy.State{ + topology: :name, + config: [ + kubernetes_node_basename: "test_basename", + kubernetes_selector: "app=test_selector", + kubernetes_use_cached_resources: true, + # If you want to run the test freshly, you'll need to create a DNS Entry + kubernetes_master: "cluster.localhost.", + kubernetes_service_account_path: + Path.join([__DIR__, "fixtures", "kubernetes", "service_account"]) + ], + connect: {Nodes, :connect, [self()]}, + disconnect: {Nodes, :disconnect, [self()]}, + list_nodes: {Nodes, :list_nodes, [[]]} + } + ]}) + + assert_receive {:connect, _}, 5_000 + end) + end + end + + test "works with no cached resources" do + use_cassette "kubernetes", custom: true do + capture_log(fn -> + start_supervised!({Kubernetes, + [ + %Cluster.Strategy.State{ + topology: :name, + config: [ + kubernetes_node_basename: "test_basename", + kubernetes_selector: "app=test_selector", + kubernetes_use_cached_resources: false, + # If you want to run the test freshly, you'll need to create a DNS Entry + kubernetes_master: "cluster.localhost.", + kubernetes_service_account_path: + Path.join([__DIR__, "fixtures", "kubernetes", "service_account"]) + ], + connect: {Nodes, :connect, [self()]}, + disconnect: {Nodes, :disconnect, [self()]}, + list_nodes: {Nodes, :list_nodes, [[]]} + } + ]}) + + assert_receive {:connect, _}, 5_000 + end) + end + end + test "works with dns and cluster_name" do use_cassette "kubernetes", custom: true do capture_log(fn -> @@ -201,6 +255,34 @@ defmodule Cluster.Strategy.KubernetesTest do end end + test "works with pods and cached resources" do + use_cassette "kubernetes_pods", custom: true do + capture_log(fn -> + start_supervised!({Kubernetes, + [ + %Cluster.Strategy.State{ + topology: :name, + config: [ + kubernetes_node_basename: "test_basename", + kubernetes_selector: "app=test_selector", + # If you want to run the test freshly, you'll need to create a DNS Entry + kubernetes_master: "cluster.localhost.", + kubernetes_ip_lookup_mode: :pods, + kubernetes_use_cached_resources: true, + kubernetes_service_account_path: + Path.join([__DIR__, "fixtures", "kubernetes", "service_account"]) + ], + connect: {Nodes, :connect, [self()]}, + disconnect: {Nodes, :disconnect, [self()]}, + list_nodes: {Nodes, :list_nodes, [[]]} + } + ]}) + + assert_receive {:connect, :"test_basename@10.48.33.136"}, 5_000 + end) + end + end + test "works with pods and dns" do use_cassette "kubernetes_pods", custom: true do capture_log(fn ->