Skip to content

Commit 0b86fb2

Browse files
wip
1 parent bca60f4 commit 0b86fb2

1 file changed

Lines changed: 103 additions & 2 deletions

File tree

src/eligibility_signposting_api/services/calculators/eligibility_calculator.py

Lines changed: 103 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
from itertools import groupby
99
from typing import Any
1010

11+
from localstack.services.stepfunctions.asl.component.state.state_execution.state_map import iteration
1112
from wireup import service
1213

1314
from eligibility_signposting_api.model import eligibility, rules
15+
from eligibility_signposting_api.model.eligibility import Status
1416
from eligibility_signposting_api.services.calculators.rule_calculator import RuleCalculator
1517

1618
Row = Collection[Mapping[str, Any]]
@@ -53,7 +55,7 @@ def person_cohorts(self) -> set[str]:
5355
)
5456
return set(cohorts_row.get("COHORT_MAP", {}).get("cohorts", {}).get("M", {}).keys())
5557

56-
def evaluate_eligibility(self) -> eligibility.EligibilityStatus:
58+
def evaluate_eligibility_back_up(self) -> eligibility.EligibilityStatus:
5759
"""Iterates over campaign groups, evaluates eligibility, and returns a consolidated status."""
5860

5961
for condition_name, campaign_group in self.campaigns_grouped_by_condition_name:
@@ -68,6 +70,82 @@ def evaluate_eligibility(self) -> eligibility.EligibilityStatus:
6870
# Return the overall eligibility status, constructed from the list of condition results
6971
return eligibility.EligibilityStatus(conditions=list(self.results))
7072

73+
from collections import defaultdict
74+
75+
# Assuming cohort_results contains tuples of (IterationCohort, Status, list[Reason])
76+
def get_best_cohort(self,
77+
cohort_results: dict[
78+
str, tuple[rules.IterationCohort, eligibility.Status, list[eligibility.Reason]]]) -> tuple[
79+
rules.IterationCohort, eligibility.Status,
80+
list[
81+
eligibility.Reason]] | None:
82+
83+
84+
if not cohort_results:
85+
return None
86+
87+
# Find the best status across cohorts
88+
best_status = eligibility.Status.best(*[result[1] for result in cohort_results.values()])
89+
90+
# Filter cohorts that match the best status
91+
best_cohorts = [result for result in cohort_results.values() if result[1] == best_status]
92+
93+
# Pick the cohort with the highest priority
94+
#TODO return the appropriate result
95+
# Actionable,Actionable, not-actionable, not-actionable, not eligible,not eligible
96+
return max(best_cohorts, key=lambda cohort: cohort[0].priority) if best_cohorts else None
97+
98+
def evaluate_eligibility(self):
99+
"""Iterates over campaign groups, evaluates eligibility, and returns a consolidated status."""
100+
priority_getter = attrgetter("priority")
101+
results: dict[
102+
str, tuple[rules.IterationCohort, eligibility.Status, list[eligibility.Reason]]] = defaultdict()
103+
for condition_name, campaign_group in self.campaigns_grouped_by_condition_name:
104+
iteration_results: dict[
105+
str, tuple[rules.IterationCohort, eligibility.Status, list[eligibility.Reason]]] = defaultdict()
106+
for active_iteration in [cc.current_iteration for cc in campaign_group]:
107+
cohort_results: dict[
108+
str, tuple[rules.IterationCohort, eligibility.Status, list[eligibility.Reason]]] = defaultdict()
109+
rules_by_type = {rule_type: tuple(
110+
rule for rule in active_iteration.iteration_rules if attrgetter("type")(rule) == rule_type)
111+
for rule_type in {"F", "R", "S"}}
112+
113+
for cohort in sorted(active_iteration.iteration_cohorts, key=priority_getter):
114+
# Check base Eligibility
115+
if cohort.cohort_label in self.person_cohorts:
116+
# Base eligible
117+
# Check Eligibility - F - Rules
118+
eli_flag: bool = True
119+
for _, rule_group in groupby(sorted(rules_by_type["F"], key=priority_getter),
120+
key=priority_getter):
121+
# iter F rules by priority and grouping
122+
# find first exclusion - throws
123+
status, group_actionable, group_exclusions = self.evaluate_rules_priority_group(rule_group)
124+
if status.is_exclusion:
125+
cohort_results[cohort.cohort_label] = (cohort, status, group_exclusions)
126+
eli_flag = False
127+
break
128+
if not eli_flag: continue
129+
130+
for _, rule_group in groupby(sorted(rules_by_type["S"], key=priority_getter),
131+
key=priority_getter):
132+
status, group_actionable, group_exclusions = self.evaluate_rules_priority_group(
133+
rule_group)
134+
if status.is_exclusion:
135+
cohort_results[cohort.cohort_label] = (cohort, status, group_exclusions)
136+
break
137+
else:
138+
# Not base eligibility
139+
cohort_results[cohort.cohort_label] = (cohort, eligibility.Status.not_eligible, [])
140+
141+
# Determine Result between cohorts
142+
iteration_results[active_iteration.name] = self.get_best_cohort(cohort_results)
143+
# Determine results between iterations
144+
results[condition_name] = self.get_best_cohort(iteration_results)
145+
# Consolidate all the results and return
146+
147+
return eligibility.EligibilityStatus(conditions=list(self.results))
148+
71149
def get_the_base_eligible_campaigns(self, campaign_group: list[rules.CampaignConfig]) -> list[rules.CampaignConfig]:
72150
"""Return campaigns for which the person is base eligible via cohorts."""
73151

@@ -135,7 +213,7 @@ def evaluate_priority_group(
135213
ir
136214
for ir in iteration_rule_group
137215
if ir.type in (rules.RuleType.filter, rules.RuleType.suppression)
138-
and (ir.cohort_label is None or (ir.cohort_label in self.person_cohorts))
216+
and (ir.cohort_label is None or (ir.cohort_label in self.person_cohorts))
139217
]
140218

141219
best_status = eligibility.Status.not_eligible if exclude_capable_rules else eligibility.Status.actionable
@@ -154,3 +232,26 @@ def evaluate_priority_group(
154232
if worst_group_status.is_exclusion:
155233
is_rule_stop = any(rule.rule_stop for rule in exclude_capable_rules)
156234
return worst_group_status, actionable_reasons, exclusion_reasons, is_rule_stop
235+
236+
def evaluate_rules_priority_group( # TODO refractor
237+
self,
238+
iteration_rule_group: Iterator[rules.IterationRule]
239+
) -> tuple[eligibility.Status, list[eligibility.Reason], list[eligibility.Reason]]:
240+
status = Status.not_eligible
241+
exclusion_reasons, actionable_reasons = [], []
242+
exclude_capable_rules = [
243+
ir
244+
for ir in iteration_rule_group
245+
if ir.type in (rules.RuleType.filter, rules.RuleType.suppression)
246+
and (ir.cohort_label is None or (ir.cohort_label in self.person_cohorts))
247+
]
248+
249+
for rule in exclude_capable_rules:
250+
rule_calculator = RuleCalculator(person_data=self.person_data, rule=rule)
251+
status, reason = rule_calculator.evaluate_exclusion()
252+
if status.is_exclusion:
253+
exclusion_reasons.append(reason)
254+
else:
255+
actionable_reasons.append(reason)
256+
257+
return status, actionable_reasons, exclusion_reasons

0 commit comments

Comments
 (0)