diff --git a/.gitignore b/.gitignore index d83456d..b62199b 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,4 @@ cov.xml neuropixel_library_generated/* +uv.lock diff --git a/resources/generate_neuropixels_library.py b/resources/generate_neuropixels_library.py index b4055db..5357965 100644 --- a/resources/generate_neuropixels_library.py +++ b/resources/generate_neuropixels_library.py @@ -6,7 +6,7 @@ import numpy as np import matplotlib.pyplot as plt -from probeinterface.neuropixels_tools import _make_npx_probe_from_description, get_probe_metadata_from_probe_features +from probeinterface.neuropixels_tools import build_neuropixels_probe from probeinterface.plotting import plot_probe from probeinterface import write_probeinterface @@ -28,6 +28,8 @@ def generate_all_npx(base_folder=None): probe_part_numbers = probe_features['neuropixels_probes'].keys() + # Note: model_name here is the probe_part_number (specific SKU like "NP1000"), + # NOT the model/platform (like "Neuropixels 1.0", "Neuropixels 2.0", "Ultra", or "NHP") for model_name in probe_part_numbers: print(model_name) @@ -41,18 +43,8 @@ def generate_all_npx(base_folder=None): probe_folder = base_folder / model_name probe_folder.mkdir(exist_ok=True) - pt_metadata, _, _ = get_probe_metadata_from_probe_features(probe_features, model_name) - - num_shank = pt_metadata["num_shanks"] - contact_per_shank = pt_metadata["cols_per_shank"] * pt_metadata["rows_per_shank"] - if num_shank == 1: - elec_ids = np.arange(contact_per_shank) - shank_ids = None - else: - elec_ids = np.concatenate([np.arange(contact_per_shank) for i in range(num_shank)]) - shank_ids = np.concatenate([np.zeros(contact_per_shank) + i for i in range(num_shank)]) - - probe = _make_npx_probe_from_description(pt_metadata, model_name, elec_ids, shank_ids) + # Build the full probe with all possible contacts from the probe part number + probe = build_neuropixels_probe(model_name) # plotting fig, axs = plt.subplots(ncols=2) @@ -73,7 +65,10 @@ def generate_all_npx(base_folder=None): plot_probe(probe, ax=ax) ax.set_title("") - yp = pt_metadata["electrode_pitch_vert_um"] + # Get vertical pitch from contact positions (minimum non-zero y-distance) + positions = probe.contact_positions + y_positions = np.unique(positions[:, 1]) + yp = np.min(np.diff(y_positions)) ax.set_ylim(-yp*8, yp*13) ax.yaxis.set_visible(False) ax.spines["top"].set_visible(False) diff --git a/src/probeinterface/neuropixels_tools.py b/src/probeinterface/neuropixels_tools.py index 23d6531..83c04a9 100644 --- a/src/probeinterface/neuropixels_tools.py +++ b/src/probeinterface/neuropixels_tools.py @@ -24,7 +24,7 @@ def _load_np_probe_features(): - # this avoid loading the json several time + # this avoid loading the json several times global _np_probe_features if _np_probe_features is None: probe_features_filepath = Path(__file__).absolute().parent / Path("resources/neuropixels_probe_features.json") @@ -349,6 +349,142 @@ def _make_npx_probe_from_description(probe_description, model_name, elec_ids, sh return probe +def build_neuropixels_probe(probe_part_number: str) -> Probe: + """ + Build a Neuropixels probe with all possible contacts from the probe part number. + + This function constructs a complete probe geometry based on IMEC manufacturer specifications + sourced from Bill Karsh's ProbeTable repository (https://github.com/billkarsh/ProbeTable). + The specifications include contact positions, electrode dimensions, shank geometry, MUX routing + tables, and ADC configurations. The resulting probe contains ALL electrodes (e.g., 960 for + NP1.0, 1280 for NP2.0), not just the subset that might be recorded in an actual experiment. + + Parameters + ---------- + probe_part_number : str + Probe part number (specific SKU identifier). + Examples: "NP1000", "NP2000", "NP1010", "NP2003", "NP2004" + + Note: This is the specific SKU, not the model/platform name: + - probe_part_number is like "NP1000" (specific SKU) + - NOT like "Neuropixels 1.0" or "Neuropixels 2.0" (platform family) + + In SpikeGLX meta files, this corresponds to the `imDatPrb_pn` field. + Multiple part numbers may belong to the same platform family but have + different configurations or variants. + + Returns + ------- + probe : Probe + The complete Probe object with all contacts and metadata. + """ + # ===== 1. Load configuration ===== + probe_features = _load_np_probe_features() + probe_spec_dict = probe_features["neuropixels_probes"][probe_part_number] + + # ===== 2. Calculate electrode IDs and shank IDs ===== + num_shanks = int(probe_spec_dict["num_shanks"]) + contacts_per_shank = int(probe_spec_dict["cols_per_shank"]) * int(probe_spec_dict["rows_per_shank"]) + + if num_shanks == 1: + elec_ids = np.arange(contacts_per_shank, dtype=int) + shank_ids = None + else: + elec_ids = np.concatenate([np.arange(contacts_per_shank, dtype=int) for _ in range(num_shanks)]) + shank_ids = np.concatenate([np.zeros(contacts_per_shank, dtype=int) + i for i in range(num_shanks)]) + + # ===== 3. Calculate contact positions ===== + cols_per_shank = int(probe_spec_dict["cols_per_shank"]) + y_idx, x_idx = np.divmod(elec_ids, cols_per_shank) + + x_pitch = float(probe_spec_dict["electrode_pitch_horz_um"]) + y_pitch = float(probe_spec_dict["electrode_pitch_vert_um"]) + + even_offset = float(probe_spec_dict["even_row_horz_offset_left_edge_to_leftmost_electrode_center_um"]) + odd_offset = float(probe_spec_dict["odd_row_horz_offset_left_edge_to_leftmost_electrode_center_um"]) + raw_stagger = even_offset - odd_offset + + stagger = np.mod(y_idx + 1, 2) * raw_stagger + x_pos = (x_idx * x_pitch + stagger).astype("float64") + y_pos = (y_idx * y_pitch).astype("float64") + + # Apply horizontal offset for multi-shank probes + if shank_ids is not None: + shank_pitch = float(probe_spec_dict["shank_pitch_um"]) + x_pos += np.array(shank_ids).astype(int) * shank_pitch + + positions = np.stack((x_pos, y_pos), axis=1) + + # ===== 4. Calculate contact IDs ===== + shank_ids_iter = shank_ids if shank_ids is not None else [None] * len(elec_ids) + contact_ids = [ + _build_canonical_contact_id(elec_id, shank_id) for shank_id, elec_id in zip(shank_ids_iter, elec_ids) + ] + + # ===== 5. Create Probe object and set contacts ===== + probe = Probe(ndim=2, si_units="um", model_name=probe_part_number, manufacturer="imec") + probe.description = probe_spec_dict["description"] + probe.set_contacts( + positions=positions, + shapes="square", + shank_ids=shank_ids, + shape_params={"width": float(probe_spec_dict["electrode_size_horz_direction_um"])}, + ) + probe.set_contact_ids(contact_ids) + + # ===== 6. Build probe contour and shank tips ===== + shank_width = float(probe_spec_dict["shank_width_um"]) + tip_length = float(probe_spec_dict["tip_length_um"]) + polygon = np.array(get_probe_contour_vertices(shank_width, tip_length, get_probe_length(probe_part_number))) + + # Build contour for all shanks + contour = [] + if shank_ids is not None: + shank_pitch = float(probe_spec_dict["shank_pitch_um"]) + for shank_id in range(num_shanks): + shank_shift = np.array([shank_pitch * shank_id if shank_ids is not None else 0, 0]) + contour += list(polygon + shank_shift) + + # Apply contour shift to align with contact positions + # This constant (11 μm) represents the vertical distance from the center of the bottommost + # electrode to the top of the shank tip. This is a geometric constant for Neuropixels probes + # that is not currently available in the ProbeTable specifications. + middle_of_bottommost_electrode_to_top_of_shank_tip = 11 + contour_shift = np.array([-odd_offset, -middle_of_bottommost_electrode_to_top_of_shank_tip]) + contour = np.array(contour) + contour_shift + probe.set_planar_contour(contour) + + # Calculate shank tips (polygon[2] is the tip vertex from get_probe_contour_vertices) + tip_vertex = polygon[2] + shank_tips = [] + for shank_id in range(num_shanks): + shank_shift = np.array([shank_pitch * shank_id if shank_ids is not None else 0, 0]) + shank_tip = np.array(tip_vertex) + contour_shift + shank_shift + shank_tips.append(shank_tip.tolist()) + probe.annotate(shank_tips=shank_tips) + + # ===== 7. Add metadata annotations ===== + probe.annotate( + adc_bit_depth=int(probe_spec_dict["adc_bit_depth"]), + num_readout_channels=int(probe_spec_dict["num_readout_channels"]), + ap_sample_frequency_hz=float(probe_spec_dict["ap_sample_frequency_hz"]), + lf_sample_frequency_hz=float(probe_spec_dict["lf_sample_frequency_hz"]), + ) + + # ===== 8. Store ADC sampling table ===== + # The ADC sampling table describes how readout channels map to ADCs, not electrodes. + # Per-contact annotations (adc_group, adc_sample_order) can only be correctly + # assigned when reading a recording with a known channel map (via read_spikeglx), + # because the table indices are readout channel indices, not electrode indices. + # We store the full table string so it's available after slicing. + mux_table_format_type = probe_spec_dict["mux_table_format_type"] + adc_sampling_table = probe_features["z_mux_tables"].get(mux_table_format_type) + if adc_sampling_table is not None: + probe.annotate(adc_sampling_table=adc_sampling_table) + + return probe + + def _read_imro_string(imro_str: str, imDatPrb_pn: Optional[str] = None) -> Probe: """ Parse the IMRO table when presented as a string and create a Probe object. @@ -542,23 +678,121 @@ def write_imro(file: str | Path, probe: Probe): ## -def read_spikeglx(file: str | Path) -> Probe: +def _build_canonical_contact_id(electrode_id: int, shank_id: int | None = None) -> str: """ - Read probe position for the meta file generated by SpikeGLX + Build the canonical contact ID string for a Neuropixels electrode. - See http://billkarsh.github.io/SpikeGLX/#metadata-guides for implementation. - The x_pitch/y_pitch/width are set automatically depending on the NP version. + This establishes the standard naming convention used throughout probeinterface + for Neuropixels contact identification. - The shape is auto generated as a shank. + Parameters + ---------- + electrode_id : int + Physical electrode ID on the probe (e.g., 0-959 for NP1.0) + shank_id : int or None, default: None + Shank ID for multi-shank probes. If None, assumes single-shank probe. + + Returns + ------- + contact_id : str + Canonical contact ID string, either "e{electrode_id}" for single-shank + or "s{shank_id}e{electrode_id}" for multi-shank probes. + + Examples + -------- + >>> _build_canonical_contact_id(123) + 'e123' + >>> _build_canonical_contact_id(123, shank_id=0) + 's0e123' + """ + if shank_id is not None: + return f"s{shank_id}e{electrode_id}" + else: + return f"e{electrode_id}" + + +def _parse_imro_string(imro_table_string: str, probe_part_number: str) -> dict: + """ + Parse IMRO (Imec ReadOut) table string into structured per-channel data. + + IMRO format: "(probe_type,num_chans)(ch0 bank0 ref0 ...)(ch1 bank1 ref1 ...)..." + Example: "(0,384)(0 1 0 500 250 1)(1 0 0 500 250 1)..." + + Note: The IMRO header contains a probe_type field (e.g., "0", "21", "24"), which is + a numeric format version identifier that specifies which IMRO table structure was used. + Different probe generations use different IMRO formats. This is a file format detail, + not a physical probe property. + + Parameters + ---------- + imro_table_string : str + IMRO table string from SpikeGLX metadata file + probe_part_number : str + Probe part number (e.g., "NP1000", "NP2000") + + Returns + ------- + imro_per_channel : dict + Dictionary where each key maps to a list of values (one per channel). + Keys are IMRO fields like "channel", "bank", "electrode", "ap_gain", etc. + The "electrode" key always contains physical electrode IDs (0-959 for NP1.0, etc.). + For NP2.0+: electrode IDs come directly from IMRO data. + For NP1.0: electrode IDs are computed as bank * 384 + channel. + Example: {"channel": [0,1,2,...], "bank": [1,0,0,...], "electrode": [384,1,2,...], "ap_gain": [500,500,...]} + """ + # Get IMRO field format from catalogue + probe_features = _load_np_probe_features() + probe_spec = probe_features["neuropixels_probes"][probe_part_number] + imro_format = probe_spec["imro_table_format_type"] + imro_fields_string = probe_features["z_imro_formats"][imro_format + "_elm_flds"] + imro_fields = tuple(imro_fields_string.replace("(", "").replace(")", "").split(" ")) + + # Parse IMRO table values into per-channel data + # Skip the header "(probe_type,num_chans)" and trailing empty string + _, *imro_table_values_list, _ = imro_table_string.strip().split(")") + imro_per_channel = {k: [] for k in imro_fields} + for field_values_str in imro_table_values_list: + values = tuple(map(int, field_values_str[1:].split(" "))) + for field, field_value in zip(imro_fields, values): + imro_per_channel[field].append(field_value) + + # Ensure "electrode" key always exists with physical electrode IDs + # Different probe types encode electrode selection differently + if "electrode" not in imro_per_channel: + # NP1.0: Bank-based addressing (physical_electrode_id = bank * 384 + channel) + readout_channel_ids = np.array(imro_per_channel["channel"]) + bank_key = "bank" if "bank" in imro_per_channel else "bank_mask" + bank_indices = np.array(imro_per_channel[bank_key]) + imro_per_channel["electrode"] = (bank_indices * 384 + readout_channel_ids).tolist() + + return imro_per_channel + + +def read_spikeglx(file: str | Path) -> Probe: + """ + Read probe geometry and configuration from a SpikeGLX metadata file. + + This function reconstructs the probe used in a recording by: + 1. Reading the probe part number from metadata + 2. Building the full probe geometry from manufacturer specifications + 3. Slicing to the electrodes selected in the IMRO table + 4. Further slicing to channels actually saved to disk (if subset was saved) + 5. Adding recording-specific annotations + 6. Add wiring (device channel indices) Parameters ---------- file : Path or str - The .meta file path + Path to the SpikeGLX .meta file Returns ------- - probe : Probe object + probe : Probe + Probe object with geometry, contact annotations, and device channel mapping + + See Also + -------- + http://billkarsh.github.io/SpikeGLX/#metadata-guides """ @@ -566,42 +800,104 @@ def read_spikeglx(file: str | Path) -> Probe: assert meta_file.suffix == ".meta", "'meta_file' should point to the .meta SpikeGLX file" meta = parse_spikeglx_meta(meta_file) - assert "imroTbl" in meta, "Could not find imroTbl field in meta file!" - imro_table = meta["imroTbl"] - # read serial number - imDatPrb_serial_number = meta.get("imDatPrb_sn", None) - if imDatPrb_serial_number is None: # this is for Phase3A - imDatPrb_serial_number = meta.get("imProbeSN", None) - - # read other metadata + # ===== 1. Extract probe part number from metadata ===== imDatPrb_pn = meta.get("imDatPrb_pn", None) - imDatPrb_port = meta.get("imDatPrb_port", None) - imDatPrb_slot = meta.get("imDatPrb_slot", None) - imDatPrb_part_number = meta.get("imDatPrb_pn", None) - - # Only Phase3a probe has "imProbeOpt". Map this to NP10101. + # Only Phase3a probe has "imProbeOpt". Map this to NP1010. if meta.get("imProbeOpt") is not None: imDatPrb_pn = "NP1010" - probe = _read_imro_string(imro_str=imro_table, imDatPrb_pn=imDatPrb_pn) + # ===== 2. Build full probe with all possible contacts ===== + # This creates the complete probe geometry (e.g., 960 contacts for NP1.0) + # based on manufacturer specifications + full_probe = build_neuropixels_probe(probe_part_number=imDatPrb_pn) + + # ===== 3. Parse IMRO table to extract recorded electrodes and acquisition settings ===== + # IMRO = Imec ReadOut (the configuration table format from IMEC manufacturer) + # Specifies which electrodes were selected for recording (e.g., 384 of 960) plus their + # acquisition settings (gains, references, filters). See: https://billkarsh.github.io/SpikeGLX/help/imroTables/ + imro_table_string = meta["imroTbl"] + imro_per_channel = _parse_imro_string(imro_table_string, imDatPrb_pn) + + # ===== 4. Build contact IDs for active electrodes ===== + # Convert physical electrode IDs to probeinterface canonical contact ID strings + imro_electrode = imro_per_channel["electrode"] + imro_shank = imro_per_channel.get("shank", [None] * len(imro_electrode)) + active_contact_ids = [ + _build_canonical_contact_id(elec_id, shank_id) for shank_id, elec_id in zip(imro_shank, imro_electrode) + ] - # add serial number and other annotations - probe.annotate(serial_number=imDatPrb_serial_number) - probe.annotate(part_number=imDatPrb_part_number) - probe.annotate(port=imDatPrb_port) - probe.annotate(slot=imDatPrb_slot) - probe.annotate(serial_number=imDatPrb_serial_number) + # ===== 5. Slice full probe to active electrodes ===== + # Find indices of active contacts in the full probe, preserving IMRO order + contact_id_to_index = {contact_id: idx for idx, contact_id in enumerate(full_probe.contact_ids)} + selected_contact_indices = np.array([contact_id_to_index[contact_id] for contact_id in active_contact_ids]) + + probe = full_probe.get_slice(selected_contact_indices) + + # ===== 6. Store IMRO properties (acquisition settings) as annotations ===== + # Filter IMRO data to only the properties we want to add as annotations + imro_properties_to_add = ("channel", "bank", "bank_mask", "ref_id", "ap_gain", "lf_gain", "ap_hipas_flt") + imro_filtered = {k: v for k, v in imro_per_channel.items() if k in imro_properties_to_add and len(v) > 0} + # Map IMRO field names to probeinterface field names and add as contact annotations + annotations = {} + for imro_field, values in imro_filtered.items(): + pi_field = imro_field_to_pi_field.get(imro_field) + annotations[pi_field] = values + probe.annotate_contacts(**annotations) + + # ===== 6b. Add ADC sampling annotations ===== + # The ADC sampling table describes which ADC samples each readout channel and in what order. + # At this point, contacts are ordered by readout channel (0-383), so we can directly + # apply the mapping. This must be done here (not in build_neuropixels_probe) + # because the table indices are readout channel indices, not electrode indices. + adc_sampling_table = probe.annotations.get("adc_sampling_table") + if adc_sampling_table is not None: + # Parse table string: (num_adcs,num_channels_per_adc)(ch ch ...)(ch ch ...)... + adc_info = adc_sampling_table.split(")(")[0] + split_mux = adc_sampling_table.split(")(")[1:] + num_adcs, num_channels_per_adc = map(int, adc_info[1:].split(",")) + adc_groups_list = [ + np.array(each_mux.replace("(", "").replace(")", "").split(" ")).astype("int") for each_mux in split_mux + ] + mux_table = np.transpose(np.array(adc_groups_list)) + + # Map readout channels to ADC groups and sample order + num_readout_channels = probe.get_contact_count() + adc_groups = np.zeros(num_readout_channels, dtype="int64") + adc_sample_order = np.zeros(num_readout_channels, dtype="int64") + for adc_index, channels_per_adc in enumerate(mux_table): + # Filter out placeholder values (e.g., 128 in mux_np1200 for unused slots) + valid_channels = channels_per_adc[channels_per_adc < num_readout_channels] + adc_groups[valid_channels] = adc_index + adc_sample_order[valid_channels] = np.arange(len(valid_channels)) - # sometimes we need to slice the probe when not all channels are saved + probe.annotate(num_adcs=num_adcs) + probe.annotate(num_channels_per_adc=num_channels_per_adc) + probe.annotate_contacts(adc_group=adc_groups) + probe.annotate_contacts(adc_sample_order=adc_sample_order) + + # ===== 7. Slice to saved channels (if subset was saved) ===== + # This is DIFFERENT from IMRO selection: IMRO selects which electrodes to acquire, + # but SpikeGLX can optionally save only a subset of acquired channels to reduce file size. + # For example: IMRO selects 384 electrodes, but only 300 are saved to disk. saved_chans = get_saved_channel_indices_from_spikeglx_meta(meta_file) - # remove the SYS chans - saved_chans = saved_chans[saved_chans < probe.get_contact_count()] + saved_chans = saved_chans[saved_chans < probe.get_contact_count()] # Remove SYS channels if saved_chans.size != probe.get_contact_count(): - # slice if needed probe = probe.get_slice(saved_chans) - # wire it + + # ===== 6. Add recording-specific annotations ===== + # These annotations identify the physical probe instance and recording setup + imDatPrb_serial_number = meta.get("imDatPrb_sn") or meta.get("imProbeSN") # Phase3A uses imProbeSN + imDatPrb_port = meta.get("imDatPrb_port", None) + imDatPrb_slot = meta.get("imDatPrb_slot", None) + probe.annotate(serial_number=imDatPrb_serial_number) + probe.annotate(part_number=imDatPrb_pn) + probe.annotate(port=imDatPrb_port) + probe.annotate(slot=imDatPrb_slot) + + # ===== 7. Set device channel indices (wiring) ===== + # I am unsure why are we are doing this. If someone knows please document it here. probe.set_device_channel_indices(np.arange(probe.get_contact_count())) return probe diff --git a/tests/test_io/test_spikeglx.py b/tests/test_io/test_spikeglx.py index ab51329..6350a50 100644 --- a/tests/test_io/test_spikeglx.py +++ b/tests/test_io/test_spikeglx.py @@ -320,6 +320,57 @@ def test_CatGT_NP1(): assert "1.0" in probe.description +def test_mux_annotations_match_readout_channels(): + """ + Test that MUX annotations (adc_group, adc_sample_order) correctly map + readout channels to their ADC assignments. + + The MUX table defines which ADC samples each readout channel and in what order. + For NP1.0 (mux_np1000), the pattern is: + - Readout channel 0 -> ADC 0, sample order 0 + - Readout channel 1 -> ADC 1, sample order 0 + - Readout channel 2 -> ADC 0, sample order 1 + - Readout channel 3 -> ADC 1, sample order 1 + - etc. + + The contacts in the probe returned by read_spikeglx are ordered by readout channel, + so adc_group[i] should be the ADC for readout channel i. + + This test uses allan-longcol which has alternating banks (0, 1, 0, 1, ...), + meaning electrode indices differ from readout channel indices. This exposes + bugs where MUX table indices (readout channels) are incorrectly treated as + electrode indices. + """ + # allan-longcol uses alternating banks: channel 0->electrode 0, channel 1->electrode 385, etc. + probe = read_spikeglx(data_path / "allan-longcol_g0_t0.imec0.ap.meta") + + assert probe.get_contact_count() == 384 + + # Verify the bank pattern - this file alternates between bank 0 and bank 1 + banks = probe.contact_annotations.get("banks") + assert list(banks[:10]) == [0, 1, 0, 1, 0, 1, 0, 1, 0, 1], "Expected alternating banks" + + adc_groups = probe.contact_annotations.get("adc_group") + adc_sample_order = probe.contact_annotations.get("adc_sample_order") + + assert adc_groups is not None, "adc_group annotation should be present" + assert adc_sample_order is not None, "adc_sample_order annotation should be present" + assert len(adc_groups) == 384 + assert len(adc_sample_order) == 384 + + # According to mux_np1000 (32 ADCs, 12 channels per ADC): + # Readout channel 0 -> ADC 0, readout channel 1 -> ADC 1, etc. + # The MUX assignment follows readout channel order, NOT electrode order. + # Even though electrode indices are [0, 385, 2, 387, ...], the ADC groups + # should still be [0, 1, 0, 1, ...] because that's the readout channel pattern. + expected_adc_for_first_10 = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1] + assert list(adc_groups[:10]) == expected_adc_for_first_10, ( + f"ADC groups for first 10 channels should be {expected_adc_for_first_10}, " + f"got {list(adc_groups[:10])}. " + f"MUX annotations must follow readout channel order, not electrode order." + ) + + def test_snsGeomMap(): # check when snsGeomMap is present if contact positions are the same from imroTbl