Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(capabilities): Add capabilites message type #1049

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
`booleanarray`, `longintegerarray`, `stringarray`, `datetimearray`, `binaryblobarray`.
- [astarte_export] Added a new command for exporting by device_id.
`mix astarte.export $REALM $FILE_XML $DEVICE_ID`
- [astarte_data_updater_plant] Handle `capabilities` messages,
which allow the device to declare whether it supports optional
Astarte MQTT v1 protocol features. For the moment,
just purge properties compression format is customizable.

## [1.2.1] - Unreleased
### Changed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017 - 2023 SECO Mind Srl
# Copyright 2017 - 2025 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
alias Astarte.Core.CQLUtils
alias Astarte.DataUpdaterPlant.Config
alias Astarte.Core.Device
alias Astarte.Core.Device.Capabilities
alias Astarte.Core.InterfaceDescriptor
alias Astarte.Core.Mapping
alias Astarte.Core.Mapping.EndpointsAutomaton
Expand Down Expand Up @@ -69,6 +70,10 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
# TODO change this, we want extended device IDs to fall in the same process
{realm, device_id} = sharding_key

capabilities = %Capabilities{
purge_properties_compression_format: :zlib
}

state = %State{
realm: realm,
device_id: device_id,
Expand All @@ -90,7 +95,8 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
trigger_id_to_policy_name: %{},
discard_messages: false,
last_deletion_in_progress_refresh: 0,
last_datastream_maximum_retention_refresh: 0
last_datastream_maximum_retention_refresh: 0,
capabilities: capabilities
}

encoded_device_id = Device.encode_device_id(device_id)
Expand All @@ -104,9 +110,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do

{:ok, ttl} = Queries.fetch_datastream_maximum_storage_retention(db_client)

{:ok, capabilities} = Queries.fetch_device_capabilities(realm, device_id)

new_state =
Map.merge(state, stats_and_introspection)
|> Map.put(:datastream_maximum_storage_retention, ttl)
|> Map.put(:capabilities, capabilities)

{:ok, new_state}
end
Expand Down Expand Up @@ -141,6 +150,9 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
%{@control_path_header => control_path} = headers
handle_control(state, control_path, payload, timestamp)

"capabilities" ->
handle_capabilities(state, payload, timestamp)

_ ->
# Ack all messages for now
{:ack, :ok, state}
Expand Down Expand Up @@ -1188,6 +1200,30 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
{:ok, new_state}
end

@impl true
def handle_continue({:malformed_capabilities_message, payload, timestamp}, state) do
:telemetry.execute(
[:astarte, :data_updater_plant, :data_updater, :discarded_capabilities_message],
%{},
%{realm: state.realm}
)

base64_payload = Base.encode64(payload)

error_metadata = %{
"base64_payload" => base64_payload
}

execute_device_error_triggers(
state,
"malformed_capabilities_message",
error_metadata,
timestamp
)

{:ok, state}
end

defp path_ttl(nil) do
nil
end
Expand Down Expand Up @@ -1608,6 +1644,42 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
{:ack, :ok, final_state}
end

def handle_capabilities(state, payload, timestamp) do
%{device_id: device_id, realm: realm} = state

Logger.debug("Handling capability trough payload: #{inspect(Base.encode64(payload))}",
tag: "handling_capabilities"
)

with {:ok, bson_payload} <- Cyanide.decode(payload),
changeset = Capabilities.changeset(state.capabilities, bson_payload),
{:ok, capabilities} <- Ecto.Changeset.apply_action(changeset, :update) do
Queries.set_device_capabilities(realm, device_id, capabilities)

new_state = %State{state | capabilities: capabilities}

Logger.debug(
"Successfully handled capability. New device capabilities: #{inspect(capabilities)}",
tag: "handling_capabilities"
)

{:ack, :ok, new_state}
else
error -> malformed_capabilities_message(error, payload, state, timestamp)
end
end

