Skip to content

Commit

Permalink
ProcessingJob refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
iamvigneshwars committed Apr 17, 2024
1 parent fd0280f commit 541b7e5
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 130 deletions.
2 changes: 1 addition & 1 deletion charts/processed_data/charts/processed_data/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ type: application

version: 0.1.0

appVersion: 0.1.0-rc12
appVersion: 0.1.0-rc13
172 changes: 90 additions & 82 deletions processed_data/src/graphql/entities.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,80 @@
use async_graphql::{Enum, SimpleObject};
use models::{
auto_proc_scaling, auto_proc_scaling_statistics, data_collection_file_attachment,
processing_job, processing_job_parameter, sea_orm_active_enums::ScalingStatisticsType,
sea_orm_active_enums::ScalingStatisticsType,
};
use sea_orm::QueryResult;

/// Combines autoproc integration, autoproc program, autoproc and autoproc scaling
/// Combines autoproc integration, autoproc program, autoproc and autoproc scaling
/// into a single flattened GraphQL object resolved from one joined query row.
#[derive(Debug, Clone, SimpleObject)]
#[graphql(
name = "AutoProcessing",
unresolvable = "autoProcIntegrationId",
complex
)]
pub struct AutoProcessing {
/// An opaque unique identifier for the auto processing integration
pub auto_proc_integration_id: u32,
/// An opaque unique identifier for the data collection
pub data_collection_id: u32,
/// An opaque unique identifier for the auto processing program
pub auto_proc_program_id: Option<u32>,
/// Refined X position of the beam
pub refined_x_beam: Option<f32>,
/// Refined Y position of the beam
pub refined_y_beam: Option<f32>,
/// Name of the processing programs
pub processing_programs: Option<String>,
/// Processing program status
pub processing_status: Option<i8>,
/// Processing program message
pub processing_message: Option<String>,
/// An opaque unique identifier for the processing job
pub processing_job_id: Option<u32>,
/// An opaque unique identifier for the auto processing
pub auto_proc_id: Option<u32>,
/// Space group of the processing job
pub space_group: Option<String>,
/// Refined cell a in the auto processing job
pub refined_cell_a: Option<f32>,
/// Refined cell b in the auto processing job
pub refined_cell_b: Option<f32>,
/// Refined cell c in the auto processing job
pub refined_cell_c: Option<f32>,
/// Refined cell alpha in the auto processing job
pub refined_cell_alpha: Option<f32>,
/// Refined cell beta in the auto processing job
pub refined_cell_beta: Option<f32>,
/// Refined cell gamma in the auto processing job
pub refined_cell_gamma: Option<f32>,
/// An opaque unique identifier for the auto processing scaling
pub auto_proc_scaling_id: Option<u32>,
}

impl From<QueryResult> for AutoProcessing {
    /// Maps a raw joined query row onto [`AutoProcessing`].
    ///
    /// Column aliases are read verbatim (note the mixed camelCase/underscore
    /// style, e.g. `refinedCell_a`) — presumably these mirror the database
    /// schema; verify against the query that produces this row.
    ///
    /// # Panics
    /// Panics if either mandatory identifier column
    /// (`autoProcIntegrationId`, `dataCollectionId`) is absent or NULL.
    /// Every other column silently falls back to `None`.
    fn from(value: QueryResult) -> Self {
        Self {
            // Mandatory identifiers: a row lacking them indicates a bug in the
            // producing query, so fail loudly with a message naming the column
            // instead of an anonymous `unwrap` panic.
            auto_proc_integration_id: value
                .try_get("", "autoProcIntegrationId")
                .expect("query row should contain a non-null autoProcIntegrationId"),
            data_collection_id: value
                .try_get("", "dataCollectionId")
                .expect("query row should contain a non-null dataCollectionId"),
            // Optional columns: absent or NULL values become `None`.
            auto_proc_program_id: value.try_get("", "autoProcProgramId").unwrap_or(None),
            refined_x_beam: value.try_get("", "refinedXBeam").unwrap_or(None),
            refined_y_beam: value.try_get("", "refinedYBeam").unwrap_or(None),
            processing_programs: value.try_get("", "processingPrograms").unwrap_or(None),
            processing_status: value.try_get("", "processingStatus").unwrap_or(None),
            processing_message: value.try_get("", "processingMessage").unwrap_or(None),
            processing_job_id: value.try_get("", "processingJobId").unwrap_or(None),
            auto_proc_id: value.try_get("", "autoProcId").unwrap_or(None),
            space_group: value.try_get("", "spaceGroup").unwrap_or(None),
            refined_cell_a: value.try_get("", "refinedCell_a").unwrap_or(None),
            refined_cell_b: value.try_get("", "refinedCell_b").unwrap_or(None),
            refined_cell_c: value.try_get("", "refinedCell_c").unwrap_or(None),
            refined_cell_alpha: value.try_get("", "refinedCell_alpha").unwrap_or(None),
            refined_cell_beta: value.try_get("", "refinedCell_beta").unwrap_or(None),
            refined_cell_gamma: value.try_get("", "refinedCell_gamma").unwrap_or(None),
            auto_proc_scaling_id: value.try_get("", "autoProcScalingId").unwrap_or(None),
        }
    }
}

