Skip to content

Commit

Permalink
204 playback client api (#252)
Browse files Browse the repository at this point in the history
* added ClientQueryService definition

* added PlaybackQueryRpcClient

* format

* integration build errors

* added QueryId to PlaybackService API

* more ClientQueryService changes

* added chronolog::Event definition

* CMakelists.txt : disable unused-parameter & unused-variable warnings

* ClientQueryService instantiation

* tied StroytellerClient with PlaybackQueryRpcClient and ClientQueryService

* interdependency between ClientQueryService , PlaybackQueryRpcClient, and StorytellerClient

* ClientQueryService Configuration
  • Loading branch information
ibrodkin authored Feb 4, 2025
1 parent 23b04d9 commit 4c62536
Show file tree
Hide file tree
Showing 23 changed files with 743 additions and 101 deletions.
8 changes: 3 additions & 5 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,8 @@ option(CHRONOLOG_ENABLE_DOXYGEN "Enable Doxygen documentation generation." OFF)
# Change some compilation flags depending on the build type
if(CMAKE_BUILD_TYPE STREQUAL "Debug")
message("Compiling ChronoLog in Debug Mode")
# add_compile_options(-g3)
add_compile_options(-O0)
#set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O0")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -O0 -g")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Wno-unused-parameter -Wno-unused-variable -O0 -g")

#if (CHRONOLOG_USE_ADDRESS_SANITIZER)
# add_compile_options(-fsanitize=address)
Expand Down Expand Up @@ -205,10 +203,10 @@ add_subdirectory(ChronoKeeper)
add_subdirectory(ChronoGrapher)
add_subdirectory(ChronoPlayer)
add_subdirectory(ChronoStore)
#if(CHRONOLOG_ENABLE_PYTHON_BINDINGS)
if(CHRONOLOG_ENABLE_PYTHON_BINDINGS)
message("Python binding enabled")
add_subdirectory(python_client)
#endif()
endif()
if(CHRONOLOG_BUILD_TESTING)
enable_testing()
add_subdirectory(test)
Expand Down
1 change: 1 addition & 0 deletions ChronoAPI/ChronoLog/include/chronolog_errcode.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ enum ErrorCode
CL_ERR_STORY_FILE_NOT_EXIST = -19, // Story file does not exist
CL_ERR_STORY_CHUNK_DSET_NOT_EXIST = -20,// Story chunk dataset does not exist
CL_ERR_STORY_CHUNK_EXTRACTION = -21, // Error in extracting Story chunk in ChronoKeeper
CL_ERR_NO_PLAYERS = -22, // No ChronoPlayers are available for story playback
};
}

Expand Down
5 changes: 2 additions & 3 deletions ChronoPlayer/PlaybackService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ void chronolog::PlaybackService::playback_service_available(tl::request const &r
request.respond(1);
}

