Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into update-readme
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkenan committed Jul 26, 2024
2 parents ebb366b + 111036d commit fc5daae
Show file tree
Hide file tree
Showing 39 changed files with 1,559 additions and 812 deletions.
3 changes: 3 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,7 @@ RUN mix compile
ARG IEX_ARGS=""
ENV IEX_ARGS_VALUE=${IEX_ARGS}

# TODO: This could be an issue regarding OS signals, we should use JSONArgs but shell form is the
# only way to pass args to ENTRYPOINT, specially important because of the cookie. Best
# solution would be to move to releases and avoid starting the node manually through iex.
ENTRYPOINT iex $IEX_ARGS_VALUE -S mix run -- $0 $@
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ kurtosis.setup.lambdaconsensus:
kurtosis.start:
kurtosis run --enclave lambdanet $(KURTOSIS_DIR) --args-file network_params.yaml

#💻 kurtosis.build-and-start: @ Builds the lambdaconsensus Docker image and starts the kurtosis environment.
kurtosis.clean-start: kurtosis.clean kurtosis.setup.lambdaconsensus kurtosis.start

#💻 kurtosis.stop: @ Stops the kurtosis environment
kurtosis.stop:
kurtosis enclave stop lambdanet
Expand Down Expand Up @@ -163,7 +166,7 @@ checkpoint-sync: compile-all

#▶️ sepolia: @ Run an interactive terminal using sepolia network
sepolia: compile-all
iex -S mix run -- --checkpoint-sync-url https://sepolia.beaconstate.info --network sepolia
iex -S mix run -- --checkpoint-sync-url https://sepolia.beaconstate.info --network sepolia --metrics

