Source code for palm_tracer.UI.PALMTracerWidget
"""
Module contenant la classe :class:`PALMTracerWidget` pour l'interface principale de l'application.
Ce module définit la classe :class:`.PALMTracerWidget`, qui crée et gère l'interface utilisateur principale de l'application.
Elle contient des sections de paramètres organisées sous forme de layout,
permettant de modifier différents paramètres pour l'exécution des algorithmes et l'affichage des résultats.
.. todo::
Pour le moment, la partie permettant de mettre en attente et annuler des preview ne fonctionne pas car Napari freeze le temps de la mise à jour.
l'utilisation de thread pour lancer certaines fonctions est problématique à l'heure actuelle.
"""
from pathlib import Path
from typing import Callable, cast, Optional
import napari
import numpy as np
from napari import Viewer
from napari.layers import Points, Shapes
from napari.utils.notifications import show_error, show_info, show_warning
from qtpy.QtCore import Qt
from qtpy.QtWidgets import QApplication, QDoubleSpinBox, QFileDialog, QHBoxLayout, QPushButton, QScrollArea, QSizePolicy, QTabWidget, QVBoxLayout, QWidget
from palm_tracer.PALMTracer import PALMTracer
from palm_tracer.Settings.Types import FileList
from palm_tracer.Tools import Ui
from palm_tracer.Tools.FileIO import open_json, open_tif, save_json
from palm_tracer.UI.GraphViewerWidget import GraphViewerWidget
from palm_tracer.UI.KeyBlocker import KeyBlocker
from palm_tracer.UI.Viewer3DWidget import create_viewer3d
from palm_tracer.UI.ViewerHRWidget import create_viewerhr
try: from napari.qt.threading import thread_worker, FunctionWorker # . Chemin public, à préférer
except ImportError: from superqt.utils import thread_worker, FunctionWorker # Très rare fallback
CONFIG_DIR = Path.home() / ".palm_tracer"
SETTINGS_FILE = CONFIG_DIR / "settings.json"
##################################################
[docs]
class PALMTracerWidget(QWidget):
"""Widget principal gérant toute l'interface"""
# ==================================================
# region Init
# ==================================================
##################################################
def __init__(self, viewer: "napari.viewer.Viewer"):
"""
Initialise le widget principal de l'interface utilisateur.
Cette méthode configure l'interface en ajoutant différentes sections de paramètres dans la mise en page.
:param viewer: Viewer Napari.
"""
super().__init__()
# ----- Viewers -----
self.viewer = viewer
self.viewer_hr: Optional[Viewer] = None
self.viewer_3d: Optional[Viewer] = None
self.viewer_graph: Optional[GraphViewerWidget] = None
# ----- Threading -----
self._processing = False # . Permets d'éviter les clics multiples.
self._worker: Optional[FunctionWorker] = None # Worker Napari en cours
# ----- Objets -----
self.pt = PALMTracer()
self.last_file = ""
self._preview_locs: dict[str, None | np.ndarray] = {"Past": None, "Present": None, "Future": None}
# ----- UI -----
self.key_blocker = KeyBlocker()
self._init_ui()
self._connect_signal()
self._on_startup()
##################################################
def _init_ui(self):
"""Initialisation de l'interface utilisateur du widget."""
# Base
main_layout = QVBoxLayout(self)
Ui.init_layout(main_layout, 0, 0)
# -- Size policy / bornes --
self.layout().setAlignment(Qt.AlignmentFlag.AlignTop)
self.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Preferred)
self.setMinimumWidth(360) # borne basse réaliste (à ajuster)
self.setMinimumHeight(220)
# Viewer Button
self.btn_viewer_gr = QPushButton("Open Graph Viewer")
self.btn_viewer_hr = QPushButton("Open HR Viewer")
self.btn_viewer_3d = QPushButton("Open 3D Viewer")
# Load Setting Button
self.btn_load_setting = QPushButton("Load Setting")
self.btn_reset_setting = QPushButton("Reset Setting")
setting_action_row = QHBoxLayout()
setting_action_row.addWidget(self.btn_load_setting)
setting_action_row.addWidget(self.btn_reset_setting)
action_widget = QWidget() # Encapsulation dans un QWidget
action_widget.setLayout(setting_action_row)
self.layout().addWidget(action_widget)
self.layout().addWidget(self.pt.settings.batch.widget)
self.layout().addWidget(self.pt.settings.calibration.widget)
# Ajout des onglets
tabs = QTabWidget() # Création du QTabWidget
tabs.addTab(self._create_tab([self.pt.settings.localization.widget, self.pt.settings.beads.widget, self.pt.settings.tracking.widget,
self.pt.settings.blinking.widget, self.pt.settings.tracks_compute.widget]), "Processing")
tabs.addTab(self._create_tab([self.pt.settings.gallery.widget,
# self.pt.settings.visualization_hr.widget,
# self.pt.settings.visualization_graph.widget,
self.btn_viewer_gr, self.btn_viewer_hr, self.btn_viewer_3d]), "Visualization")
tabs.addTab(self._create_tab([self.pt.settings.filtering.widget]), "Filtering")
# Layout principal
self.layout().addWidget(tabs)
# Launch/Load Button
self.btn_process = QPushButton("Start Processing")
self.btn_load_result = QPushButton("Load Last Result")
btn_action_row = QHBoxLayout()
btn_action_row.addWidget(self.btn_process)
btn_action_row.addWidget(self.btn_load_result)
action_widget = QWidget()
action_widget.setLayout(btn_action_row)
self.layout().addWidget(action_widget)
##################################################
def _connect_signal(self):
"""Connecte les signaux UI aux callbacks."""
# Boutons QT classiques
self.btn_viewer_gr.clicked.connect(self._open_graph_viewer)
self.btn_viewer_hr.clicked.connect(self._open_hr_viewer)
self.btn_viewer_3d.clicked.connect(self._open_3d_viewer)
self.btn_load_setting.clicked.connect(self._on_load_setting_btn)
self.btn_reset_setting.clicked.connect(self._on_reset_setting_btn)
# lambda *_: Absorbe les arguments du QPushButton pour ne pas envoyer True à load qui a un argument str par défaut vide qui doit le rester.
self.btn_load_result.clicked.connect(lambda *_: self.pt.load())
self.btn_process.clicked.connect(lambda: self._thread_process(self.pt.process))
self.pt.settings.batch["Files"].connect(self._reset_layer) # . Supprime les calques et charge le fichier tif dans un calque Raw
self.pt.settings.localization["Auto Threshold"].connect(self._auto_threshold) # Calcul automatique du Seuil
self.pt.settings.connect(self._on_change_setting) # . Connexion à chaque changement de paramètres
filters = self.pt.settings.filtering["Localization"]
filters["X"].connect(self._add_roi_filter_layer) # . Mise à jour de la ROI dans l'affichage.
filters["Y"].connect(self._add_roi_filter_layer) # . Mise à jour de la ROI dans l'affichage.
# Synchronisation des spin pour tracking et blinking reconnection
s1 = cast(QDoubleSpinBox, self.pt.settings.tracking["Max Distance"].box)
s2 = cast(QDoubleSpinBox, self.pt.settings.blinking["Max Distance"].box)
s1.valueChanged.connect(lambda v: Ui.sync_spin(s2, v))
s2.valueChanged.connect(lambda v: Ui.sync_spin(s1, v))
# Update de preview en changeant de plan
self.viewer.dims.events.current_step.connect(lambda: self._thread_process(self._preview, self._add_preview_layers))
self.viewer.layers.selection.events.active.connect(self._on_select_layer)
self.viewer.window.qt_viewer.installEventFilter(self.key_blocker)
##################################################
def _on_startup(self):
"""Action lors du démarrage après l'initialisation de l'UI."""
CONFIG_DIR.mkdir(parents=True, exist_ok=True) # Création du dossier de config s'il n'existe pas
self._load_setting(SETTINGS_FILE)
##################################################
@staticmethod
def _create_tab(widgets: list[QWidget]) -> QWidget:
"""
Crée un onglet scrollable contenant une liste de widgets.
Le contenu est placé dans une QScrollArea (style "page web") avec un scroll vertical
et une largeur qui s'adapte à l'onglet.
"""
# Widget "conteneur" qui porte le layout réel
tab, layout = Ui.make_tab()
for w in widgets: layout.addWidget(w)
# Important : permet au contenu de ne pas "collapser" et d'être en mode "colonne"
tab.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Maximum)
# ScrollArea permet d'avoir une barre de défilement si la fenêtre est trop petite.
scroll = QScrollArea()
scroll.setWidgetResizable(True) # le contenu suit la largeur disponible.
scroll.setHorizontalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff)
scroll.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAsNeeded)
scroll.setFrameShape(QScrollArea.Shape.NoFrame)
scroll.setWidget(tab)
return scroll
# ==================================================
# endregion Init
# ==================================================
# ==================================================
# region Threading
# ==================================================
##################################################
def _thread_process(self, compute_func: Callable[[], None], post_func: Optional[Callable[[], None]] = None):
"""
Démarre un traitement long dans un thread séparé et mets à jour l'interface.
Cette méthode désactive l'interface utilisateur (UI) et change le curseur en "attente" pendant l'exécution de la fonction passée en paramètre.
Elle vérifie si un fichier est en cours de prévisualisation avant de lancer le traitement.
Le traitement est exécuté dans un thread séparé pour ne pas bloquer l'interface principale de l'application.
:param compute_func: Fonction à exécuter dans un thread séparé. Elle ne doit pas prendre de paramètres et ne retourne rien.
:param post_func: Fonction à exécuter après le thread. Elle ne doit pas prendre de paramètres et ne retourne rien.
"""
if self._processing: return
if self.last_file == "": return
self._processing = True
self._freeze_ui(True)
@thread_worker(start_thread=False)
def _run_background() -> None: compute_func() # STRICTEMENT aucun accès au viewer/layers ici, pragma: no cover — lancement sur thread.
w: FunctionWorker = cast(FunctionWorker, _run_background())
self._worker = w
# s'exécute dans le thread UI
if post_func is not None: w.returned.connect(lambda _ok: post_func())
def _finish(*_args: object) -> None: # UI thread : fin propre
self._worker = None
self._process_done()
w.finished.connect(_finish)
w.errored.connect(lambda e: show_error(f"Error in thread: {e}"))
w.start()
##################################################
def _process_done(self):
"""
Finalise un traitement en réactivant l'interface et mets à jour l'affichage.
Cette méthode est appelée lorsque le traitement est terminé.
Elle réactive l'interface utilisateur (UI), restaure le curseur et effectue les mises à jour nécessaires sur l'interface principale.
Elle doit être appelée depuis le thread principal (GUI).
"""
self._processing = False
self._freeze_ui(False)
show_info("Thread Process done.")
##################################################
def _freeze_ui(self, on: bool) -> None:
"""Gèle/réactive proprement l'UI sans casser la géométrie."""
self.setDisabled(on) # . Au lieu de self.layout().setEnabled(False/True)
self.setUpdatesEnabled(not on) # Stoppe/reprend les repaints
if on: QApplication.setOverrideCursor(Qt.CursorShape.WaitCursor)
else:
try: QApplication.restoreOverrideCursor()
except RuntimeError: pass # .Aucun curseur à restaurer
# ==================================================
# endregion Threading
# ==================================================
# ==================================================
# region Settings Callback
# ==================================================
##################################################
def _load_setting(self, filename: Path):
"""Chargement d'un fichier de setting."""
if filename.exists():
try:
# Bloque les signaux, agrège les multiples .emit() potentiels :
with self.pt.settings.signal_blocked():
cfg = open_json(str(filename))
show_info(f"Loading the setting file '{filename}'.")
self.pt.settings.update_from_compact_dict(cfg) # self.pt.settings.update_from_dict(cfg) si l'on veut un setting complet
self.pt.settings.localization["Preview"].value = False
self.pt.settings.filtering.deactivate_filters()
except Exception as e:
show_warning(f"Error loading file '{filename}': {e}")
##################################################
def _on_load_setting_btn(self):
"""Action lors d'un clic sur le bouton Load setting."""
filename, _ = QFileDialog.getOpenFileName(None, "Sélectionner un fichier de paramètres", ".", "Fichiers JSON (*.json)")
self._load_setting(Path(filename))
##################################################
def _on_reset_setting_btn(self):
"""Action lors d'un clic sur le bouton Reset setting."""
self.pt.settings.reset()
##################################################
def _on_change_setting(self):
"""Mets à jour le fichier de setting général et la preview à chaque changement de setting."""
# Save settings
save_json(str(SETTINGS_FILE), self.pt.settings.to_compact_dict()) # self.settings.to_dict() si l'on veut un setting complet
self._thread_process(self._preview, self._add_preview_layers)
# ==================================================
# endregion Settings Callback
# ==================================================
# ==================================================
# region Layers Callback
# ==================================================
##################################################
def _remove_layer(self, name: str):
"""Supprime un calque s'il existe et rend silencieuses les erreurs internes à Napari."""
try:
if name in self.viewer.layers: self.viewer.layers.remove(self.viewer.layers[name])
except Exception as e: Ui.print_warning(F"Error when deleting the old layer '{name}' : {e}")
##################################################
def _reset_layer(self):
"""Lors de la mise à jour du batch, le fichier en preview dans Napari est mis à jour."""
self.pt.settings.localization["Preview"].value = False
self.pt.settings.filtering.deactivate_filters()
selected_file = cast(FileList, self.pt.settings.batch["Files"]).get_selected()
if not selected_file:
self.last_file = ""
self.viewer.layers.clear()
return
if self.last_file == selected_file: return
else: self.last_file = selected_file
self.viewer.layers.clear() # Nettoyez tous les layers existants dans le viewer
# Chargez le fichier TIF sélectionné comme un layer Raw dans le viewer
try:
raw_data = open_tif(selected_file)
self.viewer.add_image(raw_data, name="Raw")
show_info(f"Loaded {selected_file} into Napari viewer.")
filters = self.pt.settings.filtering
# Update Max
z, y, x = raw_data.shape
filters.update_limits(x, y, z)
except Exception as e:
show_error(f"Error loading {selected_file}: {e}")
##################################################
def _add_preview_layers(self):
"""Ajoute des calques à Napari pour les localisations sur le plan actuel, précédent et suivant."""
state_args = {
"Past": {"border": 0.2, "edge": 0.5, "color": "cyan", "face": "transparent"},
"Present": {"border": 0.4, "edge": 0.5, "color": "lime", "face": "lime"},
"Future": {"border": 0.2, "edge": 0.5, "color": "orange", "face": "transparent"}
}
for state, points in self._preview_locs.items():
if not self.pt.settings.localization["Preview"].value or points is None or points.size == 0:
self._remove_layer(f"Points {state}")
self._remove_layer(f"ROI {state}")
continue
args = state_args[state]
# Points
l_name = f"Points {state}"
if l_name in self.viewer.layers:
layer = self.viewer.layers[l_name]
layer.data = points # Remplace tous les points
layer.size = 1 # . Remets les différents arguments en cas de nombre de points différents
layer.border_color = args["color"]
layer.border_width = args["border"]
layer.face_color = args["face"]
else: self.viewer.add_points(points, size=1, border_color=args["color"], face_color=args["face"], border_width=args["border"], name=l_name)
self.viewer.layers[l_name].editable = False
# ROIs seulement pour le present
if state != "Present": continue
roi_size = self.pt.settings.localization["ROI Size"].value
roi_shape = self.pt.settings.localization["ROI Shape"].value
half_size = roi_size / 2
if roi_shape == 0: # Ellipses : [[y_center, x_center], [y_radius, x_radius]]
rois = np.array([[[float(y), float(x)], [float(half_size), float(half_size)]] for y, x in points], dtype=np.float32)
s_type = "ellipse"
else: # Rectangles : coins opposés
rois = [[[y - half_size, x - half_size], [y + half_size, x + half_size]] for y, x in points]
s_type = "rectangle"
l_name = f"ROI {state}"
# Si le calque existe, mais n'est pas du bon type, on le supprime
if l_name in self.viewer.layers:
layer = self.viewer.layers[l_name]
layer.data = (rois, len(rois) * [s_type]) # Remplace toutes les formes
layer.edge_color = args["color"] # . Remets les différents arguments en cas de nombre de ROIs différents
layer.edge_width = args["edge"]
layer.face_color = "transparent"
else:
self.viewer.add_shapes(rois, shape_type=s_type, edge_color=args["color"], edge_width=args["edge"], face_color="transparent", name=l_name)
self.viewer.layers[l_name].editable = False
##################################################
def _add_roi_filter_layer(self):
"""Ajoute un calque à Napari pour afficher la zone d'intérêt si le filtre est activé."""
if "Raw" not in self.viewer.layers: return
# Suppression du calque "ROI Filter" s'il existe
l_name = "ROI Filter"
filter_loc = self.pt.settings.filtering["Localization"]
is_xf, is_yf = filter_loc["X"].active, filter_loc["Y"].active
if not is_xf and not is_yf: # . Aucun filtre -> rien à afficher
self._remove_layer(l_name)
return
raw_data = self.viewer.layers["Raw"].data # Récupération du layer Raw
x0, x1 = 0, raw_data.shape[-1] - 1 # . Shape peut-être (Y, X) | (Z, Y, X) | (T, Z, Y, X)
y0, y1 = 0, raw_data.shape[-2] - 1 # . Shape peut-être (Y, X) | (Z, Y, X) | (T, Z, Y, X)
if is_xf:
xf = filter_loc["X"].value
x0, x1 = max(x0, min(xf[0], x1)), max(x0, min(xf[1], x1))
if is_yf:
yf = filter_loc["Y"].value
y0, y1 = max(y0, min(yf[0], y1)), max(y0, min(yf[1], y1))
# Si range dégénéré (ligne/colonne), on peut soit l'accepter, soit ne rien afficher. Ici : si rectangle vide, on ne crée pas de layer.
if x1 <= x0 or y1 <= y0:
self._remove_layer(l_name)
return
# Napari attend un tableau (N, 2) pour un shape, la succession des points à tracer.
rect = [[[y0, x0], [y0, x1], [y1, x1], [y1, x0]]] # . Haut-gauche, haut-droite, bas-droite, bas-gauche
if l_name in self.viewer.layers: self.viewer.layers[l_name].data = rect # Remplace le rectangle
else: # . Création du Calque s'il n'existe pas
layer = self.viewer.add_shapes(rect, shape_type="polygon", name=l_name, edge_color="red", edge_width=0.5, face_color="transparent")
layer.editable = False # . Rendre non éditable (Napari)
layer.visible = True # . L'affiche
##################################################
def _get_actual_image(self, time: int = 0) -> Optional[np.ndarray]:
"""
Récupère l'image actuelle plus ou moins un temps indiqué en paramètres
:param time: Différence de temps entre l'image actuellement affichée et celle désirée.
:return: L'image désirée (image actuellement affichée si time = 0).
"""
if "Raw" not in self.viewer.layers: return None
layer = self.viewer.layers["Raw"] # . Récupération du layer Raw
plane_idx = self.viewer.dims.current_step[0] + time # Récupération de l'index du plan actuellement affiché plus delta de temps
if plane_idx < 0 or plane_idx >= self.viewer.layers["Raw"].data.shape[0]: return None
plane = layer.data[plane_idx] # . Récupération des données du plan affiché
return np.asarray(plane, dtype=np.uint16) # . Renvoie sous le format numpy
##################################################
def _preview(self):
"""Action lors d'un clic sur le bouton de preview."""
if not self.pt.settings.localization["Preview"].value: return
past, present, future = self._get_actual_image(-1), self._get_actual_image(), self._get_actual_image(1)
if present is None: return
s = self.pt.settings.localization.settings
try: t, w, f, fp = (s["Threshold"], s["Watershed"], self.pt.settings.localization.get_fit(), self.pt.settings.localization.get_fit_params())
except Exception: raise
self._preview_locs = {
"Past": None if past is None else self.pt.filter_localizations(self.pt.palm.localization(past, t, w, f, fp))[["Y", "X"]].to_numpy(),
"Present": self.pt.filter_localizations(self.pt.palm.localization(present, t, w, f, fp))[["Y", "X"]].to_numpy(),
"Future": None if future is None else self.pt.filter_localizations(self.pt.palm.localization(future, t, w, f, fp))[["Y", "X"]].to_numpy()
}
# Affichage console (les notifications posent problème en thread externe)
l_past, l_present, l_future = map(lambda x: len(x) if x is not None else 0,
(self._preview_locs.get("Past"), self._preview_locs.get("Present"), self._preview_locs.get("Future")))
print(f"Preview of {l_past + l_present + l_future} detected points ({l_present} on the current frame, "
f"{l_past} on the previous frame, {l_future} on the next frame).")
##################################################
def _auto_threshold(self):
"""Action lors d'un clic sur le bouton auto du seuillage."""
image = self._get_actual_image()
if image is None: return
threshold = self.pt.palm.auto_threshold(image, self.pt.settings.localization.get_fit_params()) # Calcul du seuil automatique
print(f"Auto Threshold: {threshold:.2f}")
# show_info(f"Auto Threshold: {threshold:.2f}") Durant les threads externes, dangereux de faire appel à l'interface
self.pt.settings.localization["Threshold"].value = threshold # . Changement du seuil dans les settings
##################################################
@staticmethod
def _on_select_layer(event):
"""Sélectionne tous les éléments d'un calque Points ou Shapes actif."""
layer = event.value
if layer is None: return
if isinstance(layer, (Shapes, Points)) and layer.data is not None and len(layer.data) > 0:
layer.selected_data = set(range(len(layer.data)))
# ==================================================
# endregion Layers Callback
# ==================================================
# ==================================================
# region Extern Viewer
# ==================================================
##################################################
def _open_hr_viewer(self): # pragma: no cover — Aucun lancement de fenêtre sans controle en CI
"""Ouvre une instance Napari avec le Viewer Haute Résolution, si elle n'existe pas déjà."""
if self.viewer_hr is None:
self.viewer_hr = create_viewerhr(self.pt)
self._bind_viewer_lifecycle("viewer_hr")
##################################################
def _open_3d_viewer(self): # pragma: no cover — Aucun lancement de fenêtre sans controle en CI
"""Ouvre une instance Napari avec le Viewer 3D, si elle n'existe pas déjà."""
if self.viewer_3d is None:
self.viewer_3d = create_viewer3d()
self._bind_viewer_lifecycle("viewer_3d")
##################################################
def _open_graph_viewer(self): # pragma: no cover — Aucun lancement de fenêtre sans controle en CI
"""Ouvre la visionneuse de graphiques, s'il n'existe pas déjà."""
if self.viewer_graph is None:
w = GraphViewerWidget(self.pt)
w.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose, True)
# Quand le widget est détruit, rémettre la réf à None.
w.destroyed.connect(lambda *_: setattr(self, "viewer_graph", None))
w.resize(1280, 720)
self.viewer_graph = w
# (re)montrer et mettre au premier plan
self.viewer_graph.show()
self.viewer_graph.raise_()
self.viewer_graph.activateWindow()
##################################################
def _bind_viewer_lifecycle(self, viewer_attr: str) -> None: # pragma: no cover — Aucun lancement de fenêtre sans controle en CI
"""Connecte la destruction de la fenêtre Qt d'un viewer Napari à la remise à None."""
viewer = getattr(self, viewer_attr)
if viewer is None: return
qt_window = viewer.window._qt_window # . QMainWindow
qt_window.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose, True) # . Garantit que "close" détruit vraiment la fenêtre.
qt_window.destroyed.connect(lambda *_: setattr(self, viewer_attr, None)) # Quand la fenêtre est détruite, on invalide le pointeur Python.
##################################################
if __name__ == "__main__":
import napari
_viewer = napari.Viewer() # . Crée le viewer Napari
_viewer.title = "PALMTracer" # . Modifier le titre de la fenêtre
_w = PALMTracerWidget(_viewer) # . Crée ton widget en lui passant le viewer
_viewer.window.add_dock_widget(_w, name="Viewer 3D", area="right") # L'ajoute comme dock widget dans la fenêtre Napari
napari.run() # . Lance la boucle Qt gérée par Napari