Skip to content

Commit

Permalink
[RPC] Change framework initialization RPC call.
Browse files Browse the repository at this point in the history
  • Loading branch information
sukritkalra committed Feb 12, 2024
1 parent 3732e5c commit 0444da9
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 13 deletions.
7 changes: 4 additions & 3 deletions rpc/protos/erdos_scheduler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ service SchedulerService {
}

message RegisterFrameworkRequest {
string framework_name = 1;
string framework_uri = 2;
string name = 1;
string uri = 2;
uint64 timestamp = 3;
}

message RegisterFrameworkResponse {
bool success = 1;
string response_message = 2;
string message = 2;
}
31 changes: 21 additions & 10 deletions rpc/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir))
)

import erdos_scheduler_pb2
import erdos_scheduler_pb2_grpc
import grpc
from absl import app, flags
Expand All @@ -26,23 +27,33 @@
class SchedulerServiceServicer(erdos_scheduler_pb2_grpc.SchedulerServiceServicer):
def __init__(self) -> None:
"""Initialize the service, and setup the logger."""
self._logger = setup_logging(name=FLAGS.log_file, level=FLAGS.log_level)
# Values used by the Servicer.
self._logger = setup_logging(name=FLAGS.log_file, log_level=FLAGS.log_level)
self._initialized = False
self._initialization_time = -1
self._master_uri = None
super().__init__()

def RegisterFramework(self, request, context):
"""Registers a new framework with the backend scheduler.
This is the entry point for a new instance of Spark / Flink to register
itself with the backend scheduler, and is intended as an EHLO.
"""
framework_name = request.framework_name
framework_uri = request.framework_uri
self._logger.info(
"Registering framework %s with URI %s", framework_name, framework_uri
)
return erdos_scheduler_pb2_grpc.RegisterFrameworkResponse(
if self._initialized:
return erdos_scheduler_pb2.RegisterFrameworkResponse(
success=False,
message=f"Framework already registered at "
f"{self._initialization_time} at the address {self._master_uri}",
)

# Setup a new Framework instance.
framework_name = request.name
self._master_uri = request.uri
self._initialization_time = request.timestamp
print("Registering framework %s with URI %s", framework_name, self._master_uri)
return erdos_scheduler_pb2.RegisterFrameworkResponse(
success=True,
message=f"Framework {framework_name} at {framework_uri} "
f"registered successfully!",
message=f"{framework_name} at {self._master_uri} registered successfully!",
)


Expand All @@ -51,7 +62,7 @@ def serve(args):
# Initialize the server.
server = grpc.server(futures.ThreadPoolExecutor(max_workers=FLAGS.max_workers))
erdos_scheduler_pb2_grpc.add_SchedulerServiceServicer_to_server(
erdos_scheduler_pb2_grpc.SchedulerServiceServicer(), server
SchedulerServiceServicer(), server
)

# Start the server.
Expand Down

0 comments on commit 0444da9

Please sign in to comment.