Skip to content

Data

Loading datasets

novae.load_dataset(pattern=None, tissue=None, species=None, technology=None, custom_filter=None, top_k=None, dry_run=False)

Automatically load slides from the Novae dataset repository.

Selecting slides

The function arguments allow to filter the slides based on the tissue, species, and name pattern. Internally, the function reads this dataset metadata file to select the slides that match the provided filters.

Parameters:

Name Type Description Default
pattern str | None

Optional pattern to match the slides names.

None
tissue list[str] | str | None

Optional tissue (or tissue list) to filter the slides. E.g., "brain", "colon".

None
species list[str] | str | None

Optional species (or species list) to filter the slides. E.g., "human", "mouse".

None
technology list[str] | str | None

Optional technology (or technology list) to filter the slides. E.g., "xenium", or "visium_hd".

None
custom_filter Callable[[DataFrame], Series] | None

Custom filter function that takes the metadata DataFrame (see above link) and returns a boolean Series to decide which rows should be kept.

None
top_k int | None

Optional number of slides to keep. If None, keeps all slides.

None
dry_run bool

If True, the function will only return the metadata of slides that match the filters.

False

Returns:

Type Description
list[AnnData]

A list of AnnData objects, each object corresponds to one slide.

Source code in novae/data/_load/_hf.py
def load_dataset(
    pattern: str | None = None,
    tissue: list[str] | str | None = None,
    species: list[str] | str | None = None,
    technology: list[str] | str | None = None,
    custom_filter: Callable[[pd.DataFrame], pd.Series] | None = None,
    top_k: int | None = None,
    dry_run: bool = False,
) -> list[AnnData]:
    """Automatically load slides from the Novae dataset repository.

    !!! info "Selecting slides"
        The function arguments allow to filter the slides based on the tissue, species, and name pattern.
        Internally, the function reads [this dataset metadata file](https://huggingface.co/datasets/MICS-Lab/novae/blob/main/metadata.csv) to select the slides that match the provided filters.

    Args:
        pattern: Optional pattern to match the slides names.
        tissue: Optional tissue (or tissue list) to filter the slides. E.g., `"brain", "colon"`.
        species: Optional species (or species list) to filter the slides. E.g., `"human", "mouse"`.
        technology: Optional technology (or technology list) to filter the slides. E.g., `"xenium", or "visium_hd"`.
        custom_filter: Custom filter function that takes the metadata DataFrame (see above link) and returns a boolean Series to decide which rows should be kept.
        top_k: Optional number of slides to keep. If `None`, keeps all slides.
        dry_run: If `True`, the function will only return the metadata of slides that match the filters.

    Returns:
        A list of `AnnData` objects, each object corresponds to one slide.
    """
    metadata = pd.read_csv("hf://datasets/MICS-Lab/novae/metadata.csv", index_col=0)

    FILTER_COLUMN = [("species", species), ("tissue", tissue), ("technology", technology)]
    VALID_VALUES = {column: metadata[column].unique() for column, _ in FILTER_COLUMN}

    for column, value in FILTER_COLUMN:
        if value is not None:
            values = [value] if isinstance(value, str) else value
            valid_values = VALID_VALUES[column]

            assert all(value in valid_values for value in values), (
                f"Found invalid {column} value in {values}. Valid values are {valid_values}."
            )

            metadata = metadata[metadata[column].isin(values)]

    if custom_filter is not None:
        metadata = metadata[custom_filter(metadata)]

    assert not metadata.empty, "No dataset found for the provided filters."

    if pattern is not None:
        where = metadata.index.str.match(pattern)
        assert len(where), f"No dataset found for the provided pattern ({', '.join(list(metadata.index))})."
        metadata = metadata[where]

    assert not metadata.empty, "No dataset found for the provided filters."

    if top_k is not None:
        metadata = metadata.head(top_k)

    if dry_run:
        return metadata

    log.info(f"Found {len(metadata)} h5ad file(s) matching the filters.")
    return [_read_h5ad_from_hub(name, row) for name, row in metadata.iterrows()]

novae.toy_dataset(n_panels=3, n_domains=4, n_slides_per_panel=1, xmax=500, n_vars=100, n_drop=20, step=20, panel_shift_lambda=5, slide_shift_lambda=1.5, domain_shift_lambda=2.0, slide_ids_unique=True, compute_spatial_neighbors=False, merge_last_domain_even_slide=False)

Creates a toy dataset, useful for debugging or testing.

Parameters:

Name Type Description Default
n_panels int

Number of panels. Each panel will correspond to one output AnnData object.

3
n_domains int

Number of domains.

