diff --git a/CMakeLists.txt b/CMakeLists.txt index 4a944b8c..d4d5500f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -179,6 +179,7 @@ FILE(MAKE_DIRECTORY "${PROTO_OUTPUT_DIR}") ADD_SUBDIRECTORY(src/pstd) ADD_SUBDIRECTORY(src/net) ADD_SUBDIRECTORY(src/praft) +ADD_SUBDIRECTORY(src/pd) ADD_SUBDIRECTORY(src/storage) ADD_SUBDIRECTORY(src) diff --git a/cmake/braft.cmake b/cmake/braft.cmake index bb8e6882..cdb22a4d 100644 --- a/cmake/braft.cmake +++ b/cmake/braft.cmake @@ -26,6 +26,7 @@ ExternalProject_Add( -DCMAKE_BUILD_TYPE=${LIB_BUILD_TYPE} -DCMAKE_CXX_FLAGS=${BRAFT_CXX_FLAGS} -DCMAKE_INSTALL_PREFIX=${LIB_INSTALL_PREFIX} + -DCMAKE_LIBRARY_PATH=${LIB_INSTALL_PREFIX} -DBRPC_LIB=${BRPC_LIBRARIES} -DBRPC_INCLUDE_PATH=${BRPC_INCLUDE_DIR} -DCMAKE_POSITION_INDEPENDENT_CODE=ON @@ -38,7 +39,7 @@ ExternalProject_Add( -DGFLAGS_LIB=${GFLAGS_LIBRARIES} -DPROTOC_LIB=${PROTOC_LIBRARY} - -DPROTOBUF_LIBRARIES=${PROTOBUF_LIBRARY} + -DPROTOBUF_LIBRARY=${PROTOBUF_LIBRARY} -DPROTOBUF_INCLUDE_DIRS=${PROTOBUF_INCLUDE_DIR} -DPROTOBUF_PROTOC_EXECUTABLE=${PROTOBUF_PROTOC} -DOPENSSL_INCLUDE_DIR=${OPENSSL_INCLUDE_DIR} diff --git a/cmake/brpc.cmake b/cmake/brpc.cmake index 2580254a..a5559913 100644 --- a/cmake/brpc.cmake +++ b/cmake/brpc.cmake @@ -24,6 +24,7 @@ ExternalProject_Add( -DCMAKE_CPP_FLAGS=${CMAKE_CPP_FLAGS} -DCMAKE_INSTALL_PREFIX=${BRPC_INSTALL_DIR} -DCMAKE_FIND_LIBRARY_SUFFIXES=${LIB_INSTALL_PREFIX} + -DCMAKE_LIBRARY_PATH=${LIB_INSTALL_PREFIX} -DGFLAGS_INCLUDE_PATH=${GFLAGS_INCLUDE_DIR} -DBUILD_SHARED_LIBS=FALSE -DBUILD_BRPC_TOOLS=OFF @@ -34,7 +35,9 @@ ExternalProject_Add( -DPROTOC_LIB=${PROTOC_LIBRARY} -DPROTOBUF_LIBRARIES=${PROTOBUF_LIBRARY} + -DProtobuf_LIBRARIES=${PROTOBUF_LIBRARY} -DPROTOBUF_INCLUDE_DIRS=${PROTOBUF_INCLUDE_DIR} + -DProtobuf_INCLUDE_DIR=${PROTOBUF_INCLUDE_DIR} -DPROTOBUF_PROTOC_EXECUTABLE=${PROTOBUF_PROTOC} -DOPENSSL_INCLUDE_DIR=${OPENSSL_INCLUDE_DIR} diff --git a/pikiwidb.conf b/pikiwidb.conf index 38d7371c..7c88f10b 100644 --- a/pikiwidb.conf +++ b/pikiwidb.conf @@ -35,7 +35,7 @@ logfile stdout # Set the number of databases. The default database is DB 0, you can select # a different one on a per-connection basis using SELECT where # dbid is a number between 0 and 'databases'-1 -databases 16 +databases 2 ################################ SNAPSHOTTING ################################# # @@ -343,6 +343,18 @@ rocksdb-ttl-second 604800 rocksdb-periodic-second 259200; ############################### RAFT ############################### -use-raft no +use-raft yes # Braft relies on brpc to communicate via the default port number plus the port offset raft-port-offset 10 +# request-timeout-ms +request-timeout-ms 1000 + +############################## PD ################################### +as-pd yes +# Standalone or hybrid deployment +pd-fake no +# pd group id +pd-group-id PikiwiDB +# pd group conf +# ip:port:index,ip:port:index, +pd-conf 127.0.0.1:7787:0, diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b8f4fc74..776ab21e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -3,6 +3,18 @@ # LICENSE file in the root directory of this source tree. An additional grant # of patent rights can be found in the PATENTS file in the same directory. 
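+# Generate store.pb.cc from store.proto at build time and wrap it in a small
+# static library; pikiwidb links store_pb and adds ${PROTO_OUTPUT_DIR} to its
+# include path below so the generated headers resolve.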
+ADD_CUSTOM_COMMAND( + OUTPUT "${PROTO_OUTPUT_DIR}/store.pb.cc" + DEPENDS protobuf + COMMAND ${PROTOBUF_PROTOC} + ARGS -I ${CMAKE_CURRENT_SOURCE_DIR} + --cpp_out ${PROTO_OUTPUT_DIR} + store.proto +) +ADD_LIBRARY(store_pb STATIC "${PROTO_OUTPUT_DIR}/store.pb.cc") +SET(LIBRARY_OUTPUT_PATH ${PLIB_INSTALL_DIR}) +TARGET_INCLUDE_DIRECTORIES(store_pb PRIVATE ${PROTOBUF_INCLUDE_DIR}) + AUX_SOURCE_DIRECTORY(. PIKIWIDB_SRC) ADD_EXECUTABLE(pikiwidb ${PIKIWIDB_SRC}) @@ -30,6 +42,7 @@ TARGET_INCLUDE_DIRECTORIES(pikiwidb PRIVATE ${BRAFT_INCLUDE_DIR} PRIVATE ${BRPC_INCLUDE_DIR} PRIVATE ${LIBEVENT_INCLUDE_DIRS} + PRIVATE ${PROTO_OUTPUT_DIR} ) ADD_DEPENDENCIES(pikiwidb @@ -44,6 +57,10 @@ ADD_DEPENDENCIES(pikiwidb braft brpc storage + store_pb + pd_pb + praft_pb + binlog_pb ) TARGET_LINK_LIBRARIES(pikiwidb @@ -66,7 +83,10 @@ TARGET_LINK_LIBRARIES(pikiwidb protobuf leveldb z + pd praft + store_pb + pd_pb praft_pb binlog_pb "${LIB}" diff --git a/src/base_cmd.cc b/src/base_cmd.cc index 3c09991e..f1274ffb 100644 --- a/src/base_cmd.cc +++ b/src/base_cmd.cc @@ -6,11 +6,16 @@ */ #include "base_cmd.h" + +#include + #include "common.h" #include "config.h" #include "log.h" #include "pikiwidb.h" #include "praft/praft.h" +#include "pstd_string.h" +#include "store.h" namespace pikiwidb { @@ -35,32 +40,59 @@ void BaseCmd::Execute(PClient* client) { DEBUG("execute command: {}", client->CmdName()); if (g_config.use_raft.load()) { - // 1. If PRAFT is not initialized yet, return an error message to the client for both read and write commands. - if (!PRAFT.IsInitialized() && (HasFlag(kCmdFlagsReadonly) || HasFlag(kCmdFlagsWrite))) { - DEBUG("drop command: {}", client->CmdName()); - return client->SetRes(CmdRes::kErrOther, "PRAFT is not initialized"); + // @todo need to find region by key + auto db = PSTORE.GetBackend(client->GetCurrentDB()); + if (!db) { + if (!(HasFlag(kCmdFlagsAdmin) || HasFlag(kCmdFlagsFast) || HasFlag(kCmdFlagsRaft))) { + return client->SetRes(CmdRes::kErrOther, "The region does not exist!"); + } + } else { + auto& praft = db->GetPRaft(); + + // 1. If PRAFT is not initialized yet, return an error message to the client for both read and write commands. + if (!praft->IsInitialized() && (HasFlag(kCmdFlagsReadonly) || HasFlag(kCmdFlagsWrite))) { + DEBUG("drop command: {}", client->CmdName()); + return client->SetRes(CmdRes::kErrOther, "PRAFT is not initialized"); + } + + // 2. If PRAFT is initialized and the current node is not the leader, return a redirection message for write + // commands. + if (HasFlag(kCmdFlagsWrite) && !praft->IsLeader()) { + return client->SetRes(CmdRes::kErrOther, fmt::format("MOVED {}", praft->GetLeaderAddress())); + } } + } - // 2. If PRAFT is initialized and the current node is not the leader, return a redirection message for write - // commands. - if (HasFlag(kCmdFlagsWrite) && !PRAFT.IsLeader()) { - return client->SetRes(CmdRes::kErrOther, fmt::format("MOVED {}", PRAFT.GetLeaderAddress())); + auto db_id = client->GetCurrentDB(); + auto db = PSTORE.GetBackend(db_id); + if (!db) { + /* + @todo + Creating shards through PD is not supported yet, so a shard that does not exist can only be operated on with RAFT + commands, and shard creation is temporarily bootstrapped with the raft.cluster init command. Once PD supports + creating shards, this command should logically no longer be allowed to create them.
+ */ + if (!(HasFlag(kCmdFlagsAdmin) || HasFlag(kCmdFlagsFast) || HasFlag(kCmdFlagsRaft))) { + return client->SetRes(CmdRes::kErrOther, + fmt::format("The DB {} that the client wants to access does not exist", db_id)); + } + } else { + if (!HasFlag(kCmdFlagsExclusive)) { + db->LockShared(); } - } - auto dbIndex = client->GetCurrentDB(); - if (!HasFlag(kCmdFlagsExclusive)) { - PSTORE.GetBackend(dbIndex)->LockShared(); + DEFER { + if (!HasFlag(kCmdFlagsExclusive)) { + db->UnLockShared(); + } + }; } if (!DoInitial(client)) { return; } DoCmd(client); - - if (!HasFlag(kCmdFlagsExclusive)) { - PSTORE.GetBackend(dbIndex)->UnLockShared(); - } } std::string BaseCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logic_id, uint32_t filenum, diff --git a/src/base_cmd.h b/src/base_cmd.h index 6bb77dfa..bf6b01b2 100644 --- a/src/base_cmd.h +++ b/src/base_cmd.h @@ -353,7 +353,7 @@ class BaseCmdGroup : public BaseCmd { BaseCmd* GetSubCmd(const std::string& cmdName) override; // group cmd this function will not be called - void DoCmd(PClient* client) override{}; + void DoCmd(PClient* client) override {}; // group cmd this function will not be called bool DoInitial(PClient* client) override; diff --git a/src/client.cc b/src/client.cc index fbb35877..c4409180 100644 --- a/src/client.cc +++ b/src/client.cc @@ -18,6 +18,7 @@ #include "base_cmd.h" #include "config.h" #include "pikiwidb.h" +#include "store.h" namespace pikiwidb { @@ -269,7 +270,7 @@ int PClient::handlePacket(const char* start, int bytes) { if (isPeerMaster()) { if (isClusterCmdTarget()) { // Process the packet in one turn. - int len = PRAFT.ProcessClusterCmdResponse(this, start, bytes); // @todo + int len = PSTORE.GetBackend(dbno_)->GetPRaft()->ProcessClusterCmdResponse(this, start, bytes); // @todo if (len > 0) { return len; } @@ -456,7 +457,7 @@ void PClient::OnConnect() { } if (isClusterCmdTarget()) { - PRAFT.SendNodeRequest(this); + PSTORE.GetBackend(dbno_)->GetPRaft()->SendNodeRequest(this); } } else { if (g_config.password.empty()) { @@ -541,7 +542,8 @@ bool PClient::isPeerMaster() const { } bool PClient::isClusterCmdTarget() const { - return PRAFT.GetClusterCmdCtx().GetPeerIp() == PeerIP() && PRAFT.GetClusterCmdCtx().GetPort() == PeerPort(); + auto& praft = PSTORE.GetBackend(dbno_)->GetPRaft(); + return praft->GetClusterCmdCtx().GetPeerIp() == PeerIP() && praft->GetClusterCmdCtx().GetPort() == PeerPort(); } int PClient::uniqueID() const { diff --git a/src/client.h b/src/client.h index 7f9eb982..1d1b6b55 100644 --- a/src/client.h +++ b/src/client.h @@ -107,6 +107,8 @@ enum class ClientState { kClosed, }; +const int kChannelTimeoutMS = 200; + class DB; struct PSlaveInfo; diff --git a/src/cmd_admin.cc b/src/cmd_admin.cc index 2112943d..52cafb1c 100644 --- a/src/cmd_admin.cc +++ b/src/cmd_admin.cc @@ -95,7 +95,7 @@ void FlushallCmd::DoCmd(PClient* client) { } SelectCmd::SelectCmd(const std::string& name, int16_t arity) - : BaseCmd(name, arity, kCmdFlagsAdmin | kCmdFlagsReadonly, kAclCategoryAdmin) {} + : BaseCmd(name, arity, kCmdFlagsAdmin, kAclCategoryAdmin) {} bool SelectCmd::DoInitial(PClient* client) { return true; } @@ -175,19 +175,20 @@ void InfoCmd::InfoRaft(PClient* client) { return client->SetRes(CmdRes::kWrongNum, client->CmdName()); } - if (!PRAFT.IsInitialized()) { + auto& praft = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft(); + if (!praft->IsInitialized()) { return client->SetRes(CmdRes::kErrOther, "Don't already cluster member"); } - auto node_status = PRAFT.GetNodeStatus(); + auto node_status =
praft->GetNodeStatus(); if (node_status.state == braft::State::STATE_END) { return client->SetRes(CmdRes::kErrOther, "Node is not initialized"); } std::string message; - message += "raft_group_id:" + PRAFT.GetGroupID() + "\r\n"; - message += "raft_node_id:" + PRAFT.GetNodeID() + "\r\n"; - message += "raft_peer_id:" + PRAFT.GetPeerID() + "\r\n"; + message += "raft_group_id:" + praft->GetGroupID() + "\r\n"; + message += "raft_node_id:" + praft->GetNodeID() + "\r\n"; + message += "raft_peer_id:" + praft->GetPeerID() + "\r\n"; if (braft::is_active_state(node_status.state)) { message += "raft_state:up\r\n"; } else { @@ -197,9 +198,9 @@ message += "raft_leader_id:" + node_status.leader_id.to_string() + "\r\n"; message += "raft_current_term:" + std::to_string(node_status.term) + "\r\n"; - if (PRAFT.IsLeader()) { + if (praft->IsLeader()) { std::vector<braft::PeerId> peers; - auto status = PRAFT.GetListPeers(&peers); + auto status = praft->GetListPeers(&peers); if (!status.ok()) { return client->SetRes(CmdRes::kErrOther, status.error_str()); } diff --git a/src/cmd_admin.h b/src/cmd_admin.h index c7816409..311d36d7 100644 --- a/src/cmd_admin.h +++ b/src/cmd_admin.h @@ -32,7 +32,7 @@ class CmdConfig : public BaseCmdGroup { private: // std::vector subCmd_; - void DoCmd(PClient* client) override{}; + void DoCmd(PClient* client) override {}; }; class CmdConfigGet : public BaseCmd { @@ -136,7 +136,7 @@ class CmdDebug : public BaseCmdGroup { bool DoInitial(PClient* client) override { return true; }; private: - void DoCmd(PClient* client) override{}; + void DoCmd(PClient* client) override {}; }; class CmdDebugHelp : public BaseCmd { diff --git a/src/cmd_raft.cc b/src/cmd_raft.cc index b4bc29b5..a289e406 100644 --- a/src/cmd_raft.cc +++ b/src/cmd_raft.cc @@ -4,25 +4,30 @@ * LICENSE file in the root directory of this source tree. An additional grant * of patent rights can be found in the PATENTS file in the same directory. */ - #include "cmd_raft.h" +#include #include #include #include -#include "net/event_loop.h" +#include "brpc/channel.h" +#include "praft.pb.h" #include "praft/praft.h" #include "pstd/log.h" +#include "pstd/pstd_status.h" #include "pstd/pstd_string.h" #include "client.h" #include "config.h" #include "pikiwidb.h" #include "replication.h" +#include "store.h" namespace pikiwidb { +extern PConfig g_config; + RaftNodeCmd::RaftNodeCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {} @@ -34,6 +39,8 @@ bool RaftNodeCmd::DoInitial(PClient* client) { client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only"); return false; } + group_id_ = client->argv_[2]; + return true; } @@ -45,6 +52,7 @@ void RaftNodeCmd::DoCmd(PClient* client) { } else if (cmd == kRemoveCmd) { DoCmdRemove(client); } else if (cmd == kDoSnapshot) { + assert(0); // TODO(longfar): add group id in arguments DoCmdSnapshot(client); } else { client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only"); } } void RaftNodeCmd::DoCmdAdd(PClient* client) { + DEBUG("Received RAFT.NODE ADD cmd from {}", client->PeerIP()); + auto& praft = PSTORE.GetDBByGroupID(group_id_)->GetPRaft(); // Check whether it is a leader.
If it is not a leader, return the leader information - if (!PRAFT.IsLeader()) { - client->SetRes(CmdRes::kWrongLeader, PRAFT.GetLeaderID()); + if (!praft->IsLeader()) { + client->SetRes(CmdRes::kWrongLeader, praft->GetLeaderID()); return; } @@ -65,7 +75,7 @@ void RaftNodeCmd::DoCmdAdd(PClient* client) { // RedisRaft has nodeid, but in Braft, NodeId is IP:Port. // So we do not need to parse and use nodeid like redis; - auto s = PRAFT.AddPeer(client->argv_[3]); + auto s = praft->AddPeer(client->argv_[3]); if (s.ok()) { client->SetRes(CmdRes::kOK); } else { @@ -74,8 +84,9 @@ void RaftNodeCmd::DoCmdAdd(PClient* client) { } void RaftNodeCmd::DoCmdRemove(PClient* client) { + auto& praft = PSTORE.GetDBByGroupID(group_id_)->GetPRaft(); // If the node has been initialized, it needs to close the previous initialization and rejoin the other group - if (!PRAFT.IsInitialized()) { + if (!praft->IsInitialized()) { client->SetRes(CmdRes::kErrOther, "Don't already cluster member"); return; } @@ -86,9 +97,9 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) { } // Check whether it is a leader. If it is not a leader, send remove request to leader - if (!PRAFT.IsLeader()) { + if (!praft->IsLeader()) { // Get the leader information - braft::PeerId leader_peer_id(PRAFT.GetLeaderID()); + braft::PeerId leader_peer_id(praft->GetLeaderID()); // @todo There will be an unreasonable address, need to consider how to deal with it if (leader_peer_id.is_empty()) { client->SetRes(CmdRes::kErrOther, @@ -96,24 +107,64 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) { return; } - // Connect target - std::string peer_ip = butil::ip2str(leader_peer_id.addr.ip).c_str(); - auto port = leader_peer_id.addr.port - pikiwidb::g_config.raft_port_offset; - auto peer_id = client->argv_[2]; - auto ret = - PRAFT.GetClusterCmdCtx().Set(ClusterCmdType::kRemove, client, std::move(peer_ip), port, std::move(peer_id)); - if (!ret) { // other clients have removed - return client->SetRes(CmdRes::kErrOther, "Other clients have removed"); - } - PRAFT.GetClusterCmdCtx().ConnectTargetNode(); - INFO("Sent remove request to leader successfully"); - - // Not reply any message here, we will reply after the connection is established. 
- client->Clear(); + brpc::ChannelOptions options; + options.connection_type = brpc::CONNECTION_TYPE_SINGLE; + options.max_retry = 0; + options.connect_timeout_ms = kChannelTimeoutMS; + + NodeRemoveRequest request; + NodeRemoveResponse response; + + request.set_group_id(praft->GetGroupID()); + request.set_endpoint(client->argv_[2]); + request.set_index(client->GetCurrentDB()); + request.set_role(0); + + auto endpoint = leader_peer_id.addr; + int retry_count = 0; + do { + brpc::Channel remove_node_channel; + if (0 != remove_node_channel.Init(endpoint, &options)) { + ERROR("Fail to init remove_node_channel to praft service!"); + client->SetRes(CmdRes::kErrOther, "Fail to init remove_node_channel."); + return; + } + + brpc::Controller cntl; + PRaftService_Stub stub(&remove_node_channel); + stub.RemoveNode(&cntl, &request, &response, NULL); + + if (cntl.Failed()) { + ERROR("Fail to send remove node rpc to target server {}", butil::endpoint2str(endpoint).c_str()); + client->SetRes(CmdRes::kErrOther, "Failed to send remove node rpc"); + return; + } + + if (response.success()) { + client->SetRes(CmdRes::kOK, "Remove Node Success"); + return; + } + + switch (response.error_code()) { + case PRaftErrorCode::kErrorReDirect: { + butil::str2endpoint(response.leader_endpoint().c_str(), &endpoint); + endpoint.port += g_config.raft_port_offset; + break; + } + default: { + ERROR("Remove node request return false"); + client->SetRes(CmdRes::kErrOther, "Failed to Remove Node"); + return; + } + } + } while (!response.success() && ++retry_count <= 3); + + ERROR("Remove node request return false"); + client->SetRes(CmdRes::kErrOther, "Failed to Remove Node"); return; } - auto s = PRAFT.RemovePeer(client->argv_[2]); + auto s = praft->RemovePeer(client->argv_[2], client->GetCurrentDB()); if (s.ok()) { client->SetRes(CmdRes::kOK); } else { @@ -122,9 +173,8 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) { } void RaftNodeCmd::DoCmdSnapshot(PClient* client) { - auto self_snapshot_index = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->GetSmallestFlushedLogIndex(); - INFO("DoCmdSnapshot self_snapshot_index:{}", self_snapshot_index); - auto s = PRAFT.DoSnapshot(self_snapshot_index); + auto& praft = PSTORE.GetDBByGroupID(group_id_)->GetPRaft(); + auto s = praft->DoSnapshot(); if (s.ok()) { client->SetRes(CmdRes::kOK); } @@ -140,12 +190,17 @@ bool RaftClusterCmd::DoInitial(PClient* client) { client->SetRes(CmdRes::kErrOther, "RAFT.CLUSTER supports INIT/JOIN only"); return false; } + return true; } void RaftClusterCmd::DoCmd(PClient* client) { - if (PRAFT.IsInitialized()) { - return client->SetRes(CmdRes::kErrOther, "Already cluster member"); + auto db = PSTORE.GetBackend(client->GetCurrentDB()); + if (db) { + auto& praft = db->GetPRaft(); + if (praft->IsInitialized()) { + return client->SetRes(CmdRes::kErrOther, "Already cluster member"); + } } auto cmd = client->argv_[1]; @@ -162,21 +217,24 @@ void RaftClusterCmd::DoCmdInit(PClient* client) { return client->SetRes(CmdRes::kWrongNum, client->CmdName()); } - std::string cluster_id; + std::string group_id; if (client->argv_.size() == 3) { - cluster_id = client->argv_[2]; - if (cluster_id.size() != RAFT_GROUPID_LEN) { + group_id = client->argv_[2]; + if (group_id.size() != RAFT_GROUPID_LEN) { return client->SetRes(CmdRes::kInvalidParameter, "Cluster id must be " + std::to_string(RAFT_GROUPID_LEN) + " characters"); } } else { - cluster_id = pstd::RandomHexChars(RAFT_GROUPID_LEN); + group_id = pstd::RandomHexChars(RAFT_GROUPID_LEN); } - auto s = 
PRAFT.Init(cluster_id, false); - if (!s.ok()) { - return client->SetRes(CmdRes::kErrOther, fmt::format("Failed to init node: ", s.error_str())); + + std::string group_id_copy(group_id); + auto s = PSTORE.AddBackend(client->GetCurrentDB(), std::move(group_id_copy)); + if (s.ok()) { + client->SetLineString(fmt::format("+OK {}", group_id)); + } else { + client->SetRes(CmdRes::kErrOther, fmt::format("The current GroupID {} already exists", group_id)); } - client->SetRes(CmdRes::kOK); } static inline std::optional<std::pair<std::string, int>> GetIpAndPortFromEndPoint(const std::string& endpoint) { @@ -191,43 +249,88 @@ } void RaftClusterCmd::DoCmdJoin(PClient* client) { - // If the node has been initialized, it needs to close the previous initialization and rejoin the other group - if (PRAFT.IsInitialized()) { - return client->SetRes(CmdRes::kErrOther, - "A node that has been added to a cluster must be removed \ - from the old cluster before it can be added to the new cluster"); + assert(client->argv_.size() == 4); + auto group_id = client->argv_[2]; + auto addr = client->argv_[3]; + butil::EndPoint endpoint; + if (0 != butil::str2endpoint(addr.c_str(), &endpoint)) { + ERROR("Wrong endpoint format: {}", addr); + return client->SetRes(CmdRes::kErrOther, "Wrong endpoint format"); } + endpoint.port += g_config.raft_port_offset; - if (client->argv_.size() < 3) { - return client->SetRes(CmdRes::kWrongNum, client->CmdName()); + if (group_id.size() != RAFT_GROUPID_LEN) { + return client->SetRes(CmdRes::kInvalidParameter, + "Cluster id must be " + std::to_string(RAFT_GROUPID_LEN) + " characters"); } - // (KKorpse)TODO: Support multiple nodes join at the same time. - if (client->argv_.size() > 3) { - return client->SetRes(CmdRes::kInvalidParameter, "Too many arguments"); + std::string group_id_copy(group_id); + auto db_id = client->GetCurrentDB(); + auto s = PSTORE.AddBackend(db_id, std::move(group_id_copy)); + if (!s.ok()) { + return client->SetRes(CmdRes::kErrOther, + fmt::format("Failed to create region for GroupID {}: {}", group_id, s.ToString())); } - auto addr = client->argv_[2]; - if (braft::PeerId(addr).is_empty()) { - return client->SetRes(CmdRes::kErrOther, fmt::format("Invalid ip::port: {}", addr)); - } + brpc::ChannelOptions options; + options.connection_type = brpc::CONNECTION_TYPE_SINGLE; + options.max_retry = 0; + options.connect_timeout_ms = kChannelTimeoutMS; - auto ip_port = GetIpAndPortFromEndPoint(addr); - if (!ip_port.has_value()) { - return client->SetRes(CmdRes::kErrOther, fmt::format("Invalid ip::port: {}", addr)); - } - auto& [peer_ip, port] = *ip_port; + NodeAddRequest request; + NodeAddResponse response; - // Connect target - auto ret = PRAFT.GetClusterCmdCtx().Set(ClusterCmdType::kJoin, client, std::move(peer_ip), port); - if (!ret) { // other clients have joined - return client->SetRes(CmdRes::kErrOther, "Other clients have joined"); - } - PRAFT.GetClusterCmdCtx().ConnectTargetNode(); - INFO("Sent join request to leader successfully"); + auto end_point = std::string(butil::endpoint2str(PSTORE.GetEndPoint()).c_str()); + request.set_group_id(group_id); + request.set_endpoint(end_point); + request.set_index(client->GetCurrentDB()); + request.set_role(0); + + int retry_count = 0; + + do { + brpc::Channel add_node_channel; + if (0 != add_node_channel.Init(endpoint, &options)) { + PSTORE.RemoveBackend(db_id); + ERROR("Fail to init add_node_channel to praft service!"); + client->SetRes(CmdRes::kErrOther, "Fail to init add_node_channel."); + return; +
} + + brpc::Controller cntl; + PRaftService_Stub stub(&add_node_channel); + stub.AddNode(&cntl, &request, &response, NULL); + + if (cntl.Failed()) { + PSTORE.RemoveBackend(db_id); + ERROR("Fail to send add node rpc to target server {}", addr); + client->SetRes(CmdRes::kErrOther, "Failed to send add node rpc"); + return; + } + + if (response.success()) { + client->SetRes(CmdRes::kOK, "Add Node Success"); + return; + } + + switch (response.error_code()) { + case PRaftErrorCode::kErrorReDirect: { + butil::str2endpoint(response.leader_endpoint().c_str(), &endpoint); + endpoint.port += g_config.raft_port_offset; + break; + } + default: { + ERROR("Add node request return false"); + PSTORE.RemoveBackend(db_id); + client->SetRes(CmdRes::kErrOther, "Failed to Add Node"); + return; + } + } + } while (!response.success() && ++retry_count <= 3); - // Not reply any message here, we will reply after the connection is established. - client->Clear(); + ERROR("Add node request return false"); + PSTORE.RemoveBackend(db_id); + client->SetRes(CmdRes::kErrOther, "Failed to Add Node"); } } // namespace pikiwidb diff --git a/src/cmd_raft.h b/src/cmd_raft.h index 6a4c1f86..9603abff 100644 --- a/src/cmd_raft.h +++ b/src/cmd_raft.h @@ -10,10 +10,13 @@ #include #include "base_cmd.h" +#include "client.h" namespace pikiwidb { +class PRaft; -/* RAFT.NODE ADD [id] [address:port] +/* + * RAFT.NODE ADD [id] [address:port] * Add a new node to the cluster. The [id] can be an explicit non-zero value, * or zero to let the cluster choose one. * Reply: @@ -47,20 +50,22 @@ class RaftNodeCmd : public BaseCmd { void DoCmdRemove(PClient *client); void DoCmdSnapshot(PClient *client); + private: + std::string group_id_; + static constexpr std::string_view kAddCmd = "ADD"; static constexpr std::string_view kRemoveCmd = "REMOVE"; static constexpr std::string_view kDoSnapshot = "DOSNAPSHOT"; }; -/* RAFT.CLUSTER INIT +/* + * RAFT.CLUSTER INIT [group_id] * Initializes a new Raft cluster. - * <cluster_id> is an optional 32 character string, if set, cluster will use it for the id * Reply: - * +OK [group_id] + * +OK [group_id] * - * RAFT.CLUSTER JOIN [addr:port] - * Join an existing cluster. - * The operation is asynchronous and may take place/retry in the background. + * RAFT.CLUSTER JOIN [group_id] [ip:port] + * Join an existing cluster. The operation is asynchronous and may take place/retry in the background.
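 * The given address is the target node's client endpoint; the configured raft-port-offset
 * is applied internally before the node-add RPC is sent.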
 * Reply: * +OK */ @@ -76,6 +81,7 @@ class RaftClusterCmd : public BaseCmd { void DoCmdInit(PClient *client); void DoCmdJoin(PClient *client); + private: static constexpr std::string_view kInitCmd = "INIT"; static constexpr std::string_view kJoinCmd = "JOIN"; }; diff --git a/src/config.cc b/src/config.cc index 96d2d8fe..3aef23a2 100644 --- a/src/config.cc +++ b/src/config.cc @@ -138,6 +138,13 @@ PConfig::PConfig() { AddNumber("rocksdb-level0-slowdown-writes-trigger", false, &rocksdb_level0_slowdown_writes_trigger); AddNumber("rocksdb-level0-stop-writes-trigger", false, &rocksdb_level0_stop_writes_trigger); AddNumber("rocksdb-level0-slowdown-writes-trigger", false, &rocksdb_level0_slowdown_writes_trigger); + + // pd config + AddBool("as-pd", &CheckYesNo, false, &as_pd); + AddBool("pd-fake", &CheckYesNo, false, &fake); + AddString("pd-group-id", false, {&pd_group_id}); + AddString("pd-conf", false, {&pd_conf}); + AddNumber("request-timeout-ms", false, &request_timeout_ms); } bool PConfig::LoadFromFile(const std::string& file_name) { diff --git a/src/config.h b/src/config.h index 7b119638..7a2cc620 100644 --- a/src/config.h +++ b/src/config.h @@ -151,10 +151,10 @@ class PConfig { AtomicString log_dir = "stdout"; // the log directory, differ from redis AtomicString log_level = "warning"; AtomicString run_id; - std::atomic databases = 16; + std::atomic databases = 2; std::atomic_uint32_t worker_threads_num = 2; std::atomic_uint32_t slave_threads_num = 2; - std::atomic db_instance_num = 3; + std::atomic db_instance_num = 1; std::atomic_bool use_raft = true; std::atomic_uint32_t rocksdb_max_subcompactions = 0; @@ -174,6 +174,13 @@ std::atomic_uint64_t rocksdb_ttl_second = 604800; // default 86400 * 7 std::atomic_uint64_t rocksdb_periodic_second = 259200; // default 86400 * 3 + // PD + std::atomic_bool as_pd = false; + std::atomic_bool fake = false; + AtomicString pd_group_id; + AtomicString pd_conf; + std::atomic_int request_timeout_ms = 1000; + rocksdb::Options GetRocksDBOptions(); rocksdb::BlockBasedTableOptions GetRocksDBBlockBasedTableOptions(); diff --git a/src/db.cc b/src/db.cc index eef54016..b8c94a8c 100644 --- a/src/db.cc +++ b/src/db.cc @@ -6,7 +6,6 @@ */ #include "db.h" -#include #include "config.h" #include "praft/praft.h" @@ -16,35 +15,46 @@ extern pikiwidb::PConfig g_config; namespace pikiwidb { -DB::DB(int db_index, const std::string& db_path) - : db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/') {} +DB::DB(int64_t db_id, const std::string& db_path) + : db_id_(db_id), db_path_(db_path + std::to_string(db_id_) + '/'), praft_(std::make_unique<PRaft>(db_id)) {} -DB::~DB() { INFO("DB{} is closing...", db_index_); } +DB::~DB() { INFO("DB_{} is closing...", db_id_); } + +pstd::Status DB::Init(std::string&& group_id) { + rocksdb::Status rs = Open(); + if (!rs.ok()) { + return pstd::Status::Error(rs.ToString()); + } + + butil::Status bs = praft_->Init(std::move(group_id), false); + if (!bs.ok()) { + return pstd::Status::Error(bs.error_str()); + } + + return pstd::Status::OK(); +} rocksdb::Status DB::Open() { storage::StorageOptions storage_options; storage_options.options = g_config.GetRocksDBOptions(); storage_options.table_options = g_config.GetRocksDBBlockBasedTableOptions(); - storage_options.options.ttl = g_config.rocksdb_ttl_second.load(std::memory_order_relaxed); storage_options.options.periodic_compaction_seconds = g_config.rocksdb_periodic_second.load(std::memory_order_relaxed); storage_options.small_compaction_threshold =
g_config.small_compaction_threshold.load(); storage_options.small_compaction_duration_threshold = g_config.small_compaction_duration_threshold.load(); if (g_config.use_raft.load(std::memory_order_relaxed)) { - storage_options.append_log_function = [&r = PRAFT](const Binlog& log, std::promise<rocksdb::Status>&& promise) { + storage_options.append_log_function = [&r = *praft_](const Binlog& log, std::promise<rocksdb::Status>&& promise) { r.AppendLog(log, std::move(promise)); }; - storage_options.do_snapshot_function = [raft = &pikiwidb::PRAFT](auto&& self_snapshot_index, auto&& is_sync) { - raft->DoSnapshot(std::forward<decltype(self_snapshot_index)>(self_snapshot_index), - std::forward<decltype(is_sync)>(is_sync)); + storage_options.do_snapshot_function = [&r = *praft_](int64_t self_snapshot_index, bool is_sync) { + return r.DoSnapshot(self_snapshot_index, is_sync); }; } storage_options.db_instance_num = g_config.db_instance_num.load(); - storage_options.db_id = db_index_; + storage_options.db_id = db_id_; std::unique_ptr<storage::Storage> old_storage = std::move(storage_); if (old_storage != nullptr) { @@ -59,12 +69,12 @@ } opened_ = true; - INFO("Open DB{} success!", db_index_); + INFO("Open DB{} success!", db_id_); return rocksdb::Status::OK(); } void DB::CreateCheckpoint(const std::string& checkpoint_path, bool sync) { - auto checkpoint_sub_path = checkpoint_path + '/' + std::to_string(db_index_); + auto checkpoint_sub_path = checkpoint_path + '/' + std::to_string(db_id_); if (0 != pstd::CreatePath(checkpoint_sub_path)) { WARN("Create dir {} fail !", checkpoint_sub_path); return; } @@ -80,7 +90,7 @@ } void DB::LoadDBFromCheckpoint(const std::string& checkpoint_path, bool sync [[maybe_unused]]) { - auto checkpoint_sub_path = checkpoint_path + '/' + std::to_string(db_index_); + auto checkpoint_sub_path = checkpoint_path + '/' + std::to_string(db_id_); if (0 != pstd::IsDir(checkpoint_sub_path)) { WARN("Checkpoint dir {} does not exist!", checkpoint_sub_path); return; } @@ -110,18 +120,19 @@ storage::StorageOptions storage_options; storage_options.options = g_config.GetRocksDBOptions(); storage_options.db_instance_num = g_config.db_instance_num.load(); - storage_options.db_id = db_index_; + storage_options.db_id = db_id_; // options for CF storage_options.options.ttl = g_config.rocksdb_ttl_second.load(std::memory_order_relaxed); storage_options.options.periodic_compaction_seconds = g_config.rocksdb_periodic_second.load(std::memory_order_relaxed); if (g_config.use_raft.load(std::memory_order_relaxed)) { - storage_options.append_log_function = [&r = PRAFT](const Binlog& log, std::promise<rocksdb::Status>&& promise) { + storage_options.append_log_function = [&r = *praft_](const Binlog& log, std::promise<rocksdb::Status>&& promise) { r.AppendLog(log, std::move(promise)); }; - storage_options.do_snapshot_function = - std::bind(&pikiwidb::PRaft::DoSnapshot, &pikiwidb::PRAFT, std::placeholders::_1, std::placeholders::_2); + storage_options.do_snapshot_function = [&r = *praft_](int64_t self_snapshot_index, bool is_sync) { + return r.DoSnapshot(self_snapshot_index, is_sync); + }; } if (auto s = storage_->Open(storage_options, db_path_); !s.ok()) { @@ -135,6 +146,6 @@ void DB::LoadDBFromCheckpoint(const std::string& checkpoint_path, bool sync [[maybe_unused]]) { } opened_ = true; - INFO("DB{} load a checkpoint from {} success!", db_index_, checkpoint_path); + INFO("DB{} load a checkpoint from {} success!", db_id_, checkpoint_path); } } // namespace pikiwidb diff --git
a/src/db.h b/src/db.h index c7508273..5b5517e5 100644 --- a/src/db.h +++ b/src/db.h @@ -7,20 +7,24 @@ #pragma once -#include +#include #include +#include "praft/praft.h" #include "pstd/log.h" #include "pstd/noncopyable.h" +#include "pstd/pstd_status.h" #include "storage/storage.h" namespace pikiwidb { class DB { public: - DB(int db_index, const std::string& db_path); + DB(int64_t db_id, const std::string& db_path); ~DB(); + pstd::Status Init(std::string&& group_id); + rocksdb::Status Open(); std::unique_ptr<storage::Storage>& GetStorage() { return storage_; } @@ -37,11 +41,13 @@ void LoadDBFromCheckpoint(const std::string& path, bool sync = true); - int GetDbIndex() { return db_index_; } + int64_t GetDBID() { return db_id_; } + + std::unique_ptr<PRaft>& GetPRaft() { return praft_; } private: - const int db_index_ = 0; - const std::string db_path_; + const int64_t db_id_ = 0; // region id + const std::string db_path_; // region path /** * If you want to change the pointer that points to storage, * you must first acquire a mutex lock. * you just need to obtain a shared lock. */ std::shared_mutex storage_mutex_; - std::unique_ptr<storage::Storage> storage_; + std::unique_ptr<storage::Storage> storage_{nullptr}; + std::unique_ptr<PRaft> praft_{nullptr}; + + // Region key range [startKey, endKey) + std::string start_key_; + std::string end_key_; + bool opened_ = false; }; diff --git a/src/pd/CMakeLists.txt b/src/pd/CMakeLists.txt new file mode 100644 index 00000000..10ad7dae --- /dev/null +++ b/src/pd/CMakeLists.txt @@ -0,0 +1,42 @@ +# Copyright (c) 2024-present, Qihoo, Inc. All rights reserved. +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. An additional grant +# of patent rights can be found in the PATENTS file in the same directory.
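+# This directory builds two targets: pd_pb, the C++ code generated from
+# pd.proto, and pd, the placement-driver server/service implementation, which
+# links praft and storage so PD metadata is persisted through the normal
+# storage layer.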
+ +ADD_CUSTOM_COMMAND( + OUTPUT "${PROTO_OUTPUT_DIR}/pd.pb.cc" + DEPENDS protobuf + COMMAND ${PROTOBUF_PROTOC} + ARGS -I ${PROJECT_SOURCE_DIR}/src + -I ${PROJECT_SOURCE_DIR}/src/pd + --cpp_out ${PROTO_OUTPUT_DIR} + pd.proto +) +ADD_LIBRARY(pd_pb STATIC "${PROTO_OUTPUT_DIR}/pd.pb.cc") +SET(LIBRARY_OUTPUT_PATH ${PLIB_INSTALL_DIR}) +TARGET_INCLUDE_DIRECTORIES(pd_pb PRIVATE ${PROTOBUF_INCLUDE_DIR}) + +FILE(GLOB PD_SRC + "${CMAKE_CURRENT_SOURCE_DIR}/*.cc" +) + +SET(LIBRARY_OUTPUT_PATH ${PLIB_INSTALL_DIR}) + +ADD_LIBRARY(pd ${PD_SRC}) + +TARGET_INCLUDE_DIRECTORIES(pd + PRIVATE ${PROJECT_SOURCE_DIR}/src + PRIVATE ${rocksdb_SOURCE_DIR}/include + PRIVATE ${BRAFT_INCLUDE_DIR} + PRIVATE ${BRPC_INCLUDE_DIR} + PRIVATE ${PROTO_OUTPUT_DIR} +) + +IF (CMAKE_SYSTEM_NAME STREQUAL "Linux") + SET(PD_LIB ${PD_LIB} rt) +ENDIF () + +SET(LIBRARY_OUTPUT_PATH ${PLIB_INSTALL_DIR}) +ADD_DEPENDENCIES(pd protobuf pd_pb) +TARGET_LINK_LIBRARIES(pd praft dl fmt storage pstd braft brpc ssl crypto zlib protobuf leveldb gflags rocksdb z ${PD_LIB}) +SET_TARGET_PROPERTIES(pd PROPERTIES LINKER_LANGUAGE CXX) \ No newline at end of file diff --git a/src/pd/pd.proto b/src/pd/pd.proto new file mode 100644 index 00000000..6eca5949 --- /dev/null +++ b/src/pd/pd.proto @@ -0,0 +1,106 @@ +syntax="proto3"; +package pikiwidb; +option cc_generic_services = true; + +import "store.proto"; + +message GetClusterInfoRequest { +}; + +message GetClusterInfoResponse { + bool success = 1; + repeated Store store = 2; +}; + +message Store { + int64 store_id = 1; + string ip = 2; + int32 port = 3; + StoreState state = 4; + repeated Region region = 5; +}; + +message Region { + int64 region_id = 1; + optional string start_key = 2; + optional string end_key = 3; + repeated RegionEpoch region_epoch = 4; + repeated Peer peers = 5; +}; + +message RegionEpoch { + int64 conf_change_ver = 1; // conf change version + int64 region_ver = 2; // region version (split or merge) +}; + +enum StoreState { + UP = 0; + OFFLINE = 1; + TOMBSTONE = 2; +}; + +message CreateAllRegionsRequest { + int64 regions_count = 1; + int32 region_peers_count = 2; + repeated RegionOptions regionOptions = 3; +}; + +message CreateAllRegionsResponse { + bool success = 1; +}; + +message DeleteAllRegionsRequest { +}; + +message DeleteAllRegionsResponse { + bool success = 1; +}; + +message AddStoreRequest { + string ip = 1; + int32 port = 2; +}; + +message AddStoreResponse { + bool success = 1; + optional int64 store_id = 2; + optional string redirect = 3; +}; + +message RemoveStoreRequest { + int64 store_id = 1; +}; + +message RemoveStoreResponse { + bool success = 1; +}; + +message OpenPDSchedulingRequest { +}; + +message OpenPDSchedulingResponse { + bool success = 1; +}; + +message ClosePDSchedulingRequest { +}; + +message ClosePDSchedulingResponse { + bool success = 1; +}; + +service PlacementDriverService { + rpc CreateAllRegions(CreateAllRegionsRequest) returns (CreateAllRegionsResponse); + + rpc DeleteAllRegions(DeleteAllRegionsRequest) returns (DeleteAllRegionsResponse); + + rpc AddStore(AddStoreRequest) returns (AddStoreResponse); + + rpc RemoveStore(RemoveStoreRequest) returns (RemoveStoreResponse); + + rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse); + + rpc OpenPDScheduling(OpenPDSchedulingRequest) returns (OpenPDSchedulingResponse); + + rpc ClosePDScheduling(ClosePDSchedulingRequest) returns (ClosePDSchedulingResponse); +}; diff --git a/src/pd/pd_server.cc b/src/pd/pd_server.cc new file mode 100644 index 00000000..e29a7db0 --- /dev/null +++ 
b/src/pd/pd_server.cc @@ -0,0 +1,217 @@ +/* + * Copyright (c) 2024-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#include "pd_server.h" + +#include "common.h" +#include "praft/praft.h" +#include "pstd/log.h" +#include "pstd/pstd_status.h" +#include "pstd/pstd_string.h" +#include "store.h" +#include "store.pb.h" + +namespace pikiwidb { + +PlacementDriverServer& PlacementDriverServer::Instance() { + static PlacementDriverServer pd_server; + return pd_server; +} + +PlacementDriverServer::~PlacementDriverServer() { + INFO("PD Server is closing..."); + Stop(); +} + +pstd::Status PlacementDriverServer::Init(PlacementDriverOptions& pd_options) { + // 1. Initialize PD's db (region). + // Currently, only independent deployment is considered, so use default value 0. + auto status = PSTORE.AddBackend(db_id_, std::move(pd_options.GetPDGroupID())); + if (!status.ok()) { + ERROR("Fail to initialize db: {}", db_id_); + return pstd::Status::Error(status.ToString()); + } + + // 2. PD is deployed independently for now, using the default PRAFT. + if (pd_options.GetInitialPDServerList() == "None") { + WARN("The PD member configuration is empty and the raft group needs to be manually initialized"); + } else { + // @todo + // Later consider supporting pd group initialization using pd's configuration file + WARN("Later consider supporting pd group initialization using pd's configuration file"); + } + + return pstd::Status::OK(); +} + +void PlacementDriverServer::Start() { + is_started_.store(true, std::memory_order_release); + // @todo + // A worker thread pool will be needed here to implement cluster node probing and meta information + // collection +} + +void PlacementDriverServer::Stop() { + if (!is_started_.load(std::memory_order_acquire)) { + return; + } + is_started_.store(false, std::memory_order_release); + + // @todo + // releasing the related thread pool tasks is still to be added +} + +std::tuple<bool, int64_t> PlacementDriverServer::GenerateStoreID() { + auto db = PSTORE.GetBackend(db_id_); + // Ensures atomicity when both read and write operations are performed + db->Lock(); + DEFER { db->UnLock(); }; + + int64_t max_store_id = 0; + std::string max_store_id_str; + auto status = db->GetStorage()->Get(PD_MAX_STORE_ID, &max_store_id_str); + if (status.ok()) { + if (pstd::String2int(max_store_id_str, &max_store_id) == 0) { + ERROR("Fail to read the correct max store id value"); + return {false, -1}; + } + + max_store_id += 1; + status = db->GetStorage()->Set(PD_MAX_STORE_ID, pstd::Int2string(max_store_id)); + if (!status.ok()) { + ERROR("Fail to write the max store id"); + return {false, -1}; + } + } else if (status.IsNotFound()) { + // Note: if pd is created for the first time without any store, the initial value of max_store_id is 0.
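+ // Persist the initial value so that subsequent calls take the read-and-increment path.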
+ status = db->GetStorage()->Set(PD_MAX_STORE_ID, pstd::Int2string(max_store_id)); + if (!status.ok()) { + ERROR("Fail to write the max store id"); + return {false, -1}; + } + } else { + return {false, -1}; + } + + return {true, max_store_id}; +} + +std::tuple<bool, int64_t> PlacementDriverServer::GenerateRegionID() { + auto db = PSTORE.GetBackend(db_id_); + // Ensures atomicity when both read and write operations are performed + db->Lock(); + DEFER { db->UnLock(); }; + + int64_t max_region_id = 0; + std::string max_region_id_str; + auto status = db->GetStorage()->Get(PD_MAX_REGION_ID, &max_region_id_str); + if (status.ok()) { + if (pstd::String2int(max_region_id_str, &max_region_id) == 0) { + ERROR("Fail to read the correct max region id value"); + return {false, -1}; + } + + max_region_id += 1; + status = db->GetStorage()->Set(PD_MAX_REGION_ID, pstd::Int2string(max_region_id)); + if (!status.ok()) { + ERROR("Fail to write the max region id"); + return {false, -1}; + } + } else if (status.IsNotFound()) { + // Note: if pd is created for the first time without any region, the initial value of max_region_id is 0. + status = db->GetStorage()->Set(PD_MAX_REGION_ID, pstd::Int2string(max_region_id)); + if (!status.ok()) { + ERROR("Fail to write the max region id"); + return {false, -1}; + } + } else { + return {false, -1}; + } + + return {true, max_region_id}; +} + +std::tuple<bool, int64_t> PlacementDriverServer::CheckStoreExistByIP(const std::string& ip) { + auto db = PSTORE.GetBackend(db_id_); + db->LockShared(); + DEFER { db->UnLockShared(); }; + + std::string store_id_str; + // store_id_map_: <"PikiwiDB_PD_MetaData", "pd_store_id" + store ip, storeID> hash + auto status = db->GetStorage()->HGet(PIKIWIDB_PD_METADATA, PD_STORE_ID + ip, &store_id_str); + if (status.ok()) { + int64_t store_id = 0; + if (pstd::String2int(store_id_str, &store_id) == 0) { + ERROR("Fail to read the correct store id value"); + return {false, -1}; + } + + return {true, store_id}; + } + + return {false, -1}; +} + +// @todo +// pikiwidb does not support transactions and will have to consider consistency +// when writing multiple pieces of data at once +std::tuple<bool, int64_t> PlacementDriverServer::AddStore(const std::string& ip, int32_t port) { + // 1. Check whether the store has already registered + auto [exist, store_id] = CheckStoreExistByIP(ip); + if (exist) { + return {true, store_id}; + } + + // 2. generate store id + auto [success, new_store_id] = GenerateStoreID(); + if (!success) { + return {false, -1}; + } + + // 3.
Update the store metadata (several hash fields written with one HMSET) + std::vector<storage::FieldValue> fvs; + + // store_id_map_: <"PikiwiDB_PD_MetaData", "pd_store_id" + store ip, storeID> hash + auto db = PSTORE.GetBackend(db_id_); + db->LockShared(); + DEFER { db->UnLockShared(); }; + auto value = pstd::Int2string(new_store_id); + fvs.push_back(storage::FieldValue(std::move(PD_STORE_ID + ip), std::move(value))); + + // store_map_: <"PikiwiDB_PD_MetaData", "pd_store_info" + storeID, store> hash + Store store; + store.set_store_id(new_store_id); + store.set_ip(ip); + store.set_port(port); + store.set_state(StoreState::UP); + std::string store_str; + if (!store.SerializeToString(&store_str)) { + return {false, -1}; + } + fvs.push_back(storage::FieldValue(std::move(PD_STORE_INFO + pstd::Int2string(new_store_id)), std::move(store_str))); + + // store_stats_map_: <"PikiwiDB_PD_MetaData", "pd_store_stats" + storeID, storeStats> hash + StoreStats store_stats; + store_stats.set_store_id(new_store_id); + std::string store_stats_str; + if (!store_stats.SerializeToString(&store_stats_str)) { + return {false, -1}; + } + fvs.push_back(storage::FieldValue(std::move(PD_STORE_STATS + pstd::Int2string(new_store_id)), store_stats_str)); + + auto status = db->GetStorage()->HMSet(PIKIWIDB_PD_METADATA, fvs); + if (!status.ok()) { + return {false, -1}; + } + + return {true, new_store_id}; +} + +void PlacementDriverServer::GetClusterInfo(GetClusterInfoResponse* response) {} + +} // namespace pikiwidb diff --git a/src/pd/pd_server.h b/src/pd/pd_server.h new file mode 100644 index 00000000..3664bfc3 --- /dev/null +++ b/src/pd/pd_server.h @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2024-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#pragma once + +#include <atomic> +#include <string> +#include <tuple> + +#include "db.h" +#include "pd.pb.h" +#include "pstd/pstd_status.h" + +namespace pikiwidb { + +#define PDSERVER PlacementDriverServer::Instance() + +const std::string PIKIWIDB_PD_METADATA = "PikiwiDB_PD_MetaData"; +const std::string PD_STORE_INFO = "pd_store_info"; +const std::string PD_STORE_ID = "pd_store_id"; +const std::string PD_STORE_STATS = "pd_store_stats"; +const std::string PD_MAX_STORE_ID = "pd_max_store_id"; +const std::string PD_REGION_STATS = "pd_region_stats"; +const std::string PD_MAX_REGION_ID = "pd_max_region_id"; + +// PD options +class PlacementDriverOptions { + public: + PlacementDriverOptions(bool fake, std::string&& pd_group_id, std::string&& initial_pd_server_list) + : fake_(fake), pd_group_id_(std::move(pd_group_id)), initial_pd_server_list_(std::move(initial_pd_server_list)) {} + ~PlacementDriverOptions() = default; + + bool GetFake() const { return fake_; } + std::string& GetPDGroupID() { return pd_group_id_; } + std::string& GetInitialPDServerList() { return initial_pd_server_list_; } + + private: + bool fake_ = false; // Standalone or hybrid deployment + std::string pd_group_id_; // PD Raft Group ID + std::string initial_pd_server_list_; // initial list of pd servers +}; + +// PD +/* +If atomic writes are required: store_map_, store_id_map_, store_stats_map_, +and region_stats_map_ are all fields of the "PikiwiDB_PD_MetaData" hash, +so HMSET can be used to write them atomically. To read all the values of one map, +use HSCAN and match field keys by prefix.
+ +Store and Region meta information is persisted to Floyd: +1.store_map_: <"PikiwiDB_PD_MetaData", "pd_store_info" + storeID, store> hash +2.store_id_map_: <"PikiwiDB_PD_MetaData", "pd_store_id" + store ip, storeID> hash +3.store_stats_map_: <"PikiwiDB_PD_MetaData", "pd_store_stats" + storeID, storeStats> hash +4.max_store_id_: <"pd_max_store_id", maxStoreID> string +5.region_stats_map_: <"PikiwiDB_PD_MetaData", "pd_region_stats" + regionID, regionStats> hash +6.max_region_id: <"pd_max_region_id", maxRegionID> string +*/ +class PlacementDriverServer { + public: + static PlacementDriverServer& Instance(); + + PlacementDriverServer(const PlacementDriverServer&) = delete; + void operator=(const PlacementDriverServer&) = delete; + ~PlacementDriverServer(); + + pstd::Status Init(PlacementDriverOptions& pd_options); + void Start(); + void Stop(); + + std::tuple<bool, int64_t> GenerateStoreID(); + std::tuple<bool, int64_t> GenerateRegionID(); + std::tuple<bool, int64_t> CheckStoreExistByIP(const std::string& ip); + std::tuple<bool, int64_t> AddStore(const std::string& ip, int32_t port); + void GetClusterInfo(GetClusterInfoResponse* response); + + private: + PlacementDriverServer() = default; + + int64_t db_id_ = 0; // region id of pd + std::atomic<bool> is_started_; // marks whether the PD server has been started +}; + +} // namespace pikiwidb diff --git a/src/pd/pd_service.cc b/src/pd/pd_service.cc new file mode 100644 index 00000000..d846cf8f --- /dev/null +++ b/src/pd/pd_service.cc @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2024-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#include "pd_service.h" + +#include "pd_server.h" + +namespace pikiwidb { +void PlacementDriverServiceImpl::CreateAllRegions(::google::protobuf::RpcController* controller, + const ::pikiwidb::CreateAllRegionsRequest* request, + ::pikiwidb::CreateAllRegionsResponse* response, + ::google::protobuf::Closure* done) {} + +void PlacementDriverServiceImpl::DeleteAllRegions(::google::protobuf::RpcController* controller, + const ::pikiwidb::DeleteAllRegionsRequest* request, + ::pikiwidb::DeleteAllRegionsResponse* response, + ::google::protobuf::Closure* done) {} + +void PlacementDriverServiceImpl::AddStore(::google::protobuf::RpcController* controller, + const ::pikiwidb::AddStoreRequest* request, + ::pikiwidb::AddStoreResponse* response, ::google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + auto [success, store_id] = PDSERVER.AddStore(request->ip(), request->port()); + if (!success) { + response->set_success(false); + return; + } + + response->set_success(true); + response->set_store_id(store_id); +} + +void PlacementDriverServiceImpl::RemoveStore(::google::protobuf::RpcController* controller, + const ::pikiwidb::RemoveStoreRequest* request, + ::pikiwidb::RemoveStoreResponse* response, + ::google::protobuf::Closure* done) {} + +void PlacementDriverServiceImpl::GetClusterInfo(::google::protobuf::RpcController* controller, + const ::pikiwidb::GetClusterInfoRequest* request, + ::pikiwidb::GetClusterInfoResponse* response, + ::google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + PDSERVER.GetClusterInfo(response); +} + +void PlacementDriverServiceImpl::OpenPDScheduling(::google::protobuf::RpcController* controller, + const ::pikiwidb::OpenPDSchedulingRequest* request, + ::pikiwidb::OpenPDSchedulingResponse* response,
::google::protobuf::Closure* done) {} + +void PlacementDriverServiceImpl::ClosePDScheduling(::google::protobuf::RpcController* controller, + const ::pikiwidb::ClosePDSchedulingRequest* request, + ::pikiwidb::ClosePDSchedulingResponse* response, + ::google::protobuf::Closure* done) {} +} // namespace pikiwidb diff --git a/src/pd/pd_service.h b/src/pd/pd_service.h new file mode 100644 index 00000000..fb2ccb8d --- /dev/null +++ b/src/pd/pd_service.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2024-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#pragma once + +#include "pd.pb.h" + +namespace pikiwidb { + +class PlacementDriverServiceImpl : public PlacementDriverService { + public: + PlacementDriverServiceImpl() = default; + + void CreateAllRegions(::google::protobuf::RpcController* controller, + const ::pikiwidb::CreateAllRegionsRequest* request, + ::pikiwidb::CreateAllRegionsResponse* response, ::google::protobuf::Closure* done) override; + + void DeleteAllRegions(::google::protobuf::RpcController* controller, + const ::pikiwidb::DeleteAllRegionsRequest* request, + ::pikiwidb::DeleteAllRegionsResponse* response, ::google::protobuf::Closure* done) override; + + void AddStore(::google::protobuf::RpcController* controller, const ::pikiwidb::AddStoreRequest* request, + ::pikiwidb::AddStoreResponse* response, ::google::protobuf::Closure* done) override; + + void RemoveStore(::google::protobuf::RpcController* controller, const ::pikiwidb::RemoveStoreRequest* request, + ::pikiwidb::RemoveStoreResponse* response, ::google::protobuf::Closure* done) override; + + void GetClusterInfo(::google::protobuf::RpcController* controller, const ::pikiwidb::GetClusterInfoRequest* request, + ::pikiwidb::GetClusterInfoResponse* response, ::google::protobuf::Closure* done) override; + + void OpenPDScheduling(::google::protobuf::RpcController* controller, + const ::pikiwidb::OpenPDSchedulingRequest* request, + ::pikiwidb::OpenPDSchedulingResponse* response, ::google::protobuf::Closure* done) override; + + void ClosePDScheduling(::google::protobuf::RpcController* controller, + const ::pikiwidb::ClosePDSchedulingRequest* request, + ::pikiwidb::ClosePDSchedulingResponse* response, ::google::protobuf::Closure* done) override; +}; + +} // namespace pikiwidb diff --git a/src/pikiwidb.cc b/src/pikiwidb.cc index 5108e8d9..fc70c1a8 100644 --- a/src/pikiwidb.cc +++ b/src/pikiwidb.cc @@ -159,7 +159,7 @@ bool PikiwiDB::Init() { return false; } - PSTORE.Init(g_config.databases.load(std::memory_order_relaxed)); + PSTORE.Init(); PSlowLog::Instance().SetThreshold(g_config.slow_log_time.load()); PSlowLog::Instance().SetLogLimit(static_cast(g_config.slow_log_max_len.load())); @@ -197,9 +197,6 @@ void PikiwiDB::Run() { } void PikiwiDB::Stop() { - pikiwidb::PRAFT.ShutDown(); - pikiwidb::PRAFT.Join(); - pikiwidb::PRAFT.Clear(); slave_threads_.Exit(); worker_threads_.Exit(); cmd_threads_.Stop(); @@ -282,7 +279,7 @@ int main(int ac, char* av[]) { daemonize(); } - InitLimit(); + // InitLimit(); pstd::InitRandom(); SignalSetup(); InitLogs(); diff --git a/src/praft/CMakeLists.txt b/src/praft/CMakeLists.txt index 95ae8a01..a1ae58df 100644 --- a/src/praft/CMakeLists.txt +++ b/src/praft/CMakeLists.txt @@ -9,7 +9,7 @@ ADD_CUSTOM_COMMAND( COMMAND ${PROTOBUF_PROTOC} ARGS -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out 
${PROTO_OUTPUT_DIR} - ${CMAKE_CURRENT_SOURCE_DIR}/binlog.proto + binlog.proto ) ADD_LIBRARY(binlog_pb STATIC "${PROTO_OUTPUT_DIR}/binlog.pb.cc") SET(LIBRARY_OUTPUT_PATH ${PLIB_INSTALL_DIR}) @@ -21,7 +21,7 @@ ADD_CUSTOM_COMMAND( COMMAND ${PROTOBUF_PROTOC} ARGS -I ${CMAKE_CURRENT_SOURCE_DIR} --cpp_out ${PROTO_OUTPUT_DIR} - ${CMAKE_CURRENT_SOURCE_DIR}/praft.proto + praft.proto ) ADD_LIBRARY(praft_pb STATIC "${PROTO_OUTPUT_DIR}/praft.pb.cc") diff --git a/src/praft/praft.cc index c171ff4b..7017e4eb 100644 --- a/src/praft/praft.cc +++ b/src/praft/praft.cc @@ -9,9 +9,9 @@ #include -#include "braft/snapshot.h" +#include "braft/raft.h" #include "braft/util.h" -#include "brpc/server.h" +#include "butil/endpoint.h" #include "pstd/log.h" #include "pstd/pstd_string.h" @@ -66,105 +66,52 @@ void ClusterCmdContext::ConnectTargetNode() { auto ip = PREPL.GetMasterAddr().GetIP(); auto port = PREPL.GetMasterAddr().GetPort(); if (ip == peer_ip_ && port == port_ && PREPL.GetMasterState() == kPReplStateConnected) { - PRAFT.SendNodeRequest(PREPL.GetMaster()); + praft_->SendNodeRequest(PREPL.GetMaster()); return; } // reconnect auto fail_cb = [&](EventLoop*, const char* peer_ip, int port) { - PRAFT.OnClusterCmdConnectionFailed(EventLoop::Self(), peer_ip, port); + praft_->OnClusterCmdConnectionFailed(EventLoop::Self(), peer_ip, port); }; PREPL.SetFailCallback(fail_cb); PREPL.SetMasterState(kPReplStateNone); PREPL.SetMasterAddr(peer_ip_.c_str(), port_); } -PRaft& PRaft::Instance() { - static PRaft store; - return store; -} - -butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) { - if (node_ && server_) { +butil::Status PRaft::Init(std::string&& group_id, bool initial_conf_is_null) { + if (node_) { return {0, "OK"}; } - server_ = std::make_unique<brpc::Server>(); - auto port = g_config.port + pikiwidb::g_config.raft_port_offset; - // Add your service into RPC server - DummyServiceImpl service(&PRAFT); - if (server_->AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { - server_.reset(); - return ERROR_LOG_AND_STATUS("Failed to add service"); - } - // raft can share the same RPC server. Notice the second parameter, because - // adding services into a running server is not allowed and the listen - // address of this server is impossible to get before the server starts. You - // have to specify the address of the server. - if (braft::add_service(server_.get(), port) != 0) { - server_.reset(); - return ERROR_LOG_AND_STATUS("Failed to add raft service"); - } - - // It's recommended to start the server before Counter is started to avoid - // the case that it becomes the leader while the service is unreachable by - // clients. - // Notice the default options of server is used here. Check out details from - // the doc of brpc if you would like change some option; - if (server_->Start(port, nullptr) != 0) { - server_.reset(); - return ERROR_LOG_AND_STATUS("Failed to start server"); - } - // It's ok to start PRaft; - assert(group_id.size() == RAFT_GROUPID_LEN); - this->group_id_ = group_id; - - // FIXME: g_config.ip is default to 127.0.0.0, which may not work in cluster. - raw_addr_ = g_config.ip.ToString() + ":" + std::to_string(port); - butil::ip_t ip; - auto ret = butil::str2ip(g_config.ip.ToString().c_str(), &ip); - if (ret != 0) { - server_.reset(); - return ERROR_LOG_AND_STATUS("Failed to convert str_ip to butil::ip_t"); - } - butil::EndPoint addr(ip, port); - - // Default init in one node.
- // initial_conf takes effect only when the replication group is started from an empty node. - // The Configuration is restored from the snapshot and log files when the data in the replication group is not empty. - // initial_conf is used only to create replication groups. - // The first node adds itself to initial_conf and then calls add_peer to add other nodes. - // Set initial_conf to empty for other nodes. - // You can also start empty nodes simultaneously by setting the same inital_conf(ip:port of multiple nodes) for - // multiple nodes. - std::string initial_conf; + braft::NodeOptions node_options; if (!initial_conf_is_null) { - initial_conf = raw_addr_ + ":0,"; - } - if (node_options_.initial_conf.parse_from(initial_conf) != 0) { - server_.reset(); - return ERROR_LOG_AND_STATUS("Failed to parse configuration"); + auto endpoint_str = butil::endpoint2str(PSTORE.GetEndPoint()); + std::string initial_conf = fmt::format("{}:{},", endpoint_str.c_str(), db_id_); + if (node_options.initial_conf.parse_from(initial_conf) != 0) { + return ERROR_LOG_AND_STATUS("Failed to parse configuration"); + } } + node_options.fsm = this; + node_options.node_owns_fsm = false; + node_options.snapshot_interval_s = 0; + auto prefix = fmt::format("local://{}{}/{}", g_config.db_path.ToString(), "_praft", db_id_); + node_options.log_uri = prefix + "/log"; + node_options.raft_meta_uri = prefix + "/raft_meta"; + node_options.snapshot_uri = prefix + "/snapshot"; + snapshot_adaptor_ = new PPosixFileSystemAdaptor(); + node_options.snapshot_file_system_adaptor = &snapshot_adaptor_; // node_options_.election_timeout_ms = FLAGS_election_timeout_ms; - node_options_.fsm = this; - node_options_.node_owns_fsm = false; - node_options_.snapshot_interval_s = 0; - std::string prefix = "local://" + g_config.db_path.ToString() + std::to_string(db_id_) + "/_praft"; - node_options_.log_uri = prefix + "/log"; - node_options_.raft_meta_uri = prefix + "/raft_meta"; - node_options_.snapshot_uri = prefix + "/snapshot"; // node_options_.disable_cli = FLAGS_disable_cli; - snapshot_adaptor_ = new PPosixFileSystemAdaptor(); - node_options_.snapshot_file_system_adaptor = &snapshot_adaptor_; - node_ = std::make_unique<braft::Node>("pikiwidb", braft::PeerId(addr)); // group_id - if (node_->init(node_options_) != 0) { - server_.reset(); + node_ = std::make_unique<braft::Node>(group_id, braft::PeerId(PSTORE.GetEndPoint(), db_id_)); + if (node_->init(node_options) != 0) { node_.reset(); return ERROR_LOG_AND_STATUS("Failed to init raft node"); } - + group_id_ = std::move(group_id); + INFO("Initialized praft successfully: node_id={}", GetNodeID()); return {0, "OK"}; } @@ -264,9 +211,11 @@ void PRaft::SendNodeRequest(PClient* client) { auto cluster_cmd_type = cluster_cmd_ctx_.GetClusterCmdType(); switch (cluster_cmd_type) { - case ClusterCmdType::kJoin: - SendNodeInfoRequest(client, "DATA"); - break; + case ClusterCmdType::kJoin: { + // SendNodeInfoRequest(client, "DATA"); + SendNodeAddRequest(client); } break; case ClusterCmdType::kRemove: SendNodeRemoveRequest(client); break; @@ -289,16 +238,11 @@ void PRaft::SendNodeAddRequest(PClient* client) { assert(client); // Node id in braft are ip:port, the node id param in RAFT.NODE ADD cmd will be ignored.
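The Init() rework above derives each raft group's configuration from the store-wide rpc endpoint plus a per-DB index instead of a per-PRaft brpc server. A minimal standalone sketch of that "ip:port:index," convention (endpoint and db id are hypothetical; braft::Configuration::parse_from is the same call Init() checks):

#include <braft/configuration.h>
#include <fmt/format.h>
#include <string>

int main() {
  // One rpc endpoint serves all raft groups on a node; the trailing index
  // (here the db id) tells braft which group a peer entry belongs to.
  std::string endpoint = "127.0.0.1:9231";  // hypothetical: listen port + raft-port-offset
  int64_t db_id = 2;
  std::string initial_conf = fmt::format("{}:{},", endpoint, db_id);  // "127.0.0.1:9231:2,"

  braft::Configuration conf;
  return conf.parse_from(initial_conf);  // returns 0 on success, as Init() checks
}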
- int unused_node_id = 0; auto port = g_config.port + pikiwidb::g_config.raft_port_offset; - auto raw_addr = g_config.ip.ToString() + ":" + std::to_string(port); - UnboundedBuffer req; - req.PushData("RAFT.NODE ADD ", 14); - req.PushData(std::to_string(unused_node_id).c_str(), std::to_string(unused_node_id).size()); - req.PushData(" ", 1); - req.PushData(raw_addr.data(), raw_addr.size()); - req.PushData("\r\n", 2); - client->SendPacket(req); + auto raw_addr = g_config.ip.ToString() + ":" + std::to_string(port) + ":" + std::to_string(db_id_); + auto msg = fmt::format("RAFT.NODE ADD {} {}\r\n", group_id_, raw_addr); + client->SendPacket(msg); + INFO("Sent join request to leader successfully"); client->Clear(); } @@ -318,10 +262,10 @@ int PRaft::ProcessClusterCmdResponse(PClient* client, const char* start, int len int ret = 0; switch (cluster_cmd_type) { case ClusterCmdType::kJoin: - ret = PRAFT.ProcessClusterJoinCmdResponse(client, start, len); + ret = ProcessClusterJoinCmdResponse(client, start, len); break; case ClusterCmdType::kRemove: - ret = PRAFT.ProcessClusterRemoveCmdResponse(client, start, len); + ret = ProcessClusterRemoveCmdResponse(client, start, len); break; default: client->SetRes(CmdRes::kErrOther, "RAFT.CLUSTER response supports JOIN/REMOVE only"); @@ -388,47 +332,49 @@ void PRaft::LeaderRedirection(PClient* join_client, const std::string& reply) { // Reset the target of the connection cluster_cmd_ctx_.Clear(); - auto ret = PRAFT.GetClusterCmdCtx().Set(ClusterCmdType::kJoin, join_client, std::move(peer_ip), port); + auto ret = GetClusterCmdCtx().Set(ClusterCmdType::kJoin, join_client, std::move(peer_ip), port); if (!ret) { // other clients have joined join_client->SetRes(CmdRes::kErrOther, "Other clients have joined"); join_client->SendPacket(join_client->Message()); join_client->Clear(); return; } - PRAFT.GetClusterCmdCtx().ConnectTargetNode(); + GetClusterCmdCtx().ConnectTargetNode(); // Not reply any message here, we will reply after the connection is established. 
join_client->Clear(); } -void PRaft::InitializeNodeBeforeAdd(PClient* client, PClient* join_client, const std::string& reply) { +bool PRaft::InitializeNodeBeforeAdd(PClient* client, PClient* join_client, const std::string& reply) { std::string prefix = RAFT_GROUP_ID; std::string::size_type prefix_length = prefix.length(); std::string::size_type group_id_start = reply.find(prefix); group_id_start += prefix_length; // locate the start location of "raft_group_id" std::string::size_type group_id_end = reply.find("\r\n", group_id_start); - if (group_id_end != std::string::npos) { - std::string raft_group_id = reply.substr(group_id_start, group_id_end - group_id_start); - // initialize the slave node - auto s = PRAFT.Init(raft_group_id, true); - if (!s.ok()) { - join_client->SetRes(CmdRes::kErrOther, s.error_str()); - join_client->SendPacket(join_client->Message()); - join_client->Clear(); - // If the join fails, clear clusterContext and set it again by using the join command - cluster_cmd_ctx_.Clear(); - return; - } - - PRAFT.SendNodeAddRequest(client); - } else { + if (group_id_end == std::string::npos) { // can't find group id ERROR("Joined Raft cluster fail, because of invalid raft_group_id"); join_client->SetRes(CmdRes::kErrOther, "Invalid raft_group_id"); join_client->SendPacket(join_client->Message()); join_client->Clear(); // If the join fails, clear clusterContext and set it again by using the join command cluster_cmd_ctx_.Clear(); + return false; + } + + std::string raft_group_id = reply.substr(group_id_start, group_id_end - group_id_start); + // initialize the slave node + auto s = Init(std::string(raft_group_id), true); + if (!s.ok()) { + join_client->SetRes(CmdRes::kErrOther, s.error_str()); + join_client->SendPacket(join_client->Message()); + join_client->Clear(); + // If the join fails, clear clusterContext and set it again by using the join command + cluster_cmd_ctx_.Clear(); + ERROR("Failed to init raft: {}", s.error_cstr()); + return false; } + INFO("Init raft successfully, group_id={}", raft_group_id); + return true; } int PRaft::ProcessClusterJoinCmdResponse(PClient* client, const char* start, int len) { @@ -441,7 +387,7 @@ int PRaft::ProcessClusterJoinCmdResponse(PClient* client, const char* start, int std::string reply(start, len); if (reply.find(OK_STR) != std::string::npos) { - INFO("Joined Raft cluster, node id: {}, group_id: {}", PRAFT.GetNodeID(), PRAFT.group_id_); + INFO("Joined Raft cluster, node id: {}, group_id: {}", GetNodeID(), group_id_); join_client->SetRes(CmdRes::kOK); join_client->SendPacket(join_client->Message()); join_client->Clear(); @@ -452,7 +398,12 @@ int PRaft::ProcessClusterJoinCmdResponse(PClient* client, const char* start, int } else if (reply.find(WRONG_LEADER) != std::string::npos) { LeaderRedirection(join_client, reply); } else if (reply.find(RAFT_GROUP_ID) != std::string::npos) { - InitializeNodeBeforeAdd(client, join_client, reply); + auto res = InitializeNodeBeforeAdd(client, join_client, reply); + if (!res) { + ERROR("Failed to initialize node before add"); + return len; + } + SendNodeAddRequest(client); } else { ERROR("Joined Raft cluster fail, str: {}", reply); join_client->SetRes(CmdRes::kErrOther, reply); @@ -475,7 +426,7 @@ int PRaft::ProcessClusterRemoveCmdResponse(PClient* client, const char* start, i std::string reply(start, len); if (reply.find(OK_STR) != std::string::npos) { - INFO("Removed Raft cluster, node id: {}, group_id: {}", PRAFT.GetNodeID(), PRAFT.group_id_); + INFO("Removed Raft cluster, node id: {}, group_id:
{}", GetNodeID(), group_id_); ShutDown(); Join(); Clear(); @@ -517,6 +468,26 @@ butil::Status PRaft::AddPeer(const std::string& peer) { return {0, "OK"}; } +butil::Status PRaft::AddPeer(const std::string& endpoint, int index) { + if (!node_) { + return ERROR_LOG_AND_STATUS("Node is not initialized"); + } + + braft::SynchronizedClosure done; + butil::EndPoint ep; + butil::str2endpoint(endpoint.c_str(), &ep); + braft::PeerId peer_id(ep, index); + node_->add_peer(peer_id, &done); + done.wait(); + + if (!done.status().ok()) { + // WARN("Failed to add peer {} to node {}, status: {}", ep, node_->node_id().to_string(), + // done.status().error_str()); + return done.status(); + } + return done.status(); +} + butil::Status PRaft::RemovePeer(const std::string& peer) { if (!node_) { return ERROR_LOG_AND_STATUS("Node is not initialized"); } @@ -535,6 +506,26 @@ butil::Status PRaft::RemovePeer(const std::string& peer) { return {0, "OK"}; } +butil::Status PRaft::RemovePeer(const std::string& endpoint, int index) { + if (!node_) { + return ERROR_LOG_AND_STATUS("Node is not initialized"); + } + + braft::SynchronizedClosure done; + butil::EndPoint ep; + butil::str2endpoint(endpoint.c_str(), &ep); + ep.port += g_config.raft_port_offset; + braft::PeerId peer_id(ep, index); + node_->remove_peer(peer_id, &done); + done.wait(); + + if (!done.status().ok()) { + WARN("Failed to remove peer, status: {}", done.status().error_str()); + return done.status(); + } + return done.status(); +} + butil::Status PRaft::DoSnapshot(int64_t self_snapshot_index, bool is_sync) { if (!node_) { return ERROR_LOG_AND_STATUS("Node is not initialized"); @@ -569,10 +560,6 @@ void PRaft::ShutDown() { if (node_) { node_->shutdown(nullptr); } - - if (server_) { - server_->Stop(0); - } } // Blocking this thread until the node is eventually down.
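Both endpoint/index overloads above funnel into a braft::PeerId whose index selects the raft group sharing the node's single rpc endpoint; note the asymmetry that RemovePeer() applies raft_port_offset itself while AddPeer() expects the already-offset endpoint. A hedged sketch of the shared conversion (helper name and offset handling are illustrative assumptions, not part of this patch):

#include <braft/configuration.h>
#include <butil/endpoint.h>
#include <string>

// Hypothetical helper: turn "ip:port" plus a raft-group index into a braft::PeerId,
// optionally shifting the port by the configured raft-port-offset first.
static braft::PeerId MakePeerId(const std::string& ip_port, int index, int port_offset = 0) {
  butil::EndPoint ep;
  if (butil::str2endpoint(ip_port.c_str(), &ep) != 0) {
    return braft::PeerId();  // empty peer id; callers can check with is_empty()
  }
  ep.port += port_offset;
  return braft::PeerId(ep, index);
}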
@@ -580,10 +567,6 @@ void PRaft::Join() { if (node_) { node_->join(); } - - if (server_) { - server_->Join(); - } } void PRaft::AppendLog(const Binlog& log, std::promise&& promise) { @@ -609,10 +592,6 @@ void PRaft::Clear() { if (node_) { node_.reset(); } - - if (server_) { - server_.reset(); - } } void PRaft::on_apply(braft::Iterator& iter) { diff --git a/src/praft/praft.h b/src/praft/praft.h index 60c9e475..67794675 100644 --- a/src/praft/praft.h +++ b/src/praft/praft.h @@ -7,11 +7,9 @@ #pragma once -#include #include #include #include -#include #include #include "braft/file_system_adaptor.h" @@ -34,10 +32,9 @@ namespace pikiwidb { #define RAFT_GROUP_ID "raft_group_id:" #define NOT_LEADER "Not leader" -#define PRAFT PRaft::Instance() - class EventLoop; class Binlog; +class PRaft; enum ClusterCmdType { kNone, @@ -46,10 +43,8 @@ }; class ClusterCmdContext { - friend class PRaft; - public: - ClusterCmdContext() = default; + ClusterCmdContext(PRaft* raft) : praft_(raft) {} ~ClusterCmdContext() = default; bool Set(ClusterCmdType cluster_cmd_type, PClient* client, std::string&& peer_ip, int port, @@ -69,6 +64,7 @@ void ConnectTargetNode(); private: + PRaft* praft_; ClusterCmdType cluster_cmd_type_ = ClusterCmdType::kNone; std::mutex mtx_; PClient* client_ = nullptr; @@ -94,17 +90,21 @@ class PRaftWriteDoneClosure : public braft::Closure { class PRaft : public braft::StateMachine { public: - PRaft() = default; - ~PRaft() override = default; - - static PRaft& Instance(); + PRaft(int64_t db_id) : db_id_(db_id) {} + ~PRaft() override { + ShutDown(); + Join(); + Clear(); + } //===--------------------------------------------------------------------===// // Braft API //===--------------------------------------------------------------------===// - butil::Status Init(std::string& group_id, bool initial_conf_is_null); + butil::Status Init(std::string&& group_id, bool initial_conf_is_null); butil::Status AddPeer(const std::string& peer); + butil::Status AddPeer(const std::string& endpoint, int index); butil::Status RemovePeer(const std::string& peer); + butil::Status RemovePeer(const std::string& endpoint, int index); butil::Status DoSnapshot(int64_t self_snapshot_index = 0, bool is_sync = true); void ShutDown(); @@ -124,7 +124,7 @@ class PRaft : public braft::StateMachine { int ProcessClusterCmdResponse(PClient* client, const char* start, int len); void CheckRocksDBConfiguration(PClient* client, PClient* join_client, const std::string& reply); void LeaderRedirection(PClient* join_client, const std::string& reply); - void InitializeNodeBeforeAdd(PClient* client, PClient* join_client, const std::string& reply); + bool InitializeNodeBeforeAdd(PClient* client, PClient* join_client, const std::string& reply); int ProcessClusterJoinCmdResponse(PClient* client, const char* start, int len); int ProcessClusterRemoveCmdResponse(PClient* client, const char* start, int len); @@ -141,7 +141,7 @@ storage::LogIndex GetTerm(uint64_t log_index); storage::LogIndex GetLastLogIndex(bool is_flush = false); - bool IsInitialized() const { return node_ != nullptr && server_ != nullptr; } + bool IsInitialized() const { return node_ != nullptr; } private: void on_apply(braft::Iterator& iter) override; @@ -158,16 +158,11 @@ void on_start_following(const ::braft::LeaderChangeContext& ctx) override; private: - std::unique_ptr<brpc::Server> server_{nullptr}; // brpc std::unique_ptr<braft::Node> node_{nullptr}; -
braft::NodeOptions node_options_; // options for raft node - std::string raw_addr_; // ip:port of this node - scoped_refptr<braft::FileSystemAdaptor> snapshot_adaptor_ = nullptr; - ClusterCmdContext cluster_cmd_ctx_; // context for cluster join/remove command - std::string group_id_; // group id - int db_id_ = 0; // db_id - + ClusterCmdContext cluster_cmd_ctx_{this}; // context for cluster join/remove command + std::string group_id_; // group id + int64_t db_id_ = 0; // db_id bool is_node_first_start_up_ = true; }; diff --git a/src/praft/praft.proto b/src/praft/praft.proto index 61a495f2..9bb140e7 100644 --- a/src/praft/praft.proto +++ b/src/praft/praft.proto @@ -2,12 +2,33 @@ syntax="proto3"; package pikiwidb; option cc_generic_services = true; -message DummyRequest { -}; +message NodeAddRequest { + string group_id = 1; + string endpoint = 2; + uint32 index = 3; + uint32 role = 4; +} -message DummyResponse { -}; +message NodeAddResponse { + bool success = 1; + uint32 error_code = 2; + string leader_endpoint = 3; +} + +message NodeRemoveRequest { + string group_id = 1; + string endpoint = 2; + uint32 index = 3; + uint32 role = 4; +} + +message NodeRemoveResponse { + bool success = 1; + uint32 error_code = 2; + string leader_endpoint = 3; +} -service DummyService { - rpc DummyMethod(DummyRequest) returns (DummyResponse); +service PRaftService { + rpc AddNode(NodeAddRequest) returns (NodeAddResponse); + rpc RemoveNode(NodeRemoveRequest) returns (NodeRemoveResponse); }; diff --git a/src/praft/praft_service.cc b/src/praft/praft_service.cc new file mode 100644 index 00000000..12b92261 --- /dev/null +++ b/src/praft/praft_service.cc @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2024-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory.
+ */ + +#include "praft_service.h" + +#include "fmt/format.h" +#include "store.h" + +namespace pikiwidb { +void PRaftServiceImpl::AddNode(::google::protobuf::RpcController* controller, const ::pikiwidb::NodeAddRequest* request, + ::pikiwidb::NodeAddResponse* response, ::google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + auto groupid = request->group_id(); + auto end_point = request->endpoint(); + auto index = request->index(); + auto role = request->role(); + + auto db = PSTORE.GetDBByGroupID(groupid); + if (!db) { + response->set_success(false); + response->set_error_code(static_cast<uint32_t>(PRaftErrorCode::kErrorDisMatch)); + response->set_leader_endpoint(end_point); + return; + } + + auto& praft = db->GetPRaft(); + if (!praft->IsLeader()) { + response->set_success(false); + response->set_error_code(static_cast<uint32_t>(PRaftErrorCode::kErrorReDirect)); + response->set_leader_endpoint(praft->GetLeaderAddress()); + return; + } + + auto status = praft->AddPeer(end_point, index); + if (!status.ok()) { + response->set_success(false); + response->set_error_code(static_cast<uint32_t>(PRaftErrorCode::kErrorAddNode)); + response->set_leader_endpoint(praft->GetLeaderAddress()); + return; + } + response->set_success(true); +} + +void PRaftServiceImpl::RemoveNode(::google::protobuf::RpcController* controller, + const ::pikiwidb::NodeRemoveRequest* request, + ::pikiwidb::NodeRemoveResponse* response, ::google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + auto groupid = request->group_id(); + auto end_point = request->endpoint(); + auto index = request->index(); + auto role = request->role(); + + auto db = PSTORE.GetDBByGroupID(groupid); + if (!db) { + response->set_success(false); + response->set_error_code(static_cast<uint32_t>(PRaftErrorCode::kErrorDisMatch)); + response->set_leader_endpoint(end_point); + return; + } + + auto& praft = db->GetPRaft(); + if (!praft->IsLeader()) { + response->set_success(false); + response->set_error_code(static_cast<uint32_t>(PRaftErrorCode::kErrorReDirect)); + response->set_leader_endpoint(praft->GetLeaderAddress()); + return; + } + + auto status = praft->RemovePeer(end_point, index); + if (!status.ok()) { + response->set_success(false); + response->set_error_code(static_cast<uint32_t>(PRaftErrorCode::kErrorRemoveNode)); + response->set_leader_endpoint(praft->GetLeaderAddress()); + return; + } + response->set_success(true); +} + +} // namespace pikiwidb \ No newline at end of file diff --git a/src/praft/praft_service.h b/src/praft/praft_service.h index d7b655a2..365a575c 100644 --- a/src/praft/praft_service.h +++ b/src/praft/praft_service.h @@ -12,15 +12,14 @@ namespace pikiwidb { class PRaft; - -class DummyServiceImpl : public DummyService { +class PRaftServiceImpl : public PRaftService { public: - explicit DummyServiceImpl(PRaft* praft) : praft_(praft) {} - void DummyMethod(::google::protobuf::RpcController* controller, const ::pikiwidb::DummyRequest* request, - ::pikiwidb::DummyResponse* response, ::google::protobuf::Closure* done) override {} + PRaftServiceImpl() = default; + void AddNode(::google::protobuf::RpcController* controller, const ::pikiwidb::NodeAddRequest* request, + ::pikiwidb::NodeAddResponse* response, ::google::protobuf::Closure* done); - private: - PRaft* praft_ = nullptr; + void RemoveNode(::google::protobuf::RpcController* controller, const ::pikiwidb::NodeRemoveRequest* request, + ::pikiwidb::NodeRemoveResponse* response, ::google::protobuf::Closure* done); }; } // namespace pikiwidb diff --git a/src/praft/psnapshot.cc b/src/praft/psnapshot.cc index
28fc99c0..b927da85 100644 --- a/src/praft/psnapshot.cc +++ b/src/praft/psnapshot.cc @@ -28,7 +28,7 @@ braft::FileAdaptor* PPosixFileSystemAdaptor::open(const std::string& path, int o if ((oflag & IS_RDONLY) == 0) { // This is a read operation bool snapshots_exists = false; std::string snapshot_path; - int db_id = -1; + int64_t db_id = -1; // parse snapshot path butil::FilePath parse_snapshot_path(path); @@ -83,7 +83,7 @@ braft::FileAdaptor* PPosixFileSystemAdaptor::open(const std::string& path, int o auto& new_meta = const_cast<braft::SnapshotMeta&>(snapshot_meta_memtable.meta()); auto last_log_index = PSTORE.GetBackend(db_id)->GetStorage()->GetSmallestFlushedLogIndex(); new_meta.set_last_included_index(last_log_index); - auto last_log_term = PRAFT.GetTerm(last_log_index); + auto last_log_term = PSTORE.GetBackend(db_id)->GetPRaft()->GetTerm(last_log_index); new_meta.set_last_included_term(last_log_term); INFO("Succeed to fix db_{} snapshot meta: {}, {}", db_id, last_log_index, last_log_term); diff --git a/src/store.cc b/src/store.cc index 24a0f2ba..87b3bc6b 100644 --- a/src/store.cc +++ b/src/store.cc @@ -7,16 +7,27 @@ #include "store.h" +#include #include #include +#include + +#include "braft/raft.h" +#include "braft/route_table.h" +#include "braft/util.h" +#include "brpc/channel.h" +#include "brpc/controller.h" #include "config.h" #include "db.h" +#include "pd.pb.h" +#include "pd/pd_server.h" +#include "praft/praft_service.h" #include "pstd/log.h" #include "pstd/pstd_string.h" +#include "pstd_status.h" namespace pikiwidb { - PStore::~PStore() { INFO("STORE is closing..."); } PStore& PStore::Instance() { @@ -24,25 +35,213 @@ PStore& PStore::Instance() { return store; } -void PStore::Init(int db_number) { - db_number_ = db_number; - backends_.reserve(db_number_); - for (int i = 0; i < db_number_; i++) { - auto db = std::make_unique<DB>(i, g_config.db_path); - db->Open(); - backends_.push_back(std::move(db)); - INFO("Open DB_{} success!", i); +void PStore::Init() { + // 1. init rpc + if (!InitRpcServer()) { + ERROR("STORE Init failed!"); + return; + } + + // 2. Currently, only standalone pd deployment is supported. If the current node is not a pd node, + // it needs to report to the pd node that it is online as a common node. + if (!RegisterStoreToPDServer()) { + ERROR("STORE Init failed!"); + return; } + + // 3. If the node acts as a pd, the initializing node can build a pd group based on the configuration.
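+ // For illustration (hypothetical values, not from this patch): with g_config.as_pd = true,
+ // g_config.fake = false, g_config.pd_group_id = "pd_group", and g_config.pd_conf =
+ // "127.0.0.1:8000:0,", the branch below reduces to:
+ //   PlacementDriverOptions pd_options(false, "pd_group", "127.0.0.1:8000:0,");
+ //   PDSERVER.Init(pd_options);
+ //   PDSERVER.Start();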
+ if (g_config.as_pd.load(std::memory_order_relaxed)) { + PlacementDriverOptions pd_options(g_config.fake.load(std::memory_order_relaxed), + g_config.pd_group_id.ToString(), + g_config.pd_conf.ToString()); + PDSERVER.Init(pd_options); + PDSERVER.Start(); + } + + is_started_.store(true, std::memory_order_relaxed); + INFO("STORE Init success!"); } +bool PStore::InitRpcServer() { + rpc_server_ = std::make_unique<brpc::Server>(); + auto ip = g_config.ip.ToString(); + butil::ip_t rpc_ip; + butil::str2ip(ip.c_str(), &rpc_ip); + auto rpc_port = + g_config.port.load(std::memory_order_relaxed) + g_config.raft_port_offset.load(std::memory_order_relaxed); + endpoint_ = butil::EndPoint(rpc_ip, rpc_port); + + if (braft::add_service(rpc_server_.get(), endpoint_) != 0) { + rpc_server_.reset(); + ERROR("Failed to add raft service"); + return false; + } + + // Add praft service into RPC server + praft_service_ = std::make_unique<PRaftServiceImpl>(); + if (rpc_server_->AddService(praft_service_.get(), brpc::SERVER_OWNS_SERVICE) != 0) { + rpc_server_.reset(); + ERROR("Failed to add praft service"); + return false; + } + + // Add PDService if the node acts as the pd + if (g_config.as_pd.load(std::memory_order_relaxed)) { + pd_service_ = std::make_unique<PlacementDriverServiceImpl>(); + if (rpc_server_->AddService(pd_service_.get(), brpc::SERVER_OWNS_SERVICE) != 0) { + rpc_server_.reset(); + ERROR("Failed to add pd service"); + return false; + } + } + + // Add StoreService if the node is deployed in mixed mode or is not currently used as a pd node + if (!g_config.as_pd.load(std::memory_order_relaxed) || g_config.fake.load(std::memory_order_relaxed)) { + store_service_ = std::make_unique<StoreServiceImpl>(); + if (rpc_server_->AddService(store_service_.get(), brpc::SERVER_OWNS_SERVICE) != 0) { + rpc_server_.reset(); + ERROR("Failed to add store service"); + return false; + } + } + + if (rpc_server_->Start(endpoint_, nullptr) != 0) { + rpc_server_.reset(); + ERROR("Failed to start server"); + return false; + } + + return true; +} + +bool PStore::RegisterStoreToPDServer() { + if (!g_config.as_pd.load(std::memory_order_relaxed)) { + // Register configuration of target group to RouteTable + if (braft::rtb::update_configuration(g_config.pd_group_id.ToString(), g_config.pd_conf.ToString()) != 0) { + ERROR("Fail to register configuration {} of group {}", g_config.pd_conf.ToString(), + g_config.pd_group_id.ToString()); + return false; + } + + auto conf = g_config.pd_conf.ToString(); + int retry_times = std::count_if(conf.begin(), conf.end(), [](char& c) { return c == ','; }) + 2; + for (int i = 0; i < retry_times; i++) { + braft::PeerId leader; + // select leader of the target group from RouteTable + if (braft::rtb::select_leader(g_config.pd_group_id, &leader) != 0) { + // leader is unknown in RouteTable. Ask RouteTable to refresh leader by sending RPCs. + butil::Status st = braft::rtb::refresh_leader(g_config.pd_group_id, g_config.request_timeout_ms); + if (!st.ok()) { + // not sure about the leader, sleep for a while and then ask again.
+ WARN("Fail to refresh_leader : {}", st.error_str()); + std::chrono::milliseconds duration(g_config.request_timeout_ms); + std::this_thread::sleep_for(duration); + } + continue; + } + + // Now we know who the leader is, construct a stub and then send the + // rpc + brpc::Channel channel; + if (channel.Init(leader.addr, NULL) != 0) { + WARN("Fail to init channel to {}", leader.to_string()); + std::chrono::milliseconds duration(g_config.request_timeout_ms); + std::this_thread::sleep_for(duration); + continue; + } + PlacementDriverService_Stub stub(&channel); + + brpc::Controller cntl; + cntl.set_timeout_ms(g_config.request_timeout_ms); + AddStoreResponse response; + AddStoreRequest request; + request.set_ip(g_config.ip.ToString()); + request.set_port(g_config.port.load(std::memory_order_relaxed)); + stub.AddStore(&cntl, &request, &response, NULL); + + if (cntl.Failed()) { + WARN("Fail to send request to {} : {}", leader.to_string(), cntl.ErrorText()); + // clear leadership since this RPC failed. + braft::rtb::update_leader(g_config.pd_group_id.ToString(), braft::PeerId()); + std::chrono::milliseconds duration(g_config.request_timeout_ms); + std::this_thread::sleep_for(duration); + continue; + } + + if (!response.success()) { + WARN("Fail to send request to {}, redirecting to {}", leader.to_string(), + response.has_redirect() ? response.redirect() : "nowhere"); + // update route table since we have redirect information + braft::rtb::update_leader(g_config.pd_group_id.ToString(), response.redirect()); + std::chrono::milliseconds duration(g_config.request_timeout_ms); + std::this_thread::sleep_for(duration); + continue; + } + + SetStoreID(response.store_id()); + return true; + } + + return false; + } + + return true; +} + +std::shared_ptr<DB> PStore::GetBackend(int64_t db_id) { + std::shared_lock lock(store_mutex_); + auto it = backends_table_.find(db_id); + if (it != backends_table_.end()) { + return it->second; + } + + WARN("the db of {} does not exist!", db_id); + return nullptr; +} + +std::shared_ptr<DB> PStore::GetDBByGroupID(const std::string& group_id) { + std::shared_lock lock(store_mutex_); + auto it = group_id_of_db_id_.find(group_id); + if (it != group_id_of_db_id_.end()) { + return GetBackend(it->second); + } + + WARN("the group_id of {} does not exist!", group_id); + return nullptr; +} + +pstd::Status PStore::AddBackend(int64_t db_id, std::string&& group_id) { + std::lock_guard lock(store_mutex_); + auto it = backends_table_.find(db_id); + if (it != backends_table_.end()) { + return pstd::Status::OK(); + } + + backends_table_.insert({db_id, std::make_shared<DB>(db_id, g_config.db_path)}); + group_id_of_db_id_.insert({group_id, db_id}); + return backends_table_[db_id]->Init(std::move(group_id)); +} + +pstd::Status PStore::RemoveBackend(int64_t db_id) { + std::lock_guard lock(store_mutex_); + auto it = backends_table_.find(db_id); + if (it != backends_table_.end()) { + group_id_of_db_id_.erase(it->second->GetPRaft()->GetGroupID()); + backends_table_.erase(it); + } + + return pstd::Status::OK(); +} + void PStore::HandleTaskSpecificDB(const TasksVector& tasks) { std::for_each(tasks.begin(), tasks.end(), [this](const auto& task) { - if (task.db < 0 || task.db >= db_number_) { - WARN("The database index is out of range."); + auto db = GetBackend(task.db); + if (db == nullptr) { + WARN("The database of db_id {} does not exist.", task.db); return; } - auto& db = backends_.at(task.db); + switch (task.type) { case kCheckpoint: { if (auto s = task.args.find(kCheckpointPath); s == task.args.end()) { @@ -73,4 +72,5 @@ void
PStore::HandleTaskSpecificDB(const TasksVector& tasks) { } }); } + } // namespace pikiwidb diff --git a/src/store.h b/src/store.h index e7daf3d4..19d4510b 100644 --- a/src/store.h +++ b/src/store.h @@ -7,17 +7,35 @@ #pragma once +#include +#include +#include +#include +#include + +#include "praft/praft.h" +#include "praft/praft_service.h" + #define GLOG_NO_ABBREVIATED_SEVERITIES -#include +#include +#include #include +#include #include -#include "common.h" +#include "brpc/server.h" +#include "butil/endpoint.h" + #include "db.h" +#include "pd/pd_service.h" +#include "praft/praft_service.h" +#include "pstd/pstd_status.h" #include "storage/storage.h" +#include "store_service.h" namespace pikiwidb { +class PRaftServiceImpl; enum TaskType { kCheckpoint = 0, kLoadDBFromCheckpoint, kEmpty }; @@ -32,13 +50,20 @@ struct TaskContext { bool sync = false; TaskContext() = delete; TaskContext(TaskType t, bool s = false) : type(t), sync(s) {} - TaskContext(TaskType t, int d, bool s = false) : type(t), db(d), sync(s) {} - TaskContext(TaskType t, int d, const std::map<std::string, std::string>& a, bool s = false) + TaskContext(TaskType t, uint32_t d, bool s = false) : type(t), db(d), sync(s) {} + TaskContext(TaskType t, uint32_t d, const std::map<std::string, std::string>& a, bool s = false) : type(t), db(d), args(a), sync(s) {} }; using TasksVector = std::vector<TaskContext>; +enum PRaftErrorCode { + kErrorDisMatch = 0, + kErrorAddNode, + kErrorRemoveNode, + kErrorReDirect, +}; + class PStore { public: static PStore& Instance(); @@ -47,18 +72,42 @@ class PStore { void operator=(const PStore&) = delete; ~PStore(); - void Init(int db_number); + void Init(); + bool InitRpcServer(); + bool RegisterStoreToPDServer(); + + void SetStoreID(int64_t store_id) { store_id_.store(store_id, std::memory_order_relaxed); } + + int64_t GetStoreID() { return store_id_.load(std::memory_order_relaxed); } - std::unique_ptr<DB>& GetBackend(int32_t index) { return backends_[index]; }; + std::shared_ptr<DB> GetBackend(int64_t db_id); + + std::shared_ptr<DB> GetDBByGroupID(const std::string& group_id); + + pstd::Status AddBackend(int64_t db_id, std::string&& group_id); + + pstd::Status RemoveBackend(int64_t db_id); void HandleTaskSpecificDB(const TasksVector& tasks); - int GetDBNumber() const { return db_number_; } + const butil::EndPoint& GetEndPoint() const { return endpoint_; } private: PStore() = default; - int db_number_ = 0; - std::vector<std::unique_ptr<DB>> backends_; + + std::atomic<int64_t> store_id_ = {0}; + butil::EndPoint endpoint_; + + std::unique_ptr<brpc::Server> rpc_server_{nullptr}; + std::unique_ptr<PRaftServiceImpl> praft_service_{nullptr}; // praft service + std::unique_ptr<PlacementDriverServiceImpl> pd_service_{nullptr}; // pd service + std::unique_ptr<StoreServiceImpl> store_service_{nullptr}; // store service + + std::shared_mutex store_mutex_; + std::unordered_map<int64_t, std::shared_ptr<DB>> backends_table_; // <db_id, DB> + std::unordered_map<std::string, int64_t> group_id_of_db_id_; // <group_id, db_id> + + std::atomic<bool> is_started_ = {false}; }; #define PSTORE PStore::Instance() diff --git a/src/store.proto b/src/store.proto new file mode 100644 index 00000000..f858ca9c --- /dev/null +++ b/src/store.proto @@ -0,0 +1,129 @@ +syntax="proto3"; +package pikiwidb; +option cc_generic_services = true; + +message StoreStatsRequest { + bool only_heartbeat = 1; +}; + +message StoreStatsResponse { + bool success = 1; + optional StoreStats store_stats = 2; +}; + +message StoreStats { + int64 store_id = 1; // store ID + optional int64 capacity = 2; // store storage capacity + optional int64 available = 3; // store remaining available size + optional int32 region_count = 4; // number of regions + optional int32 leader_region_count = 5; // number of leader regions + optional int32
sending_snap_count = 6; // number of snapshots sent currently + optional int32 receiving_snap_count = 7; // number of snapshots received currently + optional int32 applying_snap_count = 8; // number of regions where snapshots are being applied + optional int64 start_time = 9; // store startup time (Unix timestamp, milliseconds) + optional bool busy = 10; // whether store is busy + optional int64 bytes_written = 11; // number of bytes written to the store during the current period + optional int64 bytes_read = 12; // number of bytes read for the store during the period + optional int64 keys_written = 13; // number of keys written to the store in the current period + optional int64 keys_read = 14; // number of keys read from the store in the current period +} + +// only sent to the leader +message RegionStatsRequest { + bool only_heartbeat = 1; +}; + + +message RegionStatsResponse { + bool success = 1; + optional RegionStats region_stats = 2; +} + +message RegionStats { + int64 region_id = 1; // region ID + optional Peer leader = 2; // leader + optional DownPeers down_peers = 3; // leader considers that these peers are offline + optional PendingPeers pending_peers = 4; // peers that the leader cannot determine whether they are working properly + optional int64 bytes_written = 5; // number of bytes written to the region during the current period + optional int64 bytes_read = 6; // number of bytes read for the region during the period + optional int64 keys_written = 7; // number of keys written to the region in the current period + optional int64 keys_read = 8; // number of keys read from the region in the current period + optional int64 approximate_size = 9; // approximate region size + optional int64 approximate_keys = 10; // approximate key number +}; + +message DownPeers { + repeated Peer down_peers = 1; +} + +message PendingPeers { + repeated Peer pending_peer = 1; +} + +message Peer { + int64 peer_id = 1; + int64 store_id = 2; + string ip = 3; + int32 port = 4; +}; + +message RegionOptions { + int64 region_id = 1; + optional string start_key = 2; + optional string end_key = 3; + string raft_group_id = 4; +}; + +message InitRegionPeerRequest { + RegionOptions region_options = 1; +}; + +message InitRegionPeerResponse { + bool success = 1; + optional int64 peer_id = 2; +}; + +message AddRegionPeerRequest { + int64 region_id = 1; + Peer peer = 2; +}; + +message AddRegionResponse { + bool success = 1; + optional Peer old_peers = 2; + optional Peer new_peers = 3; +}; + +message RemoveRegionPeerRequest { + int64 region_id = 1; + Peer peer = 2; +}; + +message RemoveRegionPeerResponse { + bool success = 1; + optional Peer old_peers = 2; + optional Peer new_peers = 3; +}; + +message TransferLeaderRequest { + int64 region_id = 1; + Peer peer = 2; +}; + +message TransferLeaderResponse { + bool success = 1; +}; + +service StoreService { + rpc GetStoreStats(StoreStatsRequest) returns (StoreStatsResponse); + + rpc GetRegionStats(RegionStatsRequest) returns (RegionStatsResponse); + + rpc InitRegionPeer(InitRegionPeerRequest) returns (InitRegionPeerResponse); + + rpc AddRegionPeer(AddRegionPeerRequest) returns (AddRegionResponse); + + rpc RemoveRegionPeer(RemoveRegionPeerRequest) returns (RemoveRegionPeerResponse); + + rpc TransferLeader(TransferLeaderRequest) returns (TransferLeaderResponse); +}; diff --git a/src/store_service.cc b/src/store_service.cc new file mode 100644 index 00000000..b2bafbee --- /dev/null +++ b/src/store_service.cc @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2024-present,
Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#include "store_service.h" + +namespace pikiwidb { +void StoreServiceImpl::GetStoreStats(::google::protobuf::RpcController* controller, + const ::pikiwidb::StoreStatsRequest* request, + ::pikiwidb::StoreStatsResponse* response, ::google::protobuf::Closure* done) {} + +void StoreServiceImpl::GetRegionStats(::google::protobuf::RpcController* controller, + const ::pikiwidb::RegionStatsRequest* request, + ::pikiwidb::RegionStatsResponse* response, ::google::protobuf::Closure* done) {} + +void StoreServiceImpl::InitRegionPeer(::google::protobuf::RpcController* controller, + const ::pikiwidb::InitRegionPeerRequest* request, + ::pikiwidb::InitRegionPeerResponse* response, ::google::protobuf::Closure* done) { +} + +void StoreServiceImpl::AddRegionPeer(::google::protobuf::RpcController* controller, + const ::pikiwidb::AddRegionPeerRequest* request, + ::pikiwidb::AddRegionResponse* response, ::google::protobuf::Closure* done) {} + +void StoreServiceImpl::RemoveRegionPeer(::google::protobuf::RpcController* controller, + const ::pikiwidb::RemoveRegionPeerRequest* request, + ::pikiwidb::RemoveRegionPeerResponse* response, + ::google::protobuf::Closure* done) {} + +void StoreServiceImpl::TransferLeader(::google::protobuf::RpcController* controller, + const ::pikiwidb::TransferLeaderRequest* request, + ::pikiwidb::TransferLeaderResponse* response, ::google::protobuf::Closure* done) { +} +} // namespace pikiwidb diff --git a/src/store_service.h b/src/store_service.h new file mode 100644 index 00000000..930e88fa --- /dev/null +++ b/src/store_service.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2024-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. 
+ */ + +#pragma once + +#include "store.pb.h" + +namespace pikiwidb { + +class StoreServiceImpl : public StoreService { + public: + StoreServiceImpl() = default; + + void GetStoreStats(::google::protobuf::RpcController* controller, const ::pikiwidb::StoreStatsRequest* request, + ::pikiwidb::StoreStatsResponse* response, ::google::protobuf::Closure* done) override; + + void GetRegionStats(::google::protobuf::RpcController* controller, const ::pikiwidb::RegionStatsRequest* request, + ::pikiwidb::RegionStatsResponse* response, ::google::protobuf::Closure* done) override; + + void InitRegionPeer(::google::protobuf::RpcController* controller, const ::pikiwidb::InitRegionPeerRequest* request, + ::pikiwidb::InitRegionPeerResponse* response, ::google::protobuf::Closure* done) override; + + void AddRegionPeer(::google::protobuf::RpcController* controller, const ::pikiwidb::AddRegionPeerRequest* request, + ::pikiwidb::AddRegionResponse* response, ::google::protobuf::Closure* done) override; + + void RemoveRegionPeer(::google::protobuf::RpcController* controller, + const ::pikiwidb::RemoveRegionPeerRequest* request, + ::pikiwidb::RemoveRegionPeerResponse* response, ::google::protobuf::Closure* done) override; + + void TransferLeader(::google::protobuf::RpcController* controller, const ::pikiwidb::TransferLeaderRequest* request, + ::pikiwidb::TransferLeaderResponse* response, ::google::protobuf::Closure* done) override; +}; + +} // namespace pikiwidb diff --git a/tests/consistency_test.go b/tests/consistency_test.go index 019b4a76..8d8f3783 100644 --- a/tests/consistency_test.go +++ b/tests/consistency_test.go @@ -775,12 +775,12 @@ var _ = Describe("Consistency", Ordered, func() { } { // set write on leader - set, err := leader.SetEx(ctx, testKey, testValue, 3).Result() + set, err := leader.SetEx(ctx, testKey, testValue, 3*time.Second).Result() Expect(err).NotTo(HaveOccurred()) Expect(set).To(Equal("OK")) // read check - time.Sleep(10 * time.Second) + time.Sleep(5 * time.Second) readChecker(func(c *redis.Client) { _, err := c.Get(ctx, testKey).Result() Expect(err).To(Equal(redis.Nil))
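A short sketch of the join message SendNodeAddRequest() now emits (all values hypothetical), handy when tracing the cluster tests above against the new group-aware wire format:

#include <fmt/format.h>
#include <cstdio>
#include <string>

int main() {
  std::string group_id = "demo_group";  // hypothetical raft group id
  std::string ip = "127.0.0.1";         // hypothetical g_config.ip
  int port = 9221 + 10;                 // hypothetical listen port + raft-port-offset
  int64_t db_id = 0;
  // Mirrors praft.cc: "RAFT.NODE ADD {group_id} {ip:port:db_id}\r\n"
  auto msg = fmt::format("RAFT.NODE ADD {} {}:{}:{}\r\n", group_id, ip, port, db_id);
  std::fputs(msg.c_str(), stdout);  // prints: RAFT.NODE ADD demo_group 127.0.0.1:9231:0
  return 0;
}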