Skip to content

Commit

Permalink
✨ Implement heartbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
eigenein committed Nov 15, 2024
1 parent 225d61b commit bb0105e
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 59 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ license = "MIT"
publish = true
readme = "README.md"
repository = "https://github.com/eigenein/mrktpltsbot"
version = "2.0.0-rc.1"
version = "2.0.0-rc.2"
rust-version = "1.82"

[badges]
Expand All @@ -33,6 +33,7 @@ pedantic = { level = "warn", priority = -1 }

# Individual flags:
future_not_send = "allow"
ignored_unit_patterns = "allow"
missing_errors_doc = "allow"
module_name_repetitions = "allow"
significant_drop_tightening = "allow"
Expand Down
44 changes: 29 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,32 @@ Self-hosted Marktplaats notifications for Telegram
cargo install mrktpltsbot
```

## Configuration

`mrktpltsbot` supports conventional CLI options as well as environment variables.
On launch, it also automatically loads `.env` from the working directory.

See `mrktpltsbot --help` for the complete list of options,
including the corresponding environment variable names.

> [!TIP]
> Right now, the only required option is a
> [Telegram bot token](https://core.telegram.org/bots/api#authorizing-your-bot).
> [!IMPORTANT]
> `mrktpltsbot` stores all the data in an SQLite database.
> By default, it creates the database file in the working directory.
## Usage

```text
Usage: mrktpltsbot [OPTIONS] --telegram-bot-token <BOT_TOKEN>
Options:
--sentry-dsn <SENTRY_DSN> Sentry DSN: <https://docs.sentry.io/concepts/key-terms/dsn-explainer/> [env: SENTRY_DSN]
--db <DB> SQLite database path [env: DB] [default: mrktpltsbot.sqlite3]
-h, --help Print help
-V, --version Print version
Telegram:
--telegram-bot-token <BOT_TOKEN>
Telegram bot token: <https://core.telegram.org/bots/api#authorizing-your-bot> [env: TELEGRAM_BOT_TOKEN]
--telegram-poll-timeout-secs <POLL_TIMEOUT_SECS>
Timeout for Telegram long polling, in seconds [env: TELEGRAM_POLL_TIMEOUT_SECS] [default: 60]
--telegram-authorize-chat-id <AUTHORIZED_CHAT_IDS>
Authorize chat ID to use the bot [env: TELEGRAM_AUTHORIZED_CHAT_IDS]
--telegram-heartbeat-url <telegram_heartbeat_url>
Heartbeat URL for the Telegram bot [env: TELEGRAM_HEARTBEAT_URL]
Marktplaats:
--marktplaats-crawl-interval-secs <CRAWL_INTERVAL_SECS>
Crawling interval, in seconds [env: MARKTPLAATS_CRAWL_INTERVAL_SECS] [default: 60]
--marktplaats-search-limit <SEARCH_LIMIT>
Limit of Marktplaats search results per query [env: MARKTPLAATS_SEARCH_LIMIT] [default: 30]
--marktplaats-heartbeat-url <marktplaats_heartbeat_url>
Heartbeat URL for the Marktplaats crawler [env: MARKTPLAATS_HEARTBEAT_URL]
```
39 changes: 32 additions & 7 deletions src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use std::path::PathBuf;

use clap::Parser;
use url::Url;

#[derive(Parser)]
#[command(author, version, about, long_about, propagate_version = true)]
pub struct Args {
/// Sentry DSN: <https://docs.sentry.io/concepts/key-terms/dsn-explainer/>.
#[clap(long, env = "SENTRY_DSN")]
#[clap(long, env = "SENTRY_DSN", hide_env_values = true)]
pub sentry_dsn: Option<String>,

/// SQLite database path.
#[expect(clippy::doc_markdown)]
#[clap(long, env = "DB", default_value = "mrktpltsbot.sqlite3")]
#[clap(long, env = "DB", default_value = "mrktpltsbot.sqlite3", hide_env_values = true)]
pub db: PathBuf,

#[command(flatten)]
Expand All @@ -22,35 +23,49 @@ pub struct Args {
}

#[derive(Parser)]
#[clap(next_help_heading = "Marktplaats")]
pub struct MarktplaatsArgs {
/// Crawling interval, in seconds.
#[clap(
long = "marktplaats-crawl-interval-secs",
env = "MARKTPLAATS_CRAWL_INTERVAL_SECS",
default_value = "60"
default_value = "60",
hide_env_values = true
)]
pub crawl_interval_secs: u64,

/// Limit of Marktplaats search results per query.
#[clap(
long = "marktplaats-search-limit",
env = "MARKTPLAATS_SEARCH_LIMIT",
default_value = "30"
default_value = "30",
hide_env_values = true
)]
pub search_limit: u32,

/// Better Stack heartbeat URL for the Marktplaats crawler.
#[clap(
long = "marktplaats-heartbeat-url",
env = "MARKTPLAATS_HEARTBEAT_URL",
id = "marktplaats_heartbeat_url",
hide_env_values = true
)]
pub heartbeat_url: Option<Url>,
}

#[derive(Parser)]
#[clap(next_help_heading = "Telegram")]
pub struct TelegramArgs {
/// Telegram bot token: <https://core.telegram.org/bots/api#authorizing-your-bot>.
#[clap(long = "telegram-bot-token", env = "TELEGRAM_BOT_TOKEN")]
#[clap(long = "telegram-bot-token", env = "TELEGRAM_BOT_TOKEN", hide_env_values = true)]
pub bot_token: String,

/// Timeout for Telegram long polling, in seconds.
#[clap(
long = "telegram-poll-timeout-secs",
env = "TELEGRAM_POLL_TIMEOUT_SECS",
default_value = "60"
default_value = "60",
hide_env_values = true
)]
pub poll_timeout_secs: u64,

Expand All @@ -59,7 +74,17 @@ pub struct TelegramArgs {
long = "telegram-authorize-chat-id",
env = "TELEGRAM_AUTHORIZED_CHAT_IDS",
value_delimiter = ',',
alias = "chat-id"
alias = "chat-id",
hide_env_values = true
)]
pub authorized_chat_ids: Vec<i64>,

/// Better Stack heartbeat URL for the Telegram bot.
#[clap(
long = "telegram-heartbeat-url",
env = "TELEGRAM_HEARTBEAT_URL",
id = "telegram_heartbeat_url",
hide_env_values = true
)]
pub heartbeat_url: Option<Url>,
}
32 changes: 14 additions & 18 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{any::type_name, time::Duration};

use clap::crate_version;
use reqwest::{
Body,
IntoUrl,
Method,
header,
Expand All @@ -13,6 +14,7 @@ use serde::{Serialize, de::DeserializeOwned};

use crate::prelude::*;

/// [`reqwest::Client`] wrapper that encapsulates the client's settings.
#[derive(Clone)]
pub struct Client(reqwest::Client);

Expand All @@ -25,12 +27,9 @@ impl Client {
" (Rust; https://github.com/eigenein/mrktpltsbot)",
);

pub fn new() -> Result<Self> {
pub fn try_new() -> Result<Self> {
let mut headers = HeaderMap::new();
headers.insert(
header::USER_AGENT,
HeaderValue::from_static(Self::USER_AGENT),
);
headers.insert(header::USER_AGENT, HeaderValue::from_static(Self::USER_AGENT));
reqwest::Client::builder()
.gzip(true)
.use_rustls_tls()
Expand All @@ -48,14 +47,17 @@ impl Client {
}
}

/// [`reqwest::RequestBuilder`] wrapper that traces the requests.
pub struct RequestBuilder(reqwest::RequestBuilder);

impl RequestBuilder {
pub fn try_clone(&self) -> Result<Self> {
self.0
.try_clone()
.context("failed to clone the request builder")
.map(Self)
self.0.try_clone().context("failed to clone the request builder").map(Self)
}

/// Send a body.
pub fn body(self, body: impl Into<Body>) -> Self {
Self(self.0.body(body))
}

/// Send a JSON body.
Expand All @@ -72,22 +74,16 @@ impl RequestBuilder {
pub async fn read_json<R: DeserializeOwned>(self, error_for_status: bool) -> Result<R> {
let body = self.read_text(error_for_status).await?;
serde_json::from_str(&body).with_context(|| {
format!(
"failed to deserialize the response into `{}`",
type_name::<R>()
)
format!("failed to deserialize the response into `{}`", type_name::<R>())
})
}

#[instrument(skip_all, ret(level = Level::DEBUG), err(level = Level::DEBUG))]
async fn read_text(self, error_for_status: bool) -> Result<String> {
pub async fn read_text(self, error_for_status: bool) -> Result<String> {
let response = self.0.send().await.context("failed to send the request")?;
let status = response.status();
trace!(url = ?response.url(), ?status, "Reading response…");
let body = response
.text()
.await
.context("failed to read the response")?;
let body = response.text().await.context("failed to read the response")?;
trace!(?status, body, "Received response");
if error_for_status && (status.is_client_error() || status.is_server_error()) {
Err(anyhow!("HTTP {status:?}"))
Expand Down
76 changes: 76 additions & 0 deletions src/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use reqwest::Method;
use url::Url;

use crate::{client::Client, prelude::*};

pub struct Heartbeat<'a>(Option<HeartbeatInner<'a>>);

impl<'a> Heartbeat<'a> {
pub fn try_new(client: &'a Client, success_url: Option<Url>) -> Result<Self> {
success_url
.map(|success_url| HeartbeatInner::try_new(client, success_url))
.transpose()
.map(Self)
}

pub async fn report_success(&self) {
let Some(inner) = &self.0 else {
return;
};
let success_url = inner.success_url.clone();
if let Err(error) = inner.client.request(Method::POST, success_url).read_text(true).await {
warn!("Failed to send the heartbeat: {error:#}");
}
}

pub async fn report_failure(&self, error: &Error) {
let Some(inner) = &self.0 else {
return;
};
let failure_url = inner.failure_url.clone();
if let Err(error) = inner
.client
.request(Method::POST, failure_url)
.body(error.to_string())
.read_text(true)
.await
{
warn!("Failed to report the failure: {error:#}");
}
}
}

struct HeartbeatInner<'a> {
client: &'a Client,
success_url: Url,
failure_url: Url,
}

impl<'a> HeartbeatInner<'a> {
fn try_new(client: &'a Client, success_url: Url) -> Result<Self> {
let mut failure_url = success_url.clone();
failure_url
.path_segments_mut()
.map_err(|_| anyhow!("could not add a segment to `{success_url}`"))?
.push("fail");
Ok(Self { client, success_url, failure_url })
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn try_new_ok() -> Result {
let client = Client::try_new()?;
let url = Url::parse("https://uptime.betterstack.com/api/v1/heartbeat/XYZ1234")?;
let heartbeat = Heartbeat::try_new(&client, Some(url))?;
let inner = heartbeat.0.expect("inner should be `Some`");
assert_eq!(
inner.failure_url,
Url::parse("https://uptime.betterstack.com/api/v1/heartbeat/XYZ1234/fail")?,
);
Ok(())
}
}
21 changes: 13 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![doc = include_str!("../README.md")]

use std::time::Duration;
use std::{pin::pin, time::Duration};

use clap::Parser;
use futures::{StreamExt, TryFutureExt};
Expand All @@ -9,6 +9,7 @@ use crate::{
cli::Args,
client::Client,
db::{Db, notification::Notifications},
heartbeat::Heartbeat,
marktplaats::Marktplaats,
prelude::*,
telegram::{Telegram, reaction::Reaction},
Expand All @@ -17,6 +18,7 @@ use crate::{
mod cli;
mod client;
mod db;
mod heartbeat;
mod logging;
mod marktplaats;
mod prelude;
Expand All @@ -38,14 +40,16 @@ fn main() -> Result {
}

async fn async_main(cli: Args) -> Result {
let client = Client::new()?;
let client = Client::try_new()?;
let telegram = Telegram::new(client.clone(), cli.telegram.bot_token.into())?;
let marktplaats = Marktplaats(client);
let marktplaats = Marktplaats(client.clone());
let db = Db::try_new(&cli.db).await?;
let command_builder = telegram::bot::try_init(&telegram).await?;

// Handle Telegram updates:
let telegram_updates = telegram.clone().into_updates(0, cli.telegram.poll_timeout_secs);
let telegram_heartbeat = Heartbeat::try_new(&client, cli.telegram.heartbeat_url)?;
let telegram_updates =
telegram.clone().into_updates(0, cli.telegram.poll_timeout_secs, &telegram_heartbeat);
let telegram_reactor = telegram::bot::Reactor::builder()
.authorized_chat_ids(cli.telegram.authorized_chat_ids.into_iter().collect())
.db(&db)
Expand All @@ -62,16 +66,17 @@ async fn async_main(cli: Args) -> Result {
.command_builder(&command_builder)
.search_limit(cli.marktplaats.search_limit)
.build();
let marktplaats_reactions = marktplaats_reactor.run();
let marktplaats_heartbeat = Heartbeat::try_new(&client, cli.marktplaats.heartbeat_url)?;
let marktplaats_reactions = marktplaats_reactor.run(&marktplaats_heartbeat);

// Now, merge all the reactions and execute them:
tokio_stream::StreamExt::merge(telegram_reactions, marktplaats_reactions)
let reactor = tokio_stream::StreamExt::merge(telegram_reactions, marktplaats_reactions)
.filter_map(|result| async {
// Log and skip error to keep the reactor going.
result.inspect_err(|error| error!("Reactor error: {error:#}")).ok()
})
.for_each(|reaction| execute_reaction(reaction, &telegram, &db))
.await;
.for_each(|reaction| execute_reaction(reaction, &telegram, &db));
pin!(reactor).await;

unreachable!()
}
Expand Down
Loading

0 comments on commit bb0105e

Please sign in to comment.