Module matisse_controller.gui.threads.status_update_thread
Source code
import time
from queue import Queue
from PyQt5.QtCore import QThread, pyqtSignal
import matisse_controller.config as cfg
import matisse_controller.matisse as matisse
from matisse_controller.gui.threads import ExitFlag
from matisse_controller.gui.utils import red_text, orange_text, green_text
class StatusUpdateThread(QThread):
"""
A QThread that periodically reads several pieces of state data and emits all of it in one HTML-formatted string.
The interval between successive updates is specified by a configuration option.
Some messages are colored, like for components that are at or nearing their limits.
Note: Any Qt slots implemented in this class will be executed in the creating thread for instances of this class.
"""
status_read = pyqtSignal(str)
def __init__(self, matisse, messages: Queue, *args, **kwargs):
"""
Parameters
----------
matisse : matisse_controller.matisse.matisse.Matisse
messages
a message queue
*args
args to pass to `QThread.__init__`
**kwargs
kwargs to pass to `QThread.__init__`
"""
super().__init__(*args, **kwargs)
self.matisse = matisse
self.messages = messages
def run(self):
while True:
if self.messages.qsize() == 0:
try:
bifi_pos = self.matisse.query('MOTBI:POS?', numeric_result=True)
thin_eta_pos = self.matisse.query('MOTTE:POS?', numeric_result=True)
refcell_pos, pz_eta_pos, slow_pz_pos = self.matisse.get_stabilizing_piezo_positions()
is_stabilizing = self.matisse.is_stabilizing()
is_scanning = self.matisse.is_scanning()
is_locked = self.matisse.laser_locked()
wavemeter_value = self.matisse.wavemeter_raw_value()
bifi_pos_text = f"BiFi:{bifi_pos}"
thin_eta_pos_text = f"Thin Eta:{thin_eta_pos}"
pz_eta_pos_text = f"Pz Eta:{pz_eta_pos:.3f}"
slow_pz_pos_text = f"Slow Pz:{slow_pz_pos:.3f}"
refcell_pos_text = f"RefCell:{refcell_pos:.3f}"
stabilizing_text = f"Stabilize:{green_text('ON') if is_stabilizing else red_text('OFF')}"
scanning_text = f"Scanning:{green_text('ON') if is_scanning else red_text('OFF')}"
locked_text = f"{green_text('LOCKED') if is_locked else red_text('NO LOCK')}"
wavemeter_text = f"Wavemeter:{wavemeter_value}"
limit_offset = cfg.get(cfg.COMPONENT_LIMIT_OFFSET)
refcell_at_limit = not matisse.REFERENCE_CELL_LOWER_LIMIT + limit_offset < refcell_pos < matisse.REFERENCE_CELL_UPPER_LIMIT - limit_offset
slow_pz_at_limit = not matisse.SLOW_PIEZO_LOWER_LIMIT + limit_offset < slow_pz_pos < matisse.SLOW_PIEZO_UPPER_LIMIT - limit_offset
pz_eta_at_limit = not matisse.PIEZO_ETALON_LOWER_LIMIT + limit_offset < pz_eta_pos < matisse.PIEZO_ETALON_UPPER_LIMIT - limit_offset
warn_offset = limit_offset * 2
refcell_near_limit = not matisse.REFERENCE_CELL_LOWER_LIMIT + warn_offset < refcell_pos < matisse.REFERENCE_CELL_UPPER_LIMIT - warn_offset
slow_pz_near_limit = not matisse.SLOW_PIEZO_LOWER_LIMIT + warn_offset < slow_pz_pos < matisse.SLOW_PIEZO_UPPER_LIMIT - warn_offset
pz_eta_near_limit = not matisse.PIEZO_ETALON_LOWER_LIMIT + warn_offset < pz_eta_pos < matisse.PIEZO_ETALON_UPPER_LIMIT - warn_offset
if refcell_at_limit:
refcell_pos_text = red_text(refcell_pos_text)
elif refcell_near_limit:
refcell_pos_text = orange_text(refcell_pos_text)
if slow_pz_at_limit:
slow_pz_pos_text = red_text(slow_pz_pos_text)
elif slow_pz_near_limit:
slow_pz_pos_text = orange_text(slow_pz_pos_text)
if pz_eta_at_limit:
pz_eta_pos_text = red_text(pz_eta_pos_text)
elif pz_eta_near_limit:
pz_eta_pos_text = orange_text(pz_eta_pos_text)
status = f"{bifi_pos_text} | {thin_eta_pos_text} | {pz_eta_pos_text} | {slow_pz_pos_text} | {refcell_pos_text} | {stabilizing_text} | {scanning_text} | {locked_text} | {wavemeter_text}"
except Exception:
status = red_text('Error reading system status. Please restart if this issue persists.')
self.status_read.emit(status)
time.sleep(cfg.get(cfg.STATUS_MONITOR_DELAY))
else:
break
def stop(self):
self.messages.put(ExitFlag())
self.wait()
Classes
class StatusUpdateThread (matisse, messages, *args, **kwargs)
-
A QThread that periodically reads several pieces of state data and emits all of it in one HTML-formatted string. The interval between successive updates is specified by a configuration option.
Some messages are colored, like for components that are at or nearing their limits.
Note: Any Qt slots implemented in this class will be executed in the creating thread for instances of this class.
Parameters
matisse
:Matisse
messages
- a message queue
*args
- args to pass to
QThread.__init__
**kwargs
- kwargs to pass to
QThread.__init__
Source code
class StatusUpdateThread(QThread): """ A QThread that periodically reads several pieces of state data and emits all of it in one HTML-formatted string. The interval between successive updates is specified by a configuration option. Some messages are colored, like for components that are at or nearing their limits. Note: Any Qt slots implemented in this class will be executed in the creating thread for instances of this class. """ status_read = pyqtSignal(str) def __init__(self, matisse, messages: Queue, *args, **kwargs): """ Parameters ---------- matisse : matisse_controller.matisse.matisse.Matisse messages a message queue *args args to pass to `QThread.__init__` **kwargs kwargs to pass to `QThread.__init__` """ super().__init__(*args, **kwargs) self.matisse = matisse self.messages = messages def run(self): while True: if self.messages.qsize() == 0: try: bifi_pos = self.matisse.query('MOTBI:POS?', numeric_result=True) thin_eta_pos = self.matisse.query('MOTTE:POS?', numeric_result=True) refcell_pos, pz_eta_pos, slow_pz_pos = self.matisse.get_stabilizing_piezo_positions() is_stabilizing = self.matisse.is_stabilizing() is_scanning = self.matisse.is_scanning() is_locked = self.matisse.laser_locked() wavemeter_value = self.matisse.wavemeter_raw_value() bifi_pos_text = f"BiFi:{bifi_pos}" thin_eta_pos_text = f"Thin Eta:{thin_eta_pos}" pz_eta_pos_text = f"Pz Eta:{pz_eta_pos:.3f}" slow_pz_pos_text = f"Slow Pz:{slow_pz_pos:.3f}" refcell_pos_text = f"RefCell:{refcell_pos:.3f}" stabilizing_text = f"Stabilize:{green_text('ON') if is_stabilizing else red_text('OFF')}" scanning_text = f"Scanning:{green_text('ON') if is_scanning else red_text('OFF')}" locked_text = f"{green_text('LOCKED') if is_locked else red_text('NO LOCK')}" wavemeter_text = f"Wavemeter:{wavemeter_value}" limit_offset = cfg.get(cfg.COMPONENT_LIMIT_OFFSET) refcell_at_limit = not matisse.REFERENCE_CELL_LOWER_LIMIT + limit_offset < refcell_pos < matisse.REFERENCE_CELL_UPPER_LIMIT - limit_offset slow_pz_at_limit = not matisse.SLOW_PIEZO_LOWER_LIMIT + limit_offset < slow_pz_pos < matisse.SLOW_PIEZO_UPPER_LIMIT - limit_offset pz_eta_at_limit = not matisse.PIEZO_ETALON_LOWER_LIMIT + limit_offset < pz_eta_pos < matisse.PIEZO_ETALON_UPPER_LIMIT - limit_offset warn_offset = limit_offset * 2 refcell_near_limit = not matisse.REFERENCE_CELL_LOWER_LIMIT + warn_offset < refcell_pos < matisse.REFERENCE_CELL_UPPER_LIMIT - warn_offset slow_pz_near_limit = not matisse.SLOW_PIEZO_LOWER_LIMIT + warn_offset < slow_pz_pos < matisse.SLOW_PIEZO_UPPER_LIMIT - warn_offset pz_eta_near_limit = not matisse.PIEZO_ETALON_LOWER_LIMIT + warn_offset < pz_eta_pos < matisse.PIEZO_ETALON_UPPER_LIMIT - warn_offset if refcell_at_limit: refcell_pos_text = red_text(refcell_pos_text) elif refcell_near_limit: refcell_pos_text = orange_text(refcell_pos_text) if slow_pz_at_limit: slow_pz_pos_text = red_text(slow_pz_pos_text) elif slow_pz_near_limit: slow_pz_pos_text = orange_text(slow_pz_pos_text) if pz_eta_at_limit: pz_eta_pos_text = red_text(pz_eta_pos_text) elif pz_eta_near_limit: pz_eta_pos_text = orange_text(pz_eta_pos_text) status = f"{bifi_pos_text} | {thin_eta_pos_text} | {pz_eta_pos_text} | {slow_pz_pos_text} | {refcell_pos_text} | {stabilizing_text} | {scanning_text} | {locked_text} | {wavemeter_text}" except Exception: status = red_text('Error reading system status. Please restart if this issue persists.') self.status_read.emit(status) time.sleep(cfg.get(cfg.STATUS_MONITOR_DELAY)) else: break def stop(self): self.messages.put(ExitFlag()) self.wait()
Ancestors
- PyQt5.QtCore.QThread
- PyQt5.QtCore.QObject
- sip.wrapper
- sip.simplewrapper
Methods
def run(self)
-
run(self)
Source code
def run(self): while True: if self.messages.qsize() == 0: try: bifi_pos = self.matisse.query('MOTBI:POS?', numeric_result=True) thin_eta_pos = self.matisse.query('MOTTE:POS?', numeric_result=True) refcell_pos, pz_eta_pos, slow_pz_pos = self.matisse.get_stabilizing_piezo_positions() is_stabilizing = self.matisse.is_stabilizing() is_scanning = self.matisse.is_scanning() is_locked = self.matisse.laser_locked() wavemeter_value = self.matisse.wavemeter_raw_value() bifi_pos_text = f"BiFi:{bifi_pos}" thin_eta_pos_text = f"Thin Eta:{thin_eta_pos}" pz_eta_pos_text = f"Pz Eta:{pz_eta_pos:.3f}" slow_pz_pos_text = f"Slow Pz:{slow_pz_pos:.3f}" refcell_pos_text = f"RefCell:{refcell_pos:.3f}" stabilizing_text = f"Stabilize:{green_text('ON') if is_stabilizing else red_text('OFF')}" scanning_text = f"Scanning:{green_text('ON') if is_scanning else red_text('OFF')}" locked_text = f"{green_text('LOCKED') if is_locked else red_text('NO LOCK')}" wavemeter_text = f"Wavemeter:{wavemeter_value}" limit_offset = cfg.get(cfg.COMPONENT_LIMIT_OFFSET) refcell_at_limit = not matisse.REFERENCE_CELL_LOWER_LIMIT + limit_offset < refcell_pos < matisse.REFERENCE_CELL_UPPER_LIMIT - limit_offset slow_pz_at_limit = not matisse.SLOW_PIEZO_LOWER_LIMIT + limit_offset < slow_pz_pos < matisse.SLOW_PIEZO_UPPER_LIMIT - limit_offset pz_eta_at_limit = not matisse.PIEZO_ETALON_LOWER_LIMIT + limit_offset < pz_eta_pos < matisse.PIEZO_ETALON_UPPER_LIMIT - limit_offset warn_offset = limit_offset * 2 refcell_near_limit = not matisse.REFERENCE_CELL_LOWER_LIMIT + warn_offset < refcell_pos < matisse.REFERENCE_CELL_UPPER_LIMIT - warn_offset slow_pz_near_limit = not matisse.SLOW_PIEZO_LOWER_LIMIT + warn_offset < slow_pz_pos < matisse.SLOW_PIEZO_UPPER_LIMIT - warn_offset pz_eta_near_limit = not matisse.PIEZO_ETALON_LOWER_LIMIT + warn_offset < pz_eta_pos < matisse.PIEZO_ETALON_UPPER_LIMIT - warn_offset if refcell_at_limit: refcell_pos_text = red_text(refcell_pos_text) elif refcell_near_limit: refcell_pos_text = orange_text(refcell_pos_text) if slow_pz_at_limit: slow_pz_pos_text = red_text(slow_pz_pos_text) elif slow_pz_near_limit: slow_pz_pos_text = orange_text(slow_pz_pos_text) if pz_eta_at_limit: pz_eta_pos_text = red_text(pz_eta_pos_text) elif pz_eta_near_limit: pz_eta_pos_text = orange_text(pz_eta_pos_text) status = f"{bifi_pos_text} | {thin_eta_pos_text} | {pz_eta_pos_text} | {slow_pz_pos_text} | {refcell_pos_text} | {stabilizing_text} | {scanning_text} | {locked_text} | {wavemeter_text}" except Exception: status = red_text('Error reading system status. Please restart if this issue persists.') self.status_read.emit(status) time.sleep(cfg.get(cfg.STATUS_MONITOR_DELAY)) else: break
def status_read(...)
def stop(self)
-
Source code
def stop(self): self.messages.put(ExitFlag()) self.wait()