import numpy as np
import json
import logging
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score as SS
from sklearn.metrics import adjusted_rand_score as ARI
from sklearn.metrics import adjusted_mutual_info_score as AMI
import bacpipe.embedding_evaluation.label_embeddings as le
import bacpipe
logger = logging.getLogger(__name__)
[docs]
def convert_numpy_types(obj):
if isinstance(obj, np.int64):
return int(obj)
elif isinstance(obj, np.float32):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
[docs]
def run_clustering(
embeds, cluster_configs,
label_column=None, ground_truth=[]
):
"""
Fit clustering algorithms to embeddings.
Parameters
----------
embeds : np.array
embeddings
cluster_configs : dict
clustering algorithm objects
label_column : string
label type defined in annotations.csv file
ground_truth : list
ground truth labels
Returns
-------
dict
labels accordings to clustering algorithms
"""
clusterings = {}
for name, clusterer in cluster_configs.items():
clusterings[name] = clusterer.fit_predict(embeds)
if len(ground_truth) > 0:
clusterings[name + "_no_noise"] = clusterer.fit_predict(
embeds[ground_truth != -1]
)
if len(ground_truth) > 0 and label_column:
clusterings[label_column] = ground_truth
clusterings[f"{label_column}_no_noise"] = ground_truth[ground_truth != -1]
return clusterings
[docs]
def eval_clustering(
clusterings, ground_truth=[],
embeds=None,
default_labels=None,
label_column=None,
**kwargs
):
"""
Evaluate clustering performance.
Parameters
----------
clusterings : dict
dictionary with clusterings
ground_truth : list
ground truth labels
default_labels : dict
default labels for the dataset
label_column : string
label type defined in annotations.csv file
Returns
-------
dict
performance metrics
"""
metrics = {"AMI": dict(), "ARI": dict()}
for cl_name, cl_labels in clusterings.items():
if cl_name == f"{label_column}_no_noise":
if -1 in ground_truth:
embeds = embeds[ground_truth != -1]
cl_labels = ground_truth[ground_truth != -1]
if default_labels and not hasattr(default_labels, 'kmeans'):
default_labels["kmeans"] = clusterings["kmeans"]
if not default_labels:
metrics[f"AMI"][f"{cl_name}-ground_truth"] = AMI(ground_truth, cl_labels)
metrics[f"ARI"][f"{cl_name}-ground_truth"] = ARI(ground_truth, cl_labels)
else:
for def_name, def_labels in default_labels.items():
if "no_noise" in cl_name:
def_labels = np.array(def_labels)[ground_truth != -1]
metrics[f"AMI"][f"{cl_name}-{def_name}"] = AMI(def_labels, cl_labels)
metrics[f"ARI"][f"{cl_name}-{def_name}"] = ARI(def_labels, cl_labels)
return metrics
[docs]
def eval_with_silhouette(embeds, ground_truth, metrics=None):
"""
Evaluate clustering using Silhouette Score.
Parameters
----------
embeds : np.ndarray
embeddings
ground_truth : list
ground truth array
metrics : dict, optional
already generated evaluation metrics, if any, by default None
Returns
-------
dict
evaluation metrics including Silhouette score
"""
if not metrics:
metrics = dict()
metrics["SS"] = SS(embeds, ground_truth)
return metrics
[docs]
def get_clustering_models(clust_params):
"""
Initialize the clustering models specified in settings.yaml
Parameters
----------
clust_params : dict
clusterings specified in settings.yaml
Returns
-------
dict
clustering objects to run the data on
"""
cluster_configs = {}
for name, params in clust_params.items():
if name == "kmeans":
cluster_configs[name] = KMeans(**params)
if False: # TODO name == "hdbscan":
from hdbscan import hdbscan
cluster_configs[name] = hdbscan.HDBSCAN(**params, core_dist_n_jobs=-1)
return cluster_configs
[docs]
def get_nr_of_clusters(labels, clust_configs, **kwargs):
"""
Get number of clusters either from ground truth or if doesn't exist
from settings.yaml
Parameters
----------
labels : list
ground truth labels
clust_configs : dict
clusterings specified in settings.yaml
Returns
-------
dict
clustering dict with correct number of clusters
"""
clust_params = {}
for config in clust_configs.values():
if config["name"] == "kmeans":
if len(labels) > 0:
nr_of_classes = len(np.unique(labels))
clust_params[config["name"]] = {
"n_clusters": nr_of_classes,
}
else:
clust_params[config["name"]] = config["params"]
else:
if config["bool"]:
clust_params[config["name"]] = config["params"]
return clust_params
[docs]
def clustering_pipeline(
model_name,
ground_truth, embeds,
paths=None,
overwrite=True,
label_column=bacpipe.settings.label_column,
**kwargs
):
"""
Clustering pipeline, generating clusterings based on the
settings file. Clusterings are then evaluated and a dictionary
with the evaluation scores is saved and returned
Parameters
----------
model_name : str
name of model backbone
ground_truth : dict
ground truth labels and a label2dict dictionary
embeds : np.array
embeddings
paths : SimpleNamespace object
dict with path attributs for saving and loading
overwrite : bool, optional
whether to overwrite exisiting clustering files, by default False
label_column : str, optional
name of column in annotations file, defaults to bacpipe.settings.label_column
"""
if not kwargs:
kwargs = {**vars(bacpipe.settings)}
kwargs.pop('label_column')
if not paths:
get_paths_func = bacpipe.make_set_paths_func(
bacpipe.config.audio_dir, bacpipe.settings.main_results_dir
)
paths = get_paths_func(model_name)
if (
overwrite
or not len(list(paths.clust_path.glob("*.json"))) > 0
):
if "audio_dir" in kwargs: kwargs.pop("audio_dir")
if ground_truth:
ground_truth = ground_truth[f"label:{label_column}"]
if len(ground_truth.shape) > 1:
logger.warning(
"You have passed a multi-label ground truth array. "
"However bacpipe only supports single label clustering "
"and will therefore only take one species for each timestamp."
)
ground_truth = ground_truth[:, 0]
else:
ground_truth = []
clust_params = get_nr_of_clusters(ground_truth, **kwargs)
cluster_configs = get_clustering_models(clust_params)
default_labels = le.create_default_labels(
paths.audio_dir, paths.clust_path.parent.stem, paths, **kwargs
)
clusterings = run_clustering(
embeds, cluster_configs, label_column, ground_truth
)
metrics = eval_clustering(
clusterings, ground_truth, embeds, default_labels, label_column, **kwargs
)
if kwargs.get('evaluate_with_silhouette'):
metrics = eval_with_silhouette(embeds, clusterings, metrics)
save_clustering_performance(paths, clusterings, metrics, label_column)
else:
logger.info(
"Clustering file cluster_metrics.json already exists and"
" so is not computed. If you want to overwrite existing results, "
"set overwrite to True in settings.yaml."
)
clusterings = np.load(
paths.clust_path.joinpath(f"clust_labels.npy"),
allow_pickle=True
).item()
with open(paths.clust_path.joinpath(f"clust_results.json"), "r") as f:
metrics = json.load(f)
return clusterings, metrics