Skip to content

Commit 4c8c273

Browse files
cohort label, rules stop relooked
1 parent 0925a7b commit 4c8c273

2 files changed

Lines changed: 135 additions & 202 deletions

File tree

src/eligibility_signposting_api/services/calculators/eligibility_calculator.py

Lines changed: 28 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -60,23 +60,6 @@ def person_cohorts(self) -> set[str]:
6060
)
6161
return set(cohorts_row.get("COHORT_MAP", {}).get("cohorts", {}).get("M", {}).keys())
6262

63-
def evaluate_eligibility_back_up(self) -> eligibility.EligibilityStatus:
64-
"""Iterates over campaign groups, evaluates eligibility, and returns a consolidated status."""
65-
66-
for condition_name, campaign_group in self.campaigns_grouped_by_condition_name:
67-
if base_eligible_campaigns := self.get_the_base_eligible_campaigns(campaign_group):
68-
status, reasons = self.evaluate_eligibility_by_iteration_rules(base_eligible_campaigns)
69-
# Append the evaluation result for this condition to the results list
70-
self.results.append(eligibility.Condition(condition_name, status, reasons))
71-
else:
72-
# Create and append the evaluation result, as no campaign config is base eligible
73-
self.results.append(eligibility.Condition(condition_name, eligibility.Status.not_eligible, []))
74-
75-
# Return the overall eligibility status, constructed from the list of condition results
76-
return eligibility.EligibilityStatus(conditions=list(self.results))
77-
78-
from collections import defaultdict
79-
8063
# Assuming cohort_results contains tuples of (IterationCohort, Status, list[Reason])
8164
def get_best_cohort(self, cohort_results: dict[str, CohortStatus]) -> tuple[Status, list[CohortStatus]]:
8265
# Find the best status across cohorts
@@ -109,7 +92,14 @@ def evaluate_eligibility(self) -> eligibility.EligibilityStatus:
10992
# Base eligible
11093
# Check Eligibility - F - Rules
11194
eligibility_flag: bool = True
112-
for _, rule_group in groupby(sorted(rules_filter, key=priority_getter), key=priority_getter):
95+
exclusion_capable_filter_rules = (
96+
ir
97+
for ir in rules_filter
98+
if ir.cohort_label is None or ir.cohort_label in cohort.cohort_label
99+
)
100+
for _, rule_group in groupby(
101+
sorted(exclusion_capable_filter_rules, key=priority_getter), key=priority_getter
102+
):
113103
# iter F rules by priority and grouping
114104
# find first exclusion - throws
115105
status, group_actionable, group_exclusions = self.evaluate_rules_priority_group(rule_group)
@@ -121,21 +111,34 @@ def evaluate_eligibility(self) -> eligibility.EligibilityStatus:
121111
# Check Actionable(ity) - S - Rules
122112
if eligibility_flag:
123113
actionable_flag: bool = True
114+
suppression_reasons = []
115+
exclusion_capable_suppression_rules = (
116+
ir
117+
for ir in rules_suppression
118+
if ir.cohort_label is None or ir.cohort_label in cohort.cohort_label
119+
)
124120
for _, rule_group in groupby(
125-
sorted(rules_suppression, key=priority_getter), key=priority_getter
121+
sorted(exclusion_capable_suppression_rules, key=priority_getter), key=priority_getter
126122
):
123+
rule_group = list(rule_group)
127124
# iter S rules by priority and grouping
128125
# find first exclusion - throws
129126
status, group_actionable, group_exclusions = self.evaluate_rules_priority_group(
130-
rule_group
127+
iter(rule_group)
131128
)
132129
if status.is_exclusion:
133-
cohort_results[cohort.cohort_label] = CohortStatus(cohort, status, group_exclusions)
134130
actionable_flag = False
135-
break
131+
suppression_reasons.append(group_exclusions)
132+
if any(rule.rule_stop for rule in rule_group):
133+
break
136134
# No exclusions - actionable
137135
if actionable_flag:
138136
cohort_results[cohort.cohort_label] = CohortStatus(cohort, Status.actionable, [])
137+
else:
138+
cohort_results[cohort.cohort_label] = CohortStatus(
139+
cohort, Status.not_actionable, suppression_reasons
140+
)
141+
139142
else:
140143
# Not base eligibility
141144
cohort_results[cohort.cohort_label] = CohortStatus(cohort, eligibility.Status.not_eligible, [])
@@ -159,106 +162,14 @@ def evaluate_eligibility(self) -> eligibility.EligibilityStatus:
159162
]
160163
return eligibility.EligibilityStatus(conditions=final_result)
161164

162-
def get_the_base_eligible_campaigns(self, campaign_group: list[rules.CampaignConfig]) -> list[rules.CampaignConfig]:
163-
"""Return campaigns for which the person is base eligible via cohorts."""
164-
165-
base_eligible_campaigns: list[rules.CampaignConfig] = [
166-
campaign for campaign in campaign_group if self.check_base_eligibility(campaign.current_iteration)
167-
]
168-
169-
if base_eligible_campaigns:
170-
return base_eligible_campaigns
171-
return []
172-
173-
def check_base_eligibility(self, iteration: rules.Iteration) -> bool:
174-
"""Return cohorts for which person is base eligible."""
175-
iteration_cohorts: set[str] = {
176-
cohort.cohort_label for cohort in iteration.iteration_cohorts if cohort.cohort_label
177-
}
178-
if magic_cohort in iteration_cohorts:
179-
return True
180-
return bool(iteration_cohorts & self.person_cohorts)
181-
182-
def evaluate_eligibility_by_iteration_rules(
183-
self, campaign_group: list[rules.CampaignConfig]
184-
) -> tuple[eligibility.Status, list[eligibility.Reason]]:
185-
"""Evaluate iteration rules to see if the person is actionable, not actionable (due to "S" rules),
186-
or not eligible (due to "F" rules").
187-
188-
For each condition, evaluate all iterations for inclusion or exclusion."""
189-
190-
priority_getter = attrgetter("priority")
191-
192-
status_with_reasons: dict[eligibility.Status, list[eligibility.Reason]] = defaultdict()
193-
194-
for iteration in [cc.current_iteration for cc in campaign_group]:
195-
# Until we see a worse status, we assume someone is actionable for this iteration.
196-
worst_status = eligibility.Status.actionable
197-
exclusion_reasons, actionable_reasons = [], []
198-
by_priority = sorted(iteration.iteration_rules, key=priority_getter)
199-
for _, rule_group in groupby(by_priority, key=priority_getter):
200-
status, group_actionable, group_exclusions, is_rule_stop = self.evaluate_priority_group(
201-
rule_group, worst_status
202-
)
203-
# Merge results
204-
worst_status = status
205-
actionable_reasons.extend(group_actionable)
206-
exclusion_reasons.extend(group_exclusions)
207-
if is_rule_stop:
208-
break
209-
condition_status_entry = status_with_reasons.setdefault(worst_status, [])
210-
condition_status_entry.extend(
211-
actionable_reasons if worst_status is eligibility.Status.actionable else exclusion_reasons
212-
)
213-
214-
best_status = eligibility.Status.best(*list(status_with_reasons.keys()))
215-
216-
return best_status, status_with_reasons[best_status]
217-
218-
def evaluate_priority_group(
219-
self,
220-
iteration_rule_group: Iterator[rules.IterationRule],
221-
worst_status_so_far_for_condition: eligibility.Status,
222-
) -> tuple[eligibility.Status, list[eligibility.Reason], list[eligibility.Reason], bool]:
223-
is_rule_stop = False
224-
exclusion_reasons, actionable_reasons = [], []
225-
exclude_capable_rules = [
226-
ir
227-
for ir in iteration_rule_group
228-
if ir.type in (rules.RuleType.filter, rules.RuleType.suppression)
229-
and (ir.cohort_label is None or (ir.cohort_label in self.person_cohorts))
230-
]
231-
232-
best_status = eligibility.Status.not_eligible if exclude_capable_rules else eligibility.Status.actionable
233-
234-
for rule in exclude_capable_rules:
235-
rule_calculator = RuleCalculator(person_data=self.person_data, rule=rule)
236-
status, reason = rule_calculator.evaluate_exclusion()
237-
if status.is_exclusion:
238-
best_status = eligibility.Status.best(status, best_status)
239-
exclusion_reasons.append(reason)
240-
else:
241-
best_status = eligibility.Status.actionable
242-
actionable_reasons.append(reason)
243-
244-
worst_group_status = eligibility.Status.worst(best_status, worst_status_so_far_for_condition)
245-
if worst_group_status.is_exclusion:
246-
is_rule_stop = any(rule.rule_stop for rule in exclude_capable_rules)
247-
return worst_group_status, actionable_reasons, exclusion_reasons, is_rule_stop
248-
249165
def evaluate_rules_priority_group(
250166
self, iteration_rule_group: Iterator[rules.IterationRule]
251167
) -> tuple[eligibility.Status, list[eligibility.Reason], list[eligibility.Reason]]:
252168
exclusion_reasons, actionable_reasons = [], []
253-
exclude_capable_rules = [
254-
ir
255-
for ir in iteration_rule_group
256-
if ir.type in (rules.RuleType.filter, rules.RuleType.suppression)
257-
and (ir.cohort_label is None or (ir.cohort_label in self.person_cohorts))
258-
]
259-
best_status = eligibility.Status.not_eligible if exclude_capable_rules else eligibility.Status.actionable
260169

261-
for rule in exclude_capable_rules:
170+
best_status = eligibility.Status.not_eligible
171+
172+
for rule in iteration_rule_group:
262173
rule_calculator = RuleCalculator(person_data=self.person_data, rule=rule)
263174
status, reason = rule_calculator.evaluate_exclusion()
264175
if status.is_exclusion:

0 commit comments

Comments
 (0)