diff --git a/.github/workflows/build_and_push_docker_images.yml b/.github/workflows/build_and_push_docker_images.yml index a11b38ed4..ebdc777b1 100644 --- a/.github/workflows/build_and_push_docker_images.yml +++ b/.github/workflows/build_and_push_docker_images.yml @@ -25,7 +25,7 @@ jobs: id: docker-deps-cache with: path: ci/deps/deps.Dockerfile - key: docker-${{ hashFiles('ci/deps/deps.Dockerfile') }} + key: docker-${{ hashFiles('docker/deps.Dockerfile') }} - name: Checkout uses: actions/checkout@v3 - name: Login to Docker Hub diff --git a/CMake/HermesConfig.cmake b/CMake/HermesConfig.cmake index 60e17f1eb..737992aa5 100644 --- a/CMake/HermesConfig.cmake +++ b/CMake/HermesConfig.cmake @@ -91,6 +91,15 @@ if(thallium_FOUND) message(STATUS "found thallium at ${thallium_DIR}") endif() +# ADIOS +if(HERMES_ENABLE_ADIOS) + find_package(ADIOS2 REQUIRED) + message(STATUS "found adios2") + include_directories(${ADIOS2_INCLUDE_DIRS}) + link_directories(${ADIOS2_LIBRARY_DIRS}) + add_compile_definitions(HERMES_ENABLE_ADIOS) +endif() + #----------------------------------------------------------------------------- # Mark hermes as found and set all needed packages #----------------------------------------------------------------------------- @@ -107,6 +116,7 @@ set(Hermes_LIBRARIES -ldl -lrt -lc -pthread thallium hermes + ${ADIOS2_LIBRARIES} ${Boost_LIBRARIES} ${Hermes_LIBRARY}) set(Hermes_LIBRARY_DIRS ${HermeShm_LIBRARY_DIRS}) # Set Hermes client dirs (equal to Hermes dirs) diff --git a/CMakeLists.txt b/CMakeLists.txt index 93d9c822c..947b22eb9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -24,8 +24,8 @@ option(HERMES_ENABLE_MPIIO_ADAPTER "Build the Hermes MPI-IO adapter." OFF) option(HERMES_ENABLE_VFD "Build the Hermes HDF5 Virtual File Driver" OFF) option(HERMES_ENABLE_PUBSUB_ADAPTER "Build the Hermes pub/sub adapter." OFF) option(HERMES_ENABLE_KVSTORE "Build the Hermes KVStore adapter." OFF) -option(HERMES_ENABLE_PYTHON "Build the Hermes Python wrapper" ON) -option(HERMES_ENABLE_ADIOS "Build the Hermes Python wrapper" ON) +option(HERMES_ENABLE_PYTHON "Build the Hermes Python wrapper" OFF) +option(HERMES_ENABLE_ADIOS "Build the Hermes Python wrapper" OFF) option(HERMES_MPICH "Specify that this a MPICH build" OFF) option(HERMES_OPENMPI "Specify that this a OpenMPI build" OFF) @@ -169,6 +169,12 @@ endif() include_directories(${Boost_INCLUDE_DIRS}) message("Boost: ${Boost_LIBRARIES}") +# Libelf +pkg_check_modules(libelf REQUIRED libelf) +message(STATUS "found libelf at ${libelf_INCLUDE_DIRS}") +include_directories(${libelf_INCLUDE_DIRS}) +link_directories(${libelf_LIBRARY_DIRS}) + # HDF5 if(HERMES_ENABLE_VFD) set(HERMES_REQUIRED_HDF5_VERSION 1.14.0) @@ -200,6 +206,14 @@ if(HERMES_ENABLE_VFD) endif() endif() +# ADIOS +if(HERMES_ENABLE_ADIOS) + find_package(ADIOS2 REQUIRED) + message(STATUS "found adios2 at ${ADIOS2_INCLUDE_DIRS}") + include_directories(${ADIOS2_INCLUDE_DIRS}) + link_directories(${ADIOS2_LIBRARY_DIRS}) + add_compile_definitions(HERMES_ENABLE_ADIOS) +endif() #------------------------------------------------------------------------------ # Setup CMake Environment @@ -233,6 +247,7 @@ endif() #----------------------------------------------------------------------------- # Main includes include_directories(${HermesShm_INCLUDE_DIRS}) +link_directories(${HermesShm_LIBRARY_DIRS}) include_directories(${CMAKE_SOURCE_DIR}) include_directories(${CMAKE_SOURCE_DIR}/include) # Required Task includes @@ -263,13 +278,15 @@ set(Hermes_CLIENT_LIBRARIES ${HermesShm_LIBRARIES} yaml-cpp cereal::cereal - -ldl -lrt -lc -pthread hrun_client) + -ldl -lrt -lc -pthread hrun_client + ${ADIOS2_LIBRARIES}) set(Hermes_CLIENT_DEPS hrun_client) set(Hermes_RUNTIME_LIBRARIES ${Hermes_CLIENT_LIBRARIES} hrun_runtime - ${Boost_LIBRARIES}) + ${Boost_LIBRARIES} + ${ADIOS2_LIBRARIES}) set(Hermes_RUNTIME_DEPS hrun_client hrun_runtime) diff --git a/ci/hermes/packages/hermes/package.py b/ci/hermes/packages/hermes/package.py index 4b8ceda3b..631a1f5df 100644 --- a/ci/hermes/packages/hermes/package.py +++ b/ci/hermes/packages/hermes/package.py @@ -38,6 +38,7 @@ class Hermes(CMakePackage): variant('ares', default=False, description='Enable full libfabric install') variant('zmq', default=False, description='Build ZeroMQ tests') variant('adios', default=False, description='Build Adios tests') + variant('python', default=False, description='Build Python Wrapper') variant('encrypt', default=False, description='Build Adios tests') variant('compress', default=False, description='Build Adios tests') @@ -73,6 +74,10 @@ def cmake_args(self): args.append(self.define('HERMES_ENABLE_COMPRESSION', 'ON')) if '+encrypt' in self.spec: args.append(self.define('HERMES_ENABLE_ENCRYPTION', 'ON')) + if '+adios' in self.spec: + args.append(self.define('HERMES_ENABLE_ADIOS', 'ON')) + if '+python' in self.spec: + args.append(self.define('HERMES_ENABLE_PYTHON', 'ON')) return args def set_include(self, env, path): diff --git a/ci/hermes/packages/hermes_shm/package.py b/ci/hermes/packages/hermes_shm/package.py index fab60e16f..2cbe65223 100644 --- a/ci/hermes/packages/hermes_shm/package.py +++ b/ci/hermes/packages/hermes_shm/package.py @@ -4,8 +4,6 @@ class HermesShm(CMakePackage): homepage = "https://github.com/lukemartinlogan/hermes_shm/wiki" git = "https://github.com/lukemartinlogan/hermes_shm.git" version('master', branch='master') - version("1.1.0", sha256="080d5361cff22794b670e4544c532926ca8b6d6ec695af25596efe035bfffea5") - version("1.0.0", sha256="a79f01d531ce89985ad59a2f62b41d74c2385e48d929e2f4ad895ae34137573b") # Main variants variant('debug', default=False, description='Build shared libraries') @@ -21,6 +19,7 @@ class HermesShm(CMakePackage): depends_on('catch2@3.0.1') depends_on('yaml-cpp') depends_on('doxygen@1.9.3') + depends_on('libelf') # Machine variants variant('ares', default=False, description='Build in ares') @@ -30,7 +29,7 @@ class HermesShm(CMakePackage): # Main dependencies depends_on('mochi-thallium+cereal@0.10.1', when='+mochi') depends_on('cereal', when='+cereal') - depends_on('boost@1.7: +context +fiber', when='+boost') + depends_on('boost@1.7: +context +fiber +coroutine +regex +system +filesystem +serialization +pic +math', when='+boost') depends_on('mpi', when='+mpiio') depends_on('hdf5@1.14.0', when='+vfd') depends_on('libzmq', '+zmq') diff --git a/ci/install_deps.sh b/ci/install_deps.sh index 764bfc0a5..1ab54d80a 100755 --- a/ci/install_deps.sh +++ b/ci/install_deps.sh @@ -10,8 +10,8 @@ docker run -d \ --mount src=${PWD},target=/hermes,type=bind \ --name hermes_deps_c \ --network host \ ---memory=4G \ ---shm-size=4G \ +--memory=8G \ +--shm-size=8G \ -p 4000:4000 \ -p 4001:4001 \ lukemartinlogan/hermes_deps \ diff --git a/config/hermes_client_default.yaml b/config/hermes_client_default.yaml index fb5bee72e..4771bbc91 100644 --- a/config/hermes_client_default.yaml +++ b/config/hermes_client_default.yaml @@ -1,10 +1,10 @@ stop_daemon: false -path_inclusions: ["/tmp/test_hermes"] -path_exclusions: ["/"] +path_inclusions: ["/tmp/test_hermes/*"] +path_exclusions: ["/*"] file_page_size: 1024KB base_adapter_mode: kDefault flushing_mode: kAsync file_adapter_configs: - - path: "/" + - path: "/*" page_size: 1MB mode: kDefault \ No newline at end of file diff --git a/docker/deps.Dockerfile b/docker/deps.Dockerfile index 29939dd7c..5a7a83cf7 100644 --- a/docker/deps.Dockerfile +++ b/docker/deps.Dockerfile @@ -1,5 +1,5 @@ # NOTE(llogan): This dockerfile assumes that -# hermes github is mounted on /hermes +# hermes github is the current working directory # Install ubuntu 22.04 FROM ubuntu:22.04 @@ -38,9 +38,10 @@ ENV SPACK_DIR="${HOME}/spack" ENV SPACK_VERSION="v0.20.2" ENV HERMES_DEPS_DIR="${HOME}/hermes_deps" ENV HERMES_DIR="${HOME}/hermes" +COPY ci/module_load.sh module_load.sh # Install Spack -RUN . /hermes/ci/module_load.sh && \ +RUN . /module_load.sh && \ git clone -b ${SPACK_VERSION} https://github.com/spack/spack ${SPACK_DIR} && \ . "${SPACK_DIR}/share/spack/setup-env.sh" && \ git clone -b dev https://github.com/lukemartinlogan/hermes.git ${HERMES_DEPS_DIR} && \ @@ -50,7 +51,7 @@ RUN . /hermes/ci/module_load.sh && \ spack external find # Install hermes_shm -RUN . /hermes/ci/module_load.sh && \ +RUN . /module_load.sh && \ . "${SPACK_DIR}/share/spack/setup-env.sh" && \ spack external find && \ spack install hermes_shm@master+vfd+mpiio^mpich@3.3.2 diff --git a/hermes_adapters/mpiio/CMakeLists.txt b/hermes_adapters/mpiio/CMakeLists.txt index cfd2c664a..b6430fceb 100644 --- a/hermes_adapters/mpiio/CMakeLists.txt +++ b/hermes_adapters/mpiio/CMakeLists.txt @@ -18,7 +18,8 @@ set(INTERCEPTOR_DEPS add_library(hermes_mpiio SHARED ${CMAKE_CURRENT_SOURCE_DIR}/mpiio_api.cc) add_dependencies(hermes_mpiio ${INTERCEPTOR_DEPS}) -target_link_libraries(hermes_mpiio MPI::MPI_CXX stdc++fs dl ${INTERCEPTOR_DEPS}) +target_link_libraries(hermes_mpiio + MPI::MPI_CXX stdc++fs ${libelf_LIBRARIES} dl ${INTERCEPTOR_DEPS}) #----------------------------------------------------------------------------- # Add Target(s) to CMake Install diff --git a/hermes_adapters/posix/CMakeLists.txt b/hermes_adapters/posix/CMakeLists.txt index c0e412a99..d51640f46 100644 --- a/hermes_adapters/posix/CMakeLists.txt +++ b/hermes_adapters/posix/CMakeLists.txt @@ -11,7 +11,7 @@ set(INTERCEPTOR_DEPS add_library(hermes_posix SHARED ${CMAKE_CURRENT_SOURCE_DIR}/posix_api.cc) add_dependencies(hermes_posix ${INTERCEPTOR_DEPS}) -target_link_libraries(hermes_posix MPI::MPI_CXX stdc++fs dl ${INTERCEPTOR_DEPS}) +target_link_libraries(hermes_posix MPI::MPI_CXX stdc++fs ${libelf_LIBRARIES} dl ${INTERCEPTOR_DEPS}) #----------------------------------------------------------------------------- # Add Target(s) to CMake Install diff --git a/hermes_adapters/posix/posix_api.cc b/hermes_adapters/posix/posix_api.cc index a6cc7836f..af1fedc45 100644 --- a/hermes_adapters/posix/posix_api.cc +++ b/hermes_adapters/posix/posix_api.cc @@ -37,17 +37,16 @@ namespace stdfs = std::filesystem; extern "C" { -static __attribute__((constructor(101))) void init_posix(void) { - HERMES_POSIX_API; - HERMES_POSIX_FS; - TRANSPARENT_HERMES();; -}/**/ +//static __attribute__((constructor(101))) void init_posix(void) { +// HERMES_POSIX_API; +// HERMES_POSIX_FS; +// TRANSPARENT_HERMES();; +//} /** * POSIX */ int HERMES_DECL(open)(const char *path, int flags, ...) { -// TRANSPARENT_HERMES(); int mode = 0; auto real_api = HERMES_POSIX_API; auto fs_api = HERMES_POSIX_FS; @@ -57,7 +56,10 @@ int HERMES_DECL(open)(const char *path, int flags, ...) { mode = va_arg(arg, int); va_end(arg); } - if (fs_api->IsPathTracked(path)) { + if (real_api->IsInterceptorLoaded()) { + TRANSPARENT_HERMES(); + } + if (real_api->IsInterceptorLoaded() && fs_api->IsPathTracked(path)) { HILOG(kDebug, "Intercept open for filename: {}" " and mode: {}" " is tracked.", path, flags) @@ -68,24 +70,29 @@ int HERMES_DECL(open)(const char *path, int flags, ...) { return f.hermes_fd_; } + int fd = -1; if (flags & O_CREAT || flags & O_TMPFILE) { - return real_api->open(path, flags, mode); + fd = real_api->open(path, flags, mode); + } else { + fd = real_api->open(path, flags); } - return real_api->open(path, flags); + return fd; } int HERMES_DECL(open64)(const char *path, int flags, ...) { -// TRANSPARENT_HERMES(); int mode = 0; auto real_api = HERMES_POSIX_API; auto fs_api = HERMES_POSIX_FS; - if (flags & O_CREAT) { + if (flags & O_CREAT || flags & O_TMPFILE) { va_list arg; va_start(arg, flags); mode = va_arg(arg, int); va_end(arg); } - if (fs_api->IsPathTracked(path)) { + if (real_api->IsInterceptorLoaded()) { + TRANSPARENT_HERMES(); + } + if (real_api->IsInterceptorLoaded() && fs_api->IsPathTracked(path)) { HILOG(kDebug, "Intercept open64 for filename: {}" " and mode: {}" " is tracked.", path, flags) @@ -94,17 +101,22 @@ int HERMES_DECL(open64)(const char *path, int flags, ...) { stat.st_mode_ = mode; return fs_api->Open(stat, path).hermes_fd_; } - if (flags & O_CREAT) { - return real_api->open64(path, flags, mode); + int fd = -1; + if (flags & O_CREAT || flags & O_TMPFILE) { + fd = real_api->open64(path, flags, mode); + } else { + fd = real_api->open64(path, flags); } - return real_api->open64(path, flags); + return fd; } int HERMES_DECL(__open_2)(const char *path, int oflag) { -// TRANSPARENT_HERMES(); auto real_api = HERMES_POSIX_API; auto fs_api = HERMES_POSIX_FS; - if (fs_api->IsPathTracked(path)) { + if (real_api->IsInterceptorLoaded()) { + TRANSPARENT_HERMES(); + } + if (real_api->IsInterceptorLoaded() && fs_api->IsPathTracked(path)) { HILOG(kDebug, "Intercept __open_2 for filename: {}" " and mode: {}" " is tracked.", path, oflag) @@ -117,11 +129,13 @@ int HERMES_DECL(__open_2)(const char *path, int oflag) { } int HERMES_DECL(creat)(const char *path, mode_t mode) { -// TRANSPARENT_HERMES(); std::string path_str(path); auto real_api = HERMES_POSIX_API; auto fs_api = HERMES_POSIX_FS; - if (fs_api->IsPathTracked(path)) { + if (real_api->IsInterceptorLoaded()) { + TRANSPARENT_HERMES(); + } + if (real_api->IsInterceptorLoaded() && fs_api->IsPathTracked(path)) { HILOG(kDebug, "Intercept creat for filename: {}" " and mode: {}" " is tracked.", path, mode) @@ -134,11 +148,13 @@ int HERMES_DECL(creat)(const char *path, mode_t mode) { } int HERMES_DECL(creat64)(const char *path, mode_t mode) { -// TRANSPARENT_HERMES(); std::string path_str(path); auto real_api = HERMES_POSIX_API; auto fs_api = HERMES_POSIX_FS; - if (fs_api->IsPathTracked(path)) { + if (real_api->IsInterceptorLoaded()) { + TRANSPARENT_HERMES(); + } + if (real_api->IsInterceptorLoaded() && fs_api->IsPathTracked(path)) { HILOG(kDebug, "Intercept creat64 for filename: {}" " and mode: {}" " is tracked.", path, mode) @@ -440,6 +456,30 @@ int HERMES_DECL(fsync)(int fd) { return real_api->fsync(fd); } +int HERMES_DECL(ftruncate)(int fd, off_t length) { + bool stat_exists; + auto real_api = HERMES_POSIX_API; + auto fs_api = HERMES_POSIX_FS; + if (fs_api->IsFdTracked(fd)) { + File f; f.hermes_fd_ = fd; + HILOG(kDebug, "Intercepted ftruncate.") + return fs_api->Truncate(f, stat_exists, length); + } + return real_api->ftruncate(fd, length); +} + +int HERMES_DECL(ftruncate64)(int fd, off64_t length) { + bool stat_exists; + auto real_api = HERMES_POSIX_API; + auto fs_api = HERMES_POSIX_FS; + if (fs_api->IsFdTracked(fd)) { + File f; f.hermes_fd_ = fd; + HILOG(kDebug, "Intercepted ftruncate.") + return fs_api->Truncate(f, stat_exists, length); + } + return real_api->ftruncate64(fd, length); +} + int HERMES_DECL(close)(int fd) { bool stat_exists; auto real_api = HERMES_POSIX_API; @@ -462,7 +502,7 @@ int HERMES_DECL(flock)(int fd, int operation) { // TODO(llogan): implement? return 0; } - return real_api->close(fd); + return real_api->flock(fd, operation); } int HERMES_DECL(remove)(const char *pathname) { diff --git a/hermes_adapters/posix/posix_api.h b/hermes_adapters/posix/posix_api.h index 9b8e44b66..0b79b51e3 100644 --- a/hermes_adapters/posix/posix_api.h +++ b/hermes_adapters/posix/posix_api.h @@ -87,6 +87,8 @@ typedef int (*posix_fadvise_t)(int fd, off_t offset, typedef int (*flock_t)(int fd, int operation); typedef int (*remove_t)(const char *pathname); typedef int (*unlink_t)(const char *pathname); +typedef int (*ftruncate_t)(int fd, off_t length); +typedef int (*ftruncate64_t)(int fd, off64_t length); } namespace hermes::adapter { @@ -96,6 +98,9 @@ static int fxstat_to_fstat(int fd, struct stat * stbuf); /** Pointers to the real posix API */ class PosixApi : public RealApi { + public: + bool is_loaded_ = false; + public: /** open */ open_t open = nullptr; @@ -160,6 +165,10 @@ class PosixApi : public RealApi { remove_t remove = nullptr; /** unlink */ unlink_t unlink = nullptr; + /** ftruncate */ + ftruncate_t ftruncate = nullptr; + /** ftruncate64 */ + ftruncate64_t ftruncate64 = nullptr; PosixApi() : RealApi("open", "posix_intercepted") { open = (open_t)dlsym(real_lib_, "open"); @@ -221,6 +230,19 @@ class PosixApi : public RealApi { REQUIRE_API(remove) unlink = (unlink_t)dlsym(real_lib_, "unlink"); REQUIRE_API(unlink) + ftruncate = (ftruncate_t)dlsym(real_lib_, "ftruncate"); + REQUIRE_API(ftruncate) + ftruncate64 = (ftruncate64_t)dlsym(real_lib_, "ftruncate64"); + REQUIRE_API(ftruncate64) + } + + bool IsInterceptorLoaded() { + if (is_loaded_) { + return true; + } + InterceptorApi check("open", "posix_intercepted"); + is_loaded_ = check.is_loaded_; + return is_loaded_; } }; @@ -233,6 +255,7 @@ class PosixApi : public RealApi { hshm::EasySingleton<::hermes::adapter::PosixApi>::GetInstance() #define HERMES_POSIX_API_T hermes::adapter::PosixApi* + namespace hermes::adapter { /** Used for compatability with older kernel versions */ static int fxstat_to_fstat(int fd, struct stat *stbuf) { diff --git a/hermes_adapters/real_api.h b/hermes_adapters/real_api.h index 9aba4cb40..6146348cb 100644 --- a/hermes_adapters/real_api.h +++ b/hermes_adapters/real_api.h @@ -13,8 +13,13 @@ #ifndef HERMES_ADAPTER_API_H #define HERMES_ADAPTER_API_H +#undef DEPRECATED + #include #include +// #include +#include + namespace stdfs = std::filesystem; @@ -44,6 +49,7 @@ struct RealApi { static int callback(struct dl_phdr_info *info, size_t size, void *data) { auto iter = (RealApiIter*)data; auto lib = dlopen(info->dlpi_name, RTLD_GLOBAL | RTLD_NOW); + // auto lib = dlopen(info->dlpi_name, RTLD_NOLOAD | RTLD_NOW); auto exists = dlsym(lib, iter->symbol_name_); void *is_intercepted = (void*)dlsym(lib, iter->is_intercepted_); @@ -71,6 +77,123 @@ struct RealApi { } }; +template +struct InterceptorApi { + PosixT *posix_; + std::string lib_path_; + bool is_loaded_; + int lib_fd_ = -1; + Elf *lib_elf_ = nullptr; + + static int callback(struct dl_phdr_info *info, size_t size, void *data) { + auto iter = (RealApiIter*)data; + auto lib = dlopen(info->dlpi_name, RTLD_GLOBAL | RTLD_NOW); + // auto lib = dlopen(info->dlpi_name, RTLD_NOLOAD | RTLD_NOW); + auto exists = dlsym(lib, iter->symbol_name_); + void *is_intercepted = + (void*)dlsym(lib, iter->is_intercepted_); + if (is_intercepted && exists && !iter->lib_path_) { + iter->lib_path_ = info->dlpi_name; + if (iter->lib_path_ == nullptr || strlen(iter->lib_path_) == 0) { + Dl_info dl_info; + if (dladdr(is_intercepted, &dl_info) == 0) { + iter->lib_path_ = ""; + } else { + iter->lib_path_ = dl_info.dli_fname; + } + } + } + return 0; + } + + bool IsLoaded(const char *filename) { + // Open the ELF file + lib_fd_ = posix_->open(filename, O_RDONLY); + if (lib_fd_ < 0) { + return false; + } + + // Initialize libelf + if (elf_version(EV_CURRENT) == EV_NONE) { + return false; + } + + // Open ELF descriptor + lib_elf_ = elf_begin(lib_fd_, ELF_C_READ, NULL); + if (!lib_elf_) { + return false; + } + + // Get the ELF header + GElf_Ehdr ehdr; + if (!gelf_getehdr(lib_elf_, &ehdr)) { + return false; + } + + // Scan the dynamic table + Elf_Scn *scn = NULL; + while ((scn = elf_nextscn(lib_elf_, scn)) != NULL) { + GElf_Shdr shdr = {}; + if (gelf_getshdr(scn, &shdr) != &shdr) { + return false; + } + + if (shdr.sh_type == SHT_DYNAMIC) { + Elf_Data *data = NULL; + data = elf_getdata(scn, data); + if (data == NULL) { + return false; + } + + size_t sh_entsize = gelf_fsize(lib_elf_, ELF_T_DYN, 1, EV_CURRENT); + + for (size_t i = 0; i < shdr.sh_size / sh_entsize; i++) { + GElf_Dyn dyn = {}; + if (gelf_getdyn(data, i, &dyn) != &dyn) { + return false; + } + const char *lib_name = + elf_strptr(lib_elf_, shdr.sh_link, dyn.d_un.d_val); + if (lib_name) { + auto lib = dlopen(lib_name, RTLD_NOLOAD | RTLD_NOW); + if (!lib) { + return false; + } + } + } + } + } + + // Clean up + return true; + } + + void CloseElf() { + if (lib_elf_) { + elf_end(lib_elf_); + } + if (lib_fd_ > 0) { + posix_->close(lib_fd_); + } + } + + InterceptorApi(const char *symbol_name, + const char *is_intercepted) : is_loaded_(false) { + is_loaded_ = true; + return; + posix_ = hshm::EasySingleton::GetInstance(); + RealApiIter iter(symbol_name, is_intercepted); + dl_iterate_phdr(callback, (void*)&iter); + if (iter.lib_path_) { + lib_path_ = iter.lib_path_; + is_loaded_ = IsLoaded(iter.lib_path_); + } + CloseElf(); + } +}; + } // namespace hermes::adapter +#undef DEPRECATED + #endif // HERMES_ADAPTER_API_H diff --git a/hermes_adapters/stdio/CMakeLists.txt b/hermes_adapters/stdio/CMakeLists.txt index da729b02c..38e981fdd 100644 --- a/hermes_adapters/stdio/CMakeLists.txt +++ b/hermes_adapters/stdio/CMakeLists.txt @@ -11,7 +11,7 @@ set(INTERCEPTOR_DEPS add_library(hermes_stdio SHARED ${CMAKE_CURRENT_SOURCE_DIR}/stdio_api.cc) add_dependencies(hermes_stdio ${INTERCEPTOR_DEPS}) -target_link_libraries(hermes_stdio MPI::MPI_CXX stdc++fs dl ${INTERCEPTOR_DEPS}) +target_link_libraries(hermes_stdio MPI::MPI_CXX stdc++fs ${libelf_LIBRARIES} dl ${INTERCEPTOR_DEPS}) #----------------------------------------------------------------------------- # Add Target(s) to CMake Install diff --git a/hrun/include/hrun/network/local_serialize.h b/hrun/include/hrun/network/local_serialize.h index 2f950ac5a..a95ea7cc3 100644 --- a/hrun/include/hrun/network/local_serialize.h +++ b/hrun/include/hrun/network/local_serialize.h @@ -54,9 +54,17 @@ class LocalSerialize { memcpy(data_.data() + off, &size, sizeof(size_t)); off += sizeof(size_t); memcpy(data_.data() + off, obj.data(), size); + } else if (std::is_enum::value) { + size_t size = sizeof(T); + size_t off = data_.size(); + data_.resize(off + size); + memcpy(data_.data() + off, &obj, size); } else { throw std::runtime_error("Cannot serialize object"); } + + // Check if the type is an enum + return *this; } }; @@ -96,6 +104,9 @@ class LocalDeserialize { off += sizeof(size_t); obj.resize(str_size); memcpy(obj.data(), data_.data() + off, str_size); + } else if (std::is_enum::value) { + size = sizeof(T); + memcpy(&obj, data_.data() + off, size); } else { throw std::runtime_error("Cannot serialize object"); } diff --git a/include/hermes/bucket.h b/include/hermes/bucket.h index effbfda14..2490636ac 100644 --- a/include/hermes/bucket.h +++ b/include/hermes/bucket.h @@ -137,6 +137,13 @@ class Bucket { return bkt_mdm_->GetSizeRoot(id_); } + /** + * Set the current size of the bucket + * */ + void SetSize(size_t new_size) { + bkt_mdm_->AsyncUpdateSizeRoot(id_, new_size, UpdateSizeMode::kCap); + } + /** * Rename this bucket * */ diff --git a/include/hermes/config_client.h b/include/hermes/config_client.h index 0dddf7e89..e91d17c34 100644 --- a/include/hermes/config_client.h +++ b/include/hermes/config_client.h @@ -70,8 +70,8 @@ struct UserPathInfo { /** Detect if a path matches the input path */ bool Match(const std::string &abs_path) { - return std::regex_match(abs_path, regex_); - // return abs_path.rfind(path_) != std::string::npos; + return std::regex_match(abs_path, regex_) || + abs_path.rfind(path_) != std::string::npos; } }; @@ -120,7 +120,7 @@ class ClientConfig : public BaseConfig { void CreateAdapterPathTracking(const std::string &path, bool include) { bool is_dir = stdfs::is_directory(path); path_list_.emplace_back( - stdfs::absolute(path).string(), include, is_dir); + path, include, is_dir); std::sort(path_list_.begin(), path_list_.end(), [](const UserPathInfo &a, diff --git a/include/hermes/config_client_default.h b/include/hermes/config_client_default.h index aff693b94..0a54e592d 100644 --- a/include/hermes/config_client_default.h +++ b/include/hermes/config_client_default.h @@ -2,13 +2,13 @@ #define HRUN_SRC_CONFIG_HERMES_CLIENT_DEFAULT_H_ const inline char* kHermesClientDefaultConfigStr = "stop_daemon: false\n" -"path_inclusions: [\"/tmp/test_hermes\"]\n" -"path_exclusions: [\"/\"]\n" +"path_inclusions: [\"/tmp/test_hermes/*\"]\n" +"path_exclusions: [\"/*\"]\n" "file_page_size: 1024KB\n" "base_adapter_mode: kDefault\n" "flushing_mode: kAsync\n" "file_adapter_configs:\n" -" - path: \"/\"\n" +" - path: \"/*\"\n" " page_size: 1MB\n" " mode: kDefault\n"; #endif // HRUN_SRC_CONFIG_HERMES_CLIENT_DEFAULT_H_ \ No newline at end of file diff --git a/include/hermes/hermes_types.h b/include/hermes/hermes_types.h index de8fd8cbe..29bf36254 100644 --- a/include/hermes/hermes_types.h +++ b/include/hermes/hermes_types.h @@ -332,6 +332,13 @@ struct BlobInfo { } }; +/** The mode used to update size */ +class UpdateSizeMode { + public: + TASK_METHOD_T kAdd = 0; + TASK_METHOD_T kCap = 1; +}; + /** Data structure used to store Bucket information */ struct TagInfo { TagId tag_id_; diff --git a/tasks/data_stager/include/data_stager/data_stager.h b/tasks/data_stager/include/data_stager/data_stager.h index cd50fde1b..5f5902f72 100644 --- a/tasks/data_stager/include/data_stager/data_stager.h +++ b/tasks/data_stager/include/data_stager/data_stager.h @@ -91,19 +91,21 @@ class Client : public TaskLibClient { const TaskNode &task_node, const BucketId &bkt_id, const hshm::charbuf &blob_name, + size_t data_size, float score, u32 node_id) { HRUN_CLIENT->ConstructTask( task, task_node, id_, bkt_id, - blob_name, score, node_id); + blob_name, data_size, score, node_id); } HSHM_ALWAYS_INLINE void StageInRoot(const BucketId &bkt_id, const hshm::charbuf &blob_name, + size_t data_size, float score, u32 node_id) { LPointer> task = - AsyncStageInRoot(bkt_id, blob_name, score, node_id); + AsyncStageInRoot(bkt_id, blob_name, data_size, score, node_id); task.ptr_->Wait(); } HRUN_TASK_NODE_PUSH_ROOT(StageIn); diff --git a/tasks/data_stager/include/data_stager/data_stager_tasks.h b/tasks/data_stager/include/data_stager/data_stager_tasks.h index 5184c84b5..2a1416c68 100644 --- a/tasks/data_stager/include/data_stager/data_stager_tasks.h +++ b/tasks/data_stager/include/data_stager/data_stager_tasks.h @@ -236,6 +236,7 @@ struct UnregisterStagerTask : public Task, TaskFlags { struct StageInTask : public Task, TaskFlags { IN hermes::BucketId bkt_id_; IN hipc::ShmArchive blob_name_; + IN size_t data_size_; IN float score_; IN u32 node_id_; @@ -250,6 +251,7 @@ struct StageInTask : public Task, TaskFlags { const TaskStateId &state_id, const BucketId &bkt_id, const hshm::charbuf &blob_name, + size_t data_size, float score, u32 node_id) : Task(alloc) { // Initialize task @@ -266,6 +268,7 @@ struct StageInTask : public Task, TaskFlags { HSHM_MAKE_AR(blob_name_, alloc, blob_name); score_ = score; node_id_ = node_id; + data_size_ = data_size; } /** Destructor */ diff --git a/tasks/data_stager/include/data_stager/factory/abstract_stager.h b/tasks/data_stager/include/data_stager/factory/abstract_stager.h index 80e000f20..4a16b77ac 100644 --- a/tasks/data_stager/include/data_stager/factory/abstract_stager.h +++ b/tasks/data_stager/include/data_stager/factory/abstract_stager.h @@ -18,6 +18,13 @@ class AbstractStager { AbstractStager() = default; ~AbstractStager() = default; + /** Build context for staging */ + static Context BuildPutContext() { + Context ctx; + ctx.flags_.SetBits(HERMES_SHOULD_STAGE); + return ctx; + } + virtual void RegisterStager(RegisterStagerTask *task, RunContext &rctx) = 0; virtual void StageIn(blob_mdm::Client &blob_mdm, StageInTask *task, RunContext &rctx) = 0; virtual void StageOut(blob_mdm::Client &blob_mdm, StageOutTask *task, RunContext &rctx) = 0; diff --git a/tasks/data_stager/include/data_stager/factory/adios2_stager.h b/tasks/data_stager/include/data_stager/factory/adios2_stager.h new file mode 100644 index 000000000..a17cec82e --- /dev/null +++ b/tasks/data_stager/include/data_stager/factory/adios2_stager.h @@ -0,0 +1,274 @@ +// +// Created by lukemartinlogan on 9/30/23. +// + +#ifndef HERMES_TASKS_DATA_STAGER_SRC_Adios2_STAGER_H_ +#define HERMES_TASKS_DATA_STAGER_SRC_Adios2_STAGER_H_ + +#include "abstract_stager.h" +#include "hermes_adapters/mapper/abstract_mapper.h" +#include + +namespace hermes::data_stager { + +enum class DataType { + kChar, + kShort, + kInt, + kLong, + kFloat, + kDouble +}; + +class Adios2Stager : public AbstractStager { + public: + size_t page_size_; + std::string config_path_; + std::string io_name_; + bitfield32_t flags_; + adios2::ADIOS adios_; + adios2::IO io_; + + public: + /** Default constructor */ + Adios2Stager() = default; + + /** Destructor */ + ~Adios2Stager() {} + + /** Build context for staging */ + static Context BuildContext(const std::string &config_path, + const std::string &io_name, + u32 flags = 0) { + Context ctx; + ctx.flags_.SetBits(HERMES_SHOULD_STAGE); + ctx.bkt_params_ = BuildFileParams(config_path, io_name, flags); + return ctx; + } + + /** Build serialized file parameter pack */ + static std::string BuildFileParams(const std::string &config_path, + const std::string &io_name, + u32 flags = 0) { + hshm::charbuf params(4096); + hrun::LocalSerialize srl(params); + srl << std::string("adios2"); + srl << flags; + srl << config_path; + srl << io_name; + return params.str(); + } + + /** Create blob name */ + template + static std::string CreateBlobName(const adios2::Variable &var) { + hshm::charbuf params(4096); + hrun::LocalSerialize srl(params); + srl << var.Name(); + if constexpr (std::is_same::value) { + srl << DataType::kChar; + } else if constexpr (std::is_same::value) { + srl << DataType::kShort; + } else if constexpr (std::is_same::value) { + srl << DataType::kInt; + } else if constexpr (std::is_same::value) { + srl << DataType::kLong; + } else if constexpr (std::is_same::value) { + srl << DataType::kFloat; + } else if constexpr (std::is_same::value) { + srl << DataType::kDouble; + } + SerializeDims(srl, var.Shape()); + SerializeDims(srl, var.Start()); + SerializeDims(srl, var.Count()); + return params.str(); + } + + /** Serialize adios2 dims */ + static void SerializeDims(hrun::LocalSerialize &srl, + const adios2::Dims &dims) { + srl << dims.size(); + for (size_t dim : dims) { + srl << dim; + } + } + + /** Decode blob name */ + static void DecodeBlobName(const std::string &blob_name, + std::string &var_name, + DataType &data_type, + adios2::Dims &shape, + adios2::Dims &start, + adios2::Dims &count) { + hrun::LocalDeserialize srl(blob_name); + srl >> var_name; + srl >> data_type; + DeserializeDims(srl, shape); + DeserializeDims(srl, start); + DeserializeDims(srl, count); + } + + /** Deserialize adios2 dims */ + static void DeserializeDims(hrun::LocalDeserialize &srl, + adios2::Dims &dims) { + size_t dim_size; + srl >> dim_size; + dims.resize(dim_size); + for (size_t i = 0; i < dim_size; ++i) { + srl >> dims[i]; + } + } + + /** Create the data stager payload */ + void RegisterStager(RegisterStagerTask *task, RunContext &rctx) override { + std::string params = task->params_->str(); + std::string protocol; + hrun::LocalDeserialize srl(params); + srl >> protocol; + srl >> flags_.bits_; + srl >> config_path_; + srl >> io_name_; + path_ = task->tag_name_->str(); + adios_ = adios2::ADIOS(config_path_); + io_ = adios_.DeclareIO(io_name_); + } + + /** Stage data in from remote source */ + void StageIn(blob_mdm::Client &blob_mdm, StageInTask *task, RunContext &rctx) override { + if (flags_.Any(HERMES_STAGE_NO_READ)) { + return; + } + std::string blob_name = task->blob_name_->str(); + + // Read blob from PFS + try { + std::string var_name; + DataType data_type; + adios2::Dims shape, start, count; + DecodeBlobName(blob_name, var_name, data_type, + shape, start, count); + adios2::Engine reader = io_.Open(path_, adios2::Mode::Read); + LPointer blob = + HRUN_CLIENT->AllocateBufferServer(task->data_size_); + switch (data_type) { + case DataType::kChar: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + reader.Get(var, (char *) blob.ptr_, adios2::Mode::Sync); + break; + } + case DataType::kShort: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + reader.Get(var, (short *) blob.ptr_, adios2::Mode::Sync); + break; + } + case DataType::kInt: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + reader.Get(var, (int *) blob.ptr_, adios2::Mode::Sync); + break; + } + case DataType::kLong: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + reader.Get(var, (long *) blob.ptr_, adios2::Mode::Sync); + break; + } + case DataType::kFloat: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + reader.Get(var, (float *) blob.ptr_, adios2::Mode::Sync); + break; + } + case DataType::kDouble: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + reader.Get(var, (double *) blob.ptr_, adios2::Mode::Sync); + break; + } + } + + // Write blob to hermes + HILOG(kDebug, "Submitting put blob {} ({}) to blob mdm ({})", + task->blob_name_->str(), task->bkt_id_, blob_mdm.id_) + hapi::Context ctx; + ctx.flags_.SetBits(HERMES_SHOULD_STAGE); + LPointer put_task = + blob_mdm.AsyncPutBlob(task->task_node_ + 1, + task->bkt_id_, + hshm::to_charbuf(*task->blob_name_), + hermes::BlobId::GetNull(), + 0, task->data_size_, blob.shm_, task->score_, 0, + ctx, TASK_DATA_OWNER | TASK_LOW_LATENCY); + put_task->Wait(task); + HRUN_CLIENT->DelTask(put_task); + } catch (...) { + } + } + + /** Stage data out to remote source */ + void StageOut(blob_mdm::Client &blob_mdm, StageOutTask *task, RunContext &rctx) override { + if (flags_.Any(HERMES_STAGE_NO_WRITE)) { + return; + } + std::string blob_name = task->blob_name_->str(); + + // Read variable info from PFS + std::string var_name; + DataType data_type; + adios2::Dims shape, start, count; + DecodeBlobName(blob_name, var_name, data_type, + shape, start, count); + adios2::Engine writer = io_.Open(path_, adios2::Mode::Write); + char *data = HRUN_CLIENT->GetDataPointer(task->data_); + switch (data_type) { + case DataType::kChar: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + writer.Put(var, (char*)data, adios2::Mode::Sync); + break; + } + case DataType::kShort: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + writer.Put(var, (short*)data, adios2::Mode::Sync); + break; + } + case DataType::kInt: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + writer.Put(var, (int*)data, adios2::Mode::Sync); + break; + } + case DataType::kLong: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + writer.Put(var, (long*)data, adios2::Mode::Sync); + break; + } + case DataType::kFloat: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + writer.Put(var, (float*)data, adios2::Mode::Sync); + break; + } + case DataType::kDouble: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + writer.Put(var, (double*)data, adios2::Mode::Sync); + break; + } + } + HILOG(kDebug, "Staged out {} bytes to the backend file {}", + task->data_size_, path_); + } + + void UpdateSize(bucket_mdm::Client &bkt_mdm, UpdateSizeTask *task, RunContext &rctx) override { + // TODO(llogan) + } +}; + +} // namespace hermes::data_stager + +#endif // HERMES_TASKS_DATA_STAGER_SRC_Adios2_STAGER_H_ diff --git a/tasks/data_stager/include/data_stager/factory/binary_stager.h b/tasks/data_stager/include/data_stager/factory/binary_stager.h index cbc921dc7..b37e38d30 100644 --- a/tasks/data_stager/include/data_stager/factory/binary_stager.h +++ b/tasks/data_stager/include/data_stager/factory/binary_stager.h @@ -139,7 +139,7 @@ class BinaryFileStager : public AbstractStager { bkt_mdm.AsyncUpdateSize(task->task_node_ + 1, task->bkt_id_, p.bucket_off_ + task->blob_off_ + task->data_size_, - bucket_mdm::UpdateSizeMode::kCap); + UpdateSizeMode::kCap); } }; diff --git a/tasks/data_stager/include/data_stager/factory/stager_factory.h b/tasks/data_stager/include/data_stager/factory/stager_factory.h index ba37e5282..5bdd35fdf 100644 --- a/tasks/data_stager/include/data_stager/factory/stager_factory.h +++ b/tasks/data_stager/include/data_stager/factory/stager_factory.h @@ -8,6 +8,10 @@ #include "../data_stager.h" #include "abstract_stager.h" #include "binary_stager.h" +#ifdef HERMES_ENABLE_ADIOS +#include "adios2.h" +#include "adios2_stager.h" +#endif namespace hermes::data_stager { @@ -24,6 +28,10 @@ class StagerFactory { stager = std::make_unique(); } else if (protocol == "parquet") { } else if (protocol == "hdf5") { + } else if (protocol == "adios2") { +#ifdef HERMES_ENABLE_ADIOS + stager = std::make_unique(); +#endif } else { throw std::runtime_error("Unknown stager type"); } diff --git a/tasks/data_stager/src/CMakeLists.txt b/tasks/data_stager/src/CMakeLists.txt index 07eca417a..0ca022276 100644 --- a/tasks/data_stager/src/CMakeLists.txt +++ b/tasks/data_stager/src/CMakeLists.txt @@ -1,3 +1,4 @@ + #------------------------------------------------------------------------------ # Build Small Message Task Library #------------------------------------------------------------------------------ diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc index 1aa9e3bfe..f169381e0 100644 --- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc +++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc @@ -362,6 +362,7 @@ class Server : public TaskLib { stager_mdm_.AsyncStageIn(task->task_node_ + 1, task->tag_id_, blob_info.name_, + task->data_size_, task->score_, 0); stage_task->Wait(task); blob_info.mod_count_ = 1; @@ -480,7 +481,7 @@ class Server : public TaskLib { bkt_mdm_.AsyncUpdateSize(task->task_node_ + 1, task->tag_id_, bkt_size_diff, - bucket_mdm::UpdateSizeMode::kAdd); + UpdateSizeMode::kAdd); } if (task->flags_.Any(HERMES_BLOB_DID_CREATE)) { bkt_mdm_.AsyncTagAddBlob(task->task_node_ + 1, @@ -536,6 +537,7 @@ class Server : public TaskLib { stager_mdm_.AsyncStageIn(task->task_node_ + 1, task->tag_id_, blob_info.name_, + task->data_size_, 1, 0); stage_task->Wait(task); HRUN_CLIENT->DelTask(stage_task); @@ -841,7 +843,7 @@ class Server : public TaskLib { bkt_mdm_.AsyncUpdateSize(task->task_node_ + 1, task->tag_id_, -(ssize_t) blob_info.blob_size_, - bucket_mdm::UpdateSizeMode::kAdd); + UpdateSizeMode::kAdd); } HSHM_DESTROY_AR(task->free_tasks_); blob_map.erase(task->blob_id_); diff --git a/tasks/hermes_bucket_mdm/include/hermes_bucket_mdm/hermes_bucket_mdm_tasks.h b/tasks/hermes_bucket_mdm/include/hermes_bucket_mdm/hermes_bucket_mdm_tasks.h index 489ecadd0..8446c1ce5 100644 --- a/tasks/hermes_bucket_mdm/include/hermes_bucket_mdm/hermes_bucket_mdm_tasks.h +++ b/tasks/hermes_bucket_mdm/include/hermes_bucket_mdm/hermes_bucket_mdm_tasks.h @@ -134,12 +134,6 @@ struct SetBlobMdmTask : public Task, TaskFlags { void ReplicateEnd() {} }; -class UpdateSizeMode { - public: - TASK_METHOD_T kAdd = 0; - TASK_METHOD_T kCap = 1; -}; - /** Update bucket size */ struct UpdateSizeTask : public Task, TaskFlags { IN TagId tag_id_; diff --git a/test/unit/hermes/adios2.xml b/test/unit/hermes/adios2.xml new file mode 100644 index 000000000..cf1b98090 --- /dev/null +++ b/test/unit/hermes/adios2.xml @@ -0,0 +1,142 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/test/unit/hermes/test_bucket.cc b/test/unit/hermes/test_bucket.cc index f713999b3..91c0239e4 100644 --- a/test/unit/hermes/test_bucket.cc +++ b/test/unit/hermes/test_bucket.cc @@ -15,7 +15,7 @@ #include "hrun_admin/hrun_admin.h" #include "hermes/hermes.h" #include "hermes/bucket.h" -#include "data_stager/factory/binary_stager.h" +#include "data_stager/factory/stager_factory.h" #include TEST_CASE("TestHermesConnect") { @@ -524,6 +524,83 @@ TEST_CASE("TestHermesDataStager") { HILOG(kInfo, "Flushing finished") } +#ifdef HERMES_ENABLE_ADIOS +TEST_CASE("TestHermesAdios2") { + int rank, nprocs; + MPI_Barrier(MPI_COMM_WORLD); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &nprocs); + + // create dataset + std::string config_path = "/home/llogan/Documents/Projects/hermes/test/unit/hermes/adios2.xml"; + std::string home_dir = getenv("HOME"); + std::string path = home_dir + "/test.bp"; + size_t count_per_proc = 16; + size_t off = rank * count_per_proc; + size_t proc_count = off + count_per_proc; + size_t var_count = KILOBYTES(1); + size_t var_size = var_count * sizeof(double); + + // Create an ADIOS file + adios2::ADIOS adios; + + // Initialize Hermes on all nodes + HERMES->ClientInit(); + + // ADIOS2 Hermes Context + using hermes::data_stager::Adios2Stager; + hermes::Context bkt_ctx = Adios2Stager::BuildContext( + config_path, "SimulationOutput", 0); + adios2::IO io = adios.DeclareIO("SimulationOutput"); + + // Create a stageable bucket + hermes::Bucket write_bkt(path, bkt_ctx); + + // Put a few blobs in the bucket + for (size_t i = off; i < proc_count; ++i) { + HILOG(kInfo, "Iteration: {}", i); + std::string var_name = "var" + std::to_string(i); + adios2::Variable ad_var = + io.DefineVariable(var_name, + {var_count, 1, 1}, + {0, 0, 0}, + {var_count, 1, 1}); + hermes::Context blob_ctx = Adios2Stager::BuildPutContext(); + std::vector var(var_count, (double)i); + hermes::Blob blob(var_size); + memcpy(blob.data(), var.data(), blob.size()); + std::string blob_name = Adios2Stager::CreateBlobName(ad_var); + write_bkt.Put(blob_name, blob, blob_ctx); + } + MPI_Barrier(MPI_COMM_WORLD); + HRUN_ADMIN->FlushRoot(DomainId::GetGlobal()); + + // Destroy bucket + write_bkt.Destroy(); + MPI_Barrier(MPI_COMM_WORLD); + if (rank == 0) { HILOG(kInfo, "Flushing (write) began"); } + HRUN_ADMIN->FlushRoot(DomainId::GetGlobal()); + if (rank == 0) { HILOG(kInfo, "Flushing (write) done"); } + + // Re-create bucket + read + hermes::Bucket read_bkt(path, bkt_ctx); + for (size_t i = off; i < proc_count; ++i) { + std::string var_name = "var" + std::to_string(i); + adios2::Variable ad_var = + io.InquireVariable(var_name); + std::string blob_name = Adios2Stager::CreateBlobName(ad_var); + hermes::Blob blob; + read_bkt.Get(blob_name, blob, bkt_ctx); + std::vector var(var_count, (double)i); + REQUIRE(blob.size() == var_size); + REQUIRE(memcmp(blob.data(), var.data(), blob.size()) == 0); + } + if (rank == 0) { HILOG(kInfo, "Flushing (read) began"); } + HRUN_ADMIN->FlushRoot(DomainId::GetGlobal()); + if (rank == 0) { HILOG(kInfo, "Flushing (read) done"); } +} +#endif + TEST_CASE("TestHermesDataOp") { int rank, nprocs; MPI_Barrier(MPI_COMM_WORLD); diff --git a/test/unit/hermes_adapters/vfd/CMakeLists.txt b/test/unit/hermes_adapters/vfd/CMakeLists.txt index 030843d1e..065f3b085 100644 --- a/test/unit/hermes_adapters/vfd/CMakeLists.txt +++ b/test/unit/hermes_adapters/vfd/CMakeLists.txt @@ -23,4 +23,17 @@ target_link_libraries(hermes_vfd_adapter_test ${HDF5_HERMES_VFD_EXT_LIB_DEPENDENCIES}) jarvis_test(vfd test_hermes_vfd_basic) -jarvis_test(vfd test_hermes_vfd_scratch) \ No newline at end of file +jarvis_test(vfd test_hermes_vfd_scratch) + + +install( + TARGETS + hermes_vfd_adapter_test + LIBRARY DESTINATION ${HERMES_INSTALL_LIB_DIR} + ARCHIVE DESTINATION ${HERMES_INSTALL_LIB_DIR} + RUNTIME DESTINATION ${HERMES_INSTALL_BIN_DIR} +) + +file (COPY ${CMAKE_CURRENT_SOURCE_DIR}/hermes_vfd_py_test.py + DESTINATION ${HERMES_INSTALL_BIN_DIR} + FILE_PERMISSIONS OWNER_EXECUTE OWNER_WRITE OWNER_READ) \ No newline at end of file diff --git a/test/unit/hermes_adapters/vfd/hermes_vfd_py_test.py b/test/unit/hermes_adapters/vfd/hermes_vfd_py_test.py new file mode 100755 index 000000000..60daf3c91 --- /dev/null +++ b/test/unit/hermes_adapters/vfd/hermes_vfd_py_test.py @@ -0,0 +1,28 @@ +import h5py +import numpy as np + +path = '/tmp/hello.h5' + +def write_phase(): + f = h5py.File(path, "w") + prefix = 'train' + for p in prefix: + group = f.create_group(p) + story_dict = {0:1, 1:2, 2:3} + length = len(story_dict) + images = list() + for i in range(5): + images.append( + group.create_dataset('image{}'.format(i), (length,), dtype=h5py.vlen_dtype(np.dtype('uint8')))) + sis = group.create_dataset('sis', (length,), dtype=h5py.string_dtype(encoding='utf-8')) + dii = group.create_dataset('dii', (length,), dtype=h5py.string_dtype(encoding='utf-8')) + f.close() + + +def read_phase(): + h5 = h5py.File(path, "r") + print(h5) + h5 = h5['train'] + +write_phase() +read_phase() diff --git a/test/unit/pipelines/hermes/test_adios2.yaml b/test/unit/pipelines/hermes/test_adios2.yaml new file mode 100644 index 000000000..a7cb4c6e3 --- /dev/null +++ b/test/unit/pipelines/hermes/test_adios2.yaml @@ -0,0 +1,14 @@ +name: hermes_unit_hermes_mpiio_basic_large +env: hermes +pkgs: + - pkg_type: hermes_run + pkg_name: hermes_run + ram: 16m + sleep: 5 + do_dbg: true + dbg_port: 4000 + - pkg_type: hermes_unit_tests + pkg_name: hermes_unit_tests + TEST_CASE: TestHermesAdios2 + do_dbg: true + dbg_port: 4001 diff --git a/test/unit/pipelines/vfd/test_hermes_vfd_python.yaml b/test/unit/pipelines/vfd/test_hermes_vfd_python.yaml new file mode 100644 index 000000000..f87d650e2 --- /dev/null +++ b/test/unit/pipelines/vfd/test_hermes_vfd_python.yaml @@ -0,0 +1,16 @@ +name: hermes_unit_hermes_vfd_basic +env: hermes +pkgs: + - pkg_type: hermes_run + pkg_name: hermes_run + sleep: 5 + - pkg_type: hermes_api + pkg_name: hermes_api + vfd: true + - pkg_type: hermes_vfd_tests + pkg_name: hermes_vfd_tests + test_file: vfd_py_test + hermes: true + mode: default + dbg_port: 4001 + do_dbg: false diff --git a/test/unit/pipelines/vfd/test_vfd_python.yaml b/test/unit/pipelines/vfd/test_vfd_python.yaml new file mode 100644 index 000000000..1eab45166 --- /dev/null +++ b/test/unit/pipelines/vfd/test_vfd_python.yaml @@ -0,0 +1,8 @@ +name: hermes_unit_vfd_python +env: hermes +pkgs: + - pkg_type: hermes_vfd_tests + pkg_name: hermes_vfd_tests + test_file: vfd_py_test + hermes: true + mode: default