Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
floriankrb committed Jul 3, 2024
1 parent a9e4201 commit 8ee92a6
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 115 deletions.
11 changes: 11 additions & 0 deletions src/anemoi/registry/commands/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ def is_identifier(self, name_or_path):
return False

def process_task(self, entry, args, k, func_name=None, /, **kwargs):
"""
Call the method `k` on the entry object.
The args/kwargs given to the method are extracted from from the argument `k` in the `args` object.
Additionally the argument `k` is casted to the correct type,
depending on if this is a string, int, float, list or dict, or a boolean.
The provided **kwargs are also passed to the method.
The method name can be changed by providing the `func_name` argument.
"""

assert isinstance(k, str), k
if func_name is None:
func_name = k
Expand Down
112 changes: 0 additions & 112 deletions src/anemoi/registry/commands/queues.py

This file was deleted.

124 changes: 121 additions & 3 deletions src/anemoi/registry/commands/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,94 @@
"""

import datetime
import logging

from anemoi.registry.commands.base import BaseCommand
from anemoi.utils.humanize import when
from anemoi.utils.text import table

from ..queue_manager import TaskCatalogueEntry
from anemoi.registry.commands.base import BaseCommand
from anemoi.registry.entry import list_to_dict
from anemoi.registry.queue_manager import TaskCatalogueEntry
from anemoi.registry.rest import RestItemList

LOG = logging.getLogger(__name__)


def task_list_to_str(data, long):
rows = []
for v in data:
if not isinstance(v, dict):
raise ValueError(v)
created = datetime.datetime.fromisoformat(v.pop("created"))
updated = datetime.datetime.fromisoformat(v.pop("updated"))

uuid = v.pop("uuid")
content = " ".join(f"{k}={v}" for k, v in v.items())
if not long:
content = content[:20] + "..."
rows.append(
[
when(created),
when(updated),
v.pop("status"),
v.pop("progress", ""),
content,
uuid,
]
)
return table(rows, ["Created", "Updated", "Status", "%", "Details", "UUID"], ["<", "<", "<", "<", "<", "<"])


class Tasks(BaseCommand):
internal = True
timestamp = True
entry_class = TaskCatalogueEntry

collection = "queues"

def add_arguments(self, command_parser):
command_parser.add_argument("TASK", help="The uuid of the task")
command_parser.add_argument("TASK", help="The uuid of the task", nargs="?")
command_parser.add_argument("--remove", help="remove from queue")
command_parser.add_argument("--set-status", help="--set-status <status>")
command_parser.add_argument("--set-progress", help="--set-progress <progress>", type=int)
command_parser.add_argument("--disown", help="Release a task and requeue it", action="store_true")
command_parser.add_argument("--own", help="Release a task and requeue it", action="store_true")
command_parser.add_argument("--json", help="Output json record", action="store_true")

group = command_parser.add_mutually_exclusive_group()
group.add_argument("--new", help="Add a new queue entry", nargs="*", metavar="K=V")
group.add_argument(
"--take-one", help="Take ownership of the oldest entry with status=queued", nargs="*", metavar="K=V"
)
group.add_argument("--list", help="List some queue entries", nargs="*", metavar="K=V")
group.add_argument("--worker", help="Run a worker, taking ownership of the oldest task, running it.", nargs="*")

command_parser.add_argument(
"--sort",
help="Sort by date. Use with --list, --worker, --take-one",
choices=["created", "updated"],
default="updated",
)
command_parser.add_argument("-l", "--long", help="Details, use with --list", action="store_true")
command_parser.add_argument(
"--timeout", help="Die with timeout (SIGALARM) after TIMEOUT seconds. Use with --worker.", type=int
)

def run(self, args):
if args.TASK:
return self.run_with_uuid(args.TASK, args)
if args.new is not None:
self.run_new(args)
if args.take_one is not None:
self.run_take_one(args)
if args.list is not None:
self.run_list(args)
if args.worker is not None:
self.run_worker(args)

def run_with_uuid(self, uuid, args):

uuid = args.TASK
entry = self.entry_class(key=uuid)

Expand All @@ -46,5 +110,59 @@ def run(self, args):
self.process_task(entry, args, "set_progress")
self.process_task(entry, args, "json")

def run_worker(self, args):
if args.timeout:
import signal

signal.alarm(args.timeout)

data = self._retrieve_task_list(args.worker, args.sort)
if data:
self._process_one_task(data[-1]["uuid"])
else:
LOG.info("No tasks found")

def _process_one_task(self, uuid):
entry = self.entry_class(key=uuid)
LOG.info(f"Processing task {uuid}: {entry}")
res = entry.take_ownership()
print(res)

def run_new(self, args):
res = TaskCatalogueEntry.new(*args.new)
uuid = res["uuid"]
LOG.debug(f"New task created {uuid}: {res}")
print(uuid)

def _retrieve_task_list(self, lst, sort):
request = list_to_dict(lst)
data = RestItemList(self.collection).get(params=request)
return sorted(data, key=lambda x: x[sort])

def run_list(self, args):
data = self._retrieve_task_list(args.list, args.sort)
long = args.long
print(task_list_to_str(data, long))
return data

def run_take_one(self, args):
take_one = args.take_one
if not take_one: # empty list
take_one = ["status=queued"]

data = self._retrieve_task_list(take_one, args.sort)
uuids = [v["uuid"] for v in data]
if not uuids:
LOG.info("No tasks found")
return
latest = uuids[-1]

entry = TaskCatalogueEntry(key=latest)
res = entry.take_ownership()
LOG.debug(f"Task {latest} taken: {res}")
uuid = res["uuid"]
print(uuid)
return uuid


command = Tasks

0 comments on commit 8ee92a6

Please sign in to comment.