4
n_slides_per_panel int

Number of slides per panel.

1
xmax int

Maximum value for the spatial coordinates (the larger, the more cells).

500
n_vars int

Maxmium number of genes per panel.

100
n_drop int

Number of genes that are randomly removed for each AnnData object. It will create non-identical panels.

20
step int

Step between cells in their spatial coordinates.

20
panel_shift_lambda float

Lambda used in the exponential law for each panel.

5
slide_shift_lambda float

Lambda used in the exponential law for each slide.

1.5
domain_shift_lambda float

Lambda used in the exponential law for each domain.

2.0
slide_ids_unique bool

Whether to ensure that slide ids are unique.

True
compute_spatial_neighbors bool

Whether to compute the spatial neighbors graph. We remove some the edges of one node for testing purposes.

False

Returns:

Type Description
list[AnnData]

A list of AnnData objects representing a valid Novae dataset.

Source code in novae/data/_load/_toy.py
def toy_dataset(
    n_panels: int = 3,
    n_domains: int = 4,
    n_slides_per_panel: int = 1,
    xmax: int = 500,
    n_vars: int = 100,
    n_drop: int = 20,
    step: int = 20,
    panel_shift_lambda: float = 5,
    slide_shift_lambda: float = 1.5,
    domain_shift_lambda: float = 2.0,
    slide_ids_unique: bool = True,
    compute_spatial_neighbors: bool = False,
    merge_last_domain_even_slide: bool = False,
) -> list[AnnData]:
    """Creates a toy dataset, useful for debugging or testing.

    Args:
        n_panels: Number of panels. Each panel will correspond to one output `AnnData` object.
        n_domains: Number of domains.
        n_slides_per_panel: Number of slides per panel.
        xmax: Maximum value for the spatial coordinates (the larger, the more cells).
        n_vars: Maxmium number of genes per panel.
        n_drop: Number of genes that are randomly removed for each `AnnData` object. It will create non-identical panels.
        step: Step between cells in their spatial coordinates.
        panel_shift_lambda: Lambda used in the exponential law for each panel.
        slide_shift_lambda: Lambda used in the exponential law for each slide.
        domain_shift_lambda: Lambda used in the exponential law for each domain.
        slide_ids_unique: Whether to ensure that slide ids are unique.
        compute_spatial_neighbors: Whether to compute the spatial neighbors graph. We remove some the edges of one node for testing purposes.

    Returns:
        A list of `AnnData` objects representing a valid `Novae` dataset.
    """
    assert n_vars - n_drop - n_panels > 2

    spatial = np.mgrid[-xmax:xmax:step, -xmax:xmax:step].reshape(2, -1).T
    spatial = spatial[(spatial**2).sum(1) <= xmax**2]
    n_obs = len(spatial)

    int_domains = (np.sqrt((spatial**2).sum(1)) // (xmax / n_domains + 1e-8)).astype(int)
    domain = "domain_" + int_domains.astype(str).astype(object)
    merge_domain = "domain_" + int_domains.clip(0, n_domains - 2).astype(str).astype(object)

    adatas = []

    var_names = np.array(
        GENE_NAMES_SUBSET[:n_vars] if n_vars <= len(GENE_NAMES_SUBSET) else [f"g{i}" for i in range(n_vars)]
    )

    domains_shift = np.random.exponential(domain_shift_lambda, size=(n_domains, n_vars))

    for panel_index in range(n_panels):
        adatas_panel = []
        panel_shift = np.random.exponential(panel_shift_lambda, size=n_vars)

        for slide_index in range(n_slides_per_panel):
            slide_shift = np.random.exponential(slide_shift_lambda, size=n_vars)

            merge = merge_last_domain_even_slide and (slide_index % 2 == 0)

            adata = AnnData(
                np.zeros((n_obs, n_vars)),
                obsm={"spatial": spatial + panel_index + slide_index},  # ensure the locs are different
                obs=pd.DataFrame(
                    {"domain": merge_domain if merge else domain}, index=[f"cell_{i}" for i in range(spatial.shape[0])]
                ),
            )

            adata.var_names = var_names
            adata.obs_names = [f"c_{panel_index}_{slide_index}_{i}" for i in range(adata.n_obs)]

            slide_key = f"slide_{panel_index}_{slide_index}" if slide_ids_unique else f"slide_{slide_index}"
            adata.obs["slide_key"] = slide_key

            for i in range(n_domains):
                condition = adata.obs["domain"] == "domain_" + str(i)
                n_obs_domain = condition.sum()

                lambdas = domains_shift[i] + slide_shift + panel_shift
                X_domain = np.random.exponential(lambdas, size=(n_obs_domain, n_vars))
                adata.X[condition] = X_domain.astype(int)  # values should look like counts

            if n_drop:
                size = n_vars - n_drop - panel_index  # different number of genes
                var_indices = np.random.choice(n_vars, size=size, replace=False)
                adata = adata[:, var_indices].copy()

            adatas_panel.append(adata[: -1 - panel_index - slide_index].copy())  # different number of cells

        adata_panel = anndata.concat(adatas_panel)

        if compute_spatial_neighbors:
            spatial_neighbors(adata_panel, slide_key="slide_key")
            _drop_neighbors(adata_panel, index=3)  # ensure one node is not connected to any other

        adata_panel.layers["counts"] = adata_panel.X.copy()
        sc.pp.normalize_total(adata_panel)
        sc.pp.log1p(adata_panel)

        adatas.append(adata_panel)

    return adatas

Preprocessing

novae.quantile_scaling(adata, multiplier=5, quantile=0.2, per_slide=True)

Preprocess fluorescence data from adata.X using quantiles of expression. For each column X, we compute asinh(X / 5*Q(0.2, X)), and store them back.

Parameters:

Name Type Description Default
adata AnnData | list[AnnData]

An AnnData object, or a list of AnnData objects.

required
multiplier float

The multiplier for the quantile.

5
quantile float

The quantile to compute.

0.2
per_slide bool

Whether to compute the quantile per slide. If False, the quantile is computed for each AnnData object.

True
Source code in novae/data/preprocess.py
def quantile_scaling(
    adata: AnnData | list[AnnData],
    multiplier: float = 5,
    quantile: float = 0.2,
    per_slide: bool = True,
) -> pd.DataFrame:
    """Preprocess fluorescence data from `adata.X` using quantiles of expression.
    For each column `X`, we compute `asinh(X / 5*Q(0.2, X))`, and store them back.

    Args:
        adata: An `AnnData` object, or a list of `AnnData` objects.
        multiplier: The multiplier for the quantile.
        quantile: The quantile to compute.
        per_slide: Whether to compute the quantile per slide. If `False`, the quantile is computed for each `AnnData` object.
    """
    _check_has_slide_id(adata)

    if isinstance(adata, list):
        for adata_ in adata:
            quantile_scaling(adata_, multiplier, quantile, per_slide=per_slide)
        return

    if not per_slide:
        return _quantile_scaling(adata, multiplier, quantile)

    for adata_ in iter_slides(adata):
        _quantile_scaling(adata_, multiplier, quantile)

novae.compute_histo_embeddings(sdata, model='conch', table_key='table', patch_overlap_ratio=0.5, image_key=None, device=None, batch_size=32)

Compute histology embeddings for a given model on a grid of overlapping patches.

It will add a new AnnData object to the SpatialData object, containing the embeddings of the patches, and add a column in the cells table with the index of the closest patch.

Installation

This function requires the multimodal extra. You can install it with pip install novae[multimodal]. If you use the CONCH model (default), you also need to install the conch extra with pip install 'novae[multimodal,conch]'.

Parameters:

Name Type Description Default
sdata SpatialData

A SpatialData object containing the data.

required
model str | Callable

The model to use for computing embeddings. See the sopa documentation for more details.

'conch'
table_key str

Name of the AnnData object containing the cells.

'table'
patch_overlap_ratio float

Ratio of overlap between patches.

0.5
image_key str | None

Name of the histology image. If None, the function will try to find the image key automatically.

None
device str | None

Torch device to use for computation.

None
batch_size int

Mini-batch size for computation.

32
Source code in novae/data/_embeddings/_histo.py
def compute_histo_embeddings(
    sdata: "SpatialData",
    model: str | Callable = "conch",
    table_key: str = "table",
    patch_overlap_ratio: float = 0.5,
    image_key: str | None = None,
    device: str | None = None,
    batch_size: int = 32,
) -> None:
    """Compute histology embeddings for a given model on a grid of overlapping patches.

    It will add a new `AnnData` object to the `SpatialData` object, containing the embeddings of the patches, and
    add a column in the cells table with the index of the closest patch.

    !!! warning "Installation"
        This function requires the `multimodal` extra. You can install it with `pip install novae[multimodal]`. If you use the CONCH model (default), you also need to install the `conch` extra with `pip install 'novae[multimodal,conch]'`.

    Args:
        sdata: A `SpatialData` object containing the data.
        model: The model to use for computing embeddings. See the [sopa documentation](https://gustaveroussy.github.io/sopa/api/patches/#sopa.patches.compute_embeddings) for more details.
        table_key: Name of the `AnnData` object containing the cells.
        patch_overlap_ratio: Ratio of overlap between patches.
        image_key: Name of the histology image. If None, the function will try to find the image key automatically.
        device: Torch device to use for computation.
        batch_size: Mini-batch size for computation.
    """
    try:
        import sopa
        from sopa._constants import SopaAttrs, SopaKeys
        from spatialdata.models import get_table_keys
    except ImportError:
        raise ImportError(
            "Please install the multimodal extra via `pip install novae[multimodal]`.\nIf you want to use CONCH, also install the corresponding extra via `pip install 'novae[multimodal,conch]'`."
        )

    assert 0 <= patch_overlap_ratio < 1, "patch_overlap_ratio must be between 0 and 1"
    patch_overlap = int(patch_overlap_ratio * Nums.HE_PATCH_WIDTH)

    adata: AnnData = sdata[table_key]

    if image_key is None:
        image_key, _ = sopa.utils.get_spatial_element(
            sdata.images, sdata.attrs.get(SopaAttrs.TISSUE_SEGMENTATION, None), return_key=True
        )

    # align the cells boundaries to the image coordinate system
    shapes_key, _, instance_key = get_table_keys(adata)

    if not sdata.shapes[shapes_key].geometry.is_valid.all():
        log.warning(f"Found invalid geometries in `sdata['{shapes_key}']`. They will be fixed using `.make_valid()`.")
        sdata.shapes[shapes_key].geometry = sdata.shapes[shapes_key].make_valid()

    cells = sopa.utils.to_intrinsic(sdata, shapes_key, image_key)
    cells = cells.loc[adata.obs[instance_key]]

    key_added = sopa.patches.compute_embeddings(
        sdata,
        model,
        level=0,
        patch_width=Nums.HE_PATCH_WIDTH,
        patch_overlap=patch_overlap,
        image_key=image_key,
        device=device,
        batch_size=batch_size,
        roi_key=shapes_key,
        use_roi_centroids=True,
    )

    patches_centroids = sdata[SopaKeys.EMBEDDINGS_PATCHES].centroid
    indices, distances = patches_centroids.sindex.nearest(cells.centroid, return_all=False, return_distance=True)

    _quality_control_join(distances)

    adata.obs["embedding_key"] = key_added
    adata.obs["embedding_index"] = indices[1]

novae.compute_histo_pca(sdatas, n_components=50, table_key='table')

Run PCA on the histology embeddings associated to each cell (from the closest patch). The embedding is stored in adata.obsm["histo_embeddings"], where adata is the table of cell expression.

Info

You need to run novae.data.compute_histo_embeddings before running this function.

Parameters:

Name Type Description Default
sdatas Union[SpatialData, list[SpatialData]]

One or several SpatialData object(s).

required
n_components int

Number of components for the PCA.

50
table_key str

Name of the AnnData object containing the cells.

'table'
Source code in novae/data/_embeddings/_histo.py
def compute_histo_pca(
    sdatas: Union["SpatialData", list["SpatialData"]], n_components: int = 50, table_key: str = "table"
) -> None:
    """Run PCA on the histology embeddings associated to each cell (from the closest patch).
    The embedding is stored in `adata.obsm["histo_embeddings"]`, where `adata` is the table of cell expression.

    !!! info
        You need to run [novae.data.compute_histo_embeddings][] before running this function.

    Args:
        sdatas: One or several `SpatialData` object(s).
        n_components: Number of components for the PCA.
        table_key: Name of the `AnnData` object containing the cells.
    """
    from spatialdata import SpatialData

    if isinstance(sdatas, SpatialData):
        sdatas = [sdatas]

    def _histo_emb(sdata: "SpatialData") -> np.ndarray:
        _table: AnnData = sdata.tables[table_key]

        assert "embedding_key" in _table.obs, (
            "Could not find `embedding_key` in adata.obs. Did you run `novae.data.compute_histo_embeddings` first?"
        )
        embedding_key = _table.obs["embedding_key"].iloc[0]
        return sdata.tables[embedding_key].X

    X = np.concatenate([_histo_emb(sdata) for sdata in sdatas], axis=0)

    pca = PCA(n_components=n_components)
    pipeline = Pipeline([("pca", pca), ("scaler", StandardScaler())])

    pipeline.fit(X)

    for sdata in sdatas:
        adata: AnnData = sdata[table_key]
        adata.obsm[Keys.HISTO_EMBEDDINGS] = pipeline.transform(_histo_emb(sdata)[adata.obs["embedding_index"]])