Skip to content

Commit

Permalink
configuration for native queries (#3)
Browse files Browse the repository at this point in the history
Adds configuration for native queries, and includes native queries in schema response. Does not implement native queries in the query handler.

This changes the way the configuration directory is read:

- `schema.{json,yaml,yml}` is parsed according to `configuration::Schema`
- all json and yaml files in the `native_queries` subdirectory are parsed according to `configuration::native_queries::NativeQuery`, and are combined into a Vec
- the `configuration::read_directory` function combines the above results into one `configuration::Configuration` value

Following Rikin's feedback, I renamed the `configuration::Metadata` type to `configuration::Schema`.

https://hasurahq.atlassian.net/browse/MDB-21
  • Loading branch information
hallettj authored Mar 15, 2024
1 parent c84dd63 commit f4e42fa
Show file tree
Hide file tree
Showing 18 changed files with 236 additions and 77 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions crates/cli/src/introspection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use configuration::{
metadata::{Collection, ObjectField, ObjectType, Type},
Metadata,
schema::{Collection, ObjectField, ObjectType, Type},
Schema,
};
use futures_util::{StreamExt, TryStreamExt};
use indexmap::IndexMap;
Expand All @@ -12,7 +12,7 @@ use mongodb_agent_common::interface_types::{MongoAgentError, MongoConfig};

pub async fn get_metadata_from_validation_schema(
config: &MongoConfig,
) -> Result<Metadata, MongoAgentError> {
) -> Result<Schema, MongoAgentError> {
let db = config.client.database(&config.database);
let collections_cursor = db.list_collections(None, None).await?;

Expand Down Expand Up @@ -51,7 +51,7 @@ pub async fn get_metadata_from_validation_schema(
.try_collect::<(Vec<Vec<ObjectType>>, Vec<Collection>)>()
.await?;

Ok(Metadata {
Ok(Schema {
collections,
object_types: object_types.concat(),
})
Expand Down Expand Up @@ -121,11 +121,11 @@ fn make_object_field(
}

fn maybe_nullable(
t: configuration::metadata::Type,
t: configuration::schema::Type,
is_nullable: bool,
) -> configuration::metadata::Type {
) -> configuration::schema::Type {
if is_nullable {
configuration::metadata::Type::Nullable(Box::new(t))
configuration::schema::Type::Nullable(Box::new(t))
} else {
t
}
Expand Down
4 changes: 2 additions & 2 deletions crates/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ pub async fn run(command: Command, context: &Context) -> anyhow::Result<()> {

/// Update the configuration in the current directory by introspecting the database.
async fn update(context: &Context) -> anyhow::Result<()> {
let metadata = introspection::get_metadata_from_validation_schema(&context.mongo_config).await?;
let configuration = Configuration { metadata };
let schema = introspection::get_metadata_from_validation_schema(&context.mongo_config).await?;
let configuration = Configuration::from_schema(schema);

configuration::write_directory(&context.path, &configuration).await?;

Expand Down
2 changes: 1 addition & 1 deletion crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub async fn main() -> anyhow::Result<()> {
Some(path) => path,
None => env::current_dir()?,
};
let mongo_config = try_init_state_from_uri(&args.connection_uri)
let mongo_config = try_init_state_from_uri(&args.connection_uri, &Default::default())
.await
.map_err(|e| anyhow!("Error initializing MongoDB state {}", e))?;
let context = Context { path, mongo_config };
Expand Down
3 changes: 3 additions & 0 deletions crates/configuration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ version = "0.1.0"
edition = "2021"

[dependencies]
futures = "^0.3"
itertools = "^0.12"
mongodb = "2.8"
mongodb-support = { path = "../mongodb-support" }
schemars = "^0.8.12"
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1" }
serde_yaml = "^0.9"
tokio = "1"
tokio-stream = { version = "^0.1", features = ["fs"] }
21 changes: 17 additions & 4 deletions crates/configuration/src/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,30 @@
use std::{io, path::Path};

use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde::Deserialize;

use crate::{read_directory, Metadata};
use crate::{native_queries::NativeQuery, read_directory, Schema};

#[derive(Clone, Debug, Default, Serialize, Deserialize, JsonSchema)]
#[derive(Clone, Debug, Default, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Configuration {
pub metadata: Metadata,
/// Descriptions of collections and types used in the database
pub schema: Schema,

/// Native queries allow arbitrary MongoDB aggregation pipelines where types of results are
/// specified via user configuration.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub native_queries: Vec<NativeQuery>,
}

impl Configuration {
pub fn from_schema(schema: Schema) -> Self {
Self {
schema,
..Default::default()
}
}

pub async fn parse_configuration(
configuration_dir: impl AsRef<Path> + Send,
) -> io::Result<Self> {
Expand Down
94 changes: 74 additions & 20 deletions crates/configuration/src/directory.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use futures::stream::TryStreamExt as _;
use itertools::Itertools as _;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::{
io,
path::{Path, PathBuf},
};
use tokio::fs;
use tokio_stream::wrappers::ReadDirStream;

use crate::Configuration;
use crate::{native_queries::NativeQuery, Configuration};

pub const SCHEMA_FILENAME: &str = "schema";
pub const NATIVE_QUERIES_DIRNAME: &str = "native_queries";

pub const CONFIGURATION_FILENAME: &str = "schema";
pub const CONFIGURATION_EXTENSIONS: [(&str, FileFormat); 3] =
[("json", JSON), ("yaml", YAML), ("yml", YAML)];
pub const DEFAULT_EXTENSION: &str = "json";
Expand All @@ -26,29 +30,75 @@ const YAML: FileFormat = FileFormat::Yaml;
pub async fn read_directory(
configuration_dir: impl AsRef<Path> + Send,
) -> io::Result<Configuration> {
parse_file(configuration_dir, CONFIGURATION_FILENAME).await
let dir = configuration_dir.as_ref();

let schema = parse_json_or_yaml(dir, SCHEMA_FILENAME).await?;

let native_queries: Vec<NativeQuery> = read_subdir_configs(&dir.join(NATIVE_QUERIES_DIRNAME))
.await?
.unwrap_or_default();

Ok(Configuration {
schema,
native_queries,
})
}

/// Reads every configuration file found directly inside `subdir` and parses each one as a `T`.
///
/// A directory entry is parsed when it is not a directory and its extension is one of
/// `CONFIGURATION_EXTENSIONS`; all other entries are ignored. For example, with
/// `T = NativeQuery` every json and yaml file in the directory is treated as a native query
/// configuration.
///
/// Returns `Ok(None)` when `subdir` does not exist, and `Ok(Some(values))` otherwise. Any I/O
/// error, or any error from parsing a single file, is returned as the error of the whole call.
async fn read_subdir_configs<T>(subdir: &Path) -> io::Result<Option<Vec<T>>>
where
    for<'a> T: Deserialize<'a>,
{
    let subdir_exists = fs::try_exists(subdir).await?;
    if !subdir_exists {
        return Ok(None);
    }

    let entries = ReadDirStream::new(fs::read_dir(subdir).await?);
    let parsed = entries
        .try_filter_map(|entry| async move {
            // Only real directories are skipped here: regular files and symlinks pass through,
            // including symlinks that point at directories.
            if entry.file_type().await?.is_dir() {
                return Ok(None);
            }

            let path = entry.path();

            // Look up the file format registered for this extension, if there is one.
            let format = path
                .extension()
                .and_then(|ext| ext.to_str())
                .and_then(|ext| {
                    CONFIGURATION_EXTENSIONS
                        .iter()
                        .find_map(|(known_ext, format)| (ext == *known_ext).then_some(*format))
                });

            Ok(format.map(|format| (path, format)))
        })
        .and_then(|(path, format)| parse_config_file::<T>(path, format))
        .try_collect::<Vec<T>>()
        .await?;

    Ok(Some(parsed))
}

/// Given a base name, like "connection", looks for files of the form "connection.json",
/// "connection.yaml", etc; reads the file; and parses it according to its extension.
async fn parse_file<T>(configuration_dir: impl AsRef<Path>, basename: &str) -> io::Result<T>
async fn parse_json_or_yaml<T>(configuration_dir: &Path, basename: &str) -> io::Result<T>
where
for<'a> T: Deserialize<'a>,
{
let (path, format) = find_file(configuration_dir, basename).await?;
read_file(path, format).await
parse_config_file(path, format).await
}

/// Given a base name, like "connection", looks for files of the form "connection.json",
/// "connection.yaml", etc, and returns the found path with its file format.
async fn find_file(
configuration_dir: impl AsRef<Path>,
basename: &str,
) -> io::Result<(PathBuf, FileFormat)> {
let dir = configuration_dir.as_ref();

async fn find_file(configuration_dir: &Path, basename: &str) -> io::Result<(PathBuf, FileFormat)> {
for (extension, format) in CONFIGURATION_EXTENSIONS {
let path = dir.join(format!("{basename}.{extension}"));
let path = configuration_dir.join(format!("{basename}.{extension}"));
if fs::try_exists(&path).await? {
return Ok((path, format));
}
Expand All @@ -58,7 +108,7 @@ async fn find_file(
io::ErrorKind::NotFound,
format!(
"could not find file, {:?}",
dir.join(format!(
configuration_dir.join(format!(
"{basename}.{{{}}}",
CONFIGURATION_EXTENSIONS
.into_iter()
Expand All @@ -69,7 +119,7 @@ async fn find_file(
))
}

async fn read_file<T>(path: impl AsRef<Path>, format: FileFormat) -> io::Result<T>
async fn parse_config_file<T>(path: impl AsRef<Path>, format: FileFormat) -> io::Result<T>
where
for<'a> T: Deserialize<'a>,
{
Expand All @@ -82,24 +132,28 @@ where
Ok(value)
}

/// Currently only writes `schema.json`. Native queries held in `configuration.native_queries`
/// are not written to disk.
pub async fn write_directory(
configuration_dir: impl AsRef<Path>,
configuration: &Configuration,
) -> io::Result<()> {
write_file(configuration_dir, CONFIGURATION_FILENAME, configuration).await
write_file(configuration_dir, SCHEMA_FILENAME, &configuration.schema).await
}

/// Returns the path inside `configuration_dir` of the file named `basename` with the default
/// extension (`DEFAULT_EXTENSION`) appended.
fn default_file_path(configuration_dir: impl AsRef<Path>, basename: &str) -> PathBuf {
    let file_name = format!("{basename}.{DEFAULT_EXTENSION}");
    configuration_dir.as_ref().join(file_name)
}

async fn write_file(
async fn write_file<T>(
configuration_dir: impl AsRef<Path>,
basename: &str,
configuration: &Configuration,
) -> io::Result<()> {
value: &T,
) -> io::Result<()>
where
T: Serialize,
{
let path = default_file_path(configuration_dir, basename);
let bytes = serde_json::to_vec_pretty(configuration)?;
let bytes = serde_json::to_vec_pretty(value)?;
fs::write(path, bytes).await
}
5 changes: 3 additions & 2 deletions crates/configuration/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
mod configuration;
pub mod metadata;
pub mod schema;
pub mod native_queries;
mod directory;

pub use crate::configuration::Configuration;
pub use crate::metadata::Metadata;
pub use crate::schema::Schema;
pub use crate::directory::read_directory;
pub use crate::directory::write_directory;
19 changes: 15 additions & 4 deletions crates/configuration/src/native_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use mongodb::{bson, options::SelectionCriteria};
use schemars::JsonSchema;
use serde::Deserialize;

use crate::metadata::ObjectField;
use crate::schema::{ObjectField, Type};

/// An arbitrary database command using MongoDB's runCommand API.
/// See https://www.mongodb.com/docs/manual/reference/method/db.runCommand/
Expand All @@ -12,9 +12,8 @@ pub struct NativeQuery {
/// Name that will be used to identify the query in your data graph
pub name: String,

/// The name of an object type that specifies the type of data returned from the query. This
/// must correspond to a configuration definition in `metadata.objectTypes`.
pub result_type: String,
/// Type of data returned by the query.
pub result_type: Type,

/// Arguments for per-query customization
pub arguments: Vec<ObjectField>,
Expand All @@ -31,6 +30,18 @@ pub struct NativeQuery {

#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,

/// Set to `readWrite` if this native query might modify data in the database.
#[serde(default)]
pub mode: Mode,
}

/// Declares whether a native query might modify data in the database.
///
/// Variant names are converted to camelCase for (de)serialization, so configuration files
/// spell the values `readOnly` and `readWrite`.
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub enum Mode {
/// `readOnly`: the default variant, produced by `Mode::default()`.
#[default]
ReadOnly,
/// `readWrite`: for native queries that might modify data in the database.
ReadWrite,
}

type Object = serde_json::Map<String, serde_json::Value>;
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub use self::database::{Collection, ObjectField, ObjectType, Type};

#[derive(Clone, Debug, Default, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Metadata {
pub struct Schema {
#[serde(default)]
pub collections: Vec<Collection>,
#[serde(default)]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use configuration::native_queries::NativeQuery;
use mongodb::Client;

#[derive(Clone, Debug)]
Expand All @@ -6,4 +7,6 @@ pub struct MongoConfig {

/// Name of the database to connect to
pub database: String,

pub native_queries: Vec<NativeQuery>,
}
14 changes: 11 additions & 3 deletions crates/mongodb-agent-common/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
use std::{env, error::Error};

use anyhow::anyhow;
use configuration::Configuration;

use crate::{interface_types::MongoConfig, mongodb_connection::get_mongodb_client};

pub const DATABASE_URI_ENV_VAR: &str = "MONGODB_DATABASE_URI";

/// Reads database connection URI from environment variable
pub async fn try_init_state() -> Result<MongoConfig, Box<dyn Error + Send + Sync>> {
pub async fn try_init_state(
configuration: &Configuration,
) -> Result<MongoConfig, Box<dyn Error + Send + Sync>> {
// Splitting this out of the `Connector` impl makes error translation easier
let database_uri = env::var(DATABASE_URI_ENV_VAR)?;
try_init_state_from_uri(&database_uri).await
try_init_state_from_uri(&database_uri, configuration).await
}

pub async fn try_init_state_from_uri(database_uri: &str) -> Result<MongoConfig, Box<dyn Error + Send + Sync>> {
pub async fn try_init_state_from_uri(
database_uri: &str,
configuration: &Configuration,
) -> Result<MongoConfig, Box<dyn Error + Send + Sync>> {
let client = get_mongodb_client(database_uri).await?;
let database_name = match client.default_database() {
Some(database) => Ok(database.name().to_owned()),
Expand All @@ -23,5 +30,6 @@ pub async fn try_init_state_from_uri(database_uri: &str) -> Result<MongoConfig,
Ok(MongoConfig {
client,
database: database_name,
native_queries: configuration.native_queries.clone(),
})
}
Loading

0 comments on commit f4e42fa

Please sign in to comment.