Skip to content

Commit

Permalink
Adios2
Browse files Browse the repository at this point in the history
  • Loading branch information
lukemartinlogan committed Jun 3, 2024
1 parent d8e3632 commit f51ee80
Show file tree
Hide file tree
Showing 13 changed files with 566 additions and 5 deletions.
10 changes: 10 additions & 0 deletions CMake/HermesConfig.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ if(thallium_FOUND)
message(STATUS "found thallium at ${thallium_DIR}")
endif()

# ADIOS
if(HERMES_ENABLE_ADIOS)
find_package(ADIOS2 REQUIRED)
message(STATUS "found adios2")
include_directories(${ADIOS2_INCLUDE_DIRS})
link_directories(${ADIOS2_LIBRARY_DIRS})
add_compile_definitions(HERMES_ENABLE_ADIOS)
endif()

#-----------------------------------------------------------------------------
# Mark hermes as found and set all needed packages
#-----------------------------------------------------------------------------
Expand All @@ -107,6 +116,7 @@ set(Hermes_LIBRARIES
-ldl -lrt -lc -pthread
thallium
hermes
${ADIOS2_LIBRARIES}
${Boost_LIBRARIES} ${Hermes_LIBRARY})
set(Hermes_LIBRARY_DIRS ${HermeShm_LIBRARY_DIRS})
# Set Hermes client dirs (equal to Hermes dirs)
Expand Down
14 changes: 12 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,14 @@ if(HERMES_ENABLE_VFD)
endif()
endif()

# ADIOS
if(HERMES_ENABLE_ADIOS)
find_package(ADIOS2 REQUIRED)
message(STATUS "found adios2 at ${ADIOS2_INCLUDE_DIRS}")
include_directories(${ADIOS2_INCLUDE_DIRS})
link_directories(${ADIOS2_LIBRARY_DIRS})
add_compile_definitions(HERMES_ENABLE_ADIOS)
endif()

#------------------------------------------------------------------------------
# Setup CMake Environment
Expand Down Expand Up @@ -270,13 +278,15 @@ set(Hermes_CLIENT_LIBRARIES
${HermesShm_LIBRARIES}
yaml-cpp
cereal::cereal
-ldl -lrt -lc -pthread hrun_client)
-ldl -lrt -lc -pthread hrun_client
${ADIOS2_LIBRARIES})
set(Hermes_CLIENT_DEPS
hrun_client)
set(Hermes_RUNTIME_LIBRARIES
${Hermes_CLIENT_LIBRARIES}
hrun_runtime
${Boost_LIBRARIES})
${Boost_LIBRARIES}
${ADIOS2_LIBRARIES})
set(Hermes_RUNTIME_DEPS
hrun_client hrun_runtime)

Expand Down
11 changes: 11 additions & 0 deletions hrun/include/hrun/network/local_serialize.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,17 @@ class LocalSerialize {
memcpy(data_.data() + off, &size, sizeof(size_t));
off += sizeof(size_t);
memcpy(data_.data() + off, obj.data(), size);
} else if (std::is_enum<T>::value) {
size_t size = sizeof(T);
size_t off = data_.size();
data_.resize(off + size);
memcpy(data_.data() + off, &obj, size);
} else {
throw std::runtime_error("Cannot serialize object");
}

// Check if the type is an enum

return *this;
}
};
Expand Down Expand Up @@ -96,6 +104,9 @@ class LocalDeserialize {
off += sizeof(size_t);
obj.resize(str_size);
memcpy(obj.data(), data_.data() + off, str_size);
} else if (std::is_enum<T>::value) {
size = sizeof(T);
memcpy(&obj, data_.data() + off, size);
} else {
throw std::runtime_error("Cannot serialize object");
}
Expand Down
6 changes: 4 additions & 2 deletions tasks/data_stager/include/data_stager/data_stager.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,21 @@ class Client : public TaskLibClient {
const TaskNode &task_node,
const BucketId &bkt_id,
const hshm::charbuf &blob_name,
size_t data_size,
float score,
u32 node_id) {
HRUN_CLIENT->ConstructTask<StageInTask>(
task, task_node, id_, bkt_id,
blob_name, score, node_id);
blob_name, data_size, score, node_id);
}
HSHM_ALWAYS_INLINE
void StageInRoot(const BucketId &bkt_id,
const hshm::charbuf &blob_name,
size_t data_size,
float score,
u32 node_id) {
LPointer<hrunpq::TypedPushTask<StageInTask>> task =
AsyncStageInRoot(bkt_id, blob_name, score, node_id);
AsyncStageInRoot(bkt_id, blob_name, data_size, score, node_id);
task.ptr_->Wait();
}
HRUN_TASK_NODE_PUSH_ROOT(StageIn);
Expand Down
3 changes: 3 additions & 0 deletions tasks/data_stager/include/data_stager/data_stager_tasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ struct UnregisterStagerTask : public Task, TaskFlags<TF_SRL_SYM | TF_REPLICA> {
struct StageInTask : public Task, TaskFlags<TF_LOCAL> {
IN hermes::BucketId bkt_id_;
IN hipc::ShmArchive<hipc::charbuf> blob_name_;
IN size_t data_size_;
IN float score_;
IN u32 node_id_;

Expand All @@ -250,6 +251,7 @@ struct StageInTask : public Task, TaskFlags<TF_LOCAL> {
const TaskStateId &state_id,
const BucketId &bkt_id,
const hshm::charbuf &blob_name,
size_t data_size,
float score,
u32 node_id) : Task(alloc) {
// Initialize task
Expand All @@ -266,6 +268,7 @@ struct StageInTask : public Task, TaskFlags<TF_LOCAL> {
HSHM_MAKE_AR(blob_name_, alloc, blob_name);
score_ = score;
node_id_ = node_id;
data_size_ = data_size;
}

/** Destructor */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ class AbstractStager {
AbstractStager() = default;
~AbstractStager() = default;

/** Build context for staging */
static Context BuildPutContext() {
Context ctx;
ctx.flags_.SetBits(HERMES_SHOULD_STAGE);
return ctx;
}

virtual void RegisterStager(RegisterStagerTask *task, RunContext &rctx) = 0;
virtual void StageIn(blob_mdm::Client &blob_mdm, StageInTask *task, RunContext &rctx) = 0;
virtual void StageOut(blob_mdm::Client &blob_mdm, StageOutTask *task, RunContext &rctx) = 0;
Expand Down
Loading

0 comments on commit f51ee80

Please sign in to comment.