From 5fe9c32655aef50592ecd8866186792394329d60 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jakub=20Tr=C4=85d?= <jakubtrad@gmail.com>
Date: Thu, 10 Oct 2024 18:22:12 +0200
Subject: [PATCH] Dzejkop/timeout-when-waiting-for-blocks (#58)

* Add timeout

* Mae it configurable
---
 src/config.rs                   | 15 +++++++++++++++
 src/tasks/index.rs              | 28 +++++++++++++++++++++++-----
 tests/common/service_builder.rs |  1 +
 3 files changed, 39 insertions(+), 5 deletions(-)

diff --git a/src/config.rs b/src/config.rs
index 1978ace..6974943 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -63,6 +63,13 @@ pub struct TxSitterConfig {
     )]
     pub hard_reorg_interval: Duration,
 
+    /// Max amount of time to wait for a new block from the RPC block stream
+    #[serde(
+        with = "humantime_serde",
+        default = "default::block_stream_timeout"
+    )]
+    pub block_stream_timeout: Duration,
+
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub predefined: Option<Predefined>,
 
@@ -219,6 +226,10 @@ mod default {
         Duration::from_secs(60 * 60)
     }
 
+    pub fn block_stream_timeout() -> Duration {
+        Duration::from_secs(60)
+    }
+
     pub mod metrics {
         pub fn host() -> String {
             "127.0.0.1".to_string()
@@ -249,6 +260,7 @@ mod tests {
         escalation_interval = "1h"
         soft_reorg_interval = "1m"
         hard_reorg_interval = "1h"
+        block_stream_timeout = "1m"
 
         [server]
         host = "127.0.0.1:3000"
@@ -266,6 +278,7 @@ mod tests {
         escalation_interval = "1h"
         soft_reorg_interval = "1m"
         hard_reorg_interval = "1h"
+        block_stream_timeout = "1m"
 
         [server]
         host = "127.0.0.1:3000"
@@ -289,6 +302,7 @@ mod tests {
                 escalation_interval: Duration::from_secs(60 * 60),
                 soft_reorg_interval: default::soft_reorg_interval(),
                 hard_reorg_interval: default::hard_reorg_interval(),
+                block_stream_timeout: default::block_stream_timeout(),
                 predefined: None,
                 telemetry: None,
             },
@@ -317,6 +331,7 @@ mod tests {
                 escalation_interval: Duration::from_secs(60 * 60),
                 soft_reorg_interval: default::soft_reorg_interval(),
                 hard_reorg_interval: default::hard_reorg_interval(),
+                block_stream_timeout: default::block_stream_timeout(),
                 predefined: None,
                 telemetry: None,
             },
diff --git a/src/tasks/index.rs b/src/tasks/index.rs
index c60d3b5..f601a92 100644
--- a/src/tasks/index.rs
+++ b/src/tasks/index.rs
@@ -7,6 +7,7 @@ use ethers::types::{Block, BlockNumber, H256};
 use eyre::{Context, ContextCompat};
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
+use tokio::time::timeout;
 
 use crate::app::App;
 use crate::broadcast_utils::gas_estimation::{
@@ -34,18 +35,35 @@ async fn index_inner(app: Arc<App>, chain_id: u64) -> eyre::Result<()> {
     let rpc = app.http_provider(chain_id).await?;
 
     tracing::info!("Subscribing to new blocks");
-    // Subscribe to new block with the WS client which uses an unbounded receiver, buffering the stream
     let mut blocks_stream = ws_rpc.subscribe_blocks().await?;
 
-    // Get the first block from the stream, backfilling any missing blocks from the latest block in the db to the chain head
     tracing::info!("Backfilling blocks");
     if let Some(latest_block) = blocks_stream.next().await {
         backfill_to_block(app.clone(), chain_id, &rpc, latest_block).await?;
     }
 
-    // Index incoming blocks from the stream
-    while let Some(block) = blocks_stream.next().await {
-        index_block(app.clone(), chain_id, &rpc, block).await?;
+    loop {
+        let next_block = timeout(
+            app.config.service.block_stream_timeout,
+            blocks_stream.next(),
+        )
+        .await;
+
+        match next_block {
+            Ok(Some(block)) => {
+                index_block(app.clone(), chain_id, &rpc, block).await?;
+            }
+            Ok(None) => {
+                // Stream ended, break out of the loop
+                tracing::info!("Block stream ended");
+                break;
+            }
+            Err(_) => {
+                // Timeout occurred
+                tracing::warn!("Timed out waiting for a block");
+                break;
+            }
+        }
     }
 
     Ok(())
diff --git a/tests/common/service_builder.rs b/tests/common/service_builder.rs
index 05c428e..48074f8 100644
--- a/tests/common/service_builder.rs
+++ b/tests/common/service_builder.rs
@@ -58,6 +58,7 @@ impl ServiceBuilder {
                 escalation_interval: self.escalation_interval,
                 soft_reorg_interval: self.soft_reorg_interval,
                 hard_reorg_interval: self.hard_reorg_interval,
+                block_stream_timeout: Duration::from_secs(60),
                 telemetry: None,
                 predefined: Some(Predefined {
                     network: PredefinedNetwork {