Skip to content

Commit

Permalink
add database models for statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
MaHaWo committed Dec 3, 2024
1 parent 49e026c commit 6212175
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 272 deletions.
32 changes: 25 additions & 7 deletions mondey_backend/src/mondey_backend/models/milestones.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,27 +193,45 @@ class MilestoneAnswerSessionPublic(SQLModel):
answers: dict[int, MilestoneAnswerPublic]


class Statistics(SQLModel):
class MilestoneAgeScore(SQLModel, table=True):
id: int | None = Field(primary_key=True, default=None)
collection_id: int | None = Field(
default=None, foreign_key="milestoneagescorecollection.id"
)
collection: MilestoneAgeScoreCollection = back_populates("scores")
avg_score: float
stddev_score: float
age_months: int
expected_score: float


class MilestoneAgeScores(SQLModel, table=True):
milestone_id: int = Field(primary_key=True, default=None)
scores: list[Statistics]
class MilestoneAgeScoreCollection(SQLModel, table=True):
id: int | None = Field(primary_key=True, default=None)
milestone_id: int = Field(default=None, foreign_key="milestone.id")
expected_age: int
scores: Mapped[list[MilestoneAgeScore]] = back_populates("collection")
created_at: datetime.datetime = Field(
sa_column_kwargs={
"server_default": text("CURRENT_TIMESTAMP"),
}
)


class MilestoneGroupAgeScores(SQLModel, table=True):
milestonegroup_id: int = Field(primary_key=True, default=None)
scores: list[Statistics]
class MilestoneGroupAgeScore(SQLModel, table=True):
id: int = Field(primary_key=True, default=None)
collection_id: int | None = Field(
default=None, foreign_key="milestonegroupagescorecollection.id"
)
collection: MilestoneGroupAgeScoreCollection = back_populates("scores")
avg_score: float
stddev_score: float
age_months: int


