Skip to content

Commit

Permalink
Merge pull request #196 from neuralinternet/close-port-issue
Browse files Browse the repository at this point in the history
Fix the port check issue
  • Loading branch information
userhasaccess authored Sep 18, 2024
2 parents 58145c5 + 6202bd4 commit d6eb91a
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 54 deletions.
4 changes: 2 additions & 2 deletions compute/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import string

# Define the version of the template module.
__version__ = "1.4.9"
__version__ = "1.5.0"
__minimal_miner_version__ = "1.4.9"
__minimal_validator_version__ = "1.4.9"
__minimal_validator_version__ = "1.5.0"

version_split = __version__.split(".")
__version_as_int__ = (100 * int(version_split[0])) + (10 * int(version_split[1])) + (1 * int(version_split[2]))
Expand Down
36 changes: 19 additions & 17 deletions neurons/miner.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ class Miner:

miner_http_server: TCPServer



_axon: bt.axon

@property
Expand Down Expand Up @@ -185,6 +183,20 @@ def __init__(self):
self.wandb.update_specs()

# check allocation status
self.__check_alloaction_errors()

# Disable the Spec request and replaced with WanDB
# self.request_specs_processor = RequestSpecsProcessor()

self.last_updated_block = self.current_block - (self.current_block % 100)
self.allocate_action = False

# if (
# not self.wandb.sync_allocated(self.wallet.hotkey.ss58_address)
# or not allocation_key_encoded
# ):
# self.miner_http_server = start_server(self.config.ssh.port)
def __check_alloaction_errors(self):
file_path = "allocation_key"
allocation_key_encoded = None
if os.path.exists(file_path):
Expand All @@ -210,19 +222,6 @@ def __init__(self):
bt.logging.info(
"Container is already running without allocated. Killing the container."
)

# Disable the Spec request and replaced with WanDB
# self.request_specs_processor = RequestSpecsProcessor()

self.last_updated_block = self.current_block - (self.current_block % 100)
self.allocate_action = False

if (
not self.wandb.sync_allocated(self.wallet.hotkey.ss58_address)
or not allocation_key_encoded
):
self.miner_http_server = start_server(self.config.ssh.port)

def init_axon(self):
# Step 6: Build and link miner functions to the axon.
# The axon handles request processing, allowing validators to send this process requests.
Expand Down Expand Up @@ -472,7 +471,7 @@ def allocate(self, synapse: Allocate) -> Allocate:
if timeline > 0:
if self.allocate_action == False:
self.allocate_action = True
stop_server(self.miner_http_server)
# stop_server(self.miner_http_server)
result = register_allocation(timeline, device_requirement, public_key, docker_requirement)
self.allocate_action = False
synapse.output = result
Expand All @@ -481,7 +480,7 @@ def allocate(self, synapse: Allocate) -> Allocate:
synapse.output = {"status": False}
else:
result = deregister_allocation(public_key)
self.miner_http_server = start_server(self.config.ssh.port)
# self.miner_http_server = start_server(self.config.ssh.port)
synapse.output = result
self.update_allocation(synapse)
synapse.output["port"] = int(self.config.ssh.port)
Expand Down Expand Up @@ -669,6 +668,9 @@ async def start(self):
else:
bt.logging.warning(f"API: Could not find the server port that was provided to validator")
self.wandb.update_miner_port_open(result)

# check allocation status
self.__check_alloaction_errors()

# Log chain data to wandb
chain_data = {
Expand Down
90 changes: 55 additions & 35 deletions neurons/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,25 @@ def sync_status(self):
current_version = __version_as_int__
if subnet_prometheus_version != current_version:
self.init_prometheus(force_update=True)
def remove_duplicate_penalized_hotkeys(self):
    """
    Collapse penalized_hotkeys_checklist so each 'hotkey' value appears once.

    The first entry encountered for a given hotkey is kept; later entries
    carrying the same hotkey are dropped. The relative order of the kept
    entries is unchanged. The attribute is rebound to a new list and an
    info message is logged on every call.
    """
    first_entry_by_hotkey = {}
    for entry in self.penalized_hotkeys_checklist:
        # setdefault stores the entry only when this hotkey has not been
        # seen yet, so the earliest occurrence wins; dicts keep insertion
        # order, which preserves the original ordering of the survivors.
        first_entry_by_hotkey.setdefault(entry['hotkey'], entry)

    self.penalized_hotkeys_checklist = list(first_entry_by_hotkey.values())
    bt.logging.info("Removed duplicate hotkeys from penalized_hotkeys_checklist.")

def sync_checklist(self):
self.threads = []
self.penalized_hotkeys_checklist = self.wandb.get_penalized_hotkeys_checklist(self.get_valid_validator_hotkeys(), True)
for i in range(0, len(self.uids), self.validator_challenge_batch_size):
for _uid in self.uids[i : i + self.validator_challenge_batch_size]:
try:
Expand All @@ -373,13 +389,16 @@ def sync_checklist(self):
)
except KeyError:
continue
self.remove_duplicate_penalized_hotkeys()

for thread in self.threads:
thread.start()

for thread in self.threads:
thread.join()

self.wandb.update_penalized_hotkeys_checklist(self.penalized_hotkeys_checklist)

