Skip to content

Commit

Permalink
Move stuff to the model
Browse files Browse the repository at this point in the history
  • Loading branch information
medihack committed Dec 19, 2023
1 parent 88e4f39 commit 9397cab
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 67 deletions.
2 changes: 0 additions & 2 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

## Top

- update_job_status -> update_job_state
- maybe move job_utils function to models methods
- Move executor functionality to processors
- Add permissions to dicom_web views (see TODOs there)
- task urls without job
Expand Down
57 changes: 57 additions & 0 deletions adit/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
from django.db import models
from django.db.models.constraints import UniqueConstraint
from django.db.models.query import QuerySet
from django.utils import timezone

from adit.accounts.models import User
from adit.core.utils.mail import send_job_finished_mail

from .validators import (
no_backslash_char_validator,
Expand Down Expand Up @@ -250,6 +252,61 @@ def reset_tasks(self, only_failed=False) -> None:
end=None,
)

def post_process(self) -> bool:
"""Evaluates all the tasks of a dicom job and sets the job state accordingly.
Returns: True if the job is finished, False otherwise
"""

if self.tasks.filter(status=DicomTask.Status.PENDING).exists():
if self.status != DicomJob.Status.CANCELING:
self.status = DicomJob.Status.PENDING
self.save()
return False

if self.tasks.filter(status=DicomTask.Status.IN_PROGRESS).exists():
if self.status != DicomJob.Status.CANCELING:
self.status = DicomJob.Status.IN_PROGRESS
self.save()
return False

if self.status == DicomJob.Status.CANCELING:
self.status = DicomJob.Status.CANCELED
self.save()
return False

# Job is finished and we evaluate its final status
has_success = self.tasks.filter(status=DicomTask.Status.SUCCESS).exists()
has_warning = self.tasks.filter(status=DicomTask.Status.WARNING).exists()
has_failure = self.tasks.filter(status=DicomTask.Status.FAILURE).exists()

if has_success and not has_warning and not has_failure:
self.status = DicomJob.Status.SUCCESS
self.message = "All tasks succeeded."
elif has_success and has_failure or has_warning and has_failure:
self.status = DicomJob.Status.FAILURE
self.message = "Some tasks failed."
elif has_success and has_warning:
self.status = DicomJob.Status.WARNING
self.message = "Some tasks have warnings."
elif has_warning:
self.status = DicomJob.Status.WARNING
self.message = "All tasks have warnings."
elif has_failure:
self.status = DicomJob.Status.FAILURE
self.message = "All tasks failed."
else:
# at least one of success, warnings or failures must be > 0
raise AssertionError(f"Invalid task status list of {self}.")

self.end = timezone.now()
self.save()

if self.send_finished_mail:
send_job_finished_mail(self)

return True

@property
def is_deletable(self) -> bool:
non_pending_tasks = self.tasks.exclude(status=DicomTask.Status.PENDING)
Expand Down
55 changes: 0 additions & 55 deletions adit/core/utils/job_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from django.utils import timezone

from ..models import DicomJob, DicomTask, QueuedTask


