Skip to content

Commit

Permalink
[FEAT] Changed server storage tree + subsequent changes to client
Browse files Browse the repository at this point in the history
  • Loading branch information
nots1dd committed Feb 24, 2025
1 parent 9547de6 commit 771c5f9
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 69 deletions.
57 changes: 20 additions & 37 deletions src/client_receive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,52 +71,30 @@ auto perform_https_request(net::io_context& ioc, ssl::context& ctx, const std::s
}
}

auto fetch_transport_segments(int index, GlobalState& gs, const std::string& server) -> bool
auto fetch_transport_segments(const std::string& ip_id, const std::string& audio_id,
GlobalState& gs, const std::string& server) -> bool
{
net::io_context ioc;
ssl::context ctx(ssl::context::tlsv12_client);
ctx.set_verify_mode(ssl::verify_none);

LOG_INFO << "Fetching client list from server: " << server;
LOG_INFO << "Fetching playlist for IP-ID: " << ip_id << ", Audio ID: " << audio_id;

std::string clients_response =
perform_https_request(ioc, ctx, macros::to_string(macros::SERVER_PATH_HLS_CLIENTS), server);
std::istringstream clientStream(clients_response);
std::vector<std::string> clientIds;
std::string line;
std::string playlist_path = "/hls/" + ip_id + "/" + audio_id + "/index.m3u8";
std::string playlist_content = perform_https_request(ioc, ctx, playlist_path, server);

while (std::getline(clientStream, line))
if (playlist_content.empty())
{
if (!line.empty())
{
clientIds.push_back(line);
}
}

if (index < 0 || index >= static_cast<int>(clientIds.size()))
{
LOG_ERROR << "Invalid client index: " << index;
LOG_ERROR << "Failed to fetch playlist for " << ip_id << "/" << audio_id;
return false;
}

std::string client_id = clientIds[index];
if (client_id.empty())
{
LOG_ERROR << "Client ID cannot be empty";
return false;
}

LOG_INFO << "Selected client ID: " << client_id;

// Fetch the Master Playlist (index.m3u8)
std::string index_playlist_path = "/hls/" + client_id + "/index.m3u8";
std::string playlist_content = perform_https_request(ioc, ctx, index_playlist_path, server);

if (playlist_content.find(macros::PLAYLIST_VARIANT_TAG) != std::string::npos)
{
int max_bandwidth = 0;
std::string selected_playlist;
std::istringstream iss(playlist_content);
std::string line;

while (std::getline(iss, line))
{
Expand Down Expand Up @@ -152,16 +130,18 @@ auto fetch_transport_segments(int index, GlobalState& gs, const std::string& ser
LOG_INFO << "Selected highest bitrate playlist: " << selected_playlist;

// Fetch the variant playlist (which points to .m4s segments)
std::string highest_playlist_path = "/hls/" + client_id + "/" + selected_playlist;
playlist_content = perform_https_request(ioc, ctx, highest_playlist_path, server);
playlist_path = "/hls/" + ip_id + "/" + audio_id + "/" + selected_playlist;
playlist_content = perform_https_request(ioc, ctx, playlist_path, server);
}

std::istringstream segment_stream(playlist_content);
std::string line;

while (std::getline(segment_stream, line))
{
if (!line.empty() && line[0] != '#')
{
std::string segment_url = "/hls/" + client_id + "/" + line;
std::string segment_url = "/hls/" + ip_id + "/" + audio_id + "/" + line;

// Check if the segment is .ts or .m4s
if (line.ends_with(macros::TRANSPORT_STREAM_EXT) || line.ends_with(macros::M4S_FILE_EXT))
Expand Down Expand Up @@ -221,16 +201,19 @@ auto main(int argc, char* argv[]) -> int
{
logger::init_logging();

if (argc < 3)
if (argc < 4)
{
LOG_ERROR << "Usage: " << argv[0] << " <client-index> <server-ip>";
LOG_ERROR << "Usage: " << argv[0] << " <ip-id> <audio-id> <server-ip>";
return EXIT_FAILURE;
}

int index = std::stoi(argv[1]);
std::string ip_id = argv[1];
std::string audio_id = argv[2];
std::string server = argv[3];

GlobalState gs;

if (!fetch_transport_segments(index, gs, argv[2]))
if (!fetch_transport_segments(ip_id, audio_id, gs, server))
{
return EXIT_FAILURE;
}
Expand Down
139 changes: 107 additions & 32 deletions src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ auto extract_payload(const std::string& payload_path, const std::string& extract
return valid_files_found;
}

auto extract_and_validate(const std::string& gzip_path, const std::string& client_id) -> bool
auto extract_and_validate(const std::string& gzip_path, const std::string& audio_id,
const std::string& ip_id) -> bool
{
LOG_INFO << "[Extract] Validating and extracting GZIP file: " << gzip_path;

Expand All @@ -220,7 +221,7 @@ auto extract_and_validate(const std::string& gzip_path, const std::string& clien
}

std::string temp_extract_path =
macros::to_string(macros::SERVER_TEMP_STORAGE_DIR) + "/" + client_id;
macros::to_string(macros::SERVER_TEMP_STORAGE_DIR) + "/" + audio_id;
fs::create_directories(temp_extract_path);

if (!extract_payload(gzip_path, temp_extract_path))
Expand All @@ -232,7 +233,8 @@ auto extract_and_validate(const std::string& gzip_path, const std::string& clien
LOG_INFO << "[Extract] Extraction complete, validating files...";

// Move valid files to storage
std::string storage_path = macros::to_string(macros::SERVER_STORAGE_DIR) + "/" + client_id;
std::string storage_path =
macros::to_string(macros::SERVER_STORAGE_DIR) + "/" + ip_id + "/" + audio_id;
fs::create_directories(storage_path);

int valid_file_count = 0;
Expand Down Expand Up @@ -298,7 +300,10 @@ auto extract_and_validate(const std::string& gzip_path, const std::string& clien
class HLS_Session : public std::enable_shared_from_this<HLS_Session>
{
public:
explicit HLS_Session(boost::asio::ssl::stream<tcp::socket> socket) : socket_(std::move(socket)) {}
explicit HLS_Session(boost::asio::ssl::stream<tcp::socket> socket, const std::string ip)
: socket_(std::move(socket)), ip_id_(std::move(ip))
{
}

void start()
{
Expand All @@ -310,6 +315,7 @@ class HLS_Session : public std::enable_shared_from_this<HLS_Session>
boost::asio::ssl::stream<tcp::socket> socket_;
beast::flat_buffer buffer_;
http::request<http::string_body> request_;
std::string ip_id_;

void do_handshake()
{
Expand All @@ -327,6 +333,23 @@ class HLS_Session : public std::enable_shared_from_this<HLS_Session>
});
}

void resolve_ip()
{
try
{
ip_id_ = socket_.lowest_layer().remote_endpoint().address().to_string();
LOG_INFO << "[Session] Resolved IP: " << ip_id_;
}
catch (const std::exception& e)
{
LOG_ERROR << "[Session] Failed to resolve IP: " << e.what();
send_response(macros::to_string(macros::SERVER_ERROR_500));
return;
}

do_read();
}

void do_read()
{
auto self(shared_from_this());
Expand Down Expand Up @@ -361,39 +384,56 @@ class HLS_Session : public std::enable_shared_from_this<HLS_Session>
socket_.next_layer().set_option(option);
}

void handle_list_clients()
void handle_list_ips()
{
LOG_INFO << "[List Clients] Handling client listing request";
LOG_INFO << "[List IPs] Handling IP listing request";

std::string storage_path = macros::to_string(macros::SERVER_STORAGE_DIR);
if (!fs::exists(storage_path) || !fs::is_directory(storage_path))
{
LOG_ERROR << "[List Clients] Storage directory not found: " << storage_path;
LOG_ERROR << "[List IPs] Storage directory not found: " << storage_path;
send_response(macros::to_string(macros::SERVER_ERROR_500));
return;
}

std::ostringstream client_list;
std::ostringstream response_stream;
bool entries_found = false;

bool clients_found = false;
for (const fs::directory_entry& entry : fs::directory_iterator(storage_path)) // Added const
for (const fs::directory_entry& ip_entry : fs::directory_iterator(storage_path))
{
if (fs::is_directory(entry.status()))
if (fs::is_directory(ip_entry.status()))
{
client_list << entry.path().filename().string() << "\n";
clients_found = true;
std::string ip_id = ip_entry.path().filename().string();
response_stream << ip_id << ":\n"; // IP-ID Header

bool audio_found = false;
for (const fs::directory_entry& audio_entry : fs::directory_iterator(ip_entry.path()))
{
if (fs::is_directory(audio_entry.status()))
{
response_stream << " - " << audio_entry.path().filename().string() << "\n"; // Audio-ID
audio_found = true;
}
}

if (!audio_found)
{
response_stream << " (No audio IDs found)\n";
}

entries_found = true;
}
}

if (!clients_found)
if (!entries_found)
{
LOG_WARNING << "[List Clients] No clients found in storage";
LOG_WARNING << "[List IPs] No IPs or Audio-IDs found in storage";
send_response(macros::to_string(macros::SERVER_ERROR_404));
return;
}

// Return the list of client IDs
send_response("HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\n" + client_list.str());
// Return the list of IP-IDs and their respective Audio-IDs
send_response("HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\n" + response_stream.str());
}

void process_request()
Expand Down Expand Up @@ -440,7 +480,7 @@ class HLS_Session : public std::enable_shared_from_this<HLS_Session>
{
if (request_.target() == macros::SERVER_PATH_HLS_CLIENTS) // Request for all client IDs
{
handle_list_clients();
handle_list_ips();
}
else
{
Expand All @@ -457,8 +497,8 @@ class HLS_Session : public std::enable_shared_from_this<HLS_Session>
{
LOG_INFO << "[Upload] Handling GZIP file upload";

std::string client_id = boost::uuids::to_string(boost::uuids::random_generator()());
std::string gzip_path = macros::to_string(macros::SERVER_TEMP_STORAGE_DIR) + "/" + client_id +
std::string audio_id = boost::uuids::to_string(boost::uuids::random_generator()());
std::string gzip_path = macros::to_string(macros::SERVER_TEMP_STORAGE_DIR) + "/" + audio_id +
macros::to_string(macros::COMPRESSED_ARCHIVE_EXT);

fs::create_directories(macros::SERVER_TEMP_STORAGE_DIR);
Expand Down Expand Up @@ -490,9 +530,9 @@ class HLS_Session : public std::enable_shared_from_this<HLS_Session>

LOG_INFO << "[Upload] File successfully written: " << gzip_path;

if (extract_and_validate(gzip_path, client_id))
if (extract_and_validate(gzip_path, audio_id, ip_id_))
{
send_response("HTTP/1.1 200 OK\r\nClient-ID: " + client_id + "\r\n\r\n");
send_response("HTTP/1.1 200 OK\r\nClient-ID: " + audio_id + "\r\n\r\n");
}
else
{
Expand Down Expand Up @@ -520,7 +560,7 @@ class HLS_Session : public std::enable_shared_from_this<HLS_Session>
*/
void handle_download()
{
// Parse request target (expected: /hls/<client_id>/<filename>)
// Parse request target (expected: /hls/<audio_id>/<filename>)
std::string target(request_.target().begin(), request_.target().end());
std::vector<std::string> parts;
std::istringstream iss(target);
Expand All @@ -534,19 +574,20 @@ class HLS_Session : public std::enable_shared_from_this<HLS_Session>
}
}

if (parts.size() < 3 || parts[0] != "hls")
if (parts.size() < 4 || parts[0] != "hls")
{
LOG_ERROR << "[Download] Invalid request path: " << target;
send_response(macros::to_string(macros::SERVER_ERROR_400));
return;
}

std::string client_id = parts[1];
std::string filename = parts[2];
std::string ip_addr = parts[1];
std::string audio_id = parts[2];
std::string filename = parts[3];

// Construct the file path
std::string file_path =
macros::to_string(macros::SERVER_STORAGE_DIR) + "/" + client_id + "/" + filename;
std::string file_path = macros::to_string(macros::SERVER_STORAGE_DIR) + "/" + ip_addr + "/" +
audio_id + "/" + filename;

if (!fs::exists(file_path) || !fs::is_regular_file(file_path))
{
Expand Down Expand Up @@ -651,12 +692,15 @@ class HLS_Server
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port)), ssl_context_(ssl_context)
{
LOG_INFO << "[Server] Starting HLS server on port " << port;
load_ip_audio_map();
start_accept();
}

private:
tcp::acceptor acceptor_;
boost::asio::ssl::context& ssl_context_;
tcp::acceptor acceptor_;
boost::asio::ssl::context& ssl_context_;
std::unordered_map<std::string, std::vector<std::string>> ip_audio_map_;
std::mutex ip_audio_map_mutex_;

void start_accept()
{
Expand All @@ -669,13 +713,44 @@ class HLS_Server
return;
}

LOG_INFO << "[Server] Accepted new connection";
std::string ip = socket.remote_endpoint().address().to_string();
LOG_INFO << "[Server] Accepted new connection from " << ip;

auto session = std::make_shared<HLS_Session>(
boost::asio::ssl::stream<tcp::socket>(std::move(socket), ssl_context_));
boost::asio::ssl::stream<tcp::socket>(std::move(socket), ssl_context_), ip);
session->start();
start_accept();
});
}

void load_ip_audio_map()
{
std::ifstream infile("ip_audio_map.txt");
std::string ip, audio_id;
while (infile >> ip >> audio_id)
{
ip_audio_map_[ip].push_back(audio_id);
}
}

void save_ip_audio_map()
{
std::ofstream outfile("ip_audio_map.txt");
for (const auto& pair : ip_audio_map_)
{
for (const std::string& audio_id : pair.second)
{
outfile << pair.first << " " << audio_id << "\n";
}
}
}

void add_audio_for_ip(const std::string& ip, const std::string& audio_id)
{
std::lock_guard<std::mutex> lock(ip_audio_map_mutex_);
ip_audio_map_[ip].push_back(audio_id);
save_ip_audio_map();
}
};

auto main() -> int
Expand Down

0 comments on commit 771c5f9

Please sign in to comment.