Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/copy_probe_features.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ jobs:
curl -o src/probeinterface/resources/neuropixels_probe_features.json \
https://raw.githubusercontent.com/billkarsh/ProbeTable/refs/heads/main/Tables/probe_features.json

- name: Derive IMRO type mappings from catalogue
run: python resources/postprocess_neuropixels_probe_features.py

- name: Commit changes if any
id: commit
run: |
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ uv.lock
# libraries
**/neuropixels_library_generated
**/cambridgeneurotech_library
.codex
131 changes: 131 additions & 0 deletions resources/postprocess_neuropixels_probe_features.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""
Post-process neuropixels_probe_features.json after syncing from ProbeTable.

Derives two mappings from the catalogue and writes them back into the JSON:

- z_imro_format_type_to_imro_format: IMRO type code -> IMRO format name
(e.g. "0" -> "imro_np1000", "1110" -> "imro_np1110")

- z_imro_format_type_to_part_number: IMRO type code -> canonical probe part number
(e.g. "0" -> "NP1000", "1110" -> "NP1110")

This script is called by the GitHub Action workflow that syncs probe_features.json
from billkarsh/ProbeTable, and can also be run standalone.
"""

import json
import re
from pathlib import Path

# Default location of the catalogue JSON that `postprocess` reads and rewrites
# in place. It is built from this script's own directory, so the default does
# not depend on the current working directory.
PROBE_FEATURES_PATH = (
Path(__file__).absolute().parent
/ "../src/probeinterface/resources/neuropixels_probe_features.json"
)


def _parse_type_values_from_val_def(val_def: str) -> list[str]:
"""Extract IMRO type code(s) from a val_def string.