class MilestoneGroupAgeScoreCollection(SQLModel, table=True):
id: int | None = Field(primary_key=True, default=None)
milestonegroup_id: int = Field(default=None, foreign_key="milestonegroup.id")
scores: Mapped[list[MilestoneGroupAgeScore]] = back_populates("collection")
created_at: datetime.datetime = Field(
sa_column_kwargs={
"server_default": text("CURRENT_TIMESTAMP"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ...models.milestones import Language
from ...models.milestones import Milestone
from ...models.milestones import MilestoneAdmin
from ...models.milestones import MilestoneAgeScores
from ...models.milestones import MilestoneAgeScoreCollection
from ...models.milestones import MilestoneGroup
from ...models.milestones import MilestoneGroupAdmin
from ...models.milestones import MilestoneGroupText
Expand Down Expand Up @@ -183,7 +183,7 @@ async def delete_submitted_milestone_image(
@router.get("/milestone-age-scores/{milestone_id}")
def get_milestone_age_scores(
session: SessionDep, milestone_id: int
) -> MilestoneAgeScores:
) -> MilestoneAgeScoreCollection:
return calculate_milestone_statistics_by_age(session, milestone_id)

return router
238 changes: 75 additions & 163 deletions mondey_backend/src/mondey_backend/routers/scores.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,13 @@
from enum import Enum

import numpy as np
from sqlmodel import col
from sqlmodel import select

from ..dependencies import CurrentActiveUserDep
from ..dependencies import SessionDep
from ..models.children import Child
from ..models.milestones import MilestoneAgeScore
from ..models.milestones import MilestoneAgeScores
from ..models.milestones import MilestoneAgeScoreCollection
from ..models.milestones import MilestoneAnswer
from ..models.milestones import MilestoneAnswerSession
from ..models.milestones import MilestoneGroupStatistics
from .utils import _session_has_expired
from .utils import calculate_milestone_statistics_by_age
from .utils import calculate_milestonegroup_statistics
from .utils import get_child_age_in_months
from .utils import get_milestonegroups_for_answersession

Expand All @@ -38,7 +31,7 @@ class TrafficLight(Enum):


def compute_feedback_simple(
stat: MilestoneAgeScore | MilestoneGroupStatistics,
stat: MilestoneAgeScore,
score: float,
min_score: float | None = None,
) -> int:
Expand All @@ -59,54 +52,57 @@ def compute_feedback_simple(
1 if score > avg - stddev (trafficlight: green)
"""

# TODO: implement logic anew with the new answersession structure
def leq(val: float, lim: float) -> bool:
return val < lim or np.isclose(val, lim)

if stat.stddev_score < 1e-2:
# README: This happens when all the scores are the same, so any
# deviation towards lower values can be interpreted as
# underperformance.
# This logic relies on the score being integers, such that when the
# stddev is 0, the avg is an integer
# TODO: Check again what client wants to happen in such cases?
lim_lower = stat.avg_score - 2
lim_upper = stat.avg_score - 1
else:
lim_lower = stat.avg_score - 2 * stat.stddev_score
lim_upper = stat.avg_score - stat.stddev_score

if leq(score, lim_lower):
return TrafficLight.red.value
elif score > lim_lower and leq(score, lim_upper):
if min_score is not None and min_score < lim_lower:
return TrafficLight.yellowWithCaveat.value
return TrafficLight.yellow.value
else:
if min_score is not None and min_score < lim_upper:
return TrafficLight.greenWithCaveat.value
return TrafficLight.green.value
return TrafficLight.green.value
# if stat.stddev_score < 1e-2:
# # README: This happens when all the scores are the same, so any
# # deviation towards lower values can be interpreted as
# # underperformance.
# # This logic relies on the score being integers, such that when the
# # stddev is 0, the avg is an integer
# # TODO: Check again what client wants to happen in such cases?
# lim_lower = stat.avg_score - 2
# lim_upper = stat.avg_score - 1
# else:
# lim_lower = stat.avg_score - 2 * stat.stddev_score
# lim_upper = stat.avg_score - stat.stddev_score

# if leq(score, lim_lower):
# return TrafficLight.red.value
# elif score > lim_lower and leq(score, lim_upper):
# if min_score is not None and min_score < lim_lower:
# return TrafficLight.yellowWithCaveat.value
# return TrafficLight.yellow.value
# else:
# if min_score is not None and min_score < lim_upper:
# return TrafficLight.greenWithCaveat.value
# return TrafficLight.green.value


def compute_detailed_feedback_for_answers(
session: SessionDep,
answers: list[MilestoneAnswer],
statistics: dict[int, MilestoneAgeScores],
statistics: dict[int, MilestoneAgeScoreCollection],
age: int,
) -> dict[int, int]:
milestonegroup_result: dict[int, int] = {} # type: ignore
for answer in answers:
if statistics.get(answer.milestone_id) is None: # type: ignore
stat = calculate_milestone_statistics_by_age(
session,
answer.milestone_id, # type: ignore
) # type: ignore

statistics[answer.milestone_id] = stat # type: ignore
feedback = compute_feedback_simple(
statistics[answer.milestone_id].scores[age], # type: ignore
answer.answer, # type: ignore
) # type: ignore
milestonegroup_result[answer.milestone_id] = feedback # type: ignore
# TODO: implement logic anew with the new answersession structure
# for answer in answers:
# if statistics.get(answer.milestone_id) is None: # type: ignore
# stat = calculate_milestone_statistics_by_age(
# session,
# answer.milestone_id, # type: ignore
# ) # type: ignore

# statistics[answer.milestone_id] = stat # type: ignore
# feedback = compute_feedback_simple(
# statistics[answer.milestone_id].scores[age], # type: ignore
# answer.answer, # type: ignore
# ) # type: ignore
# milestonegroup_result[answer.milestone_id] = feedback # type: ignore
return milestonegroup_result


Expand All @@ -115,25 +111,27 @@ def compute_detailed_milestonegroup_feedback_for_answersession(
answersession: MilestoneAnswerSession,
child: Child,
) -> dict[int, dict[int, int]]:
# TODO: implement logic anew with the new answersession structure

age = get_child_age_in_months(child, answersession.created_at)
milestonegroups = get_milestonegroups_for_answersession(session, answersession)

filtered_answers = {
m.id: [
answersession.answers[ms.id]
for ms in m.milestones
if ms.id in answersession.answers and ms.id is not None
]
for mid, m in milestonegroups.items()
}
# filtered_answers = {
# m.id: [
# answersession.answers[ms.id]
# for ms in m.milestones
# if ms.id in answersession.answers and ms.id is not None
# ]
# for mid, m in milestonegroups.items()
# }

result: dict[int, dict[int, int]] = {}
statistics: dict[int, MilestoneAgeScores] = {}
for milestonegroup_id, answers in filtered_answers.items():
milestonegroup_result = compute_detailed_feedback_for_answers(
session, answers, statistics, age
)
result[milestonegroup_id] = milestonegroup_result # type: ignore
# statistics: dict[int, MilestoneAgeScoreCollection] = {}
# for milestonegroup_id, answers in filtered_answers.items():
# milestonegroup_result = compute_detailed_feedback_for_answers(
# session, answers, statistics, age
# )
# result[milestonegroup_id] = milestonegroup_result # type: ignore
return result


Expand All @@ -146,111 +144,25 @@ def compute_summary_milestonegroup_feedback_for_answersession(
) -> dict[int, int]:
age = get_child_age_in_months(child, answersession.created_at)

# TODO: double check if this does the right thing
# TODO: implement logic anew with the new answersession structure

milestonegroups = get_milestonegroups_for_answersession(session, answersession)

filtered_answers = {
milestonegroup.id: [
answersession.answers[ms.id]
for ms in milestonegroup.milestones
if ms.id in answersession.answers and ms.id is not None
]
for mid, milestonegroup in milestonegroups.items()
}

milestone_group_results: dict[int, int] = {}
for milestonegroup_id, answers in filtered_answers.items():
mg_stat = calculate_milestonegroup_statistics(
session,
milestonegroup_id, # type: ignore
age,
age_lower=age - age_limit_low,
age_upper=age + age_limit_high,
)
mg_stat.session_id = answersession.id # type: ignore
mg_stat.child_id = child.id # type: ignore

mean_for_mg = np.nan_to_num(np.mean([a.answer for a in answers]))
min_for_mg = np.nan_to_num(np.min([a.answer for a in answers]))

result = compute_feedback_simple(mg_stat, mean_for_mg, min_for_mg)
milestone_group_results[milestonegroup_id] = result # type: ignore
# for milestonegroup_id, answers in filtered_answers.items():
# mg_stat = calculate_milestonegroup_statistics(
# session,
# milestonegroup_id, # type: ignore
# age,
# age_lower=age - age_limit_low,
# age_upper=age + age_limit_high,
# )
# mg_stat.session_id = answersession.id # type: ignore
# mg_stat.child_id = child.id # type: ignore

# mean_for_mg = np.nan_to_num(np.mean([a.answer for a in answers]))
# min_for_mg = np.nan_to_num(np.min([a.answer for a in answers]))

# result = compute_feedback_simple(mg_stat, mean_for_mg, min_for_mg)
# milestone_group_results[milestonegroup_id] = result # type: ignore
return milestone_group_results


def compute_summary_milestonegroup_feedback_for_all_sessions(
session: SessionDep,
child: Child,
age_limit_low=6,
age_limit_high=6,
) -> dict[str, dict[int, int]]:
results: dict[str, dict[int, int]] = {}

# get all answer sessions and filter for completed ones
answersessions = [
a
for a in session.exec(
select(MilestoneAnswerSession).where(
col(MilestoneAnswerSession.child_id) == child.id
)
).all()
if _session_has_expired(a)
]

if answersessions == []:
return results
else:
for answersession in answersessions:
milestone_group_results = (
compute_summary_milestonegroup_feedback_for_answersession(
session,
answersession,
child,
age_limit_low=age_limit_low,
age_limit_high=age_limit_high,
)
)

datestring = answersession.created_at.strftime("%d-%m-%Y")
results[datestring] = milestone_group_results

return results


def compute_detailed_milestonegroup_feedback_for_all_sessions(
session: SessionDep,
current_active_user: CurrentActiveUserDep,
child: Child,
) -> dict[str, dict[int, dict[int, int]]]:
results: dict[str, dict[int, dict[int, int]]] = {}

user = current_active_user()
# get all answer sessions and filter for completed ones
answersessions = [
a
for a in session.exec(
select(MilestoneAnswerSession).where(
col(MilestoneAnswerSession.child_id) == child.id
and col(MilestoneAnswerSession.user_id) == user.id
)
).all()
if _session_has_expired(a)
]

if answersessions == []:
return results
else:
for answersession in answersessions:
milestone_group_results = (
compute_detailed_milestonegroup_feedback_for_answersession(
session,
answersession,
child,
)
)

datestring = answersession.created_at.strftime("%d-%m-%Y")
results[datestring] = milestone_group_results

return results
Loading

0 comments on commit 6212175

Please sign in to comment.