defp malformed_capabilities_message(error, payload, state, timestamp) do
Logger.warning(
"Unexpected error while processing payload #{inspect(Base.encode64(payload))}: #{error}",
tag: "malformed_capabilities_message"
)

{:ok, new_state} = ask_clean_session(state, timestamp)
continue_arg = {:malformed_capabilities_message, payload, timestamp}
{:discard, :malformed_capabilities_message, new_state, {:continue, continue_arg}}
end

def handle_control(%State{discard_messages: true} = state, _, _, _) do
{:ack, :discard_messages, state}
end
Expand Down Expand Up @@ -1641,9 +1713,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do

# TODO: check payload size, to avoid anoying crashes

<<_size_header::size(32), zlib_payload::binary>> = payload

case PayloadsDecoder.safe_inflate(zlib_payload) do
case decode_payload(state, payload) do
{:ok, decoded_payload} ->
:ok = prune_device_properties(new_state, decoded_payload, timestamp_ms)

Expand Down Expand Up @@ -1727,6 +1797,17 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
{:discard, :unexpected_control_message, new_state, {:continue, continue_arg}}
end

defp decode_payload(%State{capabilities: capabilities} = _state, payload) do
case Map.get(capabilities, :purge_properties_compression_format) do
:zlib ->
<<_size_header::size(32), zlib_payload::binary>> = payload
PayloadsDecoder.safe_inflate(zlib_payload)

:plaintext ->
{:ok, payload}
end
end