Two patterns in ProbeTable:
type:{0,1020,1030,...} -> set of values
type:1110 -> single value
"""
match = re.match(r"type:\{([^}]+)\}", val_def)
if match:
return [v.strip() for v in match.group(1).split(",")]

match = re.match(r"type:(\d+)", val_def)
if match:
return [match.group(1)]

raise ValueError(f"Cannot parse type from val_def: {val_def!r}")


def build_derived_mappings(probe_features: dict) -> tuple[dict, dict]:
    """Build type-to-format and type-to-part-number mappings from the catalogue.

    Parameters
    ----------
    probe_features : dict
        Parsed catalogue. Must contain the keys ``"z_imro_formats"`` (format
        entries, of which only keys ending in ``"_val_def"`` are used) and
        ``"neuropixels_probes"`` (part number -> probe spec dict).

    Returns
    -------
    type_to_format : dict
        IMRO type code (str) -> IMRO format name (the ``*_val_def`` key with
        that suffix removed).
    type_to_part_number : dict
        IMRO type code (str) -> chosen probe part number. A type code whose
        format is not used by any probe is left out of this mapping.

    Raises
    ------
    ValueError
        If the same IMRO type code is declared by two different formats, or
        if a ``*_val_def`` string cannot be parsed.
    """
    imro_formats = probe_features["z_imro_formats"]
    probes = probe_features["neuropixels_probes"]

    # 1. Build type -> format mapping from val_def entries
    type_to_format = {}
    for key, val_def in imro_formats.items():
        if not key.endswith("_val_def"):
            continue
        # e.g. "imro_np1000_val_def" -> "imro_np1000"
        format_name = key.removesuffix("_val_def")
        for type_code in _parse_type_values_from_val_def(val_def):
            if type_code in type_to_format:
                raise ValueError(
                    f"IMRO type {type_code!r} maps to both "
                    f"{type_to_format[type_code]!r} and {format_name!r}"
                )
            type_to_format[type_code] = format_name

    # 2. Group part numbers by the IMRO format they declare. This is done once
    #    up front so the probe table is not rescanned for every type code.
    #    Non-string format values can never equal a format name, so skip them.
    part_numbers_by_format = {}
    for pn, spec in probes.items():
        probe_format = spec.get("imro_table_format_type")
        if isinstance(probe_format, str):
            part_numbers_by_format.setdefault(probe_format, []).append(pn)

    # Pre-sort each group: NP-prefixed part numbers first, then everything else.
    ordered_by_format = {}
    for probe_format, part_numbers in part_numbers_by_format.items():
        np_sorted = sorted(pn for pn in part_numbers if pn.startswith("NP"))
        other_sorted = sorted(pn for pn in part_numbers if not pn.startswith("NP"))
        ordered_by_format[probe_format] = (np_sorted, other_sorted)

    # 3. Build type -> canonical part number mapping.
    #    Selection is by part-number name only: several probes can share one
    #    IMRO format, so within the format's probes we prefer an NP-prefixed
    #    part number that contains the type code (e.g. NP1020 for type "1020").
    #    Otherwise we fall back to the first NP-prefixed name alphabetically,
    #    then to the first of the remaining names.
    type_to_part_number = {}
    for type_code, format_name in sorted(type_to_format.items()):
        np_sorted, other_sorted = ordered_by_format.get(format_name, ([], []))

        chosen = next((pn for pn in np_sorted if type_code in pn), None)
        if chosen is None and np_sorted:
            chosen = np_sorted[0]
        if chosen is None and other_sorted:
            chosen = other_sorted[0]

        if chosen is not None:
            type_to_part_number[type_code] = chosen

    return type_to_format, type_to_part_number


def postprocess(filepath: Path = PROBE_FEATURES_PATH) -> None:
    """Add the derived IMRO mappings to the catalogue JSON file, in place.

    Reads the JSON at ``filepath``, computes the two mappings with
    ``build_derived_mappings``, stores them under the keys
    ``"z_imro_format_type_to_imro_format"`` and
    ``"z_imro_format_type_to_part_number"`` (both ordered by numeric type
    code), writes the JSON back to the same path and prints a summary.

    Parameters
    ----------
    filepath : Path
        Catalogue JSON file to update. A plain string path is accepted too.

    Raises
    ------
    ValueError
        Propagated from ``build_derived_mappings``, or raised when a type code
        cannot be converted with ``int`` for sorting.
    """
    filepath = Path(filepath).resolve()
    # JSON is UTF-8; be explicit so the result does not depend on the locale.
    with open(filepath, encoding="utf-8") as f:
        probe_features = json.load(f)

    type_to_format, type_to_part_number = build_derived_mappings(probe_features)

    def numeric_type_code(item):
        # Order "2" before "10": sort on the integer value, not the string.
        return int(item[0])

    probe_features["z_imro_format_type_to_imro_format"] = dict(
        sorted(type_to_format.items(), key=numeric_type_code)
    )
    probe_features["z_imro_format_type_to_part_number"] = dict(
        sorted(type_to_part_number.items(), key=numeric_type_code)
    )

    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(probe_features, f, indent=4)
        f.write("\n")

    print(f"Wrote derived mappings to {filepath}")
    print(f" z_imro_format_type_to_imro_format: {len(type_to_format)} entries")
    print(f" z_imro_format_type_to_part_number: {len(type_to_part_number)} entries")
    for type_code in sorted(type_to_format, key=int):
        # "???" marks a type code for which no probe part number was found.
        pn = type_to_part_number.get(type_code, "???")
        print(f" type {type_code:>5s} -> format={type_to_format[type_code]}, part_number={pn}")


# Run the post-processing on the default catalogue path when this file is
# executed as a script rather than imported.
if __name__ == "__main__":
    postprocess()
155 changes: 70 additions & 85 deletions src/probeinterface/neuropixels_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,47 +24,6 @@
# Utils zone #
###############

# Map imDatPrb_pn (probe number) to imDatPrb_type (probe type) when the latter is missing
# ONLY needed for `read_imro` function
probe_part_number_to_probe_type = {
# for old version without a probe number we assume NP1.0
None: "0",
# NP1.0
"PRB_1_4_0480_1": "0",
"PRB_1_4_0480_1_C": "0", # This is the metal cap version
"PRB_1_2_0480_2": "0",
"NP1010": "0",
# NHP probes lin
"NP1015": "1015",
"NP1016": "1015",
"NP1017": "1015",
# NHP probes stag med
"NP1020": "1020",
"NP1021": "1021",
"NP1022": "1022",
# NHP probes stag long
"NP1030": "1030",
"NP1031": "1031",
"NP1032": "1032",
# NP2.0
"NP2000": "21",
"NP2010": "24",
"NP2013": "2013",
"NP2014": "2014",
"NP2003": "2003",
"NP2004": "2004",
"PRB2_1_2_0640_0": "21",
"PRB2_4_2_0640_0": "24",
# NXT
"NP2020": "2020",
# Ultra
"NP1100": "1100", # Ultra probe - 1 bank
"NP1110": "1110", # Ultra probe - 16 banks no handle because
"NP1121": "1121", # Ultra probe - beta configuration
# Opto
"NP1300": "1300", # Opto probe
}

# Map from imro format to ProbeInterface naming conventions
imro_field_to_pi_field = {
"ap_gain": "ap_gains",
Expand Down Expand Up @@ -439,24 +398,20 @@ def _annotate_probe_with_adc_sampling_info(probe: Probe, adc_sampling_table: str
#########################


def _parse_imro_string(imro_table_string: str, probe_part_number: str) -> dict:
def _parse_imro_string(imro_table_string: str) -> dict:
"""
Parse IMRO (Imec ReadOut) table string into structured per-channel data.

