Source code for palm_tracer.PALMTracer

"""
Module contenant les fonctions de traitement de PALM.

.. todo:: Ajouter clairement un point dans la doc sur le systeme de filtre, l'enregistrement, le calcul sur l'intégralité des éléments
		  et filtre ensuite lors de la visualisation des graph et des sauvegarde si la case est coché...
"""

from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, cast, Optional

import numpy as np
import pandas as pd
from matplotlib import pyplot as plt

from palm_tracer.Processing import Gallery, Palm, Parsing, Visualization as Viz
from palm_tracer.Processing.Drift import extract_beads
from palm_tracer.Settings import Settings
from palm_tracer.Settings.Groups import BaseSettingGroup, FilteringL, FilteringT
from palm_tracer.Settings.Groups.VisualizationGraph import GRAPH_MODE, GRAPH_SOURCE
from palm_tracer.Settings.Groups.VisualizationHR import HR_LOC_SOURCE, HR_TRC_SOURCE
from palm_tracer.Settings.Types import CheckRangeFloat, CheckRangeInt
from palm_tracer.Tools import FileIO, Logger, Ui
from palm_tracer.Tools.Ui import print_warning

MAX_UI_16 = np.iinfo(np.uint16).max
FILE_STATUS: list[str] = ["No", "Yes", "Yes (Filtered)",
						  "Yes (Reconnected)", "Yes (Reconnected and Filtered)",
						  "Yes (Corrected)", "Yes (Corrected and Filtered)"]


