[Dashboard] Optimizing performance of Ray Dashboard (#47617)
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
alexeykudinkin authored Sep 19, 2024
1 parent 1e48a03 commit b0828fb
Showing 23 changed files with 510 additions and 255 deletions.
10 changes: 10 additions & 0 deletions python/ray/_private/collections_utils.py
@@ -0,0 +1,10 @@
from typing import List, Any


def split(items: List[Any], chunk_size: int):
"""Splits provided list into chunks of given size"""

assert chunk_size > 0, "Chunk size has to be > 0"

for i in range(0, len(items), chunk_size):
yield items[i : i + chunk_size]
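
The new helper is a generator, so chunks are produced lazily. As a quick illustration (the sample data and chunk size are arbitrary, and the import path assumes a Ray build that includes this commit):

from ray._private.collections_utils import split

items = list(range(7))
for chunk in split(items, chunk_size=3):
    print(chunk)
# [0, 1, 2]
# [3, 4, 5]
# [6]
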
5 changes: 0 additions & 5 deletions python/ray/dashboard/agent.py
@@ -7,7 +7,6 @@
import pathlib
import signal
import sys
from concurrent.futures import ThreadPoolExecutor

import ray
import ray._private.ray_constants as ray_constants
@@ -49,10 +48,6 @@ def __init__(
# Public attributes are accessible for all agent modules.
self.ip = node_ip_address
self.minimal = minimal
self.thread_pool_executor = ThreadPoolExecutor(
max_workers=dashboard_consts.RAY_AGENT_THREAD_POOL_MAX_WORKERS,
thread_name_prefix="dashboard_agent_tpe",
)

assert gcs_address is not None
self.gcs_address = gcs_address
8 changes: 1 addition & 7 deletions python/ray/dashboard/consts.py
@@ -26,7 +26,7 @@
"RAY_DASHBOARD_STATS_PURGING_INTERVAL", 60 * 10
)
RAY_DASHBOARD_STATS_UPDATING_INTERVAL = env_integer(
"RAY_DASHBOARD_STATS_UPDATING_INTERVAL", 2
"RAY_DASHBOARD_STATS_UPDATING_INTERVAL", 15
)
DASHBOARD_RPC_ADDRESS = "dashboard_rpc"
DASHBOARD_RPC_PORT = env_integer("RAY_DASHBOARD_RPC_PORT", 0)
@@ -49,12 +49,6 @@
# Example: "your.module.ray_cluster_activity_hook".
RAY_CLUSTER_ACTIVITY_HOOK = "RAY_CLUSTER_ACTIVITY_HOOK"

# Works in the thread pool should not starve the main thread loop, so we default to 1.
RAY_DASHBOARD_THREAD_POOL_MAX_WORKERS = env_integer(
"RAY_DASHBOARD_THREAD_POOL_MAX_WORKERS", 1
)
RAY_AGENT_THREAD_POOL_MAX_WORKERS = env_integer("RAY_AGENT_THREAD_POOL_MAX_WORKERS", 1)

# The number of candidate agents
CANDIDATE_AGENT_NUMBER = max(env_integer("CANDIDATE_AGENT_NUMBER", 1), 1)
# when head receive JobSubmitRequest, maybe not any agent is available,
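
The hunk above raises the default stats-updating interval from 2 s to 15 s, trading dashboard freshness for less recurring work on the head node. Because the value is still read through env_integer, the previous cadence can be restored from the environment; a hedged sketch (the variable must be set in the environment of the dashboard process before ray.dashboard.consts is imported, e.g. exported before `ray start --head`):

import os

# Hypothetical override; it has no effect if set after the dashboard
# process has already imported ray.dashboard.consts.
os.environ["RAY_DASHBOARD_STATS_UPDATING_INTERVAL"] = "2"
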
8 changes: 8 additions & 0 deletions python/ray/dashboard/dashboard_metrics.py
@@ -75,6 +75,14 @@ def __init__(self, registry: Optional[CollectorRegistry] = None):
namespace="ray",
registry=self.registry,
)
self.metrics_event_loop_tasks = Gauge(
"dashboard_event_loop_tasks",
"Number of tasks currently pending in the event loop's queue.",
tuple(COMPONENT_METRICS_TAG_KEYS),
unit="tasks",
namespace="ray",
registry=self.registry,
)
self.metrics_event_loop_lag = Gauge(
"dashboard_event_loop_lag",
"Event loop lag in seconds.",
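
The hunk above only registers the new gauge; the code that samples it lives elsewhere in the commit and is not shown in this excerpt. A minimal sketch of how such a gauge could be fed, assuming a periodic reporter coroutine and an illustrative "Component" label (both are assumptions, not taken from this diff):

import asyncio

from prometheus_client import CollectorRegistry, Gauge

registry = CollectorRegistry()
event_loop_tasks = Gauge(
    "dashboard_event_loop_tasks",
    "Number of tasks currently pending in the event loop's queue.",
    ("Component",),
    unit="tasks",
    namespace="ray",
    registry=registry,
)

async def report_event_loop_metrics(interval_s: float = 5.0):
    loop = asyncio.get_running_loop()
    while True:
        # asyncio.all_tasks() returns every not-yet-finished task on the loop.
        event_loop_tasks.labels(Component="dashboard_head").set(
            len(asyncio.all_tasks(loop))
        )
        await asyncio.sleep(interval_s)
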
70 changes: 41 additions & 29 deletions python/ray/dashboard/datacenter.py
@@ -1,4 +1,3 @@
import asyncio
import logging
from typing import Any, List, Optional

@@ -72,46 +71,55 @@ async def organize(cls, thread_pool_executor):
to make sure it's on the main event loop thread. To avoid blocking the main
event loop, we yield after each node processed.
"""
loop = get_or_create_event_loop()

node_workers = {}
core_worker_stats = {}
# nodes may change during process, so we create a copy of keys().

# NOTE: We copy the keys of `DataSource.nodes` to make sure the dict
# doesn't change during the iteration (since it's being updated
# from another async task)
for node_id in list(DataSource.nodes.keys()):
node_physical_stats = DataSource.node_physical_stats.get(node_id, {})
node_stats = DataSource.node_stats.get(node_id, {})
# Offloads the blocking operation to a thread pool executor. This also
# yields to the event loop.
workers = await get_or_create_event_loop().run_in_executor(
workers = await loop.run_in_executor(
thread_pool_executor,
cls.merge_workers_for_node,
cls._extract_workers_for_node,
node_physical_stats,
node_stats,
)

for worker in workers:
for stats in worker.get("coreWorkerStats", []):
worker_id = stats["workerId"]
core_worker_stats[worker_id] = stats

node_workers[node_id] = workers

DataSource.node_workers.reset(node_workers)
DataSource.core_worker_stats.reset(core_worker_stats)

@classmethod
def merge_workers_for_node(cls, node_physical_stats, node_stats):
def _extract_workers_for_node(cls, node_physical_stats, node_stats):
workers = []
# Merge coreWorkerStats (node stats) to workers (node physical stats)
pid_to_worker_stats = {}
pid_to_language = {}
pid_to_job_id = {}
pids_on_node = set()

for core_worker_stats in node_stats.get("coreWorkersStats", []):
pid = core_worker_stats["pid"]
pids_on_node.add(pid)

pid_to_worker_stats[pid] = core_worker_stats
pid_to_language[pid] = core_worker_stats["language"]
pid_to_job_id[pid] = core_worker_stats["jobId"]

for worker in node_physical_stats.get("workers", []):
worker = dict(worker)
pid = worker["pid"]

core_worker_stats = pid_to_worker_stats.get(pid)
# Empty list means core worker stats is not available.
worker["coreWorkerStats"] = [core_worker_stats] if core_worker_stats else []
@@ -121,6 +129,7 @@ def merge_workers_for_node(cls, node_physical_stats, node_stats):
worker["jobId"] = pid_to_job_id.get(pid, dashboard_consts.DEFAULT_JOB_ID)

workers.append(worker)

return workers

@classmethod
@@ -156,10 +165,14 @@ async def get_node_info(cls, node_id, get_summary=False):
)

if not get_summary:
actor_table_entries = DataSource.node_actors.get(node_id, {})

# Merge actors to node physical stats
node_info["actors"] = await DataOrganizer._get_all_actors(
DataSource.node_actors.get(node_id, {})
)
node_info["actors"] = {
actor_id: await DataOrganizer._get_actor_info(actor_table_entry)
for actor_id, actor_table_entry in actor_table_entries.items()
}

# Update workers to node physical stats
node_info["workers"] = DataSource.node_workers.get(node_id, [])

@@ -168,6 +181,8 @@
@classmethod
async def get_all_node_summary(cls):
return [
# NOTE: We intentionally await in a loop here to avoid spinning up an
# excessive number of concurrent tasks for large clusters
await DataOrganizer.get_node_info(node_id, get_summary=True)
for node_id in DataSource.nodes.keys()
]
@@ -209,28 +224,25 @@ def _create_agent_info(node_id: str):
return {node_id: _create_agent_info(node_id) for node_id in target_node_ids}

@classmethod
async def get_all_actors(cls):
return await cls._get_all_actors(DataSource.actors)
async def get_actor_infos(cls, actor_ids: Optional[List[str]] = None):
target_actor_table_entries: dict[str, Optional[dict]]
if actor_ids is not None:
target_actor_table_entries = {
actor_id: DataSource.actors.get(actor_id) for actor_id in actor_ids
}
else:
target_actor_table_entries = DataSource.actors

@staticmethod
async def _get_all_actors(actors):
result = {}
for index, (actor_id, actor) in enumerate(actors.items()):
result[actor_id] = await DataOrganizer._get_actor(actor)
# There can be thousands of actors including dead ones. Processing
# them all can take many seconds, which blocks all other requests
# to the dashboard. The ideal solution might be to implement
# pagination. For now, use a workaround to yield to the event loop
# periodically, so other request handlers have a chance to run and
# avoid long latencies.
if index % 1000 == 0 and index > 0:
# Canonical way to yield to the event loop:
# https://github.com/python/asyncio/issues/284
await asyncio.sleep(0)
return result
return {
actor_id: await DataOrganizer._get_actor_info(actor_table_entry)
for actor_id, actor_table_entry in target_actor_table_entries.items()
}

@staticmethod
async def _get_actor(actor):
async def _get_actor_info(actor):
if actor is None:
return None

actor = dict(actor)
worker_id = actor["address"]["workerId"]
core_worker_stats = DataSource.core_worker_stats.get(worker_id, {})
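
A hedged usage sketch of the new get_actor_infos signature: passing actor_ids looks up only the requested actors instead of materializing every entry in DataSource.actors, while omitting it preserves the old return-everything behavior. The caller and the IDs below are made up for illustration; real callers run inside the dashboard head with DataSource already populated:

from ray.dashboard.datacenter import DataOrganizer

async def fetch_actor_views():
    # Only the listed actors are looked up and converted.
    selected = await DataOrganizer.get_actor_infos(
        actor_ids=["<actor-id-1>", "<actor-id-2>"]  # placeholder IDs
    )
    # No filter: equivalent to the previous get_all_actors() behavior.
    everything = await DataOrganizer.get_actor_infos()
    return selected, everything
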
18 changes: 12 additions & 6 deletions python/ray/dashboard/head.py
@@ -9,6 +9,7 @@
import ray.experimental.internal_kv as internal_kv
from ray._private import ray_constants
from ray._private.gcs_utils import GcsAioClient
from ray._private.ray_constants import env_integer
from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag
from ray._raylet import GcsClient
from ray.dashboard.consts import DASHBOARD_METRIC_PORT
@@ -30,6 +31,13 @@
("grpc.max_receive_message_length", ray_constants.GRPC_CPP_MAX_MESSAGE_SIZE),
)

# NOTE: The executor in the dashboard head is intentionally constrained to
# just 1 thread by default to limit its concurrency, thereby reducing the
# potential for GIL contention
RAY_DASHBOARD_DASHBOARD_HEAD_TPE_MAX_WORKERS = env_integer(
"RAY_DASHBOARD_DASHBOARD_HEAD_TPE_MAX_WORKERS", 1
)


def initialize_grpc_port_and_server(grpc_ip, grpc_port):
try:
@@ -98,11 +106,9 @@ def __init__(
self._modules_to_load = modules_to_load
self._modules_loaded = False

# A TPE holding background, compute-heavy, latency-tolerant jobs, typically
# state updates.
self._thread_pool_executor = ThreadPoolExecutor(
max_workers=dashboard_consts.RAY_DASHBOARD_THREAD_POOL_MAX_WORKERS,
thread_name_prefix="dashboard_head_tpe",
self._executor = ThreadPoolExecutor(
max_workers=RAY_DASHBOARD_DASHBOARD_HEAD_TPE_MAX_WORKERS,
thread_name_prefix="dashboard_head_executor",
)

self.gcs_address = None
@@ -326,7 +332,7 @@ async def _async_notify():
self._gcs_check_alive(),
_async_notify(),
DataOrganizer.purge(),
DataOrganizer.organize(self._thread_pool_executor),
DataOrganizer.organize(self._executor),
]
for m in modules:
concurrent_tasks.append(m.run(self.server))
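
Stripped of the dashboard specifics, the pattern head.py now relies on, a single-worker executor handed to DataOrganizer.organize so CPU-heavy merging is serialized off the event loop, looks roughly like the sketch below. All names are illustrative stand-ins, not code from this commit:

import asyncio
from concurrent.futures import ThreadPoolExecutor

def merge_node_stats(node_id: str) -> dict:
    # Stand-in for the CPU-heavy per-node merging work.
    return {"node_id": node_id, "workers": []}

async def organize(executor: ThreadPoolExecutor, node_ids: list) -> dict:
    loop = asyncio.get_running_loop()
    results = {}
    for node_id in node_ids:
        # run_in_executor offloads the blocking call to the thread pool and,
        # because we await it, also yields control back to the event loop
        # between nodes.
        results[node_id] = await loop.run_in_executor(
            executor, merge_node_stats, node_id
        )
    return results

async def main():
    # One worker by default: background jobs queue behind each other instead
    # of fanning out across threads and contending on the GIL.
    executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="demo_tpe")
    print(await organize(executor, ["node-a", "node-b"]))

asyncio.run(main())
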