Skip to content

Commit

Permalink
Update query for scaling statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
iamvigneshwars committed Apr 12, 2024
1 parent e278c68 commit a2ebf45
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 44 deletions.
18 changes: 14 additions & 4 deletions processed_data/src/graphql/entities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl From<processing_job_parameter::Model> for ProcessingJobParameter {

/// Represents an auto processed job
#[derive(Clone, Debug, PartialEq, SimpleObject)]
#[graphql(name = "AutoProc", unresolvable, complex)]
#[graphql(name = "AutoProc", unresolvable)]
pub struct AutoProc {
/// An opaque unique identifier for the auto processing
pub auto_proc_id: u32,
Expand Down Expand Up @@ -114,7 +114,7 @@ impl From<auto_proc::Model> for AutoProc {

/// Represents an auto processed program
#[derive(Clone, Debug, PartialEq, SimpleObject)]
#[graphql(name = "AutoProcProgram", unresolvable, complex)]
#[graphql(name = "AutoProcProgram", unresolvable)]
pub struct AutoProcProgram {
/// An opaque unique identifier for the auto processing program
pub auto_proc_program_id: u32,
Expand Down Expand Up @@ -170,7 +170,7 @@ impl From<auto_proc_integration::Model> for AutoProcIntegration {

/// Represents and auto processing scaling
#[derive(Clone, Debug, PartialEq, SimpleObject)]
#[graphql(name = "AutoProcScaling", unresolvable, complex)]
#[graphql(name = "AutoProcScaling", unresolvable)]
pub struct AutoProcScaling {
/// An opaque unique identifier for the auto processing scaling
pub auto_proc_scaling_id: u32,
Expand Down Expand Up @@ -270,13 +270,23 @@ pub struct DataCollection {
#[derive(Debug, Clone, SimpleObject)]
#[graphql(name = "ProcessJob", unresolvable = "processingJobId")]
pub struct ProcessJob {
#[graphql(flatten)]
pub processing_job: ProcessingJob,
pub parameters: Option<ProcessingJobParameter>,
}

#[derive(Debug, Clone, SimpleObject)]
#[graphql(name = "AutoProcessing", unresolvable = "auto_proc_integration_id")]
#[graphql(name = "AutoProcessing", unresolvable = "autoProcIntegrationId", complex)]
pub struct AutoProcessing {
#[graphql(flatten)]
pub auto_proc_integration: AutoProcIntegration,
pub auto_proc_program: Option<AutoProcProgram>,
}

#[derive(Debug, Clone, SimpleObject)]
#[graphql(name = "Process", unresolvable = "autoProcId", complex)]
pub struct Process {
#[graphql(flatten)]
pub auto_proc: AutoProc,
pub auto_proc_scaling: Option<AutoProcScaling>,
}
80 changes: 40 additions & 40 deletions processed_data/src/graphql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,14 @@ use async_graphql::{
use aws_sdk_s3::presigning::PresigningConfig;
use entities::{
AutoProc, AutoProcIntegration, AutoProcScaling, AutoProcScalingStatics, AutoProcessing,
DataCollection, DataProcessing, ProcessJob, ProcessingJob, ProcessingJobParameter,
DataCollection, DataProcessing, Process, ProcessJob, ProcessingJob, ProcessingJobParameter,
};
use models::{
auto_proc, auto_proc_integration, auto_proc_program, auto_proc_scaling,
auto_proc_scaling_statistics, data_collection_file_attachment, processing_job,
processing_job_parameter,
};
use sea_orm::{
ColumnTrait, DatabaseConnection, EntityTrait, JoinType, QueryFilter, QuerySelect, RelationTrait,
};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use std::collections::HashMap;
use std::time::Duration;
use tracing::{instrument, Span};
Expand Down Expand Up @@ -47,7 +45,7 @@ impl AddDataLoadersExt for async_graphql::Request {
tokio::spawn,
))
.data(DataLoader::new(
AutoProcDataLoader::new(database.clone()),
AutoProcessingDataLoader::new(database.clone()),
tokio::spawn,
))
.data(DataLoader::new(
Expand Down Expand Up @@ -92,7 +90,7 @@ pub struct AutoProcIntegrationDataLoader {
parent_span: Span,
}

pub struct AutoProcDataLoader {
pub struct AutoProcessingDataLoader {
database: DatabaseConnection,
parent_span: Span,
}
Expand Down Expand Up @@ -140,7 +138,7 @@ impl AutoProcIntegrationDataLoader {
}
}

impl AutoProcDataLoader {
impl AutoProcessingDataLoader {
fn new(database: DatabaseConnection) -> Self {
Self {
database,
Expand Down Expand Up @@ -254,14 +252,6 @@ impl Loader<u32> for AutoProcIntegrationDataLoader {
let mut results = HashMap::new();
let keys_vec: Vec<u32> = keys.to_vec();
let records = auto_proc_integration::Entity::find()
// .join_rev(
// JoinType::InnerJoin,
// auto_proc_program::Entity::belongs_to(auto_proc_integration::Entity)
// .from(auto_proc_program::Column::AutoProcProgramId)
// .to(auto_proc_integration::Column::AutoProcProgramId)
// .into()
// )
// .join(JoinType::InnerJoin, auto_proc_program::Relation::AutoProcIntegration.def())
.find_also_related(auto_proc_program::Entity)
.filter(auto_proc_integration::Column::DataCollectionId.is_in(keys_vec))
.all(&self.database)
Expand All @@ -285,25 +275,31 @@ impl Loader<u32> for AutoProcIntegrationDataLoader {
}
}

impl Loader<u32> for AutoProcDataLoader {
type Value = AutoProc;
impl Loader<u32> for AutoProcessingDataLoader {
type Value = Process;
type Error = async_graphql::Error;

#[instrument(name = "load_auto_proc", skip(self))]
#[instrument(name = "load_process", skip(self))]
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
let span = tracing::info_span!(parent: &self.parent_span, "load_processed_data");
let span = tracing::info_span!(parent: &self.parent_span, "load_process");
let _span = span.enter();
let mut results = HashMap::new();
let keys_vec: Vec<u32> = keys.to_vec();
let records = auto_proc::Entity::find()
.filter(auto_proc::Column::AutoProcProgramId.is_in(keys_vec))
.find_also_related(auto_proc_scaling::Entity)
.all(&self.database)
.await?;
.await?
.into_iter()
.map(|(auto_proc, scaling)| Process {
auto_proc: AutoProc::from(auto_proc),
auto_proc_scaling: scaling.map(AutoProcScaling::from),
})
.collect::<Vec<_>>();

for record in records {
let program_id = record.auto_proc_program_id.unwrap();
let data = AutoProc::from(record);
results.insert(program_id, data);
let program_id = record.auto_proc.auto_proc_program_id.unwrap();
results.insert(program_id, record);
}

Ok(results)
Expand Down Expand Up @@ -463,48 +459,52 @@ impl DataProcessing {
}

#[ComplexObject]
impl AutoProcProgram {
impl AutoProcessing {
/// Fetched the automatic process
async fn auto_proc(&self, ctx: &Context<'_>) -> async_graphql::Result<Option<AutoProc>> {
let loader = ctx.data_unchecked::<DataLoader<AutoProcDataLoader>>();
loader.load_one(self.auto_proc_program_id).await
async fn auto_proc(&self, ctx: &Context<'_>) -> async_graphql::Result<Option<Process>> {
let loader = ctx.data_unchecked::<DataLoader<AutoProcessingDataLoader>>();
let id = self.auto_proc_integration.auto_proc_program_id.clone();
loader.load_one(id.unwrap()).await
}
}

#[ComplexObject]
impl AutoProc {
/// Fetches the scaling for automatic process
async fn scaling(&self, ctx: &Context<'_>) -> async_graphql::Result<Option<AutoProcScaling>> {
let loader = ctx.data_unchecked::<DataLoader<AutoProcScalingDataLoader>>();
loader.load_one(self.auto_proc_id).await
}
}

#[ComplexObject]
impl AutoProcScaling {
impl Process {
/// Fetches the scaling statistics
async fn overall(
&self,
ctx: &Context<'_>,
) -> async_graphql::Result<Option<AutoProcScalingStatics>> {
let loader = ctx.data_unchecked::<DataLoader<AutoProcScalingOverall>>();
loader.load_one(self.auto_proc_scaling_id).await
let id = <Option<entities::AutoProcScaling> as Clone>::clone(&self.auto_proc_scaling)
.unwrap()
.auto_proc_id
.clone();
loader.load_one(id.unwrap()).await
}

async fn inner_shell(
&self,
ctx: &Context<'_>,
) -> async_graphql::Result<Option<AutoProcScalingStatics>> {
let loader = ctx.data_unchecked::<DataLoader<AutoProcScalingInnerShell>>();
loader.load_one(self.auto_proc_scaling_id).await
let id = <Option<entities::AutoProcScaling> as Clone>::clone(&self.auto_proc_scaling)
.unwrap()
.auto_proc_id
.clone();
loader.load_one(id.unwrap()).await
}

async fn outer_shell(
&self,
ctx: &Context<'_>,
) -> async_graphql::Result<Option<AutoProcScalingStatics>> {
let loader = ctx.data_unchecked::<DataLoader<AutoProcScalingOuterShell>>();
loader.load_one(self.auto_proc_scaling_id).await
let id = <Option<entities::AutoProcScaling> as Clone>::clone(&self.auto_proc_scaling)
.unwrap()
.auto_proc_id
.clone();
loader.load_one(id.unwrap()).await
}
}

Expand Down

0 comments on commit a2ebf45

Please sign in to comment.