Skip to content

Commit

Permalink
fix: always return if error
Browse files Browse the repository at this point in the history
  • Loading branch information
dubadub committed Jan 10, 2025
1 parent 9ab20ac commit 3d59270
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 26 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion client/src/chunker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,12 @@ pub fn is_text(p: &Path) -> bool {
if let Some(ext) = p.extension() {
let ext = ext.to_ascii_lowercase();

ext == "cook" || ext == "conf" || ext == "yaml" || ext == "yml" || ext == "md" || ext == "cplan"
ext == "cook"
|| ext == "conf"
|| ext == "yaml"
|| ext == "yml"
|| ext == "md"
|| ext == "cplan"
} else {
false
}
Expand Down
47 changes: 25 additions & 22 deletions client/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use uuid::Uuid;
use log::trace;

use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
use reqwest::{StatusCode};
use reqwest::StatusCode;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};

use futures::{Stream, StreamExt};
Expand Down Expand Up @@ -85,10 +85,7 @@ impl Remote {
pub async fn upload_batch(&self, chunks: Vec<(String, Vec<u8>)>) -> Result<()> {
trace!(
"uploading chunks {:?}",
chunks
.iter()
.map(|(c, _)| c)
.collect::<Vec<_>>()
chunks.iter().map(|(c, _)| c).collect::<Vec<_>>()
);

// Generate a random boundary string
Expand All @@ -102,24 +99,26 @@ impl Remote {
let final_boundary = format!("--{}--\r\n", &boundary).into_bytes();

// Create a stream of chunk data
let stream = futures::stream::iter(chunks).map(move |(chunk_id, content)| {
let part = format!(
"--{boundary}\r\n\
let stream = futures::stream::iter(chunks)
.map(move |(chunk_id, content)| {
let part = format!(
"--{boundary}\r\n\
Content-Disposition: form-data; name=\"{chunk_id}\"\r\n\
Content-Type: application/octet-stream\r\n\r\n",
boundary = &boundary,
chunk_id = chunk_id
);
boundary = &boundary,
chunk_id = chunk_id
);

let end = "\r\n".to_string();
let end = "\r\n".to_string();

// Combine part header, content, and end into a single stream
futures::stream::iter(vec![
Ok::<_, SyncError>(part.into_bytes()),
Ok::<_, SyncError>(content),
Ok::<_, SyncError>(end.into_bytes()),
])
}).flatten();
// Combine part header, content, and end into a single stream
futures::stream::iter(vec![
Ok::<_, SyncError>(part.into_bytes()),
Ok::<_, SyncError>(content),
Ok::<_, SyncError>(end.into_bytes()),
])
})
.flatten();

// Add final boundary

Expand Down Expand Up @@ -312,7 +311,8 @@ impl Remote {
fn extract_next_part(buffer: &[u8], boundary: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
if let Some(start) = find_boundary(buffer, boundary) {
if let Some(next_boundary) = find_boundary(&buffer[start + boundary.len()..], boundary) {
let part = buffer[start + boundary.len()..start + boundary.len() + next_boundary].to_vec();
let part =
buffer[start + boundary.len()..start + boundary.len() + next_boundary].to_vec();
let remaining = buffer[start + boundary.len() + next_boundary..].to_vec();
Ok(Some((part, remaining)))
} else {
Expand All @@ -333,7 +333,9 @@ fn process_part(part: &[u8]) -> Result<Option<(String, Vec<u8>)>> {
.lines()
.find(|line| line.starts_with("X-Chunk-ID:"))
.and_then(|line| line.split(": ").nth(1))
.ok_or(SyncError::BatchDownloadError("No chunk ID found".to_string()))?
.ok_or(SyncError::BatchDownloadError(
"No chunk ID found".to_string(),
))?
.trim()
.to_string();

Expand All @@ -347,7 +349,8 @@ fn process_part(part: &[u8]) -> Result<Option<(String, Vec<u8>)>> {

// Helper function to find boundary in buffer
fn find_boundary(data: &[u8], boundary: &[u8]) -> Option<usize> {
data.windows(boundary.len()).position(|window| window == boundary)
data.windows(boundary.len())
.position(|window| window == boundary)
}

// Helper function to find double CRLF
Expand Down
3 changes: 1 addition & 2 deletions client/src/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use time::OffsetDateTime;
use tokio::sync::Mutex;
use tokio::time::Duration;

use log::{debug, error, trace, warn};
use log::{debug, error, trace};

use crate::chunker::Chunker;
use crate::connection::{get_connection, ConnectionPool};
Expand All @@ -18,7 +18,6 @@ use crate::remote::{CommitResultStatus, Remote, REQUEST_TIMEOUT_SECS};
type Result<T, E = SyncError> = std::result::Result<T, E>;

const INTERVAL_CHECK_UPLOAD_SEC: Duration = Duration::from_secs(47);
const NO_INTERNET_SLEEP_SEC: Duration = Duration::from_secs(61);
// TODO should be in sync in multiple places
const MAX_UPLOAD_SIZE: usize = 3_000_000;

Expand Down

0 comments on commit 3d59270

Please sign in to comment.