Skip to content

Commit

Permalink
refactor QaCase into CrfCase and RequisitionCase
Browse files Browse the repository at this point in the history
  • Loading branch information
erikvw committed Aug 29, 2024
1 parent 2035af4 commit 2d813de
Show file tree
Hide file tree
Showing 15 changed files with 473 additions and 74 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from django.apps import apps as django_apps
from django.contrib import admin
from django.core.exceptions import ObjectDoesNotExist
from django.template.loader import render_to_string
from django.urls import reverse
from django.utils.translation import gettext as _
from edc_model_admin.dashboard import ModelAdminDashboardMixin
from edc_model_admin.mixins import TemplatesModelAdminMixin
from edc_sites.admin import SiteModelAdminMixin
from edc_visit_schedule.admin import ScheduleStatusListFilter
from edc_visit_tracking.utils import get_related_visit_model_cls

from .qa_report_modeladmin_mixin import QaReportModelAdminMixin

Expand All @@ -18,14 +20,14 @@ class OnStudyMissingValuesModelAdminMixin(
TemplatesModelAdminMixin,
):
include_note_column: bool = True
project_reports_admin: str = "meta_reports_admin"
project_subject_admin: str = "meta_subject_admin"

site_list_display_insert_pos: int = 2
qa_report_list_display_insert_pos = 4
ordering = ["site", "subject_identifier"]