/// Represents processed image file stored in s3 bucket
#[derive(Clone, Debug, PartialEq, SimpleObject)]
Expand All @@ -26,47 +98,36 @@ impl From<data_collection_file_attachment::Model> for DataProcessing {

/// Represents a processing job
#[derive(Clone, Debug, PartialEq, SimpleObject)]
#[graphql(name = "ProcessingJob", unresolvable)]
#[graphql(name = "ProcessingJobs", unresolvable)]
pub struct ProcessingJob {
/// An opaque unique identifier for the processing job
pub processing_job_id: u32,
pub processing_job_id: Option<u32>,
/// An opaque unique identifier for the data collection
pub data_collection_id: Option<u32>,
/// Processing job display name
pub display_name: Option<String>,
/// Represents if the job is automatic or downstream
pub automatic: Option<i8>,
}

/// Converts a `processing_job` database model into its GraphQL representation.
impl From<processing_job::Model> for ProcessingJob {
    fn from(model: processing_job::Model) -> Self {
        // Straight field-for-field carry-over; no values are transformed.
        Self {
            processing_job_id: model.processing_job_id,
            data_collection_id: model.data_collection_id,
            display_name: model.display_name,
            automatic: model.automatic,
        }
    }
}

/// Represents a processing job parameters
#[derive(Clone, Debug, PartialEq, SimpleObject)]
#[graphql(name = "ProcessingJobParameter", unresolvable)]
#[allow(clippy::missing_docs_in_private_items)]
pub struct ProcessingJobParameter {
pub processing_job_parameter_id: u32,
pub processing_job_id: Option<u32>,
/// An opaque unique identifier for the processing job parameter
pub processing_job_parameter_id: Option<u32>,
/// Parameter key
pub parameter_key: Option<String>,
/// Parameter values
pub parameter_value: Option<String>,
}

impl From<processing_job_parameter::Model> for ProcessingJobParameter {
fn from(value: processing_job_parameter::Model) -> Self {
impl From<QueryResult> for ProcessingJob {
fn from(value: QueryResult) -> Self {
Self {
processing_job_id: value.processing_job_id,
processing_job_parameter_id: value.processing_job_parameter_id,
parameter_key: value.parameter_key,
parameter_value: value.parameter_value,
processing_job_id: value.try_get("", "processingJobId").unwrap_or(None),
data_collection_id: value.try_get("", "dataCollectionId").unwrap_or(None),
display_name: value.try_get("", "displayName").unwrap_or(None),
automatic: value.try_get("", "automatic").unwrap_or(None),
processing_job_parameter_id: value
.try_get("", "processingJobParameterId")
.unwrap_or(None),
parameter_key: value.try_get("", "parameterKey").unwrap_or(None),
parameter_value: value.try_get("", "parameterValue").unwrap_or(None),
}
}
}
Expand Down Expand Up @@ -178,56 +239,3 @@ pub struct DataCollection {
/// An opaque unique identifier for the data collection
pub id: u32,
}

/// Combines a processing job and its parameters
#[derive(Debug, Clone, SimpleObject)]
#[graphql(name = "ProcessJob", unresolvable = "processingJobId")]
pub struct ProcessJob {
#[graphql(flatten)]
/// Represents Processing Job table
pub processing_job: ProcessingJob,
/// Represents Processing Job Parameters table
pub parameters: Option<ProcessingJobParameter>,
}

/// Combines autoproc integration, autoproc program, autoproc and autoproc scaling
#[derive(Debug, Clone, SimpleObject)]
#[graphql(name = "AutoProcessing", unresolvable = "autoProcIntegrationId")]
pub struct AutoProcessing {
/// An opaque unique identifier for the auto processing integration
pub auto_proc_integration_id: u32,
/// An opaque unique identifier for the data collection
pub data_collection_id: u32,
/// An opaque unique identifier for the auto processing program
pub auto_proc_program_id: Option<u32>,
/// Refined X position of the beam
pub refined_x_beam: Option<f32>,
/// Refined Y position of the beam
pub refined_y_beam: Option<f32>,
/// Name of the processing programs
pub processing_programs: Option<String>,
/// Processing program status
pub processing_status: Option<i8>,
/// Processing program message
pub processing_message: Option<String>,
/// An opaque unique identifier for the processing job
pub processing_job_id: Option<u32>,
/// An opaque unique identifier for the auto processing
pub auto_proc_id: Option<u32>,
/// Space group of the processing job
pub space_group: Option<String>,
/// Refined cell a in the auto processing job
pub refined_cell_a: Option<f32>,
/// Refined cell b in the auto processing job
pub refined_cell_b: Option<f32>,
/// Refined cell c in the auto processing job
pub refined_cell_c: Option<f32>,
/// Refined cell alpha in the auto processing job
pub refined_cell_alpha: Option<f32>,
/// Refined cell beta in the auto processing job
pub refined_cell_beta: Option<f32>,
/// Refined cell gamma in the auto processing job
pub refined_cell_gamma: Option<f32>,
/// An opaque unique identifier for the auto processing scaling
pub auto_proc_scaling_id: Option<u32>,
}
108 changes: 61 additions & 47 deletions processed_data/src/graphql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use async_graphql::{
};
use aws_sdk_s3::presigning::PresigningConfig;
use entities::{
AutoProcScalingStatics, AutoProcessing, DataCollection, DataProcessing, ProcessJob,
ProcessingJob, ProcessingJobParameter, StatisticsType,
AutoProcScalingStatics, AutoProcessing, DataCollection, DataProcessing, ProcessingJob,
StatisticsType,
};
use models::{
auto_proc, auto_proc_integration, auto_proc_program, auto_proc_scaling,
Expand Down Expand Up @@ -40,7 +40,7 @@ impl AddDataLoadersExt for async_graphql::Request {
tokio::spawn,
))
.data(DataLoader::new(
ProcessJobDataLoader::new(database.clone()),
ProcessingJobDataLoader::new(database.clone()),
tokio::spawn,
))
.data(DataLoader::new(
Expand Down Expand Up @@ -71,7 +71,7 @@ pub struct ProcessedDataLoader {
}
/// DataLoader for Process Job
#[allow(clippy::missing_docs_in_private_items)]
pub struct ProcessJobDataLoader {
pub struct ProcessingJobDataLoader {
database: DatabaseConnection,
parent_span: Span,
}
Expand All @@ -89,7 +89,7 @@ pub struct AutoProcScalingDataLoader {
}

#[allow(clippy::missing_docs_in_private_items)]
impl ProcessJobDataLoader {
impl ProcessingJobDataLoader {
fn new(database: DatabaseConnection) -> Self {
Self {
database,
Expand Down Expand Up @@ -153,8 +153,8 @@ impl Loader<u32> for ProcessedDataLoader {
}
}

impl Loader<u32> for ProcessJobDataLoader {
type Value = Vec<ProcessJob>;
impl Loader<u32> for ProcessingJobDataLoader {
type Value = Vec<ProcessingJob>;
type Error = async_graphql::Error;

#[instrument(name = "load_process_job", skip(self))]
Expand All @@ -163,20 +163,43 @@ impl Loader<u32> for ProcessJobDataLoader {
let _span = span.enter();
let mut results = HashMap::new();
let keys_vec: Vec<u32> = keys.to_vec();
let records = processing_job::Entity::find()
.find_also_related(processing_job_parameter::Entity)
.filter(processing_job::Column::DataCollectionId.is_in(keys_vec))
.all(&self.database)

let query = sea_query::Query::select()
.column(Asterisk)
.from(processing_job::Entity)
.left_join(
processing_job_parameter::Entity,
Expr::col((
processing_job::Entity,
processing_job::Column::ProcessingJobId,
))
.equals((
processing_job_parameter::Entity,
processing_job_parameter::Column::ProcessingJobId,
)),
)
.and_where(Expr::col(processing_job::Column::DataCollectionId).is_in(keys_vec.clone()))
.build_any(
self.database
.get_database_backend()
.get_query_builder()
.deref(),
);

let records = self
.database
.query_all(Statement::from_sql_and_values(
self.database.get_database_backend(),
&query.0,
query.1,
))
.await?
.into_iter()
.map(|(job, parameter)| ProcessJob {
processing_job: ProcessingJob::from(job),
parameters: parameter.map(ProcessingJobParameter::from),
})
.map(ProcessingJob::from)
.collect::<Vec<_>>();

for record in records {
let data_collection_id = record.processing_job.data_collection_id.unwrap();
let data_collection_id = record.data_collection_id.unwrap();
results
.entry(data_collection_id)
.or_insert_with(Vec::new)
Expand Down Expand Up @@ -245,26 +268,7 @@ impl Loader<u32> for AutoProcessingDataLoader {
))
.await?
.into_iter()
.map(|record| AutoProcessing {
auto_proc_integration_id: record.try_get("", "autoProcIntegrationId").unwrap(),
data_collection_id: record.try_get("", "dataCollectionId").unwrap(),
auto_proc_program_id: record.try_get("", "autoProcProgramId").unwrap_or(None),
refined_x_beam: record.try_get("", "refinedXBeam").unwrap_or(None),
refined_y_beam: record.try_get("", "refinedYBeam").unwrap_or(None),
processing_programs: record.try_get("", "processingPrograms").unwrap_or(None),
processing_status: record.try_get("", "processingStatus").unwrap_or(None),
processing_message: record.try_get("", "processingMessage").unwrap_or(None),
processing_job_id: record.try_get("", "processingJobId").unwrap_or(None),
auto_proc_id: record.try_get("", "autoProcId").unwrap_or(None),
space_group: record.try_get("", "spaceGroup").unwrap_or(None),
refined_cell_a: record.try_get("", "refinedCell_a").unwrap_or(None),
refined_cell_b: record.try_get("", "refinedCell_b").unwrap_or(None),
refined_cell_c: record.try_get("", "refinedCell_c").unwrap_or(None),
refined_cell_alpha: record.try_get("", "refinedCell_alpha").unwrap_or(None),
refined_cell_beta: record.try_get("", "refinedCell_beta").unwrap_or(None),
refined_cell_gamma: record.try_get("", "refinedCell_gamma").unwrap_or(None),
auto_proc_scaling_id: record.try_get("", "autoProcScalingId").unwrap_or(None),
})
.map(AutoProcessing::from)
.collect::<Vec<_>>();

for record in records {
Expand Down Expand Up @@ -349,8 +353,8 @@ impl DataCollection {
async fn processing_jobs(
&self,
ctx: &Context<'_>,
) -> async_graphql::Result<Option<Vec<ProcessJob>>, async_graphql::Error> {
let loader = ctx.data_unchecked::<DataLoader<ProcessJobDataLoader>>();
) -> async_graphql::Result<Option<Vec<ProcessingJob>>, async_graphql::Error> {
let loader = ctx.data_unchecked::<DataLoader<ProcessingJobDataLoader>>();
loader.load_one(self.id).await
}

Expand Down Expand Up @@ -391,9 +395,12 @@ impl AutoProcessing {
ctx: &Context<'_>,
) -> async_graphql::Result<Option<AutoProcScalingStatics>> {
let loader = ctx.data_unchecked::<DataLoader<AutoProcScalingDataLoader>>();
loader
.load_one((self.auto_proc_id.unwrap(), StatisticsType::Overall))
.await
match self.auto_proc_id {
Some(id) => loader
.load_one((id, StatisticsType::Overall))
.await,
None => Ok(None)
}
}

/// Fetches the innershell scaling statistics type
Expand All @@ -402,9 +409,13 @@ impl AutoProcessing {
ctx: &Context<'_>,
) -> async_graphql::Result<Option<AutoProcScalingStatics>> {
let loader = ctx.data_unchecked::<DataLoader<AutoProcScalingDataLoader>>();
loader
.load_one((self.auto_proc_id.unwrap(), StatisticsType::InnerShell))
.await
match self.auto_proc_id {
Some(id) => loader
.load_one((id, StatisticsType::InnerShell))
.await,
None => Ok(None)
}

}

/// Fetches the outershell scaling statistics type
Expand All @@ -413,9 +424,12 @@ impl AutoProcessing {
ctx: &Context<'_>,
) -> async_graphql::Result<Option<AutoProcScalingStatics>> {
let loader = ctx.data_unchecked::<DataLoader<AutoProcScalingDataLoader>>();
loader
.load_one((self.auto_proc_id.unwrap(), StatisticsType::OuterShell))
.await
match self.auto_proc_id {
Some(id) => loader
.load_one((id, StatisticsType::OuterShell))
.await,
None => Ok(None)
}
}
}

Expand Down

0 comments on commit 541b7e5

Please sign in to comment.