"""Single-channel segmentation workflow built on top of CellColoc core tools.
This module provides a parallel high-level API for workflows that only need to
segment and count objects in one image channel. It reuses the same OMIO
loading, ROI handling, pre- and postfiltering, z-cropping, z projection,
Cellpose or threshold segmentation, and cached Cellpose refinement backends as
the multi-channel colocalization pipeline.
author: Fabrizio Musacchio
date: June 2026
"""
# %% IMPORTS
from __future__ import annotations
from pathlib import Path
from typing import Sequence
import numpy as np
import omio as om
import pandas as pd
import tifffile
from skimage.measure import regionprops, regionprops_table
from .analysis import (
_apply_analysis_z_bounds,
_compute_mask_occupancy_metrics,
_normalize_z_crop_bounds,
_project_zyx_volume,
_resolve_analysis_z_bounds,
_resolve_analysis_z_projection_method,
)
from .config import (
CellposeModelConfig,
RuntimeConfig,
SingleChannelAnalysisConfig,
SingleChannelConfig,
SingleChannelDisplayNames,
)
from .filtering import apply_postfilters, apply_prefilter
from .io import _extract_zyx_channel, _resolve_voxel_scale_zyx, load_roi_labels, save_roi_labels
from .roi import get_bbox_2d, get_roi_label_points
from .schemas import (
CellposeChannelRefinementContext,
CellposeRefinementRoiCache,
LoadedSingleChannelImage,
SingleChannelResultsPaths,
SingleChannelRunResult,
SingleChannelTables,
)
from .segmentation import (
create_cellpose_model,
evaluate_segmentation_method,
filter_labels_by_size,
normalize_segmentation_method,
relabel_with_offset,
)
from .visualization import (
_build_roi_labels_3d,
_get_or_create_viewer,
_hide_layer_if_present,
_normalize_layer_selection,
_replace_or_add_image,
_replace_or_add_labels,
_replace_or_add_points,
_should_render_layer,
)
# %% IO HELPERS
[docs]
def build_single_channel_results_paths(source_path: Path) -> SingleChannelResultsPaths:
"""Create the standard results paths for one single-channel dataset run."""
source_path = Path(source_path).expanduser().resolve()
results_dir = source_path.parent / "results"
results_dir.mkdir(parents=True, exist_ok=True)
stem = source_path.stem
return SingleChannelResultsPaths(
source_path=source_path,
results_dir=results_dir,
roi_mask_path=results_dir / f"{stem}_roi_labelmask.tif",
object_csv_path=results_dir / f"{stem}_single_channel_objects.csv",
excel_path=results_dir / f"{stem}_single_channel_segmentation.xlsx",
mask_path=results_dir / f"{stem}_single_channel_masks.tif",
)
[docs]
def load_single_channel_image(
source_path: Path,
channel_config: SingleChannelConfig,
voxel_scale_zyx: tuple[float, float] | tuple[float, float, float] | None,
crop_for_testing: tuple[slice, slice, slice] | None = None,
image_loading_mode: str = "memory",
) -> LoadedSingleChannelImage:
"""Load one configured analysis channel from a microscopy dataset.
Parameters
----------
source_path:
Input microscopy dataset that OMIO can open.
channel_config:
One-channel mapping defining which raw channel should be analyzed.
voxel_scale_zyx:
Optional explicit voxel size in micrometers, either as ``(Z, Y, X)``
or, for 2D convenience, as ``(Y, X)``.
crop_for_testing:
Optional test crop applied after channel extraction in ``(Z, Y, X)``
order.
image_loading_mode:
Raw-image loading strategy. ``"memory"`` materializes the full image
eagerly, whereas ``"memap"`` keeps OMIO's disk-backed Zarr cache.
"""
paths = build_single_channel_results_paths(source_path)
normalized_loading_mode = image_loading_mode.strip().lower()
if normalized_loading_mode == "memory":
image_tzcyx, metadata = om.imread(paths.source_path, zarr_store=None)
image_tzcyx = np.asarray(image_tzcyx)
elif normalized_loading_mode == "memap":
image_tzcyx, metadata = om.imread(
paths.source_path,
zarr_store="disk",
reuse_disk_cache=True,
)
else:
raise ValueError(
"`image_loading_mode` must be 'memory' or 'memap', got "
f"{image_loading_mode!r}."
)
print(f"Image loading mode: {normalized_loading_mode}")
raw_z_size = int(image_tzcyx.shape[1])
is_3d = raw_z_size > 1
print(f"Loaded image: {paths.source_path}")
print(f"Raw image shape (expected TZCYX): {image_tzcyx.shape}")
print(f"Detected dimensionality from Z axis: {'3D' if is_3d else '2D'} (Z={raw_z_size})")
resolved_voxel_scale_zyx = _resolve_voxel_scale_zyx(voxel_scale_zyx, metadata)
image = _extract_zyx_channel(image_tzcyx, channel_config.channel_index)
if crop_for_testing is not None:
image = image[crop_for_testing]
print(f"Analysis volume shape (ZYX): {image.shape}")
return LoadedSingleChannelImage(
source_path=paths.source_path,
paths=paths,
voxel_scale_zyx=resolved_voxel_scale_zyx,
image=image,
raw_shape_tzcyx=tuple(image_tzcyx.shape),
raw_z_size=raw_z_size,
is_3d=is_3d,
metadata=metadata,
analysis_z_bounds=None,
z_projection_method=None,
)
[docs]
def try_load_single_channel_roi_labels(path: Path) -> np.ndarray | None:
"""Load a saved ROI label mask for a single-channel workflow when present."""
path = Path(path)
if not path.exists():
print(f"No existing ROI label mask found at:\n{path}")
return None
return load_roi_labels(path)
[docs]
def export_single_channel_outputs(
run_result: SingleChannelRunResult,
paths: SingleChannelResultsPaths,
) -> None:
"""Write standard tables and masks for one completed single-channel run.
The Excel workbook contains three sheets:
- ``object_summary`` for biologically relevant size and shape metrics
- ``voxel_plausibility_check`` for technical voxel-count cross-checks
- ``roi_overview`` for ROI-level counts, occupancies, and mean metrics
"""
run_result.tables.objects.to_csv(paths.object_csv_path, index=False)
tifffile.imwrite(paths.mask_path, run_result.masks.astype(np.uint32))
with pd.ExcelWriter(paths.excel_path) as writer:
run_result.tables.objects.to_excel(writer, sheet_name="object_summary", index=False)
run_result.tables.voxel_plausibility.to_excel(
writer,
sheet_name="voxel_plausibility_check",
index=False,
)
run_result.tables.overview.to_excel(writer, sheet_name="roi_overview", index=False)
print(f"Saved object CSV analysis to:\n{paths.object_csv_path}")
print(f"Saved Excel analysis to:\n{paths.excel_path}")
print(f"Saved segmentation masks to:\n{paths.mask_path}")
# %% ANALYSIS PREPARATION
[docs]
def prepare_loaded_single_channel_image_for_analysis(
loaded_image: LoadedSingleChannelImage,
model_config: CellposeModelConfig | None,
) -> LoadedSingleChannelImage:
"""Prepare one loaded channel for downstream analysis according to config.
When ``model_config.z_projection`` is set, the helper optionally applies
the corresponding global ``z_crop`` first, then projects the image along
z, and returns a singleton-z 2D analysis view.
"""
projection_method = _resolve_analysis_z_projection_method(model_config)
if projection_method is None:
return loaded_image
analysis_z_bounds = _resolve_analysis_z_bounds(
loaded_image.image.shape[0],
model_config,
)
z_slice = slice(*analysis_z_bounds) if analysis_z_bounds is not None else slice(None)
projected_image = _project_zyx_volume(loaded_image.image[z_slice], projection_method)
return LoadedSingleChannelImage(
source_path=loaded_image.source_path,
paths=loaded_image.paths,
voxel_scale_zyx=(1.0, loaded_image.voxel_scale_zyx[1], loaded_image.voxel_scale_zyx[2]),
image=projected_image,
raw_shape_tzcyx=loaded_image.raw_shape_tzcyx,
raw_z_size=loaded_image.raw_z_size,
is_3d=False,
metadata=loaded_image.metadata,
analysis_z_bounds=analysis_z_bounds,
z_projection_method=projection_method,
)
# %% TABLE BUILDERS
def _compute_3d_surface_area_um2(
object_mask_zyx: np.ndarray,
voxel_scale_zyx: tuple[float, float, float],
) -> float:
"""Compute voxel-surface area of one 3D object in squared micrometers."""
z_size_um, y_size_um, x_size_um = voxel_scale_zyx
padded = np.pad(np.asarray(object_mask_zyx, dtype=bool), 1, mode="constant", constant_values=False)
transitions_z = np.count_nonzero(padded[1:, :, :] != padded[:-1, :, :])
transitions_y = np.count_nonzero(padded[:, 1:, :] != padded[:, :-1, :])
transitions_x = np.count_nonzero(padded[:, :, 1:] != padded[:, :, :-1])
face_area_z = y_size_um * x_size_um
face_area_y = z_size_um * x_size_um
face_area_x = z_size_um * y_size_um
return float(
transitions_z * face_area_z
+ transitions_y * face_area_y
+ transitions_x * face_area_x
)
def _compute_3d_ellipticity(
object_mask_zyx: np.ndarray,
voxel_scale_zyx: tuple[float, float, float],
) -> float:
"""Estimate a 3D ellipticity-like elongation score from voxel coordinates."""
coordinates = np.column_stack(np.where(object_mask_zyx))
if coordinates.shape[0] < 3:
return np.nan
spacing = np.asarray(voxel_scale_zyx, dtype=float)
coordinates_um = coordinates * spacing
centered = coordinates_um - coordinates_um.mean(axis=0, keepdims=True)
covariance = np.cov(centered, rowvar=False)
eigenvalues = np.sort(np.linalg.eigvalsh(covariance))[::-1]
eigenvalues = np.clip(eigenvalues, 0.0, None)
if eigenvalues[0] <= 0:
return np.nan
major_axis = np.sqrt(eigenvalues[0])
minor_axis = np.sqrt(eigenvalues[-1])
if major_axis <= 0:
return np.nan
return float(1.0 - (minor_axis / major_axis))
def _build_single_channel_plausibility_table(
masks: np.ndarray,
roi_labels_2d: np.ndarray,
) -> pd.DataFrame:
"""Create a technical voxel-consistency table for segmented objects."""
if np.max(masks) == 0:
return pd.DataFrame(
columns=[
"roi_id",
"object_label",
"object_voxels",
"object_voxels_props",
"object_voxels - object_voxels_props",
]
)
props_table = pd.DataFrame(
regionprops_table(masks, properties=("label", "area"))
).rename(
columns={
"label": "object_label",
"area": "object_voxels_props",
}
)
props_table["object_label"] = props_table["object_label"].astype(int)
rows: list[dict[str, int | float]] = []
roi_labels_3d = np.broadcast_to(roi_labels_2d, masks.shape)
for object_label in props_table["object_label"]:
object_mask = masks == object_label
object_voxels = int(object_mask.sum())
roi_values = roi_labels_3d[object_mask]
roi_values = roi_values[roi_values != 0]
roi_id = int(np.unique(roi_values)[0]) if roi_values.size > 0 else 0
rows.append(
{
"roi_id": roi_id,
"object_label": int(object_label),
"object_voxels": object_voxels,
}
)
plausibility_table = pd.DataFrame(rows).merge(props_table, on="object_label", how="left")
plausibility_table["object_voxels - object_voxels_props"] = (
plausibility_table["object_voxels"] - plausibility_table["object_voxels_props"]
)
return plausibility_table.sort_values(by=["roi_id", "object_label"]).reset_index(drop=True)
def _build_single_channel_object_table(
masks: np.ndarray,
loaded_image: LoadedSingleChannelImage,
roi_labels_2d: np.ndarray,
) -> pd.DataFrame:
"""Create one object-summary row per segmented label.
Effective 2D analyses report area, perimeter, roundness, and eccentricity.
Effective 3D analyses report volume, voxel-surface area, sphericity, and a
simple ellipticity-like elongation score derived from object coordinates.
"""
if np.max(masks) == 0:
return pd.DataFrame(
columns=[
"roi_id",
"object_label",
"centroid_z",
"centroid_y",
"centroid_x",
"object_area_px_2d",
"object_area_um2_2d",
"object_perimeter_px_2d",
"object_perimeter_um_2d",
"object_roundness_2d",
"object_eccentricity_2d",
"object_volume_voxels_3d",
"object_volume_um3_3d",
"object_surface_area_um2_3d",
"object_sphericity_3d",
"object_ellipticity_3d",
]
)
is_effective_2d = masks.shape[0] == 1
z_size_um, y_size_um, x_size_um = loaded_image.voxel_scale_zyx
pixel_area_um2 = y_size_um * x_size_um
voxel_volume_um3 = z_size_um * y_size_um * x_size_um
rows: list[dict[str, int | float]] = []
roi_labels_3d = np.broadcast_to(roi_labels_2d, masks.shape)
if is_effective_2d:
for region in regionprops(masks[0]):
object_label = int(region.label)
object_mask = masks == object_label
roi_values = roi_labels_3d[object_mask]
roi_values = roi_values[roi_values != 0]
roi_id = int(np.unique(roi_values)[0]) if roi_values.size > 0 else 0
object_area_px = float(region.area)
object_area_um2 = float(object_area_px * pixel_area_um2)
perimeter_px = float(getattr(region, "perimeter", np.nan))
perimeter_um = (
float(perimeter_px * ((y_size_um + x_size_um) / 2.0))
if np.isfinite(perimeter_px)
else np.nan
)
roundness = (
float((4.0 * np.pi * object_area_px) / (perimeter_px ** 2))
if np.isfinite(perimeter_px) and perimeter_px > 0
else np.nan
)
rows.append(
{
"roi_id": roi_id,
"object_label": object_label,
"centroid_z": 0.0,
"centroid_y": float(region.centroid[0]),
"centroid_x": float(region.centroid[1]),
"object_area_px_2d": object_area_px,
"object_area_um2_2d": object_area_um2,
"object_perimeter_px_2d": perimeter_px,
"object_perimeter_um_2d": perimeter_um,
"object_roundness_2d": roundness,
"object_eccentricity_2d": float(getattr(region, "eccentricity", np.nan)),
"object_volume_voxels_3d": np.nan,
"object_volume_um3_3d": np.nan,
"object_surface_area_um2_3d": np.nan,
"object_sphericity_3d": np.nan,
"object_ellipticity_3d": np.nan,
}
)
else:
for region in regionprops(masks):
object_label = int(region.label)
object_mask = masks == object_label
roi_values = roi_labels_3d[object_mask]
roi_values = roi_values[roi_values != 0]
roi_id = int(np.unique(roi_values)[0]) if roi_values.size > 0 else 0
object_volume_voxels = float(region.area)
object_volume_um3 = float(object_volume_voxels * voxel_volume_um3)
object_surface_area_um2 = _compute_3d_surface_area_um2(
object_mask,
loaded_image.voxel_scale_zyx,
)
object_sphericity = (
float(
(np.pi ** (1.0 / 3.0)) * ((6.0 * object_volume_um3) ** (2.0 / 3.0))
/ object_surface_area_um2
)
if object_surface_area_um2 > 0 and object_volume_um3 > 0
else np.nan
)
rows.append(
{
"roi_id": roi_id,
"object_label": object_label,
"centroid_z": float(region.centroid[0]),
"centroid_y": float(region.centroid[1]),
"centroid_x": float(region.centroid[2]),
"object_area_px_2d": np.nan,
"object_area_um2_2d": np.nan,
"object_perimeter_px_2d": np.nan,
"object_perimeter_um_2d": np.nan,
"object_roundness_2d": np.nan,
"object_eccentricity_2d": np.nan,
"object_volume_voxels_3d": object_volume_voxels,
"object_volume_um3_3d": object_volume_um3,
"object_surface_area_um2_3d": object_surface_area_um2,
"object_sphericity_3d": object_sphericity,
"object_ellipticity_3d": _compute_3d_ellipticity(
object_mask,
loaded_image.voxel_scale_zyx,
),
}
)
return pd.DataFrame(rows).sort_values(by=["roi_id", "object_label"]).reset_index(drop=True)
def _build_single_channel_overview_table(
roi_labels_2d: np.ndarray,
loaded_image: LoadedSingleChannelImage,
masks: np.ndarray,
object_table: pd.DataFrame,
analysis_z_bounds: tuple[int, int] | None,
) -> pd.DataFrame:
"""Create one ROI overview row per ROI for one-channel analyses.
In addition to object counts and occupancy metrics, the overview reports
per-ROI averages of the object-summary morphology metrics that are
applicable to the current effective dimensionality.
"""
z_size_um, y_size_um, x_size_um = loaded_image.voxel_scale_zyx
pixel_area_um2 = y_size_um * x_size_um
voxel_volume_um3 = z_size_um * y_size_um * x_size_um
n_z = loaded_image.image.shape[0]
z_start, z_stop = analysis_z_bounds if analysis_z_bounds is not None else (0, n_z)
analysis_depth = z_stop - z_start
rows: list[dict[str, int | float]] = []
for roi_id in np.unique(roi_labels_2d):
if roi_id == 0:
continue
roi_mask_2d = roi_labels_2d == roi_id
roi_area_px = int(roi_mask_2d.sum())
roi_area_um2 = float(roi_area_px * pixel_area_um2)
roi_volume_voxels = int(roi_area_px * analysis_depth)
roi_volume_um3 = float(roi_volume_voxels * voxel_volume_um3)
object_rows = object_table[object_table["roi_id"] == roi_id]
row: dict[str, int | float] = {
"roi_id": int(roi_id),
"n_objects": int(len(object_rows)),
"drawn_roi_area_px": roi_area_px,
"drawn_roi_area_um2": roi_area_um2,
"roi_volume_voxels": roi_volume_voxels,
"roi_volume_um3": roi_volume_um3,
}
row.update(
_compute_mask_occupancy_metrics(
"object",
masks,
roi_mask_2d,
loaded_image.voxel_scale_zyx,
analysis_z_bounds,
)
)
average_metric_columns = [
"object_area_px_2d",
"object_area_um2_2d",
"object_perimeter_px_2d",
"object_perimeter_um_2d",
"object_roundness_2d",
"object_eccentricity_2d",
"object_volume_voxels_3d",
"object_volume_um3_3d",
"object_surface_area_um2_3d",
"object_sphericity_3d",
"object_ellipticity_3d",
]
for column_name in average_metric_columns:
if column_name in object_rows.columns:
mean_value = object_rows[column_name].dropna().mean()
row[f"average_{column_name}"] = (
float(mean_value) if pd.notna(mean_value) else np.nan
)
rows.append(row)
return pd.DataFrame(rows)
# %% ANALYSIS CORE
[docs]
def analyze_existing_single_channel_masks(
loaded_image: LoadedSingleChannelImage,
roi_labels_2d: np.ndarray,
masks: np.ndarray,
analysis_config: SingleChannelAnalysisConfig,
analysis_z_bounds: tuple[int, int] | None = None,
refinement_context: CellposeChannelRefinementContext | None = None,
model_config: CellposeModelConfig | None = None,
) -> SingleChannelRunResult:
"""Recompute object tables from existing one-channel label masks."""
effective_analysis_z_bounds = (
None if loaded_image.z_projection_method is not None else analysis_z_bounds
)
full_masks = _apply_analysis_z_bounds(masks, effective_analysis_z_bounds)
print(f"\nFiltering objects smaller than {analysis_config.min_object_voxels} voxels...")
full_masks = filter_labels_by_size(full_masks, analysis_config.min_object_voxels)
if model_config is not None and model_config.postfilters is not None:
print("Applying configured postfilters to single-channel masks...")
full_masks = apply_postfilters(
full_masks,
loaded_image.image,
model_config,
)
object_table = _build_single_channel_object_table(full_masks, loaded_image, roi_labels_2d)
plausibility_table = _build_single_channel_plausibility_table(full_masks, roi_labels_2d)
overview_table = _build_single_channel_overview_table(
roi_labels_2d=roi_labels_2d,
loaded_image=loaded_image,
masks=full_masks,
object_table=object_table,
analysis_z_bounds=effective_analysis_z_bounds,
)
return SingleChannelRunResult(
masks=full_masks,
tables=SingleChannelTables(
objects=object_table,
voxel_plausibility=plausibility_table,
overview=overview_table,
),
analysis_z_bounds=effective_analysis_z_bounds,
refinement_context=refinement_context,
)
[docs]
def run_roi_single_channel_segmentation(
loaded_image: LoadedSingleChannelImage,
roi_labels_2d: np.ndarray,
model_config: CellposeModelConfig,
analysis_config: SingleChannelAnalysisConfig,
runtime_config: RuntimeConfig,
) -> SingleChannelRunResult:
"""Run ROI-wise segmentation and object counting for one analysis channel."""
if not runtime_config.process_rois:
raise ValueError("ROI processing is disabled in RuntimeConfig.")
roi_ids = np.unique(roi_labels_2d)
roi_ids = roi_ids[roi_ids != 0]
print(f"Found {len(roi_ids)} ROIs: {roi_ids}")
if loaded_image.z_projection_method is not None:
analysis_z_bounds = None
else:
analysis_z_bounds = _resolve_analysis_z_bounds(
loaded_image.image.shape[0],
model_config,
)
z_slice = slice(*analysis_z_bounds) if analysis_z_bounds is not None else slice(None)
method = normalize_segmentation_method(model_config.segmentation_method)
model = None
if method == "cellpose":
model = create_cellpose_model(model_config.model_name_or_path, runtime_config.use_gpu)
full_masks = np.zeros(loaded_image.image.shape, dtype=np.uint32)
roi_caches: list[CellposeRefinementRoiCache] = []
label_offset = 0
for roi_id in roi_ids:
print(f"\nProcessing ROI {int(roi_id)}...")
roi_mask_2d = roi_labels_2d == roi_id
bbox = get_bbox_2d(roi_mask_2d)
if bbox is None:
print(f"Skipping ROI {int(roi_id)}: empty ROI")
continue
y_slice, x_slice = bbox
roi_mask_crop_2d = roi_mask_2d[y_slice, x_slice]
image_crop = loaded_image.image[z_slice, y_slice, x_slice].copy()
image_crop = apply_prefilter(image_crop, model_config)
image_crop[:, ~roi_mask_crop_2d] = 0
masks_roi, refinement_cache = evaluate_segmentation_method(
model,
image_crop,
model_config,
loaded_image.voxel_scale_zyx,
)
if refinement_cache is not None:
refinement_cache.roi_id = int(roi_id)
refinement_cache.y_min = int(y_slice.start)
refinement_cache.y_max = int(y_slice.stop)
refinement_cache.x_min = int(x_slice.start)
refinement_cache.x_max = int(x_slice.stop)
refinement_cache.roi_mask_crop_2d = roi_mask_crop_2d.copy()
roi_caches.append(refinement_cache)
masks_roi = relabel_with_offset(masks_roi, label_offset)
if masks_roi.max() > 0:
label_offset = int(masks_roi.max())
full_masks[z_slice, y_slice, x_slice] = np.maximum(
full_masks[z_slice, y_slice, x_slice],
masks_roi,
)
refinement_context = None
if roi_caches:
refinement_context = CellposeChannelRefinementContext(
model=model,
model_name_or_path=model_config.model_name_or_path,
roi_caches=roi_caches,
)
return analyze_existing_single_channel_masks(
loaded_image=loaded_image,
roi_labels_2d=roi_labels_2d,
masks=full_masks,
analysis_config=analysis_config,
analysis_z_bounds=analysis_z_bounds,
refinement_context=refinement_context,
model_config=model_config,
)
def _rebuild_single_channel_masks_from_refinement_context(
image_shape: tuple[int, int, int],
refinement_context: CellposeChannelRefinementContext,
flow_threshold: float | None = None,
cellprob_threshold: float | None = None,
) -> np.ndarray:
"""Recompute one-channel masks from cached Cellpose network outputs."""
rebuilt_masks = np.zeros(image_shape, dtype=np.uint32)
label_offset = 0
for roi_cache in refinement_context.roi_caches:
current_flow_threshold = roi_cache.flow_threshold if flow_threshold is None else flow_threshold
current_cellprob_threshold = (
roi_cache.cellprob_threshold if cellprob_threshold is None else cellprob_threshold
)
masks_roi = refinement_context.model._compute_masks(
roi_cache.shape_for_masks,
roi_cache.dP,
roi_cache.cellprob,
flow_threshold=current_flow_threshold,
cellprob_threshold=current_cellprob_threshold,
min_size=roi_cache.min_size,
max_size_fraction=roi_cache.max_size_fraction,
niter=roi_cache.niter,
do_3D=roi_cache.do_3d,
stitch_threshold=0.0,
)
masks_roi = np.asarray(masks_roi, dtype=np.uint32)
if not roi_cache.do_3d:
masks_roi = masks_roi[np.newaxis, :, :]
masks_roi[:, ~roi_cache.roi_mask_crop_2d] = 0
masks_roi = relabel_with_offset(masks_roi, label_offset)
if masks_roi.max() > 0:
label_offset = int(masks_roi.max())
y_slice = slice(roi_cache.y_min, roi_cache.y_max)
x_slice = slice(roi_cache.x_min, roi_cache.x_max)
rebuilt_masks[:, y_slice, x_slice] = np.maximum(
rebuilt_masks[:, y_slice, x_slice],
masks_roi,
)
return rebuilt_masks
[docs]
def refine_single_channel_run_result_from_cellpose_cache(
loaded_image: LoadedSingleChannelImage,
roi_labels_2d: np.ndarray,
run_result: SingleChannelRunResult,
analysis_config: SingleChannelAnalysisConfig,
model_config: CellposeModelConfig | None = None,
cellprob_threshold: float | None = None,
flow_threshold: float | None = None,
) -> SingleChannelRunResult:
"""Recompute one-channel masks and tables from cached Cellpose outputs.
Passing ``model_config=None`` leaves the current masks unchanged and simply
rebuilds the object tables from the masks already stored in
``run_result``.
"""
if loaded_image.z_projection_method is not None:
analysis_z_bounds = None
else:
analysis_z_bounds = (
run_result.analysis_z_bounds
if model_config is None
else _resolve_analysis_z_bounds(
loaded_image.image.shape[0],
model_config,
fallback=run_result.analysis_z_bounds,
)
)
if model_config is None:
rebuilt_masks = np.asarray(run_result.masks, dtype=np.uint32).copy()
else:
if run_result.refinement_context is None:
raise ValueError(
"Single-channel refinement was requested, but this run result "
"does not contain Cellpose refinement caches. Threshold-only "
"refinement is currently available only when the initial "
"segmentation was produced with a supported Cellpose 4 run."
)
rebuilt_masks = _rebuild_single_channel_masks_from_refinement_context(
image_shape=loaded_image.image.shape,
refinement_context=run_result.refinement_context,
flow_threshold=flow_threshold,
cellprob_threshold=cellprob_threshold,
)
rebuilt_masks = _apply_analysis_z_bounds(rebuilt_masks, analysis_z_bounds)
return analyze_existing_single_channel_masks(
loaded_image=loaded_image,
roi_labels_2d=roi_labels_2d,
masks=rebuilt_masks,
analysis_config=analysis_config,
analysis_z_bounds=analysis_z_bounds,
refinement_context=run_result.refinement_context,
model_config=model_config,
)
# %% VISUALIZATION
[docs]
def create_single_channel_roi_drawing_viewer(
loaded_image: LoadedSingleChannelImage,
display_names: SingleChannelDisplayNames | None = None,
):
"""Open a napari viewer for drawing 2D ROIs on one-channel projections."""
import napari
display_names = display_names or SingleChannelDisplayNames()
projection = loaded_image.image.max(axis=0)
viewer = napari.Viewer()
viewer.add_image(
projection,
name=f"{display_names.channel} max projection for ROI drawing",
scale=loaded_image.voxel_scale_zyx[1:],
)
shapes_layer = viewer.add_shapes(
name="Draw ROIs here",
ndim=2,
shape_type="polygon",
edge_width=2,
face_color="transparent",
blending="additive",
)
return viewer, shapes_layer
[docs]
def show_single_channel_results(
loaded_image: LoadedSingleChannelImage,
roi_labels_2d: np.ndarray,
run_result: SingleChannelRunResult,
display_names: SingleChannelDisplayNames | None = None,
viewer=None,
layers_to_show: Sequence[str] | None = None,
replace_existing_layers: bool = True,
):
"""Display or refresh one-channel analysis layers in napari.
Supported ``layers_to_show`` keys are ``"channel_image"``, ``"rois"``,
``"roi_numbers"``, and ``"masks"``.
"""
display_names = display_names or SingleChannelDisplayNames()
viewer = _get_or_create_viewer(viewer)
selected_layers = _normalize_layer_selection(layers_to_show)
if _should_render_layer(selected_layers, "channel_image"):
_replace_or_add_image(
viewer,
replace_existing_layers=replace_existing_layers,
data=loaded_image.image,
name=display_names.channel,
scale=loaded_image.voxel_scale_zyx,
blending="additive",
colormap="magenta",
channel_axis=None,
)
roi_labels_3d = _build_roi_labels_3d(
roi_labels_2d,
loaded_image.image.shape[0],
run_result.analysis_z_bounds,
)
if _should_render_layer(selected_layers, "rois"):
_replace_or_add_labels(
viewer,
replace_existing_layers=replace_existing_layers,
data=roi_labels_3d,
name="ROIs",
blending="additive",
scale=loaded_image.voxel_scale_zyx,
)
roi_points_yx, roi_text_labels = get_roi_label_points(roi_labels_2d)
if len(roi_points_yx) > 0 and _should_render_layer(selected_layers, "roi_numbers"):
_replace_or_add_points(
viewer,
replace_existing_layers=replace_existing_layers,
data=roi_points_yx,
name="ROI numbers",
scale=loaded_image.voxel_scale_zyx[1:],
size=10,
face_color="transparent",
text={
"string": roi_text_labels,
"size": 14,
"color": "white",
"anchor": "center",
},
)
if _should_render_layer(selected_layers, "masks"):
_replace_or_add_labels(
viewer,
replace_existing_layers=replace_existing_layers,
data=run_result.masks,
name=display_names.objects,
blending="additive",
scale=loaded_image.voxel_scale_zyx,
)
_hide_layer_if_present(viewer, f"{display_names.channel} max projection for ROI drawing")
_hide_layer_if_present(viewer, "Draw ROIs here")
return viewer
__all__ = [
"SingleChannelConfig",
"SingleChannelDisplayNames",
"SingleChannelAnalysisConfig",
"SingleChannelResultsPaths",
"LoadedSingleChannelImage",
"SingleChannelTables",
"SingleChannelRunResult",
"build_single_channel_results_paths",
"load_single_channel_image",
"prepare_loaded_single_channel_image_for_analysis",
"try_load_single_channel_roi_labels",
"run_roi_single_channel_segmentation",
"analyze_existing_single_channel_masks",
"refine_single_channel_run_result_from_cellpose_cache",
"create_single_channel_roi_drawing_viewer",
"show_single_channel_results",
"extract_single_channel_masks_from_viewer",
"export_single_channel_outputs",
]