Module matisse_controller.gui.control_application

Source code
import queue
import sys
import traceback
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from contextlib import redirect_stdout

from PyQt5.QtCore import pyqtSlot
from PyQt5.QtWidgets import QVBoxLayout, QMainWindow, QWidget, QInputDialog, QMessageBox, QApplication, QDialog, \
    QFileDialog

import matisse_controller.config as cfg
import matisse_controller.matisse as matisse
from matisse_controller.gui import utils
from matisse_controller.gui.dialogs import ConfigurationDialog
from matisse_controller.gui.dialogs.ple_analysis_dialog import PLEAnalysisDialog
from matisse_controller.gui.dialogs.ple_scan_dialog import PLEScanDialog
from matisse_controller.gui.dialogs.single_acquisition_dialog import SingleAcquisitionDialog
from matisse_controller.gui.logging_stream import LoggingStream
from matisse_controller.gui.utils import handled_function, handled_slot
from matisse_controller.gui.widgets import LoggingArea, StatusMonitor
from matisse_controller.matisse import Matisse
from matisse_controller.shamrock_ple import PLE


class ControlApplication(QApplication):
    """
    A comprehensive control center to make use of the APIs provided in this package.

    The application owns the main window, a log area fed by redirected stdout, a status monitor widget, and a thread
    pool in which long-running Matisse and PLE tasks are executed.
    """

    EXIT_CODE_RESTART = 42  # Answer to the Ultimate Question of Life, the Universe, and Everything
    # Wavelength changes (nm) at or above this size must be confirmed by the user, see `set_wavelength_dialog`.
    CONFIRM_WAVELENGTH_CHANGE_THRESHOLD = 10

    def __init__(self, *args, **kwargs):
        """Build the window, menus and slots, connect to the Matisse, and show the main window."""
        super().__init__(*args, **kwargs)
        # Non-handled functions only here
        self.setup_window()
        self.setup_logging()
        self.setup_menus()
        self.setup_slots()

        # Handled functions can go here
        self.setup_matisse()
        self.setup_widgets()

        # Other setup
        self.aboutToQuit.connect(self.clean_up)
        self.work_executor = ThreadPoolExecutor()
        self.matisse_worker: Future = None
        self.ple: PLE = PLE(self.matisse)
        self.ple_scan_worker: Future = None
        self.ple_analysis_worker: Future = None
        self.single_acquisition_worker: Future = None

        container = QWidget()
        container.setLayout(self.layout)
        self.window.setCentralWidget(container)
        self.window.show()

    def setup_window(self):
        """Create the main window and the vertical layout that the other setup methods add widgets to."""
        self.window = window = QMainWindow()
        self.layout = QVBoxLayout()
        window.setWindowTitle('Matisse Controller')
        window.resize(700, 300)

    def setup_logging(self):
        """Initialize logging queue and redirect stdout to the logging area."""
        self.log_queue = queue.Queue()
        self.log_area = LoggingArea(self.log_queue)
        self.log_area.setReadOnly(True)
        self.layout.addWidget(self.log_area)

        # Set up a context manager to redirect stdout to the log window.
        # It is entered manually here and exited in `clean_up`, since it must span the lifetime of the application.
        self.log_redirector = redirect_stdout(LoggingStream(self.log_queue))
        self.log_redirector.__enter__()

    def setup_menus(self):
        """Initialization of items in the menu bar go here."""
        menu_bar = self.window.menuBar()

        console_menu = menu_bar.addMenu('Console')
        self.clear_log_area_action = console_menu.addAction('Clear Log')
        self.close_plots_action = console_menu.addAction('Close All Plots')
        self.configuration_action = console_menu.addAction('Configuration')
        reset_menu = console_menu.addMenu('Reset')
        self.reset_all_action = reset_menu.addAction('All')
        self.reset_matisse_motors_action = reset_menu.addAction('Matisse Motors')
        self.reset_matisse_piezos_action = reset_menu.addAction('Stabilization Piezos')
        self.reset_matisse_tasks_action = reset_menu.addAction('Matisse Tasks')
        self.reset_ple_tasks_action = reset_menu.addAction('PLE Tasks')
        self.restart_action = console_menu.addAction('Restart')

        set_menu = menu_bar.addMenu('Set')
        self.set_wavelength_action = set_menu.addAction('Wavelength')
        self.set_bifi_approx_wavelength_action = set_menu.addAction('BiFi Approx. Wavelength')
        self.set_bifi_motor_pos_action = set_menu.addAction('BiFi Motor Position')
        self.set_thin_eta_motor_pos_action = set_menu.addAction('Thin Etalon Motor Position')
        self.set_piezo_eta_pos_action = set_menu.addAction('Piezo Etalon Position')
        self.set_slow_piezo_pos_action = set_menu.addAction('Slow Piezo Position')
        self.set_refcell_pos_action = set_menu.addAction('RefCell Position')

        scan_menu = menu_bar.addMenu('Scan')
        self.bifi_scan_action = scan_menu.addAction('Birefringent Filter')
        self.thin_eta_scan_action = scan_menu.addAction('Thin Etalon')
        self.scan_device_up_action = scan_menu.addAction('Scan Device Up')
        self.scan_device_down_action = scan_menu.addAction('Scan Device Down')
        self.stop_scan_device_action = scan_menu.addAction('Stop Scanning Device')

        stabilization_menu = menu_bar.addMenu('Stabilization')
        toggle_control_loop_menu = stabilization_menu.addMenu('Toggle Control Loop')
        self.slow_pz_control_action = toggle_control_loop_menu.addAction('Slow Piezo')
        self.slow_pz_control_action.setCheckable(True)
        self.thin_eta_control_action = toggle_control_loop_menu.addAction('Thin Etalon')
        self.thin_eta_control_action.setCheckable(True)
        self.piezo_eta_control_action = toggle_control_loop_menu.addAction('Piezo Etalon')
        self.piezo_eta_control_action.setCheckable(True)
        self.fast_pz_control_action = toggle_control_loop_menu.addAction('Fast Piezo')
        self.fast_pz_control_action.setCheckable(True)
        self.lock_laser_action = stabilization_menu.addAction('Toggle Lock Laser')
        self.set_recommended_fast_pz_setpoint_action = stabilization_menu.addAction('Set Recommended Fast Pz Setpoint')
        self.auto_stabilize_action = stabilization_menu.addAction('Toggle Auto Stabilization')

        ple_menu = menu_bar.addMenu('Shamrock')
        self.start_ple_scan_action = ple_menu.addAction('Start PLE Scan')
        self.analyze_ple_action = ple_menu.addAction('Start PLE Analysis')
        self.view_existing_analysis_action = ple_menu.addAction('View PLE Analysis')
        self.single_acquisition_action = ple_menu.addAction('View Single Acquisition')

        self.control_loop_actions = [self.slow_pz_control_action, self.thin_eta_control_action,
                                     self.piezo_eta_control_action, self.fast_pz_control_action]

    def setup_slots(self):
        """
        Connection of Qt signals to Qt slots goes here.

        Please note that execution of slots will block the UI thread by default.
        """
        # Console
        self.clear_log_area_action.triggered.connect(self.clear_log_area)
        self.close_plots_action.triggered.connect(self.close_plots)
        self.configuration_action.triggered.connect(self.open_configuration)
        self.reset_all_action.triggered.connect(self.reset)
        self.reset_matisse_motors_action.triggered.connect(self.reset_motors_only)
        self.reset_matisse_piezos_action.triggered.connect(self.reset_piezos_only)
        self.reset_matisse_tasks_action.triggered.connect(self.reset_matisse_tasks_only)
        self.reset_ple_tasks_action.triggered.connect(self.reset_ple_tasks_only)
        self.restart_action.triggered.connect(self.restart)

        # Set
        self.set_wavelength_action.triggered.connect(self.set_wavelength_dialog)
        self.set_bifi_approx_wavelength_action.triggered.connect(self.set_bifi_approx_wavelength_dialog)
        self.set_bifi_motor_pos_action.triggered.connect(self.set_bifi_motor_pos_dialog)
        self.set_thin_eta_motor_pos_action.triggered.connect(self.set_thin_eta_motor_pos_dialog)
        self.set_piezo_eta_pos_action.triggered.connect(self.set_piezo_eta_pos_dialog)
        self.set_slow_piezo_pos_action.triggered.connect(self.set_slow_piezo_pos_dialog)
        self.set_refcell_pos_action.triggered.connect(self.set_refcell_pos_dialog)

        # Scan (all five actions live in the 'Scan' menu)
        self.bifi_scan_action.triggered.connect(self.start_bifi_scan)
        self.thin_eta_scan_action.triggered.connect(self.start_thin_etalon_scan)
        self.scan_device_up_action.triggered.connect(self.scan_device_up)
        self.scan_device_down_action.triggered.connect(self.scan_device_down)
        self.stop_scan_device_action.triggered.connect(self.stop_scanning_device)

        # Stabilization
        self.slow_pz_control_action.triggered.connect(self.toggle_slow_piezo_control)
        self.thin_eta_control_action.triggered.connect(self.toggle_thin_etalon_control)
        self.piezo_eta_control_action.triggered.connect(self.toggle_piezo_etalon_control)
        self.fast_pz_control_action.triggered.connect(self.toggle_fast_piezo_control)
        self.lock_laser_action.triggered.connect(self.toggle_lock_laser)
        self.set_recommended_fast_pz_setpoint_action.triggered.connect(self.set_recommended_fast_pz_setpoint)
        self.auto_stabilize_action.triggered.connect(self.toggle_auto_stabilization)

        # Shamrock
        self.start_ple_scan_action.triggered.connect(self.start_ple_scan)
        self.analyze_ple_action.triggered.connect(self.analyze_ple_data)
        self.view_existing_analysis_action.triggered.connect(self.view_existing_analysis)
        self.single_acquisition_action.triggered.connect(self.take_single_acquisition)

    @handled_function
    def setup_widgets(self):
        """Initialize any widgets the UI needs to run correctly."""
        self.status_monitor_queue = queue.Queue()
        self.status_monitor = StatusMonitor(self.matisse, self.status_monitor_queue)
        self.layout.addWidget(self.status_monitor)

    @handled_function
    def setup_matisse(self):
        """
        Create the Matisse instance and start lock correction if every control loop is on but correction is not.

        If anything fails, `self.matisse` is set to None and the exception is re-raised for the decorator to handle.
        """
        try:
            self.matisse: Matisse = Matisse()
            if self.matisse.all_control_loops_on() and not self.matisse.is_lock_correction_on():
                self.matisse.start_laser_lock_correction()
        except Exception:
            self.matisse: Matisse = None
            # Bare raise keeps the original traceback intact.
            raise

    @pyqtSlot()
    def clean_up(self):
        """
        This method is run before the GUI exits, think of it like `__del__`.

        Don't call this elsewhere unless you know what you're doing.
        """
        try:
            self.reset(reset_motors=False, reset_piezos=False)
            del self.matisse

            # Clean up widgets with running threads.
            self.status_monitor.clean_up()
            self.log_area.clean_up()

            PLE.clean_up_globals()
        finally:
            # Always release the worker pool and restore stdout, even if an earlier cleanup step failed.
            # wait=False so that quitting cannot hang here; running tasks were already awaited by reset().
            self.work_executor.shutdown(wait=False)
            self.log_redirector.__exit__(None, None, None)

    def error_dialog(self):
        """Display an error dialog box with details of the most recent exception raised."""
        stack = list(traceback.format_exception(*sys.exc_info()))
        # Pick length of longest line in stack, with a cutoff at 185
        desired_width = min(max(len(line) for line in stack), 185)
        # The last entry is the 'ExceptionType: message' line; it is echoed to the log as well.
        description = stack.pop()
        print(utils.red_text(description), end='')
        # Remove entries for handled_function decorator, for clarity
        stack = [item for item in stack if 'in handled_function_wrapper' not in item]
        dialog = QMessageBox(icon=QMessageBox.Critical)
        dialog.setWindowTitle('Error')
        # Adding the underscores is a hack to resize the QMessageBox because it's not normally resizable.
        # This looks good in Windows, haven't tested other platforms. Sorry :(
        dialog.setText(f"{description + '_' * desired_width}\n\n{''.join(stack)}")
        dialog.exec()

    @handled_slot(bool)
    def clear_log_area(self, checked):
        """Clear the contents of the log area."""
        self.log_area.clear()

    @handled_slot(bool)
    def close_plots(self, checked):
        """Ask the Matisse object to close all of its plots."""
        self.matisse.close_all_plots()

    @handled_slot(bool)
    def open_configuration(self, checked):
        """Open the configuration dialog modally."""
        dialog = ConfigurationDialog()
        dialog.exec()

    @staticmethod
    def _wait_for_worker(worker, message):
        """Block until `worker` finishes if it is currently running, printing `message` first."""
        if worker is not None and worker.running():
            print(message)
            # result() re-raises any exception the task raised.
            worker.result()

    @handled_slot(bool)
    def reset(self, checked=False, reset_motors=True, reset_piezos=True, reset_matisse_tasks=True,
              reset_ple_tasks=True):
        """
        Reset Matisse to a 'good' default state: not locked or stabilizing, motors reset, all tasks finished, etc.

        Also stops any PLE analysis tasks.

        Parameters
        ----------
        checked : bool
            not used
        reset_motors : bool
            whether to reset the Matisse birefringent filter and thin etalon motors to their configured reset positions
        reset_piezos : bool
            whether to reset the Matisse stabilization piezos
        reset_matisse_tasks : bool
            whether to trigger the exit flag and quit any running tasks (like scans)
        reset_ple_tasks : bool
            whether to stop any running PLE-related tasks
        """
        try:
            if self.matisse:
                if reset_matisse_tasks:
                    self.matisse.exit_flag = True
                    self._wait_for_worker(self.matisse_worker, 'Waiting for Matisse tasks to complete.')
                    self.matisse_worker = None
                if self.matisse.is_stabilizing():
                    self.matisse.stabilize_off()
                if self.matisse.is_lock_correction_on():
                    self.matisse.stop_laser_lock_correction()
                if reset_motors:
                    self.matisse.reset_motors()
                if reset_piezos:
                    self.matisse.reset_stabilization_piezos()

            if reset_ple_tasks:
                self.ple.stop_ple_tasks()

                self._wait_for_worker(self.ple_scan_worker, 'Waiting for PLE scan to complete.')
                self.ple_scan_worker = None

                self._wait_for_worker(self.ple_analysis_worker, 'Waiting for PLE analysis to complete.')
                self.ple_analysis_worker = None

                self._wait_for_worker(self.single_acquisition_worker, 'Waiting for acquisition to complete.')
                self.single_acquisition_worker = None
        finally:
            # Clear the exit flag even if a step above raised; otherwise it would stay set and any later task
            # that checks it (see `run_matisse_task`) would see a stale exit request.
            if self.matisse:
                self.matisse.exit_flag = False

        print('Done.')

    @handled_slot(bool)
    def reset_motors_only(self, checked):
        """Run `reset` with only the motor reset enabled."""
        self.reset(reset_piezos=False, reset_matisse_tasks=False, reset_ple_tasks=False)

    @handled_slot(bool)
    def reset_piezos_only(self, checked):
        """Run `reset` with only the stabilization piezo reset enabled."""
        self.reset(reset_motors=False, reset_matisse_tasks=False, reset_ple_tasks=False)

    @handled_slot(bool)
    def reset_matisse_tasks_only(self, checked):
        """Run `reset` with only the Matisse task reset enabled."""
        self.reset(reset_motors=False, reset_piezos=False, reset_ple_tasks=False)

    @handled_slot(bool)
    def reset_ple_tasks_only(self, checked):
        """Run `reset` with only the PLE task reset enabled."""
        self.reset(reset_motors=False, reset_piezos=False, reset_matisse_tasks=False)

    @handled_slot(bool)
    def restart(self, checked):
        """Exit the event loop with `EXIT_CODE_RESTART`."""
        self.exit(ControlApplication.EXIT_CODE_RESTART)

    @handled_slot(bool)
    def set_wavelength_dialog(self, checked):
        """
        Open a dialog to set the wavelength of the Matisse. If the difference in wavelength is greater than or equal to
        the wavelength change threshold, display a warning to confirm the change.
        """
        # Prefer the last target wavelength; fall back to the wavemeter reading when there is none.
        current_wavelength = self.matisse.target_wavelength
        if current_wavelength is None:
            current_wavelength = self.matisse.wavemeter_wavelength()
        target_wavelength, success = QInputDialog.getDouble(self.window, 'Set Wavelength', 'Wavelength (nm): ',
                                                            current_wavelength, decimals=3,
                                                            min=cfg.get(cfg.WAVELENGTH_LOWER_LIMIT),
                                                            max=cfg.get(cfg.WAVELENGTH_UPPER_LIMIT))
        if success:
            if abs(current_wavelength - target_wavelength) >= ControlApplication.CONFIRM_WAVELENGTH_CHANGE_THRESHOLD:
                answer = QMessageBox.warning(self.window, 'Large Wavelength Change',
                                             f"The desired wavelength, {target_wavelength} nm, is more than "
                                             f"{ControlApplication.CONFIRM_WAVELENGTH_CHANGE_THRESHOLD} nm "
                                             'away from the current wavelength. Are you sure?',
                                             QMessageBox.Yes | QMessageBox.No, defaultButton=QMessageBox.No)
                if answer == QMessageBox.No:
                    return

            print(f"Setting wavelength to {target_wavelength} nm...")
            self.run_matisse_task(self.matisse.set_wavelength, target_wavelength)

    @handled_slot(bool)
    def set_bifi_approx_wavelength_dialog(self, checked):
        """Prompt for an approximate wavelength and pass it to `Matisse.set_bifi_wavelength`."""
        target_wavelength, success = QInputDialog.getDouble(self.window, 'Set Approx. Wavelength', 'Wavelength (nm): ',
                                                            self.matisse.query('MOTBI:WL?', numeric_result=True))
        if success:
            print(f"Setting BiFi approximate wavelength to {target_wavelength} nm...")
            self.matisse.set_bifi_wavelength(target_wavelength)

    @handled_slot(bool)
    def set_bifi_motor_pos_dialog(self, checked):
        """Prompt for an absolute birefringent filter motor position and apply it."""
        # getInt requires an int default value; presumably a numeric query result is a float, so convert explicitly.
        current_pos = int(self.matisse.query('MOTBI:POS?', numeric_result=True))
        target_pos, success = QInputDialog.getInt(self.window, 'Set BiFi Motor Position', 'Absolute Position:',
                                                  current_pos)
        if success:
            print(f"Setting BiFi motor position to {target_pos}.")
            self.matisse.set_bifi_motor_pos(target_pos)

    @handled_slot(bool)
    def set_thin_eta_motor_pos_dialog(self, checked):
        """Prompt for an absolute thin etalon motor position and apply it."""
        # getInt requires an int default value; presumably a numeric query result is a float, so convert explicitly.
        current_pos = int(self.matisse.query('MOTTE:POS?', numeric_result=True))
        target_pos, success = QInputDialog.getInt(self.window, 'Set Thin Etalon Motor Position', 'Absolute Position:',
                                                  current_pos)
        if success:
            print(f"Setting thin etalon motor position to {target_pos}.")
            self.matisse.set_thin_etalon_motor_pos(target_pos)

    @handled_slot(bool)
    def set_piezo_eta_pos_dialog(self, checked):
        """Prompt for a piezo etalon position and send it with the 'PZETL:BASE' command."""
        target_pos, success = QInputDialog.getDouble(self.window, 'Set Piezo Etalon Position', 'Position: ',
                                                     self.matisse.query('PZETL:BASE?', numeric_result=True))
        if success:
            self.matisse.query(f"PZETL:BASE {target_pos}")

    @handled_slot(bool)
    def set_slow_piezo_pos_dialog(self, checked):
        """Prompt for a slow piezo position and send it with the 'SPZT:NOW' command."""
        target_pos, success = QInputDialog.getDouble(self.window, 'Set Slow Piezo Position', 'Position: ',
                                                     self.matisse.query('SPZT:NOW?', numeric_result=True))
        if success:
            self.matisse.query(f"SPZT:NOW {target_pos}")

    @handled_slot(bool)
    def set_refcell_pos_dialog(self, checked):
        """Prompt for a RefCell position and send it with the 'SCAN:NOW' command."""
        target_pos, success = QInputDialog.getDouble(self.window, 'Set RefCell Position', 'Position: ',
                                                     self.matisse.query('SCAN:NOW?', numeric_result=True))
        if success:
            self.matisse.query(f"SCAN:NOW {target_pos}")

    @handled_slot(bool)
    def start_bifi_scan(self, checked):
        """Run a birefringent filter scan as a Matisse task."""
        self.run_matisse_task(self.matisse.birefringent_filter_scan)

    @handled_slot(bool)
    def start_thin_etalon_scan(self, checked):
        """Run a thin etalon scan as a Matisse task."""
        self.run_matisse_task(self.matisse.thin_etalon_scan)

    def _start_device_scan(self, scan_mode):
        """Apply the configured rising/falling scan speeds and start a scan in `scan_mode`, unless stabilizing."""
        if self.matisse.is_stabilizing():
            print('WARNING: Auto-stabilize is on. Disable it and try again.')
            return
        self.matisse.query(f"SCAN:RISINGSPEED {cfg.get(cfg.REFCELL_SCAN_RISING_SPEED)}")
        self.matisse.query(f"SCAN:FALLINGSPEED {cfg.get(cfg.REFCELL_SCAN_FALLING_SPEED)}")
        self.matisse.start_scan(scan_mode)

    @handled_slot(bool)
    def scan_device_up(self, checked):
        """Start scanning the device upward, refusing while auto-stabilization is on."""
        self._start_device_scan(matisse.SCAN_MODE_UP)

    @handled_slot(bool)
    def scan_device_down(self, checked):
        """Start scanning the device downward, refusing while auto-stabilization is on."""
        self._start_device_scan(matisse.SCAN_MODE_DOWN)

    @handled_slot(bool)
    def stop_scanning_device(self, checked):
        """Stop the device scan, refusing while auto-stabilization is on."""
        if self.matisse.is_stabilizing():
            print('WARNING: Auto-stabilize is on. Disable it and try again.')
        else:
            self.matisse.stop_scan()

    @handled_slot(bool)
    def toggle_lock_laser(self, checked):
        """Stop laser lock correction if it is on, otherwise start it."""
        if self.matisse.is_lock_correction_on():
            self.matisse.stop_laser_lock_correction()
        else:
            self.matisse.start_laser_lock_correction()

    @handled_slot(bool)
    def set_recommended_fast_pz_setpoint(self, checked):
        """Apply the recommended fast piezo setpoint."""
        self.matisse.set_recommended_fast_piezo_setpoint()

    def _toggle_control_loop(self, action, setter_name, description, checked):
        """
        Switch one control loop on or off and keep its menu check mark in sync.

        Parameters
        ----------
        action : QAction
            the checkable menu action that was triggered
        setter_name : str
            name of the Matisse method that takes the new on/off state
        description : str
            component name used in the log message
        checked : bool
            the state requested by the user
        """
        print(f"{'Locking' if checked else 'Unlocking'} {description}.")
        # Revert the check mark first; it only reaches the requested state if the Matisse call does not raise.
        action.setChecked(not checked)
        # Looked up by name so that a missing Matisse fails after the check mark has been reverted.
        getattr(self.matisse, setter_name)(checked)
        action.setChecked(checked)

    @handled_slot(bool)
    def toggle_slow_piezo_control(self, checked):
        """Turn the slow piezo control loop on or off."""
        self._toggle_control_loop(self.slow_pz_control_action, 'set_slow_piezo_control', 'slow piezo', checked)

    @handled_slot(bool)
    def toggle_thin_etalon_control(self, checked):
        """Turn the thin etalon control loop on or off."""
        self._toggle_control_loop(self.thin_eta_control_action, 'set_thin_etalon_control', 'thin etalon', checked)

    @handled_slot(bool)
    def toggle_piezo_etalon_control(self, checked):
        """Turn the piezo etalon control loop on or off."""
        self._toggle_control_loop(self.piezo_eta_control_action, 'set_piezo_etalon_control', 'piezo etalon', checked)

    @handled_slot(bool)
    def toggle_fast_piezo_control(self, checked):
        """Turn the fast piezo control loop on or off."""
        # This used to call set_piezo_etalon_control, which toggled the piezo etalon loop instead of the fast piezo.
        # NOTE(review): assumes Matisse exposes set_fast_piezo_control alongside the other three setters - confirm.
        self._toggle_control_loop(self.fast_pz_control_action, 'set_fast_piezo_control', 'fast piezo', checked)

    @handled_slot(bool)
    def toggle_auto_stabilization(self, checked):
        """Turn auto-stabilization off if it is running, otherwise turn it on."""
        if self.matisse.is_stabilizing():
            self.matisse.stabilize_off()
        else:
            self.matisse.stabilize_on()

    @handled_slot(bool)
    def start_ple_scan(self, checked):
        """Collect scan options from a dialog and run the PLE scan in the thread pool, one scan at a time."""
        if self.ple_scan_worker and self.ple_scan_worker.running():
            print('WARNING: A PLE scan is currently in progress.')
            return

        dialog = PLEScanDialog(parent=self.window)
        if dialog.exec() == QDialog.Accepted:
            ple_options = dialog.get_form_data()
            print('Starting PLE scan.')
            self.ple_scan_worker = self.work_executor.submit(self.ple.start_ple_scan, **ple_options)
            self.ple_scan_worker.add_done_callback(utils.raise_error_from_future)

    @handled_slot(bool)
    def analyze_ple_data(self, checked):
        """Collect analysis options from a dialog and run the PLE analysis in the thread pool."""
        if self.ple_analysis_worker and self.ple_analysis_worker.running():
            print('WARNING: A PLE analysis is currently in progress.')
            return

        dialog = PLEAnalysisDialog(parent=self.window)
        if dialog.exec() == QDialog.Accepted:
            analysis_options = dialog.get_form_data()
            self.ple_analysis_worker = self.work_executor.submit(self.ple.analyze_ple_data, **analysis_options)
            self.ple_analysis_worker.add_done_callback(utils.raise_error_from_future)

    @handled_slot(bool)
    def view_existing_analysis(self, checked):
        """Let the user pick an existing analysis file and plot it in the thread pool."""
        if self.ple_analysis_worker and self.ple_analysis_worker.running():
            print('WARNING: A PLE analysis is currently in progress.')
            return

        # getOpenFileName returns (path, selected filter); the path is empty when the dialog is cancelled.
        file_path, _selected_filter = QFileDialog.getOpenFileName(caption='Select Analysis File',
                                                                  filter='Pickle file (*.pickle)')
        if file_path:
            # NOTE(review): the file is a pickle, presumably unpickled by plot_ple_analysis_file - only open
            # files from a trusted source.
            self.ple_analysis_worker = self.work_executor.submit(self.ple.plot_ple_analysis_file, file_path)
            self.ple_analysis_worker.add_done_callback(utils.raise_error_from_future)

    @handled_slot(bool)
    def take_single_acquisition(self, checked):
        """Collect acquisition options from a dialog and plot a single acquisition in the thread pool."""
        if self.single_acquisition_worker and self.single_acquisition_worker.running():
            print('WARNING: An acquisition is currently in progress.')
            return

        dialog = SingleAcquisitionDialog(parent=self.window)
        if dialog.exec() == QDialog.Accepted:
            acquisition_options = dialog.get_form_data()
            self.single_acquisition_worker = self.work_executor.submit(self.ple.plot_single_acquisition,
                                                                       **acquisition_options)
            self.single_acquisition_worker.add_done_callback(utils.raise_error_from_future)

    def run_matisse_task(self, function, *args, **kwargs) -> bool:
        """
        Run an asynchronous Matisse-related task in the worker thread pool. Only one such task may be run at a time.
        Any task run using this method MUST exit gracefully at some point by checking the Matisse `exit_flag`.

        Parameters
        ----------
        function : function
            the function to run in the thread pool
        *args
            positional arguments to pass to the given function
        **kwargs
            keyword arguments to pass to the given function

        Returns
        -------
        bool
            whether the task was successfully started
        """
        if self.matisse_worker and self.matisse_worker.running():
            print("WARNING: Cannot perform requested action. A Matisse-related task is currently running.")
            return False

        self.matisse_worker = self.work_executor.submit(function, *args, **kwargs)
        self.matisse_worker.add_done_callback(utils.raise_error_from_future)
        return True


