Source code for palm_tracer.Processing.Filtering

"""Fonction de filtrages"""
from typing import cast

import numpy as np
import pandas as pd

from palm_tracer.Settings.Groups import Filters
from palm_tracer.Settings.Types import CheckRangeFloat, CheckRangeInt


##################################################
[docs] class Filtering: """Classe de filtrages""" filters: Filters ################################################## def __init__(self, filters: Filters): self.filters = filters ##################################################
[docs] def localization(self, datas: pd.DataFrame) -> pd.DataFrame: """ Filtre un DataFrame de localisation. :param datas: DataFrame à filtrer :return: :class:`DataFrame <pandas.DataFrame>` filtré. """ res = datas.copy() if "Integrated Intensity" in res.columns: df = res[res["Integrated Intensity"] > 0] # Suppression des éléments où l'ajustement a échoué. if res.empty: return res fl = self.filters.localization filters = [[self.filters["Plane"], "Plane"], [fl["X"], "X"], [fl["Y"], "Y"], [fl["Z"], "Z"], [fl["Intensity"], "Integrated Intensity"], [fl["Sigma X"], "Sigma X"], [fl["Sigma Y"], "Sigma Y"], [fl["Theta"], "Theta"], [fl["Circularity"], "Circularity"], [fl["MSE XY"], "MSE XY"], [fl["MSE Z"], "MSE Z"]] for filt, col in filters: if isinstance(filt, CheckRangeFloat | CheckRangeInt) and filt.active: limits = filt.value res = res[res[col].between(limits[0], limits[1])] # Bornes incluses return res
##################################################
[docs] def tracking(self, datas: pd.DataFrame) -> pd.DataFrame: """ Filtre un DataFrame de trajectoires. Simpliste uniquement sur la longueur, car il faut le calcul des statistiques sur trajectoires pour le reste. Cependant, il peut s'agir d'une première étape avant, justement, ces calculs de statistiques. :param datas: DataFrame à filtrer :return: :class:`DataFrame <pandas.DataFrame>` filtré. """ res = datas.copy() if res.empty: return res f = cast(CheckRangeInt, self.filters.tracking["Length"]) if f.active: limits = f.value counts = res.groupby("Track").size() # . Comptage par trajectoire keep_ids = counts.index[(counts >= limits[0]) & (counts <= limits[1])] # IDs de trajectoires gardées: min_len <= nb points <= max_len res = res[res["Track"].isin(keep_ids)] # . Filtrage (on garde l'ordre original) return res
##################################################
[docs] def tracks_compute(self, tracks: pd.DataFrame, msd: pd.DataFrame, instant_d: pd.DataFrame, fit: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: """ Filtre un DataFrame de calcul sur les trajectoires. :param tracks: DataFrame de trajectoires :param msd: DataFrame de calcul des MSD :param instant_d: DataFrame de calcul de la diffusion instantanée :param fit: DataFrame de calcul de l'ajustement :return: DataFrames filtrés. """ o_trc = tracks.copy() o_msd = msd.copy() o_ind = instant_d.copy() o_fit = fit.copy() if o_trc.empty: return o_trc, o_msd, o_ind, o_fit f = self.filters.tracking # ----- Base : IDs présents dans les 4 DataFrames (donc une intersection) ----- id_sets: list[set[int]] = [] for df in (o_trc, o_msd, o_ind, o_fit): # évite d'ajouter un set vide qui tuerait l'intersection if not df.empty and "Track" in df.columns: id_sets.append(set(df["Track"].unique().tolist())) keep_ids = set.intersection(*id_sets) # ----- Filtre Longueur ----- f_tmp = cast(CheckRangeInt, f["Length"]) if f_tmp.active: limits_l = f_tmp.value counts = o_trc.groupby("Track").size() ok_len_ids = set(counts.index[(limits_l[0] <= counts) & (counts <= limits_l[1])].tolist()) keep_ids &= ok_len_ids # intersection sur des sets d'IDs # ----- Filtre sur Instant D ----- f_tmp = cast(CheckRangeInt, f["Instant D"]) if f_tmp.active and not o_ind.empty: limits_d = f_tmp.value o_ind = o_ind[o_ind["Track"].isin(keep_ids)] # . Restreindre aux trajectoires admissibles jusqu'ici if not o_ind.empty: val_cols = [c for c in o_ind.columns if c != "Track"] # . Colonnes de valeurs = toutes sauf 'Track' vals = o_ind[val_cols] vals_np = vals.to_numpy(dtype=float) # . Convertir en numpy pour un contrôle fin finite = np.isfinite(vals_np) # . Masque des valeurs finies (ni NaN, ni ±inf) outside = (vals_np <= limits_d[0]) | (vals_np >= limits_d[1]) # Valeurs hors bornes (sur le numpy brut) outside &= finite # . On ne compte les "outside" que là où c'est vraiment une valeur finie n_valid, n_out = finite.sum(axis=1), outside.sum(axis=1) # . Nombre de valeurs valides/hors bornes par ligne pct_out_np = np.zeros_like(n_out, dtype=float) # . Pourcentage hors bornes (évite la division par 0 avec where=) np.divide(n_out, n_valid, out=pct_out_np, where=n_valid > 0) pct_out = pd.Series(pct_out_np * 100.0, index=o_ind.index) # avec une troisieme valeur limit[2] qui serait le pourcentage de fail max autorisé # Ou alors un nouveau setting type Instant D Failure Tolerance (%), je vais mettre 50% ici ok_ids = set(map(int, np.unique(o_ind.loc[pct_out <= 50.0, "Track"].to_numpy()))) keep_ids &= ok_ids # ----- Filtre sur Fit ----- if not o_fit.empty: o_fit = o_fit[o_fit["Track"].isin(keep_ids)] # Restreindre aux trajectoires admissibles jusqu'ici if not o_fit.empty: filters = [ # Quel que soit l'ajustement. [f["D Coeff"], "D(0) (μm²/s)"], # Fit Puissance [f["Alpha"], "Alpha"], [f["Speed"], "Average Speed (Last-First)(μm/s)"], # Fit Exponentiel [f["Confinement"], "Confinement Radius (μm)"]] for filt, col in filters: if col in o_fit.columns and isinstance(filt, CheckRangeFloat | CheckRangeInt) and filt.active: limits = filt.value o_fit = o_fit[o_fit[col].between(limits[0], limits[1])] # Bornes incluses keep_ids &= set(o_fit["Track"].unique().tolist()) # ----- Filtre final des trajectoires restantes ----- if not o_trc.empty: o_trc = o_trc[o_trc["Track"].isin(keep_ids)] if not o_msd.empty: o_msd = o_msd[o_msd["Track"].isin(keep_ids)] if not o_ind.empty: o_ind = o_ind[o_ind["Track"].isin(keep_ids)] if not o_fit.empty: o_fit = o_fit[o_fit["Track"].isin(keep_ids)] return o_trc, o_msd, o_ind, o_fit