From f51ee80eb08ba804cf09392c4932e8b8140aed0e Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Mon, 3 Jun 2024 11:51:20 -0500 Subject: [PATCH] Adios2 --- CMake/HermesConfig.cmake | 10 + CMakeLists.txt | 14 +- hrun/include/hrun/network/local_serialize.h | 11 + .../include/data_stager/data_stager.h | 6 +- .../include/data_stager/data_stager_tasks.h | 3 + .../data_stager/factory/abstract_stager.h | 7 + .../data_stager/factory/adios2_stager.h | 274 ++++++++++++++++++ .../data_stager/factory/stager_factory.h | 8 + tasks/data_stager/src/CMakeLists.txt | 1 + tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc | 2 + test/unit/hermes/adios2.xml | 142 +++++++++ test/unit/hermes/test_bucket.cc | 79 ++++- test/unit/pipelines/hermes/test_adios2.yaml | 14 + 13 files changed, 566 insertions(+), 5 deletions(-) create mode 100644 tasks/data_stager/include/data_stager/factory/adios2_stager.h create mode 100644 test/unit/hermes/adios2.xml create mode 100644 test/unit/pipelines/hermes/test_adios2.yaml diff --git a/CMake/HermesConfig.cmake b/CMake/HermesConfig.cmake index 60e17f1eb..737992aa5 100644 --- a/CMake/HermesConfig.cmake +++ b/CMake/HermesConfig.cmake @@ -91,6 +91,15 @@ if(thallium_FOUND) message(STATUS "found thallium at ${thallium_DIR}") endif() +# ADIOS +if(HERMES_ENABLE_ADIOS) + find_package(ADIOS2 REQUIRED) + message(STATUS "found adios2") + include_directories(${ADIOS2_INCLUDE_DIRS}) + link_directories(${ADIOS2_LIBRARY_DIRS}) + add_compile_definitions(HERMES_ENABLE_ADIOS) +endif() + #----------------------------------------------------------------------------- # Mark hermes as found and set all needed packages #----------------------------------------------------------------------------- @@ -107,6 +116,7 @@ set(Hermes_LIBRARIES -ldl -lrt -lc -pthread thallium hermes + ${ADIOS2_LIBRARIES} ${Boost_LIBRARIES} ${Hermes_LIBRARY}) set(Hermes_LIBRARY_DIRS ${HermeShm_LIBRARY_DIRS}) # Set Hermes client dirs (equal to Hermes dirs) diff --git a/CMakeLists.txt b/CMakeLists.txt index d4eff7000..947b22eb9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -206,6 +206,14 @@ if(HERMES_ENABLE_VFD) endif() endif() +# ADIOS +if(HERMES_ENABLE_ADIOS) + find_package(ADIOS2 REQUIRED) + message(STATUS "found adios2 at ${ADIOS2_INCLUDE_DIRS}") + include_directories(${ADIOS2_INCLUDE_DIRS}) + link_directories(${ADIOS2_LIBRARY_DIRS}) + add_compile_definitions(HERMES_ENABLE_ADIOS) +endif() #------------------------------------------------------------------------------ # Setup CMake Environment @@ -270,13 +278,15 @@ set(Hermes_CLIENT_LIBRARIES ${HermesShm_LIBRARIES} yaml-cpp cereal::cereal - -ldl -lrt -lc -pthread hrun_client) + -ldl -lrt -lc -pthread hrun_client + ${ADIOS2_LIBRARIES}) set(Hermes_CLIENT_DEPS hrun_client) set(Hermes_RUNTIME_LIBRARIES ${Hermes_CLIENT_LIBRARIES} hrun_runtime - ${Boost_LIBRARIES}) + ${Boost_LIBRARIES} + ${ADIOS2_LIBRARIES}) set(Hermes_RUNTIME_DEPS hrun_client hrun_runtime) diff --git a/hrun/include/hrun/network/local_serialize.h b/hrun/include/hrun/network/local_serialize.h index 2f950ac5a..a95ea7cc3 100644 --- a/hrun/include/hrun/network/local_serialize.h +++ b/hrun/include/hrun/network/local_serialize.h @@ -54,9 +54,17 @@ class LocalSerialize { memcpy(data_.data() + off, &size, sizeof(size_t)); off += sizeof(size_t); memcpy(data_.data() + off, obj.data(), size); + } else if (std::is_enum::value) { + size_t size = sizeof(T); + size_t off = data_.size(); + data_.resize(off + size); + memcpy(data_.data() + off, &obj, size); } else { throw std::runtime_error("Cannot serialize object"); } + + // Check if the type is an enum + return *this; } }; @@ -96,6 +104,9 @@ class LocalDeserialize { off += sizeof(size_t); obj.resize(str_size); memcpy(obj.data(), data_.data() + off, str_size); + } else if (std::is_enum::value) { + size = sizeof(T); + memcpy(&obj, data_.data() + off, size); } else { throw std::runtime_error("Cannot serialize object"); } diff --git a/tasks/data_stager/include/data_stager/data_stager.h b/tasks/data_stager/include/data_stager/data_stager.h index cd50fde1b..5f5902f72 100644 --- a/tasks/data_stager/include/data_stager/data_stager.h +++ b/tasks/data_stager/include/data_stager/data_stager.h @@ -91,19 +91,21 @@ class Client : public TaskLibClient { const TaskNode &task_node, const BucketId &bkt_id, const hshm::charbuf &blob_name, + size_t data_size, float score, u32 node_id) { HRUN_CLIENT->ConstructTask( task, task_node, id_, bkt_id, - blob_name, score, node_id); + blob_name, data_size, score, node_id); } HSHM_ALWAYS_INLINE void StageInRoot(const BucketId &bkt_id, const hshm::charbuf &blob_name, + size_t data_size, float score, u32 node_id) { LPointer> task = - AsyncStageInRoot(bkt_id, blob_name, score, node_id); + AsyncStageInRoot(bkt_id, blob_name, data_size, score, node_id); task.ptr_->Wait(); } HRUN_TASK_NODE_PUSH_ROOT(StageIn); diff --git a/tasks/data_stager/include/data_stager/data_stager_tasks.h b/tasks/data_stager/include/data_stager/data_stager_tasks.h index 5184c84b5..2a1416c68 100644 --- a/tasks/data_stager/include/data_stager/data_stager_tasks.h +++ b/tasks/data_stager/include/data_stager/data_stager_tasks.h @@ -236,6 +236,7 @@ struct UnregisterStagerTask : public Task, TaskFlags { struct StageInTask : public Task, TaskFlags { IN hermes::BucketId bkt_id_; IN hipc::ShmArchive blob_name_; + IN size_t data_size_; IN float score_; IN u32 node_id_; @@ -250,6 +251,7 @@ struct StageInTask : public Task, TaskFlags { const TaskStateId &state_id, const BucketId &bkt_id, const hshm::charbuf &blob_name, + size_t data_size, float score, u32 node_id) : Task(alloc) { // Initialize task @@ -266,6 +268,7 @@ struct StageInTask : public Task, TaskFlags { HSHM_MAKE_AR(blob_name_, alloc, blob_name); score_ = score; node_id_ = node_id; + data_size_ = data_size; } /** Destructor */ diff --git a/tasks/data_stager/include/data_stager/factory/abstract_stager.h b/tasks/data_stager/include/data_stager/factory/abstract_stager.h index 80e000f20..4a16b77ac 100644 --- a/tasks/data_stager/include/data_stager/factory/abstract_stager.h +++ b/tasks/data_stager/include/data_stager/factory/abstract_stager.h @@ -18,6 +18,13 @@ class AbstractStager { AbstractStager() = default; ~AbstractStager() = default; + /** Build context for staging */ + static Context BuildPutContext() { + Context ctx; + ctx.flags_.SetBits(HERMES_SHOULD_STAGE); + return ctx; + } + virtual void RegisterStager(RegisterStagerTask *task, RunContext &rctx) = 0; virtual void StageIn(blob_mdm::Client &blob_mdm, StageInTask *task, RunContext &rctx) = 0; virtual void StageOut(blob_mdm::Client &blob_mdm, StageOutTask *task, RunContext &rctx) = 0; diff --git a/tasks/data_stager/include/data_stager/factory/adios2_stager.h b/tasks/data_stager/include/data_stager/factory/adios2_stager.h new file mode 100644 index 000000000..a17cec82e --- /dev/null +++ b/tasks/data_stager/include/data_stager/factory/adios2_stager.h @@ -0,0 +1,274 @@ +// +// Created by lukemartinlogan on 9/30/23. +// + +#ifndef HERMES_TASKS_DATA_STAGER_SRC_Adios2_STAGER_H_ +#define HERMES_TASKS_DATA_STAGER_SRC_Adios2_STAGER_H_ + +#include "abstract_stager.h" +#include "hermes_adapters/mapper/abstract_mapper.h" +#include + +namespace hermes::data_stager { + +enum class DataType { + kChar, + kShort, + kInt, + kLong, + kFloat, + kDouble +}; + +class Adios2Stager : public AbstractStager { + public: + size_t page_size_; + std::string config_path_; + std::string io_name_; + bitfield32_t flags_; + adios2::ADIOS adios_; + adios2::IO io_; + + public: + /** Default constructor */ + Adios2Stager() = default; + + /** Destructor */ + ~Adios2Stager() {} + + /** Build context for staging */ + static Context BuildContext(const std::string &config_path, + const std::string &io_name, + u32 flags = 0) { + Context ctx; + ctx.flags_.SetBits(HERMES_SHOULD_STAGE); + ctx.bkt_params_ = BuildFileParams(config_path, io_name, flags); + return ctx; + } + + /** Build serialized file parameter pack */ + static std::string BuildFileParams(const std::string &config_path, + const std::string &io_name, + u32 flags = 0) { + hshm::charbuf params(4096); + hrun::LocalSerialize srl(params); + srl << std::string("adios2"); + srl << flags; + srl << config_path; + srl << io_name; + return params.str(); + } + + /** Create blob name */ + template + static std::string CreateBlobName(const adios2::Variable &var) { + hshm::charbuf params(4096); + hrun::LocalSerialize srl(params); + srl << var.Name(); + if constexpr (std::is_same::value) { + srl << DataType::kChar; + } else if constexpr (std::is_same::value) { + srl << DataType::kShort; + } else if constexpr (std::is_same::value) { + srl << DataType::kInt; + } else if constexpr (std::is_same::value) { + srl << DataType::kLong; + } else if constexpr (std::is_same::value) { + srl << DataType::kFloat; + } else if constexpr (std::is_same::value) { + srl << DataType::kDouble; + } + SerializeDims(srl, var.Shape()); + SerializeDims(srl, var.Start()); + SerializeDims(srl, var.Count()); + return params.str(); + } + + /** Serialize adios2 dims */ + static void SerializeDims(hrun::LocalSerialize &srl, + const adios2::Dims &dims) { + srl << dims.size(); + for (size_t dim : dims) { + srl << dim; + } + } + + /** Decode blob name */ + static void DecodeBlobName(const std::string &blob_name, + std::string &var_name, + DataType &data_type, + adios2::Dims &shape, + adios2::Dims &start, + adios2::Dims &count) { + hrun::LocalDeserialize srl(blob_name); + srl >> var_name; + srl >> data_type; + DeserializeDims(srl, shape); + DeserializeDims(srl, start); + DeserializeDims(srl, count); + } + + /** Deserialize adios2 dims */ + static void DeserializeDims(hrun::LocalDeserialize &srl, + adios2::Dims &dims) { + size_t dim_size; + srl >> dim_size; + dims.resize(dim_size); + for (size_t i = 0; i < dim_size; ++i) { + srl >> dims[i]; + } + } + + /** Create the data stager payload */ + void RegisterStager(RegisterStagerTask *task, RunContext &rctx) override { + std::string params = task->params_->str(); + std::string protocol; + hrun::LocalDeserialize srl(params); + srl >> protocol; + srl >> flags_.bits_; + srl >> config_path_; + srl >> io_name_; + path_ = task->tag_name_->str(); + adios_ = adios2::ADIOS(config_path_); + io_ = adios_.DeclareIO(io_name_); + } + + /** Stage data in from remote source */ + void StageIn(blob_mdm::Client &blob_mdm, StageInTask *task, RunContext &rctx) override { + if (flags_.Any(HERMES_STAGE_NO_READ)) { + return; + } + std::string blob_name = task->blob_name_->str(); + + // Read blob from PFS + try { + std::string var_name; + DataType data_type; + adios2::Dims shape, start, count; + DecodeBlobName(blob_name, var_name, data_type, + shape, start, count); + adios2::Engine reader = io_.Open(path_, adios2::Mode::Read); + LPointer blob = + HRUN_CLIENT->AllocateBufferServer(task->data_size_); + switch (data_type) { + case DataType::kChar: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + reader.Get(var, (char *) blob.ptr_, adios2::Mode::Sync); + break; + } + case DataType::kShort: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + reader.Get(var, (short *) blob.ptr_, adios2::Mode::Sync); + break; + } + case DataType::kInt: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + reader.Get(var, (int *) blob.ptr_, adios2::Mode::Sync); + break; + } + case DataType::kLong: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + reader.Get(var, (long *) blob.ptr_, adios2::Mode::Sync); + break; + } + case DataType::kFloat: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + reader.Get(var, (float *) blob.ptr_, adios2::Mode::Sync); + break; + } + case DataType::kDouble: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + reader.Get(var, (double *) blob.ptr_, adios2::Mode::Sync); + break; + } + } + + // Write blob to hermes + HILOG(kDebug, "Submitting put blob {} ({}) to blob mdm ({})", + task->blob_name_->str(), task->bkt_id_, blob_mdm.id_) + hapi::Context ctx; + ctx.flags_.SetBits(HERMES_SHOULD_STAGE); + LPointer put_task = + blob_mdm.AsyncPutBlob(task->task_node_ + 1, + task->bkt_id_, + hshm::to_charbuf(*task->blob_name_), + hermes::BlobId::GetNull(), + 0, task->data_size_, blob.shm_, task->score_, 0, + ctx, TASK_DATA_OWNER | TASK_LOW_LATENCY); + put_task->Wait(task); + HRUN_CLIENT->DelTask(put_task); + } catch (...) { + } + } + + /** Stage data out to remote source */ + void StageOut(blob_mdm::Client &blob_mdm, StageOutTask *task, RunContext &rctx) override { + if (flags_.Any(HERMES_STAGE_NO_WRITE)) { + return; + } + std::string blob_name = task->blob_name_->str(); + + // Read variable info from PFS + std::string var_name; + DataType data_type; + adios2::Dims shape, start, count; + DecodeBlobName(blob_name, var_name, data_type, + shape, start, count); + adios2::Engine writer = io_.Open(path_, adios2::Mode::Write); + char *data = HRUN_CLIENT->GetDataPointer(task->data_); + switch (data_type) { + case DataType::kChar: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + writer.Put(var, (char*)data, adios2::Mode::Sync); + break; + } + case DataType::kShort: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + writer.Put(var, (short*)data, adios2::Mode::Sync); + break; + } + case DataType::kInt: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + writer.Put(var, (int*)data, adios2::Mode::Sync); + break; + } + case DataType::kLong: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + writer.Put(var, (long*)data, adios2::Mode::Sync); + break; + } + case DataType::kFloat: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + writer.Put(var, (float*)data, adios2::Mode::Sync); + break; + } + case DataType::kDouble: { + adios2::Variable var = io_.DefineVariable( + var_name, shape, start, count); + writer.Put(var, (double*)data, adios2::Mode::Sync); + break; + } + } + HILOG(kDebug, "Staged out {} bytes to the backend file {}", + task->data_size_, path_); + } + + void UpdateSize(bucket_mdm::Client &bkt_mdm, UpdateSizeTask *task, RunContext &rctx) override { + // TODO(llogan) + } +}; + +} // namespace hermes::data_stager + +#endif // HERMES_TASKS_DATA_STAGER_SRC_Adios2_STAGER_H_ diff --git a/tasks/data_stager/include/data_stager/factory/stager_factory.h b/tasks/data_stager/include/data_stager/factory/stager_factory.h index ba37e5282..5bdd35fdf 100644 --- a/tasks/data_stager/include/data_stager/factory/stager_factory.h +++ b/tasks/data_stager/include/data_stager/factory/stager_factory.h @@ -8,6 +8,10 @@ #include "../data_stager.h" #include "abstract_stager.h" #include "binary_stager.h" +#ifdef HERMES_ENABLE_ADIOS +#include "adios2.h" +#include "adios2_stager.h" +#endif namespace hermes::data_stager { @@ -24,6 +28,10 @@ class StagerFactory { stager = std::make_unique(); } else if (protocol == "parquet") { } else if (protocol == "hdf5") { + } else if (protocol == "adios2") { +#ifdef HERMES_ENABLE_ADIOS + stager = std::make_unique(); +#endif } else { throw std::runtime_error("Unknown stager type"); } diff --git a/tasks/data_stager/src/CMakeLists.txt b/tasks/data_stager/src/CMakeLists.txt index 07eca417a..0ca022276 100644 --- a/tasks/data_stager/src/CMakeLists.txt +++ b/tasks/data_stager/src/CMakeLists.txt @@ -1,3 +1,4 @@ + #------------------------------------------------------------------------------ # Build Small Message Task Library #------------------------------------------------------------------------------ diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc index f3055c013..f169381e0 100644 --- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc +++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc @@ -362,6 +362,7 @@ class Server : public TaskLib { stager_mdm_.AsyncStageIn(task->task_node_ + 1, task->tag_id_, blob_info.name_, + task->data_size_, task->score_, 0); stage_task->Wait(task); blob_info.mod_count_ = 1; @@ -536,6 +537,7 @@ class Server : public TaskLib { stager_mdm_.AsyncStageIn(task->task_node_ + 1, task->tag_id_, blob_info.name_, + task->data_size_, 1, 0); stage_task->Wait(task); HRUN_CLIENT->DelTask(stage_task); diff --git a/test/unit/hermes/adios2.xml b/test/unit/hermes/adios2.xml new file mode 100644 index 000000000..cf1b98090 --- /dev/null +++ b/test/unit/hermes/adios2.xml @@ -0,0 +1,142 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/test/unit/hermes/test_bucket.cc b/test/unit/hermes/test_bucket.cc index f713999b3..91c0239e4 100644 --- a/test/unit/hermes/test_bucket.cc +++ b/test/unit/hermes/test_bucket.cc @@ -15,7 +15,7 @@ #include "hrun_admin/hrun_admin.h" #include "hermes/hermes.h" #include "hermes/bucket.h" -#include "data_stager/factory/binary_stager.h" +#include "data_stager/factory/stager_factory.h" #include TEST_CASE("TestHermesConnect") { @@ -524,6 +524,83 @@ TEST_CASE("TestHermesDataStager") { HILOG(kInfo, "Flushing finished") } +#ifdef HERMES_ENABLE_ADIOS +TEST_CASE("TestHermesAdios2") { + int rank, nprocs; + MPI_Barrier(MPI_COMM_WORLD); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &nprocs); + + // create dataset + std::string config_path = "/home/llogan/Documents/Projects/hermes/test/unit/hermes/adios2.xml"; + std::string home_dir = getenv("HOME"); + std::string path = home_dir + "/test.bp"; + size_t count_per_proc = 16; + size_t off = rank * count_per_proc; + size_t proc_count = off + count_per_proc; + size_t var_count = KILOBYTES(1); + size_t var_size = var_count * sizeof(double); + + // Create an ADIOS file + adios2::ADIOS adios; + + // Initialize Hermes on all nodes + HERMES->ClientInit(); + + // ADIOS2 Hermes Context + using hermes::data_stager::Adios2Stager; + hermes::Context bkt_ctx = Adios2Stager::BuildContext( + config_path, "SimulationOutput", 0); + adios2::IO io = adios.DeclareIO("SimulationOutput"); + + // Create a stageable bucket + hermes::Bucket write_bkt(path, bkt_ctx); + + // Put a few blobs in the bucket + for (size_t i = off; i < proc_count; ++i) { + HILOG(kInfo, "Iteration: {}", i); + std::string var_name = "var" + std::to_string(i); + adios2::Variable ad_var = + io.DefineVariable(var_name, + {var_count, 1, 1}, + {0, 0, 0}, + {var_count, 1, 1}); + hermes::Context blob_ctx = Adios2Stager::BuildPutContext(); + std::vector var(var_count, (double)i); + hermes::Blob blob(var_size); + memcpy(blob.data(), var.data(), blob.size()); + std::string blob_name = Adios2Stager::CreateBlobName(ad_var); + write_bkt.Put(blob_name, blob, blob_ctx); + } + MPI_Barrier(MPI_COMM_WORLD); + HRUN_ADMIN->FlushRoot(DomainId::GetGlobal()); + + // Destroy bucket + write_bkt.Destroy(); + MPI_Barrier(MPI_COMM_WORLD); + if (rank == 0) { HILOG(kInfo, "Flushing (write) began"); } + HRUN_ADMIN->FlushRoot(DomainId::GetGlobal()); + if (rank == 0) { HILOG(kInfo, "Flushing (write) done"); } + + // Re-create bucket + read + hermes::Bucket read_bkt(path, bkt_ctx); + for (size_t i = off; i < proc_count; ++i) { + std::string var_name = "var" + std::to_string(i); + adios2::Variable ad_var = + io.InquireVariable(var_name); + std::string blob_name = Adios2Stager::CreateBlobName(ad_var); + hermes::Blob blob; + read_bkt.Get(blob_name, blob, bkt_ctx); + std::vector var(var_count, (double)i); + REQUIRE(blob.size() == var_size); + REQUIRE(memcmp(blob.data(), var.data(), blob.size()) == 0); + } + if (rank == 0) { HILOG(kInfo, "Flushing (read) began"); } + HRUN_ADMIN->FlushRoot(DomainId::GetGlobal()); + if (rank == 0) { HILOG(kInfo, "Flushing (read) done"); } +} +#endif + TEST_CASE("TestHermesDataOp") { int rank, nprocs; MPI_Barrier(MPI_COMM_WORLD); diff --git a/test/unit/pipelines/hermes/test_adios2.yaml b/test/unit/pipelines/hermes/test_adios2.yaml new file mode 100644 index 000000000..a7cb4c6e3 --- /dev/null +++ b/test/unit/pipelines/hermes/test_adios2.yaml @@ -0,0 +1,14 @@ +name: hermes_unit_hermes_mpiio_basic_large +env: hermes +pkgs: + - pkg_type: hermes_run + pkg_name: hermes_run + ram: 16m + sleep: 5 + do_dbg: true + dbg_port: 4000 + - pkg_type: hermes_unit_tests + pkg_name: hermes_unit_tests + TEST_CASE: TestHermesAdios2 + do_dbg: true + dbg_port: 4001