diff --git a/backend/lib/edgehog/application.ex b/backend/lib/edgehog/application.ex index 770dc69ac..3ebaaf321 100644 --- a/backend/lib/edgehog/application.ex +++ b/backend/lib/edgehog/application.ex @@ -57,6 +57,8 @@ defmodule Edgehog.Application do {Finch, name: EdgehogFinch}, # Start the UpdateCampaigns supervisor Edgehog.UpdateCampaigns.Supervisor, + # Start the Deployments supervisor + Edgehog.Containers.Release.Deployment.Supervisor, # Start the Tenant Reconciler Supervisor {Edgehog.Tenants.Reconciler.Supervisor, tenant_to_trigger_url_fun: tenant_to_trigger_url_fun}, # Start the Endpoint (http/https) diff --git a/backend/lib/edgehog/containers/container/changes/deploy_container_on_device.ex b/backend/lib/edgehog/containers/container/changes/deploy_container_on_device.ex index d8e2eb257..1c99c7254 100644 --- a/backend/lib/edgehog/containers/container/changes/deploy_container_on_device.ex +++ b/backend/lib/edgehog/containers/container/changes/deploy_container_on_device.ex @@ -32,7 +32,7 @@ defmodule Edgehog.Containers.Container.Changes.DeployContainerOnDevice do Ash.Changeset.after_action(changeset, fn _changeset, deployment -> with {:ok, deployment} <- Ash.load(deployment, [:device, container: [:image, :networks]]), - {:ok, _image_deployment} <- deploy_image(deployment, tenant), + {:ok, image_deployment} <- deploy_image(deployment, tenant), {:ok, _device} <- Devices.send_create_container_request(deployment.device, deployment.container, tenant: tenant) do {:ok, deployment} diff --git a/backend/lib/edgehog/containers/container/changes/emit_if_new.ex b/backend/lib/edgehog/containers/container/changes/emit_if_new.ex new file mode 100644 index 000000000..aa6d60df5 --- /dev/null +++ b/backend/lib/edgehog/containers/container/changes/emit_if_new.ex @@ -0,0 +1,34 @@ +# +# This file is part of Edgehog. +# +# Copyright 2024 SECO Mind Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# + +defmodule Edgehog.Containers.Container.Changes.EmitIfNew do + @moduledoc false + use Ash.Resource.Change + + alias Edgehog.PubSub + + def change(changeset, _opts, _context) do + Ash.Changeset.after_transaction(changeset, fn _changeset, result -> + with {:ok, deployment} <- result do + PubSub.publish!(:available_container, deployment) + end + end) + end +end diff --git a/backend/lib/edgehog/containers/containers.ex b/backend/lib/edgehog/containers/containers.ex index 36a934129..dddeed3aa 100644 --- a/backend/lib/edgehog/containers/containers.ex +++ b/backend/lib/edgehog/containers/containers.ex @@ -140,6 +140,7 @@ defmodule Edgehog.Containers do resource Image.Deployment do define :deploy_image, action: :deploy, args: [:image_id, :device_id] define :fetch_image_deployment, action: :read, get_by_identity: :image_instance + define :image_deployment_sent, action: :sent define :image_deployment_unpulled, action: :unpulled define :image_deployment_pulled, action: :pulled define :image_deployment_errored, action: :errored, args: [:message] @@ -168,6 +169,7 @@ defmodule Edgehog.Containers do resource Network.Deployment do define :deploy_network, action: :deploy, args: [:network_id, :device_id] define :fetch_network_deployment, action: :read, get_by_identity: :network_instance + define :network_deployment_sent, action: :sent define :network_deployment_available, action: :available define :network_deployment_unavailable, action: :unavailable define :network_deployment_errored, action: :errored, args: [:message] diff --git a/backend/lib/edgehog/containers/image/changes/deploy_image_on_device.ex b/backend/lib/edgehog/containers/image/changes/deploy_image_on_device.ex index a6cb697a1..1b5ae54ae 100644 --- a/backend/lib/edgehog/containers/image/changes/deploy_image_on_device.ex +++ b/backend/lib/edgehog/containers/image/changes/deploy_image_on_device.ex @@ -18,7 +18,7 @@ # SPDX-License-Identifier: Apache-2.0 # -defmodule Edgehog.Containers.Image.Changes.DeployImageOnDevice do +defmodule Edgehog.Containers.Image.Deployment.Changes.DeployImageOnDevice do @moduledoc false use Ash.Resource.Change diff --git a/backend/lib/edgehog/containers/image/changes/emit_if_new.ex b/backend/lib/edgehog/containers/image/changes/emit_if_new.ex new file mode 100644 index 000000000..df441ee34 --- /dev/null +++ b/backend/lib/edgehog/containers/image/changes/emit_if_new.ex @@ -0,0 +1,36 @@ +# +# This file is part of Edgehog. +# +# Copyright 2024 SECO Mind Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# + +defmodule Edgehog.Containers.Image.Changes.EmitIfNew do + @moduledoc false + use Ash.Resource.Change + + alias Edgehog.PubSub + + def change(changeset, _opts, _context) do + Ash.Changeset.after_transaction(changeset, fn _changeset, result -> + with {:ok, deployment} <- result do + PubSub.publish!(:available_image, deployment) + end + + result + end) + end +end diff --git a/backend/lib/edgehog/containers/image/deployment.ex b/backend/lib/edgehog/containers/image/deployment.ex index 2e1dabc90..a70e1405a 100644 --- a/backend/lib/edgehog/containers/image/deployment.ex +++ b/backend/lib/edgehog/containers/image/deployment.ex @@ -24,7 +24,7 @@ defmodule Edgehog.Containers.Image.Deployment do domain: Edgehog.Containers, extensions: [AshGraphql.Resource, AshStateMachine] - alias Edgehog.Containers.Image.Changes + alias Edgehog.Containers.Image.Deployment.Changes state_machine do initial_states([:created, :sent]) @@ -58,6 +58,9 @@ defmodule Edgehog.Containers.Image.Deployment do change transition_state(:created) change manage_relationship(:device_id, :device, type: :append) change Changes.DeployImageOnDevice + end + + update :sent do change transition_state(:sent) end diff --git a/backend/lib/edgehog/containers/network/changes/emit_if_new.ex b/backend/lib/edgehog/containers/network/changes/emit_if_new.ex new file mode 100644 index 000000000..e89c930f8 --- /dev/null +++ b/backend/lib/edgehog/containers/network/changes/emit_if_new.ex @@ -0,0 +1,34 @@ +# +# This file is part of Edgehog. +# +# Copyright 2024 SECO Mind Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# + +defmodule Edgehog.Containers.Network.Changes.EmitIfNew do + @moduledoc false + use Ash.Resource.Change + + alias Edgehog.PubSub + + def change(changeset, _opts, _context) do + Ash.Changeset.after_transaction(changeset, fn _changeset, result -> + with {:ok, deployment} <- result do + PubSub.publish!(:available_network, deployment) + end + end) + end +end diff --git a/backend/lib/edgehog/containers/network/deployment.ex b/backend/lib/edgehog/containers/network/deployment.ex index df9fbc43c..47c100770 100644 --- a/backend/lib/edgehog/containers/network/deployment.ex +++ b/backend/lib/edgehog/containers/network/deployment.ex @@ -59,6 +59,9 @@ defmodule Edgehog.Containers.Network.Deployment do change manage_relationship(:device_id, :device, type: :append) change Changes.DeployNetworkOnDevice + end + + update :sent do change transition_state(:sent) end diff --git a/backend/lib/edgehog/containers/release/deployment/changes/create_deployment_on_device.ex b/backend/lib/edgehog/containers/release/deployment/changes/create_deployment_on_device.ex index 92f43b2d9..2cd000698 100644 --- a/backend/lib/edgehog/containers/release/deployment/changes/create_deployment_on_device.ex +++ b/backend/lib/edgehog/containers/release/deployment/changes/create_deployment_on_device.ex @@ -23,6 +23,7 @@ defmodule Edgehog.Containers.Release.Deployment.Changes.CreateDeploymentOnDevice use Ash.Resource.Change alias Edgehog.Containers + alias Edgehog.Containers.Release.Deployment.CheckerSupervisor alias Edgehog.Devices @impl Ash.Resource.Change @@ -32,9 +33,17 @@ defmodule Edgehog.Containers.Release.Deployment.Changes.CreateDeploymentOnDevice Ash.Changeset.after_action(changeset, fn _changeset, deployment -> with {:ok, deployment} <- Ash.load(deployment, [:device, release: [:containers, :networks]]), - :ok <- deploy_containers(deployment, tenant), - :ok <- deploy_networks(deployment, tenant), + {:ok, containers, images} <- deploy_containers(deployment, tenant), + {:ok, networks} <- deploy_networks(deployment, tenant), {:ok, _device} <- Devices.send_create_deployment_request(deployment.device, deployment) do + data = %{ + deployment: deployment, + containers: containers, + images: images, + networks: networks + } + + CheckerSupervisor.start_checker!(data) {:ok, deployment} end end) @@ -44,10 +53,15 @@ defmodule Edgehog.Containers.Release.Deployment.Changes.CreateDeploymentOnDevice containers = deployment.release.containers device = deployment.device - Enum.reduce_while(containers, :ok, fn container, _acc -> + Enum.reduce_while(containers, {:ok, [], []}, fn container, {:ok, containers_deployments, image_deployments} -> case Containers.deploy_container(container.id, device.id, tenant: tenant) do - {:ok, _container_deployment} -> {:cont, :ok} - error -> {:halt, error} + {:ok, container_deployment} -> + new_containers = [container_deployment | containers_deployments] + new_images = [image_deployment.resource | image_deployments] + {:cont, {:ok, new_containers, new_images}} + + error -> + {:halt, error} end end) end @@ -56,9 +70,9 @@ defmodule Edgehog.Containers.Release.Deployment.Changes.CreateDeploymentOnDevice containers = deployment.release.networks device = deployment.device - Enum.reduce_while(containers, :ok, fn network, _acc -> + Enum.reduce_while(containers, {:ok, []}, fn network, {:ok, deployments} -> case Containers.deploy_network(network.id, device.id, tenant: tenant) do - {:ok, _network_deployment} -> {:cont, :ok} + {:ok, network_deployment} -> {:cont, {:ok, [network_deployment | deployments]}} error -> {:halt, error} end end) diff --git a/backend/lib/edgehog/containers/release/deployment/checker.ex b/backend/lib/edgehog/containers/release/deployment/checker.ex new file mode 100644 index 000000000..71e3a0165 --- /dev/null +++ b/backend/lib/edgehog/containers/release/deployment/checker.ex @@ -0,0 +1,203 @@ +# +# This file is part of Edgehog. +# +# Copyright 2024 SECO Mind Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# + +defmodule Edgehog.Containers.Release.Deployment.Checker do + @moduledoc false + + use GenStateMachine, restart: :transient, callback_mode: [:handle_event_function, :state_enter] + + alias __MODULE__, as: Data + alias Edgehog.Containers + alias Edgehog.PubSub + + require Logger + + defstruct [ + :tenant, + :deployment, + :networks, + :containers, + :images + ] + + def start_link(args) do + name = args[:name] || __MODULE__ + + GenStateMachine.start_link(__MODULE__, args, name: name) + end + + @impl GenStateMachine + def init(opts) do + tenant = Keyword.fetch!(opts, :tenant) + deployment = Keyword.fetch!(opts, :deployment) + networks = Keyword.fetch!(opts, :networks) + containers = Keyword.fetch!(opts, :containers) + images = Keyword.fetch!(opts, :images) + + data = %Data{ + tenant: tenant, + deployment: deployment, + networks: networks, + containers: containers, + images: images + } + + if opts[:wait_for_start_execution] do + # Use this to manually start the executor in tests + {:ok, :wait_for_start_execution, data} + else + {:ok, :initialization, data, internal_event(:init_data)} + end + end + + # State: :wait_for_start_execution + + @impl GenStateMachine + def handle_event(:enter, _old_state, :wait_for_start_execution, _data) do + :keep_state_and_data + end + + def handle_event(:info, :start_execution, :wait_for_start_execution, data) do + {:next_state, :initialization, data, internal_event(:init_data)} + end + + # State: initialization + + @impl GenStateMachine + def handle_event(:enter, _old_state, :initialization, data) do + %{deployment: deployment} = data + + Logger.info("Release Deployment #{deployment.id}: entering :initialization state") + + :keep_state_and_data + end + + def handle_event(:internal, :init_data, :initialization, data) do + %{deployment: deployment, containers: containers, networks: networks, images: images} = data + + Enum.each( + [deployment | containers ++ networks ++ containers ++ images], + &PubSub.subscribe_to_events_for/1 + ) + + {:next_state, :check_images, data} + end + + # State: check images + + @impl GenStateMachine + def handle_event(:enter, _old_state, :check_images, data) do + %{images: images, deployment: deployment} = data + + Logger.info("Release Deployment #{deployment.id}: entering :check_images state") + + if images == [], + do: {:next_state, :check_networks, data}, + else: :keep_state_and_data + end + + def handle_event(:info, {:available_image, image_id}, :check_images, data) do + %{images: images} = data + + # drop an image if recived + new_images = Enum.reject(images, fn image -> image.id == image_id end) + new_data = Map.put(data, :images, new_images) + + if new_images == [], + do: {:next_state, :check_networks, new_data}, + else: {:keep_state, new_data} + end + + # State: check networks + + @impl GenStateMachine + def handle_event(:enter, _old_state, :check_networks, data) do + %{networks: networks, deployment: deployment} = data + + Logger.info("Release Deployment #{deployment.id}: entering :check_networks state") + + if networks == [], + do: {:next_state, :check_containers, data}, + else: :keep_state_and_data + end + + def handle_event(:info, {:available_network, network_id}, :check_networks, data) do + %{networks: networks} = data + + # drop a network if recived + new_networks = Enum.reject(networks, fn network -> network.id == network_id end) + new_data = Map.put(data, :networks, new_networks) + + if new_networks == [], + do: {:next_state, :check_containers, new_data}, + else: {:keep_state, new_data} + end + + # State: check containers + + @impl GenStateMachine + def handle_event(:enter, _old_state, :check_containers, data) do + %{containers: containers, deployment: deployment} = data + + Logger.info("Release Deployment #{deployment.id}: entering :check_containers state") + + if containers == [], + do: {:next_state, :check_deployment, data}, + else: :keep_state_and_data + end + + def handle_event(:info, {:available_container, container_id}, :check_containers, data) do + %{containers: containers} = data + + # drop a container if recived + new_containers = Enum.reject(containers, fn container -> container.id == container_id end) + new_data = Map.put(data, :containers, new_containers) + + if new_containers == [], + do: {:next_state, :check_deployment, new_data}, + else: {:keep_state, new_data} + end + + # State: check deployment + + @impl GenStateMachine + def handle_event(:enter, _old_state, :check_containers, data) do + %{deployment: deployment} = data + + Logger.info("Release Deployment #{deployment.id}: entering :check_deployment state") + + :keep_state_and_data + end + + def handle_event(:info, :deployment_available, :check_deployment, data) do + %{deployment: deployment} = data + + PubSub.publish!(:available_deployment, deployment) + _ = Containers.run_ready_actions!(deployment) + + {:stop, :normal} + end + + # Helper functions + + defp internal_event(payload) do + {:next_event, :internal, payload} + end +end diff --git a/backend/lib/edgehog/containers/release/deployment/checker_supervisor.ex b/backend/lib/edgehog/containers/release/deployment/checker_supervisor.ex new file mode 100644 index 000000000..a090dd9f3 --- /dev/null +++ b/backend/lib/edgehog/containers/release/deployment/checker_supervisor.ex @@ -0,0 +1,97 @@ +# +# This file is part of Edgehog. +# +# Copyright 2024 SECO Mind Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# + +defmodule Edgehog.Containers.Release.Deployment.CheckerSupervisor do + @moduledoc false + use DynamicSupervisor + + alias Edgehog.Containers.Release.Deployment + alias Edgehog.Containers.Release.Deployment.CheckerRegistry + + def start_link(init_arg) do + DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__) + end + + def start_checker!(data) do + %{ + deployment: deployment, + containers: containers, + images: images, + networks: networks + } = data + + tenant = deployment.tenant_id + checker_id = {tenant, deployment.id} + name = {:via, Registry, {CheckerRegistry, checker_id}} + + base_args = [ + name: name, + deployment: deployment, + containers: containers, + networks: networks, + images: images, + tenant: tenant + ] + + child_spec = + deployment + |> checker_child_spec(base_args) + |> Supervisor.child_spec(id: deployment.id) + + case DynamicSupervisor.start_child(__MODULE__, child_spec) do + {:ok, pid} -> + pid + + {:error, {:already_started, pid}} -> + pid + + {:error, reason} -> + msg = + "Release Deployment checker for deployment #{deployment.id} failed to start: " <> + "#{inspect(reason)}" + + raise msg + end + end + + defp checker_child_spec(deployment, base_args) do + # During tests we add `:wait_for_start_execution` to avoid having the checker running + # without us being ready to test it + args = base_args ++ checker_test_args(deployment) + + {Deployment.Checker, args} + end + + if Mix.env() == :test do + # Pass additional checker-specific test args only in the test env + defp checker_test_args(%Deployment{} = _deployment) do + [wait_for_start_execution: true] + end + else + defp checker_test_args(_deployment), do: [] + end + + # Callbacks + + @impl DynamicSupervisor + def init(_init_arg) do + DynamicSupervisor.init(strategy: :one_for_one) + end +end diff --git a/backend/lib/edgehog/containers/release/deployment/deployer.ex b/backend/lib/edgehog/containers/release/deployment/deployer.ex new file mode 100644 index 000000000..cf2ae7626 --- /dev/null +++ b/backend/lib/edgehog/containers/release/deployment/deployer.ex @@ -0,0 +1,116 @@ +# +# This file is part of Edgehog. +# +# Copyright 2024 SECO Mind Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# + +defmodule Edgehog.Containers.Release.Deployment.Deployer do + @moduledoc false + use GenStateMachine, restart: :transient, callback_mode: [:handle_event_function, :state_enter] + + alias __MODULE__, as: Data + alias Edgehog.Containers + alias Edgehog.PubSub + + require Logger + + defstruct [ + :tenant, + :deployment, + :release, + :networks, + :containers, + :images + ] + + def start_link(args) do + name = args[:name] || __MODULE__ + + GenStateMachine.start_link(__MODULE__, args, name: name) + end + + @impl GenStateMachine + def init(opts) do + deployment = Keyword.fetch!(opts, :deployment) + + data = %Data{ + tenant: deployment.tenant_id, + deployment: deployment, + } + + if opts[:wait_for_start_execution] do + # Use this to manually start the executor in tests + {:ok, :wait_for_start_execution, data} + else + {:ok, :init, data, internal_event(:init_data)} + end + end + + + # State: :wait_for_start_execution + + @impl GenStateMachine + def handle_event(:enter, _old_state, :wait_for_start_execution, _data) do + :keep_state_and_data + end + + def handle_event(:info, :start_execution, :wait_for_start_execution, data) do + {:next_state, :initialization, data, internal_event(:init_data)} + end + + # State: :init + + @impl GenStateMachine + def handle_event(:enter, _old_state, :init, data) do + %{deployment: deployment} = data + + Logger.info("Release Deployment #{deployment.id}: entering :init state") + + :keep_state_and_data + end + + def handle_event(:internal, :init_data, :init, data) do + %{deployment: deployment} = data + + case Ash.load(deployment, [device: [], release: [containers: [:image], networks: []]]) do + {:ok, deployment} -> + release = deployment.release + containers = release.containers + networks = release.networks + images = containers |> Enum.map(& &1.image) + + new_data = Enum.into(data, %{containers: containers, networks: networks, images: images}) + {:new_state, :send_images, new_data} + + {:error, error} -> + # DB error, log and stop + end + end + # State: :send_images :internal + # State: :check_images + # State: :send_networks :internal + # State: :check_networks + # State: :send_containers :internal + # State: :check_containers + + # Helpers + defp terminate_on_error(deployment, error) do + Logger.error("Deployer for #{deployment.id} terminated on error: #{inspect(error)}") + + {:stop, :normal} + end +end diff --git a/backend/lib/edgehog/containers/release/deployment/supervisor.ex b/backend/lib/edgehog/containers/release/deployment/supervisor.ex new file mode 100644 index 000000000..72fa8d729 --- /dev/null +++ b/backend/lib/edgehog/containers/release/deployment/supervisor.ex @@ -0,0 +1,52 @@ +# +# This file is part of Edgehog. +# +# Copyright 2024 SECO Mind Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# + +defmodule Edgehog.Containers.Release.Deployment.Supervisor do + @moduledoc false + use Supervisor + + alias Edgehog.Containers.Release.Deployment.CheckerRegistry + alias Edgehog.Containers.Release.Deployment.CheckerSupervisor + + @base_children [ + {Registry, name: CheckerRegistry, keys: :unique}, + CheckerSupervisor + ] + + @mix_env Mix.env() + + def start_link(init_arg) do + Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) + end + + @impl Supervisor + def init(_init_arg) do + @mix_env + |> children() + |> Supervisor.init(strategy: :rest_for_one) + end + + @dialyzer {:nowarn_function, children: 1} + + # This could be easy for testing + defp children(_env) do + @base_children + end +end diff --git a/backend/lib/edgehog/pubsub.ex b/backend/lib/edgehog/pubsub.ex index b729e2637..363fda2eb 100644 --- a/backend/lib/edgehog/pubsub.ex +++ b/backend/lib/edgehog/pubsub.ex @@ -23,9 +23,19 @@ defmodule Edgehog.PubSub do This module implements a PubSub system for events happening inside Edgehog """ + alias Edgehog.Containers.Container + alias Edgehog.Containers.Image + alias Edgehog.Containers.Network + alias Edgehog.Containers.Release alias Edgehog.OSManagement.OTAOperation - @type event :: :ota_operation_created | :ota_operation_updated | :release_deployment_available + @type event :: + :ota_operation_created + | :ota_operation_updated + | :available_deployment + | :available_image + | :available_network + | :available_container @doc """ Publish an event to the PubSub. Raises if any of the publish fails. @@ -44,8 +54,45 @@ defmodule Edgehog.PubSub do payload = {event, ota_operation} topics = [ - topic_for_subject(ota_operation), - wildcard_topic_for_subject(ota_operation) + topic_for_subject(ota_operation) + ] + + broadcast_many!(topics, payload) + end + + def publish!(:available_deployment = event, %Release.Deployment{} = deployment) do + topics = [ + topic_for_subject(deployment) + ] + + broadcast_many!(topics, event) + end + + def publish!(:available_image = event, %Image.Deployment{} = deployment) do + payload = {event, deployment.id} + + topics = [ + topic_for_subject(deployment) + ] + + broadcast_many!(topics, payload) + end + + def publish!(:available_network = event, %Network.Deployment{} = deployment) do + payload = {event, deployment.id} + + topics = [ + topic_for_subject(deployment) + ] + + broadcast_many!(topics, payload) + end + + def publish!(:available_container = event, %Container.Deployment{} = deployment) do + payload = {event, deployment.id} + + topics = [ + topic_for_subject(deployment) ] broadcast_many!(topics, payload) @@ -75,6 +122,10 @@ defmodule Edgehog.PubSub do defp topic_for_subject(subject) defp topic_for_subject(%OTAOperation{id: id}), do: "ota_operations:#{id}" + defp topic_for_subject(%Release.Deployment{id: id}), do: "release_deployment:#{id}" + defp topic_for_subject(%Image.Deployment{id: id}), do: "image_deployment:#{id}" + defp topic_for_subject(%Network.Deployment{id: id}), do: "network_deployment:#{id}" + defp topic_for_subject(%Container.Deployment{id: id}), do: "container_deployment:#{id}" defp topic_for_subject({:ota_operation, id}), do: "ota_operations:#{id}" defp topic_for_subject(:ota_operations), do: "ota_operations:*" defp topic_for_subject(subject) when is_atom(subject), do: Atom.to_string(subject)