Skip to content

Commit

Permalink
fix minor apple silicon issues with python3.9
Browse files Browse the repository at this point in the history
  • Loading branch information
pfandzelter committed Sep 15, 2023
1 parent 100424a commit 40250e8
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 42 deletions.
114 changes: 82 additions & 32 deletions celestial/connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@

from proto.celestial import celestial_pb2, celestial_pb2_grpc

class ConnectionManager():

class ConnectionManager:
def __init__(
self,
hosts: typing.List[str],
peeringhosts: typing.List[str],
allowed_concurrent: int = 512
allowed_concurrent: int = 512,
):

self.stubs: typing.List[celestial_pb2_grpc.CelestialStub] = []
stubs: typing.List[celestial_pb2_grpc.CelestialStub] = []

for host in hosts:
channel = grpc.insecure_channel(host)
self.stubs.append(celestial_pb2_grpc.CelestialStub(channel))
stubs.append(celestial_pb2_grpc.CelestialStub(channel))

self.hosts = hosts

Expand All @@ -56,31 +56,75 @@ def __init__(

for i in range(len(self.hosts)):
irr.index = i
self.stubs[i].InitRemotes(irr)
stubs[i].InitRemotes(irr)

for i in range(len(self.hosts)):
e = celestial_pb2.Empty()
self.stubs[i].StartPeering(e)
stubs[i].StartPeering(e)

def init_mutex(self) -> None:
self.stubs: typing.List[celestial_pb2_grpc.CelestialStub] = []

for host in self.hosts:
channel = grpc.insecure_channel(host)
self.stubs.append(celestial_pb2_grpc.CelestialStub(channel))

self.mutexes: typing.Dict[str, td.Semaphore] = {}

for host in self.hosts:
self.mutexes[host] = td.Semaphore(self.allowed_concurrent)

def __register(self, conn: MachineConnector, bandwidth: int, active: bool, vcpu_count: int, mem_size_mib: int, ht_enabled: bool, disk_size_mib: int, kernel: str, rootfs: str, bootparams: str) -> None:

def __register(
self,
conn: MachineConnector,
bandwidth: int,
active: bool,
vcpu_count: int,
mem_size_mib: int,
ht_enabled: bool,
disk_size_mib: int,
kernel: str,
rootfs: str,
bootparams: str,
) -> None:
self.mutexes[conn.host].acquire()
try:
conn.create_machine(vcpu_count=vcpu_count, mem_size_mib=mem_size_mib, ht_enabled=ht_enabled, disk_size_mib=disk_size_mib, kernel=kernel, rootfs=rootfs, bootparams=bootparams, active=active, bandwidth=bandwidth)
conn.create_machine(
vcpu_count=vcpu_count,
mem_size_mib=mem_size_mib,
ht_enabled=ht_enabled,
disk_size_mib=disk_size_mib,
kernel=kernel,
rootfs=rootfs,
bootparams=bootparams,
active=active,
bandwidth=bandwidth,
)
except Exception as e:
print("❌ caught exception while trying to create machine %d shell %d:" % (conn.id, conn.shell), e)
print(
"❌ caught exception while trying to create machine %d shell %d:"
% (conn.id, conn.shell),
e,
)

self.mutexes[conn.host].release()


def register_machine(self, shell_no: int, id: int, bandwidth: int, active: bool, vcpu_count: int, mem_size_mib: int, ht_enabled: bool, disk_size_mib: int, kernel: str, rootfs: str, bootparams: str, host_affinity: typing.List[int], name: str="") -> MachineConnector:

def register_machine(
self,
shell_no: int,
id: int,
bandwidth: int,
active: bool,
vcpu_count: int,
mem_size_mib: int,
ht_enabled: bool,
disk_size_mib: int,
kernel: str,
rootfs: str,
bootparams: str,
host_affinity: typing.List[int],
name: str = "",
) -> MachineConnector:
# assign a random stub to this connection
#
# how do we get a host for a machine? serveral possibilities
Expand All @@ -97,23 +141,25 @@ def register_machine(self, shell_no: int, id: int, bandwidth: int, active: bool,

conn = MachineConnector(stub=stub, host=host, shell=shell_no, id=id, name=name)

td.Thread(target=self.__register, kwargs={
"conn": conn,
"vcpu_count": vcpu_count,
"mem_size_mib": mem_size_mib,
"ht_enabled": ht_enabled,
"disk_size_mib": disk_size_mib,
"kernel": kernel,
"rootfs": rootfs,
"bootparams": bootparams,
"active": active,
"bandwidth": bandwidth
}).start()
td.Thread(
target=self.__register,
kwargs={
"conn": conn,
"vcpu_count": vcpu_count,
"mem_size_mib": mem_size_mib,
"ht_enabled": ht_enabled,
"disk_size_mib": disk_size_mib,
"kernel": kernel,
"rootfs": rootfs,
"bootparams": bootparams,
"active": active,
"bandwidth": bandwidth,
},
).start()

return conn

def collect_host_infos(self) -> typing.Tuple[int, int, int]:

cpu_count = 0
mem = 0
machine_count = 0
Expand All @@ -129,7 +175,7 @@ def collect_host_infos(self) -> typing.Tuple[int, int, int]:

machine_count += 1
cpu_count += info.cpu
mem += info.mem/1000000
mem += info.mem / 1000000

return machine_count, cpu_count, mem

Expand All @@ -141,7 +187,6 @@ def block_host_ready(self, tbar: tqdm.tqdm, total_machines: int) -> None:

while not all(ready):
for i in range(len(self.hosts)):

if ready[i]:
continue

Expand All @@ -167,8 +212,13 @@ def block_host_ready(self, tbar: tqdm.tqdm, total_machines: int) -> None:
if not sum(total) == total_machines:
raise ValueError("reported created machines not equal total machines")

def init(self, db: bool, db_host: typing.Optional[str], shell_count: int, shells: typing.List[ShellConfig]) -> None:

def init(
self,
db: bool,
db_host: typing.Optional[str],
shell_count: int,
shells: typing.List[ShellConfig],
) -> None:
isr = celestial_pb2.InitRequest()

isr.database = db
Expand All @@ -189,4 +239,4 @@ def init(self, db: bool, db_host: typing.Optional[str], shell_count: int, shells

stub = celestial_pb2_grpc.CelestialStub(channel)

res = stub.Init(isr)
res = stub.Init(isr)
33 changes: 23 additions & 10 deletions celestial/machine_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,35 @@

from proto.celestial import celestial_pb2, celestial_pb2_grpc

class MachineConnector():

class MachineConnector:
def __init__(
self,
stub: celestial_pb2_grpc.CelestialStub,
host: str,
shell: int,
id: int,
name: str=""
name: str = "",
):

self.stub = stub
self.host = host

self.shell = shell
self.id = id
self.name = name

def create_machine(self, vcpu_count: int, mem_size_mib: int, ht_enabled: bool, disk_size_mib: int, kernel: str, rootfs: str, bootparams: str, active: bool, bandwidth: int) -> None:

def create_machine(
self,
vcpu_count: int,
mem_size_mib: int,
ht_enabled: bool,
disk_size_mib: int,
kernel: str,
rootfs: str,
bootparams: str,
active: bool,
bandwidth: int,
) -> None:
cmr = celestial_pb2.CreateMachineRequest()
cmr.machine.shell = self.shell
cmr.machine.id = self.id
Expand All @@ -55,22 +65,25 @@ def create_machine(self, vcpu_count: int, mem_size_mib: int, ht_enabled: bool, d

cmr.networkconfig.bandwidth = bandwidth

cmr.status = active
cmr.status = bool(active)

self.stub.CreateMachine(cmr)

def modify_machine(self, active: bool) -> None:

r = celestial_pb2.ModifyMachineRequest()

r.machine.shell = self.shell
r.machine.id = self.id

r.status = active
r.status = bool(active)

td.Thread(target=self.stub.ModifyMachine, args=(r,)).start()

def modify_links(self, remove_set: typing.List[typing.Dict[str,int]], modify_set: typing.List[typing.Dict[str,typing.Union[int, float]]]) -> None:
def modify_links(
self,
remove_set: typing.List[typing.Dict[str, int]],
modify_set: typing.List[typing.Dict[str, typing.Union[int, float]]],
) -> None:
r = celestial_pb2.ModifyLinksRequest()

r.a.shell = self.shell
Expand All @@ -96,4 +109,4 @@ def modify_links(self, remove_set: typing.List[typing.Dict[str,int]], modify_set
r.modify.append(ml)

if len(remove_set) + len(modify_set) > 0:
self.stub.ModifyLinks(r)
self.stub.ModifyLinks(r)

0 comments on commit 40250e8

Please sign in to comment.