Skip to content

Commit

Permalink
Autoprocessing refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
iamvigneshwars committed Apr 17, 2024
1 parent 681dac1 commit fd0280f
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 209 deletions.
2 changes: 1 addition & 1 deletion charts/processed_data/charts/processed_data/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ type: application

version: 0.1.0

appVersion: 0.1.0-rc13
appVersion: 0.1.0-rc12
131 changes: 4 additions & 127 deletions processed_data/src/graphql/entities.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use async_graphql::{Enum, SimpleObject};
use models::{
auto_proc, auto_proc_integration, auto_proc_program, auto_proc_scaling,
auto_proc_scaling_statistics, data_collection_file_attachment, processing_job,
processing_job_parameter, sea_orm_active_enums::ScalingStatisticsType,
auto_proc_scaling, auto_proc_scaling_statistics, data_collection_file_attachment,
processing_job, processing_job_parameter, sea_orm_active_enums::ScalingStatisticsType,
};

/// Represents processed image file stored in s3 bucket
Expand Down Expand Up @@ -72,102 +71,6 @@ impl From<processing_job_parameter::Model> for ProcessingJobParameter {
}
}

/// Represents an auto processed job
#[derive(Clone, Debug, PartialEq, SimpleObject)]
#[graphql(name = "AutoProc", unresolvable)]
pub struct AutoProc {
/// An opaque unique identifier for the auto processing
pub auto_proc_id: u32,
/// An opaque unique identifier for the auto processing program
pub auto_proc_program_id: Option<u32>,
/// Space group of the processing job
pub space_group: Option<String>,
/// Refined cell a in the auto processing job
pub refined_cell_a: Option<f32>,
/// Refined cell b in the auto processing job
pub refined_cell_b: Option<f32>,
/// Refined cell c in the auto processing job
pub refined_cell_c: Option<f32>,
/// Refined cell alpha in the auto processing job
pub refined_cell_alpha: Option<f32>,
/// Refined cell beta in the auto processing job
pub refined_cell_beta: Option<f32>,
/// Refined cell gamma in the auto processing job
pub refined_cell_gamma: Option<f32>,
}

impl From<auto_proc::Model> for AutoProc {
fn from(value: auto_proc::Model) -> Self {
Self {
auto_proc_id: value.auto_proc_id,
auto_proc_program_id: value.auto_proc_program_id,
space_group: value.space_group,
refined_cell_a: value.refined_cell_a,
refined_cell_b: value.refined_cell_b,
refined_cell_c: value.refined_cell_c,
refined_cell_alpha: value.refined_cell_alpha,
refined_cell_beta: value.refined_cell_beta,
refined_cell_gamma: value.refined_cell_gamma,
}
}
}

/// Represents an auto processed program
#[derive(Clone, Debug, PartialEq, SimpleObject)]
#[graphql(name = "AutoProcProgram", unresolvable)]
pub struct AutoProcProgram {
/// An opaque unique identifier for the auto processing program
pub auto_proc_program_id: u32,
/// Name of the processing programs
pub processing_programs: Option<String>,
/// Processing program status
pub processing_status: Option<i8>,
/// Processing program message
pub processing_message: Option<String>,
/// An opaque unique identifier for the processing processing job
pub processing_job_id: Option<u32>,
}

impl From<auto_proc_program::Model> for AutoProcProgram {
fn from(value: auto_proc_program::Model) -> Self {
Self {
auto_proc_program_id: value.auto_proc_program_id,
processing_programs: value.processing_programs,
processing_status: value.processing_status,
processing_message: value.processing_message,
processing_job_id: value.processing_job_id,
}
}
}

/// Represents an auto processing integration
#[derive(Clone, Debug, PartialEq, SimpleObject)]
#[graphql(name = "AutoProcIntegration", unresolvable)]
pub struct AutoProcIntegration {
/// An opaque unique identifier for the auto processing integration
pub auto_proc_integration_id: u32,
/// An opaque unique identifier for the data collection
pub data_collection_id: u32,
/// An opaque unique identifier for the auto processing program
pub auto_proc_program_id: Option<u32>,
/// Refined X position of the beam
pub refined_x_beam: Option<f32>,
/// Refined Y position of the beam
pub refined_y_beam: Option<f32>,
}

impl From<auto_proc_integration::Model> for AutoProcIntegration {
fn from(value: auto_proc_integration::Model) -> Self {
Self {
auto_proc_integration_id: value.auto_proc_integration_id,
data_collection_id: value.data_collection_id,
auto_proc_program_id: value.auto_proc_program_id,
refined_x_beam: value.refined_x_beam,
refined_y_beam: value.refined_y_beam,
}
}
}

