Skip to content

Commit

Permalink
Support specification of node pool type in announcer
Browse files Browse the repository at this point in the history
  • Loading branch information
jaystarshot committed Feb 21, 2025
1 parent 03b8cd9 commit fe3a38b
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 2 deletions.
8 changes: 6 additions & 2 deletions presto-native-execution/presto_cpp/main/Announcer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ std::string announcementBody(
const std::string& environment,
const std::string& nodeLocation,
const bool sidecar,
const std::vector<std::string>& connectorIds) {
const std::vector<std::string>& connectorIds,
const std::string& nodePoolType) {
std::string id =
boost::lexical_cast<std::string>(boost::uuids::random_generator()());

Expand All @@ -49,6 +50,7 @@ std::string announcementBody(
{"coordinator", false},
{"sidecar", sidecar},
{"connectorIds", folly::join(',', connectorIds)},
{"pool_type", nodePoolType},
{uriScheme,
fmt::format("{}://{}:{}", uriScheme, address, port)}}}}}}};
return body.dump();
Expand Down Expand Up @@ -84,6 +86,7 @@ Announcer::Announcer(
const bool sidecar,
const std::vector<std::string>& connectorIds,
const uint64_t maxFrequencyMs,
const std::string& nodePoolType,
folly::SSLContextPtr sslContext)
: PeriodicServiceInventoryManager(
address,
Expand All @@ -100,7 +103,8 @@ Announcer::Announcer(
environment,
nodeLocation,
sidecar,
connectorIds)),
connectorIds,
nodePoolType)),
announcementRequest_(
announcementRequest(address, port, nodeId, announcementBody_)) {}

Expand Down
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/Announcer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class Announcer : public PeriodicServiceInventoryManager {
const bool sidecar,
const std::vector<std::string>& connectorIds,
const uint64_t maxFrequencyMs_,
const std::string& nodePoolType,
folly::SSLContextPtr sslContext);

~Announcer() = default;
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ void PrestoServer::run() {
address_ = fmt::format("[{}]", address_);
}
nodeLocation_ = nodeConfig->nodeLocation();
nodePoolType_ = systemConfig->poolType();
prestoBuiltinFunctionPrefix_ = systemConfig->prestoDefaultNamespacePrefix();
} catch (const velox::VeloxUserError& e) {
PRESTO_STARTUP_LOG(ERROR) << "Failed to start server due to " << e.what();
Expand Down Expand Up @@ -579,6 +580,7 @@ void PrestoServer::run() {
systemConfig->prestoNativeSidecar(),
catalogNames,
systemConfig->announcementMaxFrequencyMs(),
nodePoolType_,
sslContext_);
updateAnnouncerDetails();
announcer_->start();
Expand Down
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ class PrestoServer {
std::string nodeId_;
std::string address_;
std::string nodeLocation_;
std::string nodePoolType_;
folly::SSLContextPtr sslContext_;
std::string prestoBuiltinFunctionPrefix_;
};
Expand Down
12 changes: 12 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ SystemConfig::SystemConfig() {
BOOL_PROP(kEnableRuntimeMetricsCollection, false),
BOOL_PROP(kPlanValidatorFailOnNestedLoopJoin, false),
STR_PROP(kPrestoDefaultNamespacePrefix, "presto.default"),
STR_PROP(kPoolType, "DEFAULT"),
};
}

Expand Down Expand Up @@ -290,6 +291,17 @@ std::string SystemConfig::prestoVersion() const {
return requiredProperty(std::string(kPrestoVersion));
}

std::string SystemConfig::poolType() const {
static const std::unordered_set<std::string> validTypes = {"LEAF", "INTERMEDIATE", "DEFAULT"};
static constexpr std::string_view kPoolTypeDefault = "DEFAULT";
auto value = optionalProperty<std::string>(kPoolType).value_or(std::string(kPoolTypeDefault));
VELOX_USER_CHECK(
validTypes.count(value),
"{} must be one of 'LEAF', 'INTERMEDIATE', or 'DEFAULT'",
kPoolType);
return value;
}

bool SystemConfig::mutableConfig() const {
return optionalProperty<bool>(kMutableConfig).value();
}
Expand Down
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,9 @@ class SystemConfig : public ConfigBase {
static constexpr std::string_view kPrestoDefaultNamespacePrefix{
"presto.default-namespace"};

// Specifies the type of worker pool
static constexpr std::string_view kPoolType{"pool-type"};

SystemConfig();

virtual ~SystemConfig() = default;
Expand Down Expand Up @@ -898,6 +901,8 @@ class SystemConfig : public ConfigBase {

bool prestoNativeSidecar() const;
std::string prestoDefaultNamespacePrefix() const;

std::string poolType() const;
};

/// Provides access to node properties defined in node.properties file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ TEST_P(AnnouncerTestSuite, basic) {
true,
{"hive", "tpch"},
500 /*milliseconds*/,
"DEFAULT",
useHttps ? sslContext_ : nullptr);

announcer.start();
Expand Down

0 comments on commit fe3a38b

Please sign in to comment.