void chronolog::PlaybackService::story_playback_request(tl::request const &request
, std::string const &chronicle_name, std::string const &story_name
, uint64_t start_time, uint64_t end_time, chronolog::ServiceId const & receiver_service_id)
void chronolog::PlaybackService::story_playback_request(tl::request const &request,chl::ServiceId const & receiver_service_id, uint32_t query_id
,chl::ChronicleName const &chronicle_name, chl::StoryName const &story_name, chl::chrono_time const& start_time, chl::chrono_time const& end_time)
{
LOG_INFO("[PlaybackService] PlaybackStoryRequest: ChronicleName={},StoryName={}", chronicle_name, story_name);

Expand Down
4 changes: 2 additions & 2 deletions ChronoPlayer/PlaybackService.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class PlaybackService: public tl::provider <PlaybackService>
void playback_service_available(tl::request const &request);

void
story_playback_request(tl::request const &request, std::string const &chronicle_name, std::string const &story_name
, uint64_t start_time, uint64_t end_time, ServiceId const & requesting_service_id);
story_playback_request(tl::request const &request, ServiceId const & requesting_service_id, uint32_t query_id
, ChronicleName const &chronicle_name, StoryName const &story_name, chrono_time const& start_time, chrono_time const& end_time);

private:
PlaybackService(tl::engine &tl_engine, uint16_t service_provider_id
Expand Down
4 changes: 3 additions & 1 deletion ChronoVisor/include/KeeperRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class GrapherProcessEntry
public:
PlayerProcessEntry(PlayerIdCard const& id_card, ServiceId const& admin_service_id)
: idCard(id_card)
, playbackServiceId(ServiceId())
, adminServiceId(admin_service_id)
, adminClient(nullptr)
, active(false)
Expand All @@ -109,6 +110,7 @@ class GrapherProcessEntry
~PlayerProcessEntry() = default;// Registry is reponsible for creating & deleting AdminClient

PlayerIdCard idCard;
ServiceId playbackServiceId;
ServiceId adminServiceId;
std::string idCardString;
DataStoreAdminClient* adminClient;
Expand Down Expand Up @@ -182,7 +184,7 @@ class GrapherProcessEntry
void updateKeeperProcessStats(KeeperStatsMsg const& keeperStatsMsg);

int notifyRecordingGroupOfStoryRecordingStart(ChronicleName const &, StoryName const &, StoryId const &
, std::vector <KeeperIdCard> &);
, std::vector <KeeperIdCard> &, ServiceId &);
int notifyRecordingGroupOfStoryRecordingStop(StoryId const&);

int registerGrapherProcess(GrapherRegistrationMsg const& reg_msg);
Expand Down
7 changes: 6 additions & 1 deletion ChronoVisor/src/KeeperRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,8 @@ std::vector<KeeperIdCard>& RecordingGroup::getActiveKeepers(std::vector<KeeperId
/////////////////
int KeeperRegistry::notifyRecordingGroupOfStoryRecordingStart(ChronicleName const& chronicle, StoryName const &story
, StoryId const &story_id
, std::vector <KeeperIdCard> &vectorOfKeepers)
, std::vector <KeeperIdCard> &vectorOfKeepers
, ServiceId & player_service_id)
{
vectorOfKeepers.clear();

Expand Down Expand Up @@ -552,6 +553,10 @@ int KeeperRegistry::notifyRecordingGroupOfStoryRecordingStart(ChronicleName cons
story_start_time);
}

if(rpc_return == CL_SUCCESS && recording_group->playerProcess != nullptr)
{
player_service_id = recording_group->playerProcess->playbackServiceId;
}
return rpc_return;
}

