clustering

Class for clustering the correlation matrices.

MIT License Copyright © 2021-2022, Daniel Nagel, Georg Diez All rights reserved.

Clustering(*, mode='CPM', weighted=True, n_neighbors=None, resolution_parameter=None, n_clusters=None, seed=None)

Bases: ClusterMixin, BaseEstimator

Class for clustering a correlation matrix.

Parameters:

  • mode (str, default: 'CPM' ) –

    The clustering method. 'CPM' and 'modularity' select the quality function optimized by the Leiden algorithm; 'linkage' and 'kmedoids' select alternative clustering algorithms.
    - 'CPM': uses the constant Potts model on the full, weighted graph
    - 'modularity': uses modularity on a knn-graph
    - 'linkage': uses complete-linkage clustering
    - 'kmedoids': uses k-medoids clustering

  • weighted (bool, default: True ) –

    If True, the underlying graph has weighted edges. Otherwise, an unweighted graph is constructed from the adjacency matrix. Modes 'CPM' and 'linkage' require weighted=True.

  • n_neighbors (int, default: None ) –

    Number of nearest neighbors used to build a knn-graph, which reduces the required memory compared to the full matrix. Not supported for modes 'linkage' and 'kmedoids'. If None, the default depends on the mode:
    - 'CPM': None uses the full graph
    - 'modularity': None uses the square root of the number of features

  • resolution_parameter (float, default: None ) –

    Only used for modes 'CPM' and 'linkage'; must lie in [0, 1]. If None, it is set to the third quartile of the nonzero off-diagonal values of X when n_neighbors=None, and otherwise to the mean value of the knn-graph.

  • n_clusters (int, default: None ) –

    Required for mode 'kmedoids' and not supported by any other mode. The number of medoids, each of which defines one cluster.

  • seed (int, default: None ) –

    Integer seed that makes the randomness of leidenalg deterministic. If None, a random seed is used.
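
As a rough illustration of the resolution_parameter default on the full graph, the sketch below masks zero entries and the diagonal and takes the third quartile of what remains. It uses only NumPy; the function name `default_resolution` is hypothetical and not part of mosaic.

```python
import numpy as np


def default_resolution(X):
    """Third quartile of the nonzero off-diagonal entries of X."""
    mat = np.array(X, dtype=float)  # work on a copy of the input
    mat[mat == 0] = np.nan  # ignore missing edges
    mat[np.diag_indices_from(mat)] = np.nan  # ignore self-correlation
    return float(np.nanquantile(mat, 0.75))


X = np.array([[1.0, 0.1, 0.9], [0.1, 1.0, 0.1], [0.9, 0.1, 1.0]])
resolution = default_resolution(X)  # approximately 0.7
```

For this 3x3 matrix the result is about 0.7, which agrees with the fitted estimator shown in the Examples.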

Attributes:

  • clusters_ (ndarray of shape (n_clusters, )) –

    The result of the clustering process. One entry per cluster, each containing the indices of the features belonging to that cluster.

  • labels_ (ndarray of shape (n_features, )) –

    Labels of each feature.

  • matrix_ (ndarray of shape (n_features, n_features)) –

    Permuted matrix according to the determined clusters.

  • ticks_ (ndarray of shape (n_clusters, )) –

    The cumulative number of features belonging to the clusters. May be used as ticks for plotting matrix_.

  • permutation_ (ndarray of shape (n_features, )) –

    Permutation of the input features (corresponds to flattened clusters_).

  • n_neighbors_ (int) –

    Only available when using a knn-graph. The number of nearest neighbors used for constructing the knn-graph.

  • resolution_param_ (float) –

    Only for modes 'CPM' and 'linkage'. The resolution parameter used for the clustering.

  • linkage_matrix_ (ndarray of shape (n_clusters - 1, 4)) –

    Only for mode 'linkage'. Contains the hierarchical clustering encoded as a linkage matrix, see scipy.cluster.hierarchy.linkage.
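
The relation between clusters_, permutation_, matrix_, ticks_ and labels_ can be reproduced with plain NumPy. The sketch below follows the same steps as the fit method, starting from the clusters of the example matrix; the variable names are illustrative only.

```python
import numpy as np

X = np.array([[1.0, 0.1, 0.9], [0.1, 1.0, 0.1], [0.9, 0.1, 1.0]])
# feature indices per cluster, as in the example output of clusters_
clusters = [np.array([2, 0]), np.array([1])]

# flattened clusters give the feature order
permutation = np.hstack(clusters)
# reorder rows and columns so that clusters form diagonal blocks
matrix = X[np.ix_(permutation, permutation)]
# cumulative cluster sizes mark the block boundaries
ticks = np.cumsum([len(cluster) for cluster in clusters])

# assign to every feature the index of its cluster
labels = np.empty_like(permutation)
for idx, cluster in enumerate(clusters):
    labels[cluster] = idx
```

Here permutation is [2, 0, 1], ticks is [2, 3] and labels is [0, 1, 0].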

Examples:

>>> import numpy as np
>>> import mosaic
>>> mat = np.array([[1.0, 0.1, 0.9], [0.1, 1.0, 0.1], [0.9, 0.1, 1.0]])
>>> clust = mosaic.Clustering()
>>> clust.fit(mat)
Clustering(resolution_parameter=0.7)
>>> clust.matrix_
array([[1. , 0.9, 0.1],
       [0.9, 1. , 0.1],
       [0.1, 0.1, 1. ]])
>>> clust.clusters_
array([list([2, 0]), list([1])], dtype=object)

Initialize Clustering class.

Source code in src/mosaic/clustering.py
@beartype
def __init__(
    self,
    *,
    mode: ClusteringModeString = 'CPM',
    weighted: bool = True,
    n_neighbors: Optional[PositiveInt] = None,
    resolution_parameter: Optional[NumInRange0to1] = None,
    n_clusters: Optional[PositiveInt] = None,
    seed: Optional[int] = None,
) -> None:
    """Initialize Clustering class."""
    self.mode: ClusteringModeString = mode
    self.n_clusters: Optional[PositiveInt] = n_clusters
    self.n_neighbors: Optional[PositiveInt] = n_neighbors
    self.resolution_parameter: Optional[NumInRange0to1] = (
        resolution_parameter
    )
    self.seed: Optional[int] = seed
    self.weighted: bool = weighted

    if mode in {'linkage', 'kmedoids'} and self.n_neighbors is not None:
        raise NotImplementedError(
            f"mode='{mode}' does not support knn-graphs.",
        )

    if mode == 'kmedoids' and self.n_clusters is None:
        raise TypeError(
            f"mode='{mode}' needs parameter 'n_clusters'",
        )
    elif mode != 'kmedoids' and self.n_clusters is not None:
        raise NotImplementedError(
            f"mode='{mode}' does not support the usage of 'n_clusters'",
        )

    if mode in {'CPM', 'linkage'}:
        if not weighted:
            raise NotImplementedError(
                f"mode='{mode}' does not support weighted=False",
            )
    elif resolution_parameter is not None:
        raise NotImplementedError(
            f"mode='{mode}' does not support the usage of the "
            'resolution_parameter',
        )
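
The compatibility rules enforced by the constructor can be summarized as a small standalone check. This is a sketch that mirrors the conditions above without beartype or scikit-learn; `check_parameters` is a hypothetical helper, not part of mosaic, and it collects conflicting parameter names instead of raising.

```python
def check_parameters(
    mode,
    weighted=True,
    n_neighbors=None,
    resolution_parameter=None,
    n_clusters=None,
):
    """Return the names of parameters that conflict with the given mode."""
    problems = []
    # linkage and k-medoids operate on the full matrix only
    if mode in {'linkage', 'kmedoids'} and n_neighbors is not None:
        problems.append('n_neighbors')
    # n_clusters is mandatory for k-medoids and forbidden otherwise
    if (mode == 'kmedoids') != (n_clusters is not None):
        problems.append('n_clusters')
    if mode in {'CPM', 'linkage'}:
        # these modes need edge weights
        if not weighted:
            problems.append('weighted')
    elif resolution_parameter is not None:
        # only CPM and linkage use a resolution parameter
        problems.append('resolution_parameter')
    return problems


ok = check_parameters('CPM')
missing = check_parameters('kmedoids')
unsupported = check_parameters('modularity', resolution_parameter=0.5)
```

In this sketch `ok` is an empty list, `missing` is ['n_clusters'] and `unsupported` is ['resolution_parameter'].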

fit(X, y=None)

Clusters the correlation matrix by Leiden clustering on a graph.

Parameters:

  • X (ndarray of shape (n_features, n_features)) –

    Matrix containing the correlation metric which is clustered. The values should lie in [0, 1], where 1 means completely correlated and 0 means no correlation.

  • y (Ignored, default: None ) –

    Not used, present for scikit API consistency by convention.

Returns:

  • self ( object ) –

    Fitted estimator.

Source code in src/mosaic/clustering.py
@beartype
def fit(self, X: SimilarityMatrix, y: Optional[np.ndarray] = None):
    """Clusters the correlation matrix by Leiden clustering on a graph.

    Parameters
    ----------
    X : ndarray of shape (n_features, n_features)
        Matrix containing the correlation metric which is clustered. The
        values should go from [0, 1] where 1 means completely correlated
        and 0 no correlation.
    y : Ignored
        Not used, present for scikit API consistency by convention.

    Returns
    -------
    self : object
        Fitted estimator.

    """
    self._reset()

    # prepare matrix for graph construction
    mat: FloatMatrix
    if self.mode in {'linkage', 'kmedoids'}:
        mat = np.copy(X)
    elif self.mode == 'CPM' and self.n_neighbors is None:
        mat = np.copy(X)
    else:
        mat = self._construct_knn_mat(X)

    if self.mode in {'CPM', 'linkage'}:
        # mask diagonal and zero elements
        mat[mat == 0] = np.nan
        mat[np.diag_indices_from(mat)] = np.nan

        if self.resolution_parameter is None:
            if self.n_neighbors is None:
                third_quartile = 0.75
                self.resolution_parameter = np.nanquantile(
                    mat, third_quartile,
                )
            else:
                self.resolution_parameter = np.nanmean(mat)

        self.resolution_param_: NumInRange0to1 = (
            self.resolution_parameter
        )

    # create graph
    mat[np.isnan(mat)] = 0

    clusters: Object1DArray
    if self.mode == 'linkage':
        clusters = self._clustering_linkage(mat)
    elif self.mode == 'kmedoids':
        clusters = self._clustering_kmedoids(mat)
    else:  # self.mode in {'CPM', 'modularity'}
        graph: ig.Graph = ig.Graph.Weighted_Adjacency(
            list(mat.astype(np.float64)), loops=False,
        )
        clusters = self._clustering_leiden(graph)

    self.clusters_: Object1DArray = _sort_clusters(clusters, X)
    self.permutation_: Index1DArray = np.hstack(self.clusters_)
    self.matrix_: Float2DArray = np.copy(X)[
        np.ix_(self.permutation_, self.permutation_)
    ]
    self.ticks_: Index1DArray = np.cumsum(
        [len(cluster) for cluster in self.clusters_],
    )
    labels: Index1DArray = np.empty_like(self.permutation_)
    for idx, cluster in enumerate(self.clusters_):
        labels[cluster] = idx
    self.labels_: Index1DArray = labels

    return self
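
The helper `_construct_knn_mat` used above is not shown in this listing. Purely as an illustration of the general idea of a knn-graph on a similarity matrix, the following sketch keeps the largest off-diagonal similarities in each row and zeroes the rest. The function `knn_similarity`, the rounding of the square-root default, and the handling of ties are all assumptions and may differ from what mosaic does.

```python
import numpy as np


def knn_similarity(X, n_neighbors=None):
    """Keep only the n_neighbors largest off-diagonal similarities per row."""
    X = np.asarray(X, dtype=float)
    if n_neighbors is None:
        # assumed rounding of the square-root default
        n_neighbors = max(1, int(np.sqrt(len(X))))
    ranked = X.copy()
    np.fill_diagonal(ranked, -np.inf)  # a feature is never its own neighbor
    # column indices of the n_neighbors largest entries in each row
    nearest = np.argsort(ranked, axis=1)[:, -n_neighbors:]
    rows = np.arange(len(X))[:, None]
    knn = np.zeros_like(X)
    knn[rows, nearest] = X[rows, nearest]
    return knn


X = np.array([[1.0, 0.1, 0.9], [0.1, 1.0, 0.2], [0.9, 0.2, 1.0]])
knn = knn_similarity(X, n_neighbors=1)
```

Note that such a matrix is generally not symmetric: in this example feature 1 keeps feature 2 as its neighbor, but not the other way around.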

fit_predict(X, y=None)

Clusters the correlation matrix by Leiden clustering on a graph.

Parameters:

  • X (ndarray of shape (n_features, n_features)) –

    Matrix containing the correlation metric which is clustered. The values should lie in [0, 1], where 1 means completely correlated and 0 means no correlation.

  • y (Ignored, default: None ) –

    Not used, present for scikit API consistency by convention.

Returns:

  • labels ( ndarray of shape (n_samples,) ) –

    Cluster labels.

Source code in src/mosaic/clustering.py
@beartype
def fit_predict(
    self, X: SimilarityMatrix, y: Optional[np.ndarray] = None,
) -> Index1DArray:
    """Clusters the correlation matrix by Leiden clustering on a graph.

    Parameters
    ----------
    X : ndarray of shape (n_features, n_features)
        Matrix containing the correlation metric which is clustered. The
        values should go from [0, 1] where 1 means completely correlated
        and 0 no correlation.
    y : Ignored
        Not used, present for scikit API consistency by convention.

    Returns
    -------
    labels : ndarray of shape (n_samples,)
        Cluster labels.

    """
    return super().fit_predict(X, y)

score(X, y=None, sample_weight=None)

Estimate silhouette_score of new correlation matrix.

Parameters:

  • X (ndarray of shape (n_features, n_features)) –

    New matrix containing the correlation metric to score. The values should lie in [0, 1], where 1 means completely correlated and 0 means no correlation.

  • y (Ignored, default: None ) –

    Not used, present for scikit API consistency by convention.

  • sample_weight (Optional[ndarray], default: None ) –

    Not used, present for scikit API consistency by convention.

Returns:

  • score ( float ) –

    Silhouette score of new correlation matrix based on fitted labels.

Source code in src/mosaic/clustering.py
@beartype
def score(
    self,
    X: SimilarityMatrix,
    y: Optional[np.ndarray] = None,
    sample_weight: Optional[np.ndarray] = None,
) -> Float:
    """Estimate silhouette_score of new correlation matrix.

    Parameters
    ----------
    X : ndarray of shape (n_features, n_features)
        New matrix containing the correlation metric to score. The
        values should go from [0, 1] where 1 means completely correlated
        and 0 no correlation.
    y : Ignored
        Not used, present for scikit API consistency by convention.
    sample_weight: Ignored
        Not used, present for scikit API consistency by convention.

    Returns
    -------
    score : float
        Silhouette score of new correlation matrix based on fitted labels.

    """
    check_is_fitted(self, attributes=['labels_', 'matrix_'])

    n_labels = len(self.labels_)
    n_unique_labels = len(np.unique(self.labels_))

    if n_labels != len(X):
        raise ValueError(
            f'Dimension of X d={len(X):.0f} needs to agree with the '
            f'dimension of the fitted data d={n_labels:.0f}.',
        )

    if n_unique_labels in {1, n_labels}:
        return -1.0
    return silhouette_score(X, labels=self.labels_)
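
The early return of -1.0 covers the two labelings for which a silhouette score is not defined: all features in a single cluster, or every feature in its own cluster. A minimal sketch of that check, using a hypothetical helper name and plain Python:

```python
def silhouette_is_defined(labels):
    """True if there are at least 2 clusters and fewer clusters than features."""
    n_labels = len(labels)
    n_unique_labels = len(set(labels))
    return n_unique_labels not in {1, n_labels}


two_clusters = silhouette_is_defined([0, 1, 0])
single_cluster = silhouette_is_defined([0, 0, 0])
all_singletons = silhouette_is_defined([0, 1, 2])
```

Only the first labeling would be passed on to silhouette_score; the other two would yield -1.0 in the score method above.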