Skip to content

novae.utils

novae.utils.spatial_neighbors(adata, radius=100, slide_key=None, pixel_size=None, technology=None, percentile=None, set_diag=False)

Create a Delaunay graph from the spatial coordinates of the cells (in microns). The graph is stored in adata.obsp['spatial_connectivities'] and adata.obsp['spatial_distances']. The long edges are removed from the graph according to the radius argument (if provided).

Note

The spatial coordinates are expected to be in microns, and stored in adata.obsm["spatial"]. If the coordinates are in pixels, set pixel_size to the size of a pixel in microns. If you don't know the pixel_size, or if you don't have adata.obsm["spatial"], you can also provide the technology argument.

Info

This function was updated from squidpy.

Parameters:

Name Type Description Default
adata AnnData

AnnData object

required
radius tuple[float, float] | float | None

tuple that prunes the final graph to only contain edges in interval [min(radius), max(radius)] microns. If float, uses [0, radius]. If None, all edges are kept.

100
slide_key str | None

Optional key in adata.obs indicating the slide ID of each cell. If provided, the graph is computed for each slide separately.

None
pixel_size float | None

Number of microns in one pixel of the image (use this argument if adata.obsm["spatial"] is in pixels). If None, the coordinates are assumed to be in microns.

None
technology str | None

Technology or machine used to generate the spatial data. One of "cosmx", "merscope", "xenium". If None, the coordinates are assumed to be in microns.

None
percentile float | None

Percentile of the distances to use as threshold.

None
set_diag bool

Whether to set the diagonal of the spatial connectivities to 1.0.

False
Source code in novae/utils/_build.py
def spatial_neighbors(
    adata: AnnData,
    radius: tuple[float, float] | float | None = 100,
    slide_key: str | None = None,
    pixel_size: float | None = None,
    technology: str | None = None,
    percentile: float | None = None,
    set_diag: bool = False,
):
    """Create a Delaunay graph from the spatial coordinates of the cells (in microns).
    The graph is stored in `adata.obsp['spatial_connectivities']` and `adata.obsp['spatial_distances']`. The long edges
    are removed from the graph according to the `radius` argument (if provided).

    Note:
        The spatial coordinates are expected to be in microns, and stored in `adata.obsm["spatial"]`.
        If the coordinates are in pixels, set `pixel_size` to the size of a pixel in microns.
        If you don't know the `pixel_size`, or if you don't have `adata.obsm["spatial"]`, you can also provide the `technology` argument.

    Info:
        This function was updated from [squidpy](https://squidpy.readthedocs.io/en/latest/api/squidpy.gr.spatial_neighbors.html#squidpy.gr.spatial_neighbors).

    Args:
        adata: AnnData object
        radius: `tuple` that prunes the final graph to only contain edges in interval `[min(radius), max(radius)]` microns. If `float`, uses `[0, radius]`. If `None`, all edges are kept.
        slide_key: Optional key in `adata.obs` indicating the slide ID of each cell. If provided, the graph is computed for each slide separately.
        pixel_size: Number of microns in one pixel of the image (use this argument if `adata.obsm["spatial"]` is in pixels). If `None`, the coordinates are assumed to be in microns.
        technology: Technology or machine used to generate the spatial data. One of `"cosmx", "merscope", "xenium"`. If `None`, the coordinates are assumed to be in microns.
        percentile: Percentile of the distances to use as threshold.
        set_diag: Whether to set the diagonal of the spatial connectivities to `1.0`.
    """
    if isinstance(radius, float) or isinstance(radius, int):
        radius = [0.0, float(radius)]

    assert radius is None or len(radius) == 2, "Radius is expected to be a tuple (min_radius, max_radius)"

    assert pixel_size is None or technology is None, "You must choose argument between `pixel_size` and `technology`"

    if technology is not None:
        adata.obsm["spatial"] = _technology_coords(adata, technology)

    assert (
        "spatial" in adata.obsm
    ), "Key 'spatial' not found in adata.obsm. This should contain the 2D spatial coordinates of the cells"

    if pixel_size is not None:
        assert (
            "spatial_pixel" not in adata.obsm
        ), "Do nott run `novae.utils.spatial_neighbors` twice ('spatial_pixel' already present in `adata.obsm`)."
        adata.obsm["spatial_pixel"] = adata.obsm["spatial"].copy()
        adata.obsm["spatial"] = adata.obsm["spatial"] * pixel_size

    log.info(f"Computing delaunay graph on {adata.n_obs:,} cells (radius threshold: {radius} microns)")

    if slide_key is not None:
        adata.obs[slide_key] = adata.obs[slide_key].astype("category")
        slides = adata.obs[slide_key].cat.categories
        make_index_unique(adata.obs_names)
    else:
        slides = [None]

    _build_fun = partial(
        _spatial_neighbor,
        set_diag=set_diag,
        radius=radius,
        percentile=percentile,
    )

    if slide_key is not None:
        mats: list[tuple[spmatrix, spmatrix]] = []
        ixs = []  # type: ignore[var-annotated]
        for slide in slides:
            ixs.extend(np.where(adata.obs[slide_key] == slide)[0])
            mats.append(_build_fun(adata[adata.obs[slide_key] == slide]))
        ixs = np.argsort(ixs)  # type: ignore[assignment] # invert
        Adj = block_diag([m[0] for m in mats], format="csr")[ixs, :][:, ixs]
        Dst = block_diag([m[1] for m in mats], format="csr")[ixs, :][:, ixs]
    else:
        Adj, Dst = _build_fun(adata)

    adata.obsp["spatial_connectivities"] = Adj
    adata.obsp["spatial_distances"] = Dst

    adata.uns["spatial_neighbors"] = {
        "connectivities_key": "spatial_connectivities",
        "distances_key": "spatial_distances",
        "params": {"radius": radius, "set_diag": set_diag},
    }

