import yaml
import json
import re
from types import SimpleNamespace
from pathlib import Path
import numpy as np
import pandas as pd
from tqdm import tqdm
from pathlib import Path
import datetime as dt
from importlib import import_module
from librosa import get_duration
import logging
logger = logging.getLogger("bacpipe")
[docs]
class DefaultLabels:
[docs]
def __init__(self, paths, model, default_label_keys, **kwargs):
"""
Class to generate default labels based on audio files and
number of generated embeddings per file.
Parameters
----------
paths : SimpleNamespace
convenient object for path handling
model : str
model name
default_label_keys : list
list of default labels, see settings.yaml
Raises
------
ValueError
if no embeddings were found
"""
self.model = model
self.default_label_keys = default_label_keys
self.paths = paths
if kwargs.get('only_embed_annotations'):
self.only_embed_annotations = True
self.df, _ = load_labels_and_build_dict(
paths,
kwargs.get('annotations_filename'),
self.paths.audio_dir,
bool_filter_labels=False
)
if (self.paths.preds_path / "original_classifier_outputs").exists():
if not "default_classifier" in self.default_label_keys:
self.default_label_keys += ["default_classifier"]
elif "default_classifier" in self.default_label_keys:
self.default_label_keys.remove("default_classifier")
try:
embed_path = model_specific_embedding_path(paths.main_embeds_path, model)
self.metadata = yaml.safe_load(open(embed_path.joinpath("metadata.yml"), "r"))
self.nr_embeds_per_file = self.metadata["files"]["nr_embeds_per_file"]
self.nr_embeds_total = self.metadata["nr_embeds_total"]
except ValueError as e:
logger.info(
"No embeddings found. Gathering files and nr of embeddings "
"per file from audio files."
)
_, _, metadata = get_files_if_no_embeds(paths.audio_dir, model)
self.metadata = metadata
self.nr_embeds_per_file = metadata['files']['nr_embeds_per_file']
self.nr_embeds_total = sum(
metadata['files']['nr_embeds_per_file']
)
if not sum(self.nr_embeds_per_file) == self.nr_embeds_total:
error = (
"\nThe number of embeddings per file does not match "
"the total number of embeddings.")
logger.exception(error)
raise ValueError(error)
[docs]
def generate(self):
self.default_label_dict = {}
for default_label in self.default_label_keys:
getattr(self, default_label)()
if hasattr(self, f"{default_label}_per_embedding"):
self.default_label_dict.update(
{default_label: getattr(self, f"{default_label}_per_embedding")}
)
[docs]
def get_datetimes(self):
if not hasattr(self, "timestamp_per_file"):
self.timestamp_per_file = {}
for file in self.metadata["files"]["audio_files"]:
file_stem = Path(file).stem
self.timestamp_per_file.update({file: get_dt_filename(file_stem)})
[docs]
def time_of_day(self):
self.get_datetimes()
segment_s = (
self.metadata["segment_length (samples)"]
/ self.metadata["sample_rate (Hz)"]
)
segment_s_dt = dt.timedelta(seconds=float(segment_s))
time_of_day_per_file = {}
for file, datetime_of_file in self.timestamp_per_file.items():
timeofday = dt.datetime(
2000,
1,
1, # using a default day just to keep working with timestamps
datetime_of_file.hour,
datetime_of_file.minute,
datetime_of_file.second,
)
time_of_day_per_file.update({file: timeofday})
self.time_of_day_per_embedding = []
for file_idx, (file, time_of_day) in enumerate(time_of_day_per_file.items()):
for index_of_embedding in range(self.nr_embeds_per_file[file_idx]):
if hasattr(self, 'only_embed_annotations') and getattr(self, 'only_embed_annotations'):
starts = self.df.start.values[self.df.audiofilename == file]
timestamp = (
(time_of_day + dt.timedelta(seconds=float(starts[index_of_embedding])))
.time()
.replace(microsecond=0)
)
else:
timestamp = (
(time_of_day + index_of_embedding * segment_s_dt)
.time()
.replace(microsecond=0)
)
self.time_of_day_per_embedding.append(timestamp.strftime("%H-%M-%S"))
[docs]
def day_of_year(self):
self.get_datetimes()
day_of_year_per_file = {}
for file, datetime_of_file in self.timestamp_per_file.items():
time_of_day = dt.datetime(
2000, datetime_of_file.month, datetime_of_file.day
)
day_of_year_per_file.update({file: time_of_day})
self.day_of_year_per_embedding = []
for file_idx, (file, day_of_year) in enumerate(day_of_year_per_file.items()):
self.day_of_year_per_embedding.extend(
np.repeat(
day_of_year.strftime("%Y-%m-%d"), self.nr_embeds_per_file[file_idx]
)
)
[docs]
def continuous_timestamp(self):
self.get_datetimes()
segment_s = (
self.metadata["segment_length (samples)"]
/ self.metadata["sample_rate (Hz)"]
)
segment_s_dt = dt.timedelta(seconds=segment_s)
self.continuous_timestamp_per_embedding = []
for file_idx, (file, datetime_per_file) in enumerate(
self.timestamp_per_file.items()
):
for index_of_embedding in range(self.nr_embeds_per_file[file_idx]):
if hasattr(self, 'only_embed_annotations') and getattr(self, 'only_embed_annotations'):
starts = self.df.start.values[self.df.audiofilename == file]
timestamp = (
(datetime_per_file + dt.timedelta(seconds=float(starts[index_of_embedding])))
.time()
.replace(microsecond=0)
)
else:
timestamp = (
datetime_per_file + index_of_embedding * segment_s_dt
).replace(microsecond=0)
self.continuous_timestamp_per_embedding.append(
timestamp.strftime("%Y-%m-%d_%H:%M:%S")
)
[docs]
def parent_directory(self):
self.parent_directory_per_embedding = []
for file_idx, file in enumerate(self.metadata["files"]["audio_files"]):
self.parent_directory_per_embedding.extend(
np.repeat(str(Path(file).parent), self.nr_embeds_per_file[file_idx])
)
[docs]
def audio_file_name(self):
self.audio_file_name_per_embedding = []
for file_idx, file in enumerate(self.metadata["files"]["audio_files"]):
self.audio_file_name_per_embedding.extend(
np.repeat(file, self.nr_embeds_per_file[file_idx])
)
[docs]
def default_classifier(self):
clfier_paths = list(self.paths.preds_path.rglob("*_classifier_annotations.csv"))
if len(clfier_paths) == 0:
self.default_label_keys.remove("default_classifier")
else:
path = clfier_paths[0]
df = pd.read_csv(path)
if not len(self.parent_directory_per_embedding) == len(df):
df = self.fill_remaining_labels(df)
self.default_classifier_per_embedding = df[
"label:default_classifier"
].values.tolist()
[docs]
def fill_remaining_labels(self, df):
seg_len = self.metadata['segment_length (samples)'] / self.metadata['sample_rate (Hz)']
df_new = {
'start': [],
'end': [],
'audiofilename': [],
'label:default_classifier': []
}
for file, nr_embeds in zip(
self.metadata['files']['audio_files'],
self.metadata['files']['nr_embeds_per_file']
):
df_part = df[df.audiofilename == file]
if hasattr(self, 'only_embed_annotations') and getattr(self, 'only_embed_annotations'):
starts = self.df.start[self.df.audiofilename == file]
all_time_bins = np.round(starts.values, 4).tolist()
else:
all_time_bins = np.round(np.arange(nr_embeds) * seg_len, 4).tolist()
[all_time_bins.remove(l) for l in np.round(df_part.start, 4)]
df_new['start'].extend(all_time_bins)
df_new['end'].extend((np.array(all_time_bins) + seg_len).tolist())
df_new['audiofilename'].extend([file] * len(all_time_bins))
df_new['label:default_classifier'].extend(['below_thresh'] * len(all_time_bins))
df = pd.concat([df, pd.DataFrame(df_new)], ignore_index=True)
if not len(df) == self.metadata['nr_embeds_total']:
raise AssertionError(
"The number of points does not match the total number of embeddings."
)
return df.sort_values(['audiofilename', 'start'])
[docs]
def make_set_paths_func(
audio_dir,
main_results_dir=None,
dim_reduc_parent_dir="dim_reduced_embeddings",
testing=False,
**kwargs,
):
global get_paths
def get_paths(model_name):
"""
Generate model specific paths for the results of the embedding evaluation.
This includes paths for the embeddings, labels, clustering, classification,
and plots. The paths are created based on the audio directory,
and model name.
Parameters
----------
audio_dir : string
full path to audio files
model_name : string
name of the model used for embedding
main_results_dir : string
top level directory for the results of the embedding evaluation
Returns
-------
paths : SimpleNamespace
object containing the paths for the results of the embedding evaluation
"""
dataset_path = Path(main_results_dir).joinpath(Path(audio_dir).parts[-1])
task_path = dataset_path.joinpath("evaluations").joinpath(model_name)
paths = {
"audio_dir": audio_dir,
"dataset_path": dataset_path,
"dim_reduc_parent_dir": dataset_path.joinpath(dim_reduc_parent_dir),
"main_embeds_path": dataset_path.joinpath("embeddings"),
"labels_path": task_path.joinpath("labels"),
"clust_path": task_path.joinpath("clustering"),
"probe_path": task_path.joinpath("probing"),
"preds_path": task_path.joinpath("predictions"),
"plot_path": task_path.joinpath("plots"),
}
paths = SimpleNamespace(**paths)
paths.main_embeds_path.mkdir(exist_ok=True, parents=True)
paths.labels_path.mkdir(exist_ok=True, parents=True)
paths.clust_path.mkdir(exist_ok=True)
paths.probe_path.mkdir(exist_ok=True)
paths.plot_path.mkdir(exist_ok=True)
return paths
return get_paths
[docs]
def get_dim_reduc_path_func(model_name, dim_reduction_model="umap", **kwargs):
if dim_reduction_model in [None, "None", "", []]:
dim_reduction_model = "umap"
logger.warning(
f"Dimensionality reduction model not specified. "
f"Search for default dim_reduction_model: {dim_reduction_model}."
)
return model_specific_embedding_path(
get_paths(model_name).dim_reduc_parent_dir,
model_name,
dim_reduction_model=dim_reduction_model,
**kwargs,
)
[docs]
def get_default_labels(model_name, **kwargs):
"""
Return dictionary of the default labels based on the files that were
already processed and saved. This is model dependent, as the input length is
model dependent and therefore this function requires a model name as input.
The default labels are calculated based on the default labels specified in the
settings.yaml file.
Parameters
----------
model_name : str
model name
Returns
-------
dict
dictionary of default labels
"""
paths = get_paths(model_name)
return create_default_labels(paths.audio_dir, model_name, paths, **kwargs)
[docs]
def get_ground_truth(model_name):
"""
Return dictionary of the ground truth labels based on the files that were
already processed and saved. This is model dependent, as the input length is
model dependent and therefore this function requires a model name as input.
Parameters
----------
model_name : str
model name
Returns
-------
dict
dictionary of ground truth labels
"""
return np.load(
get_paths(model_name).labels_path.joinpath("ground_truth.npy"),
allow_pickle=True,
).item()
[docs]
def get_dt_filename(file):
"""
Return the timestamp within a filename as a datetime object based on
the most common naming conventions in bioacoustics. This is not bullet
proof but it works with the vast majority of naming conventions for files.
Parameters
----------
file : str
filename as string
Returns
-------
dt.datetime object
datetime object of the filename
"""
if "+" in file:
file = file.split("+")[0]
numbs = re.findall("[0-9]+", file)
numbs = [n for n in numbs if len(n) % 2 == 0]
file_date = None
i, datetime = 1, ""
while len(datetime) < 12:
if i > 1000:
logger.warning(
f"Could not find a valid datetime in the filename {file}. "
"Please check the filename format."
"Creating a default datetime corresponding to 2000, 1, 1."
)
datetime = "20001010000000"
break
datetime = "".join(numbs[-i:])
i += 1
i = 1
while 12 <= len(datetime) > 14:
datetime = datetime[:-i]
for _ in range(2):
try:
if len(datetime) == 12:
file_date = dt.datetime.strptime(datetime, "%y%m%d%H%M%S")
elif len(datetime) == 14:
file_date = dt.datetime.strptime(datetime, "%Y%m%d%H%M%S")
except:
i = 1
while len(datetime) > 12:
datetime = datetime[:-i]
# add fix if file_date is never created as a datetime object
if file_date is None:
logger.warning(
f"Could not find a valid datetime in the filename {file}. "
"Please check the filename format."
"Creating a default datetime corresponding to 2000, 1, 1."
)
file_date = dt.datetime.strptime("20001010000000", "%y%m%d%H%M%S")
return file_date
[docs]
def model_specific_embedding_path(path, model, dim_reduction_model=None, **kwargs):
"""
Get the path to the model specific embeddings.
This function searches for the most recent directory
containing the embeddings for the specified model and
dimensionality reduction model.
Parameters
----------
path : Path
Path to the main embeddings directory.
model : str
Name of the model used for embedding.
dim_reduction_model : str
Name of the dimensionality reduction model used. Default is 'umap'.
kwargs : dict
Additional keyword arguments.
Returns
-------
Path
Path to the model specific embeddings directory.
Raises
-------
ValueError
If no embeddings are found for the specified model.
"""
if not isinstance(model, str):
model = str(model)
embed_paths_for_this_model = [
d
for d in path.iterdir()
if d.is_dir() and model in d.stem.split("___")[-1].split("-")
]
if not dim_reduction_model in [None, "None", "", []]:
embed_paths_for_this_model = [
d for d in embed_paths_for_this_model if dim_reduction_model in d.stem
]
embed_paths_for_this_model.sort()
if len(embed_paths_for_this_model) == 0:
error = (
f"\nNo embeddings found for model {model} in {path}. "
"Please check the directory path."
)
logger.exception(error)
raise ValueError(error)
elif len(embed_paths_for_this_model) > 1:
logger.info(
f"Multiple embeddings found for model {model} in {path}. "
"Using the most recent path."
)
return embed_paths_for_this_model[-1]
[docs]
def create_default_labels(
audio_dir=None, model=None, paths=None, overwrite=True, **kwargs
):
"""
Create default labels based on audio files and model timestamps to
match the number of embeddings created per file for visualization
and clustering purposes.
Parameters
----------
audio_dir : str, optional
path to audio data, by default None
model : str, optional
model name, by default None
paths : SimpleNamespace, optional
convenient object for path handling, by default None
overwrite : bool, optional
if True labels are overwritten, by default True
Returns
-------
dict
dictionary with default labels
"""
if paths is None:
assign_global_get_paths_function(audio_dir)
paths = get_paths(model)
if overwrite or not paths.labels_path.joinpath("default_labels.npy").exists():
if not kwargs.get('default_label_keys'):
from bacpipe import settings as bacpipe_settings
kwargs['default_label_keys'] = bacpipe_settings.default_label_keys
default_labels = DefaultLabels(
paths, model=model, audio_dir=audio_dir, **kwargs
)
default_labels.generate()
def_labels = default_labels.default_label_dict
np.save(
paths.labels_path.joinpath("default_labels.npy"),
def_labels,
)
else:
def_labels = np.load(
paths.labels_path.joinpath("default_labels.npy"), allow_pickle=True
).item()
return def_labels
[docs]
def concatenate_annotation_files(
annotation_src,
appendix=".txt",
acodet_annotations=False,
start_col_name="start",
end_col_name="end",
lab_col_name="label",
):
# TODO needs testing
p = Path(annotation_src)
if acodet_annotations:
## This should always work for acodet combined annotations
dfc = pd.read_csv(p.joinpath("combined_annotations.csv"))
dfn = pd.read_csv(p.joinpath("explicit_noise.csv"))
dfall = pd.concat([dfc, dfn])
aud = dfall["filename"]
auds = [Path(a).stem + ".wav" for a in aud]
dfall["audiofilename"] = auds
df = dfall[["start", "end", "label", "audiofilename"]]
else:
df = pd.DataFrame()
for file in tqdm(
p.rglob(f"*{appendix}"), desc="Loading annotations", leave=False
):
try:
ann = pd.read_csv(file, sep="\t", header=None)
except pd.errors.EmptyDataError:
continue
df = pd.concat([df, dff], ignore_index=True)
dff = pd.DataFrame()
dff["start"] = ann[start_col_name]
dff["end"] = ann[end_col_name]
dff["label"] = ann[lab_col_name]
dff["audiofilename"] = file.stem + ".wav"
if True:
short_to_species = pd.read_csv(
"/mnt/swap/Work/Data/Amphibians/AnuranSet/species.csv"
)
for spe in df.label.unique():
df.label[df.label == spe] = short_to_species.SPECIES[
short_to_species.CODE == spe
].values[0]
df.to_csv(
p.joinpath("annotations.csv"),
index=False,
)
[docs]
def filter_annotations_by_minimum_number_of_occurrences(
df, min_occurrences=150, min_duration=0.65
):
"""
Filter the annotations to have at least a minimum number of occurrences
and a minimum duration.
Parameters
----------
df : pd.DataFrame
DataFrame containing the annotations.
min_occurrences : int, optional
Minimum number of occurrences for each label, by default 150.
min_duration : float, optional
Minimum duration for each label, by default 0.65.
Returns
-------
pd.DataFrame
Filtered DataFrame containing the annotations.
"""
label_counts = df["label"].value_counts()
labels_to_keep = label_counts[label_counts >= min_occurrences].index
filtered_df = df[
(df["label"].isin(labels_to_keep)) & ((df["end"] - df["start"]) >= min_duration)
]
return filtered_df
[docs]
def load_labels_and_build_dict(
paths,
annotations_filename,
audio_dir,
bool_filter_labels=True,
min_label_occurrences=150,
main_label_column=None,
testing=False,
**kwargs,
):
try:
try:
label_df = pd.read_csv(Path(audio_dir).joinpath(annotations_filename))
except FileNotFoundError as e:
label_df = pd.read_csv(list(Path(audio_dir).rglob(annotations_filename))[0])
except FileNotFoundError as e:
logger.warning(
f"No annotations file found in {audio_dir}, trying in "
f"{str(paths.dataset_path.resolve())}."
)
try:
label_df = pd.read_csv(paths.dataset_path.joinpath(annotations_filename))
except:
logger.warning(
"No annotations file found, not able to create ground_truth.npy file. "
"bacpipe should still work, but you will not be able to label by ground truth. "
"You also will not be able to evaluate using classification."
)
raise FileNotFoundError("No annotations file found.")
if bool_filter_labels and not testing:
filtered_labels = [
lab
for lab in set(label_df[main_label_column])
if len(label_df[label_df[main_label_column] == lab])
> min_label_occurrences
]
if not filtered_labels:
logger.info(
"\nBy filtering the annotations.csv file using the "
f"{min_label_occurrences=}, no labels are left. In "
"case you are just testing, the labels will not be filtered"
f" and {bool_filter_labels=} will be ignored. If this "
"a serious probing task, you will need more annotations. "
"This might cause the probing or clustering to crash.\n"
)
else:
label_df = label_df[label_df[main_label_column].isin(filtered_labels)]
label_idx_dict = {}
for label_column in [l for l in label_df.columns if 'label:' in l]:
label_idx_dict[label_column] = {
label: idx
for idx, label in enumerate(label_df[label_column].unique())
}
if paths.labels_path.exists():
with open(paths.labels_path.joinpath("label_idx_dict.json"), "w") as f:
json.dump(label_idx_dict, f, indent=1)
return label_df, label_idx_dict
[docs]
def fit_labels_to_embedding_timestamps(
df,
label_idx_dict,
num_embeds,
segment_s,
label_column=None,
single_label=True,
min_annotation_length=0.65,
**kwargs,
):
file_labels = np.ones(num_embeds) * -1
embed_timestamps = np.arange(num_embeds) * segment_s
if single_label:
single_label_arr = [True] * len(embed_timestamps)
else:
file_labels = file_labels.reshape([len(file_labels), 1])
for _, row in df.iterrows():
em_start = np.where(embed_timestamps - row.start <= 0)[0][-1]
em_end = np.where(embed_timestamps - row.end >= 0)[0]
if len(em_end) > 0:
em_end = em_end[0]
else:
em_end = len(embed_timestamps)
# if not all of the values are noise, meaning there are already
# some labels in this segment
if not np.all(file_labels[em_start:em_end] == -1):
if single_label:
single_label_arr[em_start:em_end] = [False] * (em_end - em_start)
else:
for idx in range(em_start, em_end):
# if there is any noise in this segment, we'll write into
# those places
if np.any(file_labels[idx:idx+1] == -1):
file_labels[idx:idx+1][
file_labels[idx:idx+1]==-1
] = label_idx_dict[row[f"label:{label_column}"]]
# if the current label is already written in that segment
# skip. This assumes that we don't have two annotations of the
# same class of varying length overlaying each other
elif (
label_idx_dict[row[f"label:{label_column}"]]
in file_labels[idx:idx+1]
):
continue
# if all labels in this segment are the same. meaning that
# if we have a 2d array already but for each timestamp the
# labels are the same, we can just overwrite one. we won't
# loose anything and we don't have to create any new columns
elif len(np.unique(file_labels[idx:idx+1])) == 1:
file_labels[idx:idx+1, -1] = label_idx_dict[row[f"label:{label_column}"]]
# We only go here, if there is no place we can write our new
# class into the array without loosing information. we therefore
# create a new column, which is created with noise (-1) values and
# we then write our current label index into that column
else:
new_column = np.ones(len(file_labels)) * -1
new_column = new_column.reshape([len(file_labels), 1])
file_labels = np.hstack([file_labels, new_column])
file_labels[idx:idx+1, -1] = label_idx_dict[row[f"label:{label_column}"]]
# check if the annotation length is longer that the specified min_annotation_length
elif row.end - row.start > min_annotation_length:
if single_label:
file_labels[em_start:em_end] = label_idx_dict[row[f"label:{label_column}"]]
else:
file_labels[em_start:em_end, 0] = label_idx_dict[row[f"label:{label_column}"]]
if len(file_labels.shape) > 1 and (
file_labels.shape[0] > 1 or file_labels.shape[-1] > 1
):
file_labels = file_labels.squeeze()
if single_label:
file_labels[~np.array(single_label_arr)] = -2
return file_labels
else:
if len(file_labels.shape) == 1:
array = np.ones([len(file_labels), 2]) * -1
array[:, 0] = file_labels
return array
else:
return file_labels
[docs]
def build_ground_truth_labels_by_file(
paths,
ind,
model,
num_embeds,
segment_s,
metadata,
all_labels,
label_df=None,
label_idx_dict=None,
label_column=None,
**kwargs,
):
audio_file = metadata["files"]["audio_files"][ind]
df = filter_df_by_filename(label_df, audio_file, model=model)
if df.empty:
logger.info(
f'df is empty for {audio_file}, meaning no annotations. '
"If that's incorrect, ensure the audiofilename column has the correct "
"file names."
)
label_dimensions = all_labels.shape[-1] if len(all_labels.shape) > 1 else 1
all_labels = np.concatenate(
(all_labels, np.ones([num_embeds, label_dimensions]).squeeze() * -1)
)
return all_labels
if kwargs.get('only_embed_annotations'):
values = df[f'label:{label_column}']
file_labels = np.array([label_idx_dict[v] for v in values])
else:
file_labels = fit_labels_to_embedding_timestamps(
df, label_idx_dict, num_embeds, segment_s,
label_column=label_column, **kwargs
)
all_labels = fill_all_labels_array(file_labels, all_labels)
if np.unique(file_labels).shape[0] > 2 and kwargs.get('testing'):
raven_tables_sanity_check(
df.start if kwargs.get('only_embed_annotations') else num_embeds,
segment_s, paths, audio_file,
label_df, label_idx_dict, label_column, file_labels, **kwargs
)
return all_labels
[docs]
def filter_df_by_filename(
df_to_filer, file_name, file_name_column = 'audiofilename', model=None
):
df = df_to_filer[df_to_filer[file_name_column] == Path(file_name).as_posix()]
if len(df) == 0:
df = df_to_filer[
df_to_filer[file_name_column] == (
Path(file_name).stem + Path(file_name).suffix
)
]
# if no files are found, match by classifier_prediction files
if len(df) == 0:
df = df_to_filer[
df_to_filer[file_name_column]
== Path(file_name).parent / (
Path(file_name).stem + f'_{model}.json'
)
]
return df
[docs]
def fill_all_labels_array(file_labels, all_labels):
if len(file_labels.shape) > 1:
if len(all_labels) == 0:
all_labels = file_labels
else:
# if file_labels got columns that all_labels don't have
# add noise columns to ensure both can be stacked
if all_labels.shape[-1] < file_labels.shape[-1]:
new_column = np.ones([
len(all_labels),
file_labels.shape[-1] - all_labels.shape[-1]
]) * -1
all_labels = np.hstack([all_labels, new_column])
# if all_labels got columns that file_labels don't have
# add noise columns to ensure both can be stacked
elif all_labels.shape[-1] > file_labels.shape[-1]:
new_column = np.ones([
len(file_labels),
all_labels.shape[-1] - file_labels.shape[-1]
]) * -1
file_labels = np.hstack([file_labels, new_column])
all_labels = np.concatenate((all_labels, file_labels))
else:
all_labels = np.concatenate((all_labels, file_labels))
return all_labels
[docs]
def raven_tables_sanity_check(
embed_timestamps, segment_s, paths, audio_file,
label_df, label_idx_dict, label_column, file_labels,
**kwargs
):
if len(file_labels.shape) > 1:
file_labels = file_labels[:, 0]
if not kwargs.get('only_embed_annotations'):
embed_timestamps = np.arange(embed_timestamps) * segment_s
path = paths.labels_path.joinpath("raven_tables_for_sanity_check")
path.mkdir(exist_ok=True, parents=True)
if (
len(list(path.iterdir())) < 10
): # make sure to only do this a handful of times
df_file_gt = label_df[label_df.audiofilename == audio_file]
df_file_fit = pd.DataFrame()
df_file_fit["start"] = embed_timestamps[file_labels > -1]
df_file_fit["end"] = embed_timestamps[file_labels > -1] + segment_s
inv = {v: k for k, v in label_idx_dict.items()}
df_file_fit[f"label:{label_column}"] = [
inv[i] for i in file_labels[file_labels > -1]
]
raven_gt = create_Raven_annotation_table(df_file_gt, label_column)
raven_fit = create_Raven_annotation_table(df_file_fit, label_column)
raven_fit["Low Freq (Hz)"] = 1500
raven_fit["High Freq (Hz)"] = 2000
raven_gt.to_csv(
path.joinpath(f"{Path(audio_file).stem}_gt.txt"), sep="\t", index=False
)
raven_fit.to_csv(
path.joinpath(f"{Path(audio_file).stem}_fit.txt"), sep="\t", index=False
)
[docs]
def create_Raven_annotation_table(df, label_column, high_freq=1000):
df.index = np.arange(1, len(df) + 1)
raven_df = pd.DataFrame()
raven_df["Selection"] = df.index
raven_df.index = np.arange(1, len(df) + 1)
raven_df["View"] = 'Spectrogram 1'
raven_df["Channel"] = 1
raven_df["Begin Time (s)"] = df.start
raven_df["End Time (s)"] = df.end
raven_df["Low Freq (Hz)"] = 0
raven_df["High Freq (Hz)"] = high_freq
raven_df["Label"] = df[f"label:{label_column}"]
return raven_df
[docs]
def collect_ground_truth_labels(
paths, files, model, segment_s, metadata,
label_df, label_idx_dict, **kwargs
):
ground_truth = np.array([])
for ind, file in tqdm(
enumerate(files),
desc=f"Loading {model} embeddings and split by labels",
leave=False,
):
assert (
Path(metadata["files"]["audio_files"][ind]).stem
== file.stem.split(f"_{model}")[0]
), (
f"File names do not match for {file} and "
f"{metadata['files']['audio_files'][ind]}"
)
if kwargs.get('only_embed_annotations'):
num_embeds = int(
metadata["files"]["file_lengths (s)"][ind]
/ (metadata['segment_length (samples)'] / metadata['sample_rate (Hz)'])
)
else:
num_embeds = metadata["files"]["nr_embeds_per_file"][ind]
ground_truth = build_ground_truth_labels_by_file(
paths,
ind,
model,
num_embeds,
segment_s,
metadata,
ground_truth,
label_df,
label_idx_dict,
**kwargs,
)
return ground_truth
[docs]
def assign_global_get_paths_function(audio_dir):
if not 'get_paths' in globals():
from bacpipe import settings as bapcipe_settings
make_set_paths_func(
audio_dir, bapcipe_settings.main_results_dir
)
[docs]
def ground_truth_by_model(
model,
audio_dir,
label_df=None,
label_idx_dict=None,
label_column='label:species',
paths=None,
annotations_filename="annotations.csv",
overwrite=True,
single_label=True,
bool_filter_labels=False,
**kwargs,
):
"""
Generate ground truth labels that are mapped onto the
timestamps of a model, based on the model-specific
input lengths. This way the embeddings and ground truth
labels have the same lengths, and can be used for downstream
evaluation like probing or clustering.
This function supports single or multi-label generation
of ground truth labels.
A dictionary is created with a numpy array for the labels
and a dictionary to associate the int values with the
corresponding label class.
The labels are processed based on a single annotation file
which requires predefined column names:
`audiofilename`, `start`, `end`, `label:species` (species
can be replaced with other things but the `label:` needs to
be consistent). See 'bacpipe/tests/test_data/annotations.csv'
for an example.
After processing the ground truth, the dictionary is saved
as a numpy file and upon reexecution is simply loaded for
shorter runtime.
Parameters
----------
model : str
model name
audio_dir : str
path to audio data
label_df : pandas.DataFrame, optional
ground truth annotations in specified format, by default None
label_idx_dict : dict, optional
link between int values and class labels
can be auto generated, by default None
label_column : str, optional
name of column in annotation file, by default 'label:species'
paths : SimpleNamespace, optional
convenient object for path handling, by default None
annotations_filename : str, optional
path to annotations csv file, by default "annotations.csv"
overwrite : bool, optional
If True, the dict will be generated again and saved
rather than loaded from a file if already
processed, by default True
single_label : bool, optional
set False if you want multi-label, by default True
bool_filter_labels : bool, optional
set to True, if you want a minimum number of occurrence
for labels to be included in the ground truth. See
settings file for more options and descriptions, by default False
Returns
-------
dict
dictionary of ground truth labels with numpy array
and dict to link int values to class labels
Raises
------
ValueError
if gorund truth file is not found
"""
if paths is None:
assign_global_get_paths_function(audio_dir)
paths = get_paths(model)
if (
overwrite
or not paths.labels_path.joinpath("ground_truth.npy").exists()
):
# check if embeddings exist
try:
path = model_specific_embedding_path(paths.main_embeds_path, model)
except Exception as e:
logger.warning(
f"No embeddings directory seems to exist. {e}"
)
path = None
# get annotations is not provided
if label_df is None or label_idx_dict is None:
if not 'label:' in label_column:
label_column = 'label:' + label_column
if kwargs.get('testing'):
annotations_filename='annotations.csv'
label_df, label_idx_dict = load_labels_and_build_dict(
paths, annotations_filename,
main_label_column=label_column,
audio_dir=audio_dir,
bool_filter_labels=bool_filter_labels,
**kwargs
)
# build files, segment_s and metadata variables
# depending if embeddings exist or not
if path is not None and len(list(path.iterdir())) > 0:
files = list(path.rglob("*.npy"))
files.sort()
metadata = yaml.safe_load(
open(list(path.rglob("metadata.yml"))[0], "r")
)
segment_s = (
metadata["segment_length (samples)"]
/ metadata["sample_rate (Hz)"]
)
else:
files, segment_s, metadata = get_files_if_no_embeds(
audio_dir, model, label_df
)
# find all label columns
label_columns = [col for col in label_df.columns if "label:" in col]
ground_truth_dict = {}
# collect all the ground truth for all the label columns
for label_col in label_columns:
labels = label_col.split("label:")[-1]
ground_truth = collect_ground_truth_labels(
paths,
files,
model,
segment_s,
metadata,
label_df,
label_idx_dict[label_col],
single_label=single_label,
label_column=labels,
**kwargs,
)
ground_truth_dict.update({
f"label:{labels}": ground_truth,
f"label_dict:{labels}": label_idx_dict[label_col],
})
np.save(paths.labels_path.joinpath("ground_truth.npy"), ground_truth_dict)
else:
if not paths.labels_path.joinpath("ground_truth.npy").exists():
error = (
"\nThe ground truth label file ground_truth.npy does not exist. "
"Please create it first by rerunning with `overwrite=True`."
)
logger.exception(error)
raise ValueError(error)
ground_truth_dict = np.load(
paths.labels_path.joinpath("ground_truth.npy"), allow_pickle=True
).item()
return ground_truth_dict
[docs]
def ensure_audio_files(found_audio_files, annotated_audio_files, audio_dir):
if not annotated_audio_files:
return found_audio_files
matching = set(found_audio_files).intersection(set(annotated_audio_files))
if len(matching) < len(annotated_audio_files) or len(matching) == 0:
relative_to_audio_dir = [
Path(f).relative_to(audio_dir) for f in found_audio_files
]
matching = set(relative_to_audio_dir).intersection(set(annotated_audio_files))
if len(matching) < len(annotated_audio_files) or len(matching) == 0:
annotated_stems = [
Path(f).stem for f in annotated_audio_files
]
found_stems = [
Path(f).stem for f in found_audio_files
]
matching = set(annotated_stems).intersection(set(found_stems))
# TODO maybe a case where they are nested but have duplicate filenames
if len(matching) < len(annotated_audio_files) or len(matching) == 0:
not_found = []
found_annotated_audio_files = [
list(Path(audio_dir).rglob(f'*{f.stem + f.suffix}'))[0]
if list(Path(audio_dir).rglob(f'*{f.stem + f.suffix}'))
else not_found.append(f)
for f in annotated_audio_files
]
if not_found:
logger.warning(
f"{not_found} were not found in {audio_dir}. "
"Are you sure you entered the correct path to the audio data?"
)
if len(found_annotated_audio_files) > 0:
found_annotated_audio_files = found_audio_files
return [str(f) for f in found_audio_files]
[docs]
def get_files_if_no_embeds(audio_dir, model, label_df=None):
if label_df is None:
annotated_audio_files = []
else:
annotated_audio_files = label_df.audiofilename.unique()
annotated_audio_files = [Path(f) for f in annotated_audio_files]
module = import_module(
f"bacpipe.model_pipelines.feature_extractors.{model}"
)
segment_s = module.LENGTH_IN_SAMPLES / module.SAMPLE_RATE
metadata = {}
metadata['files'] = {}
from bacpipe import get_audio_files
found_audio_files = get_audio_files(audio_dir)
matching_audio_files = ensure_audio_files(
found_audio_files, annotated_audio_files, audio_dir
)
matching_audio_files.sort()
metadata["segment_length (samples)"] = module.LENGTH_IN_SAMPLES
metadata["sample_rate (Hz)"] = module.SAMPLE_RATE
metadata['files']['audio_files'] = matching_audio_files
metadata['files']['nr_embeds_per_file'] = [
int(
get_duration(path=f) / segment_s
)
for f in matching_audio_files
]
files = [Path(f'{Path(d).stem}_{model}') for d in matching_audio_files]
return files, segment_s, metadata