defp delete_volatile_trigger(
state,
{obj_id, _obj_type},
Expand Down Expand Up @@ -2673,7 +2754,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do

# TODO: use the returned byte count in stats
with {:ok, _bytes} <-
send_consumer_properties_payload(state.realm, state.device_id, abs_paths_list) do
send_consumer_properties_payload(
state.realm,
state.device_id,
abs_paths_list,
state.purge_properties_compression_format
) do
:ok
end
end
Expand Down Expand Up @@ -2742,15 +2828,21 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
:ok
end

defp send_consumer_properties_payload(realm, device_id, abs_paths_list) do
defp send_consumer_properties_payload(realm, device_id, abs_paths_list, compression_format) do
topic = "#{realm}/#{Device.encode_device_id(device_id)}/control/consumer/properties"

uncompressed_payload = Enum.join(abs_paths_list, ";")

payload_size = byte_size(uncompressed_payload)
compressed_payload = :zlib.compress(uncompressed_payload)
payload =
case compression_format do
:zlib ->
payload_size = byte_size(uncompressed_payload)
compressed_payload = :zlib.compress(uncompressed_payload)
<<payload_size::unsigned-big-integer-size(32), compressed_payload::binary>>

payload = <<payload_size::unsigned-big-integer-size(32), compressed_payload::binary>>
:plaintext ->
uncompressed_payload
end

case VMQPlugin.publish(topic, payload, 2) do
{:ok, %{local_matches: local, remote_matches: remote}} when local + remote == 1 ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do
alias Astarte.Core.CQLUtils
alias Astarte.Core.Device
alias Astarte.Core.Device.Capabilities
alias Astarte.Core.InterfaceDescriptor
alias Astarte.Core.Mapping
alias Astarte.DataUpdaterPlant.Config
Expand Down Expand Up @@ -450,6 +451,90 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do
end
end

def fetch_device_capabilities(realm, device_id) do
Xandra.Cluster.run(
:xandra,
&do_fetch_device_capabilities(&1, realm, device_id)
)
end

defp do_fetch_device_capabilities(conn, realm, device_id) do
statement = """
SELECT purge_properties_compression_format
FROM #{realm}.devices
WHERE device_id=:device_id
"""

with {:ok, prepared} <- Xandra.prepare(conn, statement),
{:ok, %Xandra.Page{} = page} <-
Xandra.execute(conn, prepared, %{"device_id" => device_id}, uuid_format: :binary) do
capabilities = Enum.map(page, &map_to_capabilities/1)

{:ok, hd(capabilities)}
else
{:error, %Xandra.Error{} = error} ->
_ =
Logger.warning(
"Database error while fetching device #{device_id} capabilities: #{Exception.message(error)}"
)

{:error, :database_error}

{:error, %Xandra.ConnectionError{} = error} ->
_ =
Logger.warning(
"Database connection error while fetching device #{device_id} capabilities: #{Exception.message(error)}"
)

{:error, :database_connection_error}
end
end

defp map_to_capabilities(change) do
%Capabilities{}
|> Capabilities.changeset(change)
|> Ecto.Changeset.apply_changes()
end

def set_device_capabilities(
realm,
device_id,
capabilities
) do
Xandra.Cluster.run(
:xandra,
&do_update_device_capabilities(&1, realm, device_id, capabilities)
)
end

defp do_update_device_capabilities(conn, realm, device_id, capabilities) do
%Capabilities{
purge_properties_compression_format: format
} = capabilities

value =
Capabilities
|> Ecto.Enum.mappings(:purge_properties_compression_format)
|> Keyword.fetch!(format)

statement = """
UPDATE #{realm}.devices
SET purge_properties_compression_format=:format
WHERE device_id=:device_id
"""

with {:ok, prepared} <- Xandra.prepare(conn, statement),
{:ok, %Xandra.Page{} = page} <-
Xandra.execute(
conn,
prepared,
%{"device_id" => device_id, "format" => value},
uuid_format: :binary
) do
Enum.map(page, &map_to_capabilities/1)
end
end

def set_device_connected!(db_client, device_id, timestamp_ms, ip_address) do
set_connection_info!(db_client, device_id, timestamp_ms, ip_address)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017 - 2023 SECO Mind Srl
# Copyright 2017 - 2025 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,6 +46,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.State do
:trigger_id_to_policy_name,
:discard_messages,
:last_deletion_in_progress_refresh,
:last_datastream_maximum_retention_refresh
:last_datastream_maximum_retention_refresh,
:capabilities
]
end
3 changes: 2 additions & 1 deletion apps/astarte_data_updater_plant/mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017 - 2023 SECO Mind Srl
# Copyright 2017 - 2025 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -79,6 +79,7 @@ defmodule Astarte.DataUpdaterPlant.Mixfile do
{:amqp, "~> 3.3"},
{:castore, "~> 1.0.0"},
{:cyanide, "~> 2.0"},
{:ecto_sql, "~> 3.12"},
{:excoveralls, "~> 0.15", only: :test},
{:mississippi, github: "secomind/mississippi"},
{:mox, "~> 1.0", only: :test},
Expand Down
5 changes: 3 additions & 2 deletions apps/astarte_data_updater_plant/mix.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%{
"amqp": {:hex, :amqp, "3.3.2", "6cad7469957b29c517a26a27474828f1db28278a13bcc2e7970db9854a3d3080", [:mix], [{:amqp_client, "~> 3.9", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "f977c41d81b65a21234a9158e6491b2296f8bd5bda48d5b611a64b6e0d2c3f31"},
"amqp_client": {:hex, :amqp_client, "3.12.14", "2b677bc3f2e2234ba7517042b25d72071a79735042e91f9116bd3c176854b622", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:rabbit_common, "3.12.14", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "5f70b6c3b1a739790080da4fddc94a867e99f033c4b1edc20d6ff8b8fb4bd160"},
"astarte_core": {:git, "https://github.com/astarte-platform/astarte_core.git", "48eba8429624ae27ec8c9009adce41aa0280cc1f", []},
"astarte_core": {:git, "https://github.com/astarte-platform/astarte_core.git", "ccb81bebb22bf2c2475f3f3e6d3778f185198b41", []},
"astarte_data_access": {:git, "https://github.com/astarte-platform/astarte_data_access.git", "5b7f25eded8d5c0d3492c2622c33ff2d961acb46", []},
"astarte_rpc": {:git, "https://github.com/astarte-platform/astarte_rpc.git", "225d179ed87e8a1899626d29d0d7b73f19325120", []},
"castore": {:hex, :castore, "1.0.11", "4bbd584741601eb658007339ea730b082cc61f3554cf2e8f39bf693a11b49073", [:mix], [], "hexpm", "e03990b4db988df56262852f20de0f659871c35154691427a5047f4967a16a62"},
Expand All @@ -14,12 +14,13 @@
"cqex": {:hex, :cqex, "1.0.1", "bc9980ac3b82d039879f8d6ca589deab799fe08f80ff449d60ad709f2524718f", [:mix], [{:cqerl, "~> 2.0.1", [hex: :cqerl, repo: "hexpm", optional: false]}], "hexpm", "1bbf2079c044cbf0f747f60dcf0409a951eaa8f1a2447cd6d80d6ff1b7c4dc6b"},
"credentials_obfuscation": {:hex, :credentials_obfuscation, "3.4.0", "34e18b126b3aefd6e8143776fbe1ceceea6792307c99ac5ee8687911f048cfd7", [:rebar3], [], "hexpm", "738ace0ed5545d2710d3f7383906fc6f6b582d019036e5269c4dbd85dbced566"},
"cyanide": {:hex, :cyanide, "2.0.0", "f97b700b87f9b0679ae812f0c4b7fe35ea6541a4121a096cf10287941b7a6d55", [:mix], [], "hexpm", "7f9748251804c2a2115b539202568e1117ab2f0ae09875853fb89cc94ae19dd1"},
"db_connection": {:hex, :db_connection, "2.3.1", "4c9f3ed1ef37471cbdd2762d6655be11e38193904d9c5c1c9389f1b891a3088e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm", "abaab61780dde30301d840417890bd9f74131041afd02174cf4e10635b3a63f5"},
"db_connection": {:hex, :db_connection, "2.7.0", "b99faa9291bb09892c7da373bb82cba59aefa9b36300f6145c5f201c7adf48ec", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dcf08f31b2701f857dfc787fbad78223d61a32204f217f15e881dd93e4bdd3ff"},
"decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"},
"dialyxir": {:hex, :dialyxir, "1.4.2", "764a6e8e7a354f0ba95d58418178d486065ead1f69ad89782817c296d0d746a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "516603d8067b2fd585319e4b13d3674ad4f314a5902ba8130cd97dc902ce6bbd"},
"dialyzex": {:git, "https://github.com/Comcast/dialyzex.git", "cdc7cf71fe6df0ce4cf59e3f497579697a05c989", []},
"ecto": {:hex, :ecto, "3.12.5", "4a312960ce612e17337e7cefcf9be45b95a3be6b36b6f94dfb3d8c361d631866", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6eb18e80bef8bb57e17f5a7f068a1719fbda384d40fc37acb8eb8aeca493b6ea"},
"ecto_morph": {:hex, :ecto_morph, "0.1.29", "bc0b915779636bd2d30c54cad6922b3cb40f85b1d4ad59bdffd3c788d9d1f972", [:mix], [{:ecto, ">= 3.0.3", [hex: :ecto, repo: "hexpm", optional: false]}], "hexpm", "814bed72e3d03b278c1dfb3fbc4da37f478a37518ee54f010c1ad9254f1ca0e3"},
"ecto_sql": {:hex, :ecto_sql, "3.12.1", "c0d0d60e85d9ff4631f12bafa454bc392ce8b9ec83531a412c12a0d415a3a4d0", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.12", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.19 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "aff5b958a899762c5f09028c847569f7dfb9cc9d63bdb8133bff8a5546de6bf5"},
"efx": {:hex, :efx, "0.2.6", "ec7c42b05073e6fdc61d971cc02d366f73f40d8093272a5326d16861003036b0", [:mix], [{:process_tree, "0.1.2", [hex: :process_tree, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "7648dcfd05f9ac39b257d4a9aa7c5b243823fee31933e725b2d346371b6d6bc4"},
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
Expand Down
Loading
Loading