Module matisse_controller.shamrock_ple.ple

Source code
import os
import pickle
import time
from multiprocessing import Pipe

import numpy as np

import matisse_controller.config as cfg
from matisse_controller.shamrock_ple.ccd import CCD
from matisse_controller.shamrock_ple.plotting import *
from matisse_controller.shamrock_ple.shamrock import Shamrock

ccd: CCD = None
shamrock: Shamrock = None


# TODO: Method to gracefully close all plots
class PLE:
    """
    PLE scanning functionality with the Andor Shamrock and Newton CCD.

    Each instance wraps a Matisse laser object and keeps track of the plot processes it starts. The Shamrock and CCD
    interfaces themselves are held in the module-level `shamrock` and `ccd` globals.
    """

    def __init__(self, matisse):
        self.matisse = matisse
        # Set by stop_ple_tasks to ask a running scan or analysis to stop at its next check
        self.ple_exit_flag = False
        # Plot processes started by this instance
        self.analysis_plot_processes = []
        self.spectrum_plot_processes = []

    @staticmethod
    def load_andor_libs():
        """
        Initialize the interfaces to the Andor Shamrock and Newton CCD. This only needs to be run once, since the two
        devices are global variables.
        """
        global ccd
        global shamrock
        if ccd is None:
            ccd = CCD()
            print('CCD initialized.')
        if shamrock is None:
            shamrock = Shamrock()
            print('Shamrock initialized.')

    @staticmethod
    def clean_up_globals():
        """
        Remove references to the Shamrock and Newton, allowing us to re-initialize them again later.
        """
        global ccd
        global shamrock
        ccd = None
        shamrock = None

    def start_ple_scan(self, scan_name: str, scan_location: str, initial_wavelength: float, final_wavelength: float,
                       step: float, center_wavelength: float, grating_grooves: int, *ccd_args, plot_analysis=False,
                       integration_start=None, integration_end=None, **ccd_kwargs):
        """
        Perform a PLE scan using the Andor Shamrock spectrometer and Newton CCD.

        Generates text files with data from each spectrum taken during the scan, and pickles the Python dictionary of
        all data into {scan_name}.pickle. The scan is refused (with a printed warning) if the name or location is
        empty, or if {scan_name}.pickle already exists in the scan location.

        Parameters
        ----------
        scan_name
            a unique name to give the PLE measurement, which will be included in the name of all the data files
        scan_location
            the name of a folder to contain all relevant scan data
        initial_wavelength
            starting wavelength for the PLE scan
        final_wavelength
            ending wavelength for the PLE scan
        step
            the desired change in wavelength between each individual scan
        center_wavelength
            the wavelength at which to set the spectrometer
        grating_grooves
            the number of grooves to use for the spectrometer grating
        plot_analysis
            whether to plot the PLE analysis in real time
        integration_start : float
            the wavelength at which to start integration for real-time analysis plotting
        integration_end : float
            the wavelength at which to stop integration for real-time analysis plotting
        *ccd_args
            args to pass to `matisse_controller.shamrock_ple.ccd.CCD.setup`
        **ccd_kwargs
            kwargs to pass to `matisse_controller.shamrock_ple.ccd.CCD.setup`
        """
        self.ple_exit_flag = False

        if not scan_name:
            print('WARNING: Name of PLE scan is required.')
            return
        if not scan_location:
            print('WARNING: Location of PLE scan is required.')
            return

        data_file_name = os.path.join(scan_location, f"{scan_name}.pickle")

        # Never overwrite the results of an earlier scan
        if os.path.exists(data_file_name):
            print(f"WARNING: A PLE scan has already been run for '{scan_name}'. Choose a new name and try again.")
            return

        PLE.load_andor_libs()
        print(f"Setting spectrometer grating to {grating_grooves} grvs and center wavelength to {center_wavelength}...")
        shamrock.set_grating_grooves(grating_grooves)
        shamrock.set_center_wavelength(center_wavelength)
        # stop_ple_tasks may have been called while the spectrometer was being configured
        if self.ple_exit_flag:
            return
        ccd.setup(*ccd_args, **ccd_kwargs)

        precision = cfg.get(cfg.WAVEMETER_PRECISION)
        # np.arange excludes its end point, so the final wavelength is appended explicitly
        wavelengths = np.append(np.arange(initial_wavelength, final_wavelength, step), final_wavelength)
        wavelength_range = abs(round(final_wavelength - initial_wavelength, precision))
        total_acquisitions = len(wavelengths)
        file_name = ''

        if plot_analysis:
            # The integration window depends only on the spectrometer settings, so compute it once before scanning
            start_pixel, end_pixel = self.find_integration_endpoints(integration_start, integration_end,
                                                                     center_wavelength, grating_grooves)

        pl_pipe_in, pl_pipe_out = Pipe()
        pl_plot_process = SpectrumPlotProcess(pipe=pl_pipe_out, daemon=True)
        self.spectrum_plot_processes.append(pl_plot_process)
        pl_plot_process.start()

        if plot_analysis:
            analysis_pipe_in, analysis_pipe_out = Pipe()
            analysis_plot_process = PLEAnalysisPlotProcess(pipe=analysis_pipe_out, daemon=True)
            self.analysis_plot_processes.append(analysis_plot_process)
            analysis_plot_process.start()

        # The spectrometer settings are stored next to the spectra so the analysis can convert wavelengths to pixels
        data = {
            'grating_grooves': grating_grooves,
            'center_wavelength': center_wavelength
        }
        for counter, wavelength in enumerate(wavelengths, start=1):
            print(f"Starting acquisition {counter}/{total_acquisitions}.")
            wavelength = round(float(wavelength), precision)
            self.lock_at_wavelength(wavelength)
            if self.ple_exit_flag:
                print('Received exit signal, saving PLE data.')
                break
            acquisition_data = ccd.take_acquisition()  # FVB mode bins into each column, so this only grabs points along width
            file_name = os.path.join(scan_location, f"{str(counter).zfill(3)}_{scan_name}_{wavelength}nm"
                                                    f"_StepSize_{step}nm_Range_{wavelength_range}nm.txt")
            np.savetxt(file_name, acquisition_data)

            acq_wavelengths = self.pixels_to_wavelengths(range(len(acquisition_data)), center_wavelength, grating_grooves)
            pl_pipe_in.send((acq_wavelengths, acquisition_data))

            if plot_analysis:
                analysis_pipe_in.send((wavelength, sum(acquisition_data[start_pixel:end_pixel])))

            data[wavelength] = acquisition_data

        # NOTE(review): None presumably tells the spectrum plot process that no more data is coming - confirm against
        # SpectrumPlotProcess. The analysis pipe is not sent an equivalent message.
        pl_pipe_in.send(None)
        if file_name:
            # Re-plot the last spectrum that was written to disk in its own window
            self.plot_single_acquisition(center_wavelength, grating_grooves, data_file=file_name)

        with open(data_file_name, 'wb') as data_file:
            pickle.dump(data, data_file, pickle.HIGHEST_PROTOCOL)
        print('Finished PLE scan.')

    def lock_at_wavelength(self, wavelength: float):
        """
        Try to lock the Matisse at a given wavelength, waiting to return until we're within a small tolerance.

        Polls every 3 seconds until the wavemeter reading is within 10^-WAVEMETER_PRECISION of the target and the
        Matisse reports that it is no longer setting the wavelength or scanning the BiFi or thin etalon. Returns early
        if the exit flag is set.
        """
        tolerance = 10 ** -cfg.get(cfg.WAVEMETER_PRECISION)
        self.matisse.set_wavelength(wavelength)
        while abs(wavelength - self.matisse.wavemeter_wavelength()) >= tolerance or \
                (self.matisse.is_setting_wavelength or self.matisse.is_scanning_bifi or self.matisse.is_scanning_thin_etalon):
            if self.ple_exit_flag:
                break
            time.sleep(3)

    def stop_ple_tasks(self):
        """Trigger the exit flags to stop running scans and PLE measurements."""
        self.ple_exit_flag = True
        # The CCD only exists once load_andor_libs has been called
        if ccd is not None:
            ccd.exit_flag = True

    def analyze_ple_data(self, analysis_name: str, data_file_path: str, integration_start: float, integration_end: float,
                         background_file_path=''):
        """
        Sum the counts of all spectra for a set of PLE measurements and plot them against wavelength.

        Loads PLE data from a .pickle file and pickles integrated counts for each wavelength into another .pickle file.
        Optionally subtract background from given file name. The background file should be loadable with numpy.loadtxt.

        Parameters
        ----------
        analysis_name
            a unique name for the analysis; the integrated counts are pickled to {analysis_name}.pickle in the same
            folder as the data file
        data_file_path
            the path to the .pickle file containing the PLE measurement data
        integration_start
            start of integration region (nm) for tallying the counts
        integration_end
            end of integration region (nm) for tallying the counts
        background_file_path
            the name of a file to use for subtracting background, should be loadable with numpy.loadtxt
        """
        self.ple_exit_flag = False

        if not data_file_path:
            print('WARNING: No data file provided to analyze.')
            return
        if not analysis_name:
            print('WARNING: Name of analysis is required.')
            return

        data_dir = os.path.abspath(os.path.dirname(data_file_path))
        analysis_file_path = os.path.join(data_dir, f"{analysis_name}.pickle")

        # Never overwrite the results of an earlier analysis
        if os.path.exists(analysis_file_path):
            print(f"WARNING: An analysis called '{analysis_name}' already exists. Choose a new name and try again.")
            return

        # SECURITY: unpickling can execute arbitrary code - only load data files that come from a trusted source
        with open(data_file_path, 'rb') as full_data_file:
            scans = pickle.load(full_data_file)

        background_data = np.loadtxt(background_file_path) if background_file_path else None
        # An ndarray with more than one element has no truth value, so test for None explicitly. An all-zero
        # background is skipped because subtracting it would change nothing.
        subtract_background = background_data is not None and background_data.any()

        # What remains in `scans` after removing the spectrometer settings is one spectrum per laser wavelength
        center_wavelength = scans.pop('center_wavelength')
        grating_grooves = scans.pop('grating_grooves')
        start_pixel, end_pixel = self.find_integration_endpoints(integration_start, integration_end,
                                                                 center_wavelength, grating_grooves)
        total_counts = {}
        for wavelength, spectrum in scans.items():
            if self.ple_exit_flag:
                print('Received exit signal, saving PLE data.')
                break
            if subtract_background:
                # Convert to floating point first so the subtraction works for integer counts
                spectrum = spectrum.astype(np.double)
                spectrum -= background_data
            total_counts[wavelength] = sum(spectrum[start_pixel:end_pixel])

        with open(analysis_file_path, 'wb') as analysis_file:
            pickle.dump(total_counts, analysis_file, pickle.HIGHEST_PROTOCOL)

        plot_process = PLEAnalysisPlotProcess(total_counts, daemon=True)
        self.analysis_plot_processes.append(plot_process)
        plot_process.start()

    def plot_ple_analysis_file(self, analysis_file_path: str):
        """Plot the PLE analysis data from the given .pickle file."""
        # SECURITY: unpickling can execute arbitrary code - only load analysis files that come from a trusted source
        with open(analysis_file_path, 'rb') as analysis_file:
            data = pickle.load(analysis_file)
        plot_process = PLEAnalysisPlotProcess(data, daemon=True)
        self.analysis_plot_processes.append(plot_process)
        plot_process.start()

    def plot_single_acquisition(self, center_wavelength: float, grating_grooves: int, *ccd_args, data_file=None,
                                **ccd_kwargs):
        """
        Plot a single acquisition from the CCD at the given center wavelength and using the grating with the given
        number of grooves. If a data file name is specified, this will skip reading the CCD and just plot the data
        in that file.

        Parameters
        ----------
        center_wavelength
            the wavelength at which to set the spectrometer
        grating_grooves
            the number of grooves to use for the spectrometer grating
        data_file
            file name containing data to plot - if None, will grab data from the CCD
        *ccd_args
            args to pass to `matisse_controller.shamrock_ple.ccd.CCD.setup`
        **ccd_kwargs
            kwargs to pass to `matisse_controller.shamrock_ple.ccd.CCD.setup`
        """
        self.ple_exit_flag = False
        if data_file:
            data = np.loadtxt(data_file)
        else:
            PLE.load_andor_libs()
            print(f"Setting spectrometer grating to {grating_grooves} grvs and center wavelength to {center_wavelength}...")
            shamrock.set_grating_grooves(grating_grooves)
            shamrock.set_center_wavelength(center_wavelength)
            # stop_ple_tasks may have been called while the spectrometer was being configured
            if self.ple_exit_flag:
                return
            ccd.setup(*ccd_args, **ccd_kwargs)
            data = ccd.take_acquisition()

        wavelengths = self.pixels_to_wavelengths(range(len(data)), center_wavelength, grating_grooves)

        plot_process = SpectrumPlotProcess(wavelengths, data, daemon=True)
        self.spectrum_plot_processes.append(plot_process)
        plot_process.start()

    def pixels_to_wavelengths(self, pixels, center_wavelength: float, grating_grooves: int):
        """
        Convert pixels to nanometers using given spectrometer settings.

        Parameters
        ----------
        pixels
            an iterable of pixel indices to be converted to wavelengths
        center_wavelength
            the center wavelength used to take the CCD data
        grating_grooves
            the number of grooves for the grating used to take the CCD data

        Returns
        -------
        ndarray
            an array of wavelengths that each correspond to a pixel on the CCD screen
        """
        nm_per_pixel = Shamrock.GRATINGS_NM_PER_PIXEL[grating_grooves]
        offset = Shamrock.GRATINGS_OFFSET_NM[grating_grooves]
        # Materialize first so that any iterable (range, list, generator) is accepted
        pixel_indices = np.asarray(list(pixels))
        # Point-slope formula for calculating wavelengths from pixels
        # Use pixel + 1 because indexes range from 0 to 1023, CCD center is at 512 but zero-indexing would put it at 511
        return nm_per_pixel * (pixel_indices + 1 - CCD.WIDTH / 2) + center_wavelength + offset

    def find_integration_endpoints(self, start_wavelength: float, end_wavelength: float, center_wavelength: float,
                                   grating_grooves: int):
        """
        Convert a starting and ending wavelength to CCD pixels.

        Parameters
        ----------
        start_wavelength
            starting point of integration, in nanometers
        end_wavelength
            ending point of integration, in nanometers
        center_wavelength
            the wavelength at which the spectrometer was set
        grating_grooves
            the number of grooves used for the spectrometer grating

        Returns
        -------
        (int, int)
            the start and end pixels corresponding to the given start and end wavelengths
        """
        nm_per_pixel = Shamrock.GRATINGS_NM_PER_PIXEL[grating_grooves]
        offset = Shamrock.GRATINGS_OFFSET_NM[grating_grooves]
        # Invert pixel -> wavelength conversion; int() truncates the fractional pixel toward zero.
        # The results are not clamped to the CCD width, so wavelengths outside the sensor give out-of-range indices.
        start_pixel = int(CCD.WIDTH / 2 - 1 + (start_wavelength - center_wavelength - offset) / nm_per_pixel)
        end_pixel = int(CCD.WIDTH / 2 - 1 + (end_wavelength - center_wavelength - offset) / nm_per_pixel)
        return start_pixel, end_pixel

Classes

class PLE (matisse)

PLE scanning functionality with the Andor Shamrock and Newton CCD.

Source code
class PLE:
    """
    PLE scanning functionality with the Andor Shamrock and Newton CCD.

    Each instance wraps a Matisse laser object and keeps track of the plot processes it starts. The Shamrock and CCD
    interfaces themselves are held in the module-level `shamrock` and `ccd` globals.
    """

    def __init__(self, matisse):
        self.matisse = matisse
        # Set by stop_ple_tasks to ask a running scan or analysis to stop at its next check
        self.ple_exit_flag = False
        # Plot processes started by this instance
        self.analysis_plot_processes = []
        self.spectrum_plot_processes = []

    @staticmethod
    def load_andor_libs():
        """
        Initialize the interfaces to the Andor Shamrock and Newton CCD. This only needs to be run once, since the two
        devices are global variables.
        """
        global ccd
        global shamrock
        if ccd is None:
            ccd = CCD()
            print('CCD initialized.')
        if shamrock is None:
            shamrock = Shamrock()
            print('Shamrock initialized.')

    @staticmethod
    def clean_up_globals():
        """
        Remove references to the Shamrock and Newton, allowing us to re-initialize them again later.
        """
        global ccd
        global shamrock
        ccd = None
        shamrock = None

    def start_ple_scan(self, scan_name: str, scan_location: str, initial_wavelength: float, final_wavelength: float,
                       step: float, center_wavelength: float, grating_grooves: int, *ccd_args, plot_analysis=False,
                       integration_start=None, integration_end=None, **ccd_kwargs):
        """
        Perform a PLE scan using the Andor Shamrock spectrometer and Newton CCD.

        Generates text files with data from each spectrum taken during the scan, and pickles the Python dictionary of
        all data into {scan_name}.pickle. The scan is refused (with a printed warning) if the name or location is
        empty, or if {scan_name}.pickle already exists in the scan location.

        Parameters
        ----------
        scan_name
            a unique name to give the PLE measurement, which will be included in the name of all the data files
        scan_location
            the name of a folder to contain all relevant scan data
        initial_wavelength
            starting wavelength for the PLE scan
        final_wavelength
            ending wavelength for the PLE scan
        step
            the desired change in wavelength between each individual scan
        center_wavelength
            the wavelength at which to set the spectrometer
        grating_grooves
            the number of grooves to use for the spectrometer grating
        plot_analysis
            whether to plot the PLE analysis in real time
        integration_start : float
            the wavelength at which to start integration for real-time analysis plotting
        integration_end : float
            the wavelength at which to stop integration for real-time analysis plotting
        *ccd_args
            args to pass to `matisse_controller.shamrock_ple.ccd.CCD.setup`
        **ccd_kwargs
            kwargs to pass to `matisse_controller.shamrock_ple.ccd.CCD.setup`
        """
        self.ple_exit_flag = False

        if not scan_name:
            print('WARNING: Name of PLE scan is required.')
            return
        if not scan_location:
            print('WARNING: Location of PLE scan is required.')
            return

        data_file_name = os.path.join(scan_location, f"{scan_name}.pickle")

        # Never overwrite the results of an earlier scan
        if os.path.exists(data_file_name):
            print(f"WARNING: A PLE scan has already been run for '{scan_name}'. Choose a new name and try again.")
            return

        PLE.load_andor_libs()
        print(f"Setting spectrometer grating to {grating_grooves} grvs and center wavelength to {center_wavelength}...")
        shamrock.set_grating_grooves(grating_grooves)
        shamrock.set_center_wavelength(center_wavelength)
        # stop_ple_tasks may have been called while the spectrometer was being configured
        if self.ple_exit_flag:
            return
        ccd.setup(*ccd_args, **ccd_kwargs)

        precision = cfg.get(cfg.WAVEMETER_PRECISION)
        # np.arange excludes its end point, so the final wavelength is appended explicitly
        wavelengths = np.append(np.arange(initial_wavelength, final_wavelength, step), final_wavelength)
        wavelength_range = abs(round(final_wavelength - initial_wavelength, precision))
        total_acquisitions = len(wavelengths)
        file_name = ''

        if plot_analysis:
            # The integration window depends only on the spectrometer settings, so compute it once before scanning
            start_pixel, end_pixel = self.find_integration_endpoints(integration_start, integration_end,
                                                                     center_wavelength, grating_grooves)

        pl_pipe_in, pl_pipe_out = Pipe()
        pl_plot_process = SpectrumPlotProcess(pipe=pl_pipe_out, daemon=True)
        self.spectrum_plot_processes.append(pl_plot_process)
        pl_plot_process.start()

        if plot_analysis:
            analysis_pipe_in, analysis_pipe_out = Pipe()
            analysis_plot_process = PLEAnalysisPlotProcess(pipe=analysis_pipe_out, daemon=True)
            self.analysis_plot_processes.append(analysis_plot_process)
            analysis_plot_process.start()

        # The spectrometer settings are stored next to the spectra so the analysis can convert wavelengths to pixels
        data = {
            'grating_grooves': grating_grooves,
            'center_wavelength': center_wavelength
        }
        for counter, wavelength in enumerate(wavelengths, start=1):
            print(f"Starting acquisition {counter}/{total_acquisitions}.")
            wavelength = round(float(wavelength), precision)
            self.lock_at_wavelength(wavelength)
            if self.ple_exit_flag:
                print('Received exit signal, saving PLE data.')
                break
            acquisition_data = ccd.take_acquisition()  # FVB mode bins into each column, so this only grabs points along width
            file_name = os.path.join(scan_location, f"{str(counter).zfill(3)}_{scan_name}_{wavelength}nm"
                                                    f"_StepSize_{step}nm_Range_{wavelength_range}nm.txt")
            np.savetxt(file_name, acquisition_data)

            acq_wavelengths = self.pixels_to_wavelengths(range(len(acquisition_data)), center_wavelength, grating_grooves)
            pl_pipe_in.send((acq_wavelengths, acquisition_data))

            if plot_analysis:
                analysis_pipe_in.send((wavelength, sum(acquisition_data[start_pixel:end_pixel])))

            data[wavelength] = acquisition_data

        # NOTE(review): None presumably tells the spectrum plot process that no more data is coming - confirm against
        # SpectrumPlotProcess. The analysis pipe is not sent an equivalent message.
        pl_pipe_in.send(None)
        if file_name:
            # Re-plot the last spectrum that was written to disk in its own window
            self.plot_single_acquisition(center_wavelength, grating_grooves, data_file=file_name)

        with open(data_file_name, 'wb') as data_file:
            pickle.dump(data, data_file, pickle.HIGHEST_PROTOCOL)
        print('Finished PLE scan.')

    def lock_at_wavelength(self, wavelength: float):
        """
        Try to lock the Matisse at a given wavelength, waiting to return until we're within a small tolerance.

        Polls every 3 seconds until the wavemeter reading is within 10^-WAVEMETER_PRECISION of the target and the
        Matisse reports that it is no longer setting the wavelength or scanning the BiFi or thin etalon. Returns early
        if the exit flag is set.
        """
        tolerance = 10 ** -cfg.get(cfg.WAVEMETER_PRECISION)
        self.matisse.set_wavelength(wavelength)
        while abs(wavelength - self.matisse.wavemeter_wavelength()) >= tolerance or \
                (self.matisse.is_setting_wavelength or self.matisse.is_scanning_bifi or self.matisse.is_scanning_thin_etalon):
            if self.ple_exit_flag:
                break
            time.sleep(3)

    def stop_ple_tasks(self):
        """Trigger the exit flags to stop running scans and PLE measurements."""
        self.ple_exit_flag = True
        # The CCD only exists once load_andor_libs has been called
        if ccd is not None:
            ccd.exit_flag = True

    def analyze_ple_data(self, analysis_name: str, data_file_path: str, integration_start: float, integration_end: float,
                         background_file_path=''):
        """
        Sum the counts of all spectra for a set of PLE measurements and plot them against wavelength.

        Loads PLE data from a .pickle file and pickles integrated counts for each wavelength into another .pickle file.
        Optionally subtract background from given file name. The background file should be loadable with numpy.loadtxt.

        Parameters
        ----------
        analysis_name
            a unique name for the analysis; the integrated counts are pickled to {analysis_name}.pickle in the same
            folder as the data file
        data_file_path
            the path to the .pickle file containing the PLE measurement data
        integration_start
            start of integration region (nm) for tallying the counts
        integration_end
            end of integration region (nm) for tallying the counts
        background_file_path
            the name of a file to use for subtracting background, should be loadable with numpy.loadtxt
        """
        self.ple_exit_flag = False

        if not data_file_path:
            print('WARNING: No data file provided to analyze.')
            return
        if not analysis_name:
            print('WARNING: Name of analysis is required.')
            return

        data_dir = os.path.abspath(os.path.dirname(data_file_path))
        analysis_file_path = os.path.join(data_dir, f"{analysis_name}.pickle")

        # Never overwrite the results of an earlier analysis
        if os.path.exists(analysis_file_path):
            print(f"WARNING: An analysis called '{analysis_name}' already exists. Choose a new name and try again.")
            return

        # SECURITY: unpickling can execute arbitrary code - only load data files that come from a trusted source
        with open(data_file_path, 'rb') as full_data_file:
            scans = pickle.load(full_data_file)

        background_data = np.loadtxt(background_file_path) if background_file_path else None
        # An ndarray with more than one element has no truth value, so test for None explicitly. An all-zero
        # background is skipped because subtracting it would change nothing.
        subtract_background = background_data is not None and background_data.any()

        # What remains in `scans` after removing the spectrometer settings is one spectrum per laser wavelength
        center_wavelength = scans.pop('center_wavelength')
        grating_grooves = scans.pop('grating_grooves')
        start_pixel, end_pixel = self.find_integration_endpoints(integration_start, integration_end,
                                                                 center_wavelength, grating_grooves)
        total_counts = {}
        for wavelength, spectrum in scans.items():
            if self.ple_exit_flag:
                print('Received exit signal, saving PLE data.')
                break
            if subtract_background:
                # Convert to floating point first so the subtraction works for integer counts
                spectrum = spectrum.astype(np.double)
                spectrum -= background_data
            total_counts[wavelength] = sum(spectrum[start_pixel:end_pixel])

        with open(analysis_file_path, 'wb') as analysis_file:
            pickle.dump(total_counts, analysis_file, pickle.HIGHEST_PROTOCOL)

        plot_process = PLEAnalysisPlotProcess(total_counts, daemon=True)
        self.analysis_plot_processes.append(plot_process)
        plot_process.start()

    def plot_ple_analysis_file(self, analysis_file_path: str):
        """Plot the PLE analysis data from the given .pickle file."""
        # SECURITY: unpickling can execute arbitrary code - only load analysis files that come from a trusted source
        with open(analysis_file_path, 'rb') as analysis_file:
            data = pickle.load(analysis_file)
        plot_process = PLEAnalysisPlotProcess(data, daemon=True)
        self.analysis_plot_processes.append(plot_process)
        plot_process.start()

    def plot_single_acquisition(self, center_wavelength: float, grating_grooves: int, *ccd_args, data_file=None,
                                **ccd_kwargs):
        """
        Plot a single acquisition from the CCD at the given center wavelength and using the grating with the given
        number of grooves. If a data file name is specified, this will skip reading the CCD and just plot the data
        in that file.

        Parameters
        ----------
        center_wavelength
            the wavelength at which to set the spectrometer
        grating_grooves
            the number of grooves to use for the spectrometer grating
        data_file
            file name containing data to plot - if None, will grab data from the CCD
        *ccd_args
            args to pass to `matisse_controller.shamrock_ple.ccd.CCD.setup`
        **ccd_kwargs
            kwargs to pass to `matisse_controller.shamrock_ple.ccd.CCD.setup`
        """
        self.ple_exit_flag = False
        if data_file:
            data = np.loadtxt(data_file)
        else:
            PLE.load_andor_libs()
            print(f"Setting spectrometer grating to {grating_grooves} grvs and center wavelength to {center_wavelength}...")
            shamrock.set_grating_grooves(grating_grooves)
            shamrock.set_center_wavelength(center_wavelength)
            # stop_ple_tasks may have been called while the spectrometer was being configured
            if self.ple_exit_flag:
                return
            ccd.setup(*ccd_args, **ccd_kwargs)
            data = ccd.take_acquisition()

        wavelengths = self.pixels_to_wavelengths(range(len(data)), center_wavelength, grating_grooves)

        plot_process = SpectrumPlotProcess(wavelengths, data, daemon=True)
        self.spectrum_plot_processes.append(plot_process)
        plot_process.start()

    def pixels_to_wavelengths(self, pixels, center_wavelength: float, grating_grooves: int):
        """
        Convert pixels to nanometers using given spectrometer settings.

        Parameters
        ----------
        pixels
            an iterable of pixel indices to be converted to wavelengths
        center_wavelength
            the center wavelength used to take the CCD data
        grating_grooves
            the number of grooves for the grating used to take the CCD data

        Returns
        -------
        ndarray
            an array of wavelengths that each correspond to a pixel on the CCD screen
        """
        nm_per_pixel = Shamrock.GRATINGS_NM_PER_PIXEL[grating_grooves]
        offset = Shamrock.GRATINGS_OFFSET_NM[grating_grooves]
        # Materialize first so that any iterable (range, list, generator) is accepted
        pixel_indices = np.asarray(list(pixels))
        # Point-slope formula for calculating wavelengths from pixels
        # Use pixel + 1 because indexes range from 0 to 1023, CCD center is at 512 but zero-indexing would put it at 511
        return nm_per_pixel * (pixel_indices + 1 - CCD.WIDTH / 2) + center_wavelength + offset

    def find_integration_endpoints(self, start_wavelength: float, end_wavelength: float, center_wavelength: float,
                                   grating_grooves: int):
        """
        Convert a starting and ending wavelength to CCD pixels.

        Parameters
        ----------
        start_wavelength
            starting point of integration, in nanometers
        end_wavelength
            ending point of integration, in nanometers
        center_wavelength
            the wavelength at which the spectrometer was set
        grating_grooves
            the number of grooves used for the spectrometer grating

        Returns
        -------
        (int, int)
            the start and end pixels corresponding to the given start and end wavelengths
        """
        nm_per_pixel = Shamrock.GRATINGS_NM_PER_PIXEL[grating_grooves]
        offset = Shamrock.GRATINGS_OFFSET_NM[grating_grooves]
        # Invert pixel -> wavelength conversion; int() truncates the fractional pixel toward zero.
        # The results are not clamped to the CCD width, so wavelengths outside the sensor give out-of-range indices.
        start_pixel = int(CCD.WIDTH / 2 - 1 + (start_wavelength - center_wavelength - offset) / nm_per_pixel)
        end_pixel = int(CCD.WIDTH / 2 - 1 + (end_wavelength - center_wavelength - offset) / nm_per_pixel)
        return start_pixel, end_pixel

Static methods

def clean_up_globals()

Remove references to the Shamrock and Newton, allowing us to re-initialize them again later.

Source code
@staticmethod
def clean_up_globals():
    """
    Reset the module-level Shamrock and Newton CCD references to None so that they can be initialized again later.
    """
    global ccd, shamrock
    ccd = shamrock = None
def load_andor_libs()

Initialize the interfaces to the Andor Shamrock and Newton CCD. This only needs to be run once, since the two devices are global variables.

Source code
@staticmethod
def load_andor_libs():
    """
    Create the Newton CCD and Andor Shamrock interfaces if they do not exist yet.

    Both devices are stored in module-level globals, so a device that has already been created is left untouched
    and repeated calls are harmless.
    """
    global ccd, shamrock

    # The CCD is set up first, then the spectrometer
    if ccd is None:
        ccd = CCD()
        print('CCD initialized.')

    if shamrock is None:
        shamrock = Shamrock()
        print('Shamrock initialized.')

Methods

def analyze_ple_data(self, analysis_name, data_file_path, integration_start, integration_end, background_file_path='')

Sum the counts of all spectra for a set of PLE measurements and plot them against wavelength.

Loads PLE data from a .pickle file and pickles integrated counts for each wavelength into another .pickle file. Optionally subtract background from given file name. The background file should be loadable with numpy.loadtxt.

Parameters

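analysis_name
a unique name for the analysis; the integrated counts are pickled to {analysis_name}.pickle in the same folder as the data file
data_file_path
the path to the .pickle file containing the PLE measurement data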
integration_start
start of integration region (nm) for tallying the counts
integration_end
end of integration region (nm) for tallying the counts
background_file_path
the name of a file to use for subtracting background, should be loadable with numpy.loadtxt
Source code
def analyze_ple_data(self, analysis_name: str, data_file_path: str, integration_start: float, integration_end: float,
                     background_file_path=''):
    """
    Sum the counts of all spectra for a set of PLE measurements and plot them against wavelength.

    Loads PLE data from a .pickle file and pickles integrated counts for each wavelength into another .pickle file.
    Optionally subtract background from given file name. The background file should be loadable with numpy.loadtxt.

    Nothing is analyzed (a warning is printed and the method returns) when the data file path or analysis name is
    empty, or when an analysis file with the same name already exists next to the data file.

    Parameters
    ----------
    analysis_name
        the name of the analysis; results are pickled into {analysis_name}.pickle in the directory of the data file
    data_file_path
        the path to the .pickle file containing the PLE measurement data
    integration_start
        start of integration region (nm) for tallying the counts
    integration_end
        end of integration region (nm) for tallying the counts
    background_file_path
        the name of a file to use for subtracting background, should be loadable with numpy.loadtxt
    """
    self.ple_exit_flag = False

    if not data_file_path:
        print('WARNING: No data file provided to analyze.')
        return
    if not analysis_name:
        print('WARNING: Name of analysis is required.')
        return

    data_dir = os.path.abspath(os.path.dirname(data_file_path))
    analysis_file_path = os.path.join(data_dir, f"{analysis_name}.pickle")

    if os.path.exists(analysis_file_path):
        print(f"WARNING: An analysis called '{analysis_name}' already exists. Choose a new name and try again.")
        return

    # NOTE: unpickling can execute arbitrary code, so only open data files that come from a trusted source.
    with open(data_file_path, 'rb') as full_data_file:
        scans = pickle.load(full_data_file)

    background_data = np.loadtxt(background_file_path) if background_file_path else None
    # Test for None explicitly: taking the truth value of a multi-element ndarray raises ValueError.
    # An all-zero background is skipped since subtracting it would change nothing.
    subtract_background = background_data is not None and bool(background_data.any())

    # The spectrometer settings are stored alongside the spectra; remove them so only wavelength keys remain.
    center_wavelength = scans.pop('center_wavelength')
    grating_grooves = scans.pop('grating_grooves')
    start_pixel, end_pixel = self.find_integration_endpoints(integration_start, integration_end,
                                                             center_wavelength, grating_grooves)
    total_counts = {}
    for wavelength in scans:
        if self.ple_exit_flag:
            print('Received exit signal, saving PLE data.')
            break
        spectrum = scans[wavelength]
        if subtract_background:
            # Convert to floating point first so the subtraction is not done in the original dtype of the counts.
            spectrum = spectrum.astype(np.double) - background_data
        total_counts[wavelength] = sum(spectrum[start_pixel:end_pixel])

    with open(analysis_file_path, 'wb') as analysis_file:
        pickle.dump(total_counts, analysis_file, pickle.HIGHEST_PROTOCOL)

    plot_process = PLEAnalysisPlotProcess(total_counts, daemon=True)
    self.analysis_plot_processes.append(plot_process)
    plot_process.start()
def find_integration_endpoints(self, start_wavelength, end_wavelength, center_wavelength, grating_grooves)

Convert a starting and ending wavelength to CCD pixels.

Parameters

start_wavelength
starting point of integration, in nanometers
end_wavelength
ending point of integration, in nanometers
center_wavelength
the wavelength at which the spectrometer was set
grating_grooves
the number of grooves used for the spectrometer grating

Returns

(int, int) the start and end pixels corresponding to the given start and end wavelengths

Source code
def find_integration_endpoints(self, start_wavelength: float, end_wavelength: float, center_wavelength: float,
                               grating_grooves: int):
    """
    Translate an integration window given in nanometers into a pair of CCD pixel indices.

    Parameters
    ----------
    start_wavelength
        starting point of integration, in nanometers
    end_wavelength
        ending point of integration, in nanometers
    center_wavelength
        the wavelength at which the spectrometer was set
    grating_grooves
        the number of grooves used for the spectrometer grating

    Returns
    -------
    (int, int)
        the start and end pixels corresponding to the given start and end wavelengths
    """
    dispersion = Shamrock.GRATINGS_NM_PER_PIXEL[grating_grooves]
    calibration_shift = Shamrock.GRATINGS_OFFSET_NM[grating_grooves]

    def to_pixel(wavelength):
        # Inverse of the pixel -> wavelength conversion; int() truncates toward zero.
        return int(CCD.WIDTH / 2 - 1 + (wavelength - center_wavelength - calibration_shift) / dispersion)

    return to_pixel(start_wavelength), to_pixel(end_wavelength)
def lock_at_wavelength(self, wavelength)

Try to lock the Matisse at a given wavelength, waiting to return until we're within a small tolerance.

Source code
def lock_at_wavelength(self, wavelength: float):
    """
    Set the Matisse to the given wavelength and block until it has settled there.

    The wavemeter reading is polled every 3 seconds until it is within 10 ** -WAVEMETER_PRECISION of the target and
    the Matisse reports that it is not setting a wavelength or scanning the BiFi or thin etalon. Setting the PLE exit
    flag ends the wait early.
    """
    tolerance = 10 ** -cfg.get(cfg.WAVEMETER_PRECISION)
    self.matisse.set_wavelength(wavelength)
    while True:
        off_target = abs(wavelength - self.matisse.wavemeter_wavelength()) >= tolerance
        still_busy = off_target or self.matisse.is_setting_wavelength or self.matisse.is_scanning_bifi \
            or self.matisse.is_scanning_thin_etalon
        if not still_busy:
            return
        # The exit flag is only consulted while we are still waiting, just before sleeping.
        if self.ple_exit_flag:
            return
        time.sleep(3)
def pixels_to_wavelengths(self, pixels, center_wavelength, grating_grooves)

Convert pixels to nanometers using given spectrometer settings.

Parameters

pixels
an iterable of pixel indices to be converted to wavelengths
center_wavelength
the center wavelength used to take the CCD data
grating_grooves
the number of grooves for the grating used to take the CCD data

Returns

ndarray
an array of wavelengths that each correspond to a pixel on the CCD screen
Source code
def pixels_to_wavelengths(self, pixels, center_wavelength: float, grating_grooves: int):
    """
    Map CCD pixel indices to wavelengths in nanometers for the given spectrometer settings.

    Parameters
    ----------
    pixels
        an iterable of pixel indices to be converted to wavelengths
    center_wavelength
        the center wavelength used to take the CCD data
    grating_grooves
        the number of grooves for the grating used to take the CCD data

    Returns
    -------
    ndarray
        an array of wavelengths that each correspond to a pixel on the CCD screen
    """
    dispersion = Shamrock.GRATINGS_NM_PER_PIXEL[grating_grooves]
    calibration_shift = Shamrock.GRATINGS_OFFSET_NM[grating_grooves]

    def to_wavelength(pixel):
        # Linear (point-slope) conversion. Pixel indices are zero-based, so pixel + 1 is used to line the index up
        # with the CCD center at CCD.WIDTH / 2.
        return dispersion * (pixel + 1 - CCD.WIDTH / 2) + center_wavelength + calibration_shift

    return np.array(list(map(to_wavelength, pixels)))
def plot_ple_analysis_file(self, analysis_file_path)

Plot the PLE analysis data from the given .pickle file.

Source code
def plot_ple_analysis_file(self, analysis_file_path: str):
    """
    Load pickled PLE analysis data from the given file and show it in a new plot process.

    The plot process is started as a daemon and recorded in `self.analysis_plot_processes`.
    """
    with open(analysis_file_path, 'rb') as pickled_analysis:
        integrated_counts = pickle.load(pickled_analysis)

    process = PLEAnalysisPlotProcess(integrated_counts, daemon=True)
    self.analysis_plot_processes.append(process)
    process.start()
def plot_single_acquisition(self, center_wavelength, grating_grooves, *ccd_args, data_file=None, **ccd_kwargs)

Plot a single acquisition from the CCD at the given center wavelength and using the grating with the given number of grooves. If a data file name is specified, this will skip reading the CCD and just plot the data in that file.

Parameters

center_wavelength
the wavelength at which to set the spectrometer
grating_grooves
the number of grooves to use for the spectrometer grating
data_file
file name containing data to plot - if None, will grab data from the CCD
*ccd_args
args to pass to CCD.setup()
**ccd_kwargs
kwargs to pass to CCD.setup()
Source code
def plot_single_acquisition(self, center_wavelength: float, grating_grooves: int, *ccd_args, data_file=None,
                            **ccd_kwargs):
    """
    Plot one spectrum, either freshly acquired from the CCD or loaded from a file.

    Without a data file, the spectrometer is set to the given grating and center wavelength, the CCD is set up and a
    single acquisition is taken. With a data file, the hardware is not touched and the file contents are plotted
    instead. In both cases the pixel axis is converted to wavelengths using the given spectrometer settings.

    Parameters
    ----------
    center_wavelength
        the wavelength at which to set the spectrometer
    grating_grooves
        the number of grooves to use for the spectrometer grating
    data_file
        file name containing data to plot - if None, will grab data from the CCD
    *ccd_args
        args to pass to `matisse_controller.shamrock_ple.ccd.CCD.setup`
    **ccd_kwargs
        kwargs to pass to `matisse_controller.shamrock_ple.ccd.CCD.setup`
    """
    self.ple_exit_flag = False

    if not data_file:
        PLE.load_andor_libs()
        print(f"Setting spectrometer grating to {grating_grooves} grvs and center wavelength to {center_wavelength}...")
        shamrock.set_grating_grooves(grating_grooves)
        shamrock.set_center_wavelength(center_wavelength)
        # Abort before touching the CCD if a stop was requested while the spectrometer was being set.
        if self.ple_exit_flag:
            return
        ccd.setup(*ccd_args, **ccd_kwargs)
        spectrum = ccd.take_acquisition()
    else:
        spectrum = np.loadtxt(data_file)

    pixel_indices = range(len(spectrum))
    wavelengths = self.pixels_to_wavelengths(pixel_indices, center_wavelength, grating_grooves)

    process = SpectrumPlotProcess(wavelengths, spectrum, daemon=True)
    self.spectrum_plot_processes.append(process)
    process.start()
def start_ple_scan(self, scan_name, scan_location, initial_wavelength, final_wavelength, step, center_wavelength, grating_grooves, *ccd_args, plot_analysis=False, integration_start=None, integration_end=None, **ccd_kwargs)

Perform a PLE scan using the Andor Shamrock spectrometer and Newton CCD.

Generates text files with data from each spectrum taken during the scan, and pickles the Python dictionary of all data into {scan_name}.pickle.

Parameters

scan_name
a unique name to give the PLE measurement, which will be included in the name of all the data files
scan_location
the name of a folder to contain all relevant scan data
initial_wavelength
starting wavelength for the PLE scan
final_wavelength
ending wavelength for the PLE scan
step
the desired change in wavelength between each individual scan
center_wavelength
the wavelength at which to set the spectrometer
grating_grooves
the number of grooves to use for the spectrometer grating
plot_analysis
whether to plot the PLE analysis in real time
integration_start : float
the wavelength at which to start integration for real-time analysis plotting
integration_end : float
the wavelength at which to stop integration for real-time analysis plotting
*ccd_args
args to pass to CCD.setup()
**ccd_kwargs
kwargs to pass to CCD.setup()
Source code
def start_ple_scan(self, scan_name: str, scan_location: str, initial_wavelength: float, final_wavelength: float,
                   step: float, center_wavelength: float, grating_grooves: int, *ccd_args, plot_analysis=False,
                   integration_start=None, integration_end=None, **ccd_kwargs):
    """
    Perform a PLE scan using the Andor Shamrock spectrometer and Newton CCD.

    Generates text files with data from each spectrum taken during the scan, and pickles the Python dictionary of
    all data into {scan_name}.pickle.

    Nothing is scanned (a warning is printed and the method returns) when the scan name or location is empty, when
    real-time analysis plotting is requested without both integration endpoints, or when {scan_name}.pickle already
    exists in the scan location.

    Parameters
    ----------
    scan_name
        a unique name to give the PLE measurement, which will be included in the name of all the data files
    scan_location
        the name of a folder to contain all relevant scan data
    initial_wavelength
        starting wavelength for the PLE scan
    final_wavelength
        ending wavelength for the PLE scan
    step
        the desired change in wavelength between each individual scan
    center_wavelength
        the wavelength at which to set the spectrometer
    grating_grooves
        the number of grooves to use for the spectrometer grating
    plot_analysis
        whether to plot the PLE analysis in real time
    integration_start : float
        the wavelength at which to start integration for real-time analysis plotting
    integration_end : float
        the wavelength at which to stop integration for real-time analysis plotting
    *ccd_args
        args to pass to `matisse_controller.shamrock_ple.ccd.CCD.setup`
    **ccd_kwargs
        kwargs to pass to `matisse_controller.shamrock_ple.ccd.CCD.setup`
    """
    self.ple_exit_flag = False

    if not scan_name:
        print('WARNING: Name of PLE scan is required.')
        return
    if not scan_location:
        print('WARNING: Location of PLE scan is required.')
        return
    # Validate before touching any hardware: without both endpoints the real-time analysis would fail with a
    # TypeError only after the first acquisition had already been taken.
    if plot_analysis and (integration_start is None or integration_end is None):
        print('WARNING: Integration endpoints are required to plot the PLE analysis in real time.')
        return

    data_file_name = os.path.join(scan_location, f"{scan_name}.pickle")

    if os.path.exists(data_file_name):
        print(f"WARNING: A PLE scan has already been run for '{scan_name}'. Choose a new name and try again.")
        return

    PLE.load_andor_libs()
    print(f"Setting spectrometer grating to {grating_grooves} grvs and center wavelength to {center_wavelength}...")
    shamrock.set_grating_grooves(grating_grooves)
    shamrock.set_center_wavelength(center_wavelength)
    if self.ple_exit_flag:
        return
    ccd.setup(*ccd_args, **ccd_kwargs)

    precision = cfg.get(cfg.WAVEMETER_PRECISION)
    # np.arange excludes the stop value, so the final wavelength is appended explicitly.
    wavelengths = np.append(np.arange(initial_wavelength, final_wavelength, step), final_wavelength)
    wavelength_range = abs(round(final_wavelength - initial_wavelength, precision))
    file_name = ''

    pl_pipe_in, pl_pipe_out = Pipe()
    pl_plot_process = SpectrumPlotProcess(pipe=pl_pipe_out, daemon=True)
    self.spectrum_plot_processes.append(pl_plot_process)
    pl_plot_process.start()

    if plot_analysis:
        analysis_pipe_in, analysis_pipe_out = Pipe()
        analysis_plot_process = PLEAnalysisPlotProcess(pipe=analysis_pipe_out, daemon=True)
        self.analysis_plot_processes.append(analysis_plot_process)
        analysis_plot_process.start()
        # The integration window depends only on the spectrometer settings, so compute it once for the whole scan.
        start_pixel, end_pixel = self.find_integration_endpoints(integration_start, integration_end,
                                                                 center_wavelength, grating_grooves)

    # The spectrometer settings are stored next to the spectra, which are keyed by laser wavelength.
    data = {
        'grating_grooves': grating_grooves,
        'center_wavelength': center_wavelength
    }
    for counter, wavelength in enumerate(wavelengths, start=1):
        print(f"Starting acquisition {counter}/{len(wavelengths)}.")
        wavelength = round(float(wavelength), precision)
        self.lock_at_wavelength(wavelength)
        if self.ple_exit_flag:
            print('Received exit signal, saving PLE data.')
            break
        acquisition_data = ccd.take_acquisition()  # FVB mode bins into each column, so this only grabs points along width
        file_name = os.path.join(scan_location, f"{str(counter).zfill(3)}_{scan_name}_{wavelength}nm"
                                                f"_StepSize_{step}nm_Range_{wavelength_range}nm.txt")
        np.savetxt(file_name, acquisition_data)

        acq_wavelengths = self.pixels_to_wavelengths(range(len(acquisition_data)), center_wavelength, grating_grooves)
        pl_pipe_in.send((acq_wavelengths, acquisition_data))

        if plot_analysis:
            analysis_pipe_in.send((wavelength, sum(acquisition_data[start_pixel:end_pixel])))

        data[wavelength] = acquisition_data

    pl_pipe_in.send(None)
    # file_name is only non-empty if at least one acquisition was saved; plot that last spectrum from its file.
    if file_name:
        self.plot_single_acquisition(center_wavelength, grating_grooves, data_file=file_name)

    with open(data_file_name, 'wb') as data_file:
        pickle.dump(data, data_file, pickle.HIGHEST_PROTOCOL)
    print('Finished PLE scan.')
def stop_ple_tasks(self)

Trigger the exit flags to stop running scans and PLE measurements.

Source code
def stop_ple_tasks(self):
    """
    Signal running scans and PLE measurements to stop.

    Raises this object's own exit flag and, when a CCD object has been created, the CCD's exit flag as well.
    """
    self.ple_exit_flag = True
    # The module-level ccd is None until the Andor libraries have been loaded.
    if ccd:
        setattr(ccd, 'exit_flag', True)