def main():
    """
    Run the control application, starting it again for as long as it exits with `EXIT_CODE_RESTART`.

    Returns
    -------
    int
        the exit code of the last application run, so the caller can pass it on to the operating system
    """
    exit_code = ControlApplication.EXIT_CODE_RESTART
    while exit_code == ControlApplication.EXIT_CODE_RESTART:
        gui = ControlApplication([])
        exit_code = gui.exec()
        # Drop the finished application object before the next iteration constructs a new one.
        del gui
    return exit_code


if __name__ == '__main__':
    # Propagate the Qt exit code instead of always exiting with status 0.
    sys.exit(main())

Functions

def main()
Source code
def main():
    """
    Run the control application, starting it again for as long as it exits with `EXIT_CODE_RESTART`.

    Returns
    -------
    int
        the exit code of the last application run, so the caller can pass it on to the operating system
    """
    exit_code = ControlApplication.EXIT_CODE_RESTART
    while exit_code == ControlApplication.EXIT_CODE_RESTART:
        gui = ControlApplication([])
        exit_code = gui.exec()
        # Drop the finished application object before the next iteration constructs a new one.
        del gui
    return exit_code

Classes

class ControlApplication (*args, **kwargs)

A comprehensive control center to make use of the APIs provided in this package.

Source code
class ControlApplication(QApplication):
    """
    A comprehensive control center to make use of the APIs provided in this package.

    The application owns the main window, a log area that receives redirected stdout, a status monitor, one
    `Matisse` instance, one `PLE` instance, and a thread pool used to run Matisse and PLE tasks.
    """

    EXIT_CODE_RESTART = 42  # Answer to the Ultimate Question of Life, the Universe, and Everything
    CONFIRM_WAVELENGTH_CHANGE_THRESHOLD = 10

    def __init__(self, *args, **kwargs):
        """Build the window, logging, menus and slots, connect to the Matisse, then show the main window."""
        super().__init__(*args, **kwargs)
        # Non-handled functions only here
        self.setup_window()
        self.setup_logging()
        self.setup_menus()
        self.setup_slots()

        # Handled functions can go here
        self.setup_matisse()
        self.setup_widgets()

        # Other setup
        self.aboutToQuit.connect(self.clean_up)
        self.work_executor = ThreadPoolExecutor()
        self.matisse_worker: Future = None
        self.ple: PLE = PLE(self.matisse)
        self.ple_scan_worker: Future = None
        self.ple_analysis_worker: Future = None
        self.single_acquisition_worker: Future = None

        container = QWidget()
        container.setLayout(self.layout)
        self.window.setCentralWidget(container)
        self.window.show()

    def setup_window(self):
        """Create the main window and the vertical layout that the other widgets are added to."""
        self.window = window = QMainWindow()
        self.layout = QVBoxLayout()
        window.setWindowTitle('Matisse Controller')
        window.resize(700, 300)

    def setup_logging(self):
        """Initialize logging queue and redirect stdout to the logging area."""
        self.log_queue = queue.Queue()
        self.log_area = LoggingArea(self.log_queue)
        self.log_area.setReadOnly(True)
        self.layout.addWidget(self.log_area)

        # Set up a context manager to redirect stdout to the log window.
        # It is entered manually here and exited in `clean_up`, since it must outlive this method.
        self.log_redirector = redirect_stdout(LoggingStream(self.log_queue))
        self.log_redirector.__enter__()

    def setup_menus(self):
        """Initialization of items in the menu bar goes here."""
        menu_bar = self.window.menuBar()

        console_menu = menu_bar.addMenu('Console')
        self.clear_log_area_action = console_menu.addAction('Clear Log')
        self.close_plots_action = console_menu.addAction('Close All Plots')
        self.configuration_action = console_menu.addAction('Configuration')
        reset_menu = console_menu.addMenu('Reset')
        self.reset_all_action = reset_menu.addAction('All')
        self.reset_matisse_motors_action = reset_menu.addAction('Matisse Motors')
        self.reset_matisse_piezos_action = reset_menu.addAction('Stabilization Piezos')
        self.reset_matisse_tasks_action = reset_menu.addAction('Matisse Tasks')
        self.reset_ple_tasks_action = reset_menu.addAction('PLE Tasks')
        self.restart_action = console_menu.addAction('Restart')

        set_menu = menu_bar.addMenu('Set')
        self.set_wavelength_action = set_menu.addAction('Wavelength')
        self.set_bifi_approx_wavelength_action = set_menu.addAction('BiFi Approx. Wavelength')
        self.set_bifi_motor_pos_action = set_menu.addAction('BiFi Motor Position')
        self.set_thin_eta_motor_pos_action = set_menu.addAction('Thin Etalon Motor Position')
        self.set_piezo_eta_pos_action = set_menu.addAction('Piezo Etalon Position')
        self.set_slow_piezo_pos_action = set_menu.addAction('Slow Piezo Position')
        self.set_refcell_pos_action = set_menu.addAction('RefCell Position')

        scan_menu = menu_bar.addMenu('Scan')
        self.bifi_scan_action = scan_menu.addAction('Birefringent Filter')
        self.thin_eta_scan_action = scan_menu.addAction('Thin Etalon')
        self.scan_device_up_action = scan_menu.addAction('Scan Device Up')
        self.scan_device_down_action = scan_menu.addAction('Scan Device Down')
        self.stop_scan_device_action = scan_menu.addAction('Stop Scanning Device')

        stabilization_menu = menu_bar.addMenu('Stabilization')
        toggle_control_loop_menu = stabilization_menu.addMenu('Toggle Control Loop')
        self.slow_pz_control_action = toggle_control_loop_menu.addAction('Slow Piezo')
        self.slow_pz_control_action.setCheckable(True)
        self.thin_eta_control_action = toggle_control_loop_menu.addAction('Thin Etalon')
        self.thin_eta_control_action.setCheckable(True)
        self.piezo_eta_control_action = toggle_control_loop_menu.addAction('Piezo Etalon')
        self.piezo_eta_control_action.setCheckable(True)
        self.fast_pz_control_action = toggle_control_loop_menu.addAction('Fast Piezo')
        self.fast_pz_control_action.setCheckable(True)
        self.lock_laser_action = stabilization_menu.addAction('Toggle Lock Laser')
        self.set_recommended_fast_pz_setpoint_action = stabilization_menu.addAction('Set Recommended Fast Pz Setpoint')
        self.auto_stabilize_action = stabilization_menu.addAction('Toggle Auto Stabilization')

        ple_menu = menu_bar.addMenu('Shamrock')
        self.start_ple_scan_action = ple_menu.addAction('Start PLE Scan')
        self.analyze_ple_action = ple_menu.addAction('Start PLE Analysis')
        self.view_existing_analysis_action = ple_menu.addAction('View PLE Analysis')
        self.single_acquisition_action = ple_menu.addAction('View Single Acquisition')

        self.control_loop_actions = [self.slow_pz_control_action, self.thin_eta_control_action,
                                     self.piezo_eta_control_action, self.fast_pz_control_action]

    def setup_slots(self):
        """
        Connection of Qt signals to Qt slots goes here.

        Please note that execution of slots will block the UI thread by default.
        """
        # Console
        self.clear_log_area_action.triggered.connect(self.clear_log_area)
        self.close_plots_action.triggered.connect(self.close_plots)
        self.configuration_action.triggered.connect(self.open_configuration)
        self.reset_all_action.triggered.connect(self.reset)
        self.reset_matisse_motors_action.triggered.connect(self.reset_motors_only)
        self.reset_matisse_piezos_action.triggered.connect(self.reset_piezos_only)
        self.reset_matisse_tasks_action.triggered.connect(self.reset_matisse_tasks_only)
        self.reset_ple_tasks_action.triggered.connect(self.reset_ple_tasks_only)
        self.restart_action.triggered.connect(self.restart)

        # Set
        self.set_wavelength_action.triggered.connect(self.set_wavelength_dialog)
        self.set_bifi_approx_wavelength_action.triggered.connect(self.set_bifi_approx_wavelength_dialog)
        self.set_bifi_motor_pos_action.triggered.connect(self.set_bifi_motor_pos_dialog)
        self.set_thin_eta_motor_pos_action.triggered.connect(self.set_thin_eta_motor_pos_dialog)
        self.set_piezo_eta_pos_action.triggered.connect(self.set_piezo_eta_pos_dialog)
        self.set_slow_piezo_pos_action.triggered.connect(self.set_slow_piezo_pos_dialog)
        self.set_refcell_pos_action.triggered.connect(self.set_refcell_pos_dialog)

        # Scan
        self.bifi_scan_action.triggered.connect(self.start_bifi_scan)
        self.thin_eta_scan_action.triggered.connect(self.start_thin_etalon_scan)

        # Scan device (these actions also live in the Scan menu)
        self.scan_device_up_action.triggered.connect(self.scan_device_up)
        self.scan_device_down_action.triggered.connect(self.scan_device_down)
        self.stop_scan_device_action.triggered.connect(self.stop_scanning_device)

        # Stabilization
        self.slow_pz_control_action.triggered.connect(self.toggle_slow_piezo_control)
        self.thin_eta_control_action.triggered.connect(self.toggle_thin_etalon_control)
        self.piezo_eta_control_action.triggered.connect(self.toggle_piezo_etalon_control)
        self.fast_pz_control_action.triggered.connect(self.toggle_fast_piezo_control)
        self.lock_laser_action.triggered.connect(self.toggle_lock_laser)
        self.set_recommended_fast_pz_setpoint_action.triggered.connect(self.set_recommended_fast_pz_setpoint)
        self.auto_stabilize_action.triggered.connect(self.toggle_auto_stabilization)

        # Shamrock
        self.start_ple_scan_action.triggered.connect(self.start_ple_scan)
        self.analyze_ple_action.triggered.connect(self.analyze_ple_data)
        self.view_existing_analysis_action.triggered.connect(self.view_existing_analysis)
        self.single_acquisition_action.triggered.connect(self.take_single_acquisition)

    @handled_function
    def setup_widgets(self):
        """Initialize any widgets the UI needs to run correctly."""
        self.status_monitor_queue = queue.Queue()
        self.status_monitor = StatusMonitor(self.matisse, self.status_monitor_queue)
        self.layout.addWidget(self.status_monitor)

    @handled_function
    def setup_matisse(self):
        """
        Create the `Matisse` instance and start lock correction if all control loops are already on.

        If anything raises, `self.matisse` is set to None before the exception is re-raised, so the attribute
        always exists afterwards.
        """
        try:
            self.matisse: Matisse = Matisse()
            if self.matisse.all_control_loops_on() and not self.matisse.is_lock_correction_on():
                self.matisse.start_laser_lock_correction()
        except Exception as err:
            self.matisse: Matisse = None
            raise err

    @pyqtSlot()
    def clean_up(self):
        """
        This method is run before the GUI exits; think of it like `__del__`.

        Don't call this elsewhere unless you know what you're doing.
        """
        # Stop tasks, stabilization and lock correction, but leave the motors and piezos where they are.
        self.reset(reset_motors=False, reset_piezos=False)
        del self.matisse

        # Clean up widgets with running threads.
        self.status_monitor.clean_up()
        self.log_area.clean_up()

        PLE.clean_up_globals()

        # Undo the stdout redirection that was entered in `setup_logging`.
        self.log_redirector.__exit__(None, None, None)

    def error_dialog(self):
        """Display an error dialog box with details of the most recent exception raised."""
        stack = list(traceback.format_exception(*sys.exc_info()))
        # Pick length of longest line in stack, with a cutoff at 185
        desired_width = min(max(len(line) for line in stack), 185)
        # The last entry is the exception summary line; it is echoed to stdout and used as the headline.
        description = stack.pop()
        print(utils.red_text(description), end='')
        # Remove entries for handled_function decorator, for clarity
        stack = [item for item in stack if 'in handled_function_wrapper' not in item]
        dialog = QMessageBox(icon=QMessageBox.Critical)
        dialog.setWindowTitle('Error')
        # Adding the underscores is a hack to resize the QMessageBox because it's not normally resizable.
        # This looks good in Windows, haven't tested other platforms. Sorry :(
        dialog.setText(f"{description + '_' * desired_width}\n\n{''.join(stack)}")
        dialog.exec()

    @handled_slot(bool)
    def clear_log_area(self, checked):
        """Clear the log area. `checked` is not used."""
        self.log_area.clear()

    @handled_slot(bool)
    def close_plots(self, checked):
        """Ask the Matisse object to close all of its plots. `checked` is not used."""
        self.matisse.close_all_plots()

    @handled_slot(bool)
    def open_configuration(self, checked):
        """Open the configuration dialog and block until it is closed. `checked` is not used."""
        dialog = ConfigurationDialog()
        dialog.exec()

    @handled_slot(bool)
    def reset(self, checked=False, reset_motors=True, reset_piezos=True, reset_matisse_tasks=True,
              reset_ple_tasks=True):
        """
        Reset Matisse to a 'good' default state: not locked or stabilizing, motors reset, all tasks finished, etc.

        Also stops any PLE analysis tasks.

        The Matisse `exit_flag` is always cleared again before this method returns or raises, so a failure while
        resetting cannot leave later tasks permanently flagged to exit.

        Parameters
        ----------
        checked : bool
            not used
        reset_motors : bool
            whether to reset the Matisse birefringent filter and thin etalon motors to their configured reset positions
        reset_piezos : bool
            whether to reset the Matisse stabilization piezos
        reset_matisse_tasks : bool
            whether to trigger the exit flag and quit any running tasks (like scans)
        reset_ple_tasks : bool
            whether to stop any running PLE-related tasks

        Raises
        ------
        Exception
            whatever a waited-on worker raised, since `Future.result()` re-raises the worker's exception
        """
        try:
            if self.matisse:
                if reset_matisse_tasks:
                    self.matisse.exit_flag = True
                    # `not done()` also covers a task that is queued but has not started running yet.
                    if self.matisse_worker is not None and not self.matisse_worker.done():
                        print('Waiting for Matisse tasks to complete.')
                        self.matisse_worker.result()
                    self.matisse_worker = None
                if self.matisse.is_stabilizing():
                    self.matisse.stabilize_off()
                if self.matisse.is_lock_correction_on():
                    self.matisse.stop_laser_lock_correction()
                if reset_motors:
                    self.matisse.reset_motors()
                if reset_piezos:
                    self.matisse.reset_stabilization_piezos()

            if reset_ple_tasks:
                self.ple.stop_ple_tasks()

                if self.ple_scan_worker and not self.ple_scan_worker.done():
                    print('Waiting for PLE scan to complete.')
                    self.ple_scan_worker.result()
                self.ple_scan_worker = None

                if self.ple_analysis_worker and not self.ple_analysis_worker.done():
                    print('Waiting for PLE analysis to complete.')
                    self.ple_analysis_worker.result()
                self.ple_analysis_worker = None

                if self.single_acquisition_worker and not self.single_acquisition_worker.done():
                    print('Waiting for acquisition to complete.')
                    self.single_acquisition_worker.result()
                self.single_acquisition_worker = None
        finally:
            # Clear the flag even if something above raised, otherwise every later task would exit immediately.
            if self.matisse:
                self.matisse.exit_flag = False

        print('Done.')

    @handled_slot(bool)
    def reset_motors_only(self, checked):
        """Run `reset` with everything except the motor reset disabled. `checked` is not used."""
        self.reset(reset_piezos=False, reset_matisse_tasks=False, reset_ple_tasks=False)

    @handled_slot(bool)
    def reset_piezos_only(self, checked):
        """Run `reset` with everything except the stabilization piezo reset disabled. `checked` is not used."""
        self.reset(reset_motors=False, reset_matisse_tasks=False, reset_ple_tasks=False)

    @handled_slot(bool)
    def reset_matisse_tasks_only(self, checked):
        """Run `reset` with everything except the Matisse task reset disabled. `checked` is not used."""
        self.reset(reset_motors=False, reset_piezos=False, reset_ple_tasks=False)

    @handled_slot(bool)
    def reset_ple_tasks_only(self, checked):
        """Run `reset` with everything except the PLE task reset disabled. `checked` is not used."""
        self.reset(reset_motors=False, reset_piezos=False, reset_matisse_tasks=False)

    @handled_slot(bool)
    def restart(self, checked):
        """Exit the event loop with the restart exit code. `checked` is not used."""
        self.exit(ControlApplication.EXIT_CODE_RESTART)

    @handled_slot(bool)
    def set_wavelength_dialog(self, checked):
        """
        Open a dialog to set the wavelength of the Matisse. If the difference in wavelength is greater than or equal to
        the wavelength change threshold, display a warning to confirm the change.
        """
        # Prefer the stored target wavelength as the dialog default; fall back to the wavemeter reading.
        current_wavelength = self.matisse.target_wavelength
        if current_wavelength is None:
            current_wavelength = self.matisse.wavemeter_wavelength()
        target_wavelength, success = QInputDialog.getDouble(self.window, 'Set Wavelength', 'Wavelength (nm): ',
                                                            current_wavelength, decimals=3,
                                                            min=cfg.get(cfg.WAVELENGTH_LOWER_LIMIT),
                                                            max=cfg.get(cfg.WAVELENGTH_UPPER_LIMIT))
        if success:
            if abs(current_wavelength - target_wavelength) >= ControlApplication.CONFIRM_WAVELENGTH_CHANGE_THRESHOLD:
                answer = QMessageBox.warning(self.window, 'Large Wavelength Change',
                                             f"The desired wavelength, {target_wavelength} nm, is more than "
                                             f"{ControlApplication.CONFIRM_WAVELENGTH_CHANGE_THRESHOLD} nm "
                                             'away from the current wavelength. Are you sure?',
                                             QMessageBox.Yes | QMessageBox.No, defaultButton=QMessageBox.No)
                if answer == QMessageBox.No:
                    return

            print(f"Setting wavelength to {target_wavelength} nm...")
            self.run_matisse_task(self.matisse.set_wavelength, target_wavelength)

    @handled_slot(bool)
    def set_bifi_approx_wavelength_dialog(self, checked):
        """Prompt for an approximate wavelength, defaulting to the 'MOTBI:WL?' reading, and apply it to the BiFi."""
        target_wavelength, success = QInputDialog.getDouble(self.window, 'Set Approx. Wavelength', 'Wavelength (nm): ',
                                                            self.matisse.query('MOTBI:WL?', numeric_result=True))
        if success:
            print(f"Setting BiFi approximate wavelength to {target_wavelength} nm...")
            self.matisse.set_bifi_wavelength(target_wavelength)

    @handled_slot(bool)
    def set_bifi_motor_pos_dialog(self, checked):
        """Prompt for a BiFi motor position, defaulting to the 'MOTBI:POS?' reading, and apply it."""
        target_pos, success = QInputDialog.getInt(self.window, 'Set BiFi Motor Position', 'Absolute Position:',
                                                  self.matisse.query('MOTBI:POS?', numeric_result=True))
        if success:
            print(f"Setting BiFi motor position to {target_pos}.")
            self.matisse.set_bifi_motor_pos(target_pos)

    @handled_slot(bool)
    def set_thin_eta_motor_pos_dialog(self, checked):
        """Prompt for a thin etalon motor position, defaulting to the 'MOTTE:POS?' reading, and apply it."""
        target_pos, success = QInputDialog.getInt(self.window, 'Set Thin Etalon Motor Position', 'Absolute Position:',
                                                  self.matisse.query('MOTTE:POS?', numeric_result=True))
        if success:
            print(f"Setting thin etalon motor position to {target_pos}.")
            self.matisse.set_thin_etalon_motor_pos(target_pos)

    @handled_slot(bool)
    def set_piezo_eta_pos_dialog(self, checked):
        """Prompt for a piezo etalon position, defaulting to the 'PZETL:BASE?' reading, and send it to the Matisse."""
        target_pos, success = QInputDialog.getDouble(self.window, 'Set Piezo Etalon Position', 'Position: ',
                                                     self.matisse.query('PZETL:BASE?', numeric_result=True))
        if success:
            self.matisse.query(f"PZETL:BASE {target_pos}")

    @handled_slot(bool)
    def set_slow_piezo_pos_dialog(self, checked):
        """Prompt for a slow piezo position, defaulting to the 'SPZT:NOW?' reading, and send it to the Matisse."""
        target_pos, success = QInputDialog.getDouble(self.window, 'Set Slow Piezo Position', 'Position: ',
                                                     self.matisse.query('SPZT:NOW?', numeric_result=True))
        if success:
            self.matisse.query(f"SPZT:NOW {target_pos}")

    @handled_slot(bool)
    def set_refcell_pos_dialog(self, checked):
        """Prompt for a RefCell position, defaulting to the 'SCAN:NOW?' reading, and send it to the Matisse."""
        target_pos, success = QInputDialog.getDouble(self.window, 'Set RefCell Position', 'Position: ',
                                                     self.matisse.query('SCAN:NOW?', numeric_result=True))
        if success:
            self.matisse.query(f"SCAN:NOW {target_pos}")

    @handled_slot(bool)
    def start_bifi_scan(self, checked):
        """Start a birefringent filter scan as a Matisse task. `checked` is not used."""
        self.run_matisse_task(self.matisse.birefringent_filter_scan)

    @handled_slot(bool)
    def start_thin_etalon_scan(self, checked):
        """Start a thin etalon scan as a Matisse task. `checked` is not used."""
        self.run_matisse_task(self.matisse.thin_etalon_scan)

    @handled_slot(bool)
    def scan_device_up(self, checked):
        """Apply the configured scan speeds and start scanning upward, unless auto-stabilization is on."""
        if self.matisse.is_stabilizing():
            print('WARNING: Auto-stabilize is on. Disable it and try again.')
        else:
            self.matisse.query(f"SCAN:RISINGSPEED {cfg.get(cfg.REFCELL_SCAN_RISING_SPEED)}")
            self.matisse.query(f"SCAN:FALLINGSPEED {cfg.get(cfg.REFCELL_SCAN_FALLING_SPEED)}")
            self.matisse.start_scan(matisse.SCAN_MODE_UP)

    @handled_slot(bool)
    def scan_device_down(self, checked):
        """Apply the configured scan speeds and start scanning downward, unless auto-stabilization is on."""
        if self.matisse.is_stabilizing():
            print('WARNING: Auto-stabilize is on. Disable it and try again.')
        else:
            self.matisse.query(f"SCAN:RISINGSPEED {cfg.get(cfg.REFCELL_SCAN_RISING_SPEED)}")
            self.matisse.query(f"SCAN:FALLINGSPEED {cfg.get(cfg.REFCELL_SCAN_FALLING_SPEED)}")
            self.matisse.start_scan(matisse.SCAN_MODE_DOWN)

    @handled_slot(bool)
    def stop_scanning_device(self, checked):
        """Stop the scan, unless auto-stabilization is on. `checked` is not used."""
        if self.matisse.is_stabilizing():
            print('WARNING: Auto-stabilize is on. Disable it and try again.')
        else:
            self.matisse.stop_scan()

    @handled_slot(bool)
    def toggle_lock_laser(self, checked):
        """Stop lock correction if it is on, otherwise start it. `checked` is not used."""
        if self.matisse.is_lock_correction_on():
            self.matisse.stop_laser_lock_correction()
        else:
            self.matisse.start_laser_lock_correction()

    @handled_slot(bool)
    def set_recommended_fast_pz_setpoint(self, checked):
        """Ask the Matisse object to set the recommended fast piezo setpoint. `checked` is not used."""
        self.matisse.set_recommended_fast_piezo_setpoint()

    @handled_slot(bool)
    def toggle_slow_piezo_control(self, checked):
        """Turn the slow piezo control loop on or off to match the menu action's new `checked` state."""
        print(f"{'Locking' if checked else 'Unlocking'} slow piezo.")
        # Revert the check mark first, so it only shows the new state if the Matisse call does not raise.
        self.slow_pz_control_action.setChecked(not checked)
        self.matisse.set_slow_piezo_control(checked)
        self.slow_pz_control_action.setChecked(checked)

    @handled_slot(bool)
    def toggle_thin_etalon_control(self, checked):
        """Turn the thin etalon control loop on or off to match the menu action's new `checked` state."""
        print(f"{'Locking' if checked else 'Unlocking'} thin etalon.")
        # Revert the check mark first, so it only shows the new state if the Matisse call does not raise.
        self.thin_eta_control_action.setChecked(not checked)
        self.matisse.set_thin_etalon_control(checked)
        self.thin_eta_control_action.setChecked(checked)

    @handled_slot(bool)
    def toggle_piezo_etalon_control(self, checked):
        """Turn the piezo etalon control loop on or off to match the menu action's new `checked` state."""
        print(f"{'Locking' if checked else 'Unlocking'} piezo etalon.")
        # Revert the check mark first, so it only shows the new state if the Matisse call does not raise.
        self.piezo_eta_control_action.setChecked(not checked)
        self.matisse.set_piezo_etalon_control(checked)
        self.piezo_eta_control_action.setChecked(checked)

    @handled_slot(bool)
    def toggle_fast_piezo_control(self, checked):
        """Turn the fast piezo control loop on or off to match the menu action's new `checked` state."""
        print(f"{'Locking' if checked else 'Unlocking'} fast piezo.")
        # Revert the check mark first, so it only shows the new state if the Matisse call does not raise.
        self.fast_pz_control_action.setChecked(not checked)
        # This used to call set_piezo_etalon_control, which toggled the wrong control loop.
        # NOTE(review): assumes Matisse exposes set_fast_piezo_control like its three siblings - confirm.
        self.matisse.set_fast_piezo_control(checked)
        self.fast_pz_control_action.setChecked(checked)

    @handled_slot(bool)
    def toggle_auto_stabilization(self, checked):
        """Turn auto-stabilization off if it is on, otherwise turn it on. `checked` is not used."""
        if self.matisse.is_stabilizing():
            self.matisse.stabilize_off()
        else:
            self.matisse.stabilize_on()

    @handled_slot(bool)
    def start_ple_scan(self, checked):
        """Prompt for PLE scan options and run the scan in the thread pool, unless a scan is already running."""
        if self.ple_scan_worker and self.ple_scan_worker.running():
            print('WARNING: A PLE scan is currently in progress.')
            return

        dialog = PLEScanDialog(parent=self.window)
        if dialog.exec() == QDialog.Accepted:
            ple_options = dialog.get_form_data()
            print('Starting PLE scan.')
            self.ple_scan_worker = self.work_executor.submit(self.ple.start_ple_scan, **ple_options)
            self.ple_scan_worker.add_done_callback(utils.raise_error_from_future)

    @handled_slot(bool)
    def analyze_ple_data(self, checked):
        """Prompt for PLE analysis options and run the analysis in the thread pool, unless one is already running."""
        if self.ple_analysis_worker and self.ple_analysis_worker.running():
            print('WARNING: A PLE analysis is currently in progress.')
            return

        dialog = PLEAnalysisDialog(parent=self.window)
        if dialog.exec() == QDialog.Accepted:
            analysis_options = dialog.get_form_data()
            self.ple_analysis_worker = self.work_executor.submit(self.ple.analyze_ple_data, **analysis_options)
            self.ple_analysis_worker.add_done_callback(utils.raise_error_from_future)

    @handled_slot(bool)
    def view_existing_analysis(self, checked):
        """Prompt for an analysis file and plot it in the thread pool, unless an analysis is already running."""
        if self.ple_analysis_worker and self.ple_analysis_worker.running():
            print('WARNING: A PLE analysis is currently in progress.')
            return

        # getOpenFileName returns (file path, selected filter), not a success flag.
        # The path is an empty string when the dialog is cancelled, so that is what must be tested.
        file_path, _selected_filter = QFileDialog.getOpenFileName(caption='Select Analysis File',
                                                                  filter='Pickle file (*.pickle)')
        if file_path:
            # NOTE(review): the file is presumably unpickled downstream - only open files from trusted sources.
            self.ple_analysis_worker = self.work_executor.submit(self.ple.plot_ple_analysis_file, file_path)
            self.ple_analysis_worker.add_done_callback(utils.raise_error_from_future)

    @handled_slot(bool)
    def take_single_acquisition(self, checked):
        """Prompt for acquisition options and plot a single acquisition in the thread pool, unless one is running."""
        if self.single_acquisition_worker and self.single_acquisition_worker.running():
            print('WARNING: An acquisition is currently in progress.')
            return

        dialog = SingleAcquisitionDialog(parent=self.window)
        if dialog.exec() == QDialog.Accepted:
            acquisition_options = dialog.get_form_data()
            self.single_acquisition_worker = self.work_executor.submit(self.ple.plot_single_acquisition,
                                                                       **acquisition_options)
            self.single_acquisition_worker.add_done_callback(utils.raise_error_from_future)

    def run_matisse_task(self, function, *args, **kwargs) -> bool:
        """
        Run an asynchronous Matisse-related task in the worker thread pool. Only one such task may be run at a time.
        Any task run using this method MUST exit gracefully at some point by checking the Matisse `exit_flag`.

        Parameters
        ----------
        function : function
            the function to run in the thread pool
        *args
            positional arguments to pass to the given function
        **kwargs
            keyword arguments to pass to the given function

        Returns
        -------
        bool
            whether the task was successfully started
        """
        # `Future.running()` is False while a submitted task is still waiting for a worker thread, so checking it
        # would let a second task slip in. `done()` is only True once the task has finished or was cancelled.
        if self.matisse_worker is not None and not self.matisse_worker.done():
            print("WARNING: Cannot perform requested action. A Matisse-related task is currently running.")
            return False

        self.matisse_worker = self.work_executor.submit(function, *args, **kwargs)
        self.matisse_worker.add_done_callback(utils.raise_error_from_future)
        return True

