From 40250e8471dd1db300aca9fdf0e9366d904d5b52 Mon Sep 17 00:00:00 2001 From: Tobias Pfandzelter Date: Fri, 15 Sep 2023 15:41:24 +0200 Subject: [PATCH] fix minor apple silicon issues with python3.9 --- celestial/connection_manager.py | 114 +++++++++++++++++++++++--------- celestial/machine_connector.py | 33 ++++++--- 2 files changed, 105 insertions(+), 42 deletions(-) diff --git a/celestial/connection_manager.py b/celestial/connection_manager.py index 47b5930..29396f3 100644 --- a/celestial/connection_manager.py +++ b/celestial/connection_manager.py @@ -27,19 +27,19 @@ from proto.celestial import celestial_pb2, celestial_pb2_grpc -class ConnectionManager(): + +class ConnectionManager: def __init__( self, hosts: typing.List[str], peeringhosts: typing.List[str], - allowed_concurrent: int = 512 + allowed_concurrent: int = 512, ): - - self.stubs: typing.List[celestial_pb2_grpc.CelestialStub] = [] + stubs: typing.List[celestial_pb2_grpc.CelestialStub] = [] for host in hosts: channel = grpc.insecure_channel(host) - self.stubs.append(celestial_pb2_grpc.CelestialStub(channel)) + stubs.append(celestial_pb2_grpc.CelestialStub(channel)) self.hosts = hosts @@ -56,31 +56,75 @@ def __init__( for i in range(len(self.hosts)): irr.index = i - self.stubs[i].InitRemotes(irr) + stubs[i].InitRemotes(irr) for i in range(len(self.hosts)): e = celestial_pb2.Empty() - self.stubs[i].StartPeering(e) + stubs[i].StartPeering(e) def init_mutex(self) -> None: + self.stubs: typing.List[celestial_pb2_grpc.CelestialStub] = [] + + for host in self.hosts: + channel = grpc.insecure_channel(host) + self.stubs.append(celestial_pb2_grpc.CelestialStub(channel)) + self.mutexes: typing.Dict[str, td.Semaphore] = {} for host in self.hosts: self.mutexes[host] = td.Semaphore(self.allowed_concurrent) - def __register(self, conn: MachineConnector, bandwidth: int, active: bool, vcpu_count: int, mem_size_mib: int, ht_enabled: bool, disk_size_mib: int, kernel: str, rootfs: str, bootparams: str) -> None: - + def __register( + self, + conn: MachineConnector, + bandwidth: int, + active: bool, + vcpu_count: int, + mem_size_mib: int, + ht_enabled: bool, + disk_size_mib: int, + kernel: str, + rootfs: str, + bootparams: str, + ) -> None: self.mutexes[conn.host].acquire() try: - conn.create_machine(vcpu_count=vcpu_count, mem_size_mib=mem_size_mib, ht_enabled=ht_enabled, disk_size_mib=disk_size_mib, kernel=kernel, rootfs=rootfs, bootparams=bootparams, active=active, bandwidth=bandwidth) + conn.create_machine( + vcpu_count=vcpu_count, + mem_size_mib=mem_size_mib, + ht_enabled=ht_enabled, + disk_size_mib=disk_size_mib, + kernel=kernel, + rootfs=rootfs, + bootparams=bootparams, + active=active, + bandwidth=bandwidth, + ) except Exception as e: - print("❌ caught exception while trying to create machine %d shell %d:" % (conn.id, conn.shell), e) + print( + "❌ caught exception while trying to create machine %d shell %d:" + % (conn.id, conn.shell), + e, + ) self.mutexes[conn.host].release() - - def register_machine(self, shell_no: int, id: int, bandwidth: int, active: bool, vcpu_count: int, mem_size_mib: int, ht_enabled: bool, disk_size_mib: int, kernel: str, rootfs: str, bootparams: str, host_affinity: typing.List[int], name: str="") -> MachineConnector: - + def register_machine( + self, + shell_no: int, + id: int, + bandwidth: int, + active: bool, + vcpu_count: int, + mem_size_mib: int, + ht_enabled: bool, + disk_size_mib: int, + kernel: str, + rootfs: str, + bootparams: str, + host_affinity: typing.List[int], + name: str = "", + ) -> MachineConnector: # assign a random stub to this connection # # how do we get a host for a machine? serveral possibilities @@ -97,23 +141,25 @@ def register_machine(self, shell_no: int, id: int, bandwidth: int, active: bool, conn = MachineConnector(stub=stub, host=host, shell=shell_no, id=id, name=name) - td.Thread(target=self.__register, kwargs={ - "conn": conn, - "vcpu_count": vcpu_count, - "mem_size_mib": mem_size_mib, - "ht_enabled": ht_enabled, - "disk_size_mib": disk_size_mib, - "kernel": kernel, - "rootfs": rootfs, - "bootparams": bootparams, - "active": active, - "bandwidth": bandwidth - }).start() + td.Thread( + target=self.__register, + kwargs={ + "conn": conn, + "vcpu_count": vcpu_count, + "mem_size_mib": mem_size_mib, + "ht_enabled": ht_enabled, + "disk_size_mib": disk_size_mib, + "kernel": kernel, + "rootfs": rootfs, + "bootparams": bootparams, + "active": active, + "bandwidth": bandwidth, + }, + ).start() return conn def collect_host_infos(self) -> typing.Tuple[int, int, int]: - cpu_count = 0 mem = 0 machine_count = 0 @@ -129,7 +175,7 @@ def collect_host_infos(self) -> typing.Tuple[int, int, int]: machine_count += 1 cpu_count += info.cpu - mem += info.mem/1000000 + mem += info.mem / 1000000 return machine_count, cpu_count, mem @@ -141,7 +187,6 @@ def block_host_ready(self, tbar: tqdm.tqdm, total_machines: int) -> None: while not all(ready): for i in range(len(self.hosts)): - if ready[i]: continue @@ -167,8 +212,13 @@ def block_host_ready(self, tbar: tqdm.tqdm, total_machines: int) -> None: if not sum(total) == total_machines: raise ValueError("reported created machines not equal total machines") - def init(self, db: bool, db_host: typing.Optional[str], shell_count: int, shells: typing.List[ShellConfig]) -> None: - + def init( + self, + db: bool, + db_host: typing.Optional[str], + shell_count: int, + shells: typing.List[ShellConfig], + ) -> None: isr = celestial_pb2.InitRequest() isr.database = db @@ -189,4 +239,4 @@ def init(self, db: bool, db_host: typing.Optional[str], shell_count: int, shells stub = celestial_pb2_grpc.CelestialStub(channel) - res = stub.Init(isr) \ No newline at end of file + res = stub.Init(isr) diff --git a/celestial/machine_connector.py b/celestial/machine_connector.py index 4f3e9a2..bef453a 100644 --- a/celestial/machine_connector.py +++ b/celestial/machine_connector.py @@ -21,16 +21,16 @@ from proto.celestial import celestial_pb2, celestial_pb2_grpc -class MachineConnector(): + +class MachineConnector: def __init__( self, stub: celestial_pb2_grpc.CelestialStub, host: str, shell: int, id: int, - name: str="" + name: str = "", ): - self.stub = stub self.host = host @@ -38,8 +38,18 @@ def __init__( self.id = id self.name = name - def create_machine(self, vcpu_count: int, mem_size_mib: int, ht_enabled: bool, disk_size_mib: int, kernel: str, rootfs: str, bootparams: str, active: bool, bandwidth: int) -> None: - + def create_machine( + self, + vcpu_count: int, + mem_size_mib: int, + ht_enabled: bool, + disk_size_mib: int, + kernel: str, + rootfs: str, + bootparams: str, + active: bool, + bandwidth: int, + ) -> None: cmr = celestial_pb2.CreateMachineRequest() cmr.machine.shell = self.shell cmr.machine.id = self.id @@ -55,22 +65,25 @@ def create_machine(self, vcpu_count: int, mem_size_mib: int, ht_enabled: bool, d cmr.networkconfig.bandwidth = bandwidth - cmr.status = active + cmr.status = bool(active) self.stub.CreateMachine(cmr) def modify_machine(self, active: bool) -> None: - r = celestial_pb2.ModifyMachineRequest() r.machine.shell = self.shell r.machine.id = self.id - r.status = active + r.status = bool(active) td.Thread(target=self.stub.ModifyMachine, args=(r,)).start() - def modify_links(self, remove_set: typing.List[typing.Dict[str,int]], modify_set: typing.List[typing.Dict[str,typing.Union[int, float]]]) -> None: + def modify_links( + self, + remove_set: typing.List[typing.Dict[str, int]], + modify_set: typing.List[typing.Dict[str, typing.Union[int, float]]], + ) -> None: r = celestial_pb2.ModifyLinksRequest() r.a.shell = self.shell @@ -96,4 +109,4 @@ def modify_links(self, remove_set: typing.List[typing.Dict[str,int]], modify_set r.modify.append(ml) if len(remove_set) + len(modify_set) > 0: - self.stub.ModifyLinks(r) \ No newline at end of file + self.stub.ModifyLinks(r)