Skip to content

Commit

Permalink
Run rustfmt
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesbornholt committed Sep 3, 2022
1 parent 6a163ea commit 7947b12
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 109 deletions.
4 changes: 1 addition & 3 deletions s3-client/examples/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ fn init_tracing_subscriber() {
.with_writer(std::io::stderr)
.finish();

subscriber
.try_init()
.expect("unable to install global subscriber");
subscriber.try_init().expect("unable to install global subscriber");
}

fn main() {
Expand Down
4 changes: 1 addition & 3 deletions s3-client/examples/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ fn init_tracing_subscriber() {
.with_writer(std::io::stderr)
.finish();

subscriber
.try_init()
.expect("unable to install global subscriber");
subscriber.try_init().expect("unable to install global subscriber");
}

fn main() {
Expand Down
30 changes: 11 additions & 19 deletions s3-client/src/s3_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ use std::sync::Arc;

use aws_c_s3_sys::{
aws_allocator, aws_client_bootstrap, aws_client_bootstrap_new, aws_client_bootstrap_options,
aws_client_bootstrap_release, aws_credentials_provider,
aws_credentials_provider_chain_default_options, aws_credentials_provider_new_chain_default,
aws_credentials_provider_release, aws_default_allocator, aws_event_loop_group,
aws_event_loop_group_new_default, aws_event_loop_group_release, aws_host_resolver,
aws_host_resolver_default_options, aws_host_resolver_new_default, aws_host_resolver_release,
aws_s3_client, aws_s3_client_config, aws_s3_client_new, aws_s3_client_release,
aws_s3_init_default_signing_config, aws_signing_config_aws,
aws_client_bootstrap_release, aws_credentials_provider, aws_credentials_provider_chain_default_options,
aws_credentials_provider_new_chain_default, aws_credentials_provider_release, aws_default_allocator,
aws_event_loop_group, aws_event_loop_group_new_default, aws_event_loop_group_release, aws_host_resolver,
aws_host_resolver_default_options, aws_host_resolver_new_default, aws_host_resolver_release, aws_s3_client,
aws_s3_client_config, aws_s3_client_new, aws_s3_client_release, aws_s3_init_default_signing_config,
aws_signing_config_aws,
};

use crate::crt_init;
Expand Down Expand Up @@ -65,16 +64,14 @@ impl S3Client {

// Safety: I think the CRT expects an event loop never to move across threads, so S3Client
// must not be `Send`
let event_loop_group =
unsafe { aws_event_loop_group_new_default(allocator, 0, ptr::null()) };
let event_loop_group = unsafe { aws_event_loop_group_new_default(allocator, 0, ptr::null()) };

let mut resolver_options = aws_host_resolver_default_options {
el_group: event_loop_group,
max_entries: 8,
..Default::default()
};
let host_resolver =
unsafe { aws_host_resolver_new_default(allocator, &mut resolver_options as *mut _) };
let host_resolver = unsafe { aws_host_resolver_new_default(allocator, &mut resolver_options as *mut _) };

let mut bootstrap_options = aws_client_bootstrap_options {
event_loop_group,
Expand All @@ -90,12 +87,8 @@ impl S3Client {
bootstrap: client_bootstrap,
..Default::default()
};
let creds_provider = unsafe {
aws_credentials_provider_new_chain_default(
allocator,
&mut creds_provider_options as *mut _,
)
};
let creds_provider =
unsafe { aws_credentials_provider_new_chain_default(allocator, &mut creds_provider_options as *mut _) };

let mut signing_config = Box::new(MaybeUninit::uninit());
let region = "us-east-1".to_string();
Expand Down Expand Up @@ -126,8 +119,7 @@ impl S3Client {
..Default::default()
};
let s3_client = unsafe {
aws_s3_client_new(allocator, &client_config as *const _)
.ok_or("failed to create s3 client".to_string())?
aws_s3_client_new(allocator, &client_config as *const _).ok_or("failed to create s3 client".to_string())?
};

Ok(Self {
Expand Down
7 changes: 3 additions & 4 deletions s3-client/src/s3_client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::sync::mpsc::{Receiver, Sender};
use aws_c_s3_sys::{
aws_byte_cursor, aws_http_header, aws_http_message_add_header, aws_http_message_new_request,
aws_http_message_set_request_method, aws_http_message_set_request_path, aws_http_method_get,
aws_s3_client_make_meta_request, aws_s3_meta_request, aws_s3_meta_request_options,
aws_s3_meta_request_result, aws_s3_meta_request_type, AWS_OP_SUCCESS,
aws_s3_client_make_meta_request, aws_s3_meta_request, aws_s3_meta_request_options, aws_s3_meta_request_result,
aws_s3_meta_request_type, AWS_OP_SUCCESS,
};
use tracing::{error, trace};

Expand Down Expand Up @@ -172,8 +172,7 @@ extern "C" fn get_object_finish_callback(

if result.error_code != 0 {
let error_body = if let Some(error_body) = unsafe { result.error_response_body.as_ref() } {
let error_body: &[u8] =
unsafe { slice::from_raw_parts(error_body.buffer, error_body.len) };
let error_body: &[u8] = unsafe { slice::from_raw_parts(error_body.buffer, error_body.len) };
let error_body = std::str::from_utf8(error_body).expect("error wasn't UTF-8");
error_body.to_string()
} else {
Expand Down
49 changes: 12 additions & 37 deletions s3-client/src/s3_client/list_objects_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ impl S3Client {
on_object: Some(on_object_callback),
on_list_finished: Some(on_list_finished_callback),
user_data: user_data.as_mut() as *mut ListObjectsV2UserData as *mut libc::c_void,
continuation_token: continuation_token
.map(|s| s.as_aws_byte_cursor())
.unwrap_or_default(),
continuation_token: continuation_token.map(|s| s.as_aws_byte_cursor()).unwrap_or_default(),
};

let paginator = aws_s3_initiate_list_objects(self.allocator, &list_objects_params);
Expand Down Expand Up @@ -78,37 +76,22 @@ struct ListObjectsV2UserData {
// This struct needs to be Send because we give it to the CRT
static_assertions::assert_impl_all!(Box<ListObjectsV2UserData>: Send);

extern "C" fn on_object_callback(
info: *const aws_s3_object_info,
user_data_ptr: *mut libc::c_void,
) -> bool {
extern "C" fn on_object_callback(info: *const aws_s3_object_info, user_data_ptr: *mut libc::c_void) -> bool {
unsafe {
let user_data = (user_data_ptr as *mut ListObjectsV2UserData)
.as_mut()
.unwrap();
let user_data = (user_data_ptr as *mut ListObjectsV2UserData).as_mut().unwrap();

let info = info.as_ref().unwrap();

let prefix = byte_cursor_as_osstr(info.prefix);
let key = byte_cursor_as_osstr(info.key);

user_data
.result
.as_mut()
.unwrap()
.objects
.push(S3ObjectInfo {
prefix: prefix.to_owned(),
key: key.to_owned(),
size: info.size,
});
user_data.result.as_mut().unwrap().objects.push(S3ObjectInfo {
prefix: prefix.to_owned(),
key: key.to_owned(),
size: info.size,
});

trace!(
?prefix,
?key,
size = info.size,
"ListObjectsV2 on_object callback"
);
trace!(?prefix, ?key, size = info.size, "ListObjectsV2 on_object callback");

true
}
Expand All @@ -124,22 +107,15 @@ extern "C" fn on_list_finished_callback(
// Turn the user_data pointer into a box so it will be dropped when this callback is done.
// If there are more pages to get, then we will call Box::leak on it (again) until
// all the pages are consumed.
let user_data = unsafe {
(user_data_ptr as *mut ListObjectsV2UserData)
.as_mut()
.unwrap()
};
let user_data = unsafe { (user_data_ptr as *mut ListObjectsV2UserData).as_mut().unwrap() };

if error_code != 0 {
error!(error_code, "ListObjectsV2 on_list_finished_callback error");
user_data
.tx
.take()
.unwrap()
.send(Err(format!(
"on_list_finish callback error: {}",
error_code
)))
.send(Err(format!("on_list_finish callback error: {}", error_code)))
.unwrap();
return;
}
Expand All @@ -156,8 +132,7 @@ extern "C" fn on_list_finished_callback(
return;
}

let result =
unsafe { aws_s3_paginator_continue(paginator, (*user_data.signing_config).as_ref()) };
let result = unsafe { aws_s3_paginator_continue(paginator, (*user_data.signing_config).as_ref()) };

if result != 0 {
error!(result, "ListObjectsV2 aws_s3_paginator_continue failed");
Expand Down
13 changes: 4 additions & 9 deletions s3-client/src/streaming_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,7 @@ impl StreamingGetObject {

let mut parallel_chunks = self.parallel_chunks.read().unwrap();

let mut current_chunk = parallel_chunks
.front()
.expect("cannot read with no chunks left");
let mut current_chunk = parallel_chunks.front().expect("cannot read with no chunks left");
// Check if we're done with the current chunk. If so, push it off the queue and grab the
// next one instead.
if current_chunk.finished() {
Expand Down Expand Up @@ -346,8 +344,7 @@ impl StreamingGetObject {
let my_max_connections = max_connections
.saturating_div(INFLIGHT_REQUEST_TRACKER.load(Ordering::Relaxed))
.max(PARTS_PER_CHUNK);
let my_max_chunks =
(my_max_connections + PARTS_PER_CHUNK - 1) * MAX_CHUNKS_MULTIPLIER / PARTS_PER_CHUNK;
let my_max_chunks = (my_max_connections + PARTS_PER_CHUNK - 1) * MAX_CHUNKS_MULTIPLIER / PARTS_PER_CHUNK;

while parallel_chunks.len() < my_max_chunks {
// We'll only spawn a new chunk if there is room in the queue and the current chunk is
Expand All @@ -356,8 +353,7 @@ impl StreamingGetObject {
let first_chunk_nearing_completion = parallel_chunks
.front()
.map(|chunk| {
chunk.remaining_to_consume()
< chunk.total_chunk_size / PARALLEL_CHUNK_REFILL_FACTOR as u64
chunk.remaining_to_consume() < chunk.total_chunk_size / PARALLEL_CHUNK_REFILL_FACTOR as u64
})
.unwrap_or(true);

Expand All @@ -372,8 +368,7 @@ impl StreamingGetObject {
{
let mut parallel_chunks = self.parallel_chunks.write().unwrap();

let range =
Some(self.next_chunk_offset..self.next_chunk_offset + self.next_chunk_size);
let range = Some(self.next_chunk_offset..self.next_chunk_offset + self.next_chunk_size);
let chunk = Arc::new(StreamingChunk::new(self.next_chunk_size));

self.next_chunk_offset += self.next_chunk_size;
Expand Down
43 changes: 9 additions & 34 deletions s3-file-connector/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use std::time::{Duration, UNIX_EPOCH};

use clap::{Arg, Command};
use fuser::{
BackgroundSession, FileAttr, FileType, Filesystem, KernelConfig, MountOption, ReplyAttr,
ReplyData, ReplyDirectory, ReplyEntry, ReplyOpen, Request, Session,
BackgroundSession, FileAttr, FileType, Filesystem, KernelConfig, MountOption, ReplyAttr, ReplyData, ReplyDirectory,
ReplyEntry, ReplyOpen, Request, Session,
};
use s3_client::{S3Client, S3ClientConfig, StreamingGetObject};

Expand Down Expand Up @@ -89,11 +89,7 @@ impl Filesystem for FuseSyncFS {
async fn lookup(&self, _req: &Request<'_>, parent: u64, name: &OsStr, reply: ReplyEntry) {
if parent == 1 {
if name.to_str().map(|s| s == self.key).unwrap_or(false) {
reply.entry(
&TTL_ZERO,
&make_benchmark_file_attr(FILE_INODE, self.size),
0,
);
reply.entry(&TTL_ZERO, &make_benchmark_file_attr(FILE_INODE, self.size), 0);
} else {
reply.error(libc::ENOENT);
}
Expand Down Expand Up @@ -134,12 +130,8 @@ impl Filesystem for FuseSyncFS {
drop(inflight_reads);
let mut inflight_reads_mut = self.inflight_reads.write().unwrap();
println!("{} {} {}", &self.bucket, &self.key, self.size as u64);
let request = StreamingGetObject::new(
Arc::clone(&self.client),
&self.bucket,
&self.key,
self.size as u64,
);
let request =
StreamingGetObject::new(Arc::clone(&self.client), &self.bucket, &self.key, self.size as u64);
inflight_reads_mut.insert(fh, Mutex::new(request));
drop(inflight_reads_mut);
inflight_reads = self.inflight_reads.read().unwrap();
Expand All @@ -152,23 +144,13 @@ impl Filesystem for FuseSyncFS {
}
}

async fn readdir(
&self,
_req: &Request<'_>,
ino: u64,
_fh: u64,
offset: i64,
mut reply: ReplyDirectory,
) {
async fn readdir(&self, _req: &Request<'_>, ino: u64, _fh: u64, offset: i64, mut reply: ReplyDirectory) {
if ino != 1 {
reply.error(libc::ENOENT);
return;
}

let mut entries = vec![
(1, FileType::Directory, "."),
(1, FileType::Directory, ".."),
];
let mut entries = vec![(1, FileType::Directory, "."), (1, FileType::Directory, "..")];

entries.push((FILE_INODE, FileType::RegularFile, &self.key));

Expand Down Expand Up @@ -205,11 +187,7 @@ fn main() {
.required(true)
.help("Act as a client, and mount FUSE at given path"),
)
.arg(
Arg::new("BUCKET_NAME")
.required(true)
.help("Bucket to mount"),
)
.arg(Arg::new("BUCKET_NAME").required(true).help("Bucket to mount"))
.arg(Arg::new("KEY_NAME").required(true).help("Key to mount"))
.arg(
Arg::new("FILE_SIZE")
Expand Down Expand Up @@ -260,10 +238,7 @@ fn main() {
let part_size = matches.get_one::<u64>("part-size");
let thread_count = matches.get_one::<u64>("threads");

let mut options = vec![
MountOption::RO,
MountOption::FSName("fuse_sync".to_string()),
];
let mut options = vec![MountOption::RO, MountOption::FSName("fuse_sync".to_string())];
if matches.is_present("auto_unmount") {
options.push(MountOption::AutoUnmount);
}
Expand Down

0 comments on commit 7947b12

Please sign in to comment.