IMRO format: "(probe_type,num_chans)(ch0 bank0 ref0 ...)(ch1 bank1 ref1 ...)..."
Example: "(0,384)(0 1 0 500 250 1)(1 0 0 500 250 1)..."

Note: The IMRO header contains a probe_type field (e.g., "0", "21", "24"), which is
a numeric format version identifier that specifies which IMRO table structure was used.
Different probe generations use different IMRO formats. This is a file format detail,
not a physical probe property.
The IMRO type is extracted from the header and used to look up the field schema
from the catalogue (z_imro_format_type_to_imro_format). No probe part number is needed.

Parameters
----------
imro_table_string : str
IMRO table string from SpikeGLX metadata file
probe_part_number : str
Probe part number (e.g., "NP1000", "NP2000")

Returns
-------
Expand All @@ -473,22 +428,51 @@ def _parse_imro_string(imro_table_string: str, probe_part_number: str) -> dict:
Example for NP1110: {"header": {"type": 1110, "col_mode": 2, "ref_id": 0, ...},
"group": [0,1,...], "bankA": [0,0,...], "bankB": [0,0,...]} # 24 entries, not 384
"""
# Get IMRO field format from catalogue
# Parse IMRO header and per-entry values. Header values stay as strings; only the
# numeric trailing fields are cast to int below. The first field may be a numeric
# IMRO type code (old SpikeGLX format) or an alphanumeric probe part number such as
# "NP2020" (new format, SpikeGLX 20260115 onward; see issue #432).
header_str, *imro_table_values_list, _ = imro_table_string.strip().split(")")
header_parts = header_str[1:].split(",")
first_value = header_parts[0]
header_values = (first_value,) + tuple(map(int, header_parts[1:]))

# Resolve the IMRO format schema. Three header layouts to handle:
# 1. Phase3A: 3-field header, no part number anywhere; treat as type code "0".
# 2. New format: first field is a probe part number (e.g. "NP2020").
# 3. Old format: first field is a numeric IMRO type code (e.g. "21").
probe_features = _load_np_probe_features()
probe_spec = probe_features["neuropixels_probes"][probe_part_number]
imro_format = probe_spec["imro_table_format_type"]
probes = probe_features["neuropixels_probes"]
type_to_format = probe_features["z_imro_format_type_to_imro_format"]

if len(header_values) == 3:
imro_format_type = "0"
imro_format = type_to_format[imro_format_type]
elif first_value in probes:
imro_format = probes[first_value]["imro_table_format_type"]
imro_format_type = None # not used for new-format files
elif first_value in type_to_format:
imro_format_type = first_value
imro_format = type_to_format[imro_format_type]
else:
valid_types = sorted(type_to_format, key=int)
raise ValueError(
f"Unknown IMRO header first field {first_value!r}. "
f"Expected a probe part number from the catalogue or one of: {valid_types}"
)

imro_fields_string = probe_features["z_imro_formats"][imro_format + "_elm_flds"]
imro_fields = tuple(imro_fields_string.replace("(", "").replace(")", "").split(" "))

# Parse IMRO header and per-entry values
header_str, *imro_table_values_list, _ = imro_table_string.strip().split(")")

# Parse header fields using the catalogue schema
imro_header_fields_string = probe_features["z_imro_formats"][imro_format + "_hdr_flds"]
imro_header_fields = tuple(imro_header_fields_string.replace("(", "").replace(")", "").split(","))
header_values = tuple(map(int, header_str[1:].split(",")))
# Initialize with parsed header and empty lists for per-entry fields (filled below)
# Initialize with parsed header and empty lists for per-entry fields (filled below).
# For Phase3A (3-field header), zip silently drops the extra value, which is correct.
imro_per_channel = {"header": dict(zip(imro_header_fields, header_values))}
# Normalize Phase3A header type to 0 so downstream code reads it consistently
if len(header_values) == 3:
imro_per_channel["header"]["type"] = 0
for field in imro_fields:
imro_per_channel[field] = []
for field_values_str in imro_table_values_list:
Expand All @@ -511,7 +495,12 @@ def write_imro(file: str | Path, probe: Probe) -> None:
probe : Probe object

"""
probe_type = probe.annotations["probe_type"]
model_name = probe.model_name
probe_features = _load_np_probe_features()
part_number_to_format_type = {v: k for k, v in probe_features["z_imro_format_type_to_part_number"].items()}
probe_type = part_number_to_format_type.get(model_name)
if probe_type is None:
raise ValueError(f"Cannot resolve IMRO format type from model_name={model_name!r}")
data = probe.to_dataframe(complete=True).sort_values("device_channel_indices")
annotations = probe.contact_annotations
ret = [f"({probe_type},{len(data)})"]
Expand Down Expand Up @@ -716,34 +705,33 @@ def read_imro(file_path: str | Path) -> Probe:
https://billkarsh.github.io/SpikeGLX/help/imroTables/

