# Source imported from: zhiqing0205 — "Add core libraries: anomalib, dinov2, open_clip_local" (commit 3de7bf6)
"""Random Sparse Projector.
Sparse Random Projection using PyTorch Operations
"""
# Copyright (C) 2022-2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
import numpy as np
import torch
from sklearn.utils.random import sample_without_replacement
class NotFittedError(ValueError, AttributeError):
    """Raised when an estimator method requiring a prior ``fit()`` is called first.

    Inherits from both ``ValueError`` and ``AttributeError`` so that callers
    catching either builtin (mirroring scikit-learn's convention) still work.
    """
class SparseRandomProjection:
"""Sparse Random Projection using PyTorch operations.
Args:
eps (float, optional): Minimum distortion rate parameter for calculating
Johnson-Lindenstrauss minimum dimensions.
Defaults to ``0.1``.
random_state (int | None, optional): Uses the seed to set the random
state for sample_without_replacement function.
Defaults to ``None``.
Example:
To fit and transform the embedding tensor, use the following code:
.. code-block:: python
import torch
from anomalib.models.components import SparseRandomProjection
sparse_embedding = torch.rand(1000, 5).cuda()
model = SparseRandomProjection(eps=0.1)
Fit the model and transform the embedding tensor:
.. code-block:: python
model.fit(sparse_embedding)
projected_embedding = model.transform(sparse_embedding)
print(projected_embedding.shape)
# Output: torch.Size([1000, 5920])
"""
def __init__(self, eps: float = 0.1, random_state: int | None = None) -> None:
self.n_components: int
self.sparse_random_matrix: torch.Tensor
self.eps = eps
self.random_state = random_state
def _sparse_random_matrix(self, n_features: int) -> torch.Tensor:
"""Random sparse matrix. Based on https://web.stanford.edu/~hastie/Papers/Ping/KDD06_rp.pdf.
Args:
n_features (int): Dimentionality of the original source space
Returns:
Tensor: Sparse matrix of shape (n_components, n_features).
The generated Gaussian random matrix is in CSR (compressed sparse row)
format.
"""
# Density 'auto'. Factorize density
density = 1 / np.sqrt(n_features)
if density == 1:
# skip index generation if totally dense
binomial = torch.distributions.Binomial(total_count=1, probs=0.5)
components = binomial.sample((self.n_components, n_features)) * 2 - 1
components = 1 / np.sqrt(self.n_components) * components
else:
# Sparse matrix is not being generated here as it is stored as dense anyways
components = torch.zeros((self.n_components, n_features), dtype=torch.float32)
for i in range(self.n_components):
# find the indices of the non-zero components for row i
nnz_idx = torch.distributions.Binomial(total_count=n_features, probs=density).sample()
# get nnz_idx column indices
# pylint: disable=not-callable
c_idx = torch.tensor(
sample_without_replacement(
n_population=n_features,
n_samples=nnz_idx,
random_state=self.random_state,
),
dtype=torch.int32,
)
data = torch.distributions.Binomial(total_count=1, probs=0.5).sample(sample_shape=c_idx.size()) * 2 - 1
# assign data to only those columns
components[i, c_idx] = data
components *= np.sqrt(1 / density) / np.sqrt(self.n_components)
return components
def _johnson_lindenstrauss_min_dim(self, n_samples: int, eps: float = 0.1) -> int | np.integer:
"""Find a 'safe' number of components to randomly project to.
Ref eqn 2.1 https://cseweb.ucsd.edu/~dasgupta/papers/jl.pdf
Args:
n_samples (int): Number of samples used to compute safe components
eps (float, optional): Minimum distortion rate. Defaults to 0.1.
"""
denominator = (eps**2 / 2) - (eps**3 / 3)
return (4 * np.log(n_samples) / denominator).astype(np.int64)
def fit(self, embedding: torch.Tensor) -> "SparseRandomProjection":
"""Generate sparse matrix from the embedding tensor.
Args:
embedding (torch.Tensor): embedding tensor for generating embedding
Returns:
(SparseRandomProjection): Return self to be used as
>>> model = SparseRandomProjection()
>>> model = model.fit()
"""
n_samples, n_features = embedding.shape
device = embedding.device
self.n_components = self._johnson_lindenstrauss_min_dim(n_samples=n_samples, eps=self.eps)
# Generate projection matrix
# torch can't multiply directly on sparse matrix and moving sparse matrix to cuda throws error
# (Could not run 'aten::empty_strided' with arguments from the 'SparseCsrCUDA' backend)
# hence sparse matrix is stored as a dense matrix on the device
self.sparse_random_matrix = self._sparse_random_matrix(n_features=n_features).to(device)
return self
def transform(self, embedding: torch.Tensor) -> torch.Tensor:
"""Project the data by using matrix product with the random matrix.
Args:
embedding (torch.Tensor): Embedding of shape (n_samples, n_features)
The input data to project into a smaller dimensional space
Returns:
projected_embedding (torch.Tensor): Sparse matrix of shape
(n_samples, n_components) Projected array.
Example:
>>> projected_embedding = model.transform(embedding)
>>> projected_embedding.shape
torch.Size([1000, 5920])
"""
if self.sparse_random_matrix is None:
msg = "`fit()` has not been called on SparseRandomProjection yet."
raise NotFittedError(msg)
return embedding @ self.sparse_random_matrix.T.float()