list_display = [
"dashboard",
"subject_identifier",
"render_button",
"subject",
"site",
"label",
"crf",
Expand All @@ -43,20 +45,72 @@ class OnStudyMissingValuesModelAdminMixin(

search_fields = ["subject_identifier", "label"]

def dashboard(self, obj=None, label=None) -> str:
url = self.get_subject_dashboard_url(obj=obj)
if not url:
@admin.display(description="Update")
def render_button(self, obj=None):
crf_model_cls = django_apps.get_model(obj.label_lower)
try:
django_apps.get_model(obj.label_lower).objects.get(id=obj.original_id)
except ObjectDoesNotExist:
url = reverse(
f"{self.crf_admin_site_name(crf_model_cls)}:"
f"{obj.label_lower.replace('.', '_')}_add"
)
url = (
f"{url}?next={self.admin_site.name}:"
f"{self.model._meta.label_lower.replace('.', '_')}_changelist"
f"&subject_identifier={obj.subject_identifier}"
f"&subject_visit={obj.subject_visit_id}"
f"&appointment={self.related_visit(obj).appointment.id}"
f"&requisition={obj.original_id}"
)
title = _(f"Add {crf_model_cls._meta.verbose_name}")
label = _("Add CRF")
crf_button = render_to_string(
"edc_qareports/columns/add_button.html",
context=dict(title=title, url=url, label=label),
)
else:
url = reverse(
f"{self.project_subject_admin}:{obj.label_lower.replace('.', '_')}_change",
f"{self.crf_admin_site_name(crf_model_cls)}:"
f"{obj.label_lower.replace('.', '_')}_change",
args=(obj.original_id,),
)
url = (
f"{url}?next={self.project_reports_admin}:"
f"{url}?next={self.admin_site.name}:"
f"{self.model._meta.label_lower.replace('.', '_')}_changelist"
)
context = dict(title=_("Go to CRF"), url=url, label=label)
title = _(f"Change {crf_model_cls._meta.verbose_name}")
label = _("Change CRF")
crf_button = render_to_string(
"edc_qareports/columns/change_button.html",
context=dict(title=title, url=url, label=label),
)
return crf_button

def dashboard(self, obj=None, label=None) -> str:
dashboard_url = reverse(
self.get_subject_dashboard_url_name(obj=obj),
kwargs=dict(
subject_identifier=obj.subject_identifier,
appointment=self.related_visit(obj).appointment.id,
),
)
context = dict(title=_("Go to subject's dashboard"), url=dashboard_url, label=label)
return render_to_string("dashboard_button.html", context=context)

@staticmethod
def crf_admin_site_name(crf_model_cls) -> str:
"""Returns the name of the admin site CRFs are registered
by assuming admin site name follows the edc naming convention.
For example: 'meta_subject_admin' or 'effect_subject_admin'
"""
return f"{crf_model_cls._meta.label_lower.split('.')[0]}_admin"

@staticmethod
def related_visit(obj=None):
return get_related_visit_model_cls().objects.get(id=obj.subject_visit_id)

@admin.display(description="CRF", ordering="label_lower")
def crf(self, obj=None) -> str:
model_cls = django_apps.get_model(obj.label_lower)
Expand All @@ -71,3 +125,18 @@ def report_date(self, obj) -> str | None:
if obj.report_datetime:
return obj.report_datetime.date()
return None

@admin.display(description="Subject", ordering="subject_identifier")
def subject(self, obj) -> str | None:
url = reverse(
f"{self.admin_site.name}:{self.model._meta.label_lower.replace('.', '_')}"
"_changelist"
)
return render_to_string(
"edc_qareports/columns/subject_identifier_column.html",
{
"subject_identifier": obj.subject_identifier,
"url": url,
"title": _("Filter by subject"),
},
)
9 changes: 5 additions & 4 deletions edc_qareports/modeladmin_mixins/qa_report_modeladmin_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from django.core.exceptions import ObjectDoesNotExist
from django.template.loader import render_to_string
from django.urls import reverse
from django.utils.translation import gettext as _
from edc_constants.constants import NEW

from ..models import QaReportLog
Expand Down Expand Up @@ -93,13 +94,13 @@ def notes(self, obj=None) -> str:
except ObjectDoesNotExist:
note_model_obj = None
url = reverse(f"{note_app_label}_admin:{note_url_name}_add")
title = "Add"
title = _("Add if pending or cannot be resolved")
else:
url = reverse(
f"{note_app_label}_admin:{note_url_name}_change",
args=(note_model_obj.id,),
)
title = "Edit"
title = _("Edit if pending or cannot be resolved")

url = (
f"{url}?next={next_url_name},subject_identifier,q"
Expand All @@ -112,9 +113,9 @@ def notes(self, obj=None) -> str:

def get_notes_label(self, obj) -> str:
if not obj:
label = "Add"
label = _("Add")
elif not obj.note:
label = "Edit"
label = _("Edit")
else:
label = truncate_string(obj.note, max_length=35)
return label
7 changes: 4 additions & 3 deletions edc_qareports/sql_generator/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .qa_case import QaCase, QaCaseError
from .crf_case import CrfCase, CrfCaseError
from .crf_subquery import CrfSubquery
from .requisition_case import RequisitionCase
from .requisition_subquery import RequisitionSubquery
from .sql_view_generator import SqlViewGenerator
from .subquery import Subquery
from .subquery_from_dict import subquery_from_dict
Original file line number Diff line number Diff line change
@@ -1,43 +1,40 @@
from dataclasses import dataclass, field

import sqlglot
from django.apps import apps as django_apps
from django.db import OperationalError, connection

from .subquery_from_dict import subquery_from_dict
from .crf_subquery import CrfSubquery


class QaCaseError(Exception):
class CrfCaseError(Exception):
pass


@dataclass(kw_only=True)
class QaCase:
class CrfCase:
label: str = None
dbtable: str = None
label_lower: str = None
fld_name: str | None = None
where: str | None = None
list_tables: list[tuple[str, str, str]] | None = field(default_factory=list)

def __post_init__(self):
if self.fld_name is None and self.where is None:
raise QaCaseError("Expected either 'fld_name' or 'where'. Got None for both.")
elif self.fld_name is not None and self.where is not None:
raise QaCaseError("Expected either 'fld_name' or 'where', not both.")
subjectvisit_dbtable: str | None = None

@property
def sql(self):
return subquery_from_dict([self.__dict__])
sql = CrfSubquery(**self.__dict__).sql
vendor = "postgres" if connection.vendor.startswith("postgres") else connection.vendor
return sqlglot.transpile(sql, read="mysql", write=vendor)[0]

@property
def model_cls(self):
return django_apps.get_model(self.label_lower)

def fetchall(self):
sql = subquery_from_dict([self.__dict__])
with connection.cursor() as cursor:
try:
cursor.execute(sql)
cursor.execute(self.sql)
except OperationalError as e:
raise QaCaseError(f"{e}. See {self}.")
raise CrfCaseError(f"{e}. See {self}.")
return cursor.fetchall()
63 changes: 63 additions & 0 deletions edc_qareports/sql_generator/crf_subquery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from dataclasses import dataclass, field
from string import Template


class CrfSubqueryError(Exception):
pass


@dataclass(kw_only=True)
class CrfSubquery:
label: str = None
label_lower: str = None
dbtable: str = None
fld_name: str | None = None
subjectvisit_dbtable: str | None = None
where: str | None = None
list_tables: list[tuple[str, str, str]] | None = field(default_factory=list)
template: Template = field(
init=False,
default=Template(
"select v.subject_identifier, crf.id as original_id, crf.subject_visit_id, "
"crf.report_datetime, crf.site_id, v.visit_code, "
"v.visit_code_sequence, v.schedule_name, crf.modified, "
"'${label_lower}' as label_lower, "
"'${label}' as label, count(*) as records "
"from ${dbtable} as crf "
"left join ${subjectvisit_dbtable} as v on v.id=crf.subject_visit_id "
"${left_joins} "
"where ${where} "
"group by v.subject_identifier, crf.subject_visit_id, crf.report_datetime, "
"crf.site_id, v.visit_code, v.visit_code_sequence, v.schedule_name, crf.modified"
),
)

def __post_init__(self):
# default where statement if not provided and have fld_name.
if self.where is None and self.fld_name:
self.where = f"crf.{self.fld_name} is null"
if not self.label_lower:
raise CrfSubqueryError("label_lower is required")
if not self.subjectvisit_dbtable:
self.subjectvisit_dbtable = f"{self.label_lower.split('.')[0]}_subjectvisit"

@property
def left_joins(self) -> str:
"""Add list tbls to access list cols by 'name' instead of 'id'"""
left_join = []
for opts in self.list_tables or []:
list_field, list_dbtable, alias = opts
left_join.append(
f"left join {list_dbtable} as {alias} on crf.{list_field}={alias}.id"
)
return " ".join(left_join)

@property
def sql(self):
opts = {k: v for k, v in self.__dict__.items() if v is not None}
opts.update(left_joins=self.left_joins)
try:
sql = self.template.substitute(**opts).replace(";", "")
except KeyError as e:
raise CrfSubqueryError(e)
return sql
16 changes: 16 additions & 0 deletions edc_qareports/sql_generator/requisition_case.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from dataclasses import dataclass

from .crf_case import CrfCase
from .requisition_subquery import RequisitionSubquery


@dataclass(kw_only=True)
class RequisitionCase(CrfCase):
panel: str = None
subjectrequisition_dbtable: str | None = None
panel_dbtable: str | None = None

@property
def sql(self):
sql = RequisitionSubquery(**self.__dict__).sql
return sql.format(panel=self.panel)
58 changes: 58 additions & 0 deletions edc_qareports/sql_generator/requisition_subquery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from dataclasses import dataclass, field
from string import Template

from edc_constants.constants import YES

from .crf_subquery import CrfSubquery


class RequisitionSubqueryError(Exception):
pass


@dataclass(kw_only=True)
class RequisitionSubquery(CrfSubquery):
"""Generate a SELECT query returning requisitions where
is_drawn=Yes for a given panel but results have not been captured
in the result CRF.
For example requisition exists for panel FBC but results_fbc
CRF does not exist.
"""

panel: str = None
subjectrequisition_dbtable: str | None = None
panel_dbtable: str | None = None
template: str = field(
init=False,
default=Template(
"select req.subject_identifier, req.id as original_id, req.subject_visit_id, "
"req.report_datetime, req.site_id, v.visit_code, v.visit_code_sequence, "
"v.schedule_name, req.modified, '${label_lower}' as label_lower, "
"'${label}' as label, count(*) as records "
"from ${subjectrequisition_dbtable} as req "
"left join ${dbtable} as crf on req.id=crf.requisition_id "
"left join ${subjectvisit_dbtable} as v on v.id=req.subject_visit_id "
"${left_joins} "
"left join ${panel_dbtable} as panel on req.panel_id=panel.id "
f"where panel.name='${{panel}}' and req.is_drawn='{YES}' and crf.id is null "
"group by req.id, req.subject_identifier, req.subject_visit_id, "
"req.report_datetime, req.site_id, v.visit_code, v.visit_code_sequence, "
"v.schedule_name, req.modified"
),
)

def __post_init__(self):
# default where statement if not provided and have fld_name.
if self.where is None and self.fld_name:
self.where = f"crf.{self.fld_name} is null"
if not self.label_lower:
raise RequisitionSubqueryError("label_lower is required")
if not self.subjectvisit_dbtable:
self.subjectvisit_dbtable = f"{self.label_lower.split('.')[0]}_subjectvisit"
if not self.subjectrequisition_dbtable:
self.subjectrequisition_dbtable = (
f"{self.label_lower.split('.')[0]}_subjectrequisition"
)
if not self.panel_dbtable:
self.panel_dbtable = "edc_lab_panel"
Loading

0 comments on commit 2d813de

Please sign in to comment.