diff --git a/docs/daos_tf_docs.md b/docs/daos_tf_docs.md index c57632146..0fe232a4e 100644 --- a/docs/daos_tf_docs.md +++ b/docs/daos_tf_docs.md @@ -1,6 +1,6 @@ # DAOS-TensorFlow IO GUIDE -## Table Of Content +## Table Of Contents - [Features](#features) - [Prerequisites](#prerequisites) @@ -15,7 +15,7 @@ ## Prerequisites -* A valid DAOS installation, currently based on [version v1.3.106](https://github.com/daos-stack/daos/releases/tag/v1.3.106-tb) +* A valid DAOS installation, currently based on [version v2.0.2](https://github.com/daos-stack/daos/releases/tag/v2.0.2) * An installation guide and steps can be accessed from [here](https://docs.daos.io/admin/installation/) ## Environment Setup diff --git a/docs/tutorials/daos.ipynb b/docs/tutorials/daos.ipynb index e7e4e4bba..44854c243 100644 --- a/docs/tutorials/daos.ipynb +++ b/docs/tutorials/daos.ipynb @@ -71,8 +71,8 @@ "\n", "The pool and container id or label are part of the filename uri:\n", "```\n", - "dfs:////\n", - "dfs:///cont-label/\n", + "daos:////\n", + "daos:///cont-label/\n", "```" ] }, @@ -230,7 +230,7 @@ }, "outputs": [], "source": [ - "dfs_url = \"dfs://TEST_POOL/TEST_CONT/\" # This the path you'll be using to load and access the dataset\n", + "dfs_url = \"daos://TEST_POOL/TEST_CONT/\" # This the path you'll be using to load and access the dataset\n", "pwd = !pwd\n", "posix_url = pwd[0] + \"/tests/test_dfs/\"" ] diff --git a/tensorflow_io/core/filesystems/dfs/dfs_filesystem.cc b/tensorflow_io/core/filesystems/dfs/dfs_filesystem.cc index 1760f941f..d3bbb1de5 100644 --- a/tensorflow_io/core/filesystems/dfs/dfs_filesystem.cc +++ b/tensorflow_io/core/filesystems/dfs/dfs_filesystem.cc @@ -12,7 +12,7 @@ namespace dfs { // ---------------------------------------------------------------------------- namespace tf_random_access_file { typedef struct DFSRandomAccessFile { - std::string dfs_path; + dfs_path_t dpath; DFS* daos; dfs_t* daos_fs; dfs_obj_t* daos_file; @@ -21,16 +21,15 @@ typedef struct DFSRandomAccessFile { bool caching; size_t buff_size; size_t num_of_buffers; - DFSRandomAccessFile(std::string aDfs_path, DFS* daos, dfs_t* file_system, - dfs_obj_t* obj, daos_handle_t eq_handle) - : dfs_path(std::move(aDfs_path)), daos(daos) { - daos_fs = file_system; + DFSRandomAccessFile(dfs_path_t* path, dfs_obj_t* obj, daos_handle_t eq_handle) + : dpath(*path) { + daos = dpath.getDAOS(); + daos_fs = dpath.getFsys(); daos_file = obj; - if (DFS::size_map.count(aDfs_path) == 0) { + + if (dpath.getCachedSize(file_size) != 0) { daos->libdfs->dfs_get_size(daos_fs, obj, &file_size); - DFS::size_map[aDfs_path] = file_size; - } else { - file_size = DFS::size_map[aDfs_path]; + dpath.setCachedSize(file_size); } if (char* env_caching = std::getenv("TF_IO_DAOS_CACHING")) { caching = atoi(env_caching) > 0; @@ -151,19 +150,20 @@ int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n, // ---------------------------------------------------------------------------- namespace tf_writable_file { typedef struct DFSWritableFile { - std::string dfs_path; + dfs_path_t dpath; DFS* daos; dfs_t* daos_fs; dfs_obj_t* daos_file; daos_size_t file_size; bool size_known; - DFSWritableFile(std::string dfs_path, DFS* daos, dfs_t* file_system, - dfs_obj_t* obj) - : dfs_path(std::move(dfs_path)), daos(daos) { - daos_fs = file_system; + DFSWritableFile(dfs_path_t* path, dfs_obj_t* obj) : dpath(*path) { + daos = dpath.getDAOS(); + daos_fs = dpath.getFsys(); daos_file = obj; size_known = false; + daos_size_t dummy; // initialize file_size + get_file_size(dummy); } int get_file_size(daos_size_t& size) { @@ -172,6 +172,7 @@ typedef struct DFSWritableFile { if (rc != 0) { return rc; } + dpath.setCachedSize(file_size); size_known = true; } size = file_size; @@ -179,11 +180,15 @@ typedef struct DFSWritableFile { } void set_file_size(daos_size_t size) { + dpath.setCachedSize(size); file_size = size; size_known = true; } - void unset_file_size(void) { size_known = false; } + void unset_file_size(void) { + dpath.clearCachedSize(); + size_known = false; + } } DFSWritableFile; void Cleanup(TF_WritableFile* file) { @@ -262,8 +267,21 @@ uint64_t Length(const TF_ReadOnlyMemoryRegion* region) { return 0; } // ---------------------------------------------------------------------------- namespace tf_dfs_filesystem { +void atexit_handler(void); // forward declaration + +static TF_Filesystem* dfs_filesystem; + void Init(TF_Filesystem* filesystem, TF_Status* status) { filesystem->plugin_filesystem = new (std::nothrow) DFS(status); + + // tensorflow never calls Cleanup(), see + // https://github.com/tensorflow/tensorflow/issues/27535 + // The workaround is to implement its code via atexit() which in turn + // requires that a static pointer to the plugin be kept for use at exit time. + if (TF_GetCode(status) == TF_OK) { + dfs_filesystem = filesystem; + std::atexit(atexit_handler); + } } void Cleanup(TF_Filesystem* filesystem) { @@ -271,146 +289,165 @@ void Cleanup(TF_Filesystem* filesystem) { delete daos; } +void atexit_handler(void) { + // delete dfs_filesystem; + Cleanup(dfs_filesystem); +} + void NewFile(const TF_Filesystem* filesystem, const char* path, File_Mode mode, - int flags, dfs_obj_t** obj, TF_Status* status) { + int flags, dfs_path_t& dpath, dfs_obj_t** obj, TF_Status* status) { int rc; auto daos = static_cast(filesystem->plugin_filesystem); - std::string pool, cont, file_path; - rc = daos->Setup(path, pool, cont, file_path, status); - + rc = daos->Setup(daos, path, dpath, status); if (rc) return; - daos->dfsNewFile(file_path, mode, flags, obj, status); + + daos->dfsNewFile(&dpath, mode, flags, obj, status); } void NewWritableFile(const TF_Filesystem* filesystem, const char* path, TF_WritableFile* file, TF_Status* status) { + TF_SetStatus(status, TF_OK, ""); + + dfs_path_t dpath; dfs_obj_t* obj = NULL; - NewFile(filesystem, path, WRITE, S_IWUSR | S_IFREG, &obj, status); + NewFile(filesystem, path, WRITE, S_IRUSR | S_IWUSR | S_IFREG, dpath, &obj, + status); if (TF_GetCode(status) != TF_OK) return; - auto daos = static_cast(filesystem->plugin_filesystem); - file->plugin_file = - new tf_writable_file::DFSWritableFile(path, daos, daos->daos_fs, obj); - TF_SetStatus(status, TF_OK, ""); + file->plugin_file = new tf_writable_file::DFSWritableFile(&dpath, obj); } void NewRandomAccessFile(const TF_Filesystem* filesystem, const char* path, TF_RandomAccessFile* file, TF_Status* status) { + TF_SetStatus(status, TF_OK, ""); + auto daos = static_cast(filesystem->plugin_filesystem); + + dfs_path_t dpath; dfs_obj_t* obj = NULL; - NewFile(filesystem, path, READ, S_IRUSR | S_IFREG, &obj, status); + NewFile(filesystem, path, READ, S_IRUSR | S_IFREG, dpath, &obj, status); if (TF_GetCode(status) != TF_OK) return; - auto daos = static_cast(filesystem->plugin_filesystem); auto random_access_file = new tf_random_access_file::DFSRandomAccessFile( - path, daos, daos->daos_fs, obj, daos->mEventQueueHandle); + &dpath, obj, daos->mEventQueueHandle); if (random_access_file->caching) { size_t async_offset = 0; for (size_t i = 0; i < random_access_file->num_of_buffers; i++) { if (async_offset > random_access_file->file_size) break; random_access_file->buffers[i].ReadAsync( - daos->daos_fs, random_access_file->daos_file, async_offset, - random_access_file->file_size); + random_access_file->daos_fs, random_access_file->daos_file, + async_offset, random_access_file->file_size); async_offset += random_access_file->buff_size; } } file->plugin_file = random_access_file; - TF_SetStatus(status, TF_OK, ""); } void NewAppendableFile(const TF_Filesystem* filesystem, const char* path, TF_WritableFile* file, TF_Status* status) { + TF_SetStatus(status, TF_OK, ""); + + dfs_path_t dpath; dfs_obj_t* obj = NULL; - NewFile(filesystem, path, APPEND, S_IWUSR | S_IFREG, &obj, status); + NewFile(filesystem, path, APPEND, S_IRUSR | S_IWUSR | S_IFREG, dpath, &obj, + status); if (TF_GetCode(status) != TF_OK) return; - auto daos = static_cast(filesystem->plugin_filesystem); - file->plugin_file = - new tf_writable_file::DFSWritableFile(path, daos, daos->daos_fs, obj); - TF_SetStatus(status, TF_OK, ""); + file->plugin_file = new tf_writable_file::DFSWritableFile(&dpath, obj); } -void PathExists(const TF_Filesystem* filesystem, const char* path, - TF_Status* status) { +static void PathExists(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + TF_SetStatus(status, TF_OK, ""); int rc; auto daos = static_cast(filesystem->plugin_filesystem); - std::string pool, cont, file; - rc = daos->Setup(path, pool, cont, file, status); + dfs_path_t dpath; + rc = daos->Setup(daos, path, dpath, status); if (rc) return; + dfs_obj_t* obj; + rc = daos->dfsLookUp(&dpath, &obj, status); + if (rc) return; - rc = daos->dfsPathExists(file, &obj); + rc = daos->libdfs->dfs_release(obj); if (rc) { - TF_SetStatus(status, TF_NOT_FOUND, ""); - } else { - TF_SetStatus(status, TF_OK, ""); + TF_SetStatus(status, TF_INTERNAL, ""); } - - daos->libdfs->dfs_release(obj); } -void CreateDir(const TF_Filesystem* filesystem, const char* path, - TF_Status* status) { +static void CreateDir(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + TF_SetStatus(status, TF_OK, ""); int rc; auto daos = static_cast(filesystem->plugin_filesystem); - std::string pool, cont, dir_path; - rc = daos->Setup(path, pool, cont, dir_path, status); + dfs_path_t dpath; + rc = daos->Setup(daos, path, dpath, status); if (rc) return; - daos->dfsCreateDir(dir_path, status); + daos->dfsCreateDir(&dpath, status); } static void RecursivelyCreateDir(const TF_Filesystem* filesystem, const char* path, TF_Status* status) { + TF_SetStatus(status, TF_OK, ""); int rc; - std::string pool, cont, dir_path; auto daos = static_cast(filesystem->plugin_filesystem); - rc = daos->Setup(path, pool, cont, dir_path, status); + dfs_path_t dpath; + rc = daos->Setup(daos, path, dpath, status); if (rc) return; size_t next_dir = 0; std::string dir_string; - std::string path_string(dir_path); + std::string path_string = dpath.getRelPath(); do { next_dir = path_string.find("/", next_dir); - dir_string = path_string.substr(0, next_dir); + if (next_dir == 0) { + dpath.setRelPath("/"); + } else { + dpath.setRelPath(path_string.substr(0, next_dir)); + } if (next_dir != std::string::npos) next_dir++; - daos->dfsCreateDir(dir_string, status); + TF_SetStatus(status, TF_OK, ""); + daos->dfsCreateDir(&dpath, status); if ((TF_GetCode(status) != TF_OK) && - (TF_GetCode(status) != TF_ALREADY_EXISTS)) + (TF_GetCode(status) != TF_ALREADY_EXISTS)) { return; - TF_SetStatus(status, TF_OK, ""); - + } } while (next_dir != std::string::npos); + + if (TF_GetCode(status) == TF_ALREADY_EXISTS) { + TF_SetStatus(status, TF_OK, ""); // per modular_filesystem_test suite + } } void DeleteFileSystemEntry(const TF_Filesystem* filesystem, const char* path, bool recursive, bool is_dir, TF_Status* status) { int rc; - std::string pool, cont, dir_path; auto daos = static_cast(filesystem->plugin_filesystem); - rc = daos->Setup(path, pool, cont, dir_path, status); + dfs_path_t dpath; + rc = daos->Setup(daos, path, dpath, status); if (rc) { - TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API"); return; } - daos->dfsDeleteObject(dir_path, is_dir, recursive, status); + daos->dfsDeleteObject(&dpath, is_dir, recursive, status); } -void DeleteSingleDir(const TF_Filesystem* filesystem, const char* path, - TF_Status* status) { +static void DeleteSingleDir(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + TF_SetStatus(status, TF_OK, ""); bool recursive = false; bool is_dir = true; DeleteFileSystemEntry(filesystem, path, recursive, is_dir, status); } -void RecursivelyDeleteDir(const TF_Filesystem* filesystem, const char* path, - uint64_t* undeleted_files, uint64_t* undeleted_dirs, - TF_Status* status) { +static void RecursivelyDeleteDir(const TF_Filesystem* filesystem, + const char* path, uint64_t* undeleted_files, + uint64_t* undeleted_dirs, TF_Status* status) { + TF_SetStatus(status, TF_OK, ""); bool recursive = true; bool is_dir = true; DeleteFileSystemEntry(filesystem, path, recursive, is_dir, status); @@ -424,243 +461,249 @@ void RecursivelyDeleteDir(const TF_Filesystem* filesystem, const char* path, } } -bool IsDir(const TF_Filesystem* filesystem, const char* path, - TF_Status* status) { +// Note: the signature for is_directory() has a bool for the return value, but +// tensorflow does not use this, instead it interprets the status field to get +// the result. A value of TF_OK indicates that the object is a directory, and +// a value of TF_FAILED_PRECONDITION indicates that the object is a file. All +// other status values throw an exception. + +static bool IsDir(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + TF_SetStatus(status, TF_OK, ""); int rc; - bool is_dir = false; - std::string pool, cont, file; auto daos = static_cast(filesystem->plugin_filesystem); - rc = daos->Setup(path, pool, cont, file, status); - if (rc) return is_dir; - - if (daos->isRoot(file)) { - is_dir = true; - return is_dir; - } + dfs_path_t dpath; + rc = daos->Setup(daos, path, dpath, status); + if (rc) return false; dfs_obj_t* obj; - rc = daos->dfsPathExists(file, &obj, true); + rc = daos->dfsLookUp(&dpath, &obj, status); + if (rc) return false; + + bool is_dir = daos->dfsIsDirectory(obj); + + rc = daos->libdfs->dfs_release(obj); if (rc) { - TF_SetStatus(status, TF_NOT_FOUND, ""); - } else { - is_dir = S_ISDIR(obj->mode); + TF_SetStatus(status, TF_INTERNAL, ""); + return false; } - if (is_dir) { - TF_SetStatus(status, TF_OK, ""); - } else { + if (!is_dir) { TF_SetStatus(status, TF_FAILED_PRECONDITION, ""); + return false; } - - return is_dir; + return true; } -int64_t GetFileSize(const TF_Filesystem* filesystem, const char* path, - TF_Status* status) { +static int64_t GetFileSize(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + TF_SetStatus(status, TF_OK, ""); int rc; auto daos = static_cast(filesystem->plugin_filesystem); - std::string pool, cont, file; - rc = daos->Setup(path, pool, cont, file, status); + dfs_path_t dpath; + rc = daos->Setup(daos, path, dpath, status); if (rc) return -1; - if (DFS::size_map.count(path) != 0) { - return DFS::size_map[path]; + + daos_size_t size; + if (dpath.getCachedSize(size) == 0) { + return size; } dfs_obj_t* obj; - rc = daos->dfsPathExists(file, &obj, false); + rc = daos->dfsLookUp(&dpath, &obj, status); if (rc) { - TF_SetStatus(status, TF_NOT_FOUND, ""); return -1; - } else { - if (S_ISDIR(obj->mode)) { - TF_SetStatus(status, TF_FAILED_PRECONDITION, ""); - return -1; - } - TF_SetStatus(status, TF_OK, ""); - daos_size_t size; - daos->libdfs->dfs_get_size(daos->daos_fs, obj, &size); - DFS::size_map[path] = size; + } + if (daos->dfsIsDirectory(obj)) { daos->libdfs->dfs_release(obj); - return size; + TF_SetStatus(status, TF_FAILED_PRECONDITION, ""); + return -1; } + + daos->libdfs->dfs_get_size(dpath.getFsys(), obj, &size); + dpath.setCachedSize(size); + + rc = daos->libdfs->dfs_release(obj); + if (rc) { + TF_SetStatus(status, TF_INTERNAL, ""); + return -1; + } + return size; } -void DeleteFile(const TF_Filesystem* filesystem, const char* path, - TF_Status* status) { +static void DeleteFile(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + TF_SetStatus(status, TF_OK, ""); bool recursive = false; bool is_dir = false; DeleteFileSystemEntry(filesystem, path, recursive, is_dir, status); } -void RenameFile(const TF_Filesystem* filesystem, const char* src, - const char* dst, TF_Status* status) { +static void RenameFile(const TF_Filesystem* filesystem, const char* src, + const char* dst, TF_Status* status) { + TF_SetStatus(status, TF_OK, ""); int rc; auto daos = static_cast(filesystem->plugin_filesystem); - int allow_cont_creation = 1; - std::string pool_src, cont_src, file_src; - rc = daos->ParseDFSPath(src, pool_src, cont_src, file_src); - if (rc) { - TF_SetStatus(status, TF_FAILED_PRECONDITION, ""); + dfs_path_t src_dpath; + rc = daos->Setup(daos, src, src_dpath, status); + if (rc) return; + + dfs_path_t dst_dpath; + rc = daos->Setup(daos, dst, dst_dpath, status); + if (rc) return; + + if (src_dpath.getFsys() != dst_dpath.getFsys()) { + TF_SetStatus(status, TF_FAILED_PRECONDITION, "Non-Matching Pool/Container"); return; } - std::string pool_dst, cont_dst, file_dst; - rc = daos->ParseDFSPath(dst, pool_dst, cont_dst, file_dst); + // Source object must exist + dfs_obj_t* temp_obj; + rc = daos->dfsLookUp(&src_dpath, &temp_obj, status); if (rc) { - TF_SetStatus(status, TF_FAILED_PRECONDITION, ""); return; } - if (pool_src != pool_dst || cont_src != cont_dst) { - TF_SetStatus(status, TF_FAILED_PRECONDITION, "Non-Matching Pool/Container"); - return; - } + // Source object cannot be a directory + bool is_dir = daos->dfsIsDirectory(temp_obj); + daos_size_t src_size; - daos->Connect(pool_src, cont_src, allow_cont_creation, status); - if (TF_GetCode(status) != TF_OK) { - return; + if (is_dir) { + TF_SetStatus(status, TF_FAILED_PRECONDITION, ""); + } else { + if (src_dpath.getCachedSize(src_size) != 0) { + daos->libdfs->dfs_get_size(src_dpath.getFsys(), temp_obj, &src_size); + } } - rc = daos->Mount(); - if (rc != 0) { - TF_SetStatus(status, TF_INTERNAL, "Error Mounting DFS"); + rc = daos->libdfs->dfs_release(temp_obj); + if (rc) { + TF_SetStatus(status, TF_INTERNAL, ""); + } + if (TF_GetCode(status) != TF_OK) { return; } - file_src = "/" + file_src; - file_dst = "/" + file_dst; - - dfs_obj_t* temp_obj; - rc = daos->dfsPathExists(file_src, &temp_obj, false); + // Destination object may or may not exist, but must not be a directory. + rc = daos->dfsLookUp(&dst_dpath, &temp_obj, status); if (rc) { - TF_SetStatus(status, TF_NOT_FOUND, ""); - return; + if (TF_GetCode(status) != TF_NOT_FOUND) { + return; + } + TF_SetStatus(status, TF_OK, ""); } else { - if (S_ISDIR(temp_obj->mode)) { + bool is_dir = daos->dfsIsDirectory(temp_obj); + rc = daos->libdfs->dfs_release(temp_obj); + if (rc) { + TF_SetStatus(status, TF_INTERNAL, ""); + return; + } + if (is_dir) { TF_SetStatus(status, TF_FAILED_PRECONDITION, ""); return; } } - daos->libdfs->dfs_release(temp_obj); - - rc = daos->dfsPathExists(file_dst, &temp_obj, false); - if (!rc && S_ISDIR(temp_obj->mode)) { - TF_SetStatus(status, TF_FAILED_PRECONDITION, ""); - return; - } - - daos->libdfs->dfs_release(temp_obj); + // Open the parent objects. Note that these are cached directory entries, + // not to be closed by this function. dfs_obj_t* parent_src = NULL; - size_t src_start = file_src.rfind("/") + 1; - std::string src_name = file_src.substr(src_start); - rc = daos->dfsFindParent(file_src, &parent_src); + rc = daos->dfsFindParent(&src_dpath, &parent_src, status); if (rc) { TF_SetStatus(status, TF_NOT_FOUND, ""); return; } dfs_obj_t* parent_dst = NULL; - size_t dst_start = file_dst.rfind("/") + 1; - std::string dst_name = file_dst.substr(dst_start); - rc = daos->dfsFindParent(file_dst, &parent_dst); + rc = daos->dfsFindParent(&dst_dpath, &parent_dst, status); if (rc) { TF_SetStatus(status, TF_NOT_FOUND, ""); return; } - if (!S_ISDIR(parent_dst->mode)) { + if (!daos->dfsIsDirectory(parent_dst)) { TF_SetStatus(status, TF_FAILED_PRECONDITION, ""); return; } - char* name = (char*)malloc(src_name.size()); - strcpy(name, src_name.c_str()); - char* new_name = (char*)malloc(dst_name.size()); - strcpy(new_name, dst_name.c_str()); + std::string src_name = src_dpath.getBaseName(); + std::string dst_name = dst_dpath.getBaseName(); - rc = daos->libdfs->dfs_move(daos->daos_fs, parent_src, name, parent_dst, - new_name, NULL); - free(name); - free(new_name); + rc = daos->libdfs->dfs_move(src_dpath.getFsys(), parent_src, src_name.c_str(), + parent_dst, dst_name.c_str(), NULL); if (rc) { TF_SetStatus(status, TF_INTERNAL, ""); return; } - TF_SetStatus(status, TF_OK, ""); + dst_dpath.setCachedSize(src_size); + src_dpath.clearCachedSize(); } -void Stat(const TF_Filesystem* filesystem, const char* path, - TF_FileStatistics* stats, TF_Status* status) { +static void Stat(const TF_Filesystem* filesystem, const char* path, + TF_FileStatistics* stats, TF_Status* status) { + TF_SetStatus(status, TF_OK, ""); int rc; auto daos = static_cast(filesystem->plugin_filesystem); - std::string pool, cont, dir_path; - rc = daos->Setup(path, pool, cont, dir_path, status); + dfs_path_t dpath; + rc = daos->Setup(daos, path, dpath, status); if (rc) return; dfs_obj_t* obj; + rc = daos->dfsLookUp(&dpath, &obj, status); + if (rc) return; - rc = daos->dfsPathExists(dir_path, &obj); + struct stat stbuf; + rc = daos->libdfs->dfs_ostat(dpath.getFsys(), obj, &stbuf); if (rc) { - TF_SetStatus(status, TF_NOT_FOUND, ""); + daos->libdfs->dfs_release(obj); + TF_SetStatus(status, TF_INTERNAL, ""); return; } - if (S_ISDIR(obj->mode)) { + stats->length = stbuf.st_size; + stats->mtime_nsec = static_cast(stbuf.st_mtime) * 1e9; + if (daos->dfsIsDirectory(obj)) { stats->is_directory = true; - stats->length = 0; } else { stats->is_directory = false; - daos_size_t size; - if (DFS::size_map.count(path) == 0) { - daos->libdfs->dfs_get_size(daos->daos_fs, obj, &size); - DFS::size_map[path] = size; - } else { - size = DFS::size_map[path]; - } - - stats->length = size; } - struct stat stbuf; - - daos->libdfs->dfs_ostat(daos->daos_fs, obj, &stbuf); - - stats->mtime_nsec = static_cast(stbuf.st_mtime) * 1e9; - - daos->libdfs->dfs_release(obj); - TF_SetStatus(status, TF_OK, ""); + rc = daos->libdfs->dfs_release(obj); + if (rc) { + TF_SetStatus(status, TF_INTERNAL, ""); + } } -int GetChildren(const TF_Filesystem* filesystem, const char* path, - char*** entries, TF_Status* status) { +static int GetChildren(const TF_Filesystem* filesystem, const char* path, + char*** entries, TF_Status* status) { + TF_SetStatus(status, TF_OK, ""); int rc; auto daos = static_cast(filesystem->plugin_filesystem); - std::string pool, cont, dir_path; - rc = daos->Setup(path, pool, cont, dir_path, status); + dfs_path_t dpath; + rc = daos->Setup(daos, path, dpath, status); if (rc) return -1; dfs_obj_t* obj; - rc = daos->dfsPathExists(dir_path, &obj, true); + rc = daos->dfsLookUp(&dpath, &obj, status); if (rc) { - TF_SetStatus(status, TF_NOT_FOUND, ""); return -1; } - if (!S_ISDIR(obj->mode)) { + if (!daos->dfsIsDirectory(obj)) { TF_SetStatus(status, TF_FAILED_PRECONDITION, ""); + daos->libdfs->dfs_release(obj); return -1; } std::vector children; - rc = daos->dfsReadDir(obj, children); + rc = daos->dfsReadDir(dpath.getFsys(), obj, children); + daos->libdfs->dfs_release(obj); if (rc) { TF_SetStatus(status, TF_INTERNAL, ""); return -1; @@ -670,19 +713,20 @@ int GetChildren(const TF_Filesystem* filesystem, const char* path, CopyEntries(entries, children); - TF_SetStatus(status, TF_OK, ""); return nr; } static char* TranslateName(const TF_Filesystem* filesystem, const char* uri) { + // Note: this function should be doing the equivalent of the + // lexically_normalize() function available in newer compilers. return strdup(uri); } -void FlushCaches(const TF_Filesystem* filesystem) { +static void FlushCaches(const TF_Filesystem* filesystem) { auto daos = static_cast(filesystem->plugin_filesystem); - daos->ClearConnections(); - daos->path_map.clear(); + daos->clearAllDirCaches(); + daos->clearAllSizeCaches(); } } // namespace tf_dfs_filesystem diff --git a/tensorflow_io/core/filesystems/dfs/dfs_utils.cc b/tensorflow_io/core/filesystems/dfs/dfs_utils.cc index ae6276c98..568fbdcd6 100644 --- a/tensorflow_io/core/filesystems/dfs/dfs_utils.cc +++ b/tensorflow_io/core/filesystems/dfs/dfs_utils.cc @@ -1,8 +1,6 @@ #include "tensorflow_io/core/filesystems/dfs/dfs_utils.h" -std::unordered_map DFS::size_map; - #include #include @@ -86,10 +84,6 @@ bool Match(const std::string& filename, const std::string& pattern) { } DFS::DFS(TF_Status* status) { - daos_fs = (dfs_t*)malloc(sizeof(dfs_t)); - daos_fs->mounted = false; - - path_map = {}; TF_SetStatus(status, TF_OK, ""); // Try to load the necessary daos libraries. @@ -114,16 +108,12 @@ DFS::DFS(TF_Status* status) { } DFS::~DFS() { - free(daos_fs); - int rc; - for (auto& kv : path_map) { - auto* obj = kv.second; - libdfs->dfs_release(obj); - } + clearAllDirCaches(); + clearAllSizeCaches(); + rc = libdfs->daos_eq_destroy(mEventQueueHandle, 0); assert(rc == 0); - Unmount(); ClearConnections(); libdfs->daos_fini(); @@ -132,68 +122,44 @@ DFS::~DFS() { int DFS::ParseDFSPath(const std::string& path, std::string& pool_string, std::string& cont_string, std::string& filename) { - size_t pool_start = path.find("://") + 3; - struct duns_attr_t* attr = - (struct duns_attr_t*)malloc(sizeof(struct duns_attr_t)); - attr->da_rel_path = NULL; - attr->da_flags = 1; - attr->da_no_prefix = true; - std::string direct_path = "/" + path.substr(pool_start); - bool has_file_name = true; - size_t cont_start = path.find("/", pool_start) + 1; - size_t file_start = path.find("/", cont_start) + 1; - if (file_start == 0) { - has_file_name = false; - } - std::string pool = path.substr(pool_start, cont_start - pool_start - 1); - std::string cont = path.substr(cont_start, file_start - cont_start - 1); - - if (pools.find(pool) != pools.end()) { - pool_string = pool; - auto* poolInfo = pools[pool]; - if (poolInfo->containers->find(cont) != poolInfo->containers->end()) { - cont_string = cont; - if (has_file_name) - filename = path.substr(file_start); - else - filename = "/"; - return 0; - } - } - int rc = libdfs->duns_resolve_path(direct_path.c_str(), attr); - if (rc == 2) { - attr->da_rel_path = NULL; - attr->da_flags = 0; - attr->da_no_prefix = false; - direct_path = "daos://" + path.substr(pool_start); - rc = libdfs->duns_resolve_path(direct_path.c_str(), attr); - if (rc) return rc; + struct duns_attr_t attr = {0}; + attr.da_flags = DUNS_NO_CHECK_PATH; + + int rc = libdfs->duns_resolve_path(path.c_str(), &attr); + if (rc == 0) { + pool_string = attr.da_pool; + cont_string = attr.da_cont; + filename = attr.da_rel_path == NULL ? "/" : attr.da_rel_path; + if (filename.back() == '/' && filename.size() > 1) filename.pop_back(); + libdfs->duns_destroy_attr(&attr); } - pool_string = attr->da_pool; - cont_string = attr->da_cont; - filename = attr->da_rel_path == NULL ? "" : attr->da_rel_path; - libdfs->duns_destroy_attr(attr); - return 0; + return rc; } -int DFS::Setup(const std::string& path, std::string& pool_string, - std::string& cont_string, std::string& file_path, +int DFS::Setup(DFS* daos, const std::string path, dfs_path_t& dpath, TF_Status* status) { int allow_cont_creation = 1; int rc; - rc = ParseDFSPath(path, pool_string, cont_string, file_path); + + std::string pool, cont, rel_path; + rc = ParseDFSPath(path, pool, cont, rel_path); if (rc) { - TF_SetStatus(status, TF_FAILED_PRECONDITION, ""); + TF_SetStatus(status, TF_INVALID_ARGUMENT, ""); return rc; } - Connect(pool_string, cont_string, allow_cont_creation, status); + Connect(daos, pool, cont, allow_cont_creation, status); if (TF_GetCode(status) != TF_OK) { return -1; } - return Mount(); + + pool_info_t* po_inf = pools[pool]; + cont_info_t* cont_info = po_inf->containers[cont]; + dfs_path_t res(cont_info, rel_path); + dpath = res; + return 0; } -void DFS::Connect(std::string& pool_string, std::string& cont_string, +void DFS::Connect(DFS* daos, std::string& pool_string, std::string& cont_string, int allow_cont_creation, TF_Status* status) { int rc; @@ -203,63 +169,21 @@ void DFS::Connect(std::string& pool_string, std::string& cont_string, return; } - rc = ConnectContainer(cont_string, allow_cont_creation, status); + rc = ConnectContainer(daos, pool_string, cont_string, allow_cont_creation, + status); if (rc) { TF_SetStatus(status, TF_INTERNAL, "Error Connecting to Container"); return; } - connected = true; - - TF_SetStatus(status, TF_OK, ""); -} - -void DFS::Disconnect(TF_Status* status) { - int rc; - rc = DisconnectContainer(pool.first, container.first); - if (rc) { - TF_SetStatus(status, TF_INTERNAL, "Error Disconnecting from Container"); - return; - } - - rc = DisconnectPool(pool.first); - if (rc) { - TF_SetStatus(status, TF_INTERNAL, "Error Disconnecting from Pool"); - return; - } - - connected = false; - TF_SetStatus(status, TF_OK, ""); } -int DFS::Mount() { - int rc = 0; - if (daos_fs->mounted) { - if (daos_fs->poh.cookie == pool.second.cookie && - daos_fs->coh.cookie == container.second.cookie) { - return rc; - } - rc = Unmount(); - if (rc) return rc; - } - return libdfs->dfs_mount(pool.second, container.second, O_RDWR, &daos_fs); -} - -int DFS::Unmount() { - int rc; - if (!daos_fs->mounted) return 0; - rc = libdfs->dfs_umount(daos_fs); - daos_fs = (dfs_t*)malloc(sizeof(dfs_t)); - daos_fs->mounted = false; - return rc; -} - -int DFS::Query() { +int DFS::Query(id_handle_t pool, id_handle_t container, dfs_t* daos_fs) { int rc; daos_pool_info_t pool_info; daos_cont_info_t cont_info; - if (connected) { + if (daos_fs) { memset(&pool_info, 'D', sizeof(daos_pool_info_t)); pool_info.pi_bits = DPI_ALL; rc = libdfs->daos_pool_query(pool.second, NULL, &pool_info, NULL, NULL); @@ -294,77 +218,125 @@ int DFS::Query() { int DFS::ClearConnections() { int rc; - rc = Unmount(); - if (rc) return rc; - for (auto pool_it = pools.cbegin(); pool_it != pools.cend();) { - for (auto cont_it = (*(*pool_it).second->containers).cbegin(); - cont_it != (*(*pool_it).second->containers).cend();) { - rc = DisconnectContainer((*pool_it).first, (*cont_it++).first); - if (rc) return rc; + + for (;;) { + auto pool_it = pools.cbegin(); + if (pool_it == pools.cend()) { + break; } rc = DisconnectPool((*pool_it++).first); if (rc) return rc; } + return 0; +} - return rc; +void DFS::clearDirCache(dir_cache_t& dir_cache) { + for (auto kv = dir_cache.begin(); kv != dir_cache.end();) { + dfs_obj_t* dir = kv->second; + libdfs->dfs_release(dir); + kv = dir_cache.erase(kv); + } } -int DFS::dfsDeleteObject(std::string dir_path, bool is_dir, bool recursive, +void DFS::clearAllDirCaches(void) { + for (auto pool_it = pools.cbegin(); pool_it != pools.cend();) { + for (auto cont_it = ((*pool_it).second->containers).cbegin(); + cont_it != ((*pool_it).second->containers).cend();) { + cont_info_t* cont = (*cont_it).second; + clearDirCache(cont->dir_map); + cont_it++; + } + pool_it++; + } +} + +void DFS::clearSizeCache(size_cache_t& size_cache) { size_cache.clear(); } + +void DFS::clearAllSizeCaches(void) { + for (auto pool_it = pools.cbegin(); pool_it != pools.cend();) { + for (auto cont_it = ((*pool_it).second->containers).cbegin(); + cont_it != ((*pool_it).second->containers).cend();) { + cont_info_t* cont = (*cont_it).second; + clearSizeCache(cont->size_map); + cont_it++; + } + pool_it++; + } +} + +int DFS::dfsDeleteObject(dfs_path_t* dpath, bool is_dir, bool recursive, TF_Status* status) { + TF_SetStatus(status, TF_OK, ""); dfs_obj_t* temp_obj; - if (dir_path.back() == '/' && dir_path.size() > 1) dir_path.pop_back(); - if (dir_path.front() != '/') dir_path = "/" + dir_path; - int rc = dfsPathExists(dir_path, &temp_obj, is_dir); - if (rc) { - TF_SetStatus(status, TF_NOT_FOUND, ""); - return -1; - } - if (!is_dir && S_ISDIR(temp_obj->mode)) { - TF_SetStatus(status, TF_FAILED_PRECONDITION, ""); - return -1; + + int rc = dfsLookUp(dpath, &temp_obj, status); + if (rc) return -1; + + if (dfsIsDirectory(temp_obj)) { + if (!is_dir) { + TF_SetStatus(status, TF_FAILED_PRECONDITION, "Object is a directory"); + libdfs->dfs_release(temp_obj); + return -1; + } + } else { + if (is_dir && !recursive) { + TF_SetStatus(status, TF_FAILED_PRECONDITION, "Object is not a directory"); + libdfs->dfs_release(temp_obj); + return -1; + } } - size_t dir_start = dir_path.rfind("/") + 1; - std::string dir = dir_path.substr(dir_start); dfs_obj_t* parent; - rc = dfsFindParent(dir_path, &parent); + rc = dfsFindParent(dpath, &parent, status); if (rc) { - TF_SetStatus(status, TF_NOT_FOUND, ""); + libdfs->dfs_release(temp_obj); return -1; } - rc = libdfs->dfs_remove(daos_fs, parent, dir.c_str(), recursive, NULL); - if (recursive) { - path_map.clear(); - } else { - path_map.erase(dir_path); + rc = libdfs->dfs_remove(dpath->getFsys(), parent, + dpath->getBaseName().c_str(), recursive, NULL); + libdfs->dfs_release(temp_obj); + if (rc) { + TF_SetStatus(status, TF_FAILED_PRECONDITION, + "Error Deleting Existing Object"); + return -1; } - if (rc) { - TF_SetStatus(status, TF_INTERNAL, "Error Deleting Existing Object"); + if (is_dir) { + if (recursive) { + dpath->clearFsysCachedDirs(); + dpath->clearFsysCachedSizes(); + } else { + dpath->clearCachedDir(); + } } else { - TF_SetStatus(status, TF_OK, ""); + dpath->clearCachedSize(); } - return rc; + TF_SetStatus(status, TF_OK, ""); + return 0; } -void DFS::dfsNewFile(std::string file_path, File_Mode file_mode, int flags, +void DFS::dfsNewFile(dfs_path_t* dpath, File_Mode file_mode, int flags, dfs_obj_t** obj, TF_Status* status) { int rc; dfs_obj_t* temp_obj; mode_t open_flags; - if (file_path.back() == '/' && file_path.size() > 1) file_path.pop_back(); - - rc = dfsPathExists(file_path, &temp_obj, false); - if (rc && file_mode == READ) { - TF_SetStatus(status, TF_NOT_FOUND, ""); - return; + rc = dfsLookUp(dpath, &temp_obj, status); + if (rc) { + if (TF_GetCode(status) != TF_NOT_FOUND) { + return; + } + if (file_mode == READ) { + return; + } + TF_SetStatus(status, TF_OK, ""); } - if (temp_obj != NULL && S_ISDIR(temp_obj->mode)) { + if (temp_obj != NULL && dfsIsDirectory(temp_obj)) { TF_SetStatus(status, TF_FAILED_PRECONDITION, ""); + libdfs->dfs_release(temp_obj); return; } @@ -374,134 +346,197 @@ void DFS::dfsNewFile(std::string file_path, File_Mode file_mode, int flags, } if (!rc && file_mode == WRITE) { - rc = dfsDeleteObject(file_path, false, false, status); - if (rc) return; + rc = dfsDeleteObject(dpath, false, false, status); + if (rc) { + libdfs->dfs_release(temp_obj); + return; + } } open_flags = GetFlags(file_mode); dfs_obj_t* parent; - rc = dfsFindParent(file_path, &parent); + mode_t parent_mode; + rc = dfsFindParent(dpath, &parent, status); if (rc) { - TF_SetStatus(status, TF_NOT_FOUND, ""); + libdfs->dfs_release(temp_obj); return; } - if (parent != NULL && !S_ISDIR(parent->mode)) { + rc = libdfs->dfs_get_mode(parent, &parent_mode); + if (rc) { + TF_SetStatus(status, TF_INTERNAL, "Cannot retrieve object mode"); + libdfs->dfs_release(temp_obj); + return; + } + if (parent != NULL && !S_ISDIR(parent_mode)) { TF_SetStatus(status, TF_FAILED_PRECONDITION, ""); + libdfs->dfs_release(temp_obj); return; } - size_t file_start = file_path.rfind("/") + 1; - std::string file_name = file_path.substr(file_start); - std::string parent_path = file_path.substr(0, file_start); - rc = libdfs->dfs_open(daos_fs, parent, file_name.c_str(), flags, open_flags, - 0, 0, NULL, obj); + std::string base_name = dpath->getBaseName(); + rc = libdfs->dfs_open(dpath->getFsys(), parent, base_name.c_str(), flags, + open_flags, 0, 0, NULL, obj); if (rc) { TF_SetStatus(status, TF_INTERNAL, "Error Creating Writable File"); + libdfs->dfs_release(temp_obj); return; } } -int DFS::dfsPathExists(std::string file, dfs_obj_t** obj, bool isDirectory) { - (*obj) = NULL; - int rc = 0; - if (isRoot(file)) { - return rc; - } - if (file.front() != '/') file = "/" + file; - rc = dfsLookUp(file, obj, isDirectory); - if (*(obj) == NULL) { - return ENOENT; - } - - return rc; -} +// Look up an object path and return its dfs_obj_t* in *obj. If this routine +// returns zero then *obj is guaranteed to contain a valid dfs_obj_t* which +// must be released by the caller. If non-zero, then *obj will be nullptr. If +// the object happens to be a directory, a dup of the dfs_obj_t* will also be +// separately cached in the filesystem's directory cache. -int DFS::dfsLookUp(std::string dir_path, dfs_obj_t** obj, bool isDirectory) { +int DFS::dfsLookUp(dfs_path_t* dpath, dfs_obj_t** obj, TF_Status* status) { *obj = NULL; - dfs_obj_t* parent = NULL; int rc; - if (dir_path.back() == '/' && dir_path.size() > 1) dir_path.pop_back(); - size_t file_start = dir_path.rfind("/") + 1; - std::string file_path = dir_path.substr(file_start); - std::string parent_path = - (file_start > 1) ? dir_path.substr(0, file_start - 1) : "/"; - if (path_map.find(dir_path) != path_map.end()) { - *obj = path_map[dir_path]; + + // Check if the object path is for a directory we have seen before. + + dfs_obj_t* _obj = dpath->getCachedDir(); + if (_obj) { + rc = libdfs->dfs_dup(dpath->getFsys(), _obj, O_RDWR, obj); + if (rc) { + TF_SetStatus(status, TF_INTERNAL, "dfs_dup() of open directory failed"); + return -1; + } return 0; } - rc = dfsFindParent(dir_path, &parent); - if (rc) return rc; - if (parent != NULL && path_map.count(parent_path) == 0) { - dfs_obj_t* new_entry = new dfs_obj_t; - memcpy(new_entry, parent, sizeof(dfs_obj_t)); - path_map[parent_path] = new_entry; + + if (dpath->isRoot()) { + rc = libdfs->dfs_lookup(dpath->getFsys(), dpath->getRelPath().c_str(), + O_RDWR, &_obj, NULL, NULL); + } else { + dfs_obj_t* parent = NULL; + rc = dfsFindParent(dpath, &parent, status); + if (rc) return -1; + + dfs_t* fsys = dpath->getFsys(); + std::string basename = dpath->getBaseName(); + rc = libdfs->dfs_lookup_rel(fsys, parent, basename.c_str(), O_RDWR, &_obj, + NULL, NULL); + } + if (rc != 0) { + if (rc == ENOENT) { + TF_SetStatus(status, TF_NOT_FOUND, ""); + } else { + TF_SetStatus(status, TF_FAILED_PRECONDITION, ""); + } + return -1; } - if (file_path == "/") return rc; - mode_t open_mode = S_IRUSR | S_IFREG; - if (isDirectory) { - open_mode = S_IRUSR | S_IFDIR; + if (!dfsIsDirectory(_obj)) { + *obj = _obj; + return 0; } - rc = libdfs->dfs_open(daos_fs, parent, file_path.c_str(), open_mode, O_RDWR, - 0, 0, 0, obj); - if (*obj != NULL && path_map.count(dir_path) == 0 && isDirectory) { - dfs_obj_t* new_entry = new dfs_obj_t; - memcpy(new_entry, *obj, sizeof(dfs_obj_t)); - path_map[dir_path] = new_entry; + + // The object is a directory, so return the original dfs_obj_t* and store a + // dup of the original in the filesystem's directory cache. + + rc = libdfs->dfs_dup(dpath->getFsys(), _obj, O_RDWR, obj); + if (rc) { + TF_SetStatus(status, TF_INTERNAL, "dfs_dup() of open directory failed"); + return -1; } - return rc; + dpath->setCachedDir(_obj); + return 0; } -int DFS::dfsFindParent(std::string file, dfs_obj_t** parent) { - (*parent) = NULL; - if (file.back() == '/' && file.size() > 1) file.pop_back(); - size_t file_start = file.rfind("/") + 1; - std::string parent_path = - (file_start > 1) ? file.substr(0, file_start - 1) : "/"; - if (parent_path != "/") { - int rc = dfsLookUp(parent_path, parent, true); - if (*(parent) == NULL) return ENOENT; - return rc; - } else { - (*parent) = NULL; +// Given an object pathname, return the dfs_obj_t* for its parent directory. +// Cache the parent directory if not already cached. The caller should not +// release the parent dfs_obj_t*. + +int DFS::dfsFindParent(dfs_path_t* dpath, dfs_obj_t** obj, TF_Status* status) { + *obj = NULL; + int rc; + + dfs_path_t parent_dpath = *dpath; + parent_dpath.setRelPath(dpath->getParentPath()); + + dfs_obj_t* _obj = parent_dpath.getCachedDir(); + if (_obj) { + *obj = _obj; return 0; } + + rc = libdfs->dfs_lookup(parent_dpath.getFsys(), + parent_dpath.getRelPath().c_str(), O_RDWR, &_obj, + NULL, NULL); + if (rc) { + if (rc == ENOENT) { + TF_SetStatus(status, TF_NOT_FOUND, ""); + } else { + TF_SetStatus(status, TF_FAILED_PRECONDITION, ""); + } + return -1; + } + + if (!dfsIsDirectory(_obj)) { + TF_SetStatus(status, TF_FAILED_PRECONDITION, + "parent object is not a directory"); + libdfs->dfs_release(_obj); + return -1; + } + + parent_dpath.setCachedDir(_obj); + *obj = _obj; + + return 0; } -int DFS::dfsCreateDir(std::string& dir_path, TF_Status* status) { +int DFS::dfsCreateDir(dfs_path_t* dpath, TF_Status* status) { + TF_SetStatus(status, TF_OK, ""); dfs_obj_t* temp_obj; int rc; - rc = dfsPathExists(dir_path, &temp_obj, true); + + rc = dfsLookUp(dpath, &temp_obj, status); if (!rc) { - TF_SetStatus(status, TF_ALREADY_EXISTS, ""); - return rc; + if (dfsIsDirectory(temp_obj)) { + libdfs->dfs_release(temp_obj); + TF_SetStatus(status, TF_ALREADY_EXISTS, ""); + return 0; + } else { + TF_SetStatus(status, TF_FAILED_PRECONDITION, ""); + libdfs->dfs_release(temp_obj); + return -1; + } + } else if (TF_GetCode(status) != TF_NOT_FOUND) { + return -1; } - size_t dir_start = dir_path.rfind("/") + 1; - std::string dir = dir_path.substr(dir_start); + TF_SetStatus(status, TF_OK, ""); dfs_obj_t* parent; - rc = dfsFindParent(dir_path, &parent); + rc = dfsFindParent(dpath, &parent, status); if (rc) { - TF_SetStatus(status, TF_NOT_FOUND, ""); return rc; } - rc = libdfs->dfs_mkdir(daos_fs, parent, dir.c_str(), S_IWUSR | S_IRUSR, 0); + rc = libdfs->dfs_mkdir(dpath->getFsys(), parent, dpath->getBaseName().c_str(), + S_IWUSR | S_IRUSR | S_IXUSR, 0); if (rc) { TF_SetStatus(status, TF_INTERNAL, "Error Creating Directory"); - } else { - TF_SetStatus(status, TF_OK, ""); } return rc; } -bool DFS::isRoot(std::string& file_path) { - return (file_path.empty() || file_path == "/"); +bool DFS::dfsIsDirectory(dfs_obj_t* obj) { + if (obj == NULL) { + return true; + } + mode_t mode; + libdfs->dfs_get_mode(obj, &mode); + if (S_ISDIR(mode)) { + return true; + } + return false; } -int DFS::dfsReadDir(dfs_obj_t* obj, std::vector& children) { +int DFS::dfsReadDir(dfs_t* daos_fs, dfs_obj_t* obj, + std::vector& children) { int rc = 0; daos_anchor_t anchor = {0}; uint32_t nr = STACK; @@ -525,59 +560,75 @@ int DFS::ConnectPool(std::string pool_string, TF_Status* status) { int rc = 0; if (pools.find(pool_string) != pools.end()) { - pool.first = pool_string; - pool.second = pools[pool_string]->poh; - return rc; + return 0; } - pool_info_t* po_inf = (pool_info_t*)malloc(sizeof(po_inf)); - rc = libdfs->daos_pool_connect2(pool_string.c_str(), NULL, DAOS_PC_RW, - &(po_inf->poh), NULL, NULL); + daos_handle_t poh; + daos_pool_info_t info; + rc = libdfs->daos_pool_connect2(pool_string.c_str(), NULL, DAOS_PC_RW, &poh, + &info, NULL); if (rc == 0) { - pool.first = pool_string; - pool.second = po_inf->poh; - po_inf->containers = new std::unordered_map(); + pool_info_t* po_inf = new pool_info_t(); + po_inf->poh = poh; pools[pool_string] = po_inf; } return rc; } -int DFS::ConnectContainer(std::string cont_string, int allow_creation, +int DFS::ConnectContainer(DFS* daos, std::string pool_string, + std::string cont_string, int allow_creation, TF_Status* status) { int rc = 0; - pool_info_t* po_inf = pools[pool.first]; - if (po_inf->containers->find(cont_string) != po_inf->containers->end()) { - container.first = cont_string; - container.second = (*po_inf->containers)[cont_string]; - return rc; + pool_info_t* po_inf = pools[pool_string]; + auto search = po_inf->containers.find(cont_string); + if (search != po_inf->containers.end()) { + return 0; } daos_handle_t coh; - - rc = libdfs->daos_cont_open2(pool.second, cont_string.c_str(), DAOS_COO_RW, - &coh, NULL, NULL); + daos_cont_info_t info; + rc = libdfs->daos_cont_open2(po_inf->poh, cont_string.c_str(), DAOS_COO_RW, + &coh, &info, NULL); if (rc == -DER_NONEXIST) { if (allow_creation) { - rc = libdfs->dfs_cont_create_with_label(pool.second, cont_string.c_str(), + rc = libdfs->dfs_cont_create_with_label(po_inf->poh, cont_string.c_str(), NULL, NULL, &coh, NULL); } } - if (rc == 0) { - container.first = cont_string; - container.second = coh; - (*po_inf->containers)[cont_string] = coh; - } - return rc; + if (rc != 0) return rc; + + dfs_t* daos_fs; + rc = libdfs->dfs_mount(po_inf->poh, coh, O_RDWR, &daos_fs); + if (rc != 0) return rc; + + cont_info_t* co_inf = new cont_info_t(); + co_inf->coh = coh; + co_inf->daos = daos; + co_inf->daos_fs = daos_fs; + co_inf->pool = pool_string; + co_inf->cont = cont_string; + + po_inf->containers[cont_string] = co_inf; + return 0; } int DFS::DisconnectPool(std::string pool_string) { int rc = 0; - daos_handle_t poh = pools[pool_string]->poh; - rc = libdfs->daos_pool_disconnect(poh, NULL); + pool_info_t* po_inf = pools[pool_string]; + + for (;;) { + auto cont_it = po_inf->containers.cbegin(); + if (cont_it == po_inf->containers.cend()) { + break; + } + rc = DisconnectContainer(pool_string, (*cont_it++).first); + if (rc) return rc; + } + + rc = libdfs->daos_pool_disconnect(po_inf->poh, NULL); if (rc == 0) { - delete pools[pool_string]->containers; - free(pools[pool_string]); + delete po_inf; pools.erase(pool_string); } return rc; @@ -585,10 +636,18 @@ int DFS::DisconnectPool(std::string pool_string) { int DFS::DisconnectContainer(std::string pool_string, std::string cont_string) { int rc = 0; - daos_handle_t coh = (*pools[pool_string]->containers)[cont_string]; - rc = libdfs->daos_cont_close(coh, nullptr); + cont_info_t* co_inf = pools[pool_string]->containers[cont_string]; + + if (co_inf->daos_fs) { + rc = libdfs->dfs_umount(co_inf->daos_fs); + if (rc) return rc; + co_inf->daos_fs = nullptr; + } + + rc = libdfs->daos_cont_close(co_inf->coh, nullptr); if (rc == 0) { - pools[pool_string]->containers->erase(cont_string); + delete co_inf; + pools[pool_string]->containers.erase(cont_string); } return rc; } @@ -691,6 +750,91 @@ int64_t ReadBuffer::CopyFromCache(char* ret, const size_t ret_offset, return static_cast(aRead_size); } +dfs_path_t::dfs_path_t(cont_info_t* cont_info, std::string rel_path) + : cont_info(cont_info), rel_path(rel_path) {} + +dfs_path_t& dfs_path_t::operator=(dfs_path_t other) { + cont_info = other.cont_info; + rel_path = other.rel_path; + return *this; +} + +std::string dfs_path_t::getFullPath(void) { + std::string full_path = + "/" + cont_info->pool + "/" + cont_info->cont + rel_path; + return full_path; +} + +std::string dfs_path_t::getRelPath(void) { return rel_path; } + +std::string dfs_path_t::getParentPath(void) { + if (rel_path == "/") { + return rel_path; // root is its own parent + } + + std::string parent_path; + size_t slash_pos = rel_path.rfind("/"); + if (slash_pos == 0) { + parent_path = "/"; + } else { + parent_path = rel_path.substr(0, slash_pos); + } + return parent_path; +} + +std::string dfs_path_t::getBaseName(void) { + size_t base_start = rel_path.rfind("/") + 1; + std::string base_name = rel_path.substr(base_start); + return base_name; +} + +DFS* dfs_path_t::getDAOS(void) { return cont_info->daos; } + +dfs_t* dfs_path_t::getFsys(void) { return cont_info->daos_fs; } + +void dfs_path_t::setRelPath(std::string new_path) { rel_path = new_path; } + +bool dfs_path_t::isRoot(void) { return (rel_path == "/"); } + +dfs_obj_t* dfs_path_t::getCachedDir(void) { + auto search = cont_info->dir_map.find(rel_path); + if (search != cont_info->dir_map.end()) { + return search->second; + } else { + return nullptr; + } +} + +void dfs_path_t::setCachedDir(dfs_obj_t* dir_obj) { + cont_info->dir_map[rel_path] = dir_obj; +} + +void dfs_path_t::clearCachedDir(void) { cont_info->dir_map.erase(rel_path); } + +void dfs_path_t::clearFsysCachedDirs(void) { + cont_info->daos->clearDirCache(cont_info->dir_map); +} + +int dfs_path_t::getCachedSize(daos_size_t& size) { + auto search = cont_info->size_map.find(rel_path); + if (search != cont_info->size_map.end()) { + size = search->second; + return 0; + } else { + return -1; + } +} + +void dfs_path_t::setCachedSize(daos_size_t size) { + cont_info->size_map[rel_path] = size; +} + +void dfs_path_t::clearCachedSize(void) { cont_info->size_map.erase(rel_path); } + +void dfs_path_t::clearFsysCachedSizes(void) { + cont_info->daos->clearSizeCache(cont_info->size_map); +} + static void* LoadSharedLibrary(const char* library_filename, TF_Status* status) { std::string full_path; @@ -803,7 +947,11 @@ void libDFS::LoadAndBindDaosLibs(TF_Status* status) { BIND_DFS_FUNC(libdaos_handle_, daos_pool_query); BIND_DFS_FUNC(libdfs_handle_, dfs_cont_create_with_label); + BIND_DFS_FUNC(libdfs_handle_, dfs_dup); + BIND_DFS_FUNC(libdfs_handle_, dfs_get_mode); BIND_DFS_FUNC(libdfs_handle_, dfs_get_size); + BIND_DFS_FUNC(libdfs_handle_, dfs_lookup); + BIND_DFS_FUNC(libdfs_handle_, dfs_lookup_rel); BIND_DFS_FUNC(libdfs_handle_, dfs_mkdir); BIND_DFS_FUNC(libdfs_handle_, dfs_mount); BIND_DFS_FUNC(libdfs_handle_, dfs_move); diff --git a/tensorflow_io/core/filesystems/dfs/dfs_utils.h b/tensorflow_io/core/filesystems/dfs/dfs_utils.h index b2c57ffba..1522c8115 100644 --- a/tensorflow_io/core/filesystems/dfs/dfs_utils.h +++ b/tensorflow_io/core/filesystems/dfs/dfs_utils.h @@ -32,87 +32,60 @@ #include "tensorflow/c/tf_status.h" #include "tensorflow_io/core/filesystems/filesystem_plugins.h" -/** object struct that is instantiated for a DFS open object */ -struct dfs_obj { - /** DAOS object ID */ - daos_obj_id_t oid; - /** DAOS object open handle */ - daos_handle_t oh; - /** mode_t containing permissions & type */ - mode_t mode; - /** open access flags */ - int flags; - /** DAOS object ID of the parent of the object */ - daos_obj_id_t parent_oid; - /** entry name of the object in the parent */ - char name[DFS_MAX_NAME + 1]; - union { - /** Symlink value if object is a symbolic link */ - char* value; - struct { - /** Default object class for all entries in dir */ - daos_oclass_id_t oclass; - /** Default chunk size for all entries in dir */ - daos_size_t chunk_size; - } d; - }; -}; +typedef std::unordered_map dir_cache_t; +typedef std::unordered_map size_cache_t; -/** dfs struct that is instantiated for a mounted DFS namespace */ -struct dfs { - /** flag to indicate whether the dfs is mounted */ - bool mounted; - /** flag to indicate whether dfs is mounted with balanced mode (DTX) */ - bool use_dtx; - /** lock for threadsafety */ - pthread_mutex_t lock; - /** uid - inherited from container. */ - uid_t uid; - /** gid - inherited from container. */ - gid_t gid; - /** Access mode (RDONLY, RDWR) */ - int amode; - /** Open pool handle of the DFS */ - daos_handle_t poh; - /** Open container handle of the DFS */ - daos_handle_t coh; - /** Object ID reserved for this DFS (see oid_gen below) */ - daos_obj_id_t oid; - /** superblock object OID */ - daos_obj_id_t super_oid; - /** Open object handle of SB */ - daos_handle_t super_oh; - /** Root object info */ - dfs_obj_t root; - /** DFS container attributes (Default chunk size, oclass, etc.) */ - dfs_attr_t attr; - /** Optional prefix to account for when resolving an absolute path */ - char* prefix; - daos_size_t prefix_len; -}; +class DFS; -struct dfs_entry { - /** mode (permissions + entry type) */ - mode_t mode; - /** Object ID if not a symbolic link */ - daos_obj_id_t oid; - /* Time of last access */ - time_t atime; - /* Time of last modification */ - time_t mtime; - /* Time of last status change */ - time_t ctime; - /** chunk size of file */ - daos_size_t chunk_size; - /** Sym Link value */ - char* value; +// Class for per-DFS-filesystem state variables, one per container in the +// 'containers' map. +class cont_info_t { + public: + daos_handle_t coh; + DFS* daos; + std::string pool; + std::string cont; + dfs_t* daos_fs; + dir_cache_t dir_map; + size_cache_t size_map; }; typedef struct pool_info { daos_handle_t poh; - std::unordered_map* containers; + std::unordered_map containers; } pool_info_t; +// Class for per-DFS-file state variables and common path operations. State +// includes the filesystem in which the file resides. +class dfs_path_t { + public: + dfs_path_t() { cont_info = nullptr; }; + dfs_path_t(cont_info_t* cont_info, std::string rel_path); + dfs_path_t& operator=(dfs_path_t other); + DFS* getDAOS(void); + dfs_t* getFsys(void); + std::string getFullPath(void); + std::string getRelPath(void); + std::string getParentPath(void); + std::string getBaseName(void); + void setRelPath(std::string); + bool isRoot(void); + + dfs_obj_t* getCachedDir(void); + void setCachedDir(dfs_obj_t* dir_obj); + void clearCachedDir(void); + void clearFsysCachedDirs(void); + + int getCachedSize(daos_size_t& size); + void setCachedSize(daos_size_t size); + void clearCachedSize(void); + void clearFsysCachedSizes(void); + + private: + cont_info_t* cont_info; + std::string rel_path; +}; + typedef std::pair id_handle_t; enum File_Mode { READ, WRITE, APPEND, READWRITE }; @@ -168,8 +141,20 @@ class libDFS { daos_handle_t*, dfs_t**)> dfs_cont_create_with_label; + std::function dfs_dup; + + std::function dfs_get_mode; + std::function dfs_get_size; + std::function + dfs_lookup; + + std::function + dfs_lookup_rel; + std::function dfs_mkdir; @@ -216,60 +201,52 @@ class libDFS { void* libduns_handle_; }; +// Singlton class for the DFS plugin, containing all its global state. class DFS { public: - bool connected; - dfs_t* daos_fs; - id_handle_t pool; - id_handle_t container; - daos_handle_t mEventQueueHandle; std::unique_ptr libdfs; std::unordered_map pools; - std::unordered_map path_map; - static std::unordered_map size_map; explicit DFS(TF_Status* status); int ParseDFSPath(const std::string& path, std::string& pool_string, std::string& cont_string, std::string& filename); - int Setup(const std::string& path, std::string& pool_string, - std::string& cont_string, std::string& file_path, + int Setup(DFS* daos, const std::string path, dfs_path_t& dpath, TF_Status* status); - void Connect(std::string& pool_string, std::string& cont_string, + void Connect(DFS* daos, std::string& pool_string, std::string& cont_string, int allow_cont_creation, TF_Status* status); - void Disconnect(TF_Status* status); + int Query(id_handle_t pool, id_handle_t container, dfs_t* daos_fs); - int Mount(); + int ClearConnections(); - int Unmount(); + void clearDirCache(dir_cache_t& dir_cache); - int Query(); + void clearAllDirCaches(void); - int ClearConnections(); + void clearSizeCache(size_cache_t& size_cache); - void dfsNewFile(std::string file_path, File_Mode mode, int flags, - dfs_obj_t** obj, TF_Status* status); + void clearAllSizeCaches(void); - int dfsPathExists(std::string file, dfs_obj_t** obj, - bool isDirectory = false); + void dfsNewFile(dfs_path_t* dpath, File_Mode mode, int flags, dfs_obj_t** obj, + TF_Status* status); - int dfsFindParent(std::string file, dfs_obj_t** parent); + int dfsFindParent(dfs_path_t* dpath, dfs_obj_t** obj, TF_Status* status); - int dfsCreateDir(std::string& dir_path, TF_Status* status); + int dfsCreateDir(dfs_path_t* dpath, TF_Status* status); - int dfsDeleteObject(std::string dir_path, bool is_dir, bool recursive, + int dfsDeleteObject(dfs_path_t* dpath, bool is_dir, bool recursive, TF_Status* status); - bool isRoot(std::string& file_path); + bool dfsIsDirectory(dfs_obj_t* obj); - int dfsReadDir(dfs_obj_t* obj, std::vector& children); + int dfsReadDir(dfs_t* daos_fs, dfs_obj_t* obj, + std::vector& children); - int dfsLookUp(std::string dir_path, dfs_obj_t** obj, - bool isDirectory = false); + int dfsLookUp(dfs_path_t* dpath, dfs_obj_t** obj, TF_Status* status); dfs_obj_t* lookup_insert_dir(const char* name, mode_t* mode); @@ -278,7 +255,8 @@ class DFS { private: int ConnectPool(std::string pool_string, TF_Status* status); - int ConnectContainer(std::string cont_string, int allow_creation, + int ConnectContainer(DFS* daos, std::string pool_string, + std::string cont_string, int allow_creation, TF_Status* status); int DisconnectPool(std::string pool_string); diff --git a/tensorflow_io/core/filesystems/filesystem_plugins.cc b/tensorflow_io/core/filesystems/filesystem_plugins.cc index 9631246bb..190e8d9ee 100644 --- a/tensorflow_io/core/filesystems/filesystem_plugins.cc +++ b/tensorflow_io/core/filesystems/filesystem_plugins.cc @@ -40,5 +40,5 @@ TFIO_PLUGIN_EXPORT void TF_InitPlugin(TF_FilesystemPluginInfo* info) { tensorflow::io::hdfs::ProvideFilesystemSupportFor(&info->ops[4], "hdfs"); tensorflow::io::hdfs::ProvideFilesystemSupportFor(&info->ops[5], "viewfs"); tensorflow::io::hdfs::ProvideFilesystemSupportFor(&info->ops[6], "har"); - tensorflow::io::dfs::ProvideFilesystemSupportFor(&info->ops[7], "dfs"); + tensorflow::io::dfs::ProvideFilesystemSupportFor(&info->ops[7], "daos"); } diff --git a/tests/test_dfs.py b/tests/test_dfs.py index 33702fedb..40bfaadd6 100644 --- a/tests/test_dfs.py +++ b/tests/test_dfs.py @@ -21,8 +21,8 @@ def __init__(self, methodName="runTest"): # pylint: disable=invalid-name self.pool_uuid = os.environ["POOL_UUID"] self.container = os.environ["CONT_LABEL"] self.container_uuid = os.environ["CONT_UUID"] - self.path_root = "dfs://" + os.path.join(self.pool, self.container) - self.path_root_with_uuid = "dfs://" + os.path.join( + self.path_root = "daos://" + os.path.join(self.pool, self.container) + self.path_root_with_uuid = "daos://" + os.path.join( self.pool_uuid, self.container_uuid ) super().__init__(methodName)