Ancestors

  • PyQt5.QtWidgets.QApplication
  • PyQt5.QtGui.QGuiApplication
  • PyQt5.QtCore.QCoreApplication
  • PyQt5.QtCore.QObject
  • sip.wrapper
  • sip.simplewrapper

Class variables

var CONFIRM_WAVELENGTH_CHANGE_THRESHOLD
var EXIT_CODE_RESTART

Methods

def analyze_ple_data(self, checked)
Source code
@handled_slot(bool)
def analyze_ple_data(self, checked):
    """Prompt for PLE analysis options and run the analysis in the thread pool, unless one is already running."""
    current = self.ple_analysis_worker
    if current and current.running():
        print('WARNING: A PLE analysis is currently in progress.')
        return

    options_dialog = PLEAnalysisDialog(parent=self.window)
    if options_dialog.exec() != QDialog.Accepted:
        # The dialog was dismissed, so there is nothing to analyze.
        return

    form_data = options_dialog.get_form_data()
    self.ple_analysis_worker = self.work_executor.submit(self.ple.analyze_ple_data, **form_data)
    self.ple_analysis_worker.add_done_callback(utils.raise_error_from_future)
def clean_up(self)

This method is run before the GUI exits; think of it like __del__.

Don't call this elsewhere unless you know what you're doing.