def sync_miners_info(self, queryable_tuple_uids_axons: List[Tuple[int, bt.AxonInfo]]):
if queryable_tuple_uids_axons:
for uid, axon in queryable_tuple_uids_axons:
Expand Down Expand Up @@ -566,45 +585,46 @@ def execute_miner_checking_request(self, uid, axon: bt.AxonInfo):
response = dendrite.query(axon, Allocate(timeline=1, checking=True), timeout=30)
port = response.get("port", "")
status = response.get("status", False)

checklist_hotkeys = [item['hotkey'] for item in self.penalized_hotkeys_checklist]
if port:
is_port_open = check_port(axon.ip, port)
penalized_hotkeys_checklist = self.wandb.get_penalized_hotkeys_checklist(self.get_valid_validator_hotkeys(), True)
checklist_hotkeys = [item['hotkey'] for item in penalized_hotkeys_checklist]

if is_port_open:
if not status:
is_port_open = check_port(axon.ip, port)
if (axon.hotkey not in checklist_hotkeys )and (not is_port_open):
self.penalized_hotkeys_checklist.append({"hotkey": axon.hotkey, "status_code": "PORT_CLOSED", "description": "The port of ssh server is closed"})
bt.logging.info(
f"Debug {Allocate.__name__} - status of Checking allocation - {status} {uid} - Port is closed and not usable, even though it has been allocated."
)
else:
bt.logging.info(f"Debug {Allocate.__name__} - status of Checking allocation - {status} {uid} - Port is open and the miner is allocated")
else:
is_ssh_access = True
bt.logging.info(f"Debug {Allocate.__name__} - status of Checking allocation - {status} {uid}")
if status is True: # if it's able to check allocation
private_key, public_key = rsa.generate_key_pair()
device_requirement = {"cpu": {"count": 1}, "gpu": {}, "hard_disk": {"capacity": 1073741824}, "ram": {"capacity": 1073741824}}

try:
response = dendrite.query(axon, Allocate(timeline=1, device_requirement=device_requirement, checking=False, public_key=public_key), timeout=60)
if response and response["status"] is True:
bt.logging.info(f"Debug {Allocate.__name__} - Successfully Allocated - {uid}")
private_key = private_key.encode("utf-8")
decrypted_info_str = rsa.decrypt_data(private_key, base64.b64decode(response["info"]))
info = json.loads(decrypted_info_str)
is_ssh_access = check_ssh_login(axon.ip, port, info['username'], info['password'])
except Exception as e:
bt.logging.error(f"{e}")
return

allocation_status = False
private_key, public_key = rsa.generate_key_pair()
device_requirement = {"cpu": {"count": 1}, "gpu": {}, "hard_disk": {"capacity": 1073741824}, "ram": {"capacity": 1073741824}}
try:
response = dendrite.query(axon, Allocate(timeline=1, device_requirement=device_requirement, checking=False, public_key=public_key), timeout=60)
if response and response["status"] is True:
allocation_status = True
bt.logging.info(f"Debug {Allocate.__name__} - Successfully Allocated - {uid}")
private_key = private_key.encode("utf-8")
decrypted_info_str = rsa.decrypt_data(private_key, base64.b64decode(response["info"]))
info = json.loads(decrypted_info_str)
is_ssh_access = check_ssh_login(axon.ip, port, info['username'], info['password'])
except Exception as e:
bt.logging.error(f"{e}")
while True and allocation_status:
deregister_response = dendrite.query(axon, Allocate(timeline=0, checking=False, public_key=public_key), timeout=60)
if deregister_response and deregister_response["status"] is True:
bt.logging.info(f"Debug {Allocate.__name__} - Deallocated - {uid}")


break
else:
bt.logging.error(f"Debug {Allocate.__name__} - Failed to deallocate - {uid} will retry in 5 seconds")
time.sleep(5)
if axon.hotkey in checklist_hotkeys:
penalized_hotkeys_checklist = [item for item in penalized_hotkeys_checklist if item['hotkey'] != axon.hotkey]
self.penalized_hotkeys_checklist = [item for item in self.penalized_hotkeys_checklist if item['hotkey'] != axon.hotkey]
if not is_ssh_access:
penalized_hotkeys_checklist.append({"hotkey": axon.hotkey, "status_code": "SSH_ACCESS_DISABLED", "description": "It can not access to the server via ssh"})
else:
if axon.hotkey not in checklist_hotkeys:
penalized_hotkeys_checklist.append({"hotkey": axon.hotkey, "status_code": "PORT_CLOSED", "description": "The port of ssh server is closed"})

self.wandb.update_penalized_hotkeys_checklist(penalized_hotkeys_checklist)
bt.logging.info(f"Debug {Allocate.__name__} - status of Checking allocation - {status} {uid} - SSH access is disabled")
self.penalized_hotkeys_checklist.append({"hotkey": axon.hotkey, "status_code": "SSH_ACCESS_DISABLED", "description": "It can not access to the server via ssh"})

def execute_specs_request(self):
if len(self.queryable_for_specs) > 0:
Expand Down Expand Up @@ -686,11 +706,11 @@ def execute_specs_request(self):
bt.logging.info(f"{hotkey} - {specs}")
"""
self.finalized_specs_once = True

def get_specs_wandb(self):

bt.logging.info(f"💻 Hardware list of uids queried (Wandb): {list(self._queryable_uids.keys())}")

specs_dict = self.wandb.get_miner_specs(self._queryable_uids)
# Update the local db with the data from wandb
update_miner_details(self.db, list(specs_dict.keys()), list(specs_dict.values()))
Expand Down

0 comments on commit d6eb91a

Please sign in to comment.