#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 5/17/23 2:58 PM
# @Author : Chang Xu
# @File : utils.py
# @Email : changxu@nus.edu.sg
import torch
import numpy as np
import anndata
from scipy import sparse
import pandas as pd
import tqdm
import sys
from scipy import stats
from builtins import range
import scanpy as sc
import sklearn
from sklearn.neighbors import NearestNeighbors, KNeighborsRegressor
from sklearn.metrics.pairwise import euclidean_distances
from typing import Union, Callable, Any, Iterable, List, Optional
from sklearn.metrics import pairwise_distances
from torch_geometric.nn import knn_graph, radius_graph
import anndata
[docs]def make_one_hot(
labels: torch.LongTensor,
C=2,
) -> torch.FloatTensor:
"""
Converts an integer label torch.autograd.Variable to a one-hot Variable.
Parameters
----------
labels : torch.LongTensor or torch.cuda.LongTensor
[N, 1], where N is batch size.
Each value is an integer representing correct classification.
C : int
number of classes in labels.
Returns
-------
target : torch.FloatTensor or torch.cuda.FloatTensor
[N, C,], where C is class number. One-hot encoded.
"""
if labels.ndimension() < 2:
labels = labels.unsqueeze(1)
one_hot = torch.zeros(
[
labels.size(0),
C,
],
dtype=torch.float32,
device=labels.device,
)
target = one_hot.scatter_(1, labels, 1)
return target
[docs]def append_categorical_to_data(
X: Union[np.ndarray, sparse.csr.csr_matrix],
categorical: np.ndarray,
) -> (Union[np.ndarray, sparse.csr.csr_matrix], np.ndarray):
"""Convert `categorical` to a one-hot vector and append
this vector to each sample in `X`.
Parameters
----------
X : np.ndarray, sparse.csr.csr_matrix
[Cells, Features]
categorical : np.ndarray
[Cells,]
Returns
-------
Xa : np.ndarray
[Cells, Features + N_Categories]
categories : np.ndarray
[N_Categories,] str category descriptors.
"""
# `pd.Categorical(xyz).codes` are int values for each unique
# level in the vector `xyz`
labels = pd.Categorical(categorical)
idx = np.array(labels.codes)
idx = torch.from_numpy(idx.astype("int32")).long()
categories = np.array(labels.categories)
one_hot_mat = make_one_hot(
idx,
C=len(categories),
)
one_hot_mat = one_hot_mat.numpy()
assert X.shape[0] == one_hot_mat.shape[0], "dims unequal at %d, %d" % (
X.shape[0],
one_hot_mat.shape[0],
)
# append one hot vector to the [Cells, Features] matrix
if sparse.issparse(X):
X = sparse.hstack([X, one_hot_mat])
else:
X = np.concatenate([X, one_hot_mat], axis=1)
return X, categories
[docs]def get_adata_asarray(
adata: anndata.AnnData,
) -> Union[np.ndarray, sparse.csr.csr_matrix]:
"""Get the gene expression matrix `.X` of an
AnnData object as an array rather than a view.
Parameters
----------
adata : anndata.AnnData
[Cells, Genes] AnnData experiment.
Returns
-------
X : np.ndarray, sparse.csr.csr_matrix
[Cells, Genes] `.X` attribute as an array
in memory.
Notes
-----
Returned `X` will match the type of `adata.X` view.
"""
if sparse.issparse(adata.X):
X = sparse.csr.csr_matrix(adata.X)
else:
X = np.array(adata.X)
return X
[docs]def build_classification_matrix(
X: Union[np.ndarray, sparse.csr.csr_matrix],
model_genes: np.ndarray,
sample_genes: np.ndarray,
gene_batch_size: int = 512,
) -> Union[np.ndarray, sparse.csr.csr_matrix]:
"""
Build a matrix for classification using only genes that overlap
between the current sample and the pre-trained model.
Parameters
----------
X : np.ndarray, sparse.csr_matrix
[Cells, Genes] count matrix.
model_genes : np.ndarray
gene identifiers in the order expected by the model.
sample_genes : np.ndarray
gene identifiers for the current sample.
gene_batch_size : int
number of genes to copy between arrays per batch.
controls a speed vs. memory trade-off.
Returns
-------
N : np.ndarray, sparse.csr_matrix
[Cells, len(model_genes)] count matrix.
Values where a model gene was not present in the sample are left
as zeros. `type(N)` will match `type(X)`.
"""
# check types
if type(X) not in (np.ndarray, sparse.csr.csr_matrix):
msg = f"X is type {type(X)}, must `np.ndarray` or `sparse.csr_matrix`"
raise TypeError(msg)
n_cells = X.shape[0]
# check if gene names already match exactly
if len(model_genes) == len(sample_genes):
if np.all(model_genes == sample_genes):
print("Gene names match exactly, returning input.")
return X
# instantiate a new [Cells, model_genes] matrix where columns
# retain the order used during training
if type(X) == np.ndarray:
N = np.zeros((n_cells, len(model_genes)))
else:
# use sparse matrices if the input is sparse
N = sparse.lil_matrix(
(
n_cells,
len(model_genes),
)
)
# map gene indices from the model to the sample genes
model_genes_indices = []
sample_genes_indices = []
common_genes = 0
for i, g in tqdm.tqdm(enumerate(sample_genes), desc="mapping genes"):
if np.sum(g == model_genes) > 0:
model_genes_indices.append(int(np.where(g == model_genes)[0]))
sample_genes_indices.append(
i,
)
common_genes += 1
# copy the data in batches to the new array to avoid memory overflows
gene_idx = 0
n_batches = int(np.ceil(N.shape[1] / gene_batch_size))
for b in tqdm.tqdm(range(n_batches), desc="copying gene batches"):
model_batch_idx = model_genes_indices[gene_idx : gene_idx + gene_batch_size]
sample_batch_idx = sample_genes_indices[gene_idx : gene_idx + gene_batch_size]
N[:, model_batch_idx] = X[:, sample_batch_idx]
gene_idx += gene_batch_size
if sparse.issparse(N):
# convert to `csr` from `csc`
N = sparse.csr_matrix(N)
print("Found %d common genes." % common_genes)
return N
[docs]def knn_smooth_pred_class(
X: np.ndarray,
pred_class: np.ndarray,
grouping: np.ndarray = None,
k: int = 15,
) -> np.ndarray:
"""
Smooths class predictions by taking the modal class from each cell's
nearest neighbors.
Parameters
----------
X : np.ndarray
[N, Features] embedding space for calculation of nearest neighbors.
pred_class : np.ndarray
[N,] array of unique class labels.
groupings : np.ndarray
[N,] unique grouping labels for i.e. clusters.
if provided, only considers nearest neighbors *within the cluster*.
k : int
number of nearest neighbors to use for smoothing.
Returns
-------
smooth_pred_class : np.ndarray
[N,] unique class labels, smoothed by kNN.
Examples
--------
>>> smooth_pred_class = knn_smooth_pred_class(
... X = X,
... pred_class = raw_predicted_classes,
... grouping = louvain_cluster_groups,
... k = 15,)
Notes
-----
scNym classifiers do not incorporate neighborhood information.
By using a simple kNN smoothing heuristic, we can leverage neighborhood
information to improve classification performance, smoothing out cells
that have an outlier prediction relative to their local neighborhood.
"""
if grouping is None:
# do not use a grouping to restrict local neighborhood
# associations, create a universal pseudogroup `0`.
grouping = np.zeros(X.shape[0])
smooth_pred_class = np.zeros_like(pred_class)
for group in np.unique(grouping):
# identify only cells in the relevant group
group_idx = np.where(grouping == group)[0].astype("int")
X_group = X[grouping == group, :]
# if there are < k cells in the group, change `k` to the
# group size
if X_group.shape[0] < k:
k_use = X_group.shape[0]
else:
k_use = k
# compute a nearest neighbor graph and identify kNN
nns = NearestNeighbors(
n_neighbors=k_use,
).fit(X_group)
dist, idx = nns.kneighbors(X_group)
# for each cell in the group, assign a class as
# the majority class of the kNN
for i in range(X_group.shape[0]):
classes = pred_class[group_idx[idx[i, :]]]
uniq_classes, counts = np.unique(classes, return_counts=True)
maj_class = uniq_classes[int(np.argmax(counts))]
smooth_pred_class[group_idx[i]] = maj_class
return smooth_pred_class
[docs]def knn_smooth_pred_class_prob(
X: np.ndarray,
pred_probs: np.ndarray,
names: np.ndarray,
grouping: np.ndarray = None,
k: Union[Callable, int] = 15,
dm: np.ndarray = None,
**kwargs,
) -> np.ndarray:
"""
Smooths class predictions by taking the modal class from each cell's
nearest neighbors.
Parameters
----------
X : np.ndarray
[N, Features] embedding space for calculation of nearest neighbors.
pred_probs : np.ndarray
[N, C] array of class prediction probabilities.
names : np.ndarray,
[C,] names of predicted classes in `pred_probs`.
groupings : np.ndarray
[N,] unique grouping labels for i.e. clusters.
if provided, only considers nearest neighbors *within the cluster*.
k : int
number of nearest neighbors to use for smoothing.
dm : np.ndarray, optional
[N, N] distance matrix for setting the RBF kernel parameter.
speeds computation if pre-computed.
Returns
-------
smooth_pred_class : np.ndarray
[N,] unique class labels, smoothed by kNN.
Examples
--------
>>> smooth_pred_class = knn_smooth_pred_class_prob(
... X = X,
... pred_probs = predicted_class_probs,
... grouping = louvain_cluster_groups,
... k = 15,)
Notes
-----
scNym classifiers do not incorporate neighborhood information.
By using a simple kNN smoothing heuristic, we can leverage neighborhood
information to improve classification performance, smoothing out cells
that have an outlier prediction relative to their local neighborhood.
"""
if grouping is None:
# do not use a grouping to restrict local neighborhood
# associations, create a universal pseudogroup `0`.
grouping = np.zeros(X.shape[0])
smooth_pred_probs = np.zeros_like(pred_probs)
smooth_pred_class = np.zeros(pred_probs.shape[0], dtype="object")
for group in np.unique(grouping):
# identify only cells in the relevant group
group_idx = np.where(grouping == group)[0].astype("int")
X_group = X[grouping == group, :]
y_group = pred_probs[grouping == group, :]
# if k is a Callable, use it to define k for this group
if callable(k):
k_use = k(X_group.shape[0])
else:
k_use = k
# if there are < k cells in the group, change `k` to the
# group size
if X_group.shape[0] < k_use:
k_use = X_group.shape[0]
# set up weights using a radial basis function kernel
rbf = RBFWeight()
rbf.set_alpha(
X=X_group,
n_max=None,
dm=dm,
)
if "dm" in kwargs:
del kwargs["dm"]
# fit a nearest neighbor regressor
nns = KNeighborsRegressor(
n_neighbors=k_use,
weights=rbf,
**kwargs,
).fit(X_group, y_group)
smoothed_probs = nns.predict(X_group)
smooth_pred_probs[group_idx, :] = smoothed_probs
g_classes = names[np.argmax(smoothed_probs, axis=1)]
smooth_pred_class[group_idx] = g_classes
return smooth_pred_class
[docs]def argmax_pred_class(
grouping: np.ndarray,
prediction: np.ndarray,
):
"""Assign class to elements in groups based on the
most common predicted class for that group.
Parameters
----------
grouping : np.ndarray
[N,] partition values defining groups to be classified.
prediction : np.ndarray
[N,] predicted values for each element in `grouping`.
Returns
-------
assigned_classes : np.ndarray
[N,] class labels based on the most common class assigned
to elements in the group partition.
Examples
--------
>>> grouping = np.array([0,0,0,1,1,1,2,2,2,2])
>>> prediction = np.array(['A','A','A','B','A','B','C','A','B','C'])
>>> argmax_pred_class(grouping, prediction)
np.ndarray(['A','A','A','B','B','B','C','C','C','C',])
Notes
-----
scNym classifiers do not incorporate neighborhood information.
This simple heuristic leverages cluster information obtained by
an orthogonal method and assigns all cells in a given cluster
the majority class label within that cluster.
"""
assert (
grouping.shape[0] == prediction.shape[0]
), "`grouping` and `prediction` must be the same length"
groups = sorted(list(set(grouping.tolist())))
assigned_classes = np.zeros(grouping.shape[0], dtype="object")
for i, group in enumerate(groups):
classes, counts = np.unique(prediction[grouping == group], return_counts=True)
majority_class = classes[np.argmax(counts)]
assigned_classes[grouping == group] = majority_class
return assigned_classes
[docs]def compute_entropy_of_mixing(
X: np.ndarray,
y: np.ndarray,
n_neighbors: int,
n_iters: int = None,
**kwargs,
) -> np.ndarray:
"""Compute the entropy of mixing among groups given
a distance matrix.
Parameters
----------
X : np.ndarray
[N, P] feature matrix.
y : np.ndarray
[N,] group labels.
n_neighbors : int
number of nearest neighbors to draw for each iteration
of the entropy computation.
n_iters : int
number of iterations to perform.
if `n_iters is None`, uses every point.
Returns
-------
entropy_of_mixing : np.ndarray
[n_iters,] entropy values for each iteration.
Notes
-----
The entropy of batch mixing is computed by sampling `n_per_sample`
cells from a local neighborhood in the nearest neighbor graph
and contructing a probability vector based on their group membership.
The entropy of this probability vector is computed as a metric of
intermixing between groups.
If groups are more mixed, the probability vector will have higher
entropy, and vice-versa.
"""
# build nearest neighbor graph
n_neighbors = min(n_neighbors, X.shape[0])
nn = NearestNeighbors(
n_neighbors=n_neighbors,
metric="euclidean",
**kwargs,
)
nn.fit(X)
nn_idx = nn.kneighbors(return_distance=False)
# define query points
if n_iters is not None:
# don't duplicate points when sampling
n_iters = min(n_iters, X.shape[0])
if (n_iters is None) or (n_iters == X.shape[0]):
# sample all points
query_points = np.arange(X.shape[0])
else:
# subset random query points for entropy
# computation
assert n_iters < X.shape[0]
query_points = np.random.choice(
X.shape[0],
size=n_iters,
replace=False,
)
entropy_of_mixing = np.zeros(len(query_points))
for i, ridx in enumerate(query_points):
# get the nearest neighbors of a point
nn_y = y[nn_idx[ridx, :]]
nn_y_p = np.zeros(len(np.unique(y)))
for j, v in enumerate(np.unique(y)):
nn_y_p[j] = sum(nn_y == v)
nn_y_p = nn_y_p / nn_y_p.sum()
# use base 2 to return values in bits rather
# than the default nats
H = stats.entropy(nn_y_p)
entropy_of_mixing[i] = H
return entropy_of_mixing
[docs]def pp_adatas(
adata_sc,
adata_sp,
genes = None,
gene_to_lowercase = True
):
"""
Pre-process AnnDatas so that they can be mapped. Specifically:
- Remove genes that all entries are zero
- Find the intersection between adata_sc, adata_sp and given marker gene list, save the intersected markers in two adatas
- Calculate density priors and save it with adata_sp
Args:
adata_sc (AnnData): single cell data
adata_sp (AnnData): spatial expression data
genes (List): Optional. List of genes to use. If `None`, all genes are used.
Returns:
update adata_sc by creating `uns` `training_genes` `overlap_genes` fields
update adata_sp by creating `uns` `training_genes` `overlap_genes` fields and creating `obs` `rna_count_based_density` & `uniform_density` field
"""
# remove all-zero-valued genes
sc.pp.filter_genes(adata_sc, min_cells=1)
sc.pp.filter_genes(adata_sp, min_cells=1)
if genes is None:
# Use all genes
genes = adata_sc.var.index
# put all var index to lower case to align
if gene_to_lowercase:
adata_sc.var.index = [g.lower() for g in adata_sc.var.index]
adata_sp.var.index = [g.lower() for g in adata_sp.var.index]
genes = list(g.lower() for g in genes)
adata_sc.var_names_make_unique()
adata_sp.var_names_make_unique()
# Refine `marker_genes` so that they are shared by both adatas
genes = list(set(genes) & set(adata_sc.var.index) & set(adata_sp.var.index))
# logging.info(f"{len(genes)} shared marker genes.")
adata_sc.uns["training_genes"] = genes
adata_sp.uns["training_genes"] = genes
logging.info(
"{} training genes are saved in `uns``training_genes` of both single cell and spatial Anndatas.".format(
len(genes)
)
)
# Find overlap genes between two AnnDatas
overlap_genes = list(set(adata_sc.var.index) & set(adata_sp.var.index))
# logging.info(f"{len(overlap_genes)} shared genes.")
adata_sc.uns["overlap_genes"] = overlap_genes
adata_sp.uns["overlap_genes"] = overlap_genes
logging.info(
"{} overlapped genes are saved in `uns``overlap_genes` of both single cell and spatial Anndatas.".format(
len(overlap_genes)
)
)
# Calculate uniform density prior as 1/number_of_spots
adata_sp.obs["uniform_density"] = np.ones(adata_sp.X.shape[0]) / adata_sp.X.shape[0]
logging.info(
f"uniform based density prior is calculated and saved in `obs``uniform_density` of the spatial Anndata."
)
# Calculate rna_count_based density prior as % of rna molecule count
rna_count_per_spot = np.array(adata_sp.X.sum(axis=1)).squeeze()
adata_sp.obs["rna_count_based_density"] = rna_count_per_spot / np.sum(rna_count_per_spot)
logging.info(
f"rna count based density prior is calculated and saved in `obs``rna_count_based_density` of the spatial Anndata."
)
[docs]class RBFWeight(object):
def __init__(
self,
alpha: float = None,
) -> None:
"""Generate a set of weights based on distances to a point
with a radial basis function kernel.
Parameters
----------
alpha : float
radial basis function parameter. inverse of sigma
for a standard Gaussian pdf.
Returns
-------
None.
"""
self.alpha = alpha
return
[docs] def set_alpha(
self,
X: np.ndarray,
n_max: int = None,
dm: np.ndarray = None,
) -> None:
"""Set the alpha parameter of a Gaussian RBF kernel
as the median distance between points in an array of
observations.
Parameters
----------
X : np.ndarray
[N, P] matrix of observations and features.
n_max : int
maximum number of observations to use for median
distance computation.
dm : np.ndarray, optional
[N, N] distance matrix for setting the RBF kernel parameter.
speeds computation if pre-computed.
Returns
-------
None. Sets `self.alpha`.
References
----------
A Kernel Two-Sample Test
Arthur Gretton, Karsten M. Borgwardt, Malte J. Rasch,
Bernhard Schölkopf, Alexander Smola.
JMLR, 13(Mar):723−773, 2012.
http://jmlr.csail.mit.edu/papers/v13/gretton12a.html
"""
if n_max is None:
n_max = X.shape[0]
if dm is None:
# compute a distance matrix from observations
if X.shape[0] > n_max:
ridx = np.random.choice(
X.shape[0],
size=n_max,
replace=False,
)
X_p = X[ridx, :]
else:
X_p = X
dm = euclidean_distances(
X_p,
)
upper = dm[np.triu_indices_from(dm, k=1)]
# overwrite_input = True saves memory by overwriting
# the upper indices in the distance matrix array during
# median computation
sigma = np.median(
upper,
overwrite_input=True,
)
self.alpha = 1.0 / (2 * (sigma ** 2))
return
def __call__(
self,
distances: np.ndarray,
) -> np.ndarray:
"""Generate a set of weights based on distances to a point
with a radial basis function kernel.
Parameters
----------
distances : np.ndarray
[N,] distances used to generate weights.
Returns
-------
weights : np.ndarray
[N,] weights from the radial basis function kernel.
Notes
-----
We weight distances with a Gaussian RBF.
.. math::
f(r) = \exp -(\alpha r)^2
"""
# check that alpha parameter is set
if self.alpha is None:
msg = "must set `alpha` attribute before computing weights.\n"
msg += "use `.set_alpha() method to estimate from data."
raise ValueError(msg)
# generate weights with an RBF kernel
weights = np.exp(-((self.alpha * distances) ** 2))
return weights
[docs]def adata_to_cluster_expression(
adata,
cluster_label,
scale=True,
add_density=True
):
"""
Convert an AnnData to a new AnnData with cluster expressions. Clusters are based on `cluster_label` in `adata.obs`.
The returned AnnData has an observation for each cluster, with the cluster-level expression equals to the average expression for that cluster.
All annotations in `adata.obs` except `cluster_label` are discarded in the returned AnnData.
Args:
adata (AnnData): single cell data
cluster_label (String): field in `adata.obs` used for aggregating values
scale (bool): Optional. Whether weight input single cell by # of cells in cluster. Default is True.
add_density (bool): Optional. If True, the normalized number of cells in each cluster is added to the returned AnnData as obs.cluster_density. Default is True.
Returns:
AnnData: aggregated single cell data
"""
try:
value_counts = adata.obs[cluster_label].value_counts(normalize=True)
except KeyError as e:
raise ValueError("Provided label must belong to adata.obs.")
unique_labels = value_counts.index
new_obs = pd.DataFrame({cluster_label: unique_labels})
adata_ret = sc.AnnData(obs=new_obs, var=adata.var, uns=adata.uns)
X_new = np.empty((len(unique_labels), adata.shape[1]))
for index, l in enumerate(unique_labels):
if not scale:
X_new[index] = adata[adata.obs[cluster_label] == l].X.mean(axis=0)
else:
X_new[index] = adata[adata.obs[cluster_label] == l].X.sum(axis=0)
adata_ret.X = X_new
if add_density:
adata_ret.obs["cluster_density"] = adata_ret.obs[cluster_label].map(
lambda i: value_counts[i]
)
return adata_ret
[docs]def get_multi_edge_index(
pos,
regions,
graph_methods = "knn",
n_neighbors = None,
n_radius = None,
):
# construct edge indexes when there is region information
if not isinstance(pos, np.ndarray) or not isinstance(regions, np.ndarray):
raise ValueError("pos and regions must be numpy arrays")
if pos.shape[0] != regions.shape[0]:
raise ValueError("pos and regions must have the same length")
if graph_methods not in ["knn", "radius"]:
raise ValueError("graph_methods must be either 'knn' or 'radius'")
if graph_methods == "knn" and (n_neighbors is None or n_neighbors <= 0):
raise ValueError("n_neighbors must be a positive integer for knn method")
if graph_methods == "radius" and (n_radius is None or n_radius <= 0):
raise ValueError("n_radius must be a positive value for radius method")
edge_list = []
regions_unique = np.unique(regions)
for reg in regions_unique:
locs = np.where(regions == reg)[0]
pos_region = pos[locs, :]
if graph_methods == "knn":
edge_index = knn_graph(torch.Tensor(pos_region),
k = n_neighbors,
batch = torch.LongTensor(np.zeros(pos_region.shape[0])),
loop = True)
elif graph_methods == "radius":
edge_index = radius_graph(torch.Tensor(pos_region),
r = n_radius,
batch = torch.LongTensor(np.zeros(pos_region.shape[0])),
loop = True)
for (i, j) in zip(edge_index[1].numpy(), edge_index[0].numpy()):
edge_list.append([locs[i], locs[j]])
return edge_list
[docs]def get_single_edge_index(
pos,
graph_methods = "knn",
n_neighbors = None,
n_radius = None,
):
# construct edge indexes in one region
if not isinstance(pos, np.ndarray):
raise ValueError("pos must be a numpy array")
if graph_methods not in ["knn", "radius"]:
raise ValueError("graph_methods must be either 'knn' or 'radius'")
if graph_methods == "knn" and (n_neighbors is None or n_neighbors <= 0):
raise ValueError("n_neighbors must be a positive integer for knn method")
if graph_methods == "radius" and (n_radius is None or n_radius <= 0):
raise ValueError("n_radius must be a positive value for radius method")
edge_list = []
if graph_methods == "knn":
edge_index = knn_graph(torch.Tensor(pos),
k=n_neighbors,
batch=torch.LongTensor(np.zeros(pos.shape[0])),
loop=False)
elif graph_methods == "radius":
edge_index = radius_graph(torch.Tensor(pos),
r=n_radius,
batch=torch.LongTensor(np.zeros(pos.shape[0])),
loop=False)
for (i, j) in zip(edge_index[1].numpy(), edge_index[0].numpy()):
edge_list.append([i, j])
return edge_list
[docs]def tfidf(X):
r"""
TF-IDF normalization (following the Seurat v3 approach)
Parameters
----------
X
Input matrix
Returns
-------
X_tfidf
TF-IDF normalized matrix
"""
idf = X.shape[0] / X.sum(axis=0)
if sparse.issparse(X):
tf = X.multiply(1 / X.sum(axis=1))
return tf.multiply(idf)
else:
tf = X / X.sum(axis=1, keepdims=True)
return tf * idf
[docs]def lsi(
adata: anndata.AnnData,
n_comps: int = 20,
use_highly_variable: Optional[bool] = None,
**kwargs,
) -> None:
r"""
LSI analysis (following the Seurat v3 approach)
Parameters
----------
adata
Input dataset
n_components
Number of dimensions to use
use_highly_variable
Whether to use highly variable features only, stored in
``adata.var['highly_variable']``. By default uses them if they
have been determined beforehand.
**kwargs
Additional keyword arguments are passed to
:func:`sklearn.utils.extmath.randomized_svd`
Returns
-------
adata : anndata.AnnData
The input AnnData object with LSI results stored in `adata.obsm["X_lsi"]`.
"""
if "random_state" not in kwargs:
kwargs["random_state"] = 0 # Keep deterministic as the default behavior
if use_highly_variable is None:
use_highly_variable = "highly_variable" in adata.var
adata_use = adata[:, adata.var["highly_variable"]] if use_highly_variable else adata
X = tfidf(adata_use.X)
X_norm = normalize(X, norm="l1")
X_norm = np.log1p(X_norm * 1e4)
X_lsi = sklearn.utils.extmath.randomized_svd(X_norm, n_comps, **kwargs)[0]
X_lsi -= X_lsi.mean(axis=1, keepdims=True)
X_lsi /= X_lsi.std(axis=1, ddof=1, keepdims=True)
adata.obsm["X_lsi"] = X_lsi
return adata
def _optimize_cluster(
adata,
resolution: list = list(np.arange(0.1, 2.5, 0.01)),
):
scores = []
for r in resolution:
sc.tl.leiden(adata, resolution=r)
s = calinski_harabasz_score(adata.X, adata.obs["leiden"])
scores.append(s)
cl_opt_df = pd.DataFrame({"resolution": resolution, "score": scores})
best_idx = np.argmax(cl_opt_df["score"])
res = cl_opt_df.iloc[best_idx, 0]
print("Best resolution: ", res)
return res
def _priori_cluster(
adata,
eval_cluster_n=7,
):
for res in sorted(list(np.arange(0.03, 2.5, 0.01)), reverse=True):
sc.tl.leiden(adata, random_state=0, resolution=res)
count_unique_leiden = len(pd.DataFrame(adata.obs['leiden']).leiden.unique())
if count_unique_leiden == eval_cluster_n:
break
print("Best resolution: ", res)
return res
[docs]def mclust_R(
adata,
num_cluster,
modelNames='EEE',
used_obsm='emb_pca',
random_seed=2020,
key_added = "mclust",
):
"""\
Clustering using the mclust algorithm.
The parameters are the same as those in the R package mclust.
"""
np.random.seed(random_seed)
import rpy2.robjects as robjects
robjects.r.library("mclust")
import rpy2.robjects.numpy2ri
rpy2.robjects.numpy2ri.activate()
r_random_seed = robjects.r['set.seed']
r_random_seed(random_seed)
rmclust = robjects.r['Mclust']
res = rmclust(rpy2.robjects.numpy2ri.numpy2rpy(adata.obsm[used_obsm]), num_cluster, modelNames)
mclust_res = np.array(res[-2])
adata.obs[key_added] = mclust_res
adata.obs[key_added] = adata.obs[key_added].astype('int')
adata.obs[key_added] = adata.obs[key_added].astype('category')
# def mclust_R(
# adata,
# num_cluster,
# modelNames='EEE',
# used_obsm='emb_pca',
# random_seed=2020,
# key_added="mclust"
# ):
# """\
# Clustering using the mclust algorithm.
# The parameters are the same as those in the R package mclust.
# """
# import numpy as np
# import rpy2.robjects as robjects
# from rpy2.robjects import r, pandas2ri, numpy2ri
# import pandas as pd
# # Set seed for reproducibility
# np.random.seed(random_seed)
# # Activate numpy2ri converter
# numpy2ri.activate()
# # Load mclust library in R
# r.library("mclust")
# # Set random seed in R
# r['set.seed'](random_seed)
# # Get the Mclust function from R
# rmclust = r['Mclust']
# # Check if used_obsm exists in adata
# if used_obsm not in adata.obsm:
# raise ValueError(f"{used_obsm} not found in adata.obsm")
# # Convert the data to R format
# pca_data = adata.obsm[used_obsm]
# r_pca_data = numpy2ri.numpy2rpy(pca_data)
# # Run Mclust
# try:
# res = rmclust(r_pca_data, num_cluster, modelNames)
# except Exception as e:
# raise RuntimeError(f"Mclust clustering failed: {e}")
# # Check if the result is NULL
# if res.rclass[0] == "NULL":
# raise ValueError("Mclust returned NULL, indicating the clustering did not succeed.")
# # Extract clustering results
# mclust_res = np.array(res[-2])
# # Store the results in adata.obs
# adata.obs[key_added] = pd.Categorical(mclust_res.astype('int'))