Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion compose_runner/aws_lambda/run_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ def _log(job_id: str, message: str, **details: Any) -> None:
def _compose_api_base_url(environment: str) -> str:
env = (environment or "production").lower()
if env == "staging":
return "https://synth.neurostore.xyz/api"
return "https://staging.synth.neurostore.xyz/api"
if env == "development":
return "https://dev.synth.neurostore.xyz/api"
if env == "local":
return "http://localhost:81/api"
return "https://compose.neurosynth.org/api"
Expand Down
122 changes: 64 additions & 58 deletions compose_runner/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,29 @@ class Runner:

_TARGET_SPACE = "mni152_2mm"

_ENTITY_SNAPSHOT_KEYS = {
"studyset": ("studyset_snapshot", "studyset", "cached_studyset"),
"annotation": ("annotation_snapshot", "annotation", "cached_annotation"),
_ENTITY_SNAPSHOT_ID_KEYS = {
"studyset": ("snapshot_studyset_id",),
"annotation": ("snapshot_annotation_id",),
}
_ENTITY_STORE_PATHS = {
"studyset": "studysets",
"annotation": "annotations",
}
_ENTITY_SNAPSHOT_PATHS = {
"studyset": "studysets",
"annotation": "annotations",
"studyset": "snapshot-studysets",
"annotation": "snapshot-annotations",
}
_ENTITY_NEUROSTORE_KEYS = {
"studyset": ("neurostore_studyset", "neurostore_studyset_id", "studyset"),
"studyset": ("neurostore_studyset", "neurostore_studyset_id"),
"annotation": (
"neurostore_annotation",
"neurostore_annotation_id",
"annotation",
),
}
_ENTITY_SNAPSHOT_SUMMARY_KEYS = {
"studyset": ("neurostore_studyset", "studysets"),
"annotation": ("neurostore_annotation", "annotations"),
}
_ENTITY_COMPOSE_PATHS = {
"studyset": "neurostore-studysets",
"annotation": "neurostore-annotations",
Expand Down Expand Up @@ -79,8 +82,8 @@ def __init__(
}
elif environment == "staging":
# staging
self.compose_url = "https://synth.neurostore.xyz/api"
self.store_url = "https://neurostore.xyz/api"
self.compose_url = "https://staging.synth.neurostore.xyz/api"
self.store_url = "https://staging.neurostore.xyz/api"
self.reference_studysets = {
"neurosynth": gen_database_url("staging", "neurosynth"),
"neuroquery": gen_database_url("staging", "neuroquery"),
Expand Down Expand Up @@ -206,7 +209,7 @@ def _get_result_documents(self, meta_analysis):
result_id = result_ref
result_doc = None
elif isinstance(result_ref, dict):
result_id = result_ref.get("id")
result_id = result_ref.get("id") or result_ref.get("result_id")
result_doc = result_ref
else:
continue
Expand All @@ -215,14 +218,7 @@ def _get_result_documents(self, meta_analysis):
continue
if result_id is not None:
seen_ids.add(result_id)

has_snapshot_payload = any(
isinstance(result_doc.get(key), dict)
for key in self._ENTITY_SNAPSHOT_KEYS["studyset"]
+ self._ENTITY_SNAPSHOT_KEYS["annotation"]
) if isinstance(result_doc, dict) else False

if result_doc is None or (result_id is not None and not has_snapshot_payload):
if result_doc is None:
if result_id is None:
continue
result_doc = self._get_json(
Expand Down Expand Up @@ -254,24 +250,33 @@ def _get_entity_snapshot_record(self, entity_name, documents):
for document in documents:
if not isinstance(document, dict):
continue
for key in self._ENTITY_SNAPSHOT_KEYS[entity_name]:
snapshot_document = document.get(key)
payload = self._unwrap_snapshot(snapshot_document)
if is_expected_snapshot(payload):
return payload, self._extract_document_id(snapshot_document)
snapshot_id = self._extract_document_id(snapshot_document)
snapshot_id = None
for key in self._ENTITY_SNAPSHOT_ID_KEYS[entity_name]:
snapshot_id = self._extract_document_id(document.get(key))
if snapshot_id is None:
continue
try:
snapshot_document = self._get_json(
f"{self.compose_url}/{self._ENTITY_SNAPSHOT_PATHS[entity_name]}/{snapshot_id}",
f"Could not download {entity_name} snapshot {snapshot_id}",
)
except requests.exceptions.HTTPError:
continue
payload = self._unwrap_snapshot(snapshot_document)
if is_expected_snapshot(payload):
return payload, self._extract_document_id(snapshot_document) or snapshot_id
break
if snapshot_id is None:
ref_key, summary_key = self._ENTITY_SNAPSHOT_SUMMARY_KEYS[entity_name]
ref_document = document.get(ref_key)
if isinstance(ref_document, dict):
summary_documents = ref_document.get(summary_key) or []
for summary_document in summary_documents:
snapshot_id = self._extract_document_id(summary_document)
if snapshot_id is not None:
break
if snapshot_id is None:
continue
try:
snapshot_document = self._get_json(
f"{self.compose_url}/{self._ENTITY_SNAPSHOT_PATHS[entity_name]}/{snapshot_id}",
f"Could not download {entity_name} snapshot {snapshot_id}",
)
except requests.exceptions.HTTPError:
continue
payload = self._unwrap_snapshot(snapshot_document)
if is_expected_snapshot(payload):
return payload, snapshot_id
return None, None

@staticmethod
Expand Down Expand Up @@ -505,17 +510,27 @@ def apply_filter(self, studyset, annotation):
raise ValueError("Cannot have multiple conditions and a database studyset.")

elif len(conditions) == 2 and not database_studyset:
second_analysis_ids = [
n.analysis.id
for n in annotation.notes
if n.note.get(f"{column}") == weight_conditions[-1]
]
if column_type == "boolean":
second_analysis_ids = [
n.analysis.id
for n in annotation.notes
if not n.note.get(f"{column}")
]
else:
second_analysis_ids = [
n.analysis.id
for n in annotation.notes
if n.note.get(f"{column}") == weight_conditions[-1]
]
second_studyset = studyset.slice(analyses=second_analysis_ids)
second_studyset = second_studyset.combine_analyses()

return first_studyset, second_studyset

elif len(conditions) <= 1 and database_studyset:
# collect user study IDs cheaply before loading the large reference database
study_ids = set(studyset.study_ids)

# Download the gzip file
response = requests.get(self.reference_studysets[database_studyset])

Expand All @@ -537,26 +552,17 @@ def apply_filter(self, studyset, annotation):
# Load the JSON data into a dictionary
reference_studyset_dict = json.loads(json_data)

reference_studyset = Studyset(reference_studyset_dict, target=self._TARGET_SPACE)
# pre-filter at the dict level to exclude user studies before constructing
# Studyset, keeping the object small and avoiding expensive materialize calls
reference_studyset_dict["studies"] = [
s for s in reference_studyset_dict.get("studies", [])
if s["id"] not in study_ids
]

reference_studyset = Studyset(reference_studyset_dict, target=self._TARGET_SPACE)
del reference_studyset_dict
# get study ids from studyset
study_ids = set([s.id for s in studyset.studies])

# reference study ids
reference_study_ids = set([s.id for s in reference_studyset.studies])

keep_study_ids = reference_study_ids - study_ids

# get analysis ids from reference studyset
analysis_ids = [
a.id
for s in reference_studyset.studies
for a in s.analyses
if s.id in keep_study_ids
]
second_studyset = reference_studyset.slice(analyses=analysis_ids)
second_studyset = second_studyset.combine_analyses()
second_studyset = reference_studyset.combine_analyses()

return first_studyset, second_studyset

Expand Down Expand Up @@ -591,9 +597,9 @@ def create_result_object(self):
existing_payload,
existing_id,
):
data[f"cached_{entity_name}"] = existing_id
data[f"snapshot_{entity_name}_id"] = existing_id
else:
data[f"{entity_name}_snapshot"] = live_payload
data[f"snapshot_{entity_name}"] = live_payload

resp = requests.post(
f"{self.compose_url}/meta-analysis-results",
Expand Down
Loading
Loading