Skip to content

Commit

Permalink
daemon
Browse files Browse the repository at this point in the history
  • Loading branch information
MenxLi committed Feb 17, 2025
1 parent 83bb3a4 commit 8eeea0f
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 13 deletions.
15 changes: 11 additions & 4 deletions docs/quick-start-zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ pip install pody
然后设置好相应环境变量:
```sh
export PODY_API_BASE="http://10.254.29.178:8000";
export PODY_API_USERNAME="limengxun";
export PODY_API_PASSWORD="123";
export PODY_USERNAME="limengxun";
export PODY_PASSWORD="123";
```

::: tip
Expand Down Expand Up @@ -104,6 +104,13 @@ cat init.sh | pody fetch pod/exec ins:main cmd:
ssh -p 20806 root@10.254.29.178
```

:::warning
用户通常会被赋予资源使用限制,可以通过`user/info`查看。
如果超过使用数量限制,Pody守护进程可能会杀掉你的容器!
此时,可以重启容器,并在`/log/pody/...`目录下查看日志文件,以排查问题。
:::

## 更多操作
更多操作请参考[API文档](/api.md)
关于Pody-CLI的更多信息请参考[这里](/pody-cli.md)
更多操作请参考[API文档](./api.md)
关于Pody-CLI的更多信息请参考[这里](./pody-cli.md)
Pody的源码可以在[这里](https://github.com/MenxLi/pody)查看。
2 changes: 1 addition & 1 deletion pody/eng/cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class ProcessInfo:
def query_process(pid: int) -> ProcessInfo:
def _cgroup_from_pid(pid: int) -> str:
with open(f"/proc/{pid}/cgroup") as f:
return f.read().split("\n")[0]
return f.read()

try:
proc = psutil.Process(pid)
Expand Down
23 changes: 23 additions & 0 deletions pody/eng/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,29 @@ def exec_container_bash(
proc.join()
return q.get()

def container_from_pid(client: docker.client.DockerClient, host_pid: int) -> Optional[str]:
    """Return the name of the Docker container that owns a host process.

    The container is identified from ``/proc/<host_pid>/cgroup``: the first
    line whose cgroup path mentions "docker" is used, and the last path
    component is taken as the container id (Docker uses
    ``/docker/<container_id>``; some systems use ``docker-<id>.scope``).

    Args:
        client: Docker client used to resolve the container id to a name.
        host_pid: PID of the process, as seen from the host.

    Returns:
        The container name, or ``None`` when the process does not exist, is
        not running inside Docker, or the derived id matches no container.
    """
    try:
        with open(f"/proc/{host_pid}/cgroup", "r") as f:
            cgroup_info = f.read()
    except (FileNotFoundError, ProcessLookupError):
        # The process does not exist, or it exited while the file was read.
        return None

    container_id = None
    for line in cgroup_info.splitlines():
        # Each line is "hierarchy-ID:controller-list:cgroup-path"; cap the
        # split so a path that itself contains ':' is still recognised.
        parts = line.split(":", 2)
        if len(parts) == 3 and "docker" in parts[2]:
            container_id = parts[2].split("/")[-1]
            # some systems have a different format
            if container_id.startswith("docker-") and container_id.endswith(".scope"):
                container_id = container_id[len("docker-"):-len(".scope")]
            break

    if not container_id:
        return None  # Not running inside Docker

    try:
        container = client.containers.get(container_id)
    except docker.errors.NotFound:
        # The path only mentioned "docker" (e.g. the docker service itself),
        # or the container was removed in the meantime.
        return None
    return container.name

if __name__ == "__main__":
client = docker.from_env()
config = ContainerConfig(
Expand Down
14 changes: 14 additions & 0 deletions pody/eng/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,20 @@ def has_user(self, username: str)->bool:
with self.cursor() as cur:
cur.execute("SELECT id FROM users WHERE username = ?", (username,))
return cur.fetchone() is not None

def get_user(self, user_id: str | int):
    """Fetch a user record by username (``str``) or by numeric id (otherwise).

    Returns ``UserRecord(id, username, is_admin)`` for the matching row, or
    the placeholder ``UserRecord(0, '', False)`` when no row matches.
    """
    # Only the column in the WHERE clause depends on the type of the key.
    if isinstance(user_id, str):
        query = "SELECT id, username, is_admin FROM users WHERE username = ?"
    else:
        query = "SELECT id, username, is_admin FROM users WHERE id = ?"

    with self.cursor() as cur:
        cur.execute(query, (user_id,))
        row = cur.fetchone()

    return UserRecord(0, '', False) if row is None else UserRecord(*row)

def check_user(self, credential: str):
with self.cursor() as cur:
Expand Down
8 changes: 7 additions & 1 deletion pody/svc/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Optional

from fastapi.staticfiles import StaticFiles
from .daemon import start_daemon
from .app_base import *
from .router_host import router_host
from .router_pod import router_pod
Expand Down Expand Up @@ -55,4 +56,9 @@ def start_server(
port: int = 8000,
workers: Optional[int] = None,
):
uvicorn.run(f"pody.svc.app:app", host=host, port=port, workers=workers)
daemon_p = start_daemon()
try:
uvicorn.run(f"pody.svc.app:app", host=host, port=port, workers=workers)
finally:
daemon_p.terminate()
daemon_p.join()
66 changes: 66 additions & 0 deletions pody/svc/daemon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import time
import docker
import multiprocessing as mp
from ..eng.user import UserDatabase
from ..eng.gpu import GPUHandler
from ..eng.docker import exec_container_bash
from .router_host import gpu_status_impl

def leave_info(container_name, info: str, level: str = "info"):
    """Write a timestamped log file containing ``info`` inside a container.

    Runs a shell command in the container (via ``exec_container_bash``) that
    creates ``/log/pody`` if needed and writes ``info`` to
    ``/log/pody/<YYYY-mm-dd_HH-MM-SS>.<level>.log``.

    Args:
        container_name: Container in which the command is executed.
        info: Message to write. It is shell-quoted, so it may contain any
            characters, including single quotes.
        level: Tag embedded in the log file name (default ``"info"``).
    """
    import shlex  # local import keeps this block self-contained

    curr_time_str = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
    logdir = "/log/pody"
    log_path = f"{logdir}/{curr_time_str}.{level}.log"
    # Quote every interpolated value: messages are built from process command
    # lines, which can contain quotes and other shell metacharacters.
    exec_container_bash(
        container_name,
        f"mkdir -p {logdir} && echo {shlex.quote(info)} > {shlex.quote(log_path)}",
    )

def task_check_gpu_usage():
    """Enforce per-user GPU quotas by stopping one offending container.

    Steps:
      1. Query the processes on every GPU through ``gpu_status_impl``.
      2. For each entry of the result (presumably one per GPU id), collect
         the users that have at least one pod process there; the user name
         is the part of the pod name before the first '-'. Processes with
         no pod name (host processes) are ignored.
      3. For every user known to the database whose count exceeds a
         non-negative ``gpu_count`` quota, leave a "critical" log file in
         the container of the process with the smallest ``uptime`` and stop
         that container.

    The container is stopped rather than the process killed because the
    daemon may not have permission to kill the process itself.
    """
    client = docker.from_env()
    try:
        user_db = UserDatabase()

        gpu_processes = gpu_status_impl(client, list(range(GPUHandler().device_count())))
        # Despite the name, this is the number of result entries (GPUs) on
        # which the user has at least one process.
        user_proc_count: dict[str, int] = {}
        user_procs: dict[str, list[dict[str, str]]] = {}
        for ps in gpu_processes.values():
            this_gpu_users = set()
            for p in ps:
                pod_name: str = p['pod']
                if not pod_name:  # skip host process
                    continue
                username = pod_name.split('-')[0]
                this_gpu_users.add(username)
                user_procs.setdefault(username, []).append(p)
            for gpu_user in this_gpu_users:
                user_proc_count[gpu_user] = user_proc_count.get(gpu_user, 0) + 1

        print("[Daemon] GPU usage: {}".format(user_proc_count))

        for username, proc_count in user_proc_count.items():
            user = user_db.get_user(username)
            if user.userid == 0:  # skip task not related to this database
                continue
            max_gpu_count = user_db.check_user_quota(username).gpu_count
            if max_gpu_count >= 0 and proc_count > max_gpu_count:
                # kill container from this user (the one with the shortest uptime)
                # not process because we may not have permission to kill process...
                p = min(user_procs[username], key=lambda x: x['uptime'])
                pod_name = p['pod']
                pid = int(p['pid'])
                cmd = p['cmd']
                try:
                    leave_info(pod_name, f"Killed container with pid-{pid} ({cmd}) due to GPU quota exceeded.", "critical")
                except Exception as e:
                    # The log file is advisory only; failing to write it must
                    # not let the container escape quota enforcement.
                    print(f"[Daemon] Failed to leave info in {pod_name}: {e}")
                client.containers.get(pod_name).stop()
                print(f"[Daemon] Killed container {pod_name} with pid-{pid} ({cmd}) due to GPU quota exceeded.")
    finally:
        # A new client is created on every run; release its connections.
        client.close()

def daemon_worker():
    """Run the daemon's periodic check forever.

    Calls ``task_check_gpu_usage`` once per minute. Any ``Exception`` raised
    by a check is printed and the loop continues, so one failing cycle does
    not stop the daemon. This function never returns.
    """
    while True:
        try:
            task_check_gpu_usage()
        except Exception as e:
            # KeyboardInterrupt derives from BaseException, not Exception, so
            # it is never caught here and propagates without a re-raise.
            print(f"[Daemon] Error: {e}")
        time.sleep(60)  # check every minute

def start_daemon() -> mp.Process:
    """Launch ``daemon_worker`` in a separate process and return its handle.

    The process is already started when returned; the caller is responsible
    for terminating and joining it.
    """
    worker_process = mp.Process(target=daemon_worker)
    worker_process.start()
    return worker_process
9 changes: 2 additions & 7 deletions pody/svc/router_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from ..eng.errors import *
from ..eng.user import UserRecord
from ..eng.docker import check_container, list_docker_images
from ..eng.docker import check_container, list_docker_images, container_from_pid
from ..eng.gpu import list_processes_on_gpus, GPUProcess, GPUHandler
from ..eng.cpu import query_process

Expand All @@ -16,14 +16,9 @@
router_host = APIRouter(prefix="/host")

def gpu_status_impl(client: DockerClient, gpu_ids: list[int]):
def container_id_from_cgroup(cgoup: str) -> Optional[str]:
last = cgoup.split("/")[-1]
if not last.startswith("docker-"): return None
if not last.endswith(".scope"): return None
return last[len("docker-"):-len(".scope")]
def fmt_gpu_proc(gpu_proc: GPUProcess):
process_info = query_process(gpu_proc.pid)
container_id = container_id_from_cgroup(process_info.cgroup)
container_id = container_from_pid(g_client, gpu_proc.pid)
container_name = check_container(client, container_id)["name"] if container_id else ""
return {
"pid": gpu_proc.pid,
Expand Down

0 comments on commit 8eeea0f

Please sign in to comment.