From 3d59270fa51f70beb6131dc410a8f00cb1c391b5 Mon Sep 17 00:00:00 2001 From: Alexey Dubovskoy Date: Fri, 10 Jan 2025 20:33:13 +0000 Subject: [PATCH] fix: always return if error --- Cargo.lock | 2 +- client/src/chunker.rs | 7 ++++++- client/src/remote.rs | 47 +++++++++++++++++++++++-------------------- client/src/syncer.rs | 3 +-- 4 files changed, 33 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f50b009..316b72f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -435,7 +435,7 @@ dependencies = [ [[package]] name = "cooklang-sync-client" -version = "0.2.3" +version = "0.2.4" dependencies = [ "async-stream", "diesel", diff --git a/client/src/chunker.rs b/client/src/chunker.rs index 5f24103..99ad723 100644 --- a/client/src/chunker.rs +++ b/client/src/chunker.rs @@ -233,7 +233,12 @@ pub fn is_text(p: &Path) -> bool { if let Some(ext) = p.extension() { let ext = ext.to_ascii_lowercase(); - ext == "cook" || ext == "conf" || ext == "yaml" || ext == "yml" || ext == "md" || ext == "cplan" + ext == "cook" + || ext == "conf" + || ext == "yaml" + || ext == "yml" + || ext == "md" + || ext == "cplan" } else { false } diff --git a/client/src/remote.rs b/client/src/remote.rs index a43a498..95cba49 100644 --- a/client/src/remote.rs +++ b/client/src/remote.rs @@ -4,7 +4,7 @@ use uuid::Uuid; use log::trace; use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION}; -use reqwest::{StatusCode}; +use reqwest::StatusCode; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use futures::{Stream, StreamExt}; @@ -85,10 +85,7 @@ impl Remote { pub async fn upload_batch(&self, chunks: Vec<(String, Vec)>) -> Result<()> { trace!( "uploading chunks {:?}", - chunks - .iter() - .map(|(c, _)| c) - .collect::>() + chunks.iter().map(|(c, _)| c).collect::>() ); // Generate a random boundary string @@ -102,24 +99,26 @@ impl Remote { let final_boundary = format!("--{}--\r\n", &boundary).into_bytes(); // Create a stream of chunk data - let stream = futures::stream::iter(chunks).map(move |(chunk_id, content)| { - let part = format!( - "--{boundary}\r\n\ + let stream = futures::stream::iter(chunks) + .map(move |(chunk_id, content)| { + let part = format!( + "--{boundary}\r\n\ Content-Disposition: form-data; name=\"{chunk_id}\"\r\n\ Content-Type: application/octet-stream\r\n\r\n", - boundary = &boundary, - chunk_id = chunk_id - ); + boundary = &boundary, + chunk_id = chunk_id + ); - let end = "\r\n".to_string(); + let end = "\r\n".to_string(); - // Combine part header, content, and end into a single stream - futures::stream::iter(vec![ - Ok::<_, SyncError>(part.into_bytes()), - Ok::<_, SyncError>(content), - Ok::<_, SyncError>(end.into_bytes()), - ]) - }).flatten(); + // Combine part header, content, and end into a single stream + futures::stream::iter(vec![ + Ok::<_, SyncError>(part.into_bytes()), + Ok::<_, SyncError>(content), + Ok::<_, SyncError>(end.into_bytes()), + ]) + }) + .flatten(); // Add final boundary @@ -312,7 +311,8 @@ impl Remote { fn extract_next_part(buffer: &[u8], boundary: &[u8]) -> Result, Vec)>> { if let Some(start) = find_boundary(buffer, boundary) { if let Some(next_boundary) = find_boundary(&buffer[start + boundary.len()..], boundary) { - let part = buffer[start + boundary.len()..start + boundary.len() + next_boundary].to_vec(); + let part = + buffer[start + boundary.len()..start + boundary.len() + next_boundary].to_vec(); let remaining = buffer[start + boundary.len() + next_boundary..].to_vec(); Ok(Some((part, remaining))) } else { @@ -333,7 +333,9 @@ fn process_part(part: &[u8]) -> Result)>> { .lines() .find(|line| line.starts_with("X-Chunk-ID:")) .and_then(|line| line.split(": ").nth(1)) - .ok_or(SyncError::BatchDownloadError("No chunk ID found".to_string()))? + .ok_or(SyncError::BatchDownloadError( + "No chunk ID found".to_string(), + ))? .trim() .to_string(); @@ -347,7 +349,8 @@ fn process_part(part: &[u8]) -> Result)>> { // Helper function to find boundary in buffer fn find_boundary(data: &[u8], boundary: &[u8]) -> Option { - data.windows(boundary.len()).position(|window| window == boundary) + data.windows(boundary.len()) + .position(|window| window == boundary) } // Helper function to find double CRLF diff --git a/client/src/syncer.rs b/client/src/syncer.rs index 2199342..77ead6d 100644 --- a/client/src/syncer.rs +++ b/client/src/syncer.rs @@ -6,7 +6,7 @@ use time::OffsetDateTime; use tokio::sync::Mutex; use tokio::time::Duration; -use log::{debug, error, trace, warn}; +use log::{debug, error, trace}; use crate::chunker::Chunker; use crate::connection::{get_connection, ConnectionPool}; @@ -18,7 +18,6 @@ use crate::remote::{CommitResultStatus, Remote, REQUEST_TIMEOUT_SECS}; type Result = std::result::Result; const INTERVAL_CHECK_UPLOAD_SEC: Duration = Duration::from_secs(47); -const NO_INTERNET_SLEEP_SEC: Duration = Duration::from_secs(61); // TODO should be in sync in multiple places const MAX_UPLOAD_SIZE: usize = 3_000_000;