novae.utils.prepare_adatas(adata, slide_key=None, var_names=None)

Ensure the AnnData objects are ready to be used by the model.

Note

It performs the following operations:

  • Preprocess the data if needed (e.g. normalize, log1p), in which case raw counts are saved in adata.layers['counts']
  • Compute spatial neighbors (if not already computed), using novae.utils.spatial_neighbors
  • Compute the mean and std of each gene
  • Save which genes are highly variable, in case the number of genes is too high
  • If slide_key is provided, ensure that all slide_key are valid and unique
  • If using a pretrained model, save which genes are known by the model

Parameters:

Name Type Description Default
adata AnnData | list[AnnData] | None

An AnnData object, or a list of AnnData objects. Optional if the model was initialized with adata.

required
slide_key str | None

Optional key of adata.obs containing the ID of each slide. Not needed if each adata is a slide.

None
var_names set | list[str] | None

Only used when loading a pretrained model. To not use it yourself.

None

Returns:

Type Description
list[AnnData]

A list of AnnData objects ready to be used by the model. If only one adata object is provided, it will be wrapped in a list.

Source code in novae/utils/_validate.py
@format_docs
def prepare_adatas(
    adata: AnnData | list[AnnData] | None,
    slide_key: str | None = None,
    var_names: set | list[str] | None = None,
) -> list[AnnData]:
    """Ensure the AnnData objects are ready to be used by the model.

    Note:
        It performs the following operations:

        - Preprocess the data if needed (e.g. normalize, log1p), in which case raw counts are saved in `adata.layers['counts']`
        - Compute spatial neighbors (if not already computed), using [novae.utils.spatial_neighbors][]
        - Compute the mean and std of each gene
        - Save which genes are highly variable, in case the number of genes is too high
        - If `slide_key` is provided, ensure that all `slide_key` are valid and unique
        - If using a pretrained model, save which genes are known by the model

    Args:
        {adata}
        {slide_key}
        {var_names}

    Returns:
        A list of `AnnData` objects ready to be used by the model. If only one `adata` object is provided, it will be wrapped in a list.
    """
    assert adata is not None or var_names is not None, "One of `adata` and `var_names` must not be None"

    if adata is None:
        return None, var_names

    if isinstance(adata, AnnData):
        adatas = [adata]
    elif isinstance(adata, list):
        adatas = adata
    else:
        raise ValueError(f"Invalid type for `adata`: {type(adata)}")

    assert len(adatas) > 0, "No `adata` object found. Please provide an AnnData object, or a list of AnnData objects."

    _set_unique_slide_ids(adatas, slide_key=slide_key)
    _standardize_adatas(adatas, slide_key=slide_key)  # log1p + spatial_neighbors
    _lookup_highly_variable_genes(adatas)
    _select_novae_genes(adatas, var_names)

    if var_names is None:
        var_names = _genes_union(adatas, among_used=True)

    return adatas, var_names

novae.utils.load_dataset(pattern=None, tissue=None, species=None)

