Skip to content

Commit

Permalink
sqlite: separate rw and ro connection pools (#412)
Browse files Browse the repository at this point in the history
* sqlite: separate rw and ro connection pools
* use the read_only connect option (potential source of issues)
  • Loading branch information
awoimbee authored Feb 11, 2025
1 parent 890a3ef commit d93c6f8
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 87 deletions.
16 changes: 9 additions & 7 deletions src/init_db.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
use sqlx::migrate::MigrateError;
use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};

pub async fn init_db(filename: &str, in_memory: bool) -> Result<SqlitePool, MigrateError> {
pub async fn init_db(filename: &str) -> Result<(SqlitePool, SqlitePool), MigrateError> {
let options = SqliteConnectOptions::new()
.filename(filename)
.create_if_missing(true)
.in_memory(in_memory)
.synchronous(sqlx::sqlite::SqliteSynchronous::Normal)
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.foreign_keys(true);

let conn = SqlitePoolOptions::new()
let writer_conn = SqlitePoolOptions::new()
.max_connections(1)
.connect_with(options)
.connect_with(options.clone().create_if_missing(true))
.await?;

sqlx::migrate!().run(&conn).await?;
let reader_conn = SqlitePoolOptions::new()
.connect_with(options.read_only(true))
.await?;

sqlx::migrate!().run(&writer_conn).await?;

Ok(conn)
Ok((reader_conn, writer_conn))
}
13 changes: 7 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ pub struct NetAddr {
pub struct TrowServerState {
pub registry: TrowServer,
pub config: TrowConfig,
pub db: SqlitePool,
pub db_ro: SqlitePool,
pub db_rw: SqlitePool,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -173,21 +174,21 @@ impl TrowConfig {

let registry = TrowServer::new(self.data_dir.clone(), self.config_file.clone())?;

let db_in_memory = self.db_connection == Some(":memory:".to_string());
let db_file = match (&self.db_connection, db_in_memory) {
(Some(conn), false) => conn.clone(),
let db_file = match &self.db_connection {
Some(conn) => conn.clone(),
_ => {
let mut p = self.data_dir.clone();
p.push("trow.db");
p.to_string_lossy().to_string()
}
};
let db = init_db::init_db(&db_file, db_in_memory).await?;
let (db_ro, db_rw) = init_db::init_db(&db_file).await?;

let server_state = TrowServerState {
config: self,
registry,
db,
db_ro,
db_rw,
};
Ok(Arc::new(server_state))
}
Expand Down
28 changes: 14 additions & 14 deletions src/registry/proxy/proxy_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl SingleRegistryProxyConfig {
&self,
image: &RemoteImage,
registry: &TrowServer,
db: &SqlitePool,
db_rw: &SqlitePool,
) -> Result<String, DownloadRemoteImageError> {
// Replace eg f/docker/alpine by f/docker/library/alpine
let repo_name = format!("f/{}/{}", self.alias, image.get_repo());
Expand All @@ -142,7 +142,7 @@ impl SingleRegistryProxyConfig {
repo_name,
t
)
.fetch_optional(&mut *db.acquire().await?)
.fetch_optional(db_rw)
.await?;
if let Some((cl, auth)) = &try_cl {
match cl.fetch_manifest_digest(&image_ref, auth).await {
Expand All @@ -151,7 +151,7 @@ impl SingleRegistryProxyConfig {
digests.push(Digest::try_from_raw(&d)?);
}
}
Err(e) => tracing::warn!("Failed to fetch manifest digest: {}", e),
Err(e) => tracing::warn!("Failed to fetch remote tag digest: {}", e),
}
}
if let Some(local_digest) = local_digest {
Expand All @@ -167,7 +167,7 @@ impl SingleRegistryProxyConfig {
r#"SELECT EXISTS(SELECT 1 FROM manifest WHERE digest = $1)"#,
mani_digest_str
)
.fetch_one(&mut *db.acquire().await?)
.fetch_one(db_rw)
.await?;
if has_manifest == 1 {
return Ok(mani_digest.to_string());
Expand All @@ -178,7 +178,7 @@ impl SingleRegistryProxyConfig {
let manifest_download = download_manifest_and_layers(
cl,
auth,
db.clone(),
db_rw.clone(),
&registry.storage,
&ref_to_dl,
&repo_name,
Expand All @@ -197,7 +197,7 @@ impl SingleRegistryProxyConfig {
tag,
mani_digest_str
)
.execute(&mut *db.acquire().await?)
.execute(db_rw)
.await?;
}
return Ok(mani_digest.to_string());
Expand Down Expand Up @@ -260,14 +260,14 @@ async fn get_aws_ecr_password_from_env(ecr_host: &str) -> Result<String, EcrPass
async fn download_manifest_and_layers(
cl: &oci_client::Client,
auth: &RegistryAuth,
db: SqlitePool,
db_rw: SqlitePool,
storage: &TrowStorageBackend,
ref_: &Reference,
local_repo_name: &str,
) -> Result<(), DownloadRemoteImageError> {
async fn download_blob(
cl: &oci_client::Client,
db: SqlitePool,
db_rw: SqlitePool,
storage: &TrowStorageBackend,
ref_: &Reference,
layer_digest: &str,
Expand All @@ -278,7 +278,7 @@ async fn download_manifest_and_layers(
"SELECT EXISTS(SELECT 1 FROM blob WHERE digest = $1);",
layer_digest,
)
.fetch_one(&mut *db.acquire().await?)
.fetch_one(&db_rw)
.await?
== 1;

Expand All @@ -293,15 +293,15 @@ async fn download_manifest_and_layers(
layer_digest,
size
)
.execute(&mut *db.acquire().await?)
.execute(&db_rw)
.await?;
}
sqlx::query!(
"INSERT INTO repo_blob_association (repo_name, blob_digest) VALUES ($1, $2) ON CONFLICT DO NOTHING;",
local_repo_name,
layer_digest
)
.execute(&mut *db.acquire().await?)
.execute(&db_rw)
.await?;

Ok(())
Expand Down Expand Up @@ -331,15 +331,15 @@ async fn download_manifest_and_layers(
.map(|digest| ref_.clone_with_digest(digest.to_string()))
.collect::<Vec<_>>();
let futures = images_to_dl.iter().map(|img| {
download_manifest_and_layers(cl, auth, db.clone(), storage, img, local_repo_name)
download_manifest_and_layers(cl, auth, db_rw.clone(), storage, img, local_repo_name)
});
try_join_all(futures).await?;
}
OCIManifest::V2(_) => {
let layer_digests = manifest.get_local_asset_digests();
let futures = layer_digests
.iter()
.map(|l| download_blob(cl, db.clone(), storage, ref_, l, local_repo_name));
.map(|l| download_blob(cl, db_rw.clone(), storage, ref_, l, local_repo_name));
try_join_all(futures).await?;
}
}
Expand All @@ -352,7 +352,7 @@ async fn download_manifest_and_layers(
local_repo_name,
digest
)
.execute(&mut *db.acquire().await?)
.execute(&db_rw)
.await?;

Ok(())
Expand Down
3 changes: 1 addition & 2 deletions src/routes/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ async fn get_blob(
State(state): State<Arc<TrowServerState>>,
Path((mut repo, digest)): Path<(String, Digest)>,
) -> Result<BlobReader<impl tokio::io::AsyncRead>, Error> {
let mut conn = state.db.acquire().await?;
let digest_str = digest.as_str();
if repo.starts_with(PROXY_DIR) {
let (proxy_cfg, image) = match state
Expand Down Expand Up @@ -57,7 +56,7 @@ async fn get_blob(
digest_str,
repo
)
.fetch_one(&mut *conn)
.fetch_one(&state.db_ro)
.await?;

let stream = match state.registry.storage.get_blob_stream(&repo, &digest).await {
Expand Down
56 changes: 20 additions & 36 deletions src/routes/blob_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use hyper::StatusCode;

use super::macros::endpoint_fn_7_levels;
use crate::registry::server::PROXY_DIR;
use crate::registry::{digest, ContentInfo, TrowServer};
use crate::registry::{digest, ContentInfo};
use crate::routes::macros::route_7_levels;
use crate::routes::response::errors::Error;
use crate::routes::response::trow_token::TrowToken;
Expand All @@ -21,14 +21,12 @@ use crate::TrowServerState;
mod utils {
use std::ops::RangeInclusive;

use sqlx::SqlitePool;
use uuid::Uuid;

use super::*;

pub async fn complete_upload(
db: &SqlitePool,
registry: &TrowServer,
state: Arc<TrowServerState>,
upload_id: &str,
digest: &Digest,
data: Body,
Expand All @@ -41,16 +39,18 @@ mod utils {
"#,
upload_id,
)
.fetch_one(&mut *db.acquire().await?)
.fetch_one(&state.db_ro)
.await?;
let upload_id_bin = Uuid::parse_str(upload_id).unwrap();

let size = registry
let size = state
.registry
.storage
.write_blob_part_stream(&upload_id_bin, data.into_data_stream(), range)
.await?;

registry
state
.registry
.storage
.complete_blob_write(&upload_id_bin, digest)
.await?;
Expand All @@ -62,7 +62,7 @@ mod utils {
"#,
upload.uuid
)
.execute(&mut *db.acquire().await?)
.execute(&state.db_rw)
.await?;

let digest_str = digest.as_str();
Expand All @@ -75,15 +75,15 @@ mod utils {
digest_str,
size_i64
)
.execute(&mut *db.acquire().await?)
.execute(&state.db_rw)
.await?;

sqlx::query!(
"INSERT INTO repo_blob_association VALUES ($1, $2) ON CONFLICT DO NOTHING",
upload.repo,
digest_str,
)
.execute(&mut *db.acquire().await?)
.execute(&state.db_rw)
.await?;

Ok(AcceptedUpload::new(
Expand Down Expand Up @@ -124,19 +124,12 @@ async fn put_blob_upload(
"#,
uuid_str,
)
.fetch_one(&mut *state.db.acquire().await?)
.fetch_one(&state.db_ro)
.await?;
assert_eq!(upload.repo, repo);

let accepted_upload = utils::complete_upload(
&state.db,
&state.registry,
&uuid_str,
&digest.digest,
chunk,
None,
)
.await?;
let accepted_upload =
utils::complete_upload(state.clone(), &uuid_str, &digest.digest, chunk, None).await?;

// missing location header
Ok(accepted_upload)
Expand Down Expand Up @@ -188,7 +181,7 @@ async fn patch_blob_upload(
"#,
uuid_str,
)
.fetch_one(&mut *state.db.acquire().await?)
.fetch_one(&state.db_ro)
.await?;

let content_range = content_info.map(|ci| ci.range.0..=ci.range.1);
Expand All @@ -203,7 +196,7 @@ async fn patch_blob_upload(
uuid_str,
total_stored,
)
.execute(&mut *state.db.acquire().await?)
.execute(&state.db_rw)
.await?;

Ok(UploadInfo::new(
Expand Down Expand Up @@ -255,20 +248,12 @@ async fn post_blob_upload(
repo_name,
0_i64
)
.execute(&mut *state.db.acquire().await?)
.execute(&state.db_rw)
.await?;

if let Some(digest) = digest.digest {
// Have a monolithic upload with data
return match utils::complete_upload(
&state.db,
&state.registry,
&upload_uuid,
&digest,
data,
None,
)
.await
return match utils::complete_upload(state.clone(), &upload_uuid, &digest, data, None).await
{
Ok(accepted_upload) => Ok(Upload::Accepted(accepted_upload)),
Err(e) => Err(e),
Expand Down Expand Up @@ -309,7 +294,7 @@ async fn get_blob_upload(
upload_id_str,
repo_name
)
.fetch_one(&state.db)
.fetch_one(&state.db_ro)
.await?;
let location = format!("/v2/{}/blobs/uploads/{}", repo_name, upload_id);

Expand Down Expand Up @@ -384,7 +369,6 @@ mod tests {
_ => panic!("Invalid value: {resp:?}"),
};
assert_eq!(upload.range(), (0, 0)); // Haven't uploaded anything yet
let mut conn = state.db.acquire().await.unwrap();
let upload_uuid = upload.uuid().to_string();
sqlx::query!(
r#"
Expand All @@ -393,7 +377,7 @@ mod tests {
"#,
upload_uuid
)
.fetch_one(&mut *conn)
.fetch_one(&state.db_rw)
.await
.unwrap();
}
Expand Down Expand Up @@ -496,7 +480,7 @@ mod tests {
"#,
upload_uuid_str
)
.execute(&mut *state.db.acquire().await.unwrap())
.execute(&state.db_rw)
.await
.unwrap();
state
Expand Down
Loading

0 comments on commit d93c6f8

Please sign in to comment.