Expand All @@ -22,56 +20,3 @@ def queue_pending_tasks(dicom_job: DicomJob, default_priority: int, urgent_prior
for dicom_task in dicom_job.tasks.filter(status=DicomTask.Status.PENDING):
if not dicom_task.queued:
QueuedTask.objects.create(content_object=dicom_task, priority=priority)


def update_job_status(dicom_job: DicomJob) -> bool:
"""Evaluates all the tasks of a dicom job and sets the job status accordingly.
Returns: True if the job is finished, False otherwise
"""

if dicom_job.tasks.filter(status=DicomTask.Status.PENDING).exists():
if dicom_job.status != DicomJob.Status.CANCELING:
dicom_job.status = DicomJob.Status.PENDING
dicom_job.save()
return False

if dicom_job.tasks.filter(status=DicomTask.Status.IN_PROGRESS).exists():
if dicom_job.status != DicomJob.Status.CANCELING:
dicom_job.status = DicomJob.Status.IN_PROGRESS
dicom_job.save()
return False

if dicom_job.status == DicomJob.Status.CANCELING:
dicom_job.status = DicomJob.Status.CANCELED
dicom_job.save()
return False

# Job is finished and we evaluate its final status
has_success = dicom_job.tasks.filter(status=DicomTask.Status.SUCCESS).exists()
has_warning = dicom_job.tasks.filter(status=DicomTask.Status.WARNING).exists()
has_failure = dicom_job.tasks.filter(status=DicomTask.Status.FAILURE).exists()

if has_success and not has_warning and not has_failure:
dicom_job.status = DicomJob.Status.SUCCESS
dicom_job.message = "All tasks succeeded."
elif has_success and has_failure or has_warning and has_failure:
dicom_job.status = DicomJob.Status.FAILURE
dicom_job.message = "Some tasks failed."
elif has_success and has_warning:
dicom_job.status = DicomJob.Status.WARNING
dicom_job.message = "Some tasks have warnings."
elif has_warning:
dicom_job.status = DicomJob.Status.WARNING
dicom_job.message = "All tasks have warnings."
elif has_failure:
dicom_job.status = DicomJob.Status.FAILURE
dicom_job.message = "All tasks failed."
else:
# at least one of success, warnings or failures must be > 0
raise AssertionError(f"Invalid task status list of {dicom_job}.")

dicom_job.end = timezone.now()
dicom_job.save()

return True
8 changes: 4 additions & 4 deletions adit/core/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from .site import job_stats_collectors
from .tasks import broadcast_mail
from .types import AuthenticatedHttpRequest, HtmxHttpRequest
from .utils.job_utils import queue_pending_task, queue_pending_tasks, update_job_status
from .utils.job_utils import queue_pending_task, queue_pending_tasks

THEME = "theme"

Expand Down Expand Up @@ -294,7 +294,7 @@ def post(self, request: AuthenticatedHttpRequest, *args, **kwargs) -> HttpRespon
dicom_tasks.update(status=DicomTask.Status.CANCELED)

# If there is a task in progress then the job will be set to canceling and will be set
# to canceled when the processing of the task is finished (see update_job_status).
# to canceled when the processing of the task is finished.
tasks_in_progress_count = job.tasks.filter(status=DicomTask.Status.IN_PROGRESS).count()
if tasks_in_progress_count > 0:
job.status = DicomJob.Status.CANCELING
Expand Down Expand Up @@ -442,7 +442,7 @@ def form_valid(self, form: ModelForm) -> HttpResponse:
success_message = self.success_message % task.__dict__

task.delete()
update_job_status(task.job)
task.job.post_process()

messages.success(self.request, success_message)
return redirect(task.job)
Expand Down Expand Up @@ -478,7 +478,7 @@ def post(self, request: AuthenticatedHttpRequest, *args, **kwargs) -> HttpRespon

queue_pending_task(task, self.default_priority, self.urgent_priority)

update_job_status(task.job)
task.job.post_process()

messages.success(request, self.success_message % task.__dict__)
return redirect(task)
Expand Down
7 changes: 1 addition & 6 deletions adit/core/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
from adit.core.site import dicom_processors
from adit.core.types import ProcessingResult
from adit.core.utils.db_utils import ensure_db_connection
from adit.core.utils.job_utils import update_job_status
from adit.core.utils.mail import send_job_finished_mail

from .utils.worker_utils import in_time_slot

Expand Down Expand Up @@ -172,14 +170,11 @@ def check_and_process_next_task(self) -> bool:

with self._redis.lock(DISTRIBUTED_LOCK):
dicom_job.refresh_from_db()
job_finished = update_job_status(dicom_job)
job_finished = dicom_job.post_process()

if job_finished:
logger.info(f"Processing of {dicom_job} ended.")

if dicom_job.send_finished_mail:
send_job_finished_mail(dicom_job)

# Unlock the queued task. The queued task may also be already deleted
# when everything worked well.
try:
Expand Down

0 comments on commit 9397cab

Please sign in to comment.