/// Represents and auto processing scaling
#[derive(Clone, Debug, PartialEq, SimpleObject)]
#[graphql(name = "AutoProcScaling", unresolvable)]
Expand Down Expand Up @@ -287,36 +190,10 @@ pub struct ProcessJob {
pub parameters: Option<ProcessingJobParameter>,
}

/// Combines auto proc integration and its programs
#[derive(Debug, Clone, SimpleObject)]
#[graphql(
name = "AutoProcessing",
unresolvable = "autoProcIntegrationId",
complex
)]
pub struct AutoProcessing {
#[graphql(flatten)]
/// Represents auto proc integration table
pub auto_proc_integration: AutoProcIntegration,
/// Represents auto proc program table
pub auto_proc_program: Option<AutoProcProgram>,
}

/// Combines autoproc and its scaling and statistics
#[derive(Debug, Clone, SimpleObject)]
#[graphql(name = "AutoProcess", unresolvable = "autoProcId", complex)]
pub struct AutoProcess {
#[graphql(flatten)]
/// Represents autoproc table
pub auto_proc: AutoProc,
/// Represents auto proc scaling table
pub auto_proc_scaling: Option<AutoProcScaling>,
}

/// Combines autoproc integration, autoproc program, autoproc and autoproc scaling
#[derive(Debug, Clone, SimpleObject)]
#[graphql(name = "AP", unresolvable = "autoProcIntegrationId")]
pub struct AP {
#[graphql(name = "AutoProcessing", unresolvable = "autoProcIntegrationId")]
pub struct AutoProcessing {
/// An opaque unique identifier for the auto processing integration
pub auto_proc_integration_id: u32,
/// An opaque unique identifier for the data collection
Expand Down
92 changes: 11 additions & 81 deletions processed_data/src/graphql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use async_graphql::{
};
use aws_sdk_s3::presigning::PresigningConfig;
use entities::{
AutoProc, AutoProcScaling, AutoProcScalingStatics, AutoProcess, AutoProcessing, DataCollection,
DataProcessing, ProcessJob, ProcessingJob, ProcessingJobParameter, StatisticsType, AP,
AutoProcScalingStatics, AutoProcessing, DataCollection, DataProcessing, ProcessJob,
ProcessingJob, ProcessingJobParameter, StatisticsType,
};
use models::{
auto_proc, auto_proc_integration, auto_proc_program, auto_proc_scaling,
Expand Down Expand Up @@ -43,10 +43,6 @@ impl AddDataLoadersExt for async_graphql::Request {
ProcessJobDataLoader::new(database.clone()),
tokio::spawn,
))
.data(DataLoader::new(
AutoProcIntegrationDataLoader::new(database.clone()),
tokio::spawn,
))
.data(DataLoader::new(
AutoProcessingDataLoader::new(database.clone()),
tokio::spawn,
Expand Down Expand Up @@ -79,12 +75,6 @@ pub struct ProcessJobDataLoader {
database: DatabaseConnection,
parent_span: Span,
}
/// DataLoader for AutoProcIntegration
#[allow(clippy::missing_docs_in_private_items)]
pub struct AutoProcIntegrationDataLoader {
database: DatabaseConnection,
parent_span: Span,
}
/// DataLoader for AutoProcessing
#[allow(clippy::missing_docs_in_private_items)]
pub struct AutoProcessingDataLoader {
Expand Down Expand Up @@ -118,16 +108,6 @@ impl ProcessedDataLoader {
}
}

#[allow(clippy::missing_docs_in_private_items)]
impl AutoProcIntegrationDataLoader {
fn new(database: DatabaseConnection) -> Self {
Self {
database,
parent_span: Span::current(),
}
}
}

#[allow(clippy::missing_docs_in_private_items)]
impl AutoProcessingDataLoader {
fn new(database: DatabaseConnection) -> Self {
Expand Down Expand Up @@ -206,8 +186,8 @@ impl Loader<u32> for ProcessJobDataLoader {
}
}

impl Loader<u32> for AutoProcIntegrationDataLoader {
type Value = Vec<AP>;
impl Loader<u32> for AutoProcessingDataLoader {
type Value = Vec<AutoProcessing>;
type Error = async_graphql::Error;

#[instrument(name = "load_auto_proc_integration", skip(self))]
Expand Down Expand Up @@ -265,7 +245,7 @@ impl Loader<u32> for AutoProcIntegrationDataLoader {
))
.await?
.into_iter()
.map(|record| AP {
.map(|record| AutoProcessing {
auto_proc_integration_id: record.try_get("", "autoProcIntegrationId").unwrap(),
data_collection_id: record.try_get("", "dataCollectionId").unwrap(),
auto_proc_program_id: record.try_get("", "autoProcProgramId").unwrap_or(None),
Expand Down Expand Up @@ -299,37 +279,6 @@ impl Loader<u32> for AutoProcIntegrationDataLoader {
}
}

impl Loader<u32> for AutoProcessingDataLoader {
type Value = AutoProcess;
type Error = async_graphql::Error;

#[instrument(name = "load_process", skip(self))]
async fn load(&self, keys: &[u32]) -> Result<HashMap<u32, Self::Value>, Self::Error> {
let span = tracing::info_span!(parent: &self.parent_span, "load_process");
let _span = span.enter();
let mut results = HashMap::new();
let keys_vec: Vec<u32> = keys.to_vec();
let records = auto_proc::Entity::find()
.filter(auto_proc::Column::AutoProcProgramId.is_in(keys_vec))
.find_also_related(auto_proc_scaling::Entity)
.all(&self.database)
.await?
.into_iter()
.map(|(auto_proc, scaling)| AutoProcess {
auto_proc: AutoProc::from(auto_proc),
auto_proc_scaling: scaling.map(AutoProcScaling::from),
})
.collect::<Vec<_>>();

for record in records {
let program_id = record.auto_proc.auto_proc_program_id.unwrap();
results.insert(program_id, record);
}

Ok(results)
}
}

impl Loader<(u32, StatisticsType)> for AutoProcScalingDataLoader {
type Value = AutoProcScalingStatics;
type Error = async_graphql::Error;
Expand Down Expand Up @@ -406,11 +355,11 @@ impl DataCollection {
}

/// Fetches all the automatic process
async fn auto_proc_integration(
async fn auto_processing(
&self,
ctx: &Context<'_>,
) -> async_graphql::Result<Option<Vec<AP>>, async_graphql::Error> {
let loader = ctx.data_unchecked::<DataLoader<AutoProcIntegrationDataLoader>>();
) -> async_graphql::Result<Option<Vec<AutoProcessing>>, async_graphql::Error> {
let loader = ctx.data_unchecked::<DataLoader<AutoProcessingDataLoader>>();
loader.load_one(self.id).await
}
}
Expand All @@ -436,27 +385,14 @@ impl DataProcessing {

#[ComplexObject]
impl AutoProcessing {
/// Fetched the automatic process
async fn auto_proc(&self, ctx: &Context<'_>) -> async_graphql::Result<Option<AutoProcess>> {
let loader = ctx.data_unchecked::<DataLoader<AutoProcessingDataLoader>>();
let id = self.auto_proc_integration.auto_proc_program_id;
loader.load_one(id.unwrap()).await
}
}

#[ComplexObject]
impl AutoProcess {
/// Fetches the overall scaling statistics type
async fn overall(
&self,
ctx: &Context<'_>,
) -> async_graphql::Result<Option<AutoProcScalingStatics>> {
let loader = ctx.data_unchecked::<DataLoader<AutoProcScalingDataLoader>>();
let id = <Option<entities::AutoProcScaling> as Clone>::clone(&self.auto_proc_scaling)
.unwrap()
.auto_proc_id;
loader
.load_one((id.unwrap(), StatisticsType::Overall))
.load_one((self.auto_proc_id.unwrap(), StatisticsType::Overall))
.await
}

Expand All @@ -466,11 +402,8 @@ impl AutoProcess {
ctx: &Context<'_>,
) -> async_graphql::Result<Option<AutoProcScalingStatics>> {
let loader = ctx.data_unchecked::<DataLoader<AutoProcScalingDataLoader>>();
let id = <Option<entities::AutoProcScaling> as Clone>::clone(&self.auto_proc_scaling)
.unwrap()
.auto_proc_id;
loader
.load_one((id.unwrap(), StatisticsType::InnerShell))
.load_one((self.auto_proc_id.unwrap(), StatisticsType::InnerShell))
.await
}

Expand All @@ -480,11 +413,8 @@ impl AutoProcess {
ctx: &Context<'_>,
) -> async_graphql::Result<Option<AutoProcScalingStatics>> {
let loader = ctx.data_unchecked::<DataLoader<AutoProcScalingDataLoader>>();
let id = <Option<entities::AutoProcScaling> as Clone>::clone(&self.auto_proc_scaling)
.unwrap()
.auto_proc_id;
loader
.load_one((id.unwrap(), StatisticsType::OuterShell))
.load_one((self.auto_proc_id.unwrap(), StatisticsType::OuterShell))
.await
}
}
Expand Down

0 comments on commit fd0280f

Please sign in to comment.