diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex b/lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex index 09ba57a68..e714f92eb 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex @@ -11,10 +11,10 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do alias LambdaEthereumConsensus.StateTransition.Misc alias LambdaEthereumConsensus.Store.Db alias LambdaEthereumConsensus.Store.Db + alias Types.SubnetInfo @behaviour Handler - @attestations_prefix "attestations" - @attnet_prefix "attnet" + @subnet_prefix "subnet" def start_link(init_arg) do GenServer.start_link(__MODULE__, init_arg, name: __MODULE__) @@ -83,8 +83,8 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do defp update_enr() do enr_fork_id = compute_enr_fork_id() - %{attnets: attnets, syncnets: syncnets} = P2P.Metadata.get_metadata() - Libp2pPort.update_enr(enr_fork_id, attnets, syncnets) + %{attnets: subnets, syncnets: syncnets} = P2P.Metadata.get_metadata() + Libp2pPort.update_enr(enr_fork_id, subnets, syncnets) end defp compute_enr_fork_id() do @@ -107,20 +107,20 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do @impl true def handle_call({:collect, subnet_id, attestation}, _from, _state) do - persist_attestations(subnet_id, [attestation]) - persist_attnet(subnet_id, attestation.data) + subnet_info = SubnetInfo.new_subnet_with_attestation(attestation) + persist_subnet_info(subnet_info, subnet_id) Libp2pPort.subscribe_to_topic(topic(subnet_id), __MODULE__) {:reply, :ok, nil} end def handle_call({:stop_collecting, subnet_id}, _from, _state) do - if has_attnet?(subnet_id) do - collected = fetch_attestations!(subnet_id) - delete_attestations(subnet_id) - delete_subnet(subnet_id) - {:reply, {:ok, collected}, nil} - else - {:reply, {:error, "subnet not joined"}, nil} + case fetch_subnet_info(subnet_id) do + {:ok, subnet_info} -> + delete_subnet(subnet_id) + {:reply, {:ok, subnet_info.attestations}, nil} + + :not_found -> + {:reply, {:error, "subnet not joined"}, nil} end end @@ -132,7 +132,11 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do {:ok, attestation} <- Ssz.from_ssz(uncompressed, Types.Attestation) do # TODO: validate before accepting Libp2pPort.validate_message(msg_id, :accept) - aggregate_attestation(subnet_id, attestation) + + fetch_subnet_info!(subnet_id) + |> SubnetInfo.aggregate_attestation(attestation) + |> persist_subnet_info(subnet_id) + {:noreply, nil} else {:error, _} -> @@ -147,50 +151,35 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do id_with_trailer |> String.trim_trailing("/ssz_snappy") |> String.to_integer() end - defp aggregate_attestation(subnet_id, attestation) do - if fetch_attnet!(subnet_id) == attestation.data do - attestations = [attestation | fetch_attestations!(subnet_id)] - persist_attestations(subnet_id, attestations) - end - end - - defp persist_attestations(subnet_id, attestations) do - :telemetry.span([:attestations, :persist], %{}, fn -> + @spec persist_subnet_info(SubnetInfo.t(), non_neg_integer()) :: :ok + defp persist_subnet_info(subnet_info, subnet_id) do + :telemetry.span([:subnet, :persist], %{}, fn -> {Db.put( - @attestations_prefix <> Integer.to_string(subnet_id), - :erlang.term_to_binary(attestations) + @subnet_prefix <> Integer.to_string(subnet_id), + SubnetInfo.encode(subnet_info) ), %{}} end) end - defp fetch_attestations!(subnet_id) do - {:ok, attestations} = - :telemetry.span([:attestations, :fetch], %{}, fn -> - {Db.get(@attestations_prefix <> Integer.to_string(subnet_id)), %{}} + @spec fetch_subnet_info(non_neg_integer()) :: {:ok, SubnetInfo.t()} | :not_found + defp fetch_subnet_info(subnet_id) do + result = + :telemetry.span([:subnet, :fetch], %{}, fn -> + {Db.get(@subnet_prefix <> Integer.to_string(subnet_id)), %{}} end) - :erlang.binary_to_term(attestations) - end - - defp persist_attnet(subnet_id, data) do - :telemetry.span([:attnet, :persist], %{}, fn -> - {Db.put(@attnet_prefix <> Integer.to_string(subnet_id), :erlang.term_to_binary(data)), %{}} - end) + case result do + {:ok, binary} -> {:ok, SubnetInfo.decode(binary)} + :not_found -> result + end end - defp fetch_attnet!(subnet_id) do - {:ok, data} = - :telemetry.span([:attnet, :fetch], %{}, fn -> - {Db.get(@attnet_prefix <> Integer.to_string(subnet_id)), %{}} - end) - - :erlang.binary_to_term(data) + @spec fetch_subnet_info!(non_neg_integer()) :: SubnetInfo.t() + defp fetch_subnet_info!(subnet_id) do + {:ok, subnet_info} = fetch_subnet_info(subnet_id) + subnet_info end - defp has_attnet?(subnet_id), do: Db.has_key?(@attnet_prefix <> Integer.to_string(subnet_id)) - - defp delete_attestations(subnet_id), - do: Db.delete(@attestations_prefix <> Integer.to_string(subnet_id)) - - defp delete_subnet(subnet_id), do: Db.delete(@attnet_prefix <> Integer.to_string(subnet_id)) + @spec delete_subnet(non_neg_integer()) :: :ok + defp delete_subnet(subnet_id), do: Db.delete(@subnet_prefix <> Integer.to_string(subnet_id)) end diff --git a/lib/types/subnet_info.ex b/lib/types/subnet_info.ex new file mode 100644 index 000000000..1edb39971 --- /dev/null +++ b/lib/types/subnet_info.ex @@ -0,0 +1,42 @@ +defmodule Types.SubnetInfo do + @moduledoc """ + Struct to hold subnet attestations for easier db storing: + - data: An attestation data. + - attestations: List with all the collected Attestations. + """ + alias Types.Attestation + alias Types.AttestationData + alias Types.SubnetInfo + + defstruct [:data, :attestations] + + @type t :: %__MODULE__{ + data: AttestationData.t(), + attestations: Attestation + } + + @spec new_subnet_with_attestation(Attestation.t()) :: t() + def new_subnet_with_attestation(attestation) do + %SubnetInfo{data: attestation.data, attestations: [attestation]} + end + + @spec aggregate_attestation(t(), Attestation.t()) :: t() + def aggregate_attestation(subnet_info, attestation) do + if subnet_info.data == attestation.data do + %SubnetInfo{subnet_info | attestations: [attestation | subnet_info.attestations]} + else + subnet_info + end + end + + @spec encode(t()) :: binary() + def encode(%__MODULE__{} = subnet_info) do + {subnet_info.data, subnet_info.attestations} |> :erlang.term_to_binary() + end + + @spec decode(binary()) :: t() + def decode(bin) do + {data, attestations} = :erlang.binary_to_term(bin) + %__MODULE__{data: data, attestations: attestations} + end +end