Skip to content

Commit

Permalink
Reimplement search_messages to use streams
Browse files Browse the repository at this point in the history
  • Loading branch information
YouKnow-sys authored and Lonami committed Dec 20, 2024
1 parent c104ea3 commit 40ff045
Showing 1 changed file with 38 additions and 18 deletions.
56 changes: 38 additions & 18 deletions lib/grammers-client/src/client/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ use crate::types::{InputReactions, IterBuffer, Message};
use crate::utils::{generate_random_id, generate_random_ids};
use crate::{types, ChatMap, Client, InputMedia};
use chrono::{DateTime, FixedOffset};
use futures::Stream;
pub use grammers_mtsender::{AuthorizationError, InvocationError};
use grammers_session::PackedChat;
use grammers_tl_types as tl;
use log::{log_enabled, warn, Level};
use std::collections::HashMap;
use std::future::Future;
use std::task::Poll;
use tl::enums::InputPeer;

fn map_random_ids_to_messages(
Expand Down Expand Up @@ -251,9 +254,9 @@ impl MessageIter {
}
}

pub type SearchIter = IterBuffer<tl::functions::messages::Search, Message>;
pub type SearchStream = IterBuffer<tl::functions::messages::Search, Message>;

impl SearchIter {
impl SearchStream {
fn new(client: &Client, peer: PackedChat) -> Self {
// TODO let users tweak all the options from the request
Self::from_request(
Expand Down Expand Up @@ -351,27 +354,43 @@ impl SearchIter {
self.request.limit = 0;
self.get_total().await
}
}

/// Return the next `Message` from the internal buffer, filling the buffer previously if it's
/// empty.
///
/// Returns `None` if the `limit` is reached or there are no messages left.
pub async fn next(&mut self) -> Result<Option<Message>, InvocationError> {
impl Stream for SearchStream {
type Item = Result<Message, InvocationError>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
if let Some(result) = self.next_raw() {
return result;
match result {
Ok(m) => return Poll::Ready(m.map(Ok)),
Err(e) => return Poll::Ready(Some(Err(e))),
}
}

self.request.limit = self.determine_limit(MAX_LIMIT);
self.fill_buffer(self.request.limit).await?;
{
self.request.limit = self.determine_limit(MAX_LIMIT);
let limit = self.request.limit;
let this = self.fill_buffer(limit);
futures::pin_mut!(this);
if let Err(e) = futures::ready!(this.poll(cx)) {
return Poll::Ready(Some(Err(e)));
}
}

// Don't bother updating offsets if this is the last time stuff has to be fetched.
if !self.last_chunk && !self.buffer.is_empty() {
let last = &self.buffer[self.buffer.len() - 1];
self.request.offset_id = last.raw.id;
self.request.max_date = last.raw.date;
let (last_id, last_date) = {
let last = &self.buffer[self.buffer.len() - 1];
(last.raw.id, last.raw.date)
};
self.request.offset_id = last_id;
self.request.max_date = last_date;
}

Ok(self.pop_item())
Poll::Ready(self.pop_item().map(Ok))
}
}

Expand Down Expand Up @@ -929,26 +948,27 @@ impl Client {
MessageIter::new(self, chat.into())
}

/// Iterate over the messages that match certain search criteria.
/// Get a stream over the messages that match certain search criteria.
///
/// This allows you to search by text within a chat or filter by media among other things.
///
/// # Examples
///
/// ```
/// # use futures::TryStreamExt;
/// # async fn f(chat: grammers_client::types::Chat, client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
/// // Let's print all the people who think grammers is cool.
/// let mut messages = client.search_messages(&chat).query("grammers is cool");
///
/// while let Some(message) = messages.next().await? {
/// while let Some(message) = messages.try_next().await? {
/// let sender = message.sender().unwrap();
/// println!("{}", sender.name().unwrap_or(&sender.id().to_string()));
/// }
/// # Ok(())
/// # }
/// ```
pub fn search_messages<C: Into<PackedChat>>(&self, chat: C) -> SearchIter {
SearchIter::new(self, chat.into())
pub fn search_messages<C: Into<PackedChat>>(&self, chat: C) -> SearchStream {
SearchStream::new(self, chat.into())
}

/// Iterate over the messages that match certain search criteria, without being restricted to
Expand Down

0 comments on commit 40ff045

Please sign in to comment.