diff --git a/pyproject.toml b/pyproject.toml index 2e3eec3..9e869c6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sqlonfhir" -version = "0.0.1" +version = "0.0.2" authors = [ { name="Stuart Mackle", email="stuart.mackle@sas.com" }, { name="SAS Healthcare Solutions", email="support@sas.com" } diff --git a/sqlonfhir/sqlonfhir.py b/sqlonfhir/sqlonfhir.py index e4377c5..882a139 100644 --- a/sqlonfhir/sqlonfhir.py +++ b/sqlonfhir/sqlonfhir.py @@ -1,165 +1,188 @@ # Copyright © 2025, SAS Institute Inc., Cary, NC, USA. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 -from fhirpathpy import evaluate +from fhirpathpy import compile from fhirpathpy.models import models -# This is here to change identified.ofType(boolean) to identifiedBoolean.ofType(boolean) -# This function should be replaced at some point as fhirpathpy and the SQL on FHIR specifications converge -def escape_path(path): - return path.replace("$this", "identity()") - - -def eval_fhirpath(resource, path): - path = escape_path(path) - user_invocation_table = { - "getReferenceKey": { - "fn": get_reference_key, - "arity": {0: [], 1: ["Identifier"]}, - }, - "getResourceKey": {"fn": get_resource_key}, - "identity": {"fn": identity}, - } - e = evaluate( - resource, - path, - options={"userInvocationTable": user_invocation_table}, - model=models["r4"], - ) - return e - - -def identity(resource): - return resource - - -def union_all(expr, resource, view_definition): - result = [] - for expression in expr["unionAll"]: - result += call_fn(expression, resource, view_definition) - return result - - -def for_each(expr, resource, view_definition): - result = [] - replaced_path = replace_constants(expr["forEach"], view_definition) - selections = eval_fhirpath(resource, replaced_path) - for selection in selections: - result += select(expr, selection, view_definition) - return result - - -def get_all_child_columns(expression): - empty_record = {} - if "column" in expression: - for column in expression["column"]: - empty_record[column["name"]] = None - if "select" in expression: - for selection in expression["select"]: - empty_record = empty_record | get_all_child_columns(selection) - if "unionAll" in expression: - for selection in expression["unionAll"]: - empty_record = empty_record | get_all_child_columns(selection) - return empty_record - - -def for_each_or_null(expr, resource, view_definition): - result = [] - selections = eval_fhirpath(resource, expr["forEachOrNull"]) - if len(selections) == 0: - return [get_all_child_columns(expr)] - for selection in selections: - result += select(expr, selection, view_definition) - return result - - -def row_product(parts): - if len(parts) == 1: - return parts[0] - rows = [{}] - new_rows = None - for part in parts: - new_rows = [] - for partial_row in part: - for row in rows: - new_rows.append(partial_row | row) - rows = new_rows - return rows - - -def select(expr, resource, view_definition): - if "where" in expr: - for condition in expr["where"]: - replaced_path = replace_constants(condition["path"], view_definition) - val = eval_fhirpath(resource, replaced_path) - if len(val) == 0: - return [] - elif not val[0]: - return [] - elif type(val[0]) is not bool: - raise Exception("Where clause did not evaluate to boolean") - sub_selections = [] - for selection in expr["select"]: - selection_evaluation = call_fn(selection, resource, view_definition) - if selection_evaluation != []: - sub_selections.append(selection_evaluation) - else: - return [] - return row_product(sub_selections) - - -def column(expr, resource, view_definition): - record = {} - for column in expr["column"]: - replaced_path = replace_constants(column["path"], view_definition) - value = eval_fhirpath(resource, replaced_path) - if "collection" in column and column["collection"]: - record[column["name"]] = value - elif len(value) == 1: - record[column["name"]] = value[0] - elif len(value) == 0: - record[column["name"]] = None - else: - raise Exception("Unexpected multiple values") - return [record] - - -def call_fn(expr, resource, view_definition): - if "forEachOrNull" in expr: - return for_each_or_null(expr, resource, view_definition) - elif "forEach" in expr: - return for_each(expr, resource, view_definition) - elif "select" in expr: - return select(expr, resource, view_definition) - elif "unionAll" in expr: - return union_all(expr, resource, view_definition) - elif "column" in expr: - return column(expr, resource, view_definition) - - -def get_resource_key(ctx): - return ctx[0]["id"] - - -def get_reference_key(ctx, identifier=None): - if identifier and not ctx[0]["reference"].startswith(identifier): - return None - index = ctx[0]["reference"].find("/") - return ctx[0]["reference"][index + 1 :] - - def eval(resources, view_definition): norm = normalize(view_definition) result = [] + evaluator = ViewDefinitionEvaluator() for resource in resources: if resource["resourceType"] != view_definition["resource"]: continue - result += call_fn(norm, resource, view_definition) + result += evaluator.call_fn(norm, resource, view_definition) return result +# View Definition Evaluation +class ViewDefinitionEvaluator: + def __init__(self): + self.fhirpath_cache = {} + + def eval_fhirpath(self, resource, path): + # $this is not in the FHIRPath spec, replacing as per + # reference implementation + path = path.replace("$this", "identity()") + # Add functions to FHIRPath that are needed for SQL on FHIR + user_invocation_table = { + "getReferenceKey": { + "fn": self.get_reference_key, + "arity": {0: [], 1: ["Identifier"]}, + }, + "getResourceKey": {"fn": self.get_resource_key}, + "identity": {"fn": self.identity}, + } + + if path not in self.fhirpath_cache: + self.fhirpath_cache[path] = compile( + path, + model=models["r4"], + options={"userInvocationTable": user_invocation_table}, + ) + + return self.fhirpath_cache[path](resource) + + def union_all(self, expr, resource, view_definition): + result = [] + for expression in expr["unionAll"]: + result += self.call_fn(expression, resource, view_definition) + return result + + def for_each(self, expr, resource, view_definition): + result = [] + replaced_path = self.replace_constants(expr["forEach"], view_definition) + selections = self.eval_fhirpath(resource, replaced_path) + for selection in selections: + result += self.select(expr, selection, view_definition) + return result + + def get_all_child_columns(self, expression): + empty_record = {} + if "column" in expression: + for column in expression["column"]: + empty_record[column["name"]] = None + if "select" in expression: + for selection in expression["select"]: + empty_record = empty_record | self.get_all_child_columns(selection) + if "unionAll" in expression: + for selection in expression["unionAll"]: + empty_record = empty_record | self.get_all_child_columns(selection) + return empty_record + + def for_each_or_null(self, expr, resource, view_definition): + result = [] + selections = self.eval_fhirpath(resource, expr["forEachOrNull"]) + if len(selections) == 0: + return [self.get_all_child_columns(expr)] + for selection in selections: + result += self.select(expr, selection, view_definition) + return result + + def select(self, expr, resource, view_definition): + if "where" in expr: + for condition in expr["where"]: + replaced_path = self.replace_constants( + condition["path"], view_definition + ) + val = self.eval_fhirpath(resource, replaced_path) + if len(val) == 0 or not val[0]: + return [] + elif type(val[0]) is not bool: + raise Exception("Where clause did not evaluate to boolean") + sub_selections = [] + for selection in expr["select"]: + selection_evaluation = self.call_fn(selection, resource, view_definition) + if selection_evaluation != []: + sub_selections.append(selection_evaluation) + else: + return [] + return self.row_product(sub_selections) + + def column(self, expr, resource, view_definition): + record = {} + for column in expr["column"]: + replaced_path = self.replace_constants(column["path"], view_definition) + value = self.eval_fhirpath(resource, replaced_path) + if "collection" in column and column["collection"]: + record[column["name"]] = value + elif len(value) == 1: + record[column["name"]] = value[0] + elif len(value) == 0: + record[column["name"]] = None + else: + raise Exception("Unexpected multiple values") + return [record] + + def call_fn(self, expr, resource, view_definition): + if "forEachOrNull" in expr: + return self.for_each_or_null(expr, resource, view_definition) + elif "forEach" in expr: + return self.for_each(expr, resource, view_definition) + elif "select" in expr: + return self.select(expr, resource, view_definition) + elif "unionAll" in expr: + return self.union_all(expr, resource, view_definition) + elif "column" in expr: + return self.column(expr, resource, view_definition) + + # Utility functions + @staticmethod + def row_product(parts): + if len(parts) == 1: + return parts[0] + rows = [{}] + new_rows = None + for part in parts: + new_rows = [] + for partial_row in part: + for row in rows: + new_rows.append(partial_row | row) + rows = new_rows + return rows + + @staticmethod + def replace_constants(path, view_definition): + for constant in view_definition.get("constant") or []: + value_key = [vk for vk in constant.keys() if vk.startswith("value")] + if "valueBoolean" in constant: + path = path.replace( + "%" + constant["name"], str(constant["valueBoolean"]).lower() + ) + elif value_key[0] in [ + "valueInteger", + "valueDecimal", + "valueUnsignedInt", + "valuePositiveInt", + ]: + path = path.replace("%" + constant["name"], str(constant[value_key[0]])) + elif len(value_key) == 1: + path = path.replace( + "%" + constant["name"], "'" + constant[value_key[0]] + "'" + ) + return path + + # FHIRPath Helper Functions + @staticmethod + def get_resource_key(ctx): + return ctx[0]["id"] + + @staticmethod + def get_reference_key(ctx, identifier=None): + if identifier and not ctx[0]["reference"].startswith(identifier): + return None + index = ctx[0]["reference"].find("/") + return ctx[0]["reference"][index + 1 :] + + @staticmethod + def identity(resource): + return resource + + +# View Definition Normalization & Validation def normalize(view): + # Make sure we only operate on keys we have implemented for current_functions = view.keys() & { "select", "column", @@ -170,6 +193,7 @@ def normalize(view): if "forEach" in view or "forEachOrNull" in view: if "select" not in view: view["select"] = [] + # Move column & unionAll under forEach for evaluation for_each() -> union_all()/column() view = move_functions( view, "select", current_functions - {"forEach", "select", "forEachOrNull"} ) @@ -177,7 +201,9 @@ def normalize(view): elif "select" in view: view = move_functions(view, "select", current_functions - {"select"}) view["select"] = [normalize(selection) for selection in view["select"]] - # if unionAll and column are present, we want to select so they are merged at the same level + # if unionAll and column are present make sure it is evaluated as row_product(unionAll + column) + # instead of union_all(column()). We also require select() to make sure both get evaluated + # and union_all doesn't take precedence elif "unionAll" in view and "column" in view: view["select"] = [] view = move_functions(view, "select", current_functions - {"select"}) @@ -199,27 +225,6 @@ def move_functions(view, function, sub_functions): return view -def replace_constants(path, view_definition): - for constant in view_definition.get("constant") or []: - value_key = [vk for vk in constant.keys() if vk.startswith("value")] - if "valueBoolean" in constant: - path = path.replace( - "%" + constant["name"], str(constant["valueBoolean"]).lower() - ) - elif value_key[0] in [ - "valueInteger", - "valueDecimal", - "valueUnsignedInt", - "valuePositiveInt", - ]: - path = path.replace("%" + constant["name"], str(constant[value_key[0]])) - elif len(value_key) == 1: - path = path.replace( - "%" + constant["name"], "'" + constant[value_key[0]] + "'" - ) - return path - - def validate_union_all(union_all): all_column_orderings = [] for expression in union_all: diff --git a/uv.lock b/uv.lock index 4277075..00a3afe 100644 --- a/uv.lock +++ b/uv.lock @@ -148,7 +148,7 @@ wheels = [ [[package]] name = "sqlonfhir" -version = "0.0.1" +version = "0.0.2" source = { editable = "." } dependencies = [ { name = "fhirpathpy" },