Source code
@pyqtSlot()
def clean_up(self):
    """
    This method is run before the GUI exits; think of it like `__del__`.

    It resets running tasks without touching the motors or piezos, deletes the `matisse` attribute, cleans up the
    status monitor and log area, cleans up PLE globals, and finally exits the `log_redirector` context.

    Don't call this elsewhere unless you know what you're doing.
    """
    # Motors and piezos are deliberately left where they are on exit.
    self.reset(reset_motors=False, reset_piezos=False)
    del self.matisse

    # Clean up widgets with running threads.
    self.status_monitor.clean_up()
    self.log_area.clean_up()

    PLE.clean_up_globals()

    # Leave the redirection context last; it is a context manager that was entered manually.
    self.log_redirector.__exit__(None, None, None)
def clear_log_area(self, checked)
Source code
@handled_slot(bool)
def clear_log_area(self, checked):
    """Clear the log area. `checked` is not used."""
    log_widget = self.log_area
    log_widget.clear()
def close_plots(self, checked)
Source code
@handled_slot(bool)
def close_plots(self, checked):
    """Ask the Matisse object to close all of its plots. `checked` is not used."""
    laser = self.matisse
    laser.close_all_plots()
def error_dialog(self)

Display an error dialog box with details of the most recent exception raised.

