From abef2e8f3083b109b7217d1d3a2b9586d77707b4 Mon Sep 17 00:00:00 2001 From: Nicola Soranzo Date: Tue, 19 Nov 2024 01:03:26 +0000 Subject: [PATCH 1/5] Type annotation fixes --- lib/galaxy/celery/tasks.py | 2 +- lib/galaxy/datatypes/protocols.py | 10 ++++++++-- lib/galaxy/jobs/__init__.py | 23 ++++++++++++++--------- lib/galaxy/model/__init__.py | 4 +--- lib/galaxy/model/store/__init__.py | 2 +- lib/galaxy/tools/data_fetch.py | 4 ++-- 6 files changed, 27 insertions(+), 18 deletions(-) diff --git a/lib/galaxy/celery/tasks.py b/lib/galaxy/celery/tasks.py index 2c80ede45a71..35b133fd5845 100644 --- a/lib/galaxy/celery/tasks.py +++ b/lib/galaxy/celery/tasks.py @@ -202,7 +202,7 @@ def set_metadata( try: if overwrite: hda_manager.overwrite_metadata(dataset_instance) - dataset_instance.datatype.set_meta(dataset_instance) # type:ignore [arg-type] + dataset_instance.datatype.set_meta(dataset_instance) dataset_instance.set_peek() # Reset SETTING_METADATA state so the dataset instance getter picks the dataset state dataset_instance.set_metadata_success_state() diff --git a/lib/galaxy/datatypes/protocols.py b/lib/galaxy/datatypes/protocols.py index 4f5bd1593371..07d2c04d379f 100644 --- a/lib/galaxy/datatypes/protocols.py +++ b/lib/galaxy/datatypes/protocols.py @@ -2,10 +2,16 @@ Location of protocols used in datatypes """ -from typing import Any +from typing import ( + Any, + TYPE_CHECKING, +) from typing_extensions import Protocol +if TYPE_CHECKING: + from sqlalchemy.orm import Mapped + class HasClearAssociatedFiles(Protocol): def clear_associated_files(self, metadata_safe: bool = False, purge: bool = False) -> None: ... @@ -39,7 +45,7 @@ class HasHid(Protocol): class HasId(Protocol): - id: int + id: "Mapped[int]" class HasInfo(Protocol): diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index 3e9a306e4116..0bf12219ce3f 100644 --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -63,6 +63,7 @@ ) from galaxy.metadata import get_metadata_compute_strategy from galaxy.model import ( + Dataset, Job, store, Task, @@ -100,6 +101,7 @@ if TYPE_CHECKING: from galaxy.jobs.handler import JobHandlerQueue + from galaxy.model import DatasetInstance from galaxy.tools import Tool log = logging.getLogger(__name__) @@ -986,7 +988,7 @@ class MinimalJobWrapper(HasResourceParameters): def __init__( self, - job: model.Job, + job: Job, app: MinimalManagerApp, use_persisted_destination: bool = False, tool: Optional["Tool"] = None, @@ -1190,7 +1192,7 @@ def job_destination(self) -> JobDestination: def galaxy_url(self): return self.get_destination_configuration("galaxy_infrastructure_url") - def get_job(self) -> model.Job: + def get_job(self) -> Job: job = self.sa_session.get(Job, self.job_id) assert job return job @@ -1540,7 +1542,7 @@ def mark_as_resubmitted(self, info=None): self.sa_session.refresh(job) if info is not None: job.info = info - job.set_state(model.Job.states.RESUBMITTED) + job.set_state(Job.states.RESUBMITTED) self.sa_session.add(job) with transaction(self.sa_session): self.sa_session.commit() @@ -1555,7 +1557,7 @@ def change_state(self, state, info=False, flush=True, job=None): # on the current job state value to minimize race conditions. self.sa_session.expire(job, ["state"]) - if job.state in model.Job.terminal_states: + if job.state in Job.terminal_states: log.warning( "(%s) Ignoring state change from '%s' to '%s' for job that is already terminal", job.id, @@ -1610,7 +1612,7 @@ def get_destination_configuration(self, key, default=None): def enqueue(self): job = self.get_job() # Change to queued state before handing to worker thread so the runner won't pick it up again - self.change_state(model.Job.states.QUEUED, flush=False, job=job) + self.change_state(Job.states.QUEUED, flush=False, job=job) # Persist the destination so that the job will be included in counts if using concurrency limits self.set_job_destination(self.job_destination, None, flush=False, job=job) # Set object store after job destination so can leverage parameters... @@ -1732,7 +1734,9 @@ def split_object_stores(output_name): # noqa: F811 https://github.com/PyCQA/pyf job.object_store_id_overrides = object_store_id_overrides self._setup_working_directory(job=job) - def _finish_dataset(self, output_name, dataset, job, context, final_job_state, remote_metadata_directory): + def _finish_dataset( + self, output_name, dataset: "DatasetInstance", job: Job, context, final_job_state, remote_metadata_directory + ): implicit_collection_jobs = job.implicit_collection_jobs_association purged = dataset.dataset.purged if not purged and dataset.dataset.external_filename is None: @@ -1779,6 +1783,7 @@ def _finish_dataset(self, output_name, dataset, job, context, final_job_state, r # it would be quicker to just copy the metadata from the originating output dataset, # but somewhat trickier (need to recurse up the copied_from tree), for now we'll call set_meta() retry_internally = util.asbool(self.get_destination_configuration("retry_metadata_internally", True)) + assert self.tool if not retry_internally and self.tool.tool_type == "interactive": retry_internally = util.asbool( self.get_destination_configuration("retry_interactivetool_metadata_internally", retry_internally) @@ -1992,16 +1997,16 @@ def fail(message=job.info, exception=None): if ( not final_job_state == job.states.ERROR and not dataset_assoc.dataset.dataset.state == job.states.ERROR - and not dataset_assoc.dataset.dataset.state == model.Dataset.states.DEFERRED + and not dataset_assoc.dataset.dataset.state == Dataset.states.DEFERRED ): # We don't set datsets in error state to OK because discover_outputs may have already set the state to error - dataset_assoc.dataset.dataset.state = model.Dataset.states.OK + dataset_assoc.dataset.dataset.state = Dataset.states.OK if job.states.ERROR == final_job_state: for dataset_assoc in output_dataset_associations: log.debug("(%s) setting dataset %s state to ERROR", job.id, dataset_assoc.dataset.dataset.id) # TODO: This is where the state is being set to error. Change it! - dataset_assoc.dataset.dataset.state = model.Dataset.states.ERROR + dataset_assoc.dataset.dataset.state = Dataset.states.ERROR # Pause any dependent jobs (and those jobs' outputs) for dep_job_assoc in dataset_assoc.dataset.dependent_jobs: self.pause( diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index f741da9b1c82..ec59eb2bb534 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -5108,9 +5108,7 @@ def find_conversion_destination( self, accepted_formats: List[str], **kwd ) -> Tuple[bool, Optional[str], Optional["DatasetInstance"]]: """Returns ( target_ext, existing converted dataset )""" - return self.datatype.find_conversion_destination( - self, accepted_formats, _get_datatypes_registry(), **kwd # type:ignore[arg-type] - ) + return self.datatype.find_conversion_destination(self, accepted_formats, _get_datatypes_registry(), **kwd) def add_validation_error(self, validation_error): self.validation_errors.append(validation_error) diff --git a/lib/galaxy/model/store/__init__.py b/lib/galaxy/model/store/__init__.py index a3bf6a6ec896..3ddafe11ad93 100644 --- a/lib/galaxy/model/store/__init__.py +++ b/lib/galaxy/model/store/__init__.py @@ -718,7 +718,7 @@ def handle_dataset_object_edit(dataset_instance, dataset_attrs): # Try to set metadata directly. @mvdbeek thinks we should only record the datasets try: if dataset_instance.has_metadata_files: - dataset_instance.datatype.set_meta(dataset_instance) # type:ignore[arg-type] + dataset_instance.datatype.set_meta(dataset_instance) except Exception: log.debug(f"Metadata setting failed on {dataset_instance}", exc_info=True) dataset_instance.state = dataset_instance.dataset.states.FAILED_METADATA diff --git a/lib/galaxy/tools/data_fetch.py b/lib/galaxy/tools/data_fetch.py index bad82540d7fa..8c25990ea73b 100644 --- a/lib/galaxy/tools/data_fetch.py +++ b/lib/galaxy/tools/data_fetch.py @@ -366,7 +366,7 @@ def walk_extra_files(items, prefix=""): # TODO: # in galaxy json add 'extra_files' and point at target derived from extra_files: - needs_grooming = not link_data_only and datatype and datatype.dataset_content_needs_grooming(path) # type: ignore[arg-type] + needs_grooming = not link_data_only and datatype and datatype.dataset_content_needs_grooming(path) if needs_grooming: # Groom the dataset content if necessary transform.append( @@ -623,7 +623,7 @@ def __new_dataset_path(self): self.__upload_count += 1 return path - def ensure_in_working_directory(self, path, purge_source, in_place): + def ensure_in_working_directory(self, path: str, purge_source, in_place) -> str: if in_directory(path, self.__workdir): return path From 6929fbb3974ee93ca5aa3ae5de52de2ed2a70d57 Mon Sep 17 00:00:00 2001 From: Nicola Soranzo Date: Tue, 19 Nov 2024 01:04:11 +0000 Subject: [PATCH 2/5] Remove duplicated statement And fix a typo. --- lib/galaxy/managers/datasets.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/galaxy/managers/datasets.py b/lib/galaxy/managers/datasets.py index 6d90e47eb9ce..59137af76e9a 100644 --- a/lib/galaxy/managers/datasets.py +++ b/lib/galaxy/managers/datasets.py @@ -171,7 +171,6 @@ def compute_hash(self, request: ComputeDatasetHashTaskRequest): file_path = dataset.get_file_name() hash_function = request.hash_function calculated_hash_value = memory_bound_hexdigest(hash_func_name=hash_function, path=file_path) - extra_files_path = request.extra_files_path dataset_hash = model.DatasetHash( hash_function=hash_function, hash_value=calculated_hash_value, @@ -433,7 +432,7 @@ def is_composite(self, dataset_assoc: U): """ Return True if this hda/ldda is a composite type dataset. - .. note:: see also (whereever we keep information on composite datatypes?) + .. note:: see also (wherever we keep information on composite datatypes?) """ return dataset_assoc.extension in self.app.datatypes_registry.get_composite_extensions() From fe4589db022f5202ef246ba92c53b03200fe9c0d Mon Sep 17 00:00:00 2001 From: Nicola Soranzo Date: Tue, 19 Nov 2024 01:05:55 +0000 Subject: [PATCH 3/5] Replace redundant ``HashFunctionNames`` with ``HashFunctionNameEnum`` And fix a typo. --- client/src/api/schema/schema.ts | 10 ++-------- lib/galaxy/schema/schema.py | 12 ++---------- lib/galaxy/util/hash_util.py | 2 +- lib/galaxy/webapps/galaxy/services/datasets.py | 4 ++-- 4 files changed, 7 insertions(+), 21 deletions(-) diff --git a/client/src/api/schema/schema.ts b/client/src/api/schema/schema.ts index 7da851a70511..1e73e27175a7 100644 --- a/client/src/api/schema/schema.ts +++ b/client/src/api/schema/schema.ts @@ -8002,7 +8002,7 @@ export interface components { * Hash Function * @description The hash function used to generate the hash. */ - hash_function: components["schemas"]["HashFunctionNames"]; + hash_function: components["schemas"]["HashFunctionNameEnum"]; /** * Hash Value * @description The hash value. @@ -10868,16 +10868,10 @@ export interface components { }; /** * HashFunctionNameEnum - * @description Particular pieces of information that can be requested for a dataset. + * @description Hash function names that can be used to generate checksums for files. * @enum {string} */ HashFunctionNameEnum: "MD5" | "SHA-1" | "SHA-256" | "SHA-512"; - /** - * HashFunctionNames - * @description Hash function names that can be used to generate checksums for datasets. - * @enum {string} - */ - HashFunctionNames: "MD5" | "SHA-1" | "SHA-256" | "SHA-512"; /** HdaDestination */ HdaDestination: { /** diff --git a/lib/galaxy/schema/schema.py b/lib/galaxy/schema/schema.py index d1e05c577cec..3febee546896 100644 --- a/lib/galaxy/schema/schema.py +++ b/lib/galaxy/schema/schema.py @@ -50,6 +50,7 @@ OffsetNaiveDatetime, RelativeUrl, ) +from galaxy.util.hash_util import HashFunctionNameEnum from galaxy.util.sanitize_html import sanitize_html USER_MODEL_CLASS = Literal["User"] @@ -121,15 +122,6 @@ class DatasetCollectionPopulatedState(str, Enum): FAILED = "failed" # some problem populating state, won't be populated -class HashFunctionNames(str, Enum): - """Hash function names that can be used to generate checksums for datasets.""" - - md5 = "MD5" - sha1 = "SHA-1" - sha256 = "SHA-256" - sha512 = "SHA-512" - - # Generic and common Field annotations that can be reused across models RelativeUrlField = Annotated[ @@ -733,7 +725,7 @@ class DatasetHash(Model): title="ID", description="Encoded ID of the dataset hash.", ) - hash_function: HashFunctionNames = Field( + hash_function: HashFunctionNameEnum = Field( ..., title="Hash Function", description="The hash function used to generate the hash.", diff --git a/lib/galaxy/util/hash_util.py b/lib/galaxy/util/hash_util.py index efb5d0ce7e1b..addf26fb90bc 100644 --- a/lib/galaxy/util/hash_util.py +++ b/lib/galaxy/util/hash_util.py @@ -34,7 +34,7 @@ class HashFunctionNameEnum(str, Enum): - """Particular pieces of information that can be requested for a dataset.""" + """Hash function names that can be used to generate checksums for files.""" md5 = "MD5" sha1 = "SHA-1" diff --git a/lib/galaxy/webapps/galaxy/services/datasets.py b/lib/galaxy/webapps/galaxy/services/datasets.py index 4f6fef304697..20c22a720591 100644 --- a/lib/galaxy/webapps/galaxy/services/datasets.py +++ b/lib/galaxy/webapps/galaxy/services/datasets.py @@ -537,11 +537,11 @@ def get_drs_object(self, trans: ProvidesHistoryContext, object_id: str, request_ checksums.append(Checksum(type=type, checksum=checksum)) if len(checksums) == 0: - hash_funciton = HashFunctionNameEnum.md5 + hash_function = HashFunctionNameEnum.md5 request = ComputeDatasetHashTaskRequest( dataset_id=dataset_instance.dataset.id, extra_files_path=None, - hash_function=hash_funciton, + hash_function=hash_function, user=None, ) compute_dataset_hash.delay(request=request, task_user_id=getattr(trans.user, "id", None)) From aceaa11117a12902f0216425673f10ef5200f406 Mon Sep 17 00:00:00 2001 From: Nicola Soranzo Date: Tue, 19 Nov 2024 22:30:37 +0000 Subject: [PATCH 4/5] Remove redundant check --- lib/galaxy/model/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index ec59eb2bb534..f5f1085dd850 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -4801,8 +4801,6 @@ def set_skipped(self, object_store_populator: "ObjectStorePopulator") -> None: self.set_total_size() def get_file_name(self, sync_cache: bool = True) -> str: - if self.dataset.purged: - return "" return self.dataset.get_file_name(sync_cache=sync_cache) def set_file_name(self, filename: str): From 1b54c0bcd850e6eeff068a9343911c2066834ca8 Mon Sep 17 00:00:00 2001 From: Nicola Soranzo Date: Tue, 19 Nov 2024 01:11:01 +0000 Subject: [PATCH 5/5] Calculate hash for new non-deferred datasets when finishing a job This is configurable with two new options: - `calculate_dataset_hash`: in which cases Galaxy should calculate a hash for a new dataset. Possible values: 'always', 'upload' (the default), 'never'. - `hash_function`. Possible values: 'md5', 'sha1', 'sha256', 'sha512' Hashes are calculated via a Celery task, so currently only if the 'enable_celery_tasks' option is set to true. --- doc/source/admin/galaxy_options.rst | 27 ++++++++++ lib/galaxy/config/__init__.py | 9 ++++ lib/galaxy/config/sample/galaxy.yml.sample | 12 +++++ lib/galaxy/config/schemas/config_schema.yml | 22 +++++++++ lib/galaxy/jobs/__init__.py | 20 ++++++++ lib/galaxy/model/__init__.py | 2 +- lib/galaxy/model/deferred.py | 22 ++++----- lib/galaxy_test/api/test_datasets.py | 32 ++---------- lib/galaxy_test/base/populators.py | 7 +++ test/integration/test_dataset_hashing.py | 49 +++++++++++++++++++ ...test_materialize_dataset_instance_tasks.py | 21 ++++---- 11 files changed, 172 insertions(+), 51 deletions(-) create mode 100644 test/integration/test_dataset_hashing.py diff --git a/doc/source/admin/galaxy_options.rst b/doc/source/admin/galaxy_options.rst index a2a077224750..d7fe4cd5f41f 100644 --- a/doc/source/admin/galaxy_options.rst +++ b/doc/source/admin/galaxy_options.rst @@ -4719,6 +4719,33 @@ :Type: float +~~~~~~~~~~~~~~~~~~~~~~~~~~ +``calculate_dataset_hash`` +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +:Description: + In which cases Galaxy should calculate a hash for a new dataset. + Dataset hashes can be used by the Galaxy job cache/search to check + if job inputs match. Setting the 'enable_celery_tasks' option to + true is also required for dataset hash calculation. Possible + values are: 'always', 'upload' (the default), 'never'. If set to + 'upload', the hash is calculated only for the outputs of upload + jobs. +:Default: ``upload`` +:Type: str + + +~~~~~~~~~~~~~~~~~ +``hash_function`` +~~~~~~~~~~~~~~~~~ + +:Description: + Hash function to use if 'calculate_dataset_hash' is enabled. + Possible values are: 'md5', 'sha1', 'sha256', 'sha512' +:Default: ``sha256`` +:Type: str + + ~~~~~~~~~~~~~~~~~~~~~ ``metadata_strategy`` ~~~~~~~~~~~~~~~~~~~~~ diff --git a/lib/galaxy/config/__init__.py b/lib/galaxy/config/__init__.py index d253b3d0d9b6..1dc273bc7240 100644 --- a/lib/galaxy/config/__init__.py +++ b/lib/galaxy/config/__init__.py @@ -46,6 +46,7 @@ from galaxy.util.custom_logging import LOGLV_TRACE from galaxy.util.dynamic import HasDynamicProperties from galaxy.util.facts import get_facts +from galaxy.util.hash_util import HashFunctionNameEnum from galaxy.util.properties import ( read_properties_from_file, running_from_source, @@ -716,6 +717,7 @@ class GalaxyAppConfiguration(BaseAppConfiguration, CommonConfigurationMixin): galaxy_data_manager_data_path: str galaxy_infrastructure_url: str hours_between_check: int + hash_function: HashFunctionNameEnum integrated_tool_panel_config: str involucro_path: str len_file_path: str @@ -897,6 +899,13 @@ def _process_config(self, kwargs: Dict[str, Any]) -> None: self.update_integrated_tool_panel = kwargs.get("update_integrated_tool_panel", True) self.galaxy_data_manager_data_path = self.galaxy_data_manager_data_path or self.tool_data_path self.tool_secret = kwargs.get("tool_secret", "") + if self.calculate_dataset_hash not in ("always", "upload", "never"): + raise ConfigurationError( + f"Unrecognized value for calculate_dataset_hash option: {self.calculate_dataset_hash}" + ) + if self.hash_function not in HashFunctionNameEnum.__members__: + raise ConfigurationError(f"Unrecognized value for hash_function option: {self.hash_function}") + self.hash_function = HashFunctionNameEnum[self.hash_function] self.metadata_strategy = kwargs.get("metadata_strategy", "directory") self.use_remote_user = self.use_remote_user or self.single_user self.fetch_url_allowlist_ips = parse_allowlist_ips(listify(kwargs.get("fetch_url_allowlist"))) diff --git a/lib/galaxy/config/sample/galaxy.yml.sample b/lib/galaxy/config/sample/galaxy.yml.sample index d2da6d1723ca..713b3788d26c 100644 --- a/lib/galaxy/config/sample/galaxy.yml.sample +++ b/lib/galaxy/config/sample/galaxy.yml.sample @@ -2545,6 +2545,18 @@ galaxy: # handler processes. Float values are allowed. #workflow_monitor_sleep: 1.0 + # In which cases Galaxy should calculate a hash for a new dataset. + # Dataset hashes can be used by the Galaxy job cache/search to check + # if job inputs match. Setting the 'enable_celery_tasks' option to + # true is also required for dataset hash calculation. Possible values + # are: 'always', 'upload' (the default), 'never'. If set to 'upload', + # the hash is calculated only for the outputs of upload jobs. + #calculate_dataset_hash: upload + + # Hash function to use if 'calculate_dataset_hash' is enabled. + # Possible values are: 'md5', 'sha1', 'sha256', 'sha512' + #hash_function: sha256 + # Determines how metadata will be set. Valid values are `directory`, # `extended`, `directory_celery` and `extended_celery`. In extended # mode jobs will decide if a tool run failed, the object stores diff --git a/lib/galaxy/config/schemas/config_schema.yml b/lib/galaxy/config/schemas/config_schema.yml index e659efd6d21c..d1e55b6a6eac 100644 --- a/lib/galaxy/config/schemas/config_schema.yml +++ b/lib/galaxy/config/schemas/config_schema.yml @@ -2788,6 +2788,7 @@ mapping: Avoiding making this a boolean because we may add options such as 'in-single-form-view' or 'in-simplified-workflow-views'. https://github.com/galaxyproject/galaxy/pull/9809/files#r461889109 + allow_user_dataset_purge: type: bool default: true @@ -3454,6 +3455,26 @@ mapping: decreased if extremely high job throughput is necessary, but doing so can increase CPU usage of handler processes. Float values are allowed. + calculate_dataset_hash: + type: str + default: upload + required: false + enum: ['always', 'upload', 'never'] + desc: | + In which cases Galaxy should calculate a hash for a new dataset. + Dataset hashes can be used by the Galaxy job cache/search to check if job inputs match. + Setting the 'enable_celery_tasks' option to true is also required for dataset hash calculation. + Possible values are: 'always', 'upload' (the default), 'never'. If set to 'upload', the + hash is calculated only for the outputs of upload jobs. + + hash_function: + type: str + default: sha256 + required: false + desc: | + Hash function to use if 'calculate_dataset_hash' is enabled. Possible values + are: 'md5', 'sha1', 'sha256', 'sha512' + metadata_strategy: type: str required: false @@ -3547,6 +3568,7 @@ mapping: default: always required: false reloadable: true + enum: ['always', 'onsuccess', 'never'] desc: | Clean up various bits of jobs left on the filesystem after completion. These bits include the job working directory, external metadata temporary files, diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index 0bf12219ce3f..22a909466b41 100644 --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -75,6 +75,7 @@ ObjectStorePopulator, serialize_static_object_store_config, ) +from galaxy.schema.tasks import ComputeDatasetHashTaskRequest from galaxy.structured_app import MinimalManagerApp from galaxy.tool_util.deps import requirements from galaxy.tool_util.output_checker import ( @@ -2038,6 +2039,25 @@ def fail(message=job.info, exception=None): dataset.full_delete() collected_bytes = 0 + # Calculate dataset hash + for dataset_assoc in output_dataset_associations: + dataset = dataset_assoc.dataset.dataset + if not dataset.purged and dataset.state != Dataset.states.DEFERRED and not dataset.hashes: + if self.app.config.calculate_dataset_hash == "always" or ( + self.app.config.calculate_dataset_hash == "upload" and job.tool_id in ("upload1", "__DATA_FETCH__") + ): + # Calculate dataset hash via a celery task + if self.app.config.enable_celery_tasks: + from galaxy.celery.tasks import compute_dataset_hash + + extra_files_path = dataset.extra_files_path if dataset.extra_files_path_exists() else None + request = ComputeDatasetHashTaskRequest( + dataset_id=dataset.id, + extra_files_path=extra_files_path, + hash_function=self.app.config.hash_function, + ) + compute_dataset_hash.delay(request=request) + user = job.user if user and collected_bytes > 0 and quota_source_info is not None and quota_source_info.use: user.adjust_total_disk_usage(collected_bytes, quota_source_info.label) diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index f5f1085dd850..a5e03373dfb7 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -4290,7 +4290,7 @@ def get_file_name(self, sync_cache: bool = True) -> str: if not file_name and self.state not in (self.states.NEW, self.states.QUEUED): # Queued datasets can be assigned an object store and have a filename, but they aren't guaranteed to. # Anything after queued should have a file name. - log.warning(f"Failed to determine file name for dataset {self.id}") + log.warning(f"Failed to determine file name for dataset {self.id} in state {self.state}") return file_name else: filename = self.external_filename diff --git a/lib/galaxy/model/deferred.py b/lib/galaxy/model/deferred.py index 06eb995adc93..784e4e9a8ba3 100644 --- a/lib/galaxy/model/deferred.py +++ b/lib/galaxy/model/deferred.py @@ -4,7 +4,6 @@ import shutil from typing import ( cast, - List, NamedTuple, Optional, Union, @@ -26,7 +25,6 @@ Dataset, DatasetCollection, DatasetCollectionElement, - DatasetHash, DatasetSource, DescribesHash, History, @@ -142,7 +140,7 @@ def ensure_materialized( sa_session.commit() object_store_populator.set_dataset_object_store_id(materialized_dataset) try: - path = self._stream_source(target_source, dataset_instance.datatype, materialized_dataset_hashes) + path = self._stream_source(target_source, dataset_instance.datatype, materialized_dataset) object_store.update_from_file(materialized_dataset, file_name=path) materialized_dataset.set_size() except Exception as e: @@ -152,9 +150,9 @@ def ensure_materialized( assert transient_path_mapper transient_paths = transient_path_mapper.transient_paths_for(dataset) # TODO: optimize this by streaming right to this path... - # TODO: take into acount transform and ensure we are and are not modifying the file as appropriate. + # TODO: take into account transform and ensure we are and are not modifying the file as appropriate. try: - path = self._stream_source(target_source, dataset_instance.datatype, materialized_dataset_hashes) + path = self._stream_source(target_source, dataset_instance.datatype, materialized_dataset) shutil.move(path, transient_paths.external_filename) materialized_dataset.external_filename = transient_paths.external_filename except Exception as e: @@ -178,9 +176,9 @@ def ensure_materialized( materialized_dataset_instance = cast(HistoryDatasetAssociation, dataset_instance) if exception_materializing is not None: materialized_dataset.state = Dataset.states.ERROR - materialized_dataset_instance.info = ( - f"Failed to materialize deferred dataset with exception: {exception_materializing}" - ) + error_msg = f"Failed to materialize deferred dataset with exception: {exception_materializing}" + materialized_dataset_instance.info = error_msg + log.error(error_msg) if attached: sa_session = self._sa_session if sa_session is None: @@ -206,7 +204,7 @@ def ensure_materialized( materialized_dataset_instance.metadata_deferred = False return materialized_dataset_instance - def _stream_source(self, target_source: DatasetSource, datatype, dataset_hashes: List[DatasetHash]) -> str: + def _stream_source(self, target_source: DatasetSource, datatype, dataset: Dataset) -> str: source_uri = target_source.source_uri if source_uri is None: raise Exception("Cannot stream from dataset source without specified source_uri") @@ -236,9 +234,11 @@ def _stream_source(self, target_source: DatasetSource, datatype, dataset_hashes: path = convert_result.converted_path if datatype_groom: datatype.groom_dataset_content(path) + # Grooming is not reproducible (e.g. temporary paths in BAM headers), so invalidate hashes + dataset.hashes = [] - if dataset_hashes: - for dataset_hash in dataset_hashes: + if dataset.hashes: + for dataset_hash in dataset.hashes: _validate_hash(path, dataset_hash, "dataset contents") return path diff --git a/lib/galaxy_test/api/test_datasets.py b/lib/galaxy_test/api/test_datasets.py index 87ec35daf517..b74a230b213d 100644 --- a/lib/galaxy_test/api/test_datasets.py +++ b/lib/galaxy_test/api/test_datasets.py @@ -758,23 +758,13 @@ def test_composite_datatype_download(self, history_id): def test_compute_md5_on_primary_dataset(self, history_id): hda = self.dataset_populator.new_dataset(history_id, wait=True) - hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda) - assert "hashes" in hda_details, str(hda_details.keys()) - hashes = hda_details["hashes"] - assert len(hashes) == 0 - self.dataset_populator.compute_hash(hda["id"]) hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda) self.assert_hash_value(hda_details, "940cbe15c94d7e339dc15550f6bdcf4d", "MD5") def test_compute_sha1_on_composite_dataset(self, history_id): output = self.dataset_populator.fetch_hda(history_id, COMPOSITE_DATA_FETCH_REQUEST_1, wait=True) - hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output) - assert "hashes" in hda_details, str(hda_details.keys()) - hashes = hda_details["hashes"] - assert len(hashes) == 0 - - self.dataset_populator.compute_hash(hda_details["id"], hash_function="SHA-256", extra_files_path="Roadmaps") + self.dataset_populator.compute_hash(output["id"], hash_function="SHA-256", extra_files_path="Roadmaps") hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output) self.assert_hash_value( hda_details, @@ -785,11 +775,6 @@ def test_compute_sha1_on_composite_dataset(self, history_id): def test_duplicated_hash_requests_on_primary(self, history_id): hda = self.dataset_populator.new_dataset(history_id, wait=True) - hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda) - assert "hashes" in hda_details, str(hda_details.keys()) - hashes = hda_details["hashes"] - assert len(hashes) == 0 - self.dataset_populator.compute_hash(hda["id"]) self.dataset_populator.compute_hash(hda["id"]) hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda) @@ -797,19 +782,12 @@ def test_duplicated_hash_requests_on_primary(self, history_id): def test_duplicated_hash_requests_on_extra_files(self, history_id): output = self.dataset_populator.fetch_hda(history_id, COMPOSITE_DATA_FETCH_REQUEST_1, wait=True) - hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output) - assert "hashes" in hda_details, str(hda_details.keys()) - hashes = hda_details["hashes"] - assert len(hashes) == 0 - # 4 unique requests, but make them twice... for _ in range(2): - self.dataset_populator.compute_hash(hda_details["id"], hash_function="SHA-256", extra_files_path="Roadmaps") - self.dataset_populator.compute_hash(hda_details["id"], hash_function="SHA-1", extra_files_path="Roadmaps") - self.dataset_populator.compute_hash(hda_details["id"], hash_function="MD5", extra_files_path="Roadmaps") - self.dataset_populator.compute_hash( - hda_details["id"], hash_function="SHA-256", extra_files_path="Sequences" - ) + self.dataset_populator.compute_hash(output["id"], hash_function="SHA-256", extra_files_path="Roadmaps") + self.dataset_populator.compute_hash(output["id"], hash_function="SHA-1", extra_files_path="Roadmaps") + self.dataset_populator.compute_hash(output["id"], hash_function="MD5", extra_files_path="Roadmaps") + self.dataset_populator.compute_hash(output["id"], hash_function="SHA-256", extra_files_path="Sequences") hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output) self.assert_hash_value(hda_details, "ce0c0ef1073317ff96c896c249b002dc", "MD5", extra_files_path="Roadmaps") diff --git a/lib/galaxy_test/base/populators.py b/lib/galaxy_test/base/populators.py index 684689de2f6f..3e47c70c18f8 100644 --- a/lib/galaxy_test/base/populators.py +++ b/lib/galaxy_test/base/populators.py @@ -1398,6 +1398,13 @@ def validated(): return wait_on(validated, "dataset validation") + def wait_for_dataset_hashes(self, history_id: str, dataset_id: str): + def dataset_hashes_present(): + hda = self.get_history_dataset_details(history_id=history_id, dataset_id=dataset_id) + return hda["hashes"] or None + + return wait_on(dataset_hashes_present, "dataset hash presence") + def setup_history_for_export_testing(self, history_name): using_requirement("new_history") history_id = self.new_history(name=history_name) diff --git a/test/integration/test_dataset_hashing.py b/test/integration/test_dataset_hashing.py new file mode 100644 index 000000000000..a3a123b735f1 --- /dev/null +++ b/test/integration/test_dataset_hashing.py @@ -0,0 +1,49 @@ +from typing import Optional + +from galaxy_test.base.populators import DatasetPopulator +from galaxy_test.driver import integration_util + + +class TestDatasetHashingIntegration(integration_util.IntegrationTestCase): + dataset_populator: DatasetPopulator + calculate_dataset_hash: Optional[str] = None + + def setUp(self) -> None: + super().setUp() + self.dataset_populator = DatasetPopulator(self.galaxy_interactor) + + @classmethod + def handle_galaxy_config_kwds(cls, config) -> None: + super().handle_galaxy_config_kwds(config) + if cls.calculate_dataset_hash is not None: + config["enable_celery_tasks"] = True + config["calculate_dataset_hash"] = cls.calculate_dataset_hash + + def test_hashing(self, history_id: str) -> None: + hda = self.dataset_populator.new_dataset(history_id, wait=True) + if self.calculate_dataset_hash in [None, "always", "upload"]: + hashes = self.dataset_populator.wait_for_dataset_hashes(history_id=history_id, dataset_id=hda["id"]) + assert hashes[0]["hash_value"] == "a17dcdfd36f47303a4824f1309d43ac14d7491ab3b8abb28782ac8e8d3b680ea" + else: + assert hda["hashes"] == [], hda + inputs = {"input1": {"src": "hda", "id": hda["id"]}} + run_response = self.dataset_populator.run_tool_raw("cat1", inputs=inputs, history_id=history_id) + self.dataset_populator.wait_for_tool_run(history_id=history_id, run_response=run_response) + cat_dataset = self.dataset_populator.get_history_dataset_details(history_id=history_id) + if self.calculate_dataset_hash == "always": + hashes = self.dataset_populator.wait_for_dataset_hashes(history_id=history_id, dataset_id=cat_dataset["id"]) + assert hashes[0]["hash_value"] == "a17dcdfd36f47303a4824f1309d43ac14d7491ab3b8abb28782ac8e8d3b680ea" + else: + assert cat_dataset["hashes"] == [], cat_dataset + + +class TestDatasetHashingAlwaysIntegration(TestDatasetHashingIntegration): + calculate_dataset_hash = "always" + + +class TestDatasetHashingUploadIntegration(TestDatasetHashingIntegration): + calculate_dataset_hash = "upload" + + +class TestDatasetHashingNeverIntegration(TestDatasetHashingIntegration): + calculate_dataset_hash = "never" diff --git a/test/integration/test_materialize_dataset_instance_tasks.py b/test/integration/test_materialize_dataset_instance_tasks.py index 817098994a5f..aef30d0a29c6 100644 --- a/test/integration/test_materialize_dataset_instance_tasks.py +++ b/test/integration/test_materialize_dataset_instance_tasks.py @@ -160,7 +160,7 @@ def test_upload_vs_materialize_simplest_upload(self, history_id: str): assert len(uploaded_details["sources"]) == 1 content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output, assert_ok=True) assert content == "This is a line of text." - new_history_id = self._reupload_and_then_materialize(history_id, output) + new_history_id = self._reupload_and_then_materialize(output) content = self.dataset_populator.get_history_dataset_content(new_history_id, hid=2, assert_ok=False) assert content == "This is a line of text." @@ -186,7 +186,7 @@ def test_upload_vs_materialize_to_posix_lines(self, history_id: str): assert len(transform) == 1 content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output, assert_ok=True) assert content == "This is a line of text.\n" - new_history_id = self._reupload_and_then_materialize(history_id, output) + new_history_id = self._reupload_and_then_materialize(output) content = self.dataset_populator.get_history_dataset_content(new_history_id, hid=2, assert_ok=False) assert content == "This is a line of text.\n" @@ -212,7 +212,7 @@ def test_upload_vs_materialize_space_to_tab(self, history_id: str): assert len(transform) == 1 content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output, assert_ok=True) assert content == "This\tis\ta\tline\tof\ttext." - new_history_id = self._reupload_and_then_materialize(history_id, output) + new_history_id = self._reupload_and_then_materialize(output) content = self.dataset_populator.get_history_dataset_content(new_history_id, hid=2, assert_ok=False) assert content == "This\tis\ta\tline\tof\ttext." @@ -239,7 +239,7 @@ def test_upload_vs_materialize_to_posix_and_space_to_tab(self, history_id: str): assert len(transform) == 2 content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output, assert_ok=True) assert content == "This\tis\ta\tline\tof\ttext.\n" - new_history_id = self._reupload_and_then_materialize(history_id, output) + new_history_id = self._reupload_and_then_materialize(output) content = self.dataset_populator.get_history_dataset_content(new_history_id, hid=2, assert_ok=False) assert content == "This\tis\ta\tline\tof\ttext.\n" @@ -262,20 +262,17 @@ def test_upload_vs_materialize_grooming(self, history_id: str): transform = source_0["transform"] assert isinstance(transform, list) assert len(transform) == 1 - original_details = self.dataset_populator.get_history_dataset_details( - history_id, dataset=output, assert_ok=True - ) - new_history_id = self._reupload_and_then_materialize(history_id, output) + new_history_id = self._reupload_and_then_materialize(output) new_details = self.dataset_populator.get_history_dataset_details(new_history_id, hid=2, assert_ok=False) - for key in original_details.keys(): + for key in uploaded_details.keys(): if key in ["metadata_bam_header", "metadata_bam_index"]: # differs because command-line different, index path different, and such... continue if key.startswith("metadata_"): - assert original_details[key] == new_details[key], f"Mismatched on key {key}" - assert original_details["file_ext"] == new_details["file_ext"] + assert uploaded_details[key] == new_details[key], f"Mismatched on key {key}" + assert uploaded_details["file_ext"] == new_details["file_ext"] - def _reupload_and_then_materialize(self, history_id, dataset): + def _reupload_and_then_materialize(self, dataset): new_history_id, uploaded_hdas = self.dataset_populator.reupload_contents(dataset) assert len(uploaded_hdas) == 1 deferred_hda = uploaded_hdas[0]