Skip to content

Commit

Permalink
Add Client.next_raw_update() (#255)
Browse files Browse the repository at this point in the history
  • Loading branch information
dimentyy authored Aug 11, 2024
1 parent b49bb9d commit 49aa302
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 14 deletions.
3 changes: 2 additions & 1 deletion lib/grammers-client/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use grammers_mtproto::{mtp, transport};
use grammers_mtsender::{self as sender, ReconnectionPolicy, Sender};
use grammers_session::{ChatHashCache, MessageBox, Session};
use grammers_tl_types as tl;
use sender::Enqueuer;
use std::collections::{HashMap, VecDeque};
use std::fmt;
Expand Down Expand Up @@ -136,7 +137,7 @@ pub(crate) struct ClientState {
// When did we last warn the user that the update queue filled up?
// This is used to avoid spamming the log.
pub(crate) last_update_limit_warn: Option<Instant>,
pub(crate) updates: VecDeque<crate::types::Update>,
pub(crate) updates: VecDeque<(tl::enums::Update, Arc<crate::types::ChatMap>)>,
}

pub(crate) struct Connection {
Expand Down
51 changes: 38 additions & 13 deletions lib/grammers-client/src/client/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ pub use grammers_mtsender::{AuthorizationError, InvocationError};
use grammers_session::channel_id;
pub use grammers_session::{PrematureEndReason, UpdateState};
use grammers_tl_types as tl;
use log::warn;
use std::pin::pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand All @@ -27,10 +26,7 @@ const UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN: Duration = Duration::from_secs(300);
impl Client {
/// Returns the next update from the buffer where they are queued until used.
///
/// Similar using an iterator manually, this method will return `Some` until no more updates
/// are available (e.g. a graceful disconnection occurred).
///
/// # Examples
/// # Example
///
/// ```
/// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
Expand All @@ -50,11 +46,42 @@ impl Client {
/// # }
/// ```
pub async fn next_update(&self) -> Result<Update, InvocationError> {
loop {
let (update, chats) = self.next_raw_update().await?;

if let Some(update) = Update::new(&self, update, &chats) {
return Ok(update);
}
}
}

/// Returns the next raw update and associated chat map from the buffer where they are queued until used.
///
/// # Example
///
/// ```
/// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
/// loop {
/// let (update, chats) = client.next_raw_update().await?;
///
/// // Print all incoming updates in their raw form
/// dbg!(update);
/// }
/// # Ok(())
/// # }
///
/// ```
///
/// P.S. To receive updateBotInlineSend, go to [@BotFather](https://t.me/BotFather), select your bot and click "Bot Settings", then "Inline Feedback" and select probability.
///
pub async fn next_raw_update(
&self,
) -> Result<(tl::enums::Update, Arc<ChatMap>), InvocationError> {
loop {
let (deadline, get_diff, get_channel_diff) = {
let state = &mut *self.0.state.write().unwrap();
if let Some(updates) = state.updates.pop_front() {
return Ok(updates);
if let Some(update) = state.updates.pop_front() {
return Ok(update);
}
(
state.message_box.check_deadlines(), // first, as it might trigger differences
Expand Down Expand Up @@ -224,7 +251,7 @@ impl Client {

updates.truncate(updates.len() - exceeds);
if notify {
warn!(
log::warn!(
"{} updates were dropped because the update_queue_limit was exceeded",
exceeds
);
Expand All @@ -234,11 +261,9 @@ impl Client {
}
}

state.updates.extend(
updates
.into_iter()
.flat_map(|u| Update::new(self, u, &chat_map)),
);
state
.updates
.extend(updates.into_iter().map(|u| (u, chat_map.clone())));
}

/// Synchronize the updates state to the session.
Expand Down

0 comments on commit 49aa302

Please sign in to comment.