Skip to content

Utils

novae.utils.spatial_neighbors(adata, slide_key=None, radius=None, pixel_size=None, technology=None, coord_type=None, n_neighs=None, delaunay=None, n_rings=1, percentile=None, set_diag=False, reset_slide_ids=True)

Create a Delaunay graph from the spatial coordinates of the cells. The graph is stored in adata.obsp['spatial_connectivities'] and adata.obsp['spatial_distances']. The long edges are removed from the graph according to the radius argument (if provided).

Info

This function was updated from squidpy.

Parameters:

Name Type Description Default
adata AnnData | list[AnnData]

An AnnData object, or a list of AnnData objects.

required
slide_key str | None

Optional key in adata.obs indicating the slide ID of each cell. If provided, the graph is computed for each slide separately.

None
radius tuple[float, float] | float | None

tuple that prunes the final graph to only contain edges in interval [min(radius), max(radius)]. If float, uses [0, radius]. If None, all edges are kept.

None
technology str | SpatialTechnology | None

Technology or machine used to generate the spatial data. One of "cosmx", "merscope", "xenium", "visium", "visium_hd". If None, uses adata.obsm["spatial"].

None
coord_type str | CoordType | None

Either "grid" or "generic". If "grid", the graph is built on a grid. If "generic", the graph is built using the coordinates as they are. By default, uses "grid" for Visium/VisiumHD and "generic" for other technologies.

None
n_neighs int | None

Number of neighbors to consider. If None, uses 6 for Visium and 8 for Visium HD. For other graphs, it stays None when Delaunay triangulation is used, and defaults to 6 otherwise.

None
delaunay bool | None

Whether to use Delaunay triangulation to build the graph. If None, uses False for grid-based graphs and True for generic graphs.

None
n_rings int

See squidpy.gr.spatial_neighbors documentation.

1
percentile float | None

See squidpy.gr.spatial_neighbors documentation.

None
set_diag bool

See squidpy.gr.spatial_neighbors documentation.

False
reset_slide_ids bool

Whether to reset the novae slide ids.

True
Source code in novae/utils/_build.py
def spatial_neighbors(
    adata: AnnData | list[AnnData],
    slide_key: str | None = None,
    radius: tuple[float, float] | float | None = None,
    pixel_size: float | None = None,
    technology: str | SpatialTechnology | None = None,
    coord_type: str | CoordType | None = None,
    n_neighs: int | None = None,
    delaunay: bool | None = None,
    n_rings: int = 1,
    percentile: float | None = None,
    set_diag: bool = False,
    reset_slide_ids: bool = True,
) -> None:
    """Create a Delaunay graph from the spatial coordinates of the cells.
    The graph is stored in `adata.obsp['spatial_connectivities']` and `adata.obsp['spatial_distances']`. The long edges
    are removed from the graph according to the `radius` argument (if provided).

    Info:
        This function was updated from [squidpy](https://squidpy.readthedocs.io/en/latest/api/squidpy.gr.spatial_neighbors.html#squidpy.gr.spatial_neighbors).

    Args:
        adata: An `AnnData` object, or a list of `AnnData` objects. If a list is provided, the graph is computed for each object independently.
        slide_key: Optional key in `adata.obs` indicating the slide ID of each cell. If provided, the graph is computed for each slide separately.
        radius: `tuple` that prunes the final graph to only contain edges in interval `[min(radius), max(radius)]`. If `float` (or `int`), uses `[0, radius]`. If `None`, all edges are kept.
        pixel_size: Cannot be combined with `technology`. NOTE(review): within this function, the value is only validated against `technology` and is not used otherwise - confirm the intended behavior.
        technology: Technology or machine used to generate the spatial data. One of `"cosmx", "merscope", "xenium", "visium", "visium_hd"`. If `None`, uses `adata.obsm["spatial"]`. For `"visium"` and `"visium_hd"`, `coord_type` and `delaunay` are overridden (grid-based graph, no Delaunay triangulation).
        coord_type: Either `"grid"` or `"generic"`. If `"grid"`, the graph is built on a grid. If `"generic"`, the graph is built using the coordinates as they are. By default, uses `"grid"` for Visium/VisiumHD and `"generic"` for other technologies.
        n_neighs: Number of neighbors to consider. If `None`, uses `6` for Visium and `8` for Visium HD. Otherwise, it stays `None` when Delaunay triangulation is used, and falls back to `6` when it is not.
        delaunay: Whether to use Delaunay triangulation to build the graph. If `None`, defaults to `True`, except for the `"visium"` and `"visium_hd"` technologies, for which it is always `False`.
        n_rings: See `squidpy.gr.spatial_neighbors` documentation.
        percentile: See `squidpy.gr.spatial_neighbors` documentation.
        set_diag: See `squidpy.gr.spatial_neighbors` documentation.
        reset_slide_ids: Whether to reset the novae slide ids.

    Returns:
        Nothing. The results are written to `adata.obsp` and `adata.uns["spatial_neighbors"]`.
    """
    if reset_slide_ids:
        _set_unique_slide_ids(adata, slide_key=slide_key)

    if isinstance(adata, list):
        # The slide ids are reset (if requested) once for the whole list above,
        # so the per-object calls must not reset them again.
        for adata_ in adata:
            spatial_neighbors(
                adata_,
                slide_key=slide_key,
                radius=radius,
                pixel_size=pixel_size,
                technology=technology,
                coord_type=coord_type,
                n_neighs=n_neighs,
                delaunay=delaunay,
                n_rings=n_rings,
                percentile=percentile,
                set_diag=set_diag,
                reset_slide_ids=False,
            )
        return

    # A scalar radius is a shorthand for the interval [0, radius]
    if isinstance(radius, (float, int)):
        radius = [0.0, float(radius)]

    assert radius is None or len(radius) == 2, "Radius is expected to be a tuple (min_radius, max_radius)"

    assert pixel_size is None or technology is None, "You must choose argument between `pixel_size` and `technology`"

    if technology == "visium":
        n_neighs = 6 if n_neighs is None else n_neighs
        coord_type, delaunay = CoordType.GRID, False
    elif technology == "visium_hd":
        n_neighs = 8 if n_neighs is None else n_neighs
        coord_type, delaunay = CoordType.GRID, False
    elif technology is not None:
        adata.obsm["spatial"] = _technology_coords(adata, technology)

    assert (
        "spatial" in adata.obsm
    ), "Key 'spatial' not found in adata.obsm. This should contain the 2D spatial coordinates of the cells"

    # Resolve the remaining defaults (the Visium branches above already fixed theirs)
    coord_type = CoordType(coord_type or "generic")
    delaunay = True if delaunay is None else delaunay
    n_neighs = 6 if (n_neighs is None and not delaunay) else n_neighs

    log.info(
        f"Computing graph on {adata.n_obs:,} cells (coord_type={coord_type.value}, {delaunay=}, {radius=}, {n_neighs=})"
    )

    slides = adata.obs[Keys.SLIDE_ID].cat.categories
    # NOTE(review): the return value is discarded, as in the squidpy code this was adapted from.
    # Confirm whether `adata.obs_names` was meant to be updated here.
    make_index_unique(adata.obs_names)

    _build_fun = partial(
        _spatial_neighbor,
        coord_type=coord_type,
        n_neighs=n_neighs,
        radius=radius,
        delaunay=delaunay,
        n_rings=n_rings,
        set_diag=set_diag,
        percentile=percentile,
    )

    if len(slides) > 1:
        mats: list[tuple[spmatrix, spmatrix]] = []
        ixs = []  # type: ignore[var-annotated]
        for slide in slides:
            in_slide = adata.obs[Keys.SLIDE_ID] == slide  # computed once per slide
            ixs.extend(np.where(in_slide)[0])
            mats.append(_build_fun(adata[in_slide]))
        # The blocks are stacked slide by slide: `argsort` gives the permutation
        # that brings the rows/columns back to the original cell order.
        ixs = np.argsort(ixs)  # type: ignore[assignment] # invert
        Adj = block_diag([m[0] for m in mats], format="csr")[ixs, :][:, ixs]
        Dst = block_diag([m[1] for m in mats], format="csr")[ixs, :][:, ixs]
    else:
        Adj, Dst = _build_fun(adata)

    adata.obsp["spatial_connectivities"] = Adj
    adata.obsp["spatial_distances"] = Dst

    adata.uns["spatial_neighbors"] = {
        "connectivities_key": "spatial_connectivities",
        "distances_key": "spatial_distances",
        "params": {"radius": radius, "set_diag": set_diag, "n_neighbors": n_neighs, "coord_type": coord_type.value},
    }

    _sanity_check_spatial_neighbors(adata)

novae.utils.quantile_scaling(adata, multiplier=5, quantile=0.2, per_slide=True)

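Preprocess fluorescence data from adata.X using quantiles of expression. For each column X, we compute asinh(X / (5 * Q(0.2, X))), where Q(0.2, X) is the 0.2-quantile of X, and store the result back. The values 5 and 0.2 are the defaults of the multiplier and quantile arguments.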

Parameters:

Name Type Description Default
adata AnnData | list[AnnData]

An AnnData object, or a list of AnnData objects.

required
multiplier float

The multiplier for the quantile.

5
quantile float

The quantile to compute.

0.2
per_slide bool

Whether to compute the quantile per slide. If False, the quantile is computed for each AnnData object.

True
Source code in novae/utils/_preprocess.py
def quantile_scaling(
    adata: AnnData | list[AnnData],
    multiplier: float = 5,
    quantile: float = 0.2,
    per_slide: bool = True,
) -> pd.DataFrame | None:
    """Preprocess fluorescence data from `adata.X` using quantiles of expression.
    For each column `X`, we compute `asinh(X / (5 * Q(0.2, X)))`, and store them back
    (`5` and `0.2` being the default values of `multiplier` and `quantile`).

    Args:
        adata: An `AnnData` object, or a list of `AnnData` objects.
        multiplier: The multiplier for the quantile.
        quantile: The quantile to compute.
        per_slide: Whether to compute the quantile per slide. If `False`, the quantile is computed for each `AnnData` object.

    Returns:
        `None` when a list is provided or when `per_slide` is `True`. When `per_slide` is `False`, the return
        value of the internal `_quantile_scaling` helper is forwarded as is (NOTE(review): confirm what the
        helper returns; callers should not rely on it).
    """
    _check_has_slide_id(adata)

    if isinstance(adata, list):
        # Each object of the list is scaled independently
        for adata_ in adata:
            quantile_scaling(adata_, multiplier, quantile, per_slide=per_slide)
        return None

    if not per_slide:
        return _quantile_scaling(adata, multiplier, quantile)

    for adata_ in iter_slides(adata):
        _quantile_scaling(adata_, multiplier, quantile)

    return None

novae.utils.prepare_adatas(adata, var_names=None)

Ensure the AnnData objects are ready to be used by the model.

Note

It performs the following operations:

  • Preprocess the data if needed (e.g. normalize, log1p), in which case raw counts are saved in adata.layers['counts']
  • Compute the mean and std of each gene
  • Save which genes are highly variable, in case the number of genes is too high
  • If using a pretrained model, save which genes are known by the model

Parameters:

Name Type Description Default
adata AnnData | list[AnnData] | None

An AnnData object, or a list of AnnData objects. Optional if the model was initialized with adata.

required
var_names set | list[str] | None

Only used when loading a pretrained model. Do not use it yourself.

None

Returns:

Type Description
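tuple[list[AnnData] | None, set | list[str] | None]

A tuple (adatas, var_names). adatas is a list of AnnData objects ready to be used by the model (if only one adata object is provided, it is wrapped in a list), or None if no adata was provided. var_names contains the gene names: the provided ones or, if not provided, the ones computed from the AnnData objects.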

Source code in novae/utils/_validate.py
def prepare_adatas(
    adata: AnnData | list[AnnData] | None,
    var_names: set | list[str] | None = None,
) -> tuple[list[AnnData] | None, set | list[str] | None]:
    """Ensure the AnnData objects are ready to be used by the model.

    Note:
        It performs the following operations:

        - Preprocess the data if needed (e.g. normalize, log1p), in which case raw counts are saved in `adata.layers['counts']`
        - Compute the mean and std of each gene
        - Save which genes are highly variable, in case the number of genes is too high (only if `settings.auto_preprocessing` is enabled)
        - If using a pretrained model, save which genes are known by the model

    Args:
        adata: An `AnnData` object, or a list of `AnnData` objects. Optional if the model was initialized with `adata`.
        var_names: Only used when loading a pretrained model. Do not use it yourself.

    Returns:
        A tuple `(adatas, var_names)`. `adatas` is a list of `AnnData` objects ready to be used by the model (if only one `adata` object is provided, it is wrapped in a list), or `None` if `adata` is `None`. `var_names` contains the gene names: the provided ones or, if `None` was provided, the ones computed by `_genes_union` on the prepared objects.

    Raises:
        AssertionError: If both `adata` and `var_names` are `None`, if the list of objects is empty, or if the spatial neighbors were not computed for one of the objects.
        ValueError: If `adata` is neither an `AnnData` object nor a list.
    """
    assert adata is not None or var_names is not None, "One of `adata` and `var_names` must not be None"

    if adata is None:
        # Nothing to prepare: only the gene names are forwarded
        return None, var_names

    if isinstance(adata, AnnData):
        adatas = [adata]
    elif isinstance(adata, list):
        adatas = adata
    else:
        raise ValueError(f"Invalid type for `adata`: {type(adata)}")

    assert len(adatas) > 0, "No `adata` object found. Please provide an AnnData object, or a list of AnnData objects."

    assert all(
        Keys.ADJ in adata_.obsp for adata_ in adatas
    ), "You need to first run `novae.utils.spatial_neighbors` to compute cell neighbors."

    _check_has_slide_id(adatas)
    _standardize_adatas(adatas)  # log1p + spatial_neighbors
    if settings.auto_preprocessing:
        _lookup_highly_variable_genes(adatas)
    _select_novae_genes(adatas, var_names)

    if var_names is None:
        var_names = _genes_union(adatas, among_used=True)

    return adatas, var_names

novae.utils.load_dataset(pattern=None, tissue=None, species=None, custom_filter=None, top_k=None, dry_run=False)

Automatically load slides from the Novae dataset repository.

Selecting slides

The function arguments allow filtering the slides by tissue, species, and name pattern. Internally, the function reads the dataset metadata file (https://huggingface.co/datasets/MICS-Lab/novae/blob/main/metadata.csv) to select the slides that match the provided filters.

Parameters:

Name Type Description Default
pattern str | None

Optional pattern to match the slides names.

None
tissue list[str] | str | None

Optional tissue (or tissue list) to filter the slides. E.g., "brain", "colon".

None
species list[str] | str | None

Optional species (or species list) to filter the slides. E.g., "human", "mouse".

None
custom_filter Callable[[DataFrame], Series] | None

Custom filter function that takes the metadata DataFrame (see above link) and returns a boolean Series to decide which rows should be kept.

None
top_k int | None

Optional number of slides to keep. If None, keeps all slides.

None
dry_run bool

If True, the function will only return the metadata of slides that match the filters.

False

Returns:

Type Description
list[AnnData]

A list of AnnData objects, each object corresponds to one slide. If dry_run is True, the metadata DataFrame of the slides matching the filters is returned instead.

Source code in novae/utils/_data.py
def load_dataset(
    pattern: str | None = None,
    tissue: list[str] | str | None = None,
    species: list[str] | str | None = None,
    custom_filter: Callable[[pd.DataFrame], pd.Series] | None = None,
    top_k: int | None = None,
    dry_run: bool = False,
) -> list[AnnData] | pd.DataFrame:
    """Automatically load slides from the Novae dataset repository.

    !!! info "Selecting slides"
        The function arguments allow filtering the slides by tissue, species, and name pattern.
        Internally, the function reads [this dataset metadata file](https://huggingface.co/datasets/MICS-Lab/novae/blob/main/metadata.csv) to select the slides that match the provided filters.

    Args:
        pattern: Optional pattern to match the slides names. It is applied with `pandas` `str.match`, i.e. as a regular expression matched from the beginning of the slide name.
        tissue: Optional tissue (or tissue list) to filter the slides. E.g., `"brain", "colon"`.
        species: Optional species (or species list) to filter the slides. E.g., `"human", "mouse"`.
        custom_filter: Custom filter function that takes the metadata DataFrame (see above link) and returns a boolean Series to decide which rows should be kept.
        top_k: Optional number of slides to keep. If `None`, keeps all slides.
        dry_run: If `True`, the function will only return the metadata of slides that match the filters.

    Returns:
        A list of `AnnData` objects, each object corresponds to one slide. If `dry_run` is `True`, the metadata `DataFrame` of the matching slides is returned instead.

    Raises:
        AssertionError: If a species or a tissue is invalid, or if no slide matches the filters.
    """
    metadata = pd.read_csv("hf://datasets/MICS-Lab/novae/metadata.csv", index_col=0)

    if species is not None:
        species = [species] if isinstance(species, str) else species
        valid_species = metadata["species"].unique()
        assert all(
            s in valid_species for s in species
        ), f"Found invalid species in {species}. Valid species are {valid_species}."
        metadata = metadata[metadata["species"].isin(species)]

    if tissue is not None:
        tissues = [tissue] if isinstance(tissue, str) else tissue
        # Computed after the species filter, so that the error message below
        # really lists the tissues available "for the provided species".
        valid_tissues = metadata["tissue"].unique()
        assert all(
            tissue in valid_tissues for tissue in tissues
        ), f"Found invalid tissues in {tissues}. Valid tissues for the provided species are {valid_tissues}."
        metadata = metadata[metadata["tissue"].isin(tissues)]

    if custom_filter is not None:
        metadata = metadata[custom_filter(metadata)]

    assert not metadata.empty, "No dataset found for the provided filters."

    if pattern is not None:
        where = metadata.index.str.match(pattern)
        # `where` is a boolean mask with one entry per slide: its length is never zero here,
        # so we must check that at least one slide name matched.
        assert where.any(), f"No dataset found for the provided pattern ({', '.join(list(metadata.index))})."
        metadata = metadata[where]

    assert not metadata.empty, "No dataset found for the provided filters."

    if top_k is not None:
        metadata = metadata.head(top_k)

    if dry_run:
        return metadata

    log.info(f"Found {len(metadata)} h5ad file(s) matching the filters.")
    return [_read_h5ad_from_hub(name, row) for name, row in metadata.iterrows()]

novae.utils.toy_dataset(n_panels=3, n_domains=4, n_slides_per_panel=1, xmax=500, n_vars=100, n_drop=20, step=20, panel_shift_lambda=5, slide_shift_lambda=1.5, domain_shift_lambda=2.0, slide_ids_unique=True, compute_spatial_neighbors=False, merge_last_domain_even_slide=False)

Creates a toy dataset, useful for debugging or testing.

Parameters:

Name Type Description Default
n_panels int

Number of panels. Each panel will correspond to one output AnnData object.

3
n_domains int

Number of domains.

4
n_slides_per_panel int

Number of slides per panel.

1
xmax int

Maximum value for the spatial coordinates (the larger, the more cells).

500
n_vars int

Maximum number of genes per panel.

100
n_drop int

Number of genes that are randomly removed for each AnnData object. It will create non-identical panels.

20
step int

Step between cells in their spatial coordinates.

20
panel_shift_lambda float

Lambda used in the exponential law for each panel.

5
slide_shift_lambda float

Lambda used in the exponential law for each slide.

1.5
domain_shift_lambda float

Lambda used in the exponential law for each domain.

2.0
slide_ids_unique bool

Whether to ensure that slide ids are unique.

True
compute_spatial_neighbors bool

Whether to compute the spatial neighbors graph. We remove some of the edges of one node for testing purposes.

False

Returns:

Type Description
list[AnnData]

A list of AnnData objects representing a valid Novae dataset.

Source code in novae/utils/_data.py
def toy_dataset(
    n_panels: int = 3,
    n_domains: int = 4,
    n_slides_per_panel: int = 1,
    xmax: int = 500,
    n_vars: int = 100,
    n_drop: int = 20,
    step: int = 20,
    panel_shift_lambda: float = 5,
    slide_shift_lambda: float = 1.5,
    domain_shift_lambda: float = 2.0,
    slide_ids_unique: bool = True,
    compute_spatial_neighbors: bool = False,
    merge_last_domain_even_slide: bool = False,
) -> list[AnnData]:
    """Creates a toy dataset, useful for debugging or testing.

    Args:
        n_panels: Number of panels. Each panel will correspond to one output `AnnData` object.
        n_domains: Number of domains.
        n_slides_per_panel: Number of slides per panel.
        xmax: Maximum value for the spatial coordinates (the larger, the more cells).
        n_vars: Maximum number of genes per panel.
        n_drop: Number of genes that are randomly removed for each `AnnData` object. It will create non-identical panels (when non-zero, `panel_index` extra genes are also removed, so that panels have different numbers of genes).
        step: Step between cells in their spatial coordinates.
        panel_shift_lambda: Lambda used in the exponential law for each panel.
        slide_shift_lambda: Lambda used in the exponential law for each slide.
        domain_shift_lambda: Lambda used in the exponential law for each domain.
        slide_ids_unique: Whether to ensure that slide ids are unique.
        compute_spatial_neighbors: Whether to compute the spatial neighbors graph. We remove some of the edges of one node for testing purposes.
        merge_last_domain_even_slide: If `True`, for the slides with an even index inside each panel, the last domain is merged into the previous one (i.e., the domain indices are clipped to `n_domains - 2`).

    Returns:
        A list of `AnnData` objects (one per panel) representing a valid `Novae` dataset.

    Raises:
        AssertionError: If `n_vars - n_drop - n_panels` is not greater than `2`.
    """
    assert (
        n_vars - n_drop - n_panels > 2
    ), "Not enough genes: `n_vars - n_drop - n_panels` must be greater than 2"

    # Regular grid of cells, restricted to the disk of radius `xmax`
    spatial = np.mgrid[-xmax:xmax:step, -xmax:xmax:step].reshape(2, -1).T
    spatial = spatial[(spatial**2).sum(1) <= xmax**2]
    n_obs = len(spatial)

    # Domains are concentric rings: the index grows with the distance to the center.
    # The small epsilon keeps the cells located exactly at distance `xmax` in the last ring.
    int_domains = (np.sqrt((spatial**2).sum(1)) // (xmax / n_domains + 1e-8)).astype(int)
    domain = "domain_" + int_domains.astype(str).astype(object)
    merge_domain = "domain_" + int_domains.clip(0, n_domains - 2).astype(str).astype(object)

    adatas = []

    var_names = np.array(
        GENE_NAMES_SUBSET[:n_vars] if n_vars <= len(GENE_NAMES_SUBSET) else [f"g{i}" for i in range(n_vars)]
    )

    # NB: the order of the `np.random` calls below is kept stable, so that seeded runs stay reproducible
    domains_shift = np.random.exponential(domain_shift_lambda, size=(n_domains, n_vars))

    for panel_index in range(n_panels):
        adatas_panel = []
        panel_shift = np.random.exponential(panel_shift_lambda, size=n_vars)

        for slide_index in range(n_slides_per_panel):
            slide_shift = np.random.exponential(slide_shift_lambda, size=n_vars)

            merge = merge_last_domain_even_slide and (slide_index % 2 == 0)

            adata = AnnData(
                np.zeros((n_obs, n_vars)),
                obsm={"spatial": spatial + panel_index + slide_index},  # ensure the locs are different
                obs=pd.DataFrame(
                    {"domain": merge_domain if merge else domain}, index=[f"cell_{i}" for i in range(n_obs)]
                ),
            )

            adata.var_names = var_names
            adata.obs_names = [f"c_{panel_index}_{slide_index}_{i}" for i in range(adata.n_obs)]

            slide_key = f"slide_{panel_index}_{slide_index}" if slide_ids_unique else f"slide_{slide_index}"
            adata.obs["slide_key"] = slide_key

            for i in range(n_domains):
                condition = adata.obs["domain"] == f"domain_{i}"
                n_obs_domain = condition.sum()

                # The expression scale of a gene combines the domain, slide, and panel effects
                lambdas = domains_shift[i] + slide_shift + panel_shift
                X_domain = np.random.exponential(lambdas, size=(n_obs_domain, n_vars))
                adata.X[condition] = X_domain.astype(int)  # values should look like counts

            if n_drop:
                size = n_vars - n_drop - panel_index  # different number of genes
                var_indices = np.random.choice(n_vars, size=size, replace=False)
                adata = adata[:, var_indices].copy()

            adatas_panel.append(adata[: -1 - panel_index - slide_index].copy())  # different number of cells

        adata_panel = anndata.concat(adatas_panel)

        if compute_spatial_neighbors:
            spatial_neighbors(adata_panel, slide_key="slide_key")
            _drop_neighbors(adata_panel, index=3)  # ensure one node is not connected to any other

        # Keep the raw counts before normalizing
        adata_panel.layers["counts"] = adata_panel.X.copy()
        sc.pp.normalize_total(adata_panel)
        sc.pp.log1p(adata_panel)

        adatas.append(adata_panel)

    return adatas