import numpy as np
from scipy.optimize import linear_sum_assignment
from sklearn.metrics import pairwise_distances, pairwise_distances_argmin_min
from scipy import linalg
from scipy.stats import multivariate_normal
from .base import BaseQuerySampler
from .version import check_modules
from .kmeans import IncrementalMiniBatchKMeans
from .uncertainty import MarginSampler
[docs]class KCentroidSampler(BaseQuerySampler):
""" KCentroid based query sampler.
In order to increase diversity, it is possible to use a centroid based
clustering to select samples.
Args:
clustering: A clustering algorithm matching the sklearn interface
batch_size: Number of samples to draw when predicting.
Attributes:
clustering_ : The fitted clustering estimator.
"""
def __init__(self, clustering, batch_size):
super().__init__(batch_size)
self.clustering_ = clustering
[docs] def fit(self, X, y=None) -> 'KCentroidSampler':
"""Does nothing, this method is unsupervised.
Args:
X: Labeled samples of shape (n_samples, n_features).
y: Labels of shape (n_samples).
Returns:
The object itself
"""
return self
[docs] def select_samples(self, X: np.array,
sample_weight: np.array = None) -> np.array:
"""Clusters the samples and select the ones closest to centroids.
Args:
X: Pool of unlabeled samples of shape (n_samples, n_features).
sample_weight: Weight of the samples of shape (n_samples),
optional.
Returns:
Indices of the selected samples of shape (batch_size).
"""
if self._not_enough_samples(X):
return np.arange(X.shape[0])
kwargs = dict(sample_weight=sample_weight) if (sample_weight is not None) else dict()
model = self.clustering_.fit(X, **kwargs)
distances = model.transform(X)
# Sometimes, one sample can be the closest to two centroids. In that
# case, we want to take the second closest one, etc.
# linear_sum_assignemnt solves this problem.
return linear_sum_assignment(distances)[0]
[docs]class KMeansSampler(KCentroidSampler):
"""Select samples as closest sample to KMeans centroids.
Args:
batch_size: Number of samples to draw when predicting.
"""
def __init__(self, batch_size, **kmeans_args):
check_modules('sklearn', 'clustering.KmeansSampler')
from sklearn.cluster import KMeans
if 'n_clusters' in kmeans_args:
raise ValueError(
'You have specified n_clusters={} when creating KMeansSampler.'
' This is not supported since n_clusters is overridden using '
'batch_size.'.format(kmeans_args['n_clusters']))
kmeans_args['n_clusters'] = batch_size
super().__init__(KMeans(**kmeans_args), batch_size)
[docs]class GMMLikelihoodSampler(BaseQuerySampler):
""" GM (Gaussian Mixture Models) based query sampler.
In order to increase diversity, it is possible to use maximum likelihood to select samples.
Args:
clustering: A clustering algorithm matching the sklearn interface
batch_size: Number of samples to draw when predicting.
Attributes:
clustering_ : The fitted clustering estimator.
"""
def __init__(self, clustering, batch_size):
super().__init__(batch_size)
self.clustering_ = clustering
[docs] def fit(self, X, y=None) -> 'GMMLikelihoodSampler':
"""Does nothing, this method is unsupervised.
Args:
X: Labeled samples of shape (n_samples, n_features).
y: Labels of shape (n_samples).
Returns:
The object itself
"""
return self
[docs] def select_samples(self, X: np.array) -> np.array:
"""Clusters the samples and select the ones with highest likelihood.
Args:
X: Pool of unlabeled samples of shape (n_samples, n_features).
sample_weight: Weight of the samples of shape (n_samples),
optional.
Returns:
Indices of the selected samples of shape (batch_size).
"""
if self._not_enough_samples(X):
return np.arange(X.shape[0])
gmm_model = self.clustering_.fit(X)
centers= np.empty(shape=(gmm_model.n_components,), dtype=int)
for i in range(gmm_model.n_components):
try:
density = multivariate_normal.logpdf(X, cov=gmm_model.covariances_[i],
mean=gmm_model.means_[i],
allow_singular=True)
except linalg.LinAlgError:
cov = gmm_model.covariances_[i]
approximated_cov = np.linalg.pinv(cov)
density = multivariate_normal.logpdf(X, cov=approximated_cov,
mean=gmm_model.means_[i],
allow_singular=True)
centers[i] = np.argmax(density)
return centers
[docs]class GMMSampler(GMMLikelihoodSampler):
"""Select samples as highest likelihood to GMM clusters.
Args:
batch_size: Number of samples to draw when predicting.
"""
def __init__(self, batch_size, **gmm_args):
check_modules('sklearn', 'clustering.GMMSampler')
from sklearn.mixture import GaussianMixture
if 'n_clusters' in gmm_args:
raise ValueError(
'You have specified n_clusters={} when creating GMMSampler.'
' This is not supported since n_clusters is overridden using '
'batch_size.'.format(gmm_args['n_clusters']))
gmm_args['n_components'] = batch_size
super().__init__(GaussianMixture(**gmm_args), batch_size)
[docs]class TwoStepGMMSampler:
"""GMM sampler using a margin uncertainty sampler as preselector
"""
def __init__(self, beta: int, classifier, batch_size: int,
assume_fitted: bool = False, verbose: int = 0, **gmm_args):
self.sampler_list = [
MarginSampler(classifier, beta * batch_size, strategy='top',
assume_fitted=assume_fitted, verbose=verbose),
GMMSampler(batch_size, **gmm_args)
]
[docs] def fit(self, X: np.array, y: np.array = None) -> 'TwoStepGMMSampler':
"""Fits the first query sampler
Args:
X: Labeled samples of shape [n_samples, n_features].
y: Labels of shape [n_samples].
Returns:
The object itself
"""
for sampler in self.sampler_list:
sampler.fit(X, y)
return self
[docs] def select_samples(self, X: np.array) -> np.array:
"""Selects the using uncertainty preselection and KMeans sampler.
Args:
X: Pool of unlabeled samples of shape (n_samples, n_features).
sample_weight: Weight of the samples of shape (n_samples),
optional.
Returns:
Indices of the selected samples of shape (batch_size).
"""
selected = self.sampler_list[0].select_samples(X)
kwargs = dict()
new_selected = self.sampler_list[1].select_samples(
X[selected], **kwargs)
selected = selected[new_selected]
return selected
[docs]class MiniBatchKMeansSampler(KCentroidSampler):
"""Select samples as closest sample to MiniBatchKMeans centroids.
Args:
batch_size: Number of samples to draw when predicting.
"""
def __init__(self, batch_size, **kmeans_args):
check_modules('sklearn', 'clustering.MiniBatchKmeansSampler')
from sklearn.cluster import MiniBatchKMeans
if 'n_clusters' in kmeans_args:
raise ValueError(
'You have specified n_clusters={} when creating '
'MiniBatchKMeansSampler. This is not supported since'
'n_clusters is overridden using '
'batch_size.'.format(kmeans_args['n_clusters']))
kmeans_args['n_clusters'] = batch_size
super().__init__(MiniBatchKMeans(**kmeans_args), batch_size)
[docs]class IncrementalMiniBatchKMeansSampler(KCentroidSampler):
"""Select samples as closest sample to MiniBatchKMeans centroids.
Args:
batch_size: Number of samples to draw when predicting.
"""
def __init__(self, batch_size, **kmeans_args):
if 'n_clusters' in kmeans_args:
raise ValueError(
'You have specified n_clusters={} when creating '
'MiniBatchKMeansSampler. This is not supported since'
'n_clusters is overridden using '
'batch_size.'.format(kmeans_args['n_clusters']))
kmeans_args['n_clusters'] = batch_size
super().__init__(IncrementalMiniBatchKMeans(**kmeans_args), batch_size)
self.fixed_cluster_centers = None
[docs] def fit(self, X, y=None) -> 'KCentroidSampler':
"""Does nothing, this method is unsupervised.
Args:
X: Labeled samples of shape (n_samples, n_features).
y: Labels of shape (n_samples).
Returns:
The object itself
"""
self.fixed_cluster_centers = X
return self
[docs] def select_samples(self, X: np.array,
sample_weight: np.array = None,
recenter_every=None) -> np.array:
"""Clusters the samples and select the ones closest to centroids.
Args:
X: Pool of unlabeled samples of shape (n_samples, n_features).
sample_weight: Weight of the samples of shape (n_samples),
optional.
Returns:
Indices of the selected samples of shape (batch_size).
"""
if self._not_enough_samples(X):
return np.arange(X.shape[0])
kwargs = dict()
n_fixed_clusters = 0
if sample_weight is not None:
kwargs['sample_weight'] = sample_weight
if self.fixed_cluster_centers is not None:
kwargs['fixed_cluster_centers'] = self.fixed_cluster_centers
n_fixed_clusters = self.fixed_cluster_centers.shape[0]
if recenter_every is not None:
kwargs['recenter_every'] = recenter_every
self.clustering_.n_clusters = self.batch_size + n_fixed_clusters
model = self.clustering_.fit(X, **kwargs)
distances = model.transform(X)[:, n_fixed_clusters:]
# Sometimes, one sample can be the closest to two centroids. In that
# case, we want to take the second closest one, etc.
# linear_sum_assignemnt solves this problem.
return linear_sum_assignment(distances)[0]
[docs]class TwoStepKCentroidSampler(BaseQuerySampler):
"""KMeans sampler using a margin uncertainty sampler as preselector
"""
def __init__(self, kcentroid_sampler, beta: int, classifier, batch_size: int,
assume_fitted: bool = False, verbose: int = 0, **kmeans_args):
super().__init__(batch_size)
self.sampler_list = [
MarginSampler(classifier, beta * batch_size, strategy='top',
assume_fitted=assume_fitted, verbose=verbose),
kcentroid_sampler(batch_size, **kmeans_args)
]
[docs] def fit(self, X: np.array, y: np.array = None) -> 'TwoStepKMeansSampler':
"""Fits the first query sampler
Args:
X: Labeled samples of shape [n_samples, n_features].
y: Labels of shape [n_samples].
Returns:
The object itself
"""
self.sampler_list[0].fit(X, y)
return self
[docs] def select_samples(self, X: np.array,
sample_weight: np.array = None) -> np.array:
"""Selects the using uncertainty preselection and KMeans sampler.
Args:
X: Pool of unlabeled samples of shape (n_samples, n_features).
sample_weight: Weight of the samples of shape (n_samples),
optional.
Returns:
Indices of the selected samples of shape (batch_size).
"""
selected = self.sampler_list[0].select_samples(X)
kwargs = dict()
if sample_weight is not None:
kwargs['sample_weight'] = sample_weight[selected]
new_selected = self.sampler_list[1].select_samples(
X[selected], **kwargs)
selected = selected[new_selected]
return selected
[docs]class TwoStepIWKMeansSampler(TwoStepKCentroidSampler):
def __init__(self, beta: int, classifier, batch_size: int,
assume_fitted: bool = False, verbose: int = 0, **kmeans_args):
self.sampler_list = [
MarginSampler(classifier, beta * batch_size, strategy='top',
assume_fitted=assume_fitted, verbose=verbose),
IncrementalMiniBatchKMeansSampler(batch_size, **kmeans_args)
]
[docs]class KCenterGreedy(BaseQuerySampler):
""" KCenter greedy query sampler.
Select the furthest sample from already select ones, add it to the
selected, and repeat until batch_size is reached.
Args:
batch_size: Number of samples to draw when predicting.
"""
def __init__(self, embedding_fun, batch_size, metric='euclidean'):
super().__init__(batch_size)
self._embedding_fun = embedding_fun
self.metric = metric
[docs] def fit(self, X, y=None) -> 'KCenterGreedy':
"""Does nothing, this method is unsupervised.
Args:
X: Labeled samples of shape (n_samples, n_features).
y: Labels of shape (n_samples).
Returns:
The object itself
"""
self._X_centers = self._embedding_fun(X)
return self
[docs] def select_samples(self, X: np.array,
sample_weight: np.array = None) -> np.array:
"""Clusters the samples and select the ones closest to centroids.
Args:
X: Pool of unlabeled samples of shape (n_samples, n_features).
sample_weight: Weight of the samples of shape (n_samples),
optional.
Returns:
Indices of the selected samples of shape (batch_size).
"""
if self._not_enough_samples(X):
return np.arange(X.shape[0])
selected = []
X = self._embedding_fun(X)
_, distances = pairwise_distances_argmin_min(X, self._X_centers, metric=self.metric)
for _ in range(self.batch_size):
# Select the point furthest from already selected
furthest_point = np.argmax(distances)
if furthest_point in selected:
raise ValueError('Selection of duplicate index:', furthest_point)
selected.append(furthest_point)
distances[furthest_point] = 0.
# Consider this point added to label by updating distances
distances_to_new = pairwise_distances(X, X[selected[-1], None], metric=self.metric)[:, 0]
distances = np.min([distances, distances_to_new], axis=0)
if np.allclose(distances, 0.):
# Distances have collapsed, we select randomly the rest of the samples
p = np.ones(X.shape[0])
selected = np.asarray(selected)
p[selected] = 0.
p /= p.sum()
selected = np.concatenate([selected, np.random.choice(X.shape[0], size=self.batch_size - selected.shape[0], replace=False, p=p)])
break
# Return numpy array, not a list.
return np.asarray(selected)