Skip to content

Commit

Permalink
Merge pull request #35 from vantage6/proxy-and-network-rules
Browse files Browse the repository at this point in the history
PoC now running v6-compliant algorithms with central and partial functions
  • Loading branch information
hcadavid authored Nov 18, 2024
2 parents ed580bd + ff8a432 commit a00b3e4
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 137 deletions.
16 changes: 9 additions & 7 deletions integration_poc/container_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,21 +234,23 @@ def run(self, run_id: int, task_info: dict, image: str,

str_task_id = str(task_info["id"])
str_run_id = str(run_id)
parent_id = get_parent_id(task_info)
parent_id = str(get_parent_id(task_info))

_io_related_env_variables: List[V1EnvVar]

_volumes, _volume_mounts, _io_related_env_variables = self._create_volume_mounts(run_id=str_run_id,docker_input=docker_input,token=token,databases_to_use=databases_to_use)

# Setting the environment variables required by V6 algorithms.
# As these environment variables are used within the container/POD environment, file paths are relative
# to the mount paths (i.e., the container's file system) created by the method above (_create_volume_mounts)
# to the mount paths (i.e., the container's file system) created by the method _create_volume_mounts
#
env_vars: List[V1EnvVar] = [
#TODO Replace xxxxx with the FQDN of the proxy
client.V1EnvVar(name="HOST", value=os.environ.get("PROXY_SERVER_HOST","xxxxxxxxxx")),
client.V1EnvVar(name="PORT", value=os.environ.get("PROXY_SERVER_PORT", '8080')),
client.V1EnvVar(name="API_PATH", value=''),
env_vars: List[V1EnvVar] = [
client.V1EnvVar(name="HOST", value=os.environ.get("PROXY_SERVER_HOST",pod_node_constants.V6_NODE_FQDN)),
client.V1EnvVar(name="PORT", value=os.environ.get("PROXY_SERVER_PORT", str(pod_node_constants.V6_NODE_PROXY_PORT))),
#TODO This environment variable corresponds to the API path of the PROXY (not to be confused with that of the
# actual server). This variable should eventually be removed: it is not actually used to set up that path, so
# changing it to anything other than an empty value leads to an error.
client.V1EnvVar(name="API_PATH", value=""),
]

env_vars.extend(_io_related_env_variables)
Expand Down
52 changes: 0 additions & 52 deletions integration_poc/kubeconfs/network_policies.yaml

This file was deleted.

33 changes: 26 additions & 7 deletions integration_poc/kubeconfs/node_pod_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,29 @@ apiVersion: v1
kind: Pod
metadata:
name: v6-node-pod
namespace: v6-node
namespace: v6-jobs
labels:
app: v6-node-server-proxy
spec:
hostname: v6-node-proxy
subdomain: poc
hostAliases:
hostname: v6-node-pod # a better name yet to be defined
subdomain: v6-pod-subdomain # => POD FQDN: v6-node-pod.v6-pod-subdomain.v6-node
hostAliases: # only needed when working within a tailnet network
- ip: "10.2.67.147"
hostnames:
- "v6-server.tail984a0.ts.net"
containers:
- name: v6-node-server
image: docker.io/hcadavidescience/v6_k8s_node:latest
tty: true
ports:
- containerPort: 4567
env:
- name: HOST_IP
- name: HOST_IP # TODO check if this is necessary
valueFrom:
fieldRef:
fieldPath: status.hostIP
- name: PORT
value: "5000"
- name: PORT # TODO check if this is necessary
value: "4567"
command: ["python", "v6_k8s_node.py"]
volumeMounts:
- name: task-files-root
Expand All @@ -57,3 +61,18 @@ spec:
- name: v6-node-default-database
hostPath:
path: /home/hcadavid/k8s/v6-on-kubernetes-PoC/csv/employees.csv

---
apiVersion: v1
kind: Service
metadata:
name: v6proxy-subdomain
namespace: v6-jobs
spec:
selector:
app: v6-node-server-proxy
ports:
- protocol: TCP
port: 4567
targetPort: 4567

52 changes: 52 additions & 0 deletions integration_poc/kubeconfs/node_pod_network_policies.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#https://kubernetes.io/docs/tasks/administer-cluster/dns-debugging-resolution/

apiVersion: v1
kind: Namespace
metadata:
name: v6-node

---
apiVersion: v1
kind: Namespace
metadata:
name: v6-jobs

---

kind: NetworkPolicy # Policy to disable job's networking, except with the Node
apiVersion: networking.k8s.io/v1
metadata:
name: v6-jobs-allow-egress-to-node-proxy-only
namespace: v6-jobs
spec:
podSelector: {} # This selects all pods in the v6-jobs namespace
policyTypes:
- Egress
egress:
- {}
# - to:
# - podSelector:
# matchLabels:
# app: v6-node-server-proxy # This targets the dummy-v6-proxy pod
# ports:
# - protocol: TCP
# port: 4567
# - to:
# - namespaceSelector: {} # allow only the use of DNS within the cluster
# ports:
# - protocol: UDP #This enables internal DNS resolution (otherwise the static IP addresses of the proxy would have to be given)
# port: 53



#spec:
# podSelector:
# matchLabels:
# app: v6-node-server-proxy
# policyTypes:
# - Egress
# egress:
# - {} # This rule allows egress traffic to any destination (could be restricted to the V6-server only)



2 changes: 2 additions & 0 deletions integration_poc/pod_node_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@
KUBE_CONFIG_FILE_PATH = '/app/.kube/config'
V6_NODE_CONFIG_FILE = '/app/.v6node/configs/node_legacy_config.yaml'
V6_NODE_DATABASE_BASE_PATH = '/app/.databases/'
V6_NODE_FQDN = 'http://v6proxy-subdomain.v6-jobs.svc.cluster.local' # Must be consistent with kubeconfs/node_pod_config.yaml
V6_NODE_PROXY_PORT = 4567
125 changes: 57 additions & 68 deletions integration_poc/v6_k8s_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@ def setup_encryption(self) -> None:
raise Exception("Expectations on encryption don't match?!")

if encrypted_collaboration:
self.log.warn("Enabling encryption!")
self.log.warning("Enabling encryption!")
private_key_file = self.private_key_filename()
self.client.setup_encryption(private_key_file)

else:
self.log.warn("Disabling encryption!")
self.log.warning("Disabling encryption!")
self.client.setup_encryption(None)


Expand Down Expand Up @@ -162,85 +162,74 @@ def authenticate(self):


def __proxy_server_worker(self) -> None:

"""
Proxy algorithm container communication.
A proxy for communication between algorithms and central
server.
"""
#if self.ctx.running_in_docker:
# NODE_PROXY_SERVER_HOSTNAME points to the name of the proxy
# when running in the isolated docker network.
#default_proxy_host = "127.0.0.1"
#else:
# If we're running non-dockerized, assume that the proxy is
# accessible from within the docker algorithm container on
# host.docker.internal.
#default_proxy_host = "host.docker.internal"
default_proxy_host = "localhost"

if self.k8s_container_manager.running_on_guest_env:
default_proxy_host = pod_node_constants.V6_NODE_FQDN
else:
#TODO to be removed
default_proxy_host = "host.docker.internal"

# If PROXY_SERVER_HOST was set in the environment, it overrides our
# value.

proxy_host = os.environ.get("PROXY_SERVER_HOST", default_proxy_host)
#os.environ["PROXY_SERVER_HOST"] = proxy_host
os.environ["PROXY_SERVER_HOST"] = proxy_host

#proxy_port = int(os.environ.get("PROXY_SERVER_PORT", 8080))
proxy_port = 4567
proxy_port = pod_node_constants.V6_NODE_PROXY_PORT

# 'app' is defined in vantage6.node.proxy_server
debug_mode = self.debug.get("proxy_server", False)
if debug_mode:
self.log.debug("Debug mode enabled for proxy server")
proxy_server.app.debug = True
proxy_server.app.config["SERVER_IO"] = self.client

#The value on the module variable 'server_url' defines the target of the 'make_request' method.
#TODO improve encapsulation here - why proxy_server.server_url, and proxy_host?
proxy_server.server_url = self.client.base_path
self.log.info(">>>> Setting target endpoint for the algorithm's client as : %s",proxy_server.server_url)

"""
try:
self.log.info("Starting proxyserver at '%s:%s'", proxy_host, proxy_port)

# 'app' is defined in vantage6.node.proxy_server
debug_mode = self.debug.get("proxy_server", False)
if debug_mode:
self.log.debug("Debug mode enabled for proxy server")
proxy_server.app.debug = True
proxy_server.app.config["SERVER_IO"] = self.client
proxy_server.server_url = self.client.base_path
except Exception as e:
print(e)
"""
try:

# set up proxy server logging
#log_level = getattr(logging, self.config["logging"]["level"].upper())
#self.proxy_log = get_file_logger(
# #"proxy_server", self.ctx.proxy_log_file, log_level_file=log_level
# "proxy_server", './test_log_file.txt', log_level_file=logging.DEBUG, log_level_console = logging.DEBUG,
#)

# this is where we try to find a port for the proxyserver
for try_number in range(5):
self.log.info("Starting proxyserver at '%s:%s'", proxy_host, proxy_port)
http_server = WSGIServer(
("0.0.0.0", proxy_port), proxy_server.app #, log=self.proxy_log
)
try:
print('Starting proxy')
http_server.serve_forever()
except OSError as e:
print(e)
self.log.info("Error during attempt %s", try_number)
self.log.info("%s: %s", type(e), e)
if e.errno == 48:
proxy_port = random.randint(2048, 16384)
self.log.warning("Retrying with a different port: %s", proxy_port)
os.environ["PROXY_SERVER_PORT"] = str(proxy_port)

else:
raise

except Exception as e:
print(e)
self.log.error("Proxyserver could not be started or crashed!")
self.log.error(e)
except Exception as e:
print("Exception catched")
print(e)
# set up proxy server logging
log_level = getattr(logging, self.config["logging"]["level"].upper())
self.proxy_log = get_file_logger(
"proxy_server", self.ctx.proxy_log_file, log_level_file=log_level
)

# this is where we try to find a port for the proxyserver
for try_number in range(5):
self.log.info("Starting proxyserver at '%s:%s'", proxy_host, proxy_port)
http_server = WSGIServer(
("0.0.0.0", proxy_port), proxy_server.app, log=self.proxy_log
)

try:
http_server.serve_forever()


except OSError as e:
self.log.info("Error during attempt %s", try_number)
self.log.info("%s: %s", type(e), e)

if e.errno == 48:
proxy_port = random.randint(2048, 16384)
self.log.warning("Retrying with a different port: %s", proxy_port)
os.environ["PROXY_SERVER_PORT"] = str(proxy_port)

else:
raise

except Exception as e:
self.log.error("Proxyserver could not be started or crashed!")
self.log.error(e)



def connect_to_socket(self) -> None:
Expand Down Expand Up @@ -791,7 +780,7 @@ def __start_task(self, task_incl_run: dict) -> None:

node = NodePod(ctx)
node.start_processing_threads()
print("Success")




2 changes: 1 addition & 1 deletion v6-client/rclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

average_task = client.task.create(
collaboration=1,
organizations=[2],
organizations=[4],
name="poc_model",
image="hcadavidescience/poc_model_training",
description='',
Expand Down
4 changes: 2 additions & 2 deletions v6-client/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ requests==2.32.3
schema==0.7.5
six==1.16.0
urllib3==2.2.1
vantage6-client==4.5.5
vantage6-common==4.5.5
vantage6-client==4.7.0
vantage6-common==4.7.0

0 comments on commit a00b3e4

Please sign in to comment.