"""
# ===== 1. Read file and determine probe part number from IMRO header =====
# ===== 1. Read file =====
meta_file = Path(file_path)
assert meta_file.suffix == ".imro", "'file' should point to the .imro file"
with meta_file.open(mode="r") as f:
imro_str = str(f.read())

imro_table_header_str, *imro_table_values_list, _ = imro_str.strip().split(")")
imro_table_header = tuple(map(int, imro_table_header_str[1:].split(",")))
# ===== 2. Parse IMRO table (type is extracted from the header automatically) =====
imro_per_channel = _parse_imro_string(imro_str)

if len(imro_table_header) == 3:
# In older versions of neuropixel arrays (phase 3A), imro tables were structured differently.
# We use probe_type "0", which maps to probe_part_number NP1010 as a proxy for Phase3a.
imDatPrb_type = "0"
elif len(imro_table_header) == 2:
imDatPrb_type, _ = imro_table_header
# ===== 3. Resolve probe part number and build full probe =====
# The header's "type" field carries either the alphanumeric probe part number
# (new SpikeGLX format, 20260115+) or a numeric IMRO type code (old format).
header_type = str(imro_per_channel["header"]["type"])
probe_features = _load_np_probe_features()
probes = probe_features["neuropixels_probes"]
type_to_pn = probe_features["z_imro_format_type_to_part_number"]
if header_type in probes:
probe_part_number = header_type
elif header_type in type_to_pn:
probe_part_number = type_to_pn[header_type]
else:
raise ValueError(f"read_imro error, the header has a strange length: {imro_table_header}")
imDatPrb_type = str(imDatPrb_type)

for probe_part_number, probe_type in probe_part_number_to_probe_type.items():
if imDatPrb_type == probe_type:
imDatPrb_pn = probe_part_number

# ===== 2. Interpret IMRO table =====
imro_per_channel = _parse_imro_string(imro_str, imDatPrb_pn)

# ===== 3. Build full probe with all possible contacts =====
full_probe = build_neuropixels_probe(probe_part_number=imDatPrb_pn)
valid_types = sorted(type_to_pn, key=int)
raise ValueError(
f"Unknown IMRO header first field {header_type!r}. "
f"Expected a probe part number from the catalogue or one of: {valid_types}"
)
full_probe = build_neuropixels_probe(probe_part_number=probe_part_number)

# ===== 4. Slice full probe to active electrodes =====
active_contact_ids = _get_imro_active_contact_ids(imro_per_channel)
Expand All @@ -755,10 +743,6 @@ def read_imro(file_path: str | Path) -> Probe:
adc_sampling_table = probe.annotations.get("adc_sampling_table")
_annotate_probe_with_adc_sampling_info(probe, adc_sampling_table)

# Scalar annotations
probe_type = imro_str.strip().split(")")[0].split(",")[0][1:]
probe.annotate(probe_type=probe_type)

# Vector annotations from IMRO fields
vector_properties = ("channel", "bank", "bank_mask", "ref_id", "ap_gain", "lf_gain", "ap_hipas_flt")
vector_properties_available = {}
Expand Down Expand Up @@ -820,7 +804,7 @@ def read_spikeglx(file: str | Path) -> Probe:
# Specifies which electrodes were selected for recording (e.g., 384 of 960) plus their
# acquisition settings (gains, references, filters). See: https://billkarsh.github.io/SpikeGLX/help/imroTables/
imro_table_string = meta["imroTbl"]
imro_per_channel = _parse_imro_string(imro_table_string, imDatPrb_pn)
imro_per_channel = _parse_imro_string(imro_table_string)

# ===== 4. Slice full probe to active electrodes =====
active_contact_ids = _get_imro_active_contact_ids(imro_per_channel)
Expand Down Expand Up @@ -907,8 +891,9 @@ def parse_spikeglx_snsGeomMap(meta: dict) -> tuple[int, float, float, np.ndarray

geom_list = meta["snsGeomMap"].split(sep=")")

# first entry is for instance (NP1000,1,0,70)
probe_type, num_shank, shank_pitch, shank_width = geom_list[0][1:].split(",")
# first entry is for instance (NP1000,1,0,70); the leading field can be a numeric
# type code or an alphanumeric part number depending on SpikeGLX version, and is unused
_, num_shank, shank_pitch, shank_width = geom_list[0][1:].split(",")
num_shank, shank_pitch, shank_width = int(num_shank), float(shank_pitch), float(shank_width)

geom_list = geom_list[1:-1]
Expand Down
Loading
Loading