Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement read-only native queries with no arguments #5

Merged
merged 17 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub async fn run(command: Command, context: &Context) -> anyhow::Result<()> {
/// Update the configuration in the current directory by introspecting the database.
async fn update(context: &Context) -> anyhow::Result<()> {
let schema = introspection::get_metadata_from_validation_schema(&context.mongo_config).await?;
let configuration = Configuration::from_schema(schema);
let configuration = Configuration::from_schema(schema)?;

configuration::write_directory(&context.path, &configuration).await?;

Expand Down
1 change: 1 addition & 0 deletions crates/configuration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1"
futures = "^0.3"
itertools = "^0.12"
mongodb = "2.8"
Expand Down
96 changes: 90 additions & 6 deletions crates/configuration/src/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::{io, path::Path};
use std::path::Path;

use anyhow::ensure;
use itertools::Itertools;
use schemars::JsonSchema;
use serde::Deserialize;

use crate::{native_queries::NativeQuery, read_directory, Schema};
use crate::{native_queries::NativeQuery, read_directory, schema::ObjectType, Schema};

#[derive(Clone, Debug, Default, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
Expand All @@ -18,16 +20,98 @@ pub struct Configuration {
}

impl Configuration {
pub fn from_schema(schema: Schema) -> Self {
Self {
/// Builds a `Configuration` from a schema and a list of native queries, returning an error if
/// any object type name or any collection name is defined more than once.
///
/// # Errors
///
/// Fails with a message listing the offending names when duplicates are found.
pub fn validate(schema: Schema, native_queries: Vec<NativeQuery>) -> anyhow::Result<Self> {
let config = Configuration {
schema,
..Default::default()
native_queries,
};

{
// Object type names are checked across both the schema and the native queries, because
// `object_types()` chains the two sources together.
let duplicate_type_names: Vec<&str> = config
.object_types()
.map(|t| t.name.as_ref())
.duplicates()
.collect();
ensure!(
duplicate_type_names.is_empty(),
"configuration contains multiple definitions for these object type names: {}",
duplicate_type_names.join(", ")
);
}

{
// Collection names are read from the schema only.
let duplicate_collection_names: Vec<&str> = config
.schema
.collections
.iter()
.map(|c| c.name.as_ref())
.duplicates()
.collect();
ensure!(
duplicate_collection_names.is_empty(),
"configuration contains multiple definitions for these collection names: {}",
duplicate_collection_names.join(", ")
);
}

Ok(config)
}

/// Creates a validated configuration from a schema alone, with no native queries.
///
/// # Errors
///
/// Propagates any error reported by [`Configuration::validate`].
pub fn from_schema(schema: Schema) -> anyhow::Result<Self> {
    let no_native_queries = Vec::new();
    Self::validate(schema, no_native_queries)
}

/// Reads a `Configuration` from the given directory by delegating to `read_directory`.
pub async fn parse_configuration(
configuration_dir: impl AsRef<Path> + Send,
) -> io::Result<Self> {
) -> anyhow::Result<Self> {
read_directory(configuration_dir).await
}

/// Iterates over every object type known to this configuration: first the ones declared in the
/// schema, then the ones declared by each native query, in order.
pub fn object_types(&self) -> impl Iterator<Item = &ObjectType> {
    self.schema.object_types.iter().chain(
        self.native_queries
            .iter()
            .flat_map(|query| query.object_types.iter()),
    )
}
}

#[cfg(test)]
mod tests {
use mongodb::bson::doc;

use super::*;
use crate::{schema::Type, Schema};

#[test]
fn fails_with_duplicate_object_types() {
    // The schema and the native query each define an object type named "Album", which
    // validation must reject.
    let album = || ObjectType {
        name: "Album".to_owned(),
        fields: Default::default(),
        description: Default::default(),
    };

    let schema = Schema {
        collections: Default::default(),
        object_types: vec![album()],
    };
    let native_query = NativeQuery {
        name: "hello".to_owned(),
        object_types: vec![album()],
        result_type: Type::Object("Album".to_owned()),
        command: doc! { "command": 1 },
        arguments: Default::default(),
        selection_criteria: Default::default(),
        description: Default::default(),
        mode: Default::default(),
    };

    let message = Configuration::validate(schema, vec![native_query])
        .unwrap_err()
        .to_string();

    assert!(message.contains("multiple definitions"));
    assert!(message.contains("Album"));
}
}
59 changes: 29 additions & 30 deletions crates/configuration/src/directory.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use anyhow::{anyhow, Context as _};
use futures::stream::TryStreamExt as _;
use itertools::Itertools as _;
use serde::{Deserialize, Serialize};
use std::{
io,
path::{Path, PathBuf},
};
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio_stream::wrappers::ReadDirStream;

Expand All @@ -29,7 +27,7 @@ const YAML: FileFormat = FileFormat::Yaml;
/// Read configuration from a directory
pub async fn read_directory(
configuration_dir: impl AsRef<Path> + Send,
) -> io::Result<Configuration> {
) -> anyhow::Result<Configuration> {
let dir = configuration_dir.as_ref();

let schema = parse_json_or_yaml(dir, SCHEMA_FILENAME).await?;
Expand All @@ -38,16 +36,13 @@ pub async fn read_directory(
.await?
.unwrap_or_default();

Ok(Configuration {
schema,
native_queries,
})
Configuration::validate(schema, native_queries)
}

/// Parse all files in a directory with one of the allowed configuration extensions according to
/// the given type argument. For example if `T` is `NativeQuery` this function assumes that all
/// json and yaml files in the given directory should be parsed as native query configurations.
async fn read_subdir_configs<T>(subdir: &Path) -> io::Result<Option<Vec<T>>>
async fn read_subdir_configs<T>(subdir: &Path) -> anyhow::Result<Option<Vec<T>>>
where
for<'a> T: Deserialize<'a>,
{
Expand All @@ -57,6 +52,7 @@ where

let dir_stream = ReadDirStream::new(fs::read_dir(subdir).await?);
let configs = dir_stream
.map_err(|err| err.into())
.try_filter_map(|dir_entry| async move {
// Permits regular files and symlinks, does not filter out symlinks to directories.
let is_file = !(dir_entry.file_type().await?.is_dir());
Expand Down Expand Up @@ -86,7 +82,7 @@ where

/// Given a base name, like "connection", looks for files of the form "connection.json",
/// "connection.yaml", etc; reads the file; and parses it according to its extension.
async fn parse_json_or_yaml<T>(configuration_dir: &Path, basename: &str) -> io::Result<T>
async fn parse_json_or_yaml<T>(configuration_dir: &Path, basename: &str) -> anyhow::Result<T>
where
for<'a> T: Deserialize<'a>,
{
Expand All @@ -96,38 +92,39 @@ where

/// Given a base name, like "connection", looks for files of the form "connection.json",
/// "connection.yaml", etc, and returns the found path with its file format.
///
/// Extensions are tried in the order they appear in `CONFIGURATION_EXTENSIONS`; the first path
/// that exists is returned. If none exists an error naming the candidate paths is returned.
async fn find_file(configuration_dir: &Path, basename: &str) -> io::Result<(PathBuf, FileFormat)> {
async fn find_file(
configuration_dir: &Path,
basename: &str,
) -> anyhow::Result<(PathBuf, FileFormat)> {
for (extension, format) in CONFIGURATION_EXTENSIONS {
let path = configuration_dir.join(format!("{basename}.{extension}"));
if fs::try_exists(&path).await? {
return Ok((path, format));
}
}

// `{{` and `}}` render as literal braces, so the message shows the candidates in the form
// `<dir>/<basename>.{ext1,ext2,...}`.
Err(io::Error::new(
io::ErrorKind::NotFound,
format!(
"could not find file, {:?}",
configuration_dir.join(format!(
"{basename}.{{{}}}",
CONFIGURATION_EXTENSIONS
.into_iter()
.map(|(ext, _)| ext)
.join(",")
))
),
Err(anyhow!(
"could not find file, {:?}",
configuration_dir.join(format!(
"{basename}.{{{}}}",
CONFIGURATION_EXTENSIONS
.into_iter()
.map(|(ext, _)| ext)
.join(",")
))
))
}

/// Reads the file at `path` and deserializes its contents as `T`, using the JSON or YAML parser
/// selected by `format`.
async fn parse_config_file<T>(path: impl AsRef<Path>, format: FileFormat) -> io::Result<T>
async fn parse_config_file<T>(path: impl AsRef<Path>, format: FileFormat) -> anyhow::Result<T>
where
for<'a> T: Deserialize<'a>,
{
let bytes = fs::read(path.as_ref()).await?;
let value = match format {
FileFormat::Json => serde_json::from_slice(&bytes)?,
FileFormat::Json => serde_json::from_slice(&bytes)
.with_context(|| format!("error parsing {:?}", path.as_ref()))?,
FileFormat::Yaml => serde_yaml::from_slice(&bytes)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?,
.with_context(|| format!("error parsing {:?}", path.as_ref()))?,
};
Ok(value)
}
Expand All @@ -136,7 +133,7 @@ where
pub async fn write_directory(
configuration_dir: impl AsRef<Path>,
configuration: &Configuration,
) -> io::Result<()> {
) -> anyhow::Result<()> {
write_file(configuration_dir, SCHEMA_FILENAME, &configuration.schema).await
}

Expand All @@ -149,11 +146,13 @@ async fn write_file<T>(
configuration_dir: impl AsRef<Path>,
basename: &str,
value: &T,
) -> io::Result<()>
) -> anyhow::Result<()>
where
T: Serialize,
{
let path = default_file_path(configuration_dir, basename);
let bytes = serde_json::to_vec_pretty(value)?;
fs::write(path, bytes).await
fs::write(path.clone(), bytes)
.await
.with_context(|| format!("error writing {:?}", path))
}
19 changes: 16 additions & 3 deletions crates/configuration/src/native_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use mongodb::{bson, options::SelectionCriteria};
use schemars::JsonSchema;
use serde::Deserialize;

use crate::schema::{ObjectField, Type};
use crate::schema::{ObjectField, ObjectType, Type};

/// An arbitrary database command using MongoDB's runCommand API.
/// See https://www.mongodb.com/docs/manual/reference/method/db.runCommand/
Expand All @@ -12,10 +12,19 @@ pub struct NativeQuery {
/// Name that will be used to identify the query in your data graph
pub name: String,

/// Type of data returned by the query.
/// You may define object types here to reference in `result_type`. Any types defined here will
/// be merged with the definitions in `schema.json`. This allows you to maintain hand-written
/// types for native queries without having to edit a generated `schema.json` file.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub object_types: Vec<ObjectType>,

/// Type of data returned by the query. You may reference object types defined in the
/// `object_types` list in this definition, or you may reference object types from
/// `schema.json`.
pub result_type: Type,

/// Arguments for per-query customization
#[serde(default)]
pub arguments: Vec<ObjectField>,

/// Command to run expressed as a BSON document
Expand All @@ -31,7 +40,11 @@ pub struct NativeQuery {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,

/// Set to `readWrite` if this native query might modify data in the database.
/// Set to `readWrite` if this native query might modify data in the database. When refreshing
/// a dataconnector, native queries will appear in the corresponding `DataConnectorLink`
/// definition as `functions` if they are read-only, or as `procedures` if they are read-write.
/// Functions are intended to map to GraphQL Query fields, while procedures map to Mutation
/// fields.
#[serde(default)]
pub mode: Mode,
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use configuration::native_queries::NativeQuery;
use dc_api::JsonResponse;
use dc_api_types::{QueryResponse, ResponseFieldValue, RowSet};
use mongodb::Database;

use crate::interface_types::MongoAgentError;

pub async fn handle_native_query_request(
native_query: NativeQuery,
database: Database,
) -> Result<JsonResponse<QueryResponse>, MongoAgentError> {
let result = database
.run_command(native_query.command, native_query.selection_criteria)
.await?;
let result_json =
serde_json::to_value(result).map_err(|err| MongoAgentError::AdHoc(err.into()))?;

// A function returs a single row with a single column called `__value`
// https://hasura.github.io/ndc-spec/specification/queries/functions.html
let response_row = [(
"__value".to_owned(),
ResponseFieldValue::Column(result_json),
)]
.into_iter()
.collect();

Ok(JsonResponse::Value(QueryResponse::Single(RowSet {
aggregates: None,
rows: Some(vec![response_row]),
})))
}
21 changes: 16 additions & 5 deletions crates/mongodb-agent-common/src/query/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod column_ref;
mod constants;
mod execute_native_query_request;
mod execute_query_request;
mod foreach;
mod make_selector;
Expand All @@ -17,7 +18,10 @@ pub use self::{
make_sort::make_sort,
pipeline::{is_response_faceted, pipeline_for_non_foreach, pipeline_for_query_request},
};
use crate::interface_types::{MongoAgentError, MongoConfig};
use crate::{
interface_types::{MongoAgentError, MongoConfig},
query::execute_native_query_request::handle_native_query_request,
};

pub fn collection_name(query_request_target: &Target) -> String {
query_request_target.name().join(".")
Expand All @@ -29,10 +33,17 @@ pub async fn handle_query_request(
) -> Result<JsonResponse<QueryResponse>, MongoAgentError> {
tracing::debug!(?config, query_request = %serde_json::to_string(&query_request).unwrap(), "executing query");

let collection = config
.client
.database(&config.database)
.collection::<Document>(&collection_name(&query_request.target));
let database = config.client.database(&config.database);

let target = &query_request.target;
if let Some(native_query) = config.native_queries.iter().find(|query| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine for now, but linear search (i.e. find) could lead to performance problems for users who have a lot of native queries. These should eventually be stored in some sort of hashmap.

let target_name = target.name();
target_name.len() == 1 && target_name[0] == query.name
}) {
return handle_native_query_request(native_query.clone(), database).await;
}

let collection = database.collection::<Document>(&collection_name(&query_request.target));

execute_query_request(&collection, query_request).await
}
Expand Down
Loading
Loading