Separate schema files for each collection #14

Merged
merged 9 commits on Mar 27, 2024
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -2,7 +2,8 @@
 This changelog documents the changes between release versions.

 ## [Unreleased]
-Changes to be included in the next upcoming release
+- Use separate schema files for each collection
+- Don't sample from collections that already have a schema

 ## [0.0.2] - 2024-03-26
 - Rename CLI plugin to ndc-mongodb ([PR #13](https://github.com/hasura/ndc-mongodb/pull/13))
22 changes: 11 additions & 11 deletions crates/cli/src/introspection/sampling.rs
@@ -1,7 +1,7 @@
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashSet};

 use super::type_unification::{
-    unify_object_types, unify_schema, unify_type, TypeUnificationContext, TypeUnificationResult,
+    unify_object_types, unify_type, TypeUnificationContext, TypeUnificationResult,
 };
 use configuration::{
     schema::{self, Type},
@@ -22,21 +22,21 @@ type ObjectType = WithName<schema::ObjectType>;
 pub async fn sample_schema_from_db(
     sample_size: u32,
     config: &MongoConfig,
-) -> anyhow::Result<Schema> {
-    let mut schema = Schema {
-        collections: BTreeMap::new(),
-        object_types: BTreeMap::new(),
-    };
+    existing_schemas: &HashSet<std::string::String>,
+) -> anyhow::Result<BTreeMap<std::string::String, Schema>> {
+    let mut schemas = BTreeMap::new();
     let db = config.client.database(&config.database);
     let mut collections_cursor = db.list_collections(None, None).await?;

     while let Some(collection_spec) = collections_cursor.try_next().await? {
         let collection_name = collection_spec.name;
-        let collection_schema =
-            sample_schema_from_collection(&collection_name, sample_size, config).await?;
-        schema = unify_schema(schema, collection_schema);
+        if !existing_schemas.contains(&collection_name) {
+            let collection_schema =
+                sample_schema_from_collection(&collection_name, sample_size, config).await?;
+            schemas.insert(collection_name, collection_schema);
+        }
     }
-    Ok(schema)
+    Ok(schemas)
 }

 async fn sample_schema_from_collection(
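Taken together with the rest of this diff, sampling now returns one Schema per collection, keyed by collection name, and skips any collection whose name already appears in existing_schemas. A minimal sketch of the call site under the signatures shown in this PR; the sample size and the context value are placeholders, not part of the change:

// Sketch only; `context.path` and `context.mongo_config` stand in for the CLI's real Context fields.
let existing_schemas = configuration::list_existing_schemas(&context.path).await?;
let schemas = introspection::sample_schema_from_db(
    100, // placeholder sample size
    &context.mongo_config,
    &existing_schemas,
)
.await?;
// One schema/<collection-name> file is written per newly sampled collection.
configuration::write_schema_directory(&context.path, schemas).await?;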
20 changes: 1 addition & 19 deletions crates/cli/src/introspection/type_unification.rs
@@ -4,7 +4,7 @@
 ///
 use configuration::{
     schema::{self, Type},
-    Schema, WithName,
+    WithName,
 };
 use indexmap::IndexMap;
 use itertools::Itertools as _;
@@ -255,24 +255,6 @@ pub fn unify_object_types(
     Ok(merged_type_map.into_values().collect())
 }

-/// Unify two schemas. Assumes that the schemas describe mutually exclusive sets of collections.
-pub fn unify_schema(schema_a: Schema, schema_b: Schema) -> Schema {
-    let collections = schema_a
-        .collections
-        .into_iter()
-        .chain(schema_b.collections)
-        .collect();
-    let object_types = schema_a
-        .object_types
-        .into_iter()
-        .chain(schema_b.object_types)
-        .collect();
-    Schema {
-        collections,
-        object_types,
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::collections::{HashMap, HashSet};
42 changes: 29 additions & 13 deletions crates/cli/src/introspection/validation_schema.rs
@@ -1,3 +1,5 @@
+use std::collections::BTreeMap;
+
 use configuration::{
     schema::{self, Type},
     Schema, WithName,
@@ -16,14 +18,14 @@ type ObjectField = WithName<schema::ObjectField>;

 pub async fn get_metadata_from_validation_schema(
     config: &MongoConfig,
-) -> Result<Schema, MongoAgentError> {
+) -> Result<BTreeMap<String, Schema>, MongoAgentError> {
     let db = config.client.database(&config.database);
     let collections_cursor = db.list_collections(None, None).await?;

-    let (object_types, collections) = collections_cursor
+    let schemas: Vec<WithName<Schema>> = collections_cursor
         .into_stream()
         .map(
-            |collection_spec| -> Result<(Vec<ObjectType>, Collection), MongoAgentError> {
+            |collection_spec| -> Result<WithName<Schema>, MongoAgentError> {
                 let collection_spec_value = collection_spec?;
                 let name = &collection_spec_value.name;
                 let schema_bson_option = collection_spec_value
@@ -49,16 +51,27 @@ pub async fn get_metadata_from_validation_schema
                     properties: IndexMap::new(),
                 }),
             }
-            .map(|validator_schema| make_collection(name, &validator_schema))
+            .map(|validator_schema| make_collection_schema(name, &validator_schema))
             },
         )
-        .try_collect::<(Vec<Vec<ObjectType>>, Vec<Collection>)>()
+        .try_collect::<Vec<WithName<Schema>>>()
         .await?;

-    Ok(Schema {
-        collections: WithName::into_map(collections),
-        object_types: WithName::into_map(object_types.concat()),
-    })
+    Ok(WithName::into_map(schemas))
 }

+fn make_collection_schema(
+    collection_name: &str,
+    validator_schema: &ValidatorSchema,
+) -> WithName<Schema> {
+    let (object_types, collection) = make_collection(collection_name, validator_schema);
+    WithName::named(
+        collection.name.clone(),
+        Schema {
+            collections: WithName::into_map(vec![collection]),
+            object_types: WithName::into_map(object_types),
+        },
+    )
+}
+
 fn make_collection(
@@ -100,10 +113,13 @@ fn make_collection(

     object_type_defs.push(collection_type);

-    let collection_info = WithName::named(collection_name, schema::Collection {
-        description: validator_schema.description.clone(),
-        r#type: collection_name.to_string(),
-    });
+    let collection_info = WithName::named(
+        collection_name,
+        schema::Collection {
+            description: validator_schema.description.clone(),
+            r#type: collection_name.to_string(),
+        },
+    );

     (object_type_defs, collection_info)
 }
15 changes: 9 additions & 6 deletions crates/cli/src/lib.rs
@@ -6,7 +6,6 @@ use std::path::PathBuf;

 use clap::{Parser, Subcommand};

-use configuration::Configuration;
 use mongodb_agent_common::interface_types::MongoConfig;

 #[derive(Debug, Clone, Parser)]
@@ -37,15 +36,19 @@ pub async fn run(command: Command, context: &Context) -> anyhow::Result<()> {

 /// Update the configuration in the current directory by introspecting the database.
 async fn update(context: &Context, args: &UpdateArgs) -> anyhow::Result<()> {
-    let schema = match args.sample_size {
+    let schemas = match args.sample_size {
         None => introspection::get_metadata_from_validation_schema(&context.mongo_config).await?,
         Some(sample_size) => {
-            introspection::sample_schema_from_db(sample_size, &context.mongo_config).await?
+            let existing_schemas = configuration::list_existing_schemas(&context.path).await?;
+            introspection::sample_schema_from_db(
+                sample_size,
+                &context.mongo_config,
+                &existing_schemas,
+            )
+            .await?
         }
     };
-    let configuration = Configuration::from_schema(schema)?;
-
-    configuration::write_directory(&context.path, &configuration).await?;
+    configuration::write_schema_directory(&context.path, schemas).await?;

     Ok(())
 }
94 changes: 51 additions & 43 deletions crates/configuration/src/directory.rs
@@ -3,15 +3,15 @@ use futures::stream::TryStreamExt as _;
 use itertools::Itertools as _;
 use serde::{Deserialize, Serialize};
 use std::{
-    collections::BTreeMap,
+    collections::{BTreeMap, HashSet},
     path::{Path, PathBuf},
 };
 use tokio::fs;
 use tokio_stream::wrappers::ReadDirStream;

-use crate::{with_name::WithName, Configuration};
+use crate::{with_name::WithName, Configuration, Schema};

-pub const SCHEMA_FILENAME: &str = "schema";
+pub const SCHEMA_DIRNAME: &str = "schema";
 pub const NATIVE_QUERIES_DIRNAME: &str = "native_queries";

 pub const CONFIGURATION_EXTENSIONS: [(&str, FileFormat); 3] =
@@ -33,7 +33,10 @@ pub async fn read_directory(
 ) -> anyhow::Result<Configuration> {
     let dir = configuration_dir.as_ref();

-    let schema = parse_json_or_yaml(dir, SCHEMA_FILENAME).await?;
+    let schemas = read_subdir_configs(&dir.join(SCHEMA_DIRNAME))
+        .await?
+        .unwrap_or_default();
+    let schema = schemas.into_values().fold(Schema::default(), Schema::merge);

     let native_queries = read_subdir_configs(&dir.join(NATIVE_QUERIES_DIRNAME))
         .await?
@@ -100,41 +103,6 @@ where
     }
 }

-/// Given a base name, like "connection", looks for files of the form "connection.json",
-/// "connection.yaml", etc; reads the file; and parses it according to its extension.
-async fn parse_json_or_yaml<T>(configuration_dir: &Path, basename: &str) -> anyhow::Result<T>
-where
-    for<'a> T: Deserialize<'a>,
-{
-    let (path, format) = find_file(configuration_dir, basename).await?;
-    parse_config_file(path, format).await
-}
-
-/// Given a base name, like "connection", looks for files of the form "connection.json",
-/// "connection.yaml", etc, and returns the found path with its file format.
-async fn find_file(
-    configuration_dir: &Path,
-    basename: &str,
-) -> anyhow::Result<(PathBuf, FileFormat)> {
-    for (extension, format) in CONFIGURATION_EXTENSIONS {
-        let path = configuration_dir.join(format!("{basename}.{extension}"));
-        if fs::try_exists(&path).await? {
-            return Ok((path, format));
-        }
-    }
-
-    Err(anyhow!(
-        "could not find file, {:?}",
-        configuration_dir.join(format!(
-            "{basename}.{{{}}}",
-            CONFIGURATION_EXTENSIONS
-                .into_iter()
-                .map(|(ext, _)| ext)
-                .join(",")
-        ))
-    ))
-}
-
 async fn parse_config_file<T>(path: impl AsRef<Path>, format: FileFormat) -> anyhow::Result<T>
 where
     for<'a> T: Deserialize<'a>,
@@ -149,14 +117,41 @@ where
     Ok(value)
 }

-/// Currently only writes `schema.json`
-pub async fn write_directory(
+async fn write_subdir_configs<T>(
+    subdir: &Path,
+    configs: impl IntoIterator<Item = (String, T)>,
+) -> anyhow::Result<()>
+where
+    T: Serialize,
+{
+    if !(fs::try_exists(subdir).await?) {
+        fs::create_dir(subdir).await?;
+    }
+
+    for (name, config) in configs {
+        let with_name: WithName<T> = (name.clone(), config).into();
+        write_file(subdir, &name, &with_name).await?;
+    }
+
+    Ok(())
+}
+
+pub async fn write_schema_directory(
     configuration_dir: impl AsRef<Path>,
-    configuration: &Configuration,
+    schemas: impl IntoIterator<Item = (String, Schema)>,
 ) -> anyhow::Result<()> {
-    write_file(configuration_dir, SCHEMA_FILENAME, &configuration.schema).await
+    let subdir = configuration_dir.as_ref().join(SCHEMA_DIRNAME);
+    write_subdir_configs(&subdir, schemas).await
 }

+// /// Currently only writes `schema.json`
+// pub async fn write_directory(
+//     configuration_dir: impl AsRef<Path>,
+//     configuration: &Configuration,
+// ) -> anyhow::Result<()> {
+//     write_file(configuration_dir, SCHEMA_FILENAME, &configuration.schema).await
+// }
Review comment from a collaborator: Just a little commented code here

 fn default_file_path(configuration_dir: impl AsRef<Path>, basename: &str) -> PathBuf {
     let dir = configuration_dir.as_ref();
     dir.join(format!("{basename}.{DEFAULT_EXTENSION}"))
@@ -176,3 +171,16 @@
         .await
         .with_context(|| format!("error writing {:?}", path))
 }
+
+pub async fn list_existing_schemas(
+    configuration_dir: impl AsRef<Path>,
+) -> anyhow::Result<HashSet<String>> {
+    let dir = configuration_dir.as_ref();
+
+    // TODO: we don't really need to read and parse all the schema files here, just get their names.
+    let schemas = read_subdir_configs::<Schema>(&dir.join(SCHEMA_DIRNAME))
+        .await?
+        .unwrap_or_default();
+
+    Ok(schemas.into_keys().collect())
+}
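The TODO above suggests the existing-schema check only needs names. A possible follow-up, sketched against this module's SCHEMA_DIRNAME constant and tokio `fs` import, would read directory entries and keep only the file stems; `list_existing_schema_names` is a hypothetical name, not part of this PR:

use std::collections::HashSet;
use std::path::Path;

// Hypothetical variant of list_existing_schemas that avoids deserializing each schema file.
pub async fn list_existing_schema_names(
    configuration_dir: impl AsRef<Path>,
) -> anyhow::Result<HashSet<String>> {
    let schema_dir = configuration_dir.as_ref().join(SCHEMA_DIRNAME);
    let mut names = HashSet::new();
    if !fs::try_exists(&schema_dir).await? {
        return Ok(names);
    }
    let mut entries = fs::read_dir(&schema_dir).await?;
    while let Some(entry) = entries.next_entry().await? {
        // "users.json" -> "users"; skip entries without a readable stem.
        if let Some(stem) = entry.path().file_stem().and_then(|s| s.to_str()) {
            names.insert(stem.to_owned());
        }
    }
    Ok(names)
}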
3 changes: 2 additions & 1 deletion crates/configuration/src/lib.rs
@@ -5,7 +5,8 @@ pub mod schema;
 mod with_name;

 pub use crate::configuration::Configuration;
+pub use crate::directory::list_existing_schemas;
 pub use crate::directory::read_directory;
-pub use crate::directory::write_directory;
+pub use crate::directory::write_schema_directory;
 pub use crate::schema::Schema;
 pub use crate::with_name::{WithName, WithNameRef};
19 changes: 19 additions & 0 deletions crates/configuration/src/schema/mod.rs
@@ -42,4 +42,23 @@ impl Schema {
             .iter()
             .map(|(name, field)| WithNameRef::named(name, field))
     }
+
+    /// Unify two schemas. Assumes that the schemas describe mutually exclusive sets of collections.
+    pub fn merge(schema_a: Schema, schema_b: Schema) -> Schema {
Review comment from a collaborator: Would it be helpful to emit errors if there are conflicting collection or object type names? I suppose if they're auto-generated it's not likely to come up
+        let collections = schema_a
+            .collections
+            .into_iter()
+            .chain(schema_b.collections)
+            .collect();
+        let object_types = schema_a
+            .object_types
+            .into_iter()
+            .chain(schema_b.object_types)
+            .collect();
+        Schema {
+            collections,
+            object_types,
+        }
+    }
+
 }
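Regarding the review question above, a fallible merge could reject duplicate names instead of silently letting one side win. A sketch only, against the BTreeMap-backed collections and object_types fields shown elsewhere in this diff, and assuming anyhow is available here as it is elsewhere in the crate; `merge_checked` is a hypothetical addition, not part of the PR:

impl Schema {
    /// Hypothetical variant of `merge` that errors on conflicting names.
    pub fn merge_checked(schema_a: Schema, schema_b: Schema) -> anyhow::Result<Schema> {
        let mut collections = schema_a.collections;
        for (name, collection) in schema_b.collections {
            if collections.insert(name.clone(), collection).is_some() {
                anyhow::bail!("conflicting definitions for collection {name}");
            }
        }
        let mut object_types = schema_a.object_types;
        for (name, object_type) in schema_b.object_types {
            if object_types.insert(name.clone(), object_type).is_some() {
                anyhow::bail!("conflicting definitions for object type {name}");
            }
        }
        Ok(Schema {
            collections,
            object_types,
        })
    }
}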