
Oversampling

This module includes classes for clustering-based oversampling.

A general class for clustering-based oversampling as well as specific clustering-based oversamplers are provided.

ClusterOverSampler(oversampler, clusterer=None, distributor=None, raise_error=True, random_state=None, n_jobs=None)

Bases: BaseOverSampler

A class that handles clustering-based oversampling.

Any combination of oversampler, clusterer and distributor can be used.

Read more in the [user_guide].

Parameters:

Name Type Description Default
oversampler BaseOverSampler

Oversampler to apply to each selected cluster.

required
clusterer ClusterMixin | None

Clusterer to apply to input space before oversampling.

  • When None, it corresponds to a clusterer that assigns a single cluster to all the samples, which is equivalent to no clustering.

  • When clusterer is given, it applies clustering to the input space. Then oversampling is applied inside each cluster and between clusters.

None
distributor BaseDistributor | None

Distributor to distribute the generated samples per cluster label.

  • When None and a clusterer is provided, it corresponds to the density distributor. If clusterer is also None, then the distributor does not affect the oversampling procedure.

  • When distributor object is provided, it is used to distribute the generated samples to the clusters.

None
raise_error bool

Raise an error when no samples are generated.

  • If True, it raises an error when no filtered clusters are identified and therefore no samples are generated.

  • If False, it displays a warning.

True
random_state RandomState | int | None

Control the randomization of the algorithm.

  • If int, it is the seed used by the random number generator.
  • If np.random.RandomState instance, it is the random number generator.
  • If None, the random number generator is the RandomState instance used by np.random.
None
n_jobs int | None

Number of CPU cores used.

  • If None, it means 1 unless in a joblib.parallel_backend context.

  • If -1, all processors are used.

None

Attributes:

Name Type Description
oversampler_ BaseOverSampler

A fitted clone of the oversampler parameter.

clusterer_ ClusterMixin

A fitted clone of the clusterer parameter or None when a clusterer is not given.

distributor_ BaseDistributor

A fitted clone of the distributor parameter or a fitted instance of the DensityDistributor when a distributor is not given.

labels_ Labels

Cluster labels of each sample.

neighbors_ Neighbors

An array that contains all neighboring pairs with each row being a unique neighboring pair. It is None when the clusterer does not support this attribute.

random_state_ RandomState

An instance of np.random.RandomState class.

sampling_strategy_ dict[int, int]

Actual sampling strategy.

Examples:

>>> from collections import Counter
>>> from clover.over_sampling import ClusterOverSampler
>>> from sklearn.datasets import make_classification
>>> from sklearn.cluster import KMeans
>>> from imblearn.over_sampling import SMOTE
>>> X, y = make_classification(random_state=0, n_classes=2, weights=[0.9, 0.1])
>>> print('Original dataset shape %s' % Counter(y))
Original dataset shape Counter({0: 90, 1: 10})
>>> cluster_oversampler = ClusterOverSampler(
... oversampler=SMOTE(random_state=5),
... clusterer=KMeans(random_state=10, n_init='auto'))
>>> X_res, y_res = cluster_oversampler.fit_resample(X, y)
>>> print('Resampled dataset shape %s' % Counter(y_res))
Resampled dataset shape Counter({0: 90, 1: 90})
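
A distributor can also be provided explicitly. The following continuation of the example above is an illustrative sketch that assumes the default configuration of clover.distribution.DensityDistributor:

>>> from clover.distribution import DensityDistributor
>>> cluster_oversampler = ClusterOverSampler(
...     oversampler=SMOTE(random_state=5),
...     clusterer=KMeans(random_state=10, n_init='auto'),
...     distributor=DensityDistributor())
>>> X_res, y_res = cluster_oversampler.fit_resample(X, y)
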
Source code in src/clover/over_sampling/_cluster.py
def __init__(
    self: Self,
    oversampler: BaseOverSampler,
    clusterer: ClusterMixin | None = None,
    distributor: BaseDistributor | None = None,
    raise_error: bool = True,
    random_state: np.random.RandomState | int | None = None,
    n_jobs: int | None = None,
) -> None:
    self.oversampler = oversampler
    self.clusterer = clusterer
    self.distributor = distributor
    self.raise_error = raise_error
    self.random_state = random_state
    self.n_jobs = n_jobs

fit(X, y)

Check inputs and statistics of the sampler.

You should use fit_resample to generate the synthetic data.

Parameters:

Name Type Description Default
X InputData

Data array.

required
y Targets

Target array.

required

Returns:

Name Type Description
self Self

Return the instance itself.

Source code in src/clover/over_sampling/_cluster.py
def fit(self: Self, X: InputData, y: Targets) -> Self:
    """Check inputs and statistics of the sampler.

    You should use `fit_resample` to generate the synthetic data.

    Args:
        X:
            Data array.
        y:
            Target array.

    Returns:
        self:
            Return the instance itself.
    """
    X, y, _ = self._check_X_y(X, y)
    self._check(X, y)
    return self
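
A minimal usage sketch: fit only validates the inputs and computes the sampling statistics, while fit_resample performs the actual resampling (the parameter values below are illustrative):

>>> from sklearn.datasets import make_classification
>>> from imblearn.over_sampling import SMOTE
>>> from clover.over_sampling import ClusterOverSampler
>>> X, y = make_classification(random_state=0, weights=[0.9, 0.1])
>>> sampler = ClusterOverSampler(oversampler=SMOTE(random_state=0)).fit(X, y)
>>> X_res, y_res = sampler.fit_resample(X, y)  # resampling happens here, not in fit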

fit_resample(X, y, **fit_params)

Resample the dataset.

Parameters:

Name Type Description Default
X InputData

Matrix containing the data which have to be sampled.

required
y Targets

Corresponding label for each sample in X.

required
fit_params dict[str, str]

Parameters passed to the fit method of the clusterer.

{}

Returns:

Name Type Description
X_resampled InputData

The array containing the resampled data.

y_resampled Targets

The corresponding label of resampled data.

Source code in src/clover/over_sampling/_cluster.py
def fit_resample(
    self: Self,
    X: InputData,
    y: Targets,
    **fit_params: dict[str, str],
) -> tuple[InputData, Targets]:
    """Resample the dataset.

    Args:
        X:
            Matrix containing the data which have to be sampled.
        y:
            Corresponding label for each sample in X.
        fit_params:
            Parameters passed to the fit method of the clusterer.

    Returns:
        X_resampled:
            The array containing the resampled data.
        y_resampled:
            The corresponding label of resampled data.
    """
    check_classification_targets(y)
    arrays_transformer = ArraysTransformer(X, y)
    X, y, binarize_y = self._check_X_y(X, y)

    self._check(X, y)._fit(X, y, **fit_params)

    output = self._fit_resample(X, y)

    y_ = label_binarize(y=output[1], classes=np.unique(y)) if binarize_y else output[1]

    X_, y_ = arrays_transformer.transform(output[0], y_)
    return (X_, y_)
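
Keyword arguments are forwarded to the clusterer's fit method. A hypothetical sketch, assuming the wrapped KMeans clusterer accepts a sample_weight argument:

>>> import numpy as np
>>> from sklearn.cluster import KMeans
>>> from sklearn.datasets import make_classification
>>> from imblearn.over_sampling import SMOTE
>>> from clover.over_sampling import ClusterOverSampler
>>> X, y = make_classification(random_state=0, weights=[0.9, 0.1])
>>> sampler = ClusterOverSampler(
...     oversampler=SMOTE(random_state=0),
...     clusterer=KMeans(random_state=0, n_init='auto'))
>>> X_res, y_res = sampler.fit_resample(X, y, sample_weight=np.ones(len(y)))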

KMeansSMOTE(sampling_strategy='auto', random_state=None, k_neighbors=5, kmeans_estimator=None, imbalance_ratio_threshold='auto', distances_exponent='auto', raise_error=True, n_jobs=None)

Bases: ClusterOverSampler

KMeans-SMOTE algorithm.

Applies KMeans clustering to the input space before applying SMOTE. Read more in the [user_guide].

Parameters:

Name Type Description Default
sampling_strategy dict[int, int] | str

Sampling information to resample the data set.

  • When float, it corresponds to the desired ratio of the number of samples in the minority class over the number of samples in the majority class after resampling. It is only available for binary classification.

  • When str, it specifies the class targeted by the resampling. The number of samples in the different classes will be equalized. Possible choices are:

    • 'minority': resample only the minority class.
    • 'not minority': resample all classes but the minority class.
    • 'not majority': resample all classes but the majority class.
    • 'all': resample all classes.
    • 'auto': equivalent to 'not majority'.
  • When dict, the keys correspond to the targeted classes. The values correspond to the desired number of samples for each targeted class.

  • When callable, function taking y and returns a dict. The keys correspond to the targeted classes. The values correspond to the desired number of samples for each class.

'auto'
random_state RandomState | int | None

Control the randomization of the algorithm.

  • If int, it is the seed used by the random number generator.
  • If np.random.RandomState instance, it is the random number generator.
  • If None, the random number generator is the RandomState instance used by np.random.
None
k_neighbors NearestNeighbors | int

Defines the number of nearest neighbors to be used by SMOTE.

  • If int, this number is used to construct synthetic samples.

  • If object, an estimator that inherits from sklearn.neighbors.base.KNeighborsMixin that will be used to find the nearest neighbors.

5
kmeans_estimator KMeans | None

Defines the KMeans clusterer applied to the input space.

  • If None, sklearn.cluster.MiniBatchKMeans is used, which tends to be better with a large number of samples.

  • If KMeans object, then an instance from either sklearn.cluster.KMeans or sklearn.cluster.MiniBatchKMeans.

  • If int, the number of clusters to be used.

  • If float, the proportion of the number of clusters over the number of samples to be used.

None
imbalance_ratio_threshold float | str

The threshold of a filtered cluster. It can be any non-negative number or 'auto' to be calculated automatically.

  • If 'auto', the filtering threshold is calculated from the imbalance ratio of the target for the binary case or the maximum of the target's imbalance ratios for the multiclass case.

  • If float then it is manually set to this number.

Any cluster that has an imbalance ratio smaller than the filtering threshold is identified as a filtered cluster and can be potentially used to generate minority class instances. Higher values increase the number of filtered clusters.

'auto'
distances_exponent float | str

The exponent of the mean distance in the density calculation. It can be any non-negative number or 'auto' to be calculated automatically.

  • If 'auto', then it is set equal to the number of features. Higher values make the density calculation more sensitive to the cluster's size, i.e. clusters with a large mean Euclidean distance between samples are penalized.

  • If float then it is manually set to this number.

'auto'
raise_error bool

Raise an error when no samples are generated.

  • If True, it raises an error when no filtered clusters are identified and therefore no samples are generated.

  • If False, it displays a warning.

True
n_jobs int | None

Number of CPU cores used.

  • If None, it means 1 unless in a joblib.parallel_backend context.

  • If -1, all processors are used.

None

Attributes:

Name Type Description
oversampler_ SMOTE

A fitted imblearn.over_sampling.SMOTE instance.

clusterer_ KMeans | MiniBatchKMeans

A fitted sklearn.cluster.KMeans or sklearn.cluster.MiniBatchKMeans instance.

distributor_ DensityDistributor

A fitted clover.distribution.DensityDistributor instance.

labels_ Labels

Cluster labels of each sample.

neighbors_ None

It is None since KMeans does not support this attribute.

random_state_ RandomState

An instance of np.random.RandomState class.

sampling_strategy_ dict[int, int]

Actual sampling strategy.

Examples:

>>> import numpy as np
>>> from clover.over_sampling import KMeansSMOTE
>>> from sklearn.datasets import make_blobs
>>> blobs = [100, 800, 100]
>>> X, y = make_blobs(blobs, centers=[(-10, 0), (0, 0), (10, 0)])
>>> # Add a single 0 sample in the middle blob
>>> X = np.concatenate([X, [[0, 0]]])
>>> y = np.append(y, 0)
>>> # Make this a binary classification problem
>>> y = y == 1
>>> kmeans_smote = KMeansSMOTE(random_state=42)
>>> X_res, y_res = kmeans_smote.fit_resample(X, y)
>>> # Find the number of new samples in the middle blob
>>> n_res_in_middle = ((X_res[:, 0] > -5) & (X_res[:, 0] < 5)).sum()
>>> print("Samples in the middle blob: %s" % n_res_in_middle)
Samples in the middle blob: 801
>>> print("Middle blob unchanged: %s" % (n_res_in_middle == blobs[1] + 1))
Middle blob unchanged: True
>>> print("More 0 samples: %s" % ((y_res == 0).sum() > (y == 0).sum()))
More 0 samples: True
Source code in src/clover/over_sampling/_kmeans_smote.py
def __init__(
    self: Self,
    sampling_strategy: dict[int, int] | str = 'auto',
    random_state: np.random.RandomState | int | None = None,
    k_neighbors: NearestNeighbors | int = 5,
    kmeans_estimator: KMeans | None = None,
    imbalance_ratio_threshold: float | str = 'auto',
    distances_exponent: float | str = 'auto',
    raise_error: bool = True,
    n_jobs: int | None = None,
) -> None:
    self.sampling_strategy = sampling_strategy
    self.random_state = random_state
    self.k_neighbors = k_neighbors
    self.kmeans_estimator = kmeans_estimator
    self.imbalance_ratio_threshold = imbalance_ratio_threshold
    self.distances_exponent = distances_exponent
    self.raise_error = raise_error
    self.n_jobs = n_jobs
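
The kmeans_estimator and imbalance_ratio_threshold parameters described above can be set in several ways; the snippet below is an illustrative sketch based on those descriptions (the chosen values are arbitrary):

>>> from sklearn.cluster import MiniBatchKMeans
>>> from clover.over_sampling import KMeansSMOTE
>>> # Number of clusters given directly as an integer
>>> kmeans_smote = KMeansSMOTE(kmeans_estimator=50, random_state=0)
>>> # Pre-configured clusterer and a manually set filtering threshold
>>> kmeans_smote = KMeansSMOTE(
...     kmeans_estimator=MiniBatchKMeans(n_clusters=50, n_init='auto'),
...     imbalance_ratio_threshold=1.0,
...     random_state=0)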

clone_modify(oversampler, class_label, y_in_cluster)

Clone and modify attributes of oversampler for corner cases.

Parameters:

Name Type Description Default
oversampler BaseOverSampler

The oversampler to modify its attributes.

required
class_label int

The class label.

required
y_in_cluster Targets

The data of the target in the cluster.

required

Returns:

Type Description
BaseOverSampler

A cloned oversampler with modified number of nearest neighbors.

Source code in src/clover/over_sampling/_cluster.py
def clone_modify(oversampler: BaseOverSampler, class_label: int, y_in_cluster: Targets) -> BaseOverSampler:
    """Clone and modify attributes of oversampler for corner cases.

    Args:
        oversampler:
            The oversampler to modify its attributes.
        class_label:
            The class label.
        y_in_cluster:
            The data of the target in the cluster.

    Returns:
        A cloned oversampler with modified number of nearest neighbors.
    """
    # Clone oversampler
    oversampler = clone(oversampler)

    # Not modify attributes case
    if isinstance(oversampler, RandomOverSampler):
        return oversampler

    # Select and modify oversampler
    n_minority_samples = Counter(y_in_cluster)[class_label]
    if n_minority_samples == 1:
        oversampler = RandomOverSampler()
    else:
        if hasattr(oversampler, 'k_neighbors'):
            oversampler.k_neighbors = modify_nn(oversampler.k_neighbors, n_minority_samples)
        if hasattr(oversampler, 'm_neighbors'):
            oversampler.m_neighbors = modify_nn(oversampler.m_neighbors, y_in_cluster.size)
        if hasattr(oversampler, 'n_neighbors'):
            oversampler.n_neighbors = modify_nn(oversampler.n_neighbors, n_minority_samples)
    return oversampler
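
A toy illustration of the two branches above (a sketch; it assumes the helper can be imported from the private module clover.over_sampling._cluster):

>>> import numpy as np
>>> from imblearn.over_sampling import SMOTE
>>> from clover.over_sampling._cluster import clone_modify
>>> # A single minority sample in the cluster: SMOTE is replaced by RandomOverSampler
>>> type(clone_modify(SMOTE(), 1, np.array([0, 0, 0, 0, 1]))).__name__
'RandomOverSampler'
>>> # Three minority samples: k_neighbors is capped at n_minority_samples - 1
>>> clone_modify(SMOTE(), 1, np.array([0, 0, 0, 1, 1, 1])).k_neighbors
2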

extract_inter_data(X, y, cluster_labels, inter_distribution, sampling_strategy, random_state)

Extract data between filtered clusters.

Parameters:

Name Type Description Default
X InputData

The input data.

required
y Targets

The targets.

required
cluster_labels Labels

The cluster labels.

required
inter_distribution InterDistribution

The inter-clusters distributions.

required
sampling_strategy OrderedDict[int, int]

The sampling strategy to follow.

required
random_state RandomState

Control the randomization of the algorithm.

required

Returns:

Type Description
list[tuple[dict[int, int], InputData, Targets]]

The inter-clusters data.

Source code in src/clover/over_sampling/_cluster.py
def extract_inter_data(
    X: InputData,
    y: Targets,
    cluster_labels: Labels,
    inter_distribution: InterDistribution,
    sampling_strategy: OrderedDict[int, int],
    random_state: np.random.RandomState,
) -> list[tuple[dict[int, int], InputData, Targets]]:
    """Extract data between filtered clusters.

    Args:
        X:
            The input data.
        y:
            The targets.
        cluster_labels:
            The cluster labels.
        inter_distribution:
            The inter-clusters distributions.
        sampling_strategy:
            The sampling strategy to follow.
        random_state:
            Control the randomization of the algorithm.

    Returns:
        The inter-clusters data.
    """
    majority_class_label = Counter(y).most_common()[0][0]
    clusters_data = []
    for (
        ((cluster_label1, class_label1), (cluster_label2, class_label2)),
        proportion,
    ) in inter_distribution.items():
        mask1 = (cluster_labels == cluster_label1) & (np.isin(y, [majority_class_label, class_label1]))
        mask2 = (cluster_labels == cluster_label2) & (np.isin(y, [majority_class_label, class_label2]))
        X1, X2, y1, y2 = X[mask1], X[mask2], y[mask1], y[mask2]
        majority_mask1, majority_mask2 = (
            (y1 == majority_class_label),
            (y2 == majority_class_label),
        )
        n_minority_samples = int(round(sampling_strategy[class_label1] * proportion))
        for _ in range(n_minority_samples):
            ind1, ind2 = (
                random_state.randint(0, (~majority_mask1).sum()),
                random_state.randint(0, (~majority_mask2).sum()),
            )
            X_in_clusters = np.vstack(
                (
                    X1[~majority_mask1][ind1].reshape(1, -1),
                    X2[~majority_mask2][ind2].reshape(1, -1),
                    X1[majority_mask1],
                    X2[majority_mask2],
                ),
            )
            y_in_clusters = np.hstack(
                (
                    y1[~majority_mask1][ind1],
                    y2[~majority_mask2][ind2],
                    y1[majority_mask1],
                    y2[majority_mask2],
                ),
            )
            clusters_sampling_strategy = {class_label1: 1}
            clusters_data.append((clusters_sampling_strategy, X_in_clusters, y_in_clusters))
    return clusters_data
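
A toy illustration of the returned structure (a sketch; the array values are arbitrary and the helper is assumed importable from the private module clover.over_sampling._cluster):

import numpy as np
from collections import OrderedDict
from clover.over_sampling._cluster import extract_inter_data

X = np.arange(20).reshape(10, 2)
y = np.array([0, 0, 0, 0, 0, 0, 0, 1, 1, 1])            # class 0 is the majority
cluster_labels = np.array([0, 0, 0, 0, 1, 1, 1, 0, 0, 1])
inter_distribution = {((0, 1), (1, 1)): 0.5}             # pair of filtered clusters for class 1

clusters_data = extract_inter_data(
    X, y, cluster_labels, inter_distribution,
    OrderedDict({1: 4}), np.random.RandomState(0),
)
# Two entries, each combining one randomly chosen minority sample from each cluster
# with the majority samples of both clusters, and requesting a single new sample ({1: 1})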

extract_intra_data(X, y, cluster_labels, intra_distribution, sampling_strategy)

Extract data for each filtered cluster.

Parameters:

Name Type Description Default
X InputData

The input data.

required
y Targets

The targets.

required
cluster_labels Labels

The cluster labels.

required
intra_distribution IntraDistribution

The intra-clusters distributions.

required
sampling_strategy OrderedDict[int, int]

The sampling strategy to follow.

required

Returns:

Type Description
list[tuple[dict[int, int], InputData, Targets]]

The intra-clusters data.

Source code in src/clover/over_sampling/_cluster.py
def extract_intra_data(
    X: InputData,
    y: Targets,
    cluster_labels: Labels,
    intra_distribution: IntraDistribution,
    sampling_strategy: OrderedDict[int, int],
) -> list[tuple[dict[int, int], InputData, Targets]]:
    """Extract data for each filtered cluster.

    Args:
        X:
            The input data.
        y:
            The targets.
        cluster_labels:
            The cluster labels.
        intra_distribution:
            The intra-clusters distributions.
        sampling_strategy:
            The sampling strategy to follow.

    Returns:
        The intra-clusters data.
    """
    majority_class_label = Counter(y).most_common()[0][0]

    # Get offsets
    selected_multi_labels = []
    classes_labels = {class_label for _, class_label in intra_distribution}
    distribution_value_tie = 0.5
    for selected_class_label in classes_labels:
        intra_distribution_class_label = {
            (cluster_label, class_label): proportion
            for (cluster_label, class_label), proportion in intra_distribution.items()
            if class_label == selected_class_label
        }
        selected_multi_label = max(
            intra_distribution_class_label,
            key=lambda multi_label: intra_distribution_class_label[multi_label],
        )
        if intra_distribution_class_label[selected_multi_label] <= distribution_value_tie:
            selected_multi_labels.append(selected_multi_label)

    # Get clusters data
    clusters_data = []
    for (cluster_label, class_label), proportion in intra_distribution.items():
        mask = (cluster_labels == cluster_label) & (np.isin(y, [majority_class_label, class_label]))
        offset = int((cluster_label, class_label) in selected_multi_labels)
        n_minority_samples = int(round(sampling_strategy[class_label] * proportion)) + offset
        X_in_cluster, y_in_cluster = X[mask], y[mask]
        cluster_sampling_strategy = {class_label: n_minority_samples}
        if n_minority_samples > 0:
            clusters_data.append((cluster_sampling_strategy, X_in_cluster, y_in_cluster))
    return clusters_data
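
Similarly, a toy illustration of the returned structure (a sketch under the same assumptions as the example for extract_inter_data above):

import numpy as np
from collections import OrderedDict
from clover.over_sampling._cluster import extract_intra_data

X = np.arange(20).reshape(10, 2)
y = np.array([0, 0, 0, 0, 0, 0, 0, 1, 1, 1])            # class 0 is the majority
cluster_labels = np.array([0, 0, 0, 0, 1, 1, 1, 0, 0, 1])
intra_distribution = {(0, 1): 0.6, (1, 1): 0.4}          # proportions per (cluster, class) pair

clusters_data = extract_intra_data(
    X, y, cluster_labels, intra_distribution, OrderedDict({1: 10}),
)
# [({1: 6}, <data of cluster 0>, <targets of cluster 0>),
#  ({1: 4}, <data of cluster 1>, <targets of cluster 1>)]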

generate_in_cluster(oversampler, transformer, cluster_sampling_strategy, X_in_cluster, y_in_cluster)

Generate intra-cluster or inter-cluster new samples.

Parameters:

Name Type Description Default
oversampler BaseOverSampler

Oversampler to apply to each selected cluster.

required
transformer TransformerMixin

Transformer to apply before oversampling.

required
cluster_sampling_strategy dict[int, int]

The sampling strategy in the cluster.

required
X_in_cluster InputData

The input data in the cluster.

required
y_in_cluster Targets

The targets in the cluster.

required

Returns:

Name Type Description
X_new InputData

The generated data.

y_new Targets

The corresponding label of resampled data.

Source code in src/clover/over_sampling/_cluster.py
def generate_in_cluster(
    oversampler: BaseOverSampler,
    transformer: TransformerMixin,
    cluster_sampling_strategy: dict[int, int],
    X_in_cluster: InputData,
    y_in_cluster: Targets,
) -> tuple[InputData, Targets]:
    """Generate intra-cluster or inter-cluster new samples.

    Args:
        oversampler:
            Oversampler to apply to each selected cluster.
        transformer:
            Transformer to apply before oversampling.
        cluster_sampling_strategy:
            The sampling strategy in the cluster.
        X_in_cluster:
            The input data in the cluster.
        y_in_cluster:
            The targets in the cluster.

    Returns:
        X_new:
            The generated data.
        y_new:
            The corresponding label of resampled data.
    """

    # Create oversampler for specific cluster and class
    class_label = next(iter(cluster_sampling_strategy.keys()))
    oversampler = clone_modify(oversampler, class_label, y_in_cluster)
    oversampler.sampling_strategy_ = cluster_sampling_strategy

    # Resample cluster and class data
    X_res, y_res = oversampler._fit_resample(
        transformer.transform(X_in_cluster) if transformer is not None else X_in_cluster,
        y_in_cluster,
    )

    # Filter only new data
    X_new, y_new = X_res[len(X_in_cluster) :], y_res[len(y_in_cluster) :]

    return X_new, y_new
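
A minimal sketch of how the function is used internally (it assumes imblearn's SMOTE behaves as in the source above and that the helper is importable from the private module clover.over_sampling._cluster):

import numpy as np
from imblearn.over_sampling import SMOTE
from clover.over_sampling._cluster import generate_in_cluster

X_in_cluster = np.array([[0.0, 0.0], [1.0, 1.0], [2.0, 2.0], [0.1, 0.1], [0.9, 0.9]])
y_in_cluster = np.array([0, 0, 0, 1, 1])

# Generate two new samples of class 1 inside the cluster; no transformer is applied
X_new, y_new = generate_in_cluster(SMOTE(random_state=0), None, {1: 2}, X_in_cluster, y_in_cluster)
# X_new and y_new contain only the newly generated samples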

modify_nn(n_neighbors, n_samples)

Modify the nearest neighbors object.

Parameters:

Name Type Description Default
n_neighbors NearestNeighbors | int

The NearestNeighbors object or number.

required
n_samples int

The number of samples.

required

Returns:

Type Description
NearestNeighbors | int

The modified NearestNeighbors object or number.

Source code in src/clover/over_sampling/_cluster.py
def modify_nn(n_neighbors: NearestNeighbors | int, n_samples: int) -> NearestNeighbors | int:
    """Modify the nearest neighbors object.

    Args:
        n_neighbors:
            The `NearestNeighbors` object or number.
        n_samples:
            The number of samples.

    Returns:
        The modified `NearestNeighbors` object or number.
    """
    if isinstance(n_neighbors, NearestNeighbors):
        n_neighbors = (
            clone(n_neighbors).set_params(n_neighbors=n_samples - 1)
            if n_neighbors.n_neighbors >= n_samples
            else clone(n_neighbors)
        )
    elif isinstance(n_neighbors, int) and n_neighbors >= n_samples:
        n_neighbors = n_samples - 1
    return n_neighbors
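
A small illustration of the clipping behaviour (a sketch, assuming the helper is importable from the private module clover.over_sampling._cluster):

>>> from sklearn.neighbors import NearestNeighbors
>>> from clover.over_sampling._cluster import modify_nn
>>> modify_nn(5, 3)   # requested neighbors exceed the available samples
2
>>> modify_nn(NearestNeighbors(n_neighbors=5), 3).n_neighbors
2
>>> modify_nn(3, 10)  # feasible request is returned unchanged
3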