#▶️ holesky: @ Run an interactive terminal using holesky network
holesky: compile-all
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,8 @@ Also of note is the `:sort` option, that allows sorting the list by, for example
:etop.start(sort: :msg_q)
```

_Note: If you want to use the `:observer` GUI and not just `etop`, you'll probably need `:wx` also set in your extra applications, there is an easy way to do this, just set the `EXTRA_APPLICATIONS` environment variable to `WX` (`export EXTRA_APPLICATIONS=WX`) before starting the node_

### eFlambè

When optimizing code, it might be useful to have a graphic way to determine bottlenecks in the system.
Expand Down
65 changes: 27 additions & 38 deletions bench/block_processing.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,65 +2,54 @@ alias LambdaEthereumConsensus.ForkChoice
alias LambdaEthereumConsensus.ForkChoice.Handlers
alias LambdaEthereumConsensus.StateTransition.Cache
alias LambdaEthereumConsensus.Store
alias LambdaEthereumConsensus.Store.BlockBySlot
alias LambdaEthereumConsensus.Store.BlockDb
alias LambdaEthereumConsensus.Store.StateDb
alias Types.BeaconState
alias Types.BlockInfo
alias Types.SignedBeaconBlock
alias Types.StateInfo

Logger.configure(level: :warning)

{:ok, _} = Store.Db.start_link([])
{:ok, _} = Store.Blocks.start_link([])
{:ok, _} = Store.BlockStates.start_link([])
Cache.initialize_cache()

# NOTE: this slot must be at the beginning of an epoch (i.e. a multiple of 32)
slot = 4_213_280
slot = 9_591_424

IO.puts("fetching blocks...")
{:ok, %BeaconState{} = state} = StateDb.get_state_by_slot(slot)
{:ok, %StateInfo{beacon_state: state}} = StateDb.get_state_by_slot(slot)
{:ok, %BlockInfo{signed_block: block}} = BlockDb.get_block_info_by_slot(slot)
{:ok, %BlockInfo{signed_block: new_block}} = BlockDb.get_block_info_by_slot(slot + 1)
{:ok, %BlockInfo{signed_block: new_block} = block_info} = BlockDb.get_block_info_by_slot(slot + 1)

IO.puts("initializing store...")
{:ok, store} = Types.Store.get_forkchoice_store(state, block)
store = Handlers.on_tick(store, store.time + 30)

attestations = new_block.message.body.attestations
attester_slashings = new_block.message.body.attester_slashings

{:ok, root} = BlockDb.get_block_root_by_slot(slot)
{:ok, root} = BlockBySlot.get(slot)

IO.puts("about to process block: #{slot + 1}, with root: #{Base.encode16(root)}...")
IO.puts("#{length(attestations)} attestations ; #{length(attester_slashings)} attester slashings")
IO.puts("")

on_block = fn ->
# process block attestations
{:ok, new_store} = Handlers.on_block(store, new_block)

{:ok, new_store} =
attestations
|> ForkChoice.apply_handler(new_store, &Handlers.on_attestation(&1, &2, true))

# process block attester slashings
{:ok, _} =
attester_slashings
|> ForkChoice.apply_handler(new_store, &Handlers.on_attester_slashing/2)
if System.get_env("FLAMA") do
Flama.run({ForkChoice, :process_block, [block_info, store]})
else
Benchee.run(
%{
"block (full cache)" => fn ->
ForkChoice.process_block(block_info, store)
end
},
time: 30
)

Benchee.run(
%{
"block (empty cache)" => fn _ ->
ForkChoice.process_block(block_info, store)
end
},
time: 30,
before_each: fn _ -> Cache.clear_cache() end
)
end

Benchee.run(
%{
"block (full cache)" => fn -> on_block.() end
},
time: 30
)

Benchee.run(
%{
"block (empty cache)" => fn _ -> on_block.() end
},
time: 30,
before_each: fn _ -> Cache.clear_cache() end
)
16 changes: 16 additions & 0 deletions lib/beacon_api/controllers/v1/node_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ defmodule BeaconApi.V1.NodeController do
def open_api_operation(:identity),
do: ApiSpec.spec().paths["/eth/v1/node/identity"].get

def open_api_operation(:version),
do: ApiSpec.spec().paths["/eth/v1/node/version"].get

@spec health(Plug.Conn.t(), any) :: Plug.Conn.t()
def health(conn, params) do
# TODO: respond with syncing status if we're still syncing
Expand Down Expand Up @@ -46,4 +49,17 @@ defmodule BeaconApi.V1.NodeController do
}
})
end

@spec version(Plug.Conn.t(), any) :: Plug.Conn.t()
def version(conn, _params) do
version = Application.spec(:lambda_ethereum_consensus)[:vsn]
arch = :erlang.system_info(:system_architecture)

conn
|> json(%{
"data" => %{
"version" => "Lambda/#{version}/#{arch}"
}
})
end
end
1 change: 1 addition & 0 deletions lib/beacon_api/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ defmodule BeaconApi.Router do
scope "/node" do
get("/health", NodeController, :health)
get("/identity", NodeController, :identity)
get("/version", NodeController, :version)
end
end

Expand Down
3 changes: 3 additions & 0 deletions lib/lambda_ethereum_consensus/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule LambdaEthereumConsensus.Application do
# See https://hexdocs.pm/elixir/Application.html
# for more information on OTP Applications
@moduledoc false
alias LambdaEthereumConsensus.Store.CheckpointStates

use Application
require Logger
Expand Down Expand Up @@ -33,6 +34,8 @@ defmodule LambdaEthereumConsensus.Application do
end

defp get_children(:db) do
CheckpointStates.new()

[
LambdaEthereumConsensus.Telemetry,
LambdaEthereumConsensus.Store.Db,
Expand Down
40 changes: 29 additions & 11 deletions lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do
alias LambdaEthereumConsensus.Store.BlobDb
alias LambdaEthereumConsensus.Store.BlockDb
alias LambdaEthereumConsensus.Store.Blocks
alias LambdaEthereumConsensus.Store.CheckpointStates
alias LambdaEthereumConsensus.Store.StateDb
alias LambdaEthereumConsensus.Store.StoreDb
alias LambdaEthereumConsensus.Validator.ValidatorManager
Expand Down Expand Up @@ -197,31 +198,48 @@ defmodule LambdaEthereumConsensus.ForkChoice do
end
end

@spec apply_handler(any(), any(), any()) :: any()
defp apply_handler(iter, state, handler) do
iter
|> Enum.reduce_while({:ok, state}, fn
x, {:ok, st} -> {:cont, handler.(st, x)}
_, {:error, _} = err -> {:halt, err}
def apply_handler(iter, name, state, handler) do
Metrics.span_operation(name, nil, nil, fn ->
iter
|> Enum.reduce_while({:ok, state}, fn
x, {:ok, st} -> {:cont, handler.(st, x)}
_, {:error, _} = err -> {:halt, err}
end)
end)
end

@spec process_block(BlockInfo.t(), Store.t()) :: Store.t()
defp process_block(%BlockInfo{signed_block: signed_block} = block_info, store) do
def process_block(%BlockInfo{signed_block: signed_block} = block_info, store) do
with {:ok, new_store} <- Handlers.on_block(store, block_info),
# process block attestations
{:ok, new_store} <-
signed_block.message.body.attestations
|> apply_handler(new_store, &Handlers.on_attestation(&1, &2, true)),
process_attestations(new_store, signed_block.message.body.attestations),
# process block attester slashings
{:ok, new_store} <-
signed_block.message.body.attester_slashings
|> apply_handler(new_store, &Handlers.on_attester_slashing/2) do
Handlers.prune_checkpoint_states(new_store)
|> apply_handler(:attester_slashings, new_store, &Handlers.on_attester_slashing/2) do
{:ok, new_store}
end
end

defp process_attestations(store, attestations) do
# prefetch states:
states =
attestations
|> Enum.map(& &1.data.target)
|> Enum.uniq()
|> Enum.flat_map(fn ch ->
case CheckpointStates.get_checkpoint_state(ch) do
{:ok, state} -> [{ch, state}]
_other -> []
end
end)
|> Map.new()

attestations
|> apply_handler(:attestations, store, &Handlers.on_attestation(&1, &2, true, states))
end

@spec recompute_head(Store.t()) :: :ok
def recompute_head(store) do
{:ok, head_root} = Head.get_head(store)
Expand Down
36 changes: 26 additions & 10 deletions lib/lambda_ethereum_consensus/fork_choice/handlers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,26 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
end
end

def on_attestation(%Store{} = store, %Attestation{} = attestation, is_from_block) do
with {:ok, target_state} <- CheckpointStates.get_checkpoint_state(attestation.data.target) do
on_attestation_with_state(store, attestation, is_from_block, target_state)
end
end

def on_attestation(%Store{} = store, %Attestation{} = attestation, is_from_block, states) do
case Map.fetch(states, attestation.data.target) do
{:ok, target_state} ->
on_attestation_with_state(store, attestation, is_from_block, target_state)

:error ->
if is_from_block do
{:ok, store}
else
{:error, "Checkpoint state not found for attestation."}
end
end
end

@doc """
Run ``on_attestation`` upon receiving a new ``attestation`` from either within a block or directly on the wire.
Expand All @@ -123,10 +143,14 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
"""
@spec on_attestation(Store.t(), Attestation.t(), boolean()) ::
{:ok, Store.t()} | {:error, String.t()}
def on_attestation(%Store{} = store, %Attestation{} = attestation, is_from_block) do
def on_attestation_with_state(
%Store{} = store,
%Attestation{} = attestation,
is_from_block,
target_state
) do
with :ok <- check_attestation_valid(store, attestation, is_from_block),
# Get state at the `target` to fully validate attestation
{:ok, target_state} <- CheckpointStates.get_checkpoint_state(attestation.data.target),
{:ok, indexed_attestation} <-
Accessors.get_indexed_attestation(target_state, attestation),
:ok <- check_valid_indexed_attestation(target_state, indexed_attestation) do
Expand Down Expand Up @@ -407,14 +431,6 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
if target.epoch in [current_epoch, previous_epoch], do: :ok, else: {:error, "future epoch"}
end

@doc """
Removes the checkpoint states that are prior to the store's finalized checkpoint from
the key-value store.
"""
def prune_checkpoint_states(%Store{} = store) do
CheckpointStates.prune(store.finalized_checkpoint)
end

def update_latest_messages(%Store{} = store, attesting_indices, %Attestation{data: data}) do
%AttestationData{target: target, beacon_block_root: beacon_block_root} = data
messages = store.latest_messages
Expand Down
44 changes: 28 additions & 16 deletions lib/lambda_ethereum_consensus/metrics.ex
Original file line number Diff line number Diff line change
Expand Up @@ -88,22 +88,6 @@ defmodule LambdaEthereumConsensus.Metrics do
block_status_execute(root, new_status, slot, 1)
end

defp block_status_execute(root, status, slot, value) do
hex_root = Base.encode16(root)

Logger.debug(
"[Metrics] slot = #{inspect(slot)}, status = #{inspect(status)}, value = #{inspect(value)}"
)

:telemetry.execute([:blocks, :status], %{total: value}, %{
id: hex_root,
mainstat: status,
color: map_color(status),
title: slot,
detail__root: hex_root
})
end

def block_relationship(nil, _), do: :ok

def block_relationship(parent_root, root) do
Expand All @@ -120,6 +104,34 @@ defmodule LambdaEthereumConsensus.Metrics do
end
end

def span_operation(handler, transition, operation, f) do
:telemetry.span([:fork_choice, :latency], %{}, fn ->
{f.(), %{handler: handler, transition: transition, operation: operation}}
end)
end

def handler_span(module, action, f) do
:telemetry.span([:libp2pport, :handler], %{}, fn ->
{f.(), %{module: module, action: action}}
end)
end

defp block_status_execute(root, status, slot, value) do
hex_root = Base.encode16(root)

Logger.debug(
"[Metrics] slot = #{inspect(slot)}, status = #{inspect(status)}, value = #{inspect(value)}"
)

:telemetry.execute([:blocks, :status], %{total: value}, %{
id: hex_root,
mainstat: status,
color: map_color(status),
title: slot,
detail__root: hex_root
})
end

defp map_color(:transitioned), do: "blue"
defp map_color(:pending), do: "green"
defp map_color(:download_blobs), do: "yellow"
Expand Down
Loading

0 comments on commit fc5daae

Please sign in to comment.