optimize object store cache operations #17025

Merged
merged 8 commits on Nov 20, 2023
29 changes: 21 additions & 8 deletions lib/galaxy/config/sample/object_store_conf.xml.sample
@@ -7,6 +7,19 @@
backends to the distributed and hierarchical object stores (including
distributed and hierarchical themselves).
-->
<!--
    Most of the object stores below support a <cache> option like:
    <cache path="database/object_store_cache" size="1000" cache_updated_data="True" />

    Here
    "path" - local path where cached data is stored,
    "size" - size of the cache in gigabytes,
    "cache_updated_data" - optional parameter controlling whether data written
                           back to the object store is also kept in the local
                           cache. By default updated data is copied to the cache
                           as well (cache_updated_data="True"); set it to "False"
                           to send data directly to the object store without
                           storing a cached copy.
-->


<!--
Sample Disk Object Store
@@ -135,7 +148,7 @@
<object_store type="aws_s3">
<auth access_key="...." secret_key="....." />
<bucket name="unique_bucket_name_all_lowercase" use_reduced_redundancy="False" />
<cache path="database/object_store_cache" size="1000" />
<cache path="database/object_store_cache" size="1000" cache_updated_data="True" />
<extra_dir type="job_work" path="database/job_working_directory_s3"/>
<extra_dir type="temp" path="database/tmp_s3"/>
</object_store>
@@ -150,7 +163,7 @@
<resource name="demoResc" />
<zone name="tempZone" />
<connection host="localhost" port="1247" timeout="30" refresh_time="300" connection_pool_monitor_interval="3600"/>
<cache path="database/object_store_cache_irods" size="1000" />
<cache path="database/object_store_cache_irods" size="1000" cache_updated_data="True" />
<extra_dir type="job_work" path="database/job_working_directory_irods"/>
<extra_dir type="temp" path="database/tmp_irods"/>
</object_store>
@@ -166,7 +179,7 @@
<auth access_key="...." secret_key="....." />
<bucket name="unique_bucket_name" use_reduced_redundancy="False" max_chunk_size="250"/>
<connection host="" port="" is_secure="" conn_path="" multipart="True"/>
<cache path="database/object_store_cache" size="1000" />
<cache path="database/object_store_cache" size="1000" cache_updated_data="True" />
<extra_dir type="job_work" path="database/job_working_directory_swift"/>
<extra_dir type="temp" path="database/tmp_swift"/>
</object_store>
@@ -181,7 +194,7 @@
<object_store type="azure_blob">
<auth account_name="..." account_key="...." />
<container name="unique_container_name" max_chunk_size="250"/>
<cache path="database/object_store_cache" size="100" />
<cache path="database/object_store_cache" size="100" cache_updated_data="True" />
<extra_dir type="job_work" path="database/job_working_directory_azure"/>
<extra_dir type="temp" path="database/tmp_azure"/>
</object_store>
@@ -196,7 +209,7 @@
<object_store type="cloud" provider="aws" order="0">
<auth access_key="..." secret_key="..." />
<bucket name="..." use_reduced_redundancy="False" />
<cache path="database/object_store_cache" size="100" />
<cache path="database/object_store_cache" size="100" cache_updated_data="True" />
<extra_dir type="job_work" path="database/job_working_directory_s3"/>
<extra_dir type="temp" path="database/tmp_s3"/>
</object_store>
@@ -211,7 +224,7 @@
<object_store type="cloud" provider="azure" order="0">
<auth subscription_id="..." client_id="..." secret="..." tenant="..." />
<bucket name="..." use_reduced_redundancy="False" />
<cache path="database/object_store_cache" size="100" />
<cache path="database/object_store_cache" size="100" cache_updated_data="True" />
<extra_dir type="job_work" path="database/job_working_directory_azure"/>
<extra_dir type="temp" path="database/tmp_azure"/>
</object_store>
@@ -226,7 +239,7 @@
<object_store type="cloud" provider="google" order="0">
<auth credentials_file="..." />
<bucket name="..." use_reduced_redundancy="False" />
<cache path="database/object_store_cache" size="1000" />
<cache path="database/object_store_cache" size="1000" cache_updated_data="True" />
<extra_dir type="job_work" path="database/job_working_directory_gcp"/>
<extra_dir type="temp" path="database/tmp_gcp"/>
</object_store>
@@ -319,7 +332,7 @@
<resource name="demoResc" />
<zone name="tempZone" />
<connection host="localhost" port="1247" timeout="30" refresh_time="300" connection_pool_monitor_interval="3600"/>
<cache path="database/object_store_cache_irods" size="1000" />
<cache path="database/object_store_cache_irods" size="1000" cache_updated_data="True" />
<badges>
<less_stable />
<backed_up>This data is backed up using iRODS native hierarchical storage management mechanisms. The rules describing how data is stored and backed up in iRODS can be found in our institutional [iRODS documentation](https://irods.org/uploads/2018/Saum-SURFsara-Data_Archiving_in_iRODS-slides.pdf)</backed_up>
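As a concrete illustration of the <cache> option described at the top of this sample file, an S3 backend that should write updated data straight to the bucket without keeping a local copy could be configured as below (a sketch only; the bucket name and credentials are placeholders):

<object_store type="aws_s3">
    <auth access_key="...." secret_key="....." />
    <bucket name="unique_bucket_name_all_lowercase" use_reduced_redundancy="False" />
    <cache path="database/object_store_cache" size="1000" cache_updated_data="False" />
    <extra_dir type="job_work" path="database/job_working_directory_s3"/>
    <extra_dir type="temp" path="database/tmp_s3"/>
</object_store>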
2 changes: 1 addition & 1 deletion lib/galaxy/datatypes/protocols.py
@@ -30,7 +30,7 @@ def extra_files_path(self):


class HasFileName(Protocol):
def get_file_name(self) -> str:
def get_file_name(self, sync_cache=True) -> str:
...
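A minimal sketch of a class that satisfies the widened protocol; the class name and body are hypothetical and only illustrate that implementers now need to accept the sync_cache keyword:

from galaxy.datatypes.protocols import HasFileName


class StubDataset:
    """Hypothetical implementer of HasFileName, for illustration only."""

    def __init__(self, path: str):
        self._path = path

    def get_file_name(self, sync_cache=True) -> str:
        # A real object-store-backed implementation would skip pulling the
        # file into the local cache when sync_cache is False.
        return self._path


dataset: HasFileName = StubDataset("/tmp/example.dat")
print(dataset.get_file_name(sync_cache=False))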


5 changes: 4 additions & 1 deletion lib/galaxy/job_execution/setup.py
@@ -276,7 +276,10 @@ def compute_outputs(self) -> None:
da_false_path = dataset_path_rewriter.rewrite_dataset_path(da.dataset, "output")
mutable = da.dataset.dataset.external_filename is None
dataset_path = DatasetPath(
da.dataset.dataset.id, da.dataset.get_file_name(), false_path=da_false_path, mutable=mutable
da.dataset.dataset.id,
da.dataset.get_file_name(sync_cache=False),
false_path=da_false_path,
mutable=mutable,
)
job_outputs.append(JobOutput(da.name, da.dataset, dataset_path))

2 changes: 1 addition & 1 deletion lib/galaxy/managers/datasets.py
@@ -273,7 +273,7 @@ def serialize_file_name(self, item, key, user=None, **context):
# expensive: allow config option due to cost of operation
if is_admin or self.app.config.expose_dataset_path:
if not dataset.purged:
return dataset.get_file_name()
return dataset.get_file_name(sync_cache=False)
self.skip()

def serialize_extra_files_path(self, item, key, user=None, **context):
16 changes: 8 additions & 8 deletions lib/galaxy/model/__init__.py
@@ -2516,8 +2516,8 @@ def __init__(self, dataset=None):
self.dataset = dataset
self.metadata = dict()

def get_file_name(self):
return self.dataset.get_file_name()
def get_file_name(self, sync_cache=True):
return self.dataset.get_file_name(sync_cache)

def __eq__(self, other):
return isinstance(other, FakeDatasetAssociation) and self.dataset == other.dataset
@@ -3926,14 +3926,14 @@ def ensure_shareable(self):
if not self.shareable:
raise Exception(CANNOT_SHARE_PRIVATE_DATASET_MESSAGE)

def get_file_name(self):
def get_file_name(self, sync_cache=True):
if self.purged:
log.warning(f"Attempt to get file name of purged dataset {self.id}")
return ""
if not self.external_filename:
object_store = self._assert_object_store_set()
if object_store.exists(self):
file_name = object_store.get_filename(self)
file_name = object_store.get_filename(self, sync_cache=sync_cache)
else:
file_name = ""
if not file_name and self.state not in (self.states.NEW, self.states.QUEUED):
@@ -4387,10 +4387,10 @@ def set_dataset_state(self, state):

state = property(get_dataset_state, set_dataset_state)

def get_file_name(self) -> str:
def get_file_name(self, sync_cache=True) -> str:
if self.dataset.purged:
return ""
return self.dataset.get_file_name()
return self.dataset.get_file_name(sync_cache=sync_cache)

def set_file_name(self, filename: str):
return self.dataset.set_file_name(filename)
@@ -9168,7 +9168,7 @@ def update_from_file(self, file_name):
alt_name=os.path.basename(self.get_file_name()),
)

def get_file_name(self):
def get_file_name(self, sync_cache=True):
# Ensure the directory structure and the metadata file object exist
try:
da = self.history_dataset or self.library_dataset
@@ -9183,7 +9183,7 @@ def get_file_name(self):
if not object_store.exists(self, extra_dir="_metadata_files", extra_dir_at_root=True, alt_name=alt_name):
object_store.create(self, extra_dir="_metadata_files", extra_dir_at_root=True, alt_name=alt_name)
path = object_store.get_filename(
self, extra_dir="_metadata_files", extra_dir_at_root=True, alt_name=alt_name
self, extra_dir="_metadata_files", extra_dir_at_root=True, alt_name=alt_name, sync_cache=sync_cache
)
return path
except AttributeError:
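From a caller's point of view the change looks like the sketch below (variable names hypothetical); passing sync_cache=False returns the expected cache path without forcing the object store to download the data first:

# `dataset` is assumed to be a galaxy.model.Dataset stored in a remote
# object store with a local cache.

# Default behaviour: the file is synced into the local cache if needed.
path = dataset.get_file_name()

# Cheaper: just compute the cache path, e.g. when laying out job outputs
# or serializing metadata, without touching the remote backend.
path = dataset.get_file_name(sync_cache=False)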
2 changes: 1 addition & 1 deletion lib/galaxy/model/none_like.py
@@ -38,5 +38,5 @@ def __getattr__(self, name):
def missing_meta(self):
return False

def get_file_name(self):
def get_file_name(self, sync_cache=True):
return "None"
11 changes: 8 additions & 3 deletions lib/galaxy/objectstore/azure_blob.py
@@ -109,6 +109,7 @@ def __init__(self, config, config_dict):

self.cache_size = cache_dict.get("size") or self.config.object_store_cache_size
self.staging_path = cache_dict.get("path") or self.config.object_store_cache_path
self.cache_updated_data = cache_dict.get("cache_updated_data", True)

self._initialize()

@@ -136,6 +137,7 @@ def to_dict(self):
"cache": {
"size": self.cache_size,
"path": self.staging_path,
"cache_updated_data": self.cache_updated_data,
},
}
)
@@ -501,12 +503,15 @@ def _get_filename(self, obj, **kwargs):
base_dir = kwargs.get("base_dir", None)
dir_only = kwargs.get("dir_only", False)
obj_dir = kwargs.get("obj_dir", False)
sync_cache = kwargs.get("sync_cache", True)

# for JOB_WORK directory
if base_dir and dir_only and obj_dir:
return os.path.abspath(rel_path)

cache_path = self._get_cache_path(rel_path)
if not sync_cache:
return cache_path
# S3 does not recognize directories as files so cannot check if those exist.
# So, if checking dir only, ensure given dir exists in cache and return
# the expected cache path.
@@ -515,8 +520,8 @@
# if not os.path.exists(cache_path):
# os.makedirs(cache_path)
# return cache_path
# Check if the file exists in the cache first
if self._in_cache(rel_path):
# Check if the file exists in the cache first, always pull if file size in cache is zero
if self._in_cache(rel_path) and (dir_only or os.path.getsize(self._get_cache_path(rel_path)) > 0):
return cache_path
# Check if the file exists in persistent storage and, if it does, pull it into cache
elif self._exists(obj, **kwargs):
@@ -542,7 +547,7 @@ def _update_from_file(self, obj, file_name=None, create=False, **kwargs):
# Copy into cache
cache_file = self._get_cache_path(rel_path)
try:
if source_file != cache_file:
if source_file != cache_file and self.cache_updated_data:
# FIXME? Should this be a `move`?
shutil.copy2(source_file, cache_file)
self._fix_permissions(cache_file)
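Condensed sketch of the cache-copy step that cache_updated_data now guards in _update_from_file (the helper name is hypothetical; the real method also uploads the file to the blob store and handles errors):

import shutil


def update_cache_copy(source_file: str, cache_file: str, cache_updated_data: bool) -> None:
    # Only keep a local copy of the updated data when the cache is
    # configured to mirror writes (cache_updated_data="True").
    if source_file != cache_file and cache_updated_data:
        shutil.copy2(source_file, cache_file)
    # Otherwise the data is pushed to the object store only and no local
    # copy is written.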
7 changes: 6 additions & 1 deletion lib/galaxy/objectstore/caching.py
@@ -12,7 +12,10 @@

from typing_extensions import NamedTuple

from galaxy.util import nice_size
from galaxy.util import (
nice_size,
string_as_bool,
)
from galaxy.util.sleeper import Sleeper

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -124,11 +127,13 @@ def parse_caching_config_dict_from_xml(config_xml):
cache_size = float(c_xml.get("size", -1))
staging_path = c_xml.get("path", None)
monitor = c_xml.get("monitor", "auto")
cache_updated_data = string_as_bool(c_xml.get("cache_updated_data", "True"))

cache_dict = {
"size": cache_size,
"path": staging_path,
"monitor": monitor,
"cache_updated_data": cache_updated_data,
}
else:
cache_dict = {}
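A usage sketch of the parser (assumptions, not shown in full in this diff: config_xml is the parsed object store element containing a <cache> child, and the function returns the cache_dict it builds):

from xml.etree.ElementTree import fromstring

from galaxy.objectstore.caching import parse_caching_config_dict_from_xml

config_xml = fromstring(
    '<object_store type="aws_s3">'
    '<cache path="database/object_store_cache" size="1000" cache_updated_data="False" />'
    "</object_store>"
)

cache_dict = parse_caching_config_dict_from_xml(config_xml)
# Expected shape (values per the parsing code above):
# {"size": 1000.0, "path": "database/object_store_cache",
#  "monitor": "auto", "cache_updated_data": False}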
11 changes: 8 additions & 3 deletions lib/galaxy/objectstore/cloud.py
@@ -66,6 +66,7 @@ def _config_to_dict(self):
"cache": {
"size": self.cache_size,
"path": self.staging_path,
"cache_updated_data": self.cache_updated_data,
},
}

@@ -103,6 +104,7 @@ def __init__(self, config, config_dict):

self.cache_size = cache_dict.get("size") or self.config.object_store_cache_size
self.staging_path = cache_dict.get("path") or self.config.object_store_cache_path
self.cache_updated_data = cache_dict.get("cache_updated_data", True)

self._initialize()

@@ -621,12 +623,15 @@ def _get_filename(self, obj, **kwargs):
dir_only = kwargs.get("dir_only", False)
obj_dir = kwargs.get("obj_dir", False)
rel_path = self._construct_path(obj, **kwargs)
sync_cache = kwargs.get("sync_cache", True)

# for JOB_WORK directory
if base_dir and dir_only and obj_dir:
return os.path.abspath(rel_path)

cache_path = self._get_cache_path(rel_path)
if not sync_cache:
return cache_path
# S3 does not recognize directories as files so cannot check if those exist.
# So, if checking dir only, ensure given dir exists in cache and return
# the expected cache path.
@@ -635,8 +640,8 @@
# if not os.path.exists(cache_path):
# os.makedirs(cache_path)
# return cache_path
# Check if the file exists in the cache first
if self._in_cache(rel_path):
# Check if the file exists in the cache first, always pull if file size in cache is zero
if self._in_cache(rel_path) and (dir_only or os.path.getsize(self._get_cache_path(rel_path)) > 0):
return cache_path
# Check if the file exists in persistent storage and, if it does, pull it into cache
elif self._exists(obj, **kwargs):
@@ -663,7 +668,7 @@ def _update_from_file(self, obj, file_name=None, create=False, **kwargs):
# Copy into cache
cache_file = self._get_cache_path(rel_path)
try:
if source_file != cache_file:
if source_file != cache_file and self.cache_updated_data:
# FIXME? Should this be a `move`?
shutil.copy2(source_file, cache_file)
self._fix_permissions(cache_file)
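The resolution order that _get_filename now follows, condensed into a standalone sketch (the function name is hypothetical; the real method also handles dir_only, job-work paths, and the object store's own not-found handling):

import os


def resolve_cached_filename(cache_path: str, in_cache: bool,
                            exists_remotely: bool, sync_cache: bool = True) -> str:
    if not sync_cache:
        # Caller only needs the path; do not touch the remote backend.
        return cache_path
    # Zero-byte cache entries are treated as stale and re-pulled.
    if in_cache and os.path.getsize(cache_path) > 0:
        return cache_path
    if exists_remotely:
        # The real implementation downloads the object into cache_path here.
        return cache_path
    raise FileNotFoundError(cache_path)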