Skip to content

Commit

Permalink
refactor: add SubnetInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
avilagaston9 committed Jun 10, 2024
1 parent 670f088 commit b3ae855
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 49 deletions.
87 changes: 38 additions & 49 deletions lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
alias LambdaEthereumConsensus.StateTransition.Misc
alias LambdaEthereumConsensus.Store.Db
alias LambdaEthereumConsensus.Store.Db
alias Types.SubnetInfo

@behaviour Handler
@attestations_prefix "attestations"
@attnet_prefix "attnet"
@subnet_prefix "subnet"

def start_link(init_arg) do
GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
Expand Down Expand Up @@ -83,8 +83,8 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do

defp update_enr() do
enr_fork_id = compute_enr_fork_id()
%{attnets: attnets, syncnets: syncnets} = P2P.Metadata.get_metadata()
Libp2pPort.update_enr(enr_fork_id, attnets, syncnets)
%{attnets: subnets, syncnets: syncnets} = P2P.Metadata.get_metadata()
Libp2pPort.update_enr(enr_fork_id, subnets, syncnets)
end

defp compute_enr_fork_id() do
Expand All @@ -107,20 +107,20 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do

@impl true
def handle_call({:collect, subnet_id, attestation}, _from, _state) do
persist_attestations(subnet_id, [attestation])
persist_attnet(subnet_id, attestation.data)
subnet_info = SubnetInfo.new_subnet_with_attestation(attestation)
persist_subnet_info(subnet_info, subnet_id)
Libp2pPort.subscribe_to_topic(topic(subnet_id), __MODULE__)
{:reply, :ok, nil}
end

def handle_call({:stop_collecting, subnet_id}, _from, _state) do
if has_attnet?(subnet_id) do
collected = fetch_attestations!(subnet_id)
delete_attestations(subnet_id)
delete_subnet(subnet_id)
{:reply, {:ok, collected}, nil}
else
{:reply, {:error, "subnet not joined"}, nil}
case fetch_subnet_info(subnet_id) do
{:ok, subnet_info} ->
delete_subnet(subnet_id)
{:reply, {:ok, subnet_info.attestations}, nil}

:not_found ->
{:reply, {:error, "subnet not joined"}, nil}
end
end

Expand All @@ -132,7 +132,11 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
{:ok, attestation} <- Ssz.from_ssz(uncompressed, Types.Attestation) do
# TODO: validate before accepting
Libp2pPort.validate_message(msg_id, :accept)
aggregate_attestation(subnet_id, attestation)

fetch_subnet_info!(subnet_id)
|> SubnetInfo.aggregate_attestation(attestation)
|> persist_subnet_info(subnet_id)

{:noreply, nil}
else
{:error, _} ->
Expand All @@ -147,50 +151,35 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
id_with_trailer |> String.trim_trailing("/ssz_snappy") |> String.to_integer()
end

defp aggregate_attestation(subnet_id, attestation) do
if fetch_attnet!(subnet_id) == attestation.data do
attestations = [attestation | fetch_attestations!(subnet_id)]
persist_attestations(subnet_id, attestations)
end
end

defp persist_attestations(subnet_id, attestations) do
:telemetry.span([:attestations, :persist], %{}, fn ->
@spec persist_subnet_info(SubnetInfo.t(), non_neg_integer()) :: :ok
defp persist_subnet_info(subnet_info, subnet_id) do
:telemetry.span([:subnet, :persist], %{}, fn ->
{Db.put(
@attestations_prefix <> Integer.to_string(subnet_id),
:erlang.term_to_binary(attestations)
@subnet_prefix <> Integer.to_string(subnet_id),
SubnetInfo.encode(subnet_info)
), %{}}
end)
end

defp fetch_attestations!(subnet_id) do
{:ok, attestations} =
:telemetry.span([:attestations, :fetch], %{}, fn ->
{Db.get(@attestations_prefix <> Integer.to_string(subnet_id)), %{}}
@spec fetch_subnet_info(non_neg_integer()) :: {:ok, SubnetInfo.t()} | :not_found
defp fetch_subnet_info(subnet_id) do
result =
:telemetry.span([:subnet, :fetch], %{}, fn ->
{Db.get(@subnet_prefix <> Integer.to_string(subnet_id)), %{}}
end)

:erlang.binary_to_term(attestations)
end

defp persist_attnet(subnet_id, data) do
:telemetry.span([:attnet, :persist], %{}, fn ->
{Db.put(@attnet_prefix <> Integer.to_string(subnet_id), :erlang.term_to_binary(data)), %{}}
end)
case result do
{:ok, binary} -> {:ok, SubnetInfo.decode(binary)}
:not_found -> result
end
end

defp fetch_attnet!(subnet_id) do
{:ok, data} =
:telemetry.span([:attnet, :fetch], %{}, fn ->
{Db.get(@attnet_prefix <> Integer.to_string(subnet_id)), %{}}
end)

:erlang.binary_to_term(data)
@spec fetch_subnet_info!(non_neg_integer()) :: SubnetInfo.t()
defp fetch_subnet_info!(subnet_id) do
{:ok, subnet_info} = fetch_subnet_info(subnet_id)
subnet_info
end

defp has_attnet?(subnet_id), do: Db.has_key?(@attnet_prefix <> Integer.to_string(subnet_id))

defp delete_attestations(subnet_id),
do: Db.delete(@attestations_prefix <> Integer.to_string(subnet_id))

defp delete_subnet(subnet_id), do: Db.delete(@attnet_prefix <> Integer.to_string(subnet_id))
@spec delete_subnet(non_neg_integer()) :: :ok
defp delete_subnet(subnet_id), do: Db.delete(@subnet_prefix <> Integer.to_string(subnet_id))
end
42 changes: 42 additions & 0 deletions lib/types/subnet_info.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
defmodule Types.SubnetInfo do
@moduledoc """
Struct to hold subnet attestations for easier db storing:
- data: An attestation data.
- attestations: List with all the collected Attestations.
"""
alias Types.Attestation
alias Types.AttestationData
alias Types.SubnetInfo

defstruct [:data, :attestations]

@type t :: %__MODULE__{
data: AttestationData.t(),
attestations: Attestation
}

@spec new_subnet_with_attestation(Attestation.t()) :: t()
def new_subnet_with_attestation(attestation) do
%SubnetInfo{data: attestation.data, attestations: [attestation]}
end

@spec aggregate_attestation(t(), Attestation.t()) :: t()
def aggregate_attestation(subnet_info, attestation) do
if subnet_info.data == attestation.data do
%SubnetInfo{subnet_info | attestations: [attestation | subnet_info.attestations]}
else
subnet_info
end
end

@spec encode(t()) :: binary()
def encode(%__MODULE__{} = subnet_info) do
{subnet_info.data, subnet_info.attestations} |> :erlang.term_to_binary()
end

@spec decode(binary()) :: t()
def decode(bin) do
{data, attestations} = :erlang.binary_to_term(bin)
%__MODULE__{data: data, attestations: attestations}
end
end

0 comments on commit b3ae855

Please sign in to comment.