diff --git a/lib/galaxy/managers/workflows.py b/lib/galaxy/managers/workflows.py index b54afd169066..622b1b58c2de 100644 --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -592,7 +592,7 @@ def read_workflow_from_path(self, app, user, path, allow_in_directory=None) -> m import_options = ImportOptions() import_options.deduplicate_subworkflows = True as_dict = python_to_workflow(as_dict, galaxy_interface, workflow_directory=None, import_options=import_options) - raw_description = RawWorkflowDescription(as_dict, path) + raw_description = RawWorkflowDescription(as_dict) created_workflow = self.build_workflow_from_raw_description(trans, raw_description, WorkflowCreateOptions()) return created_workflow.workflow @@ -925,8 +925,9 @@ def to_format_2(wf_dict, **kwds): return wf_dict def _sync_stored_workflow(self, trans, stored_workflow): - workflow_path = stored_workflow.from_path - self.store_workflow_to_path(workflow_path, stored_workflow, stored_workflow.latest_workflow, trans=trans) + if trans.user_is_admin: + workflow_path = stored_workflow.from_path + self.store_workflow_to_path(workflow_path, stored_workflow, stored_workflow.latest_workflow, trans=trans) def store_workflow_artifacts(self, directory, filename_base, workflow, **kwd): modern_workflow_path = os.path.join(directory, f"{filename_base}.gxwf.yml") diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index 8d18afe555c0..bc0e5d88e83d 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -2489,7 +2489,7 @@ class PostJobAction(Base, RepresentById): workflow_step_id = Column(Integer, ForeignKey("workflow_step.id"), index=True, nullable=True) action_type = Column(String(255), nullable=False) output_name = Column(String(255), nullable=True) - action_arguments = Column(MutableJSONType, nullable=True) + _action_arguments = Column("action_arguments", MutableJSONType, nullable=True) workflow_step = relationship( "WorkflowStep", back_populates="post_job_actions", @@ -2503,6 +2503,18 @@ def __init__(self, action_type, workflow_step=None, output_name=None, action_arg self.workflow_step = workflow_step ensure_object_added_to_session(self, object_in_session=workflow_step) + @property + def action_arguments(self): + if self.action_type in ("HideDatasetAction", "DeleteIntermediatesAction") and self._action_arguments is True: + # Fix up broken workflows resulting from imports with gxformat2 <= 0.20.0 + return {} + else: + return self._action_arguments + + @action_arguments.setter + def action_arguments(self, value: Dict[str, Any]): + self._action_arguments = value + class PostJobActionAssociation(Base, RepresentById): __tablename__ = "post_job_action_association" @@ -6701,11 +6713,6 @@ class HistoryDatasetCollectionAssociation( primaryjoin=copied_from_history_dataset_collection_association_id == id, remote_side=[id], uselist=False, - back_populates="copied_to_history_dataset_collection_association", - ) - copied_to_history_dataset_collection_association = relationship( - "HistoryDatasetCollectionAssociation", - back_populates="copied_from_history_dataset_collection_association", ) implicit_input_collections = relationship( "ImplicitlyCreatedDatasetCollectionInput", diff --git a/lib/galaxy/model/store/__init__.py b/lib/galaxy/model/store/__init__.py index 5789dfb2f1b8..d9aab22b92b0 100644 --- a/lib/galaxy/model/store/__init__.py +++ b/lib/galaxy/model/store/__init__.py @@ -996,14 +996,16 @@ def _import_collection_copied_associations( # sense. hdca_copied_from_sinks = object_import_tracker.hdca_copied_from_sinks if copied_from_object_key in object_import_tracker.hdcas_by_key: - hdca.copied_from_history_dataset_collection_association = object_import_tracker.hdcas_by_key[ - copied_from_object_key - ] + source_hdca = object_import_tracker.hdcas_by_key[copied_from_object_key] + if source_hdca is not hdca: + # We may not have the copied source, in which case the first included HDCA in the chain + # acts as the source, so here we make sure we don't create a cycle. + hdca.copied_from_history_dataset_collection_association = source_hdca else: if copied_from_object_key in hdca_copied_from_sinks: - hdca.copied_from_history_dataset_collection_association = object_import_tracker.hdcas_by_key[ - hdca_copied_from_sinks[copied_from_object_key] - ] + source_hdca = object_import_tracker.hdcas_by_key[hdca_copied_from_sinks[copied_from_object_key]] + if source_hdca is not hdca: + hdca.copied_from_history_dataset_collection_association = source_hdca else: hdca_copied_from_sinks[copied_from_object_key] = dataset_collection_key @@ -1070,7 +1072,7 @@ def attach_workflow_step(imported_object, attrs): for step_attrs in invocation_attrs["steps"]: imported_invocation_step = model.WorkflowInvocationStep() imported_invocation_step.workflow_invocation = imported_invocation - ensure_object_added_to_session(imported_invocation, session=self.sa_session) + ensure_object_added_to_session(imported_invocation_step, session=self.sa_session) attach_workflow_step(imported_invocation_step, step_attrs) restore_times(imported_invocation_step, step_attrs) imported_invocation_step.action = step_attrs["action"] @@ -1921,12 +1923,14 @@ def __init__( self.export_files = export_files self.included_datasets: Dict[model.DatasetInstance, Tuple[model.DatasetInstance, bool]] = {} self.dataset_implicit_conversions: Dict[model.DatasetInstance, model.ImplicitlyConvertedDatasetAssociation] = {} - self.included_collections: List[Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation]] = [] + self.included_collections: Dict[ + Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation], + Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation], + ] = {} self.included_libraries: List[model.Library] = [] self.included_library_folders: List[model.LibraryFolder] = [] self.included_invocations: List[model.WorkflowInvocation] = [] self.collection_datasets: Set[int] = set() - self.collections_attrs: List[Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation]] = [] self.dataset_id_to_path: Dict[int, Tuple[Optional[str], Optional[str]]] = {} self.job_output_dataset_associations: Dict[int, Dict[str, model.DatasetInstance]] = {} @@ -2287,8 +2291,7 @@ def export_collection( def add_dataset_collection( self, collection: Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation] ) -> None: - self.collections_attrs.append(collection) - self.included_collections.append(collection) + self.included_collections[collection] = collection def add_implicit_conversion_dataset( self, @@ -2343,7 +2346,7 @@ def to_json(attributes): collections_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_COLLECTIONS) with open(collections_attrs_filename, "w") as collections_attrs_out: - collections_attrs_out.write(to_json(self.collections_attrs)) + collections_attrs_out.write(to_json(self.included_collections.values())) conversions_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_CONVERSIONS) with open(conversions_attrs_filename, "w") as conversions_attrs_out: @@ -2364,12 +2367,12 @@ def to_json(attributes): # # Get all jobs associated with included HDAs. - jobs_dict: Dict[str, model.Job] = {} + jobs_dict: Dict[int, model.Job] = {} implicit_collection_jobs_dict = {} def record_job(job): - if not job: - # No viable job. + if not job or job.id in jobs_dict: + # No viable job or job already recorded. return jobs_dict[job.id] = job @@ -2395,10 +2398,11 @@ def record_associated_jobs(obj): ) job_hda = hda while job_hda.copied_from_history_dataset_association: # should this check library datasets as well? + # record job (if one exists) even if dataset was copied + # copy could have been created manually through UI/API or using database operation tool, + # in which case we have a relevant job to export. + record_associated_jobs(job_hda) job_hda = job_hda.copied_from_history_dataset_association - if not job_hda.creating_job_associations: - # No viable HDA found. - continue record_associated_jobs(job_hda) diff --git a/test/integration/test_workflow_tasks.py b/test/integration/test_workflow_tasks.py index c250b624439f..3f8869dc9109 100644 --- a/test/integration/test_workflow_tasks.py +++ b/test/integration/test_workflow_tasks.py @@ -17,6 +17,7 @@ from galaxy_test.base import api_asserts from galaxy_test.base.api import UsesCeleryTasks from galaxy_test.base.populators import ( + DatasetCollectionPopulator, DatasetPopulator, RunJobsSummary, WorkflowPopulator, @@ -27,6 +28,7 @@ class TestWorkflowTasksIntegration(PosixFileSourceSetup, IntegrationTestCase, UsesCeleryTasks, RunsWorkflowFixtures): dataset_populator: DatasetPopulator + dataset_collection_populator: DatasetCollectionPopulator framework_tool_and_types = True @classmethod @@ -37,6 +39,7 @@ def handle_galaxy_config_kwds(cls, config): def setUp(self): super().setUp() self.dataset_populator = DatasetPopulator(self.galaxy_interactor) + self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor) self.workflow_populator = WorkflowPopulator(self.galaxy_interactor) self._write_file_fixtures() @@ -124,6 +127,47 @@ def test_export_import_invocation_with_step_parameter(self): invocation_details = self._export_and_import_workflow_invocation(summary, use_uris) self._rerun_imported_workflow(summary, invocation_details) + def test_export_import_invocation_with_copied_hdca_and_database_operation_tool(self): + with self.dataset_populator.test_history() as history_id: + self.dataset_collection_populator.create_list_in_history(history_id=history_id, wait=True).json() + new_history = self.dataset_populator.copy_history(history_id=history_id).json() + copied_collection = self.dataset_populator.get_history_collection_details(new_history["id"]) + workflow_id = self.workflow_populator.upload_yaml_workflow( + """class: GalaxyWorkflow +inputs: + input: + type: collection + collection_type: list +steps: + extract_dataset: + tool_id: __EXTRACT_DATASET__ + in: + input: + source: input +""" + ) + inputs = {"input": {"src": "hdca", "id": copied_collection["id"]}} + workflow_request = {"history": f"hist_id={new_history['id']}", "inputs_by": "name", "inputs": inputs} + invocation = self.workflow_populator.invoke_workflow_raw( + workflow_id, workflow_request, assert_ok=True + ).json() + invocation_id = invocation["id"] + self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id) + jobs = self.workflow_populator.get_invocation_jobs(invocation_id) + summary = RunJobsSummary( + history_id=history_id, + workflow_id=workflow_id, + invocation_id=invocation["id"], + inputs=inputs, + jobs=jobs, + invocation=invocation, + workflow_request=workflow_request, + ) + imported_invocation_details = self._export_and_import_workflow_invocation(summary) + original_contents = self.dataset_populator.get_history_contents(new_history["id"]) + contents = self.dataset_populator.get_history_contents(imported_invocation_details["history_id"]) + assert len(contents) == len(original_contents) == 5 + def _export_and_import_workflow_invocation( self, summary: RunJobsSummary, use_uris: bool = True, model_store_format="tgz" ) -> Dict[str, Any]: