|
1 | 1 | import logging |
2 | 2 | from datetime import UTC, datetime |
3 | | -from operator import attrgetter |
4 | 3 | from uuid import UUID |
5 | 4 |
|
6 | 5 | from flask import Request, g |
@@ -66,57 +65,35 @@ def append_audit_condition( |
66 | 65 | campaign_details: tuple[CampaignID | None, CampaignVersion | None], |
67 | 66 | redirect_rule_details: tuple[RulePriority | None, RuleName | None], |
68 | 67 | ) -> None: |
69 | | - audit_eligibility_cohorts, audit_eligibility_cohort_groups, audit_actions = [], [], [] |
| 68 | + audit_eligibility_cohorts, audit_eligibility_cohort_groups = [], [] |
70 | 69 | audit_filter_rule, audit_suitability_rule, audit_redirect_rule = None, None, None |
71 | 70 | best_active_iteration = best_results[0] |
72 | 71 | best_candidate = best_results[1] |
73 | 72 | best_cohort_results = best_results[2] |
74 | 73 |
|
75 | 74 | if best_cohort_results: |
76 | | - for value in sorted(best_cohort_results.values(), key=attrgetter("cohort_code")): |
77 | | - cohort_status_name = value.status.name if value.status else None |
| 75 | + for cohort_label, result in sorted(best_cohort_results.items(), key=lambda item: item[1].cohort_code): |
| 76 | + cohort_status_name = result.status.name if result.status else None |
78 | 77 | audit_eligibility_cohorts.append( |
79 | | - AuditEligibilityCohorts(cohort_code=value.cohort_code, cohort_status=cohort_status_name) |
| 78 | + AuditEligibilityCohorts(cohort_code=cohort_label, cohort_status=cohort_status_name) |
80 | 79 | ) |
81 | 80 |
|
82 | 81 | audit_eligibility_cohort_groups.append( |
83 | 82 | AuditEligibilityCohortGroups( |
84 | | - cohort_code=value.cohort_code, cohort_status=cohort_status_name, cohort_text=value.description |
| 83 | + cohort_code=result.cohort_code, cohort_status=cohort_status_name, cohort_text=result.description |
85 | 84 | ) |
86 | 85 | ) |
87 | 86 |
|
88 | | - if value.audit_rules and best_candidate: |
89 | | - if best_candidate.status and best_candidate.status.name == Status.not_eligible.name: |
90 | | - audit_filter_rule = AuditFilterRule( |
91 | | - rule_priority=value.audit_rules[0].rule_priority, |
92 | | - rule_name=value.audit_rules[0].rule_name, |
93 | | - ) |
94 | | - if best_candidate.status and best_candidate.status.name == Status.not_actionable.name: |
95 | | - audit_suitability_rule = AuditSuitabilityRule( |
96 | | - rule_priority=value.audit_rules[0].rule_priority, |
97 | | - rule_name=value.audit_rules[0].rule_name, |
98 | | - rule_message=value.audit_rules[0].rule_description, |
99 | | - ) |
| 87 | + if result.audit_rules and best_candidate: |
| 88 | + audit_filter_rule = AuditContext.create_audit_filter_rule(best_candidate, result) |
| 89 | + audit_suitability_rule = AuditContext.create_audit_suitability_rule(best_candidate, result) |
100 | 90 |
|
101 | 91 | if best_candidate and best_candidate.status and best_candidate.status.name == Status.actionable.name: |
102 | 92 | audit_redirect_rule = AuditRedirectRule( |
103 | 93 | rule_priority=str(redirect_rule_details[0]), rule_name=redirect_rule_details[1] |
104 | 94 | ) |
105 | 95 |
|
106 | | - if suggested_actions is None: |
107 | | - audit_actions = None |
108 | | - elif len(suggested_actions) > 0: |
109 | | - for action in suggested_actions: |
110 | | - audit_actions.append( |
111 | | - AuditAction( |
112 | | - internal_action_code=action.internal_action_code, |
113 | | - action_code=action.action_code, |
114 | | - action_type=action.action_type, |
115 | | - action_description=action.action_description, |
116 | | - action_url=str(action.url_link) if action.url_link else None, |
117 | | - action_url_label=action.url_label, |
118 | | - ) |
119 | | - ) |
| 96 | + audit_actions = AuditContext.create_audit_actions(suggested_actions) |
120 | 97 |
|
121 | 98 | audit_condition = AuditCondition( |
122 | 99 | campaign_id=campaign_details[0], |
@@ -144,3 +121,45 @@ def add_response_details(response_id: UUID, last_updated: datetime) -> None: |
144 | 121 | @staticmethod |
145 | 122 | def write_to_firehose(service: AuditService) -> None: |
146 | 123 | service.audit(g.audit_log.model_dump(by_alias=True)) |
| 124 | + |
| 125 | + @staticmethod |
| 126 | + def create_audit_actions(suggested_actions: list[SuggestedAction] | None) -> list[AuditAction] | None: |
| 127 | + audit_actions = [] |
| 128 | + if suggested_actions is None: |
| 129 | + audit_actions = None |
| 130 | + elif len(suggested_actions) > 0: |
| 131 | + for action in suggested_actions: |
| 132 | + audit_actions.append( |
| 133 | + AuditAction( |
| 134 | + internal_action_code=action.internal_action_code, |
| 135 | + action_code=action.action_code, |
| 136 | + action_type=action.action_type, |
| 137 | + action_description=action.action_description, |
| 138 | + action_url=str(action.url_link) if action.url_link else None, |
| 139 | + action_url_label=action.url_label, |
| 140 | + ) |
| 141 | + ) |
| 142 | + return audit_actions |
| 143 | + |
| 144 | + @staticmethod |
| 145 | + def create_audit_suitability_rule( |
| 146 | + best_candidate: IterationResult, result: CohortGroupResult |
| 147 | + ) -> AuditSuitabilityRule | None: |
| 148 | + audit_suitability_rule = None |
| 149 | + if best_candidate.status and best_candidate.status.name == Status.not_actionable.name: |
| 150 | + audit_suitability_rule = AuditSuitabilityRule( |
| 151 | + rule_priority=result.audit_rules[0].rule_priority, |
| 152 | + rule_name=result.audit_rules[0].rule_name, |
| 153 | + rule_message=result.audit_rules[0].rule_description, |
| 154 | + ) |
| 155 | + return audit_suitability_rule |
| 156 | + |
| 157 | + @staticmethod |
| 158 | + def create_audit_filter_rule(best_candidate: IterationResult, result: CohortGroupResult) -> AuditFilterRule | None: |
| 159 | + audit_filter_rule = None |
| 160 | + if best_candidate.status and best_candidate.status.name == Status.not_eligible.name: |
| 161 | + audit_filter_rule = AuditFilterRule( |
| 162 | + rule_priority=result.audit_rules[0].rule_priority, |
| 163 | + rule_name=result.audit_rules[0].rule_name, |
| 164 | + ) |
| 165 | + return audit_filter_rule |
0 commit comments