Expand Down
5 changes: 3 additions & 2 deletions ChronoVisor/src/VisorClientPortal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ chronolog::VisorClientPortal::AcquireStory(chl::ClientId const &client_id, std::
{
chronolog::StoryId story_id{0};
std::vector <chronolog::KeeperIdCard> recording_keepers;
chl::ServiceId player; //ServiceID of ChronoPlayer providing playback service for this story

if(!theKeeperRegistry->is_running())
{ return chronolog::AcquireStoryResponseMsg(chronolog::CL_ERR_NO_KEEPERS, story_id, recording_keepers); }
Expand Down Expand Up @@ -225,7 +226,7 @@ chronolog::VisorClientPortal::AcquireStory(chl::ClientId const &client_id, std::
// so that they are ready to start recording this story

if(chronolog::CL_SUCCESS != theKeeperRegistry->notifyRecordingGroupOfStoryRecordingStart(
chronicle_name, story_name, story_id, recording_keepers))
chronicle_name, story_name, story_id, recording_keepers, player))
{
// RPC notification to the keepers might have failed, release the newly acquired story
chronicleMetaDirectory.release_story(client_id, chronicle_name, story_name, story_id);
Expand All @@ -234,7 +235,7 @@ chronolog::VisorClientPortal::AcquireStory(chl::ClientId const &client_id, std::
return chronolog::AcquireStoryResponseMsg(chronolog::CL_ERR_NO_KEEPERS, story_id, recording_keepers);
}

return chronolog::AcquireStoryResponseMsg(chronolog::CL_SUCCESS, story_id, recording_keepers);
return chronolog::AcquireStoryResponseMsg(chronolog::CL_SUCCESS, story_id, recording_keepers, player);
}


Expand Down
3 changes: 3 additions & 0 deletions Client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ add_library(chronolog_client
${CMAKE_CURRENT_SOURCE_DIR}/src/KeeperRecordingClient.h
${CMAKE_CURRENT_SOURCE_DIR}/src/StorytellerClient.h
${CMAKE_CURRENT_SOURCE_DIR}/src/StorytellerClient.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/PlaybackQueryRpcClient.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ClientQueryService.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ChronologClient.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ChronologClientImpl.cpp
${CMAKE_SOURCE_DIR}/ChronoAPI/ChronoLog/src/chrono_monitor.cpp
${CMAKE_SOURCE_DIR}/chrono_common/ConfigurationManager.cpp
${CMAKE_SOURCE_DIR}/chrono_common/StoryChunk.cpp
)

# Include directories for the library
Expand Down
24 changes: 23 additions & 1 deletion Client/include/ClientConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,29 @@ namespace chronolog

struct ClientPortalServiceConf
{
ClientPortalServiceConf( const std::string & protocol, const std::string & service_ip, uint16_t service_port, uint16_t service_provider_id)
ClientPortalServiceConf( const std::string & protocol="ofi+sockets",
const std::string & service_ip="127.0.0.1", uint16_t service_port=22, uint16_t service_provider_id=22)
: proto_conf_(protocol)
, ip_(service_ip)
, port_(service_port)
, provider_id_(service_provider_id)
{}

std::string const & proto_conf() const { return proto_conf_; }
std::string const & ip() const { return ip_; }
uint16_t port() const { return port_; }
uint16_t provider_id() const { return provider_id_; }

std::string proto_conf_;
std::string ip_;
uint16_t port_;
uint16_t provider_id_;
};

struct ClientQueryServiceConf
{
ClientQueryServiceConf( const std::string & protocol="ofi+sockets",
const std::string & service_ip="127.0.0.1", uint16_t service_port=27, uint16_t service_provider_id=27)
: proto_conf_(protocol)
, ip_(service_ip)
, port_(service_port)
Expand Down
95 changes: 88 additions & 7 deletions Client/include/chronolog_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,99 @@
#include <vector>
#include <map>

#include "ConfigurationManager.h" //TODO: not sure this is a good idea , but will keep it for now ...
#include "ConfigurationManager.h"

#include "ClientConfiguration.h"

//TODO: rename Client to be ChronologClient
//TODO: remove ConfigurationManager from chronolog_client include fiels , should only be in the implementation files
// if needed at all
namespace chronolog
{

typedef std::string StoryName;
typedef std::string ChronicleName;
typedef uint64_t ClientId;
typedef uint64_t chrono_time;
typedef uint32_t chrono_index;

namespace chronolog
class Event
{
public:

Event(chrono_time event_time = 0, ClientId client_id = 0, chrono_index index = 0, std::string const &record = std::string())
: eventTime(event_time)
, clientId(client_id)
, eventIndex(index)
, logRecord(record)
{ }

uint64_t time() const
{ return eventTime; }

ClientId const & client_id() const
{ return clientId; }

uint32_t index() const
{ return eventIndex; }

std::string const& log_record() const
{ return logRecord; }

Event( Event const& other)
: eventTime(other.time())
, clientId(other.client_id())
, eventIndex(other.index())
, logRecord(other.log_record())
{ }

Event& operator= (const Event & other)
{
if (this != &other)
{
eventTime = other.time();
clientId = other.client_id();
eventIndex = other.index();
logRecord = other.log_record();
}
return *this;
}

bool operator== (const Event &other) const
{
return (eventTime == other.eventTime && clientId == other.clientId && eventIndex == other.eventIndex );
}

bool operator!= (const Event &other) const
{ return !(*this == other); }

bool operator< (const Event &other) const
{
if( ( eventTime < other.time() )
|| (eventTime == other.time() && clientId < other.client_id())
|| (eventTime == other.time() && clientId == other.client_id() && eventIndex < other.index())
)
{
return true;
}
else
{
return false;
}
}


inline std::string toString() const
{
return "{Event: " + std::to_string(eventTime) + ":" + std::to_string(clientId) + ":" + std::to_string(eventIndex) +
":" + logRecord +"}";
}

private:

uint64_t eventTime;
ClientId clientId;
uint32_t eventIndex;
std::string logRecord;

};

class StoryHandle
{
Expand All @@ -24,8 +106,7 @@ class StoryHandle

virtual int log_event(std::string const &) = 0;

// to be implemented with libfabric/thallium bulk transfer...
//virtual int log_event( size_t , void*) = 0;
virtual int playback_story(uint64_t start, uint64_t end, std::vector<Event> & playback_events) = 0;
};

class ChronologClientImpl;
Expand Down
Loading

0 comments on commit 4c62536

Please sign in to comment.