Skip to content

Commit

Permalink
Refactor running processess metric
Browse files Browse the repository at this point in the history
  • Loading branch information
Aleksey Mikhaylov committed Sep 26, 2024
1 parent 64ed77c commit d0812ad
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 39 deletions.
16 changes: 9 additions & 7 deletions aqueduct/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from .metrics.collect import Collector, TasksStats
from .metrics.export import Exporter
from .metrics.manager import get_metrics_manager
from .metrics.processes import ProcessesStats
from .metrics.queue import TaskMetricsQueue
from .metrics.timer import timeit
from .multiprocessing import (
Expand Down Expand Up @@ -264,6 +263,13 @@ async def _check_memory_usage(self, sleep_sec: float = 1.):
self._metrics_manager.collector.add_memory_usage(metrics)
await asyncio.sleep(sleep_sec)

async def _count_processes(self, sleep_sec: float = 1.):
while self.state != FlowState.STOPPED:
metrics = MetricsItems()
metrics.add('running', len(mp.active_children()))
self._metrics_manager.collector.add_processes_count(metrics)
await asyncio.sleep(sleep_sec)

def _run_steps(self, timeout: Optional[int]):
if len(self._steps) == 0:
log.info('Flow has zero steps -> do nothing')
Expand Down Expand Up @@ -331,7 +337,9 @@ def _run_tasks(self):
self._tasks.append(asyncio.ensure_future(self._check_is_alive()))

self._metrics_manager.start(queues_info=self._get_queues_info())

self._tasks.append(asyncio.ensure_future(self._check_memory_usage()))
self._tasks.append(asyncio.ensure_future(self._count_processes()))

def _get_queues_info(self) -> Dict[mp.Queue, str]:
"""Returns queues between Step handlers and its names.
Expand Down Expand Up @@ -412,20 +420,14 @@ async def _check_is_alive(self, sleep_sec: float = 1.):
If at least one process is not alive, it stops Flow.
"""
while self.state != FlowState.STOPPED:
processes_stats = ProcessesStats()
for handler, context in self._contexts.items():
for proc in context.processes:
if not proc.is_alive():
if self.is_running:
handler_name = handler.__class__.__name__
log.error('The process %s for %s handler is dead',
proc.pid, handler_name)
processes_stats.add_dead_process()
self._metrics_manager.collector.add_processes_stats(processes_stats)
await self.stop(graceful=False)
else:
processes_stats.add_running_process()
self._metrics_manager.collector.add_processes_stats(processes_stats)
await asyncio.sleep(sleep_sec)

@staticmethod
Expand Down
11 changes: 6 additions & 5 deletions aqueduct/metrics/collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from . import IMetricsItems
from .base import MetricsItems, MetricsTypes
from .processes import ProcessesStats
from .task import TasksMetricsStorage


Expand All @@ -31,18 +30,20 @@ def __init__(self):
self.queue_sizes = MetricsItems()
self.tasks_stats = TasksStats()
self.memory_usage = MetricsItems()
self.processes_stats = ProcessesStats()
self.processes_count = MetricsItems()

def extend(self, storage: TasksMetricsStorage):
super().extend(storage)
if isinstance(storage, AqueductMetricsStorage):
self.queue_sizes.extend(storage.queue_sizes)
self.tasks_stats.extend(storage.tasks_stats)
self.processes_stats.extend(storage.processes_stats)

def extend_memory_usage(self, metrics: MetricsItems):
self.memory_usage.extend(metrics)

def extend_processes_count(self, metrics: MetricsItems):
self.processes_count.extend(metrics)


class Collector:
def __init__(self, collectible_metrics: Iterable[MetricsTypes] = None,
Expand Down Expand Up @@ -73,8 +74,8 @@ def add_tasks_stats(self, stats: TasksStats):
def add_memory_usage(self, metrics: MetricsItems):
self._metrics.extend_memory_usage(metrics)

def add_processes_stats(self, stats: ProcessesStats):
self._metrics.processes_stats.extend(stats)
def add_processes_count(self, metrics: MetricsItems):
self._metrics.extend_processes_count(metrics)

def extract_metrics(self) -> AqueductMetricsStorage:
metrics = self._metrics
Expand Down
5 changes: 2 additions & 3 deletions aqueduct/metrics/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,8 @@ def export(self, metrics: AqueductMetricsStorage):
for name, memory_usage in metrics.memory_usage.items:
self.target.timing(f'{self.prefix}.{MEMORY_USAGE_PREFIX}.{name}', memory_usage)

for name, cnt in metrics.processes_stats.items:
if cnt > 0:
self.target.count(f'{self.prefix}.{PROCESSES_PREFIX}.{name}', cnt)
for name, processes_count in metrics.processes_count.items:
self.target.timing(f'{self.prefix}.{PROCESSES_PREFIX}.{name}', processes_count)


class DummyExporter(Exporter):
Expand Down
24 changes: 0 additions & 24 deletions aqueduct/metrics/processes.py

This file was deleted.

0 comments on commit d0812ad

Please sign in to comment.