Automatically load slides from the Novae dataset repository.

Selecting slides

The function arguments allow to filter the slides based on the tissue, species, and name pattern. Internally, the function reads this dataset metadata file to select the slides that match the provided filters.

Parameters:

Name Type Description Default
pattern str | None

Optional pattern to match the slides names.

None
tissue list[str] | str | None

Optional tissue (or tissue list) to filter the slides. E.g., "brain", "colon".

None
species list[str] | str | None

Optional species (or species list) to filter the slides. E.g., "human", "mouse".

None

Returns:

Type Description
list[AnnData]

A list of AnnData objects, each object corresponds to one slide.

Source code in novae/utils/_data.py
def load_dataset(
    pattern: str | None = None, tissue: list[str] | str | None = None, species: list[str] | str | None = None
) -> list[AnnData]:
    """Automatically load slides from the Novae dataset repository.

    !!! info "Selecting slides"
        The function arguments allow to filter the slides based on the tissue, species, and name pattern.
        Internally, the function reads [this dataset metadata file](https://huggingface.co/datasets/MICS-Lab/novae/blob/main/metadata.csv) to select the slides that match the provided filters.

    Args:
        pattern: Optional pattern to match the slides names.
        tissue: Optional tissue (or tissue list) to filter the slides. E.g., `"brain", "colon"`.
        species: Optional species (or species list) to filter the slides. E.g., `"human", "mouse"`.

    Returns:
        A list of `AnnData` objects, each object corresponds to one slide.
    """
    metadata = pd.read_csv("hf://datasets/MICS-Lab/novae/metadata.csv", index_col=0)

    valid_species = metadata["species"].unique()
    valid_tissues = metadata["tissue"].unique()

    if species is not None:
        species = [species] if isinstance(species, str) else species
        assert all(
            s in valid_species for s in species
        ), f"Found invalid species in {species}. Valid species are {valid_species}."
        metadata = metadata[metadata["species"].isin(species)]

    if tissue is not None:
        tissues = [tissue] if isinstance(tissue, str) else tissue
        assert all(
            tissue in valid_tissues for tissue in tissues
        ), f"Found invalid tissues in {tissues}. Valid tissues for the provided species are {valid_tissues}."
        metadata = metadata[metadata["tissue"].isin(tissues)]

    assert not metadata.empty, "No dataset found for the provided filters."

    if pattern is not None:
        where = metadata.index.str.match(pattern)
        assert len(where), f"No dataset found for the provided pattern ({', '.join(list(metadata.index))})."
        metadata = metadata[where]

    assert not metadata.empty, "No dataset found for the provided filters."

    log.info(f"Found {len(metadata)} h5ad file(s) matching the filters.")
    return [_read_h5ad_from_hub(name, row) for name, row in metadata.iterrows()]

novae.utils.toy_dataset(n_panels=3, n_domains=4, n_slides_per_panel=1, xmax=500, n_vars=100, n_drop=20, step=20, panel_shift_lambda=5, slide_shift_lambda=1.5, domain_shift_lambda=2.0, slide_ids_unique=True, compute_spatial_neighbors=False, merge_last_domain_even_slide=False)

Creates a toy dataset, useful for debugging or testing.

Parameters:

Name Type Description Default
n_panels int

Number of panels. Each panel will correspond to one output AnnData object.

3
n_domains int

Number of domains.

4
n_slides_per_panel int

Number of slides per panel.

1
xmax int

Maximum value for the spatial coordinates (the larger, the more cells).

500
n_vars int

Maxmium number of genes per panel.

100
n_drop int

Number of genes that are randomly removed for each AnnData object. It will create non-identical panels.

20
step int

Step between cells in their spatial coordinates.

20
panel_shift_lambda float

Lambda used in the exponential law for each panel.

5
slide_shift_lambda float

Lambda used in the exponential law for each slide.

1.5
domain_shift_lambda float

Lambda used in the exponential law for each domain.

2.0
slide_ids_unique bool

Whether to ensure that slide ids are unique.

True
compute_spatial_neighbors bool

Whether to compute the spatial neighbors graph. We remove some the edges of one node for testing purposes.

False

Returns:

Type Description
list[AnnData]

A list of AnnData objects representing a valid Novae dataset.

