Skip to content

Commit

Permalink
Merge pull request #19181 from nsoranzo/dataset_hashes_at_job_finish
Browse files Browse the repository at this point in the history
Calculate hash for new non-deferred datasets when finishing a job
  • Loading branch information
jmchilton authored Nov 21, 2024
2 parents 701d0ff + 1b54c0b commit 4c46062
Show file tree
Hide file tree
Showing 20 changed files with 207 additions and 94 deletions.
10 changes: 2 additions & 8 deletions client/src/api/schema/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8002,7 +8002,7 @@ export interface components {
* Hash Function
* @description The hash function used to generate the hash.
*/
hash_function: components["schemas"]["HashFunctionNames"];
hash_function: components["schemas"]["HashFunctionNameEnum"];
/**
* Hash Value
* @description The hash value.
Expand Down Expand Up @@ -10868,16 +10868,10 @@ export interface components {
};
/**
* HashFunctionNameEnum
* @description Particular pieces of information that can be requested for a dataset.
* @description Hash function names that can be used to generate checksums for files.
* @enum {string}
*/
HashFunctionNameEnum: "MD5" | "SHA-1" | "SHA-256" | "SHA-512";
/**
* HashFunctionNames
* @description Hash function names that can be used to generate checksums for datasets.
* @enum {string}
*/
HashFunctionNames: "MD5" | "SHA-1" | "SHA-256" | "SHA-512";
/** HdaDestination */
HdaDestination: {
/**
Expand Down
27 changes: 27 additions & 0 deletions doc/source/admin/galaxy_options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4719,6 +4719,33 @@
:Type: float


~~~~~~~~~~~~~~~~~~~~~~~~~~
``calculate_dataset_hash``
~~~~~~~~~~~~~~~~~~~~~~~~~~

:Description:
In which cases Galaxy should calculate a hash for a new dataset.
Dataset hashes can be used by the Galaxy job cache/search to check
if job inputs match. Setting the 'enable_celery_tasks' option to
true is also required for dataset hash calculation. Possible
values are: 'always', 'upload' (the default), 'never'. If set to
'upload', the hash is calculated only for the outputs of upload
jobs.
:Default: ``upload``
:Type: str


~~~~~~~~~~~~~~~~~
``hash_function``
~~~~~~~~~~~~~~~~~

:Description:
Hash function to use if 'calculate_dataset_hash' is enabled.
Possible values are: 'md5', 'sha1', 'sha256', 'sha512'
:Default: ``sha256``
:Type: str


~~~~~~~~~~~~~~~~~~~~~
``metadata_strategy``
~~~~~~~~~~~~~~~~~~~~~
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def set_metadata(
try:
if overwrite:
hda_manager.overwrite_metadata(dataset_instance)
dataset_instance.datatype.set_meta(dataset_instance) # type:ignore [arg-type]
dataset_instance.datatype.set_meta(dataset_instance)
dataset_instance.set_peek()
# Reset SETTING_METADATA state so the dataset instance getter picks the dataset state
dataset_instance.set_metadata_success_state()
Expand Down
9 changes: 9 additions & 0 deletions lib/galaxy/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from galaxy.util.custom_logging import LOGLV_TRACE
from galaxy.util.dynamic import HasDynamicProperties
from galaxy.util.facts import get_facts
from galaxy.util.hash_util import HashFunctionNameEnum
from galaxy.util.properties import (
read_properties_from_file,
running_from_source,
Expand Down Expand Up @@ -716,6 +717,7 @@ class GalaxyAppConfiguration(BaseAppConfiguration, CommonConfigurationMixin):
galaxy_data_manager_data_path: str
galaxy_infrastructure_url: str
hours_between_check: int
hash_function: HashFunctionNameEnum
integrated_tool_panel_config: str
involucro_path: str
len_file_path: str
Expand Down Expand Up @@ -897,6 +899,13 @@ def _process_config(self, kwargs: Dict[str, Any]) -> None:
self.update_integrated_tool_panel = kwargs.get("update_integrated_tool_panel", True)
self.galaxy_data_manager_data_path = self.galaxy_data_manager_data_path or self.tool_data_path
self.tool_secret = kwargs.get("tool_secret", "")
if self.calculate_dataset_hash not in ("always", "upload", "never"):
raise ConfigurationError(
f"Unrecognized value for calculate_dataset_hash option: {self.calculate_dataset_hash}"
)
if self.hash_function not in HashFunctionNameEnum.__members__:
raise ConfigurationError(f"Unrecognized value for hash_function option: {self.hash_function}")
self.hash_function = HashFunctionNameEnum[self.hash_function]
self.metadata_strategy = kwargs.get("metadata_strategy", "directory")
self.use_remote_user = self.use_remote_user or self.single_user
self.fetch_url_allowlist_ips = parse_allowlist_ips(listify(kwargs.get("fetch_url_allowlist")))
Expand Down
12 changes: 12 additions & 0 deletions lib/galaxy/config/sample/galaxy.yml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -2545,6 +2545,18 @@ galaxy:
# handler processes. Float values are allowed.
#workflow_monitor_sleep: 1.0

# In which cases Galaxy should calculate a hash for a new dataset.
# Dataset hashes can be used by the Galaxy job cache/search to check
# if job inputs match. Setting the 'enable_celery_tasks' option to
# true is also required for dataset hash calculation. Possible values
# are: 'always', 'upload' (the default), 'never'. If set to 'upload',
# the hash is calculated only for the outputs of upload jobs.
#calculate_dataset_hash: upload

# Hash function to use if 'calculate_dataset_hash' is enabled.
# Possible values are: 'md5', 'sha1', 'sha256', 'sha512'
#hash_function: sha256

# Determines how metadata will be set. Valid values are `directory`,
# `extended`, `directory_celery` and `extended_celery`. In extended
# mode jobs will decide if a tool run failed, the object stores
Expand Down
22 changes: 22 additions & 0 deletions lib/galaxy/config/schemas/config_schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2788,6 +2788,7 @@ mapping:
Avoiding making this a boolean because we may add options such as 'in-single-form-view'
or 'in-simplified-workflow-views'. https://github.com/galaxyproject/galaxy/pull/9809/files#r461889109
allow_user_dataset_purge:
type: bool
default: true
Expand Down Expand Up @@ -3454,6 +3455,26 @@ mapping:
decreased if extremely high job throughput is necessary, but doing so can increase CPU
usage of handler processes. Float values are allowed.
calculate_dataset_hash:
type: str
default: upload
required: false
enum: ['always', 'upload', 'never']
desc: |
In which cases Galaxy should calculate a hash for a new dataset.
Dataset hashes can be used by the Galaxy job cache/search to check if job inputs match.
Setting the 'enable_celery_tasks' option to true is also required for dataset hash calculation.
Possible values are: 'always', 'upload' (the default), 'never'. If set to 'upload', the
hash is calculated only for the outputs of upload jobs.
hash_function:
type: str
default: sha256
required: false
desc: |
Hash function to use if 'calculate_dataset_hash' is enabled. Possible values
are: 'md5', 'sha1', 'sha256', 'sha512'
metadata_strategy:
type: str
required: false
Expand Down Expand Up @@ -3547,6 +3568,7 @@ mapping:
default: always
required: false
reloadable: true
enum: ['always', 'onsuccess', 'never']
desc: |
Clean up various bits of jobs left on the filesystem after completion. These
bits include the job working directory, external metadata temporary files,
Expand Down
10 changes: 8 additions & 2 deletions lib/galaxy/datatypes/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@
Location of protocols used in datatypes
"""

from typing import Any
from typing import (
Any,
TYPE_CHECKING,
)

from typing_extensions import Protocol

if TYPE_CHECKING:
from sqlalchemy.orm import Mapped


class HasClearAssociatedFiles(Protocol):
def clear_associated_files(self, metadata_safe: bool = False, purge: bool = False) -> None: ...
Expand Down Expand Up @@ -39,7 +45,7 @@ class HasHid(Protocol):


class HasId(Protocol):
id: int
id: "Mapped[int]"


class HasInfo(Protocol):
Expand Down
43 changes: 34 additions & 9 deletions lib/galaxy/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
)
from galaxy.metadata import get_metadata_compute_strategy
from galaxy.model import (
Dataset,
Job,
store,
Task,
Expand All @@ -74,6 +75,7 @@
ObjectStorePopulator,
serialize_static_object_store_config,
)
from galaxy.schema.tasks import ComputeDatasetHashTaskRequest
from galaxy.structured_app import MinimalManagerApp
from galaxy.tool_util.deps import requirements
from galaxy.tool_util.output_checker import (
Expand All @@ -100,6 +102,7 @@

if TYPE_CHECKING:
from galaxy.jobs.handler import JobHandlerQueue
from galaxy.model import DatasetInstance
from galaxy.tools import Tool

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -986,7 +989,7 @@ class MinimalJobWrapper(HasResourceParameters):

def __init__(
self,
job: model.Job,
job: Job,
app: MinimalManagerApp,
use_persisted_destination: bool = False,
tool: Optional["Tool"] = None,
Expand Down Expand Up @@ -1190,7 +1193,7 @@ def job_destination(self) -> JobDestination:
def galaxy_url(self):
return self.get_destination_configuration("galaxy_infrastructure_url")

def get_job(self) -> model.Job:
def get_job(self) -> Job:
job = self.sa_session.get(Job, self.job_id)
assert job
return job
Expand Down Expand Up @@ -1540,7 +1543,7 @@ def mark_as_resubmitted(self, info=None):
self.sa_session.refresh(job)
if info is not None:
job.info = info
job.set_state(model.Job.states.RESUBMITTED)
job.set_state(Job.states.RESUBMITTED)
self.sa_session.add(job)
with transaction(self.sa_session):
self.sa_session.commit()
Expand All @@ -1555,7 +1558,7 @@ def change_state(self, state, info=False, flush=True, job=None):
# on the current job state value to minimize race conditions.
self.sa_session.expire(job, ["state"])

if job.state in model.Job.terminal_states:
if job.state in Job.terminal_states:
log.warning(
"(%s) Ignoring state change from '%s' to '%s' for job that is already terminal",
job.id,
Expand Down Expand Up @@ -1610,7 +1613,7 @@ def get_destination_configuration(self, key, default=None):
def enqueue(self):
job = self.get_job()
# Change to queued state before handing to worker thread so the runner won't pick it up again
self.change_state(model.Job.states.QUEUED, flush=False, job=job)
self.change_state(Job.states.QUEUED, flush=False, job=job)
# Persist the destination so that the job will be included in counts if using concurrency limits
self.set_job_destination(self.job_destination, None, flush=False, job=job)
# Set object store after job destination so can leverage parameters...
Expand Down Expand Up @@ -1732,7 +1735,9 @@ def split_object_stores(output_name): # noqa: F811 https://github.com/PyCQA/pyf
job.object_store_id_overrides = object_store_id_overrides
self._setup_working_directory(job=job)

def _finish_dataset(self, output_name, dataset, job, context, final_job_state, remote_metadata_directory):
def _finish_dataset(
self, output_name, dataset: "DatasetInstance", job: Job, context, final_job_state, remote_metadata_directory
):
implicit_collection_jobs = job.implicit_collection_jobs_association
purged = dataset.dataset.purged
if not purged and dataset.dataset.external_filename is None:
Expand Down Expand Up @@ -1779,6 +1784,7 @@ def _finish_dataset(self, output_name, dataset, job, context, final_job_state, r
# it would be quicker to just copy the metadata from the originating output dataset,
# but somewhat trickier (need to recurse up the copied_from tree), for now we'll call set_meta()
retry_internally = util.asbool(self.get_destination_configuration("retry_metadata_internally", True))
assert self.tool
if not retry_internally and self.tool.tool_type == "interactive":
retry_internally = util.asbool(
self.get_destination_configuration("retry_interactivetool_metadata_internally", retry_internally)
Expand Down Expand Up @@ -1992,16 +1998,16 @@ def fail(message=job.info, exception=None):
if (
not final_job_state == job.states.ERROR
and not dataset_assoc.dataset.dataset.state == job.states.ERROR
and not dataset_assoc.dataset.dataset.state == model.Dataset.states.DEFERRED
and not dataset_assoc.dataset.dataset.state == Dataset.states.DEFERRED
):
# We don't set datsets in error state to OK because discover_outputs may have already set the state to error
dataset_assoc.dataset.dataset.state = model.Dataset.states.OK
dataset_assoc.dataset.dataset.state = Dataset.states.OK

if job.states.ERROR == final_job_state:
for dataset_assoc in output_dataset_associations:
log.debug("(%s) setting dataset %s state to ERROR", job.id, dataset_assoc.dataset.dataset.id)
# TODO: This is where the state is being set to error. Change it!
dataset_assoc.dataset.dataset.state = model.Dataset.states.ERROR
dataset_assoc.dataset.dataset.state = Dataset.states.ERROR
# Pause any dependent jobs (and those jobs' outputs)
for dep_job_assoc in dataset_assoc.dataset.dependent_jobs:
self.pause(
Expand Down Expand Up @@ -2033,6 +2039,25 @@ def fail(message=job.info, exception=None):
dataset.full_delete()
collected_bytes = 0

# Calculate dataset hash
for dataset_assoc in output_dataset_associations:
dataset = dataset_assoc.dataset.dataset
if not dataset.purged and dataset.state != Dataset.states.DEFERRED and not dataset.hashes:
if self.app.config.calculate_dataset_hash == "always" or (
self.app.config.calculate_dataset_hash == "upload" and job.tool_id in ("upload1", "__DATA_FETCH__")
):
# Calculate dataset hash via a celery task
if self.app.config.enable_celery_tasks:
from galaxy.celery.tasks import compute_dataset_hash

extra_files_path = dataset.extra_files_path if dataset.extra_files_path_exists() else None
request = ComputeDatasetHashTaskRequest(
dataset_id=dataset.id,
extra_files_path=extra_files_path,
hash_function=self.app.config.hash_function,
)
compute_dataset_hash.delay(request=request)

user = job.user
if user and collected_bytes > 0 and quota_source_info is not None and quota_source_info.use:
user.adjust_total_disk_usage(collected_bytes, quota_source_info.label)
Expand Down
3 changes: 1 addition & 2 deletions lib/galaxy/managers/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ def compute_hash(self, request: ComputeDatasetHashTaskRequest):
file_path = dataset.get_file_name()
hash_function = request.hash_function
calculated_hash_value = memory_bound_hexdigest(hash_func_name=hash_function, path=file_path)
extra_files_path = request.extra_files_path
dataset_hash = model.DatasetHash(
hash_function=hash_function,
hash_value=calculated_hash_value,
Expand Down Expand Up @@ -433,7 +432,7 @@ def is_composite(self, dataset_assoc: U):
"""
Return True if this hda/ldda is a composite type dataset.
.. note:: see also (whereever we keep information on composite datatypes?)
.. note:: see also (wherever we keep information on composite datatypes?)
"""
return dataset_assoc.extension in self.app.datatypes_registry.get_composite_extensions()

Expand Down
8 changes: 2 additions & 6 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4290,7 +4290,7 @@ def get_file_name(self, sync_cache: bool = True) -> str:
if not file_name and self.state not in (self.states.NEW, self.states.QUEUED):
# Queued datasets can be assigned an object store and have a filename, but they aren't guaranteed to.
# Anything after queued should have a file name.
log.warning(f"Failed to determine file name for dataset {self.id}")
log.warning(f"Failed to determine file name for dataset {self.id} in state {self.state}")
return file_name
else:
filename = self.external_filename
Expand Down Expand Up @@ -4801,8 +4801,6 @@ def set_skipped(self, object_store_populator: "ObjectStorePopulator") -> None:
self.set_total_size()

def get_file_name(self, sync_cache: bool = True) -> str:
if self.dataset.purged:
return ""
return self.dataset.get_file_name(sync_cache=sync_cache)

def set_file_name(self, filename: str):
Expand Down Expand Up @@ -5108,9 +5106,7 @@ def find_conversion_destination(
self, accepted_formats: List[str], **kwd
) -> Tuple[bool, Optional[str], Optional["DatasetInstance"]]:
"""Returns ( target_ext, existing converted dataset )"""
return self.datatype.find_conversion_destination(
self, accepted_formats, _get_datatypes_registry(), **kwd # type:ignore[arg-type]
)
return self.datatype.find_conversion_destination(self, accepted_formats, _get_datatypes_registry(), **kwd)

def add_validation_error(self, validation_error):
self.validation_errors.append(validation_error)
Expand Down
Loading

0 comments on commit 4c46062

Please sign in to comment.