##################################################
[docs] @dataclass class PALMTracer: """Classe principale de PALM Tracer.""" settings: Settings = field(init=False, default_factory=Settings) """Classe principale des paramètres PALMTracer.""" palm: Palm = field(init=False, default_factory=Palm) """Interface vers la DLL C++ Palm.""" _logger: Logger = field(init=False, default_factory=Logger) """Journal d'activité.""" df: dict[str, pd.DataFrame] = field(init=False, default_factory=lambda: { "loc": pd.DataFrame(), "dft": pd.DataFrame(), "bds": pd.DataFrame(), "trc": pd.DataFrame(), "blk": pd.DataFrame(), "MSD": pd.DataFrame(), "InD": pd.DataFrame(), "Fit": pd.DataFrame(), "f_loc": pd.DataFrame(), "f_dft": pd.DataFrame(), "f_trc": pd.DataFrame(), "f_blk": pd.DataFrame(), "f_MSD": pd.DataFrame(), "f_InD": pd.DataFrame(), "f_Fit": pd.DataFrame()}) """Résultats des différents calculs.""" visualization: Optional[np.ndarray] = field(init=False, default=None) """Résultat de la visualisation.""" _path: str = field(init=False, default="") """Dossier de sortie pour le fichier en cours de traitement.""" _stack: Optional[np.ndarray] = field(init=False, default=None) """Pile en cours de traitement.""" _suffix: str = field(init=False, default="") """Suffixe des fichiers pour un traitement (timestamp au format `YYYYMMDD_HHMMSS`).""" KEYS_TO_FILE: dict[str, str] = field(init=False, default_factory=lambda: { "loc": "localizations", "f_loc": "localizations_filtered", "dft": "localizations_corrected", "f_dft": "localizations_corrected_filtered", "bds": "beads", "trc": "tracking", "f_trc": "tracking_filtered", "blk": "tracking_reconnected", "f_blk": "tracking_reconnected_filtered", "MSD": "tracking_MSD", "f_MSD": "tracking_MSD_filtered", "InD": "tracking_InstantD", "f_InD": "tracking_InstantD_filtered", "Fit": "tracking_Fit", "f_Fit": "tracking_Fit_filtered"}) """Alias entre les noms de fichiers et les clé dans le dictionnaire de dataframes.""" # ================================================== # region Initialization # ================================================== ################################################## def __post_init__(self): """Méthode appelée automatiquement après l'initialisation du dataclass.""" filters = self.settings.filtering filters.buttons["reset"].clicked.connect(self.reset_filtered) filters.buttons["update"].clicked.connect(self.update_filtered) filters.buttons["save"].clicked.connect(self.save_filtered) ##################################################
[docs] def is_dll_valid(self) -> bool: """ Vérifie la validité de la DLL utilisée par le plugin. :return: True si la DLL est valide, False sinon. """ return self.palm.is_valid()
##################################################
[docs] def reset_result(self): """Vide entièrement les DataFrame de résultat dans `df`.""" for key in self.df: self.df[key] = pd.DataFrame()
# ================================================== # endregion Initialization # ================================================== # ================================================== # region Getter / Setter # ================================================== ##################################################
[docs] def get_localization_key(self) -> str: """Clé des localisations (filtrée si elle est non vide) et corrigé si elle est non vide également""" if self.df["f_dft"].empty: if self.df["dft"].empty: if self.df["f_loc"].empty: return "loc" return "f_loc" return "dft" return "f_dft"
##################################################
[docs] def get_tracks_key(self) -> str: """Clé des trajectoires (filtrée si elle est non vide) et reconnectée si elle est non vide également.""" if self.df["f_blk"].empty: if self.df["blk"].empty: if self.df["f_trc"].empty: return "trc" return "f_trc" return "blk" return "f_blk"
##################################################
[docs] def get_tracks_compute_key(self) -> list[str]: """Clé des calculs sur trajectoires (filtrés si non vide).""" if self.df["f_MSD"].empty and self.df["f_InD"].empty and self.df["f_Fit"].empty: return ["MSD", "InD", "Fit"] return ["f_MSD", "f_InD", "f_Fit"]
##################################################
[docs] def get_status(self) -> dict[str, str]: """ Retourne un dictionnaire décrivant le statut des tableaux actuellement chargés dans ``self._df`` pour les différentes catégories de données (Localisation, Trajectoires, MSD, Diffusion instantanée, Fit). Cette méthode analyse chaque tableau pour savoir s'il correspond : - à un tableau standard, - à un tableau filtré, - à un tableau reconnecté (pour les trajectoires), - à un tableau corrigé (pour les localisations), - ou à une absence de données. Les statuts retournés sont des chaînes de caractères provenant de la constante globale :data:`FILE_STATUS`. Le dictionnaire retourné contient systématiquement les clés suivantes : ``"Localization"``, ``"Beads"``,``"Tracking"``, ``"MSD"``, ``"Instant D"``, ``"Fit"`` :return: Un dictionnaire ``{str: str}`` contenant le statut de chaque type de tableau. """ res = {"Localization": FILE_STATUS[0], "Tracking": FILE_STATUS[0], "MSD": FILE_STATUS[0], "Instant D": FILE_STATUS[0], "Fit": FILE_STATUS[0]} # --- Localisation --- if self.df["f_dft"].empty: if self.df["dft"].empty: if self.df["f_loc"].empty: if self.df["loc"].empty: res["Localization"] = FILE_STATUS[0] else: res["Localization"] = FILE_STATUS[1] else: res["Localization"] = FILE_STATUS[2] else: res["Localization"] = FILE_STATUS[5] else: res["Localization"] = FILE_STATUS[6] # --- Billes --- if self.df["bds"].empty: res["Beads"] = FILE_STATUS[0] else: res["Beads"] = FILE_STATUS[1] # --- Suivi --- if self.df["f_blk"].empty: if self.df["blk"].empty: if self.df["f_trc"].empty: if self.df["trc"].empty: res["Tracking"] = FILE_STATUS[0] else: res["Tracking"] = FILE_STATUS[1] else: res["Tracking"] = FILE_STATUS[2] else: res["Tracking"] = FILE_STATUS[3] else: res["Tracking"] = FILE_STATUS[4] # --- Calcul sur trajectoires --- tcs = [("MSD", "MSD"), ("InD", "Instant D"), ("Fit", "Fit")] for k1, k2 in tcs: if self.df[f"f_{k1}"].empty: if self.df[k1].empty: res[k2] = FILE_STATUS[0] else: res[k2] = FILE_STATUS[1] else: res[k2] = FILE_STATUS[2] return res
################################################## @property def localizations(self) -> pd.DataFrame: """Getter du :class:`DataFrame <pandas.DataFrame>` de la localisation (filtrée si elle est non vide).""" return self.df[self.get_localization_key()] ################################################## @property def beads(self) -> pd.DataFrame: """Getter du :class:`DataFrame <pandas.DataFrame>` des billes détectées.""" return self.df["bds"] ################################################## @property def tracks(self) -> pd.DataFrame: """Getter du :class:`DataFrame <pandas.DataFrame>` du suivi (filtré s'il est non vide) et reconnecté s'il est non vide également.""" return self.df[self.get_tracks_key()] ################################################## @property def tracks_compute(self) -> dict[str, pd.DataFrame]: """Getter du trio de :class:`DataFrame <pandas.DataFrame>` des calculs sur trajectoires (filtrés si non vide).""" keys = self.get_tracks_compute_key() return {"MSD": self.df[keys[0]], "InD": self.df[keys[1]], "Fit": self.df[keys[2]]} ################################################## @property def path(self) -> str: """Dossier de sortie pour le fichier en cours de traitement.""" return self._path ################################################## @property def stack(self) -> np.ndarray: """Pile en cours de traitement.""" return self._stack ################################################## @property def suffix(self) -> str: """Suffixe des fichiers pour un traitement (timestamp au format `YYYYMMDD_HHMMSS`).""" return self._suffix ################################################## def _output_name(self, name: str, ext: str = "csv") -> str: return f"{self._path}/{name}-{self._suffix}.{ext}" # ================================================== # endregion Getter / Setter # ================================================== # ================================================== # region Process # ================================================== ##################################################
[docs] def load(self, path: str = ""): """Charge les précédents résultats du fichier courant.""" if not self.is_dll_valid(): Ui.print_warning("Process not completed due to missing DLLs.") return # --- Chargement des paramètres --- self._path = self.settings.batch.get_paths()[0] if path == "" else path # Parsing du batch settings_filename = FileIO.get_last_file(self._path, "settings") self._suffix = FileIO.extract_suffix(settings_filename) if not settings_filename or not self._suffix: Ui.print_warning("No valid settings file to load.") return print(f"Loading setting file '{settings_filename}'.") with self.settings.signal_blocked(): cfg = FileIO.open_json(str(settings_filename)) self.settings.update_from_compact_dict(cfg) # self.settings.update_from_dict(cfg) si l'on veut un setting complet self.settings.localization["Preview"].value = False # --- Chargement des fichiers associés à ces paramètres. --- self.reset_result() # Reset result Dataframes print(f"\tLoading files from the '{self._path}' folder with the timestamp {self._suffix}.") for key, fname in self.KEYS_TO_FILE.items(): f = self._output_name(fname) try: if Path(f).is_file(): self.df[key] = pd.read_csv(f) # Lecture du fichier CSV avec pandas print(f"\tFile '{fname}' loaded successfully.") else: self.df[key] = pd.DataFrame() print(f"\tFile '{fname}' not found.") except Exception as e: self.df[key] = pd.DataFrame() print_warning(f"\tError loading file '{fname}': {e}") # --- Chargement de la pile --- try: self._stack = self.settings.batch.get_stacks()[0] print(f"\tStack loaded successfully (size: {self._stack.shape}).") except Exception as e: print(f"\tError loading stack: {e}")
##################################################
[docs] def process(self): """Lance le process de PALM selon les éléments en paramètres.""" if not self.is_dll_valid(): Ui.print_warning("Process not completed due to missing DLLs.") return # --- Parsing du batch --- paths = self.settings.batch.get_paths() stacks = self.settings.batch.get_stacks() if len(stacks) == 0: Ui.print_warning("No files.") return # --- Parcours du batch --- for self._path, self._stack in zip(paths, stacks): # Reset result Dataframes self.reset_result() # Logger Path(self._path).mkdir(parents=True, exist_ok=True) self._suffix = FileIO.get_timestamp_for_files() self._logger.open(self._output_name("log", "log")) self._logger.add("Start Processing.") self._logger.add(f"Output folder: {self._path}") # Save settings FileIO.save_json(self._output_name("settings", "json"), self.settings.to_compact_dict()) # self.settings.to_dict() si l'on veut un setting complet self._logger.add("Settings saved.") # Save meta file (Création du DataFrame et sauvegarde en CSV) depth, height, width = self._stack.shape df = Parsing.get_meta([height, width, depth, self.settings.calibration["Pixel Size"].value, self.settings.calibration["Exposure"].value, self.settings.calibration["Intensity"].value]) df.to_csv(self._output_name("meta"), index=False) self._logger.add("Meta file saved.") # Lancement des traitements self._process_step(self.settings.localization, "localization", ["loc"], self._localization, self.filter_localizations) self._process_step(self.settings.beads, "beads extraction", ["bds"], self._beads_extraction, self.filter_localizations) self._process_step(self.settings.tracking, "tracking", ["trc"], self._tracking, self.filter_tracks) self._process_step(self.settings.blinking, "blinking reconnection", ["blk"], self._blinking_reconnection, self.filter_tracks) self._process_step(self.settings.tracks_compute, "tracks computes", ["MSD", "InD", "Fit"], self._tracks_compute, self._filter_tracks_compute) # Lancement de la Visualisation Haute Résolution if self.settings.visualization_hr.active: self._logger.add("High-resolution visualization enabled.") self._visualization_hr() else: self._logger.add("High-resolution visualization disabled.") # Lancement de la Visualisation graphique if self.settings.visualization_graph.active: self._logger.add("Graphical visualization enabled.") self._visualization_graph() else: self._logger.add("Graphical visualization disabled.") # Lancement de la génération de Galleries if self.settings.gallery.active: self._logger.add("Gallery generation enabled.") self._gallery() else: self._logger.add("Gallery generation disabled.") # Fermeture du Log self._logger.add("Processing complete.") self._logger.close()
################################################## def _process_step(self, group: BaseSettingGroup, name: str, keys: list[str], process_func: Callable, filter_func: Callable): """Etape du processus :param group: Groupe de paramètres lié :param name: Nom de l'étape :param keys: Clé du DataFrame dans le dictionnaire :param process_func: Fonction de traitement de cette étape :param filter_func: Fonction de filtre de cette étape """ if group.active: # Process self._logger.add(f"{name.title()} enabled.") try: process_func() except Exception: raise else: # Load last file possible self._logger.add(f"{name.title()} disabled.") for key in keys: if key in ["dft", "blk"]: continue # On ne charge pas ceux-là f = FileIO.get_last_file(self._path, f"{self.KEYS_TO_FILE[key]}-") if f.endswith("csv"): try: self._logger.add(f"\tLoading a pre-computed {name} file.") self.df[key] = pd.read_csv(f) self._logger.add(f"\tFile '{f}' loaded successfully, {len(self.df[key])} {name}(s) found.") except Exception as e: self.df[key] = pd.DataFrame() self._logger.add(f"\tError loading file '{f}': {e}") else: # Sinon self.df[key] = pd.DataFrame() self._logger.add(f"\tNo pre-computed {name} data.") if len(keys) == 1: f_key = f"f_{keys[0]}" self.df[f_key] = filter_func(self.df[keys[0]]) n_init, n_end = len(self.df[keys[0]]), len(self.df[f_key]) if n_init != n_end: self._logger.add(f"\t\tFiltering of {name} file {n_end} {name} instead of {n_init}: {n_init - n_end} deletion(s).") if self.settings.filtering["Save"].value and n_end != 0: self._logger.add(f"\tSaving the filtered {name} file.") self.df[f_key].to_csv(self._output_name(self.KEYS_TO_FILE[f_key]), index=False) else: self.df[f_key] = pd.DataFrame() else: filter_func() # Cas spécial des tracks_compute qui modifient beaucoup de chose en même temps ################################################## def _localization(self): """Lance la localisation à partir des paramètres de l'interface.""" # Parse settings s = self.settings.localization.settings filters = self.settings.filtering # Filtre sur les plans planes = filters["Plane"].value planes = list(range(planes[0] - 1, planes[1])) if filters["Plane"].active else None fit = self.settings.localization.get_fit() try: fit_params = self.settings.localization.get_fit_params() except Exception: raise # Run command self.df["loc"] = self.palm.localization(self._stack, s["Threshold"], s["Watershed"], fit, fit_params, planes) self._logger.add(f"\tSaving the localization file ({len(self.df['loc'])} localization(s) found).") self.df["loc"].to_csv(self._output_name(self.KEYS_TO_FILE["loc"]), index=False) ################################################## def _beads_extraction(self): """Lance la correction du drift à partir des paramètres de l'interface.""" df = self.localizations # Récupère automatiquement le "bon" dataframe (filtré ou non) if "Integrated Intensity" in df.columns: df = df[df["Integrated Intensity"] > 0] # Suppression des éléments où l'ajustement a échoué. if df.empty: self._logger.add("\tNo localizations data calculated, no additional calculations can be performed.") return s = self.settings.beads.settings try: self.df["bds"] = extract_beads(df, s["Max Distance"], s["3D"], strict=False, k=2) except ValueError: self.df["bds"] = pd.DataFrame() if self.df["bds"].empty: self._logger.add("\tNo beads found.") return self._logger.add(f"\tSaving the beads file ({self.df['bds'].iloc[-1, 0]} beads(s) found).") self.df["bds"].to_csv(self._output_name(self.KEYS_TO_FILE["bds"]), index=False) ################################################## def _tracking(self): """Lance le suivi à partir des paramètres de l'interface.""" df = self.localizations # Récupère automatiquement le "bon" dataframe (filtré ou non) if "Integrated Intensity" in df.columns: df = df[df["Integrated Intensity"] > 0] # Suppression des éléments où l'ajustement a échoué. if df.empty: self._logger.add("\tNo localizations data calculated, no additional calculations can be performed.") return s = self.settings.tracking.settings self.df["trc"] = self.palm.tracking(df, s["Max Distance"]) self._logger.add(f"\tSaving the tracking file ({len(self.df['trc'])} point(s) found).") self.df["trc"].to_csv(self._output_name(self.KEYS_TO_FILE["trc"]), index=False) ################################################## def _blinking_reconnection(self): """Lance le tracking à partir des paramètres de l'interface.""" df = self.df["trc"] # Récupère le dataframe du suivi if df.empty: self._logger.add("\tNo tracking data calculated, no additional calculations can be performed.") return s = self.settings.blinking.settings pixel_size = self.settings.calibration.settings["Pixel Size"] self.df["blk"] = self.palm.blinking_reconnection(df, pixel_size * 1000, s["Mode"], s["Max Duration"], s["Max Distance"] * 1000) self._logger.add(f"\tSaving the reconnected tracking file ({len(self.df['blk'])} point(s) found).") self.df["blk"].to_csv(self._output_name(self.KEYS_TO_FILE["blk"]), index=False) ################################################## def _tracks_compute(self): """Lance les calculs sur les trajectoires à partir des paramètres de l'interface.""" df = self.tracks # Récupère automatiquement le "bon" dataframe (blinking et filtré ou non) if df.empty: self._logger.add("\tNo tracking data calculated, no additional calculations can be performed.") return # Parse settings sc = self.settings.calibration.settings s = self.settings.tracks_compute.settings if not s["MSD"] and not s["Instant Diffusion"] and s["Fit"] == 0: self._logger.add("\tNo metrics selected, no additional calculations can be performed.") return # Run command (pixel size doit rester en micromètre cette fois, car toutes les mesures seront en micromètres carré) res = self.palm.tracks_compute(df, s["MSD"], s["Instant Diffusion"], s["3D"], s["Log Scale"], sc["Pixel Size"], sc["Exposure"], s["Fit"], np.array([s["Fit Length"]], dtype=np.float64)) for key in res: self.df[key] = res[key] for key, name in [("MSD", "MSD"), ("InD", "Instant Diffusion"), ("Fit", "Fit")]: if s[name] and not res[key].empty: self._logger.add(f"\tSaving the {name} file.") res[key].to_csv(self._output_name(self.KEYS_TO_FILE[key]), index=False) # ================================================== # endregion Process # ================================================== # ================================================== # region Filtering # ================================================== ##################################################
[docs] def reset_filtered(self): """Vide entièrement les DataFrames filtrés dans `df`.""" with self.settings.signal_blocked(): self.settings.filtering.reset() for key in self.df: if key.startswith("f_"): self.df[key] = pd.DataFrame()
##################################################
[docs] def update_filtered(self, last: bool = True): """ Recalcul les filtres sur le dernier dataframe disponible pour chacun si last est sélectionné, sinon sur l'original. :param last: Utilise les dernières version des dataframes si `True`, sinon les données brutes serotn utilisées. """ df = {} for key in ["loc", "dft", "trc", "blk", "MSD", "InD", "Fit"]: df[key] = self.df[key] if self.df[f"f_{key}"].empty or not last else self.df[f"f_{key}"] self.df["f_loc"] = self.filter_localizations(df["loc"]) self.df["f_dft"] = self.filter_localizations(df["dft"]) self.df["f_trc"] = self.filter_tracks(df["trc"]) self.df["f_blk"] = self.filter_tracks(df["blk"]) o_name = "f_trc" if self.df["f_blk"].empty else "f_blk" self.df[o_name], self.df["f_MSD"], self.df["f_InD"], self.df["f_Fit"] \ = self.filter_tracks_compute(self.tracks, df["MSD"], df["InD"], df["Fit"]) for key in ["loc", "dft", "trc", "blk"]: f_key = f"f_{key}" if len(self.df[key]) == len(self.df[f_key]): self.df[f_key] = pd.DataFrame() if self.settings.filtering["Save"].value: self.save_filtered()
##################################################
[docs] def save_filtered(self): """Enregistre tous les fichiers filtrés s'ils ne sont pas vide.""" self._suffix = FileIO.get_timestamp_for_files() for key, fname in self.KEYS_TO_FILE.items(): # Il s'agit d'un filtre, il n'est pas vide et il a une taille différente de l'original if "f_" in key and not self.df[key].empty and len(self.df[key]) != len(self.df[key[2:]]): self.df[key].to_csv(self._output_name(fname), index=False)
################################################## def _filter_tracks_compute(self): """Filtre les fichiers de metrique.""" n_init = len(self.df["MSD"]) o_name = self.get_tracks_key() if "f_" not in o_name: o_name = f"f_{o_name}" # Si aucun filtre la clé sera sans le f_ devant self.df[o_name], self.df["f_MSD"], self.df["f_InD"], self.df["f_Fit"] \ = self.filter_tracks_compute(self.tracks, self.df["MSD"], self.df["InD"], self.df["Fit"]) n_end = len(self.df["f_MSD"]) if n_init != n_end: self._logger.add(f"\t\tFiltering of tracks compute files {n_end} tracks instead of {n_init}: {n_init - n_end} deletion(s)") if self.settings.filtering["Save"].value: for key, name in [(o_name, "tracking"), ("f_MSD", "MSD"), ("f_InD", "Instant Diffusion"), ("f_Fit", "Fit")]: if not self.df[key].empty: self._logger.add(f"\tSaving the filtered {name} file.") self.df[key].to_csv(self._output_name(self.KEYS_TO_FILE[key]), index=False) else: for key in ["f_MSD", "f_InD", "f_Fit"]: self.df[key] = pd.DataFrame() ##################################################
[docs] def filter_localizations(self, datas: pd.DataFrame) -> pd.DataFrame: """ Filtre un DataFrame de localisation. :param datas: DataFrame à filtrer :return: :class:`DataFrame <pandas.DataFrame>` filtré. """ res = datas.copy() if "Integrated Intensity" in res.columns: df = res[res["Integrated Intensity"] > 0] # Suppression des éléments où l'ajustement a échoué. if res.empty: return res f = self.settings.filtering fl = cast(FilteringL, f["Localization"]) filters = [[f["Plane"], "Plane"], [fl["X"], "X"], [fl["Y"], "Y"], [fl["Z"], "Z"], [fl["Intensity"], "Integrated Intensity"], [fl["Sigma X"], "Sigma X"], [fl["Sigma Y"], "Sigma Y"], [fl["Theta"], "Theta"], [fl["Circularity"], "Circularity"], [fl["MSE XY"], "MSE XY"], [fl["MSE Z"], "MSE Z"]] for filt, col in filters: if isinstance(filt, CheckRangeFloat | CheckRangeInt) and filt.active: limits = filt.value res = res[res[col].between(limits[0], limits[1])] # Bornes incluses return res
##################################################
[docs] def filter_tracks(self, datas: pd.DataFrame) -> pd.DataFrame: """ Filtre un DataFrame de trajectoires. :param datas: DataFrame à filtrer :return: :class:`DataFrame <pandas.DataFrame>` filtré. """ res = datas.copy() if res.empty: return res f = cast(CheckRangeInt, cast(FilteringT, self.settings.filtering["Tracks"])["Length"]) # Linter passage if f.active: limits = f.value counts = res.groupby("Track").size() # . Comptage par trajectoire keep_ids = counts.index[(counts >= limits[0]) & (counts <= limits[1])] # IDs de trajectoires gardées: min_len <= nb points <= max_len res = res[res["Track"].isin(keep_ids)] # . Filtrage (on garde l'ordre original) return res
##################################################
[docs] def filter_tracks_compute(self, tracks: pd.DataFrame, msd: pd.DataFrame, instant_d: pd.DataFrame, fit: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: """ Filtre un DataFrame de calcul sur les trajectoires. :param tracks: DataFrame de trajectoires :param msd: DataFrame de calcul des MSD :param instant_d: DataFrame de calcul de la diffusion instantanée :param fit: DataFrame de calcul de l'ajustement :return: DataFrames filtrés. """ o_trc = tracks.copy() o_msd = msd.copy() o_ind = instant_d.copy() o_fit = fit.copy() if o_trc.empty: return o_trc, o_msd, o_ind, o_fit f = cast(FilteringT, self.settings.filtering["Tracks"]) # ----- Base : tous les IDs présents dans la référence ----- keep_ids: set = set(o_trc["Track"].unique().tolist()) # ----- Filtre Longueur ----- f_tmp = cast(CheckRangeInt, f["Length"]) if f_tmp.active: limits_l = f_tmp.value counts = o_trc.groupby("Track").size() ok_len_ids = set(counts.index[(limits_l[0] <= counts) & (counts <= limits_l[1])].tolist()) keep_ids &= ok_len_ids # intersection sur des sets d'IDs # ----- Filtre sur Instant D ----- f_tmp = cast(CheckRangeInt, f["Instant D"]) if f_tmp.active and not o_ind.empty: limits_d = f_tmp.value o_ind = o_ind[o_ind["Track"].isin(keep_ids)] # . Restreindre aux trajectoires admissibles jusqu'ici if not o_ind.empty: val_cols = [c for c in o_ind.columns if c != "Track"] # . Colonnes de valeurs = toutes sauf 'Track' vals = o_ind[val_cols] vals_np = vals.to_numpy(dtype=float) # . Convertir en numpy pour un contrôle fin finite = np.isfinite(vals_np) # . Masque des valeurs finies (ni NaN, ni ±inf) outside = (vals_np <= limits_d[0]) | (vals_np >= limits_d[1]) # Valeurs hors bornes (sur le numpy brut) outside &= finite # . On ne compte les "outside" que là où c'est vraiment une valeur finie n_valid, n_out = finite.sum(axis=1), outside.sum(axis=1) # . Nombre de valeurs valides/hors bornes par ligne pct_out_np = np.zeros_like(n_out, dtype=float) # . Pourcentage hors bornes (évite la division par 0 avec where=) np.divide(n_out, n_valid, out=pct_out_np, where=n_valid > 0) pct_out = pd.Series(pct_out_np * 100.0, index=o_ind.index) # avec une troisieme valeur limit[2] qui serait le pourcentage de fail max autorisé # Ou alors un nouveau setting type Instant D Failure Tolerance (%), je vais mettre 50% ici ok_ids = set(map(int, np.unique(o_ind.loc[pct_out <= 50.0, "Track"].to_numpy()))) keep_ids &= ok_ids # ----- Filtre sur Fit ----- if not o_fit.empty: o_fit = o_fit[o_fit["Track"].isin(keep_ids)] # Restreindre aux trajectoires admissibles jusqu'ici if not o_fit.empty: filters = [ # Quel que soit l'ajustement. [f["D Coeff"], "D(0) (μm²/s)"], # Fit Puissance [f["Alpha"], "Alpha"], [f["Speed"], "Average Speed (Last-First)(μm/s)"], # Fit Exponentiel [f["Confinement"], "Confinement Radius (μm)"]] for filt, col in filters: if col in o_fit.columns and isinstance(filt, CheckRangeFloat | CheckRangeInt) and filt.active: limits = filt.value o_fit = o_fit[o_fit[col].between(limits[0], limits[1])] # Bornes incluses keep_ids &= set(o_fit["Track"].unique().tolist()) # ----- Filtre final des trajectoires restantes ----- if not o_trc.empty: o_trc = o_trc[o_trc["Track"].isin(keep_ids)] if not o_msd.empty: o_msd = o_msd[o_msd["Track"].isin(keep_ids)] if not o_ind.empty: o_ind = o_ind[o_ind["Track"].isin(keep_ids)] if not o_fit.empty: o_fit = o_fit[o_fit["Track"].isin(keep_ids)] return o_trc, o_msd, o_ind, o_fit
# ================================================== # endregion Filtering # ================================================== # ================================================== # region Visualization # ================================================== ##################################################
[docs] def add_color_to_tracks(self, datas: pd.DataFrame, source: str) -> pd.DataFrame: """ Ajoute une couleur pour chaque point des trajectoires en fonction d'un critère agrégé au niveau **Track**. Règles : - Si source == "Track Number" : couleur = (Track-1) % MAX_UI_16 + 1 - Si source ∈ {"Length", "Instant D", "MSD", "Total Intensity"} : * on utilise la table ``self.tracks_compute["Fit"]`` (1 ligne par Track) pour récupérer la métrique. * si `Fit` est vide, on déclenche le calcul puis on réessaie ; si toujours vide, fallback = "Track Number". * si une seule piste valide ou si `min==max`, toutes les pistes prennent la couleur médiane `MAX_UI_16//2`. * sinon, étalonnage linéaire `min→1`, `max→MAX_UI_16`. * toute piste absente de `Fit` ou `NaN` sur la métrique retombe sur la couleur "Track Number". :param datas: DataFrame des points de trajectoires, doit contenir au minimum la colonne 'Track'. :param source: Critère de coloration ("Track Number", "Length", "Instant D", "MSD", "Total Intensity"). :return: Copie de `datas` avec une colonne 'Color' de type UInt16. """ res = datas.copy() # HR_TRC_SOURCE = ["All", "Track Number", "Length", "Instant D", "MSD", "Total Intensity"] # Chemin rapide : simple palette périodique par numéro de piste if source == "Track Number": res = res.assign(Color=((res["Track"] - 1) % MAX_UI_16 + 1).astype("UInt16")) return res # Récupération / calcul du Fit (1 ligne par Track) s'il manque fit = self.tracks_compute["Fit"] if fit.empty: # . Vide (non calculé) self._logger.add("\t\tTracks compute to be performed to define a color during visualization.") # On active le fit lineaire si aucun n'est sélectionné. if self.settings.tracks_compute["Fit"].value == 0: self.settings.tracks_compute["Fit"].value = 1 self._tracks_compute() # . On lance le calcul fit = self.tracks_compute["Fit"] # On reaffecte le resultat if fit.empty: # . Toujours vide (erreur de calcul ou autre, on prend le numéro des trajectoires par défaut). res = res.assign(Color=((res["Track"] - 1) % MAX_UI_16 + 1).astype("UInt16")) return res # Normalisation : mapping des noms de métriques metric_by_source = { "Length": "Length", "Total Intensity": "Total Intensity", "Instant D": "D(0) (μm²/s)", "MSD": "MSD(0) (μm²)", } metric = metric_by_source[source] vmin, vmax = fit[metric].min(), fit[metric].max() # vmin, vmax = fit[metric].quantile([0.05, 0.95]) A envisager au lieu du min et max en cas d'outlier. if len(fit) == 1 or vmin >= vmax: res["Color"] = MAX_UI_16 // 2 # Cas Uniforme else: # Étalonnage linéaire : min→1, max→MAX_UI_16 (inclusif), arrondi au plus proche scale = (MAX_UI_16 - 1) / (vmax - vmin) vals = fit[metric].to_numpy(dtype=float) colors = np.rint(1.0 + (vals - vmin) * scale).astype(np.int64) np.clip(colors, 1, MAX_UI_16, out=colors) color_map = dict(zip(fit["Track"].to_numpy(), colors.astype(np.uint16))) # Application par map (vectorisé) : on remplit avec le fallback quand absent mapped = res["Track"].map(color_map) # 'mapped' est de type float si NaN possibles → on remplace NaN par fallback, puis cast en UInt16 res["Color"] = mapped.fillna(MAX_UI_16 // 2).astype("UInt16") return res
################################################## def _visualization_hr(self): """Lance la creation d'une visualisation haute résolution à partir des paramètres passés en paramètres.""" # Parse settings s = self.settings.visualization_hr.settings # Création de l'image finale depth, height, width = self._stack.shape if s["Type"] == 0: if self.localizations.empty: self._logger.add(f"\tNo localization data for high-resolution visualization.") else: sources = HR_LOC_SOURCE[1:] if s["Source L"] == 0 else [HR_LOC_SOURCE[s["Source L"]]] for source in sources: visualization = Viz.render_hr_image(width, height, s["Ratio"], self.localizations[["X", "Y", source]].to_numpy()) self._logger.add(f"\tSaving high-resolution visualization (x{s['Ratio']}, {source}).") FileIO.save_png(visualization, self._output_name(f"visualization_x{s['Ratio']}_{source}", "png")) else: if self.tracks.empty: self._logger.add(f"\tNo tracking data for high-resolution visualization.") else: sources = HR_TRC_SOURCE[1:] if s["Source T"] == 0 else [HR_TRC_SOURCE[s["Source T"]]] for source in sources: tracks = self.add_color_to_tracks(self.tracks, source) tracks.to_csv(self._output_name("tracking_hr_color"), index=False) visualization = Viz.render_tracks_image(width, height, s["Ratio"], tracks) visualization = FileIO.grayscale_to_color(visualization, "viridis") self._logger.add(f"\tSaving tracking high-resolution visualization (x{s['Ratio']}, {source}).") FileIO.save_png(visualization, self._output_name(f"visualization_tracks_x{s['Ratio']}_{source}", "png")) ################################################## def _visualization_graph(self): """Lance la creation d'une visualisation graphique à partir des paramètres passés en paramètres.""" if self.localizations.empty: self._logger.add(f"\tNo localization data for graphical visualization.") return # Parse settings s = self.settings.visualization_graph.settings sources = GRAPH_SOURCE[1:] if s["Source"] == 0 else [GRAPH_SOURCE[s["Source"]]] modes = GRAPH_MODE[1:] if s["Mode"] == 0 else [GRAPH_MODE[s["Mode"]]] for source in sources: loc = self.localizations[["Plane", source]].to_numpy() if np.all(loc[:, 1] == loc[0, 1]): self._logger.add(f"\tCanceling the graphical visualization: {source} uniform.") continue for mode in modes: fig, ax = plt.subplots() if mode == "Histogram": Viz.plot_histogram(ax, loc[:, 1], source + " Histogram", True, True, False) elif mode == "Plane Heat Map": Viz.plot_plane_heatmap(ax, loc, source + " Heatmap") else: # elif mode == "Plane Violin": Viz.plot_plane_violin(ax, loc, source + " Violin") self._logger.add(f"\tSaving graphical visualization ({mode}, {source}).") fig.savefig(self._output_name(f"graph_{mode}_{source}", "png"), bbox_inches="tight") plt.close(fig) ################################################## def _gallery(self): """Lance la génération d'une galerie à partir des paramètres passés en paramètres.""" s = self.settings.gallery.settings if self.localizations.empty: self._logger.add(f"\tNo localization data for gallery generation.") return gallery = Gallery.make_gallery(self._stack, self.localizations, s["ROI Size"], s["ROIs Per Line"]) self._logger.add(f"\tSaving gallery ({s}).") FileIO.save_tif(gallery, self._output_name(f"gallery_{s['ROI Size']}_{s['ROIs Per Line']}", "tif"))