Source code in novae/utils/_data.py
def toy_dataset(
    n_panels: int = 3,
    n_domains: int = 4,
    n_slides_per_panel: int = 1,
    xmax: int = 500,
    n_vars: int = 100,
    n_drop: int = 20,
    step: int = 20,
    panel_shift_lambda: float = 5,
    slide_shift_lambda: float = 1.5,
    domain_shift_lambda: float = 2.0,
    slide_ids_unique: bool = True,
    compute_spatial_neighbors: bool = False,
    merge_last_domain_even_slide: bool = False,
) -> list[AnnData]:
    """Creates a toy dataset, useful for debugging or testing.

    Args:
        n_panels: Number of panels. Each panel will correspond to one output `AnnData` object.
        n_domains: Number of domains.
        n_slides_per_panel: Number of slides per panel.
        xmax: Maximum value for the spatial coordinates (the larger, the more cells).
        n_vars: Maxmium number of genes per panel.
        n_drop: Number of genes that are randomly removed for each `AnnData` object. It will create non-identical panels.
        step: Step between cells in their spatial coordinates.
        panel_shift_lambda: Lambda used in the exponential law for each panel.
        slide_shift_lambda: Lambda used in the exponential law for each slide.
        domain_shift_lambda: Lambda used in the exponential law for each domain.
        slide_ids_unique: Whether to ensure that slide ids are unique.
        compute_spatial_neighbors: Whether to compute the spatial neighbors graph. We remove some the edges of one node for testing purposes.

    Returns:
        A list of `AnnData` objects representing a valid `Novae` dataset.
    """
    assert n_vars - n_drop - n_panels > 2

    spatial = np.mgrid[-xmax:xmax:step, -xmax:xmax:step].reshape(2, -1).T
    spatial = spatial[(spatial**2).sum(1) <= xmax**2]
    n_obs = len(spatial)

    int_domains = (np.sqrt((spatial**2).sum(1)) // (xmax / n_domains + 1e-8)).astype(int)
    domain = "domain_" + int_domains.astype(str).astype(object)
    merge_domain = "domain_" + int_domains.clip(0, n_domains - 2).astype(str).astype(object)

    adatas = []

    var_names = np.array(
        TRUE_GENE_NAMES[:n_vars] if n_vars <= len(TRUE_GENE_NAMES) else [f"g{i}" for i in range(n_vars)]
    )

    domains_shift = np.random.exponential(domain_shift_lambda, size=(n_domains, n_vars))

    for panel_index in range(n_panels):
        adatas_panel = []
        panel_shift = np.random.exponential(panel_shift_lambda, size=n_vars)

        for slide_index in range(n_slides_per_panel):
            slide_shift = np.random.exponential(slide_shift_lambda, size=n_vars)

            merge = merge_last_domain_even_slide and (slide_index % 2 == 0)

            adata = AnnData(
                np.zeros((n_obs, n_vars)),
                obsm={"spatial": spatial + panel_index + slide_index},  # ensure the locs are different
                obs=pd.DataFrame(
                    {"domain": merge_domain if merge else domain}, index=[f"cell_{i}" for i in range(spatial.shape[0])]
                ),
            )

            adata.var_names = var_names
            adata.obs_names = [f"c_{panel_index}_{slide_index}_{i}" for i in range(adata.n_obs)]

            slide_key = f"slide_{panel_index}_{slide_index}" if slide_ids_unique else f"slide_{slide_index}"
            adata.obs["slide_key"] = slide_key

            for i in range(n_domains):
                condition = adata.obs["domain"] == "domain_" + str(i)
                n_obs_domain = condition.sum()

                lambdas = domains_shift[i] + slide_shift + panel_shift
                X_domain = np.random.exponential(lambdas, size=(n_obs_domain, n_vars))
                adata.X[condition] = X_domain.astype(int)  # values should look like log1p values

            if n_drop:
                size = n_vars - n_drop - panel_index  # different number of genes
                var_indices = np.random.choice(n_vars, size=size, replace=False)
                adata = adata[:, var_indices].copy()

            adatas_panel.append(adata[: -1 - panel_index - slide_index].copy())  # different number of cells

        adata_panel = anndata.concat(adatas_panel)

        if compute_spatial_neighbors:
            spatial_neighbors(adata_panel, slide_key="slide_key")
            _drop_neighbors(adata_panel, index=3)  # ensure one node is not connected to any other

        adata_panel.layers["counts"] = adata_panel.X.copy()
        sc.pp.normalize_total(adata_panel)
        sc.pp.log1p(adata_panel)

        adatas.append(adata_panel)

    return adatas