Source code
def error_dialog(self):
    """Show a critical message box describing the exception that is currently being handled."""
    exc_lines = list(traceback.format_exception(*sys.exc_info()))
    # Width of the widest formatted entry, capped at 185 characters.
    widest = max(map(len, exc_lines))
    padding_width = widest if widest < 185 else 185
    # The final entry is the exception summary; echo it to stdout in red and use it as the headline.
    summary = exc_lines.pop()
    print(utils.red_text(summary), end='')
    # Hide the frames that belong to the handled_function decorator, for clarity.
    visible_frames = [entry for entry in exc_lines if 'in handled_function_wrapper' not in entry]
    box = QMessageBox(icon=QMessageBox.Critical)
    box.setWindowTitle('Error')
    # The run of underscores is a hack to widen the QMessageBox, which is not normally resizable.
    headline = summary + '_' * padding_width
    box.setText(f"{headline}\n\n{''.join(visible_frames)}")
    box.exec()
def open_configuration(self, checked)
Source code
@handled_slot(bool)
def open_configuration(self, checked):
    """Open the configuration dialog and block until it is closed. `checked` is not used."""
    ConfigurationDialog().exec()
def reset(self, checked=False, reset_motors=True, reset_piezos=True, reset_matisse_tasks=True, reset_ple_tasks=True)

Reset Matisse to a 'good' default state: not locked or stabilizing, motors reset, all tasks finished, etc.

Also stops any PLE analysis tasks.

Parameters

checked : bool
not used
reset_motors : bool
whether to reset the Matisse birefringent filter and thin etalon motors to their configured reset positions
reset_piezos : bool
whether to reset the Matisse stabilization piezos
reset_matisse_tasks : bool
whether to trigger the exit flag and quit any running tasks (like scans)
reset_ple_tasks : bool
whether to stop any running PLE-related tasks
Source code
@handled_slot(bool)
def reset(self, checked=False, reset_motors=True, reset_piezos=True, reset_matisse_tasks=True,
          reset_ple_tasks=True):
    """
    Reset Matisse to a 'good' default state: not locked or stabilizing, motors reset, all tasks finished, etc.

    Also stops any PLE analysis tasks.

    The Matisse `exit_flag` is always cleared again before this method returns or raises, so a failure while
    resetting cannot leave later tasks permanently flagged to exit.

    Parameters
    ----------
    checked : bool
        not used
    reset_motors : bool
        whether to reset the Matisse birefringent filter and thin etalon motors to their configured reset positions
    reset_piezos : bool
        whether to reset the Matisse stabilization piezos
    reset_matisse_tasks : bool
        whether to trigger the exit flag and quit any running tasks (like scans)
    reset_ple_tasks : bool
        whether to stop any running PLE-related tasks

    Raises
    ------
    Exception
        whatever a waited-on worker raised, since `Future.result()` re-raises the worker's exception
    """
    try:
        if self.matisse:
            if reset_matisse_tasks:
                self.matisse.exit_flag = True
                # `not done()` also covers a task that is queued but has not started running yet.
                if self.matisse_worker is not None and not self.matisse_worker.done():
                    print('Waiting for Matisse tasks to complete.')
                    self.matisse_worker.result()
                self.matisse_worker = None
            if self.matisse.is_stabilizing():
                self.matisse.stabilize_off()
            if self.matisse.is_lock_correction_on():
                self.matisse.stop_laser_lock_correction()
            if reset_motors:
                self.matisse.reset_motors()
            if reset_piezos:
                self.matisse.reset_stabilization_piezos()

        if reset_ple_tasks:
            self.ple.stop_ple_tasks()

            if self.ple_scan_worker and not self.ple_scan_worker.done():
                print('Waiting for PLE scan to complete.')
                self.ple_scan_worker.result()
            self.ple_scan_worker = None

            if self.ple_analysis_worker and not self.ple_analysis_worker.done():
                print('Waiting for PLE analysis to complete.')
                self.ple_analysis_worker.result()
            self.ple_analysis_worker = None

            if self.single_acquisition_worker and not self.single_acquisition_worker.done():
                print('Waiting for acquisition to complete.')
                self.single_acquisition_worker.result()
            self.single_acquisition_worker = None
    finally:
        # Clear the flag even if something above raised, otherwise every later task would exit immediately.
        if self.matisse:
            self.matisse.exit_flag = False

    print('Done.')
def reset_matisse_tasks_only(self, checked)
Source code
@handled_slot(bool)
def reset_matisse_tasks_only(self, checked):
    """Run `reset` with the motor, piezo and PLE task resets switched off. `checked` is not used."""
    self.reset(reset_ple_tasks=False,
               reset_piezos=False,
               reset_motors=False)
def reset_motors_only(self, checked)
Source code
@handled_slot(bool)
def reset_motors_only(self, checked):
    """Run `reset` with the piezo, Matisse task and PLE task resets switched off. `checked` is not used."""
    self.reset(reset_ple_tasks=False,
               reset_matisse_tasks=False,
               reset_piezos=False)
def reset_piezos_only(self, checked)
Source code
@handled_slot(bool)
def reset_piezos_only(self, checked):
    """Run `reset` with the motor, Matisse task and PLE task resets switched off. `checked` is not used."""
    self.reset(reset_ple_tasks=False,
               reset_matisse_tasks=False,
               reset_motors=False)
def reset_ple_tasks_only(self, checked)
Source code
@handled_slot(bool)
def reset_ple_tasks_only(self, checked):
    """Run `reset` with the motor, piezo and Matisse task resets switched off. `checked` is not used."""
    self.reset(reset_matisse_tasks=False,
               reset_piezos=False,
               reset_motors=False)
def restart(self, checked)
Source code
@handled_slot(bool)
def restart(self, checked):
    """Exit the event loop with the restart exit code. `checked` is not used."""
    restart_code = ControlApplication.EXIT_CODE_RESTART
    self.exit(restart_code)
def run_matisse_task(self, function, *args, **kwargs)

Run an asynchronous Matisse-related task in the worker thread pool. Only one such task may be run at a time. Any task run using this method MUST exit gracefully at some point by checking the Matisse exit_flag.

Parameters

function : function
the function to run in the thread pool
*args
positional arguments to pass to the given function
**kwargs
keyword arguments to pass to the given function

Returns

bool
whether the task was successfully started
Source code
def run_matisse_task(self, function, *args, **kwargs) -> bool:
    """
    Submit a Matisse-related task to the worker thread pool, refusing if one is already running.

    The submitted future is remembered as `matisse_worker`, so at most one such task is tracked at a time.
    Any task run using this method MUST exit gracefully at some point by checking the Matisse `exit_flag`.

    Parameters
    ----------
    function : function
        the callable to execute in the thread pool
    *args
        positional arguments forwarded to `function`
    **kwargs
        keyword arguments forwarded to `function`

    Returns
    -------
    bool
        True if the task was submitted, False if it was rejected because another task is still running
    """
    active_worker = self.matisse_worker
    if active_worker and active_worker.running():
        print("WARNING: Cannot perform requested action. A Matisse-related task is currently running.")
        return False

    submitted = self.work_executor.submit(function, *args, **kwargs)
    self.matisse_worker = submitted
    # Route the finished future through the shared error-reporting callback.
    submitted.add_done_callback(utils.raise_error_from_future)
    return True
def scan_device_down(self, checked)
Source code
@handled_slot(bool)
def scan_device_down(self, checked):
    """
    Start scanning the device downward, unless auto-stabilization is active.

    The rising and falling scan speeds are first set from the configured RefCell scan speeds.
    """
    if self.matisse.is_stabilizing():
        print('WARNING: Auto-stabilize is on. Disable it and try again.')
        return

    rising_speed = cfg.get(cfg.REFCELL_SCAN_RISING_SPEED)
    self.matisse.query(f"SCAN:RISINGSPEED {rising_speed}")
    falling_speed = cfg.get(cfg.REFCELL_SCAN_FALLING_SPEED)
    self.matisse.query(f"SCAN:FALLINGSPEED {falling_speed}")
    self.matisse.start_scan(matisse.SCAN_MODE_DOWN)
def scan_device_up(self, checked)
Source code
@handled_slot(bool)
def scan_device_up(self, checked):
    """
    Start scanning the device upward, unless auto-stabilization is active.

    The rising and falling scan speeds are first set from the configured RefCell scan speeds.
    """
    if self.matisse.is_stabilizing():
        print('WARNING: Auto-stabilize is on. Disable it and try again.')
        return

    rising_speed = cfg.get(cfg.REFCELL_SCAN_RISING_SPEED)
    self.matisse.query(f"SCAN:RISINGSPEED {rising_speed}")
    falling_speed = cfg.get(cfg.REFCELL_SCAN_FALLING_SPEED)
    self.matisse.query(f"SCAN:FALLINGSPEED {falling_speed}")
    self.matisse.start_scan(matisse.SCAN_MODE_UP)
def set_bifi_approx_wavelength_dialog(self, checked)
Source code
@handled_slot(bool)
def set_bifi_approx_wavelength_dialog(self, checked):
    """Ask for an approximate BiFi wavelength (pre-filled from `MOTBI:WL?`) and apply it if confirmed."""
    current_value = self.matisse.query('MOTBI:WL?', numeric_result=True)
    target_wavelength, success = QInputDialog.getDouble(
        self.window, 'Set Approx. Wavelength', 'Wavelength (nm): ', current_value)
    if not success:
        # Dialog was cancelled; leave the BiFi untouched.
        return

    print(f"Setting BiFi approximate wavelength to {target_wavelength} nm...")
    self.matisse.set_bifi_wavelength(target_wavelength)
def set_bifi_motor_pos_dialog(self, checked)
Source code
@handled_slot(bool)
def set_bifi_motor_pos_dialog(self, checked):
    """
    Ask for an absolute BiFi motor position (pre-filled from `MOTBI:POS?`) and apply it if confirmed.

    Parameters
    ----------
    checked : bool
        value delivered by the menu action's `triggered` signal; unused
    """
    # QInputDialog.getInt() needs an int default. Coerce the reading in case the numeric query
    # result is a float, which Python 3.10+ no longer converts implicitly for integer arguments.
    current_pos = int(self.matisse.query('MOTBI:POS?', numeric_result=True))
    target_pos, success = QInputDialog.getInt(self.window, 'Set BiFi Motor Position', 'Absolute Position:',
                                              current_pos)
    if success:
        print(f"Setting BiFi motor position to {target_pos}.")
        self.matisse.set_bifi_motor_pos(target_pos)
def set_piezo_eta_pos_dialog(self, checked)
Source code
@handled_slot(bool)
def set_piezo_eta_pos_dialog(self, checked):
    """Ask for a piezo etalon position (pre-filled from `PZETL:BASE?`) and send it if confirmed."""
    current_value = self.matisse.query('PZETL:BASE?', numeric_result=True)
    target_pos, success = QInputDialog.getDouble(
        self.window, 'Set Piezo Etalon Position', 'Position: ', current_value)
    if not success:
        # Dialog was cancelled; nothing to send.
        return

    self.matisse.query(f"PZETL:BASE {target_pos}")
def set_recommended_fast_pz_setpoint(self, checked)
Source code
@handled_slot(bool)
def set_recommended_fast_pz_setpoint(self, checked):
    """Delegate to the Matisse's `set_recommended_fast_piezo_setpoint`."""
    # `checked` is supplied by the menu action's `triggered` signal and is not used.
    laser = self.matisse
    laser.set_recommended_fast_piezo_setpoint()
def set_refcell_pos_dialog(self, checked)
Source code
@handled_slot(bool)
def set_refcell_pos_dialog(self, checked):
    """Ask for a RefCell position (pre-filled from `SCAN:NOW?`) and send it if confirmed."""
    current_value = self.matisse.query('SCAN:NOW?', numeric_result=True)
    target_pos, success = QInputDialog.getDouble(
        self.window, 'Set RefCell Position', 'Position: ', current_value)
    if not success:
        # Dialog was cancelled; nothing to send.
        return

    self.matisse.query(f"SCAN:NOW {target_pos}")
def set_slow_piezo_pos_dialog(self, checked)
Source code
@handled_slot(bool)
def set_slow_piezo_pos_dialog(self, checked):
    """Ask for a slow piezo position (pre-filled from `SPZT:NOW?`) and send it if confirmed."""
    current_value = self.matisse.query('SPZT:NOW?', numeric_result=True)
    target_pos, success = QInputDialog.getDouble(
        self.window, 'Set Slow Piezo Position', 'Position: ', current_value)
    if not success:
        # Dialog was cancelled; nothing to send.
        return

    self.matisse.query(f"SPZT:NOW {target_pos}")
def set_thin_eta_motor_pos_dialog(self, checked)
Source code
@handled_slot(bool)
def set_thin_eta_motor_pos_dialog(self, checked):
    """
    Ask for an absolute thin etalon motor position (pre-filled from `MOTTE:POS?`) and apply it if confirmed.

    Parameters
    ----------
    checked : bool
        value delivered by the menu action's `triggered` signal; unused
    """
    # QInputDialog.getInt() needs an int default. Coerce the reading in case the numeric query
    # result is a float, which Python 3.10+ no longer converts implicitly for integer arguments.
    current_pos = int(self.matisse.query('MOTTE:POS?', numeric_result=True))
    target_pos, success = QInputDialog.getInt(self.window, 'Set Thin Etalon Motor Position', 'Absolute Position:',
                                              current_pos)
    if success:
        print(f"Setting thin etalon motor position to {target_pos}.")
        self.matisse.set_thin_etalon_motor_pos(target_pos)
def set_wavelength_dialog(self, checked)

Open a dialog to set the wavelength of the Matisse. If the difference in wavelength is greater than or equal to the wavelength change threshold, display a warning to confirm the change.

Source code
@handled_slot(bool)
def set_wavelength_dialog(self, checked):
    """
    Prompt for a new wavelength and start a Matisse task that sets it.

    The dialog is pre-filled with the Matisse's stored target wavelength, or with the wavemeter reading when no
    target is stored. If the requested value differs from that starting value by
    `CONFIRM_WAVELENGTH_CHANGE_THRESHOLD` or more, a warning is shown first and the change is abandoned when
    the user answers No.
    """
    threshold = ControlApplication.CONFIRM_WAVELENGTH_CHANGE_THRESHOLD

    starting_wavelength = self.matisse.target_wavelength
    if starting_wavelength is None:
        starting_wavelength = self.matisse.wavemeter_wavelength()

    target_wavelength, success = QInputDialog.getDouble(
        self.window, 'Set Wavelength', 'Wavelength (nm): ', starting_wavelength, decimals=3,
        min=cfg.get(cfg.WAVELENGTH_LOWER_LIMIT), max=cfg.get(cfg.WAVELENGTH_UPPER_LIMIT))
    if not success:
        return

    if abs(starting_wavelength - target_wavelength) >= threshold:
        warning_text = (f"The desired wavelength, {target_wavelength} nm, is more than "
                        f"{threshold} nm "
                        'away from the current wavelength. Are you sure?')
        answer = QMessageBox.warning(self.window, 'Large Wavelength Change', warning_text,
                                     QMessageBox.Yes | QMessageBox.No, defaultButton=QMessageBox.No)
        if answer == QMessageBox.No:
            return

    print(f"Setting wavelength to {target_wavelength} nm...")
    # Runs in the worker pool so the UI thread is not blocked while the wavelength is being set.
    self.run_matisse_task(self.matisse.set_wavelength, target_wavelength)
def setup_logging(self)

Initialize logging queue and redirect stdout to the logging area.

Source code
def setup_logging(self):
    """Create the log queue and read-only log widget, then redirect stdout into the queue."""
    self.log_queue = log_queue = queue.Queue()

    self.log_area = log_area = LoggingArea(log_queue)
    log_area.setReadOnly(True)
    self.layout.addWidget(log_area)

    # Enter the redirect by hand instead of using `with`, so it stays active after this method returns.
    # NOTE(review): the matching __exit__ is presumably called during application clean-up; confirm.
    self.log_redirector = redirector = redirect_stdout(LoggingStream(log_queue))
    redirector.__enter__()
def setup_matisse(self)
Source code
@handled_function
def setup_matisse(self):
    """
    Create the `Matisse` instance and start laser lock correction when all control loops are already on.

    If anything in this setup raises, `self.matisse` is set to None and the exception is re-raised.
    """
    try:
        self.matisse: Matisse = Matisse()
        loops_on = self.matisse.all_control_loops_on()
        if loops_on and not self.matisse.is_lock_correction_on():
            self.matisse.start_laser_lock_correction()
    except Exception as err:
        # Leave a well-defined "no laser" state behind before propagating the failure.
        self.matisse: Matisse = None
        raise err
def setup_menus(self)

Initialization of items in the menu bar goes here.

Source code
def setup_menus(self):
    """
    Build the menus of the main window's menu bar.

    Every created action is stored as an attribute on `self`, and the four control loop actions are
    additionally collected in `control_loop_actions`.
    """
    bar = self.window.menuBar()

    # Console menu, including the nested Reset submenu.
    console = bar.addMenu('Console')
    self.clear_log_area_action = console.addAction('Clear Log')
    self.close_plots_action = console.addAction('Close All Plots')
    self.configuration_action = console.addAction('Configuration')
    resets = console.addMenu('Reset')
    self.reset_all_action = resets.addAction('All')
    self.reset_matisse_motors_action = resets.addAction('Matisse Motors')
    self.reset_matisse_piezos_action = resets.addAction('Stabilization Piezos')
    self.reset_matisse_tasks_action = resets.addAction('Matisse Tasks')
    self.reset_ple_tasks_action = resets.addAction('PLE Tasks')
    self.restart_action = console.addAction('Restart')

    # Set menu.
    setters = bar.addMenu('Set')
    self.set_wavelength_action = setters.addAction('Wavelength')
    self.set_bifi_approx_wavelength_action = setters.addAction('BiFi Approx. Wavelength')
    self.set_bifi_motor_pos_action = setters.addAction('BiFi Motor Position')
    self.set_thin_eta_motor_pos_action = setters.addAction('Thin Etalon Motor Position')
    self.set_piezo_eta_pos_action = setters.addAction('Piezo Etalon Position')
    self.set_slow_piezo_pos_action = setters.addAction('Slow Piezo Position')
    self.set_refcell_pos_action = setters.addAction('RefCell Position')

    # Scan menu.
    scans = bar.addMenu('Scan')
    self.bifi_scan_action = scans.addAction('Birefringent Filter')
    self.thin_eta_scan_action = scans.addAction('Thin Etalon')
    self.scan_device_up_action = scans.addAction('Scan Device Up')
    self.scan_device_down_action = scans.addAction('Scan Device Down')
    self.stop_scan_device_action = scans.addAction('Stop Scanning Device')

    # Stabilization menu; the control loop toggles live in their own submenu and are checkable.
    stabilization = bar.addMenu('Stabilization')
    loop_toggles = stabilization.addMenu('Toggle Control Loop')
    self.slow_pz_control_action = loop_toggles.addAction('Slow Piezo')
    self.thin_eta_control_action = loop_toggles.addAction('Thin Etalon')
    self.piezo_eta_control_action = loop_toggles.addAction('Piezo Etalon')
    self.fast_pz_control_action = loop_toggles.addAction('Fast Piezo')
    self.control_loop_actions = [self.slow_pz_control_action, self.thin_eta_control_action,
                                 self.piezo_eta_control_action, self.fast_pz_control_action]
    for loop_action in self.control_loop_actions:
        loop_action.setCheckable(True)
    self.lock_laser_action = stabilization.addAction('Toggle Lock Laser')
    self.set_recommended_fast_pz_setpoint_action = stabilization.addAction('Set Recommended Fast Pz Setpoint')
    self.auto_stabilize_action = stabilization.addAction('Toggle Auto Stabilization')

    # Shamrock menu.
    shamrock = bar.addMenu('Shamrock')
    self.start_ple_scan_action = shamrock.addAction('Start PLE Scan')
    self.analyze_ple_action = shamrock.addAction('Start PLE Analysis')
    self.view_existing_analysis_action = shamrock.addAction('View PLE Analysis')
    self.single_acquisition_action = shamrock.addAction('View Single Acquisition')
def setup_slots(self)

Connection of Qt signals to Qt slots goes here.

Please note that execution of slots will block the UI thread by default.

Source code
def setup_slots(self):
    """
    Connect each menu action's `triggered` signal to the slot that handles it.

    Please note that execution of slots will block the UI thread by default.
    """
    def wire(action, slot):
        # Single place where an action's `triggered` signal is attached to its handler.
        action.triggered.connect(slot)

    # Console
    wire(self.clear_log_area_action, self.clear_log_area)
    wire(self.close_plots_action, self.close_plots)
    wire(self.configuration_action, self.open_configuration)
    wire(self.reset_all_action, self.reset)
    wire(self.reset_matisse_motors_action, self.reset_motors_only)
    wire(self.reset_matisse_piezos_action, self.reset_piezos_only)
    wire(self.reset_matisse_tasks_action, self.reset_matisse_tasks_only)
    wire(self.reset_ple_tasks_action, self.reset_ple_tasks_only)
    wire(self.restart_action, self.restart)

    # Set
    wire(self.set_wavelength_action, self.set_wavelength_dialog)
    wire(self.set_bifi_approx_wavelength_action, self.set_bifi_approx_wavelength_dialog)
    wire(self.set_bifi_motor_pos_action, self.set_bifi_motor_pos_dialog)
    wire(self.set_thin_eta_motor_pos_action, self.set_thin_eta_motor_pos_dialog)
    wire(self.set_piezo_eta_pos_action, self.set_piezo_eta_pos_dialog)
    wire(self.set_slow_piezo_pos_action, self.set_slow_piezo_pos_dialog)
    wire(self.set_refcell_pos_action, self.set_refcell_pos_dialog)

    # Scan: component scans
    wire(self.bifi_scan_action, self.start_bifi_scan)
    wire(self.thin_eta_scan_action, self.start_thin_etalon_scan)

    # Scan: scanning device
    wire(self.scan_device_up_action, self.scan_device_up)
    wire(self.scan_device_down_action, self.scan_device_down)
    wire(self.stop_scan_device_action, self.stop_scanning_device)

    # Stabilization
    wire(self.slow_pz_control_action, self.toggle_slow_piezo_control)
    wire(self.thin_eta_control_action, self.toggle_thin_etalon_control)
    wire(self.piezo_eta_control_action, self.toggle_piezo_etalon_control)
    wire(self.fast_pz_control_action, self.toggle_fast_piezo_control)
    wire(self.lock_laser_action, self.toggle_lock_laser)
    wire(self.set_recommended_fast_pz_setpoint_action, self.set_recommended_fast_pz_setpoint)
    wire(self.auto_stabilize_action, self.toggle_auto_stabilization)

    # Shamrock
    wire(self.start_ple_scan_action, self.start_ple_scan)
    wire(self.analyze_ple_action, self.analyze_ple_data)
    wire(self.view_existing_analysis_action, self.view_existing_analysis)
    wire(self.single_acquisition_action, self.take_single_acquisition)
def setup_widgets(self)

Initialize any widgets the UI needs to run correctly.

Source code
@handled_function
def setup_widgets(self):
    """Create the status monitor widget with its queue and add it to the layout."""
    self.status_monitor_queue = monitor_queue = queue.Queue()
    self.status_monitor = monitor = StatusMonitor(self.matisse, monitor_queue)
    self.layout.addWidget(monitor)
def setup_window(self)
Source code
def setup_window(self):
    """Create the main window (titled and sized) and the vertical layout object."""
    main_window = QMainWindow()
    self.window = main_window
    self.layout = QVBoxLayout()

    main_window.setWindowTitle('Matisse Controller')
    main_window.resize(700, 300)
def start_bifi_scan(self, checked)
Source code
@handled_slot(bool)
def start_bifi_scan(self, checked):
    """Run the Matisse's birefringent filter scan as a Matisse task."""
    scan = self.matisse.birefringent_filter_scan
    self.run_matisse_task(scan)
def start_ple_scan(self, checked)
Source code
@handled_slot(bool)
def start_ple_scan(self, checked):
    """
    Show the PLE scan dialog and, if accepted, submit the scan to the worker pool.

    Nothing is started while a previously submitted PLE scan is still running.
    """
    running_scan = self.ple_scan_worker
    if running_scan and running_scan.running():
        print('WARNING: A PLE scan is currently in progress.')
        return

    dialog = PLEScanDialog(parent=self.window)
    if dialog.exec() != QDialog.Accepted:
        return

    ple_options = dialog.get_form_data()
    print('Starting PLE scan.')
    scan_future = self.work_executor.submit(self.ple.start_ple_scan, **ple_options)
    self.ple_scan_worker = scan_future
    # Route the finished future through the shared error-reporting callback.
    scan_future.add_done_callback(utils.raise_error_from_future)
def start_thin_etalon_scan(self, checked)
Source code
@handled_slot(bool)
def start_thin_etalon_scan(self, checked):
    """Run the Matisse's thin etalon scan as a Matisse task."""
    scan = self.matisse.thin_etalon_scan
    self.run_matisse_task(scan)
def stop_scanning_device(self, checked)
Source code
@handled_slot(bool)
def stop_scanning_device(self, checked):
    """Stop the device scan, unless auto-stabilization is active."""
    laser = self.matisse
    if laser.is_stabilizing():
        print('WARNING: Auto-stabilize is on. Disable it and try again.')
        return

    laser.stop_scan()
def take_single_acquisition(self, checked)
Source code
@handled_slot(bool)
def take_single_acquisition(self, checked):
    """
    Show the single acquisition dialog and, if accepted, submit the acquisition plot to the worker pool.

    Nothing is started while a previously submitted acquisition is still running.
    """
    running_acquisition = self.single_acquisition_worker
    if running_acquisition and running_acquisition.running():
        print('WARNING: An acquisition is currently in progress.')
        return

    dialog = SingleAcquisitionDialog(parent=self.window)
    if dialog.exec() != QDialog.Accepted:
        return

    acquisition_options = dialog.get_form_data()
    acquisition_future = self.work_executor.submit(self.ple.plot_single_acquisition, **acquisition_options)
    self.single_acquisition_worker = acquisition_future
    # Route the finished future through the shared error-reporting callback.
    acquisition_future.add_done_callback(utils.raise_error_from_future)
def toggle_auto_stabilization(self, checked)
Source code
@handled_slot(bool)
def toggle_auto_stabilization(self, checked):
    """Turn auto-stabilization off if the Matisse reports it is stabilizing, otherwise turn it on."""
    laser = self.matisse
    if not laser.is_stabilizing():
        laser.stabilize_on()
    else:
        laser.stabilize_off()
def toggle_fast_piezo_control(self, checked)
Source code
@handled_slot(bool)
def toggle_fast_piezo_control(self, checked):
    """
    Switch the fast piezo control loop on or off to match the menu action's new check state.

    Parameters
    ----------
    checked : bool
        the new check state of the 'Fast Piezo' action; True requests the loop on, False off
    """
    print(f"{'Locking' if checked else 'Unlocking'} fast piezo.")
    # Revert the check mark first: if the hardware call raises, the final setChecked is never reached
    # and the menu keeps showing the previous state.
    self.fast_pz_control_action.setChecked(not checked)
    # Previously this called set_piezo_etalon_control, so the fast piezo toggle drove the piezo etalon loop.
    # NOTE(review): set_fast_piezo_control is named by analogy with the sibling setters; confirm it exists.
    self.matisse.set_fast_piezo_control(checked)
    self.fast_pz_control_action.setChecked(checked)
def toggle_lock_laser(self, checked)
Source code
@handled_slot(bool)
def toggle_lock_laser(self, checked):
    """Stop laser lock correction if the Matisse reports it is on, otherwise start it."""
    laser = self.matisse
    if not laser.is_lock_correction_on():
        laser.start_laser_lock_correction()
    else:
        laser.stop_laser_lock_correction()
def toggle_piezo_etalon_control(self, checked)
Source code
@handled_slot(bool)
def toggle_piezo_etalon_control(self, checked):
    """Switch the piezo etalon control loop on or off to match the menu action's new check state."""
    action = self.piezo_eta_control_action
    verb = 'Locking' if checked else 'Unlocking'
    print(f"{verb} piezo etalon.")
    # Revert the check mark first: if the hardware call raises, the final setChecked is never reached
    # and the menu keeps showing the previous state.
    action.setChecked(not checked)
    self.matisse.set_piezo_etalon_control(checked)
    action.setChecked(checked)
def toggle_slow_piezo_control(self, checked)
Source code
@handled_slot(bool)
def toggle_slow_piezo_control(self, checked):
    """Switch the slow piezo control loop on or off to match the menu action's new check state."""
    action = self.slow_pz_control_action
    verb = 'Locking' if checked else 'Unlocking'
    print(f"{verb} slow piezo.")
    # Revert the check mark first: if the hardware call raises, the final setChecked is never reached
    # and the menu keeps showing the previous state.
    action.setChecked(not checked)
    self.matisse.set_slow_piezo_control(checked)
    action.setChecked(checked)
def toggle_thin_etalon_control(self, checked)
Source code
@handled_slot(bool)
def toggle_thin_etalon_control(self, checked):
    """Switch the thin etalon control loop on or off to match the menu action's new check state."""
    action = self.thin_eta_control_action
    verb = 'Locking' if checked else 'Unlocking'
    print(f"{verb} thin etalon.")
    # Revert the check mark first: if the hardware call raises, the final setChecked is never reached
    # and the menu keeps showing the previous state.
    action.setChecked(not checked)
    self.matisse.set_thin_etalon_control(checked)
    action.setChecked(checked)
def view_existing_analysis(self, checked)
Source code
@handled_slot(bool)
def view_existing_analysis(self, checked):
    """
    Let the user pick a saved analysis file and plot it in the worker pool.

    Nothing is started while a previously submitted PLE analysis is still running, or when the file
    dialog is dismissed without choosing a file.

    Parameters
    ----------
    checked : bool
        value delivered by the menu action's `triggered` signal; unused
    """
    if self.ple_analysis_worker and self.ple_analysis_worker.running():
        print('WARNING: A PLE analysis is currently in progress.')
        return

    # getOpenFileName returns (path, selected_filter); the second item is the filter string, not a
    # success flag, so the decision is made on the path itself (empty when the dialog is cancelled).
    file_path, _selected_filter = QFileDialog.getOpenFileName(caption='Select Analysis File',
                                                              filter='Pickle file (*.pickle)')
    if not file_path:
        return

    self.ple_analysis_worker = self.work_executor.submit(self.ple.plot_ple_analysis_file, file_path)
    # Route the finished future through the shared error-reporting callback.
    self.ple_analysis_worker.add_done_callback(utils.raise_error_from_future)