From 88d921d238aea2e4c9faacb2999c58634899f659 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Weber?= Date: Mon, 4 May 2026 18:02:00 +0200 Subject: [PATCH 1/2] implementing a base class and async live mode --- README.rst | 3 +- pyproject.toml | 6 +- .../plugins_2D/daq_2Dviewer_Thorlabs_DCx.py | 174 -------- .../plugins_2D/daq_2Dviewer_Thorlabs_TSI.py | 359 ++-------------- .../plugins_2D/daq_2Dviewer_UC480.py | 55 +++ .../hardware/camera_base.py | 392 ++++++++++++++++++ 6 files changed, 480 insertions(+), 509 deletions(-) delete mode 100644 src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_Thorlabs_DCx.py create mode 100644 src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_UC480.py create mode 100644 src/pymodaq_plugins_thorlabs/hardware/camera_base.py diff --git a/README.rst b/README.rst index 6c4e153..95ecd8c 100644 --- a/README.rst +++ b/README.rst @@ -59,8 +59,9 @@ Viewer1D Viewer2D ++++++++ -* **Thorlabs_DCx**: Thorlabs CCD camera. Tested with DCC3240M. +* **Thorlabs_DCx**: Thorlabs CCD camera. Tested with DCC3240M. (Deprecated use UC480 below) * **Thorlabs_TSI**: sCMOS camera series Zelux, Kiralux, Quantalux. +* **UC480**: Interface for simple camera of the thorlabs uc480 series or IDS µeye Installation instructions ========================= diff --git a/pyproject.toml b/pyproject.toml index f968a09..828cb5c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,10 +15,8 @@ description = 'Set of PyMoDAQ plugins for instruments from Thorlabs (Kinesis K10 dependencies = [ "pythonnet", "pywin32", - "nicelib", - "instrumental-lib", - "pylablib==1.4.1", - 'pymodaq>=5.0.0', + "pylablib", + 'pymodaq>=5.1.10', 'opencv-python', 'elliptec', 'pymodaq_utils', diff --git a/src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_Thorlabs_DCx.py b/src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_Thorlabs_DCx.py deleted file mode 100644 index f960614..0000000 --- a/src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_Thorlabs_DCx.py +++ /dev/null @@ -1,174 +0,0 @@ -from easydict import EasyDict as edict -from pymodaq.utils.daq_utils import getLineInfo - -from pymodaq_utils.logger import set_logger, get_module_name -from pymodaq_utils.utils import ThreadCommand -from pymodaq_gui.parameter import Parameter - -from pymodaq.utils.data import DataFromPlugins, Axis -from pymodaq.control_modules.viewer_utility_classes import DAQ_Viewer_base, comon_parameters, main -from instrumental import instrument, list_instruments, Q_ - -# This is a (probably bad) way of importing the stuff needed to get exposure range -import instrumental.drivers.cameras.uc480 as uc480module - - -class DAQ_2DViewer_Thorlabs_DCx(DAQ_Viewer_base): - """This plugin is intended for Thorlabs DCx cameras series. - It should not be compatible with Thorlabs scientific cameras. - - This plugin use the instrumental library: - https://instrumental-lib.readthedocs.io/en/stable/ - - The class we use is defined here: - https://github.com/mabuchilab/Instrumental/blob/master/instrumental/drivers/ - cameras/uc480.py - - Prerequisite - ------------ - This plugin works with Windows 10. - Installation procedure can be found here: - https://instrumental-lib.readthedocs.io/en/stable/uc480-cameras.html - In particular, the ThorCam software should be installed and the .dll libraries - folder (where you can find uc480_64.dll) should be added in the environment - PATH variable. - In principle the dependencies (pywin32, nicelib) should be installed - automatically while installing the plugin. - """ - - # Look for plugged cameras and get the serial numbers - plugged_cameras = list_instruments(module='cameras.uc480') - serial_numbers = [] - for paramset in plugged_cameras: - camera = instrument(paramset, reopen_policy='reuse') - serial_numbers.append(camera.serial.decode("utf-8")) - - params = comon_parameters + [ - {'title': 'Serial number:', 'name': 'serial_number', 'type': 'list', 'limits': serial_numbers}, - {'title': 'Exposure (ms):', 'name': 'exposure', 'type': 'float', 'value': 0}, - {'title': 'Gain:', 'name': 'master_gain', 'type': 'int', 'value': 0, "limits": [0, 100]}, - {'title': 'Gain Boost:', 'name': 'gain_boost', 'type': 'bool', 'value': False}, - {'title': 'gamma', 'name': 'gamma', 'type': 'int', 'value': 0}, - {'title': 'Color Mode', 'name': 'colormode', 'type': 'str', 'value': 'mono8', "readonly": True}, - ] - - def __init__(self, parent=None, params_state=None): - super().__init__(parent, params_state) - - self.x_axis = None - self.y_axis = None - - self.controller = None - - def commit_settings(self, param): - """ - """ - if param.name() == 'exposure': - # For some reason exposure is dealt specially in the instrumental lib - self.controller._set_exposure(Q_(param.value(), 'ms')) - self.settings.child('exposure').setValue(self.controller._get_exposure().m_as('ms')) - elif param.name() in ['master_gain', 'gain_boost', 'gamma']: - # All settings without units can be dealt as a single case - setattr(self.controller, param.name(), param.value()) - self.settings.child(param.name()).setValue(getattr(self.controller, param.name())) - - def ini_detector(self, controller=None): - """Detector communication initialization - Parameters - ---------- - controller: (object) custom object of a PyMoDAQ plugin (Slave case). - None if only one detector by controller (Master case) - Returns - ------- - self.status (edict): with initialization status: three fields: - * info (str) - * controller (object) initialized controller - * initialized: (bool): False if initialization failed otherwise True - """ - - try: - self.status.update(edict(initialized=False, info="", x_axis=None, - y_axis=None, controller=None)) - if not self.is_master: - if controller is None: - raise Exception('no controller has been defined externally while' - 'this detector is a slave one') - else: - self.controller = controller - else: - camera_serial = self.settings.child('serial_number').value() - plugged_cameras = list_instruments(module='cameras.uc480') - selected_camera = None - # Find the paramset that has the selected serial number - for paramset in plugged_cameras: - camera = instrument(paramset, reopen_policy='reuse') - if camera.serial.decode("utf-8") == camera_serial: - selected_camera = camera - - self.controller = selected_camera - - # Getting the current settings from the instrument. - # Exposure is weird - self.settings.child('exposure').setValue(self.controller._get_exposure().m_as('ms')) - for paraname in ['master_gain', 'gain_boost', 'gamma']: - paramvalue = getattr(self.controller, paraname) - self.settings.child(paraname).setValue(paramvalue) - - # Getting the range of exposure possible. I think it changes with other settings of the camera so it's - # probably not ideal to set it like this. The _dev calls are needed because instrumental does not - # natively exposes these parameters like it does e.g. for _get_exposure() - - rangemin = self.controller._dev.Exposure(uc480module.lib.IS_EXPOSURE_CMD_GET_EXPOSURE_RANGE_MIN) - rangemax = self.controller._dev.Exposure(uc480module.lib.IS_EXPOSURE_CMD_GET_EXPOSURE_RANGE_MAX) - self.settings.child('exposure').setOpts(limits=[rangemin, rangemax]) - - self.status.info = "Detector initialized" - self.status.initialized = True - self.status.controller = self.controller - return self.status - - except Exception as e: - self.emit_status( - ThreadCommand('Update_Status', [getLineInfo() + str(e), 'log'])) - self.status.info = getLineInfo() + str(e) - self.status.initialized = False - return self.status - - def close(self): - """ - Terminate the communication protocol - """ - self.controller.close() - - def grab_data(self, Naverage=1, **kwargs): - """ - Parameters - ---------- - Naverage: (int) Number of hardware averaging - kwargs: (dict) of others optionals arguments - """ - #The instrumental library seems really broken AF unfortunately so I have to use this to acquire a frame otherwise - #it just uses the default values... - kwds = {'exposure_time': Q_(self.settings.child('exposure').value(), 'ms'), - 'gain': self.settings.child('master_gain').value()} - - data = self.controller.grab_image(**kwds) - - if len(data.shape) > 2: - data_list = [data[..., ind] for ind in range(data.shape[2])] - else: - data_list = [data] - - # data = self.controller.grab_image(exposure_time=Q_(self.settings.child('exposure').value(), 'ms')) - self.data_grabed_signal.emit([DataFromPlugins(name='Thorcam', data=data_list, - dim='Data2D')]) - - def stop(self): - - self.controller.stop_live_video() - # self.emit_status(ThreadCommand('Update_Status', ['Some info you want to log'])) - return '' - - -if __name__ == '__main__': - main(__file__, init=False) \ No newline at end of file diff --git a/src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_Thorlabs_TSI.py b/src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_Thorlabs_TSI.py index 6c10ebd..60747e9 100644 --- a/src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_Thorlabs_TSI.py +++ b/src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_Thorlabs_TSI.py @@ -1,14 +1,10 @@ -import cv2 +from pymodaq.control_modules.viewer_utility_classes import comon_parameters, main -from pymodaq_utils.logger import set_logger, get_module_name -from pymodaq_utils.utils import ThreadCommand -from pymodaq_gui.parameter import Parameter +from pylablib.devices import Thorlabs -from pymodaq.utils.data import DataFromPlugins, Axis -from pymodaq.control_modules.viewer_utility_classes import DAQ_Viewer_base, comon_parameters, main +from pymodaq_plugins_thorlabs.hardware.camera_base import CameraBase, cam_params -from pylablib.devices import Thorlabs """ note: If the package is not working, this may be due to the use by pylablib of the ftd2xx.dll from ftdi (converting usb to COM port) through the kind of deprecated https://github.com/ftd2xx/ftd2xx package. Anyway, this one cannot (in some @@ -18,17 +14,24 @@ That should solve this particular issue encountered on some win10 computers """ -from qtpy import QtWidgets, QtCore -import numpy as np -from time import perf_counter - -class DAQ_2DViewer_Thorlabs_TSI(DAQ_Viewer_base): +class DAQ_2DViewer_Thorlabs_TSI(CameraBase): """ - Plugin for Thorlabs scientific sCMOS cameras such as Kiralux or Zelux. It has been tested with Thorlabs Zelux camera on Windows. - Building on pylablib driver, information about it can be found here : https://pylablib.readthedocs.io/en/stable/devices/Thorlabs_TLCamera.html + Plugin for either Thorlabs cameras uc480type or IDS µeye. + + This is the interface used in multiple cameras, including many simple Thorlabs and IDS cameras. It has been tested with IDS SC2592R12M and Thorlabs DCC1545M. - As in pylablib, the plugin will look for DLLs in the default Thorcam installation folder. Specifying a custom DLL folder is not implemented yet. + Essentially identical interface is available under two different implementations: + either as Thorlabs uc480 or as IDS uEye. Both of these seem to cover exactly the same cameras, + both are freely available from the manufacturers, and both implement exactly the same functionality. + However, these interfaces are not interchangeable, and each camera will only interact with one of + them depending on which driver it happens to use (usually based on which of the software packages + was installed last). Hence, if you have both ThorCam and IDS Software Suite installed, you would + need to check both interfaces. Normally, the interface should correspond to the software which can + connect to the camera (either ThorCam or uEye Cockpit). + + Building on pylablib driver, information about it can be found here: + https://pylablib.readthedocs.io/en/latest/devices/uc480.html#cameras-uc480 The plugin provides binning functionality as well as ROI (region of interest) selection, which are on handled the hardware side. To use ROIs, click on "Show/Hide ROI selection area" in the viewer panel (icon with dashed rectangle). @@ -36,328 +39,24 @@ class DAQ_2DViewer_Thorlabs_TSI(DAQ_Viewer_base): The "Clear ROI+Bin" button resets to default cameras parameters: no binning and full frame. """ - - serialnumbers = Thorlabs.list_cameras_tlcam() - - params = comon_parameters + [ - {'title': 'Camera name:', 'name': 'camera_name', 'type': 'str', 'value': '', 'readonly': True}, - {'title': 'Serial number:', 'name': 'serial_number', 'type': 'list', 'limits': serialnumbers}, - #{'title': 'Sensor type:', 'name': 'sensor', 'type': 'str', 'value': '', 'readonly': True}, - #this will be used once pylablib accepts PR52 - {'title': 'Sensor type:', 'name': 'sensor', 'type': 'list', 'limits': ['Monochrome', 'Bayer']}, - {'title': 'Ouput Color:', 'name': 'output_color', 'type': 'list', 'limits': ['RGB', 'MonoChrome']}, - {'title': 'Update ROI', 'name': 'update_roi', 'type': 'bool_push', 'value': False}, - {'title': 'Clear ROI+Bin', 'name': 'clear_roi', 'type': 'bool_push', 'value': False}, - {'title': 'X binning', 'name': 'x_binning', 'type': 'int', 'value': 1}, - {'title': 'Y binning', 'name': 'y_binning', 'type': 'int', 'value': 1}, - {'title': 'Image width', 'name': 'hdet', 'type': 'int', 'value': 1, 'readonly': True}, - {'title': 'Image height', 'name': 'vdet', 'type': 'int', 'value': 1, 'readonly': True}, - {'title': 'Timing', 'name': 'timing_opts', 'type': 'group', 'children': - [{'title': 'Exposure Time (ms)', 'name': 'exposure_time', 'type': 'int', 'value': 1}, - {'title': 'Compute FPS', 'name': 'fps_on', 'type': 'bool', 'value': True}, - {'title': 'FPS', 'name': 'fps', 'type': 'float', 'value': 0.0, 'readonly': True}] - } - ] - - callback_signal = QtCore.Signal() + serial_numbers = Thorlabs.list_cameras_tlcam() + serial_params = [{'title': 'Serial number:', 'name': 'serial_number', 'type': 'list', 'limits': serial_numbers}] + params = comon_parameters + serial_params + cam_params def ini_attributes(self): + super().ini_attributes() self.controller: Thorlabs.ThorlabsTLCamera = None - self.x_axis = None - self.y_axis = None - self.last_tick = 0.0 # time counter used to compute FPS - self.fps = 0.0 - - self.data_shape: str = '' - self.callback_thread = None - - # Disable "use ROI" option to avoid confusion with other buttons - #self.settings.child('ROIselect', 'use_ROI').setOpts(visible=False) - - def commit_settings(self, param: Parameter): - """Apply the consequences of a change of value in the detector settings - - Parameters - ---------- - param: Parameter - A given parameter (within detector_settings) whose value has been changed by the user - """ - if param.name() == "exposure_time": - self.controller.set_exposure(param.value()/1000) - - if param.name() == "fps_on": - self.settings.child('timing_opts', 'fps').setOpts(visible=param.value()) - - if param.name() == "update_roi": - if param.value(): # Switching on ROI - - # We handle ROI and binning separately for clarity - (old_x, _, old_y, _, xbin, ybin) = self.controller.get_roi() # Get current binning - - # Values need to be rescaled by binning factor and shifted by current x0,y0 to be correct. - new_x = (old_x + self.settings.child('ROIselect', 'x0').value())*xbin - new_y = (old_y + self.settings.child('ROIselect', 'y0').value())*xbin - new_width = self.settings.child('ROIselect', 'width').value()*ybin - new_height = self.settings.child('ROIselect', 'height').value()*ybin - - new_roi = (new_x, new_width, xbin, new_y, new_height, ybin) - self.update_rois(new_roi) - # recenter rectangle - self.settings.child('ROIselect', 'x0').setValue(0) - self.settings.child('ROIselect', 'y0').setValue(0) - param.setValue(False) - - if param.name() in ['x_binning', 'y_binning']: - # We handle ROI and binning separately for clarity - (x0, w, y0, h, *_) = self.controller.get_roi() # Get current ROI - xbin = self.settings.child('x_binning').value() - ybin = self.settings.child('y_binning').value() - new_roi = (x0, w, xbin, y0, h, ybin) - self.update_rois(new_roi) - - if param.name() == "clear_roi": - if param.value(): # Switching on ROI - wdet, hdet = self.controller.get_detector_size() - # self.settings.child('ROIselect', 'x0').setValue(0) - # self.settings.child('ROIselect', 'width').setValue(wdet) - self.settings.child('x_binning').setValue(1) - # - # self.settings.child('ROIselect', 'y0').setValue(0) - # new_height = self.settings.child('ROIselect', 'height').setValue(hdet) - self.settings.child('y_binning').setValue(1) - - new_roi = (0, wdet, 1, 0, hdet, 1) - self.update_rois(new_roi) - param.setValue(False) - - def ini_detector(self, controller=None): - """Detector communication initialization - - Parameters - ---------- - controller: (object) - custom object of a PyMoDAQ plugin (Slave case). None if only one actuator/detector by controller - (Master case) - - Returns - ------- - info: str - initialized: bool - False if initialization failed otherwise True - """ + def ini_detector_custom(self, controller=None): # Initialize camera class if not self.settings.child('serial_number').value() == '': - self.ini_detector_init(old_controller=controller, - new_controller=Thorlabs.ThorlabsTLCamera(self.settings.child('serial_number').value())) - else: - raise Exception('No compatible Thorlabs scientific camera was found.') - - device_info = self.controller.get_device_info() - - # Get camera name - self.settings.child('camera_name').setValue(device_info.name) - - # this will be used once pylablib accepts PR52 - # # Get Sensor Type - # self.settings.child('sensor').setValue(device_info.sensor_type) - - if 'monochrome' in self.settings['sensor'].lower(): - self.settings.child('output_color').setValue('MonoChrome') - self.settings.child('output_color').setOpts(visible=False) - - # Set exposure time - self.controller.set_exposure(self.settings.child('timing_opts', 'exposure_time').value()/1000) - - # FPS visibility - self.settings.child('timing_opts', 'fps').setOpts(visible=self.settings.child('timing_opts', 'fps_on').value()) - - # Update image parameters - (*_, hbin, vbin) = self.controller.get_roi() - height, width = self.controller.get_data_dimensions() - self.settings.child('x_binning').setValue(hbin) - self.settings.child('y_binning').setValue(vbin) - self.settings.child('hdet').setValue(width) - self.settings.child('vdet').setValue(height) - - # Way to define a wait function with arguments - wait_func = lambda: self.controller.wait_for_frame(since='lastread', nframes=1, timeout=20.0) - callback = ThorlabsCallback(wait_func) - - self.callback_thread = QtCore.QThread() # creation of a Qt5 thread - callback.moveToThread(self.callback_thread) # callback object will live within this thread - callback.data_sig.connect( - self.emit_data) # when the wait for acquisition returns (with data taken), emit_data will be fired - - self.callback_signal.connect(callback.wait_for_acquisition) - self.callback_thread.callback = callback - self.callback_thread.start() - - self._prepare_view() - - info = "Initialized camera" - initialized = True - return info, initialized - - def _prepare_view(self): - """Preparing a data viewer by emitting temporary data. Typically, needs to be called whenever the - ROIs are changed""" - # wx = self.settings.child('rois', 'width').value() - # wy = self.settings.child('rois', 'height').value() - # bx = self.settings.child('rois', 'x_binning').value() - # by = self.settings.child('rois', 'y_binning').value() - # - # sizex = wx // bx - # sizey = wy // by - height, width = self.controller.get_data_dimensions() - - self.settings.child('hdet').setValue(width) - self.settings.child('vdet').setValue(height) - mock_data = np.zeros((height, width)) - - if width != 1 and height != 1: - data_shape = 'Data2D' - else: - data_shape = 'Data1D' - - if data_shape != self.data_shape: - self.data_shape = data_shape - # init the viewers - self.data_grabed_signal_temp.emit([DataFromPlugins(name='Thorlabs Camera', - data=[np.squeeze(mock_data)], - dim=self.data_shape, - labels=[f'ThorCam_{self.data_shape}'])]) - QtWidgets.QApplication.processEvents() - - def update_rois(self, new_roi): - # In pylablib, ROIs compare as tuples - (new_x, new_width, new_xbinning, new_y, new_height, new_ybinning) = new_roi - if new_roi != self.controller.get_roi(): - # self.controller.set_attribute_value("ROIs",[new_roi]) - self.controller.set_roi(hstart=new_x, hend=new_x + new_width, vstart=new_y, vend=new_y + new_height, - hbin=new_xbinning, vbin=new_ybinning) - self.emit_status(ThreadCommand('Update_Status', [f'Changed ROI: {new_roi}'])) - self.controller.clear_acquisition() - self.controller.setup_acquisition() - # Finally, prepare view for displaying the new data - self._prepare_view() - - def grab_data(self, Naverage=1, **kwargs): - """ - Grabs the data. Synchronous method (kinda). - ---------- - Naverage: (int) Number of averaging - kwargs: (dict) of others optionals arguments - """ - try: - # Warning, acquisition_in_progress returns 1,0 and not a real bool - if not self.controller.acquisition_in_progress(): - self.controller.clear_acquisition() - self.controller.start_acquisition(nframes=10) - #Then start the acquisition - self.callback_signal.emit() # will trigger the wait for acquisition - - except Exception as e: - self.emit_status(ThreadCommand('Update_Status', [str(e), "log"])) - - def emit_data(self): - """ Function used to emit data obtained by callback. - - Parameter - --------- - status: bool - If True a frame is available, If False, a Timeout occured while waiting for the frame - - See Also - -------- - daq_utils.ThreadCommand - """ - try: - # Get data from buffer - frame = self.controller.read_newest_image() - # Emit the frame. - if frame is not None: # happens for last frame when stopping camera - if self.settings['output_color'] == 'RGB': - rgb_image = cv2.cvtColor(frame, cv2.COLOR_BAYER_BG2RGB) - self.data_grabed_signal.emit([DataFromPlugins(name='Thorlabs Camera', - data=[np.squeeze(rgb_image[..., ind]) for ind in - range(3)], - dim=self.data_shape, - labels=[f'ThorCam_{self.data_shape}'])]) - else: - if 'monochrome' in self.settings['sensor'].lower(): - self.data_grabed_signal.emit([DataFromPlugins(name='Thorlabs Camera', - data=[np.squeeze(frame)], - dim=self.data_shape, - labels=[f'ThorCam_{self.data_shape}'])]) - else: - grey_image = cv2.cvtColor(frame, cv2.COLOR_BAYER_BG2GRAY) - self.data_grabed_signal.emit([DataFromPlugins(name='Thorlabs Camera', - data=[np.squeeze(grey_image)], - dim=self.data_shape, - labels=[f'ThorCam_{self.data_shape}'])]) - - if self.settings.child('timing_opts', 'fps_on').value(): - self.update_fps() - - # To make sure that timed events are executed in continuous grab mode - QtWidgets.QApplication.processEvents() - - except Exception as e: - self.emit_status(ThreadCommand('Update_Status', [str(e), 'log'])) - - def update_fps(self): - current_tick = perf_counter() - frame_time = current_tick-self.last_tick - - if self.last_tick != 0.0 and frame_time != 0.0: - # We don't update FPS for the first frame, and we also avoid divisions by zero - - if self.fps == 0.0: - self.fps = 1 / frame_time + if self.is_master: + self.controller = Thorlabs.ThorlabsTLCamera(self.settings.child('serial_number').value()) else: - # If we already have an FPS calculated, we smooth its evolution - self.fps = 0.9 * self.fps + 0.1 / frame_time - - self.last_tick = current_tick - - # Update reading - self.settings.child('timing_opts', 'fps').setValue(round(self.fps, 1)) - - def close(self): - """ - Terminate the communication protocol - """ - # Terminate the communication - self.controller.close() - self.controller = None # Garbage collect the controller - self.status.initialized = False - self.status.controller = None - self.status.info = "" - - def stop(self): - """Stop the acquisition.""" - self.controller.clear_acquisition() - return '' - - -class ThorlabsCallback(QtCore.QObject): - """Callback object """ - data_sig = QtCore.Signal() - - def __init__(self, wait_fn): - super().__init__() - # Set the wait function - self.wait_fn = wait_fn - - def wait_for_acquisition(self): - try: - new_data = self.wait_fn() - if new_data is not False: # will be returned if the main thread called CancelWait - self.data_sig.emit() - except Thorlabs.ThorlabsTimeoutError: - pass + self.controller = controller + else: + raise Exception('No compatible Thorlabs TSI camera was found.') if __name__ == '__main__': - main(__file__, init=True) + main(__file__, init=False) diff --git a/src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_UC480.py b/src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_UC480.py new file mode 100644 index 0000000..eacfa2e --- /dev/null +++ b/src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_UC480.py @@ -0,0 +1,55 @@ + +from pymodaq.control_modules.viewer_utility_classes import comon_parameters, main + +from pylablib.devices import uc480 + +from pymodaq_plugins_thorlabs.hardware.camera_base import CameraBase, cam_params + + +class DAQ_2DViewer_UC480(CameraBase): + """ + Plugin for either Thorlabs cameras uc480type or IDS µeye. + + This is the interface used in multiple cameras, including many simple Thorlabs and IDS cameras. It has been tested with IDS SC2592R12M and Thorlabs DCC1545M. + + Essentially identical interface is available under two different implementations: + either as Thorlabs uc480 or as IDS uEye. Both of these seem to cover exactly the same cameras, + both are freely available from the manufacturers, and both implement exactly the same functionality. + However, these interfaces are not interchangeable, and each camera will only interact with one of + them depending on which driver it happens to use (usually based on which of the software packages + was installed last). Hence, if you have both ThorCam and IDS Software Suite installed, you would + need to check both interfaces. Normally, the interface should correspond to the software which can + connect to the camera (either ThorCam or uEye Cockpit). + + Building on pylablib driver, information about it can be found here: + https://pylablib.readthedocs.io/en/latest/devices/uc480.html#cameras-uc480 + + The plugin provides binning functionality as well as ROI (region of interest) selection, which are on handled the hardware side. + To use ROIs, click on "Show/Hide ROI selection area" in the viewer panel (icon with dashed rectangle). + Position the rectangle as you wish, either with mouse or by entering coordinates, then click "Update ROI" button. + + The "Clear ROI+Bin" button resets to default cameras parameters: no binning and full frame. + """ + serial_numbers = [cam_info.serial_number for cam_info in uc480.list_cameras()] + serial_params = [{'title': 'Serial number:', 'name': 'serial_number', 'type': 'list', 'limits': serial_numbers}] + + params = comon_parameters + serial_params + cam_params + + def ini_attributes(self): + super().ini_attributes() + self.controller: uc480.UC480Camera = None + + def ini_detector_custom(self, controller=None): + # Initialize camera class + if not self.settings.child('serial_number').value() == '': + if self.is_master: + self.controller = uc480.UC480Camera( + dev_id=uc480.UC480Camera.find_by_serial(self.settings.child('serial_number').value())) + else: + self.controller = controller + else: + raise Exception('No compatible Thorlabs UC480 camera was found.') + + +if __name__ == '__main__': + main(__file__, init=False) diff --git a/src/pymodaq_plugins_thorlabs/hardware/camera_base.py b/src/pymodaq_plugins_thorlabs/hardware/camera_base.py new file mode 100644 index 0000000..40ea14a --- /dev/null +++ b/src/pymodaq_plugins_thorlabs/hardware/camera_base.py @@ -0,0 +1,392 @@ +import cv2 +from pymodaq_utils.logger import set_logger, get_module_name +from pymodaq_utils.utils import ThreadCommand +from pymodaq_gui.parameter import Parameter +try: + from pymodaq_gui.plotting.items.roi import RoiInfo # pymodaq > 5.1.x +except ImportError: + from pymodaq_gui.plotting.utils.plot_utils import RoiInfo + +from pymodaq.utils.data import DataFromPlugins, Axis +from pymodaq.control_modules.viewer_utility_classes import DAQ_Viewer_base, comon_parameters, main + +from qtpy import QtWidgets, QtCore +import numpy as np +from time import perf_counter + + +cam_params = [ + {'title': 'Camera name:', 'name': 'camera_name', 'type': 'str', 'value': '', 'readonly': True}, + {'title': 'Sensor type:', 'name': 'sensor', 'type': 'list', 'limits': ['Monochrome', 'Bayer']}, + {'title': 'Ouput Color:', 'name': 'output_color', 'type': 'list', 'limits': ['RGB', 'MonoChrome']}, + {'title': 'ROI', 'name': 'roi', 'type': 'group', 'children': [ + {'title': 'Update ROI from Viewer', 'name': 'update_roi', 'type': 'led', 'value': False}, + {'title': 'Apply ROI', 'name': 'apply_roi', 'type': 'led', 'value': False}, + {'title': 'Clear ROI+Bin', 'name': 'clear_roi', 'type': 'bool_push', 'value': False}, + {'title': 'ROI:', 'name': 'roi_slices', 'type': 'str', 'value': ''}, + {'title': 'X binning', 'name': 'x_binning', 'type': 'int', 'value': 1}, + {'title': 'Y binning', 'name': 'y_binning', 'type': 'int', 'value': 1}, + ], }, + {'title': 'Image width', 'name': 'hdet', 'type': 'int', 'value': 1, 'readonly': True}, + {'title': 'Image height', 'name': 'vdet', 'type': 'int', 'value': 1, 'readonly': True}, + {'title': 'Timing', 'name': 'timing_opts', 'type': 'group', 'children': + [{'title': 'Exposure Time (ms)', 'name': 'exposure_time', 'type': 'int', 'value': 1}, + {'title': 'Compute FPS', 'name': 'fps_on', 'type': 'bool', 'value': True}, + {'title': 'FPS', 'name': 'fps', 'type': 'float', 'value': 0.0, 'readonly': True}] + } +] + + +class CameraBase(DAQ_Viewer_base): + """ + Base implementation for Camera using pylablib framework. Works for TSI and uc480 thorlabs camera + """ + serial_numbers = [] + + serial_params = [{'title': 'Serial number:', 'name': 'serial_number', 'type': 'list', 'limits': serial_numbers}] + + params = comon_parameters + serial_params + cam_params + + callback_signal = QtCore.Signal(bool) + live_mode_available = True + + def ini_attributes(self): + self.controller = None + self.callback_thread: QtCore.QThread = None + + self.x_axis: Axis = None + self.y_axis: Axis = None + + self.roi_select_info: RoiInfo = None + + self.last_tick = 0.0 # time counter used to compute FPS + self.fps = 0.0 + + self.data_shape: str = '' + + + def roi_select(self, roi_info: RoiInfo, ind_viewer: int = 0): + """ Automatically called when a user use the RoiSelect ROi from a 2D viewer""" + self.roi_select_info = roi_info + self.roi_select_viewer_index = ind_viewer + + if self.settings['roi', 'update_roi']: + self.settings['roi', 'roi_slices'] = str(roi_info.to_slices()) + if self.settings['roi', 'apply_roi']: + self.apply_roi() + + def apply_roi(self): + roi_info = RoiInfo.from_slices(eval(self.settings['roi', 'roi_slices'])) + new_roi = (roi_info.origin[1], roi_info.size[1], self.settings['roi', 'x_binning'], + roi_info.origin[0], roi_info.size[0], self.settings['roi', 'y_binning']) + self.update_rois(new_roi) + + def compute_axes(self): + (hstart, hend, vstart, vend, hbin, vbin) = self.controller.get_roi() + slices = [slice(vstart, vend, vbin), slice(hstart, hend, hbin)] + self.settings.child('roi', 'roi_slices').setValue(str(slices)) + roi_info = RoiInfo.from_slices(slices) + + self.x_axis = Axis('x_axis', offset=roi_info.origin[1], + scaling=self.settings['roi', 'x_binning'], + size=int(roi_info.size[1]), + index=1) + self.y_axis = Axis('y_axis', offset=roi_info.origin[0], + scaling=self.settings['roi', 'y_binning'], + size=int(roi_info.size[0]), + index=0) + + def clear_roi(self): + wdet, hdet = self.controller.get_detector_size() + self.settings.child('roi', 'x_binning').setValue(1) + self.settings.child('roi', 'y_binning').setValue(1) + + new_roi = (0, wdet, 1, 0, hdet, 1) + self.update_rois(new_roi) + + def update_rois(self, new_roi): + # In pylablib, ROIs compare as tuples + (new_x, new_width, new_xbinning, new_y, new_height, new_ybinning) = new_roi + if new_roi != self.controller.get_roi(): + # self.controller.set_attribute_value("ROIs",[new_roi]) + self.controller.set_roi(hstart=new_x, hend=new_x + new_width, vstart=new_y, vend=new_y + new_height, + hbin=new_xbinning, vbin=new_ybinning) + self.emit_status(ThreadCommand('Update_Status', [f'Changed ROI: {new_roi}'])) + self.controller.clear_acquisition() + self.controller.setup_acquisition() + # Finally, prepare view for displaying the new data + self._prepare_view() + self.compute_axes() + + def commit_settings(self, param: Parameter): + """Apply the consequences of a change of value in the detector settings + + Parameters + ---------- + param: Parameter + A given parameter (within detector_settings) whose value has been changed by the user + """ + if param.name() == "exposure_time": + self.controller.set_exposure(param.value()/1000) + + if param.name() == "fps_on": + self.settings.child('timing_opts', 'fps').setOpts(visible=param.value()) + + if param.name() == "apply_roi": + if param.value(): # Switching on ROI + self.apply_roi() + else: + self.clear_roi() + + if param.name() in ['x_binning', 'y_binning']: + # We handle ROI and binning separately for clarity + (x0, w, y0, h, *_) = self.controller.get_roi() # Get current ROI + xbin = self.settings['roi', 'x_binning'] + ybin = self.settings['roi', 'y_binning'] + new_roi = (x0, w, xbin, y0, h, ybin) + self.update_rois(new_roi) + + if param.name() == "clear_roi": + if param.value(): # Switching on ROI + self.clear_roi() + param.setValue(False) + + def ini_detector_custom(self, controller=None): + raise NotImplementedError + + def ini_detector(self, controller=None): + """Detector communication initialization + + Parameters + ---------- + controller: (object) + custom object of a PyMoDAQ plugin (Slave case). None if only one actuator/detector by controller + (Master case) + + Returns + ------- + info: str + initialized: bool + False if initialization failed otherwise True + """ + self.ini_detector_custom(controller) + + self.get_device_info() + self.get_set_color() + self.get_set_main_parameters() + self.setup_callback_thread() + + info = "Initialized camera" + initialized = True + return info, initialized + + def get_device_info(self): + + device_info = self.controller.get_device_info() + + # Get camera name/model + if hasattr(device_info, 'name'): + self.settings.child('camera_name').setValue(device_info.name) + elif hasattr(device_info, 'model'): + self.settings.child('camera_name').setValue(device_info.model) + + def get_set_color(self): + if 'monochrome' in self.settings['sensor'].lower(): + self.settings.child('output_color').setValue('MonoChrome') + self.settings.child('output_color').setOpts(visible=False) + + def get_set_main_parameters(self): + # Set exposure time + self.controller.set_exposure(self.settings['timing_opts', 'exposure_time']/1000) + + # FPS visibility + self.settings.child('timing_opts', 'fps').setOpts(visible=self.settings['timing_opts', 'fps_on']) + + # get roi limits + self.controller.get_roi_limits() + + # Update image parameters + (hstart, hend, vstart, vend, hbin, vbin) = self.controller.get_roi() + height, width = self.controller.get_data_dimensions() + self.settings.child('roi', 'x_binning').setValue(hbin) + self.settings.child('roi', 'y_binning').setValue(vbin) + self.settings.child('hdet').setValue(width) + self.settings.child('vdet').setValue(height) + slices = [slice(vstart, vend, vbin), slice(hstart, hend, hbin)] + self.settings.child('roi', 'roi_slices').setValue(str(slices)) + self.compute_axes() + + def setup_callback_thread(self): + # Way to define a wait function with arguments + wait_func = lambda: self.controller.wait_for_frame(since='lastread', nframes=1, timeout=20.0) + callback = ThorlabsCallback(wait_func) + + self.callback_thread = QtCore.QThread() # creation of a Qt5 thread + callback.moveToThread(self.callback_thread) # callback object will live within this thread + callback.data_sig.connect( + self.emit_data) # when the wait for acquisition returns (with data taken), emit_data will be fired + + self.callback_signal.connect(callback.set_do_grab) + self.callback_thread.callback = callback + self.callback_thread.start() + + self._prepare_view() + + + def _prepare_view(self): + """Preparing a data viewer by emitting temporary data. Typically, needs to be called whenever the + ROIs are changed""" + + height, width = self.controller.get_data_dimensions() + + self.settings.child('hdet').setValue(width) + self.settings.child('vdet').setValue(height) + mock_data = np.zeros((height, width)) + + if width != 1 and height != 1: + data_shape = 'Data2D' + else: + data_shape = 'Data1D' + + if data_shape != self.data_shape: + self.data_shape = data_shape + # init the viewers + self.data_grabed_signal_temp.emit([DataFromPlugins(name='Thorlabs Camera', + data=[np.squeeze(mock_data)], + dim=self.data_shape, + labels=[f'ThorCam_{self.data_shape}'])]) + QtWidgets.QApplication.processEvents() + + def grab_data(self, Naverage=1, **kwargs): + """ + Grabs the data. ASynchronous method (kinda). + ---------- + Naverage: (int) Number of averaging + kwargs: (dict) of others optionals arguments + """ + try: + # Warning, acquisition_in_progress returns 1,0 and not a real bool + if not kwargs.get('live', False): + self.emit_data(self.controller.snap()) + else: + if not self.controller.acquisition_in_progress(): + self.controller.clear_acquisition() + self.controller.start_acquisition(nframes=10) + #Then start the acquisition + self.callback_signal.emit(True) # will trigger the wait for acquisition + + except Exception as e: + self.emit_status(ThreadCommand('Update_Status', [str(e), "log"])) + + def emit_data(self, frame: np.ndarray=None): + """ Function used to emit data obtained by callback. + + Parameter + --------- + status: bool + If True a frame is available, If False, a Timeout occurred while waiting for the frame + + See Also + -------- + daq_utils.ThreadCommand + """ + try: + # Get data from buffer + if frame is None: + frame = self.controller.read_newest_image() + # Emit the frame. + if frame is not None: # happens for last frame when stopping camera + if self.settings['output_color'] == 'RGB': + rgb_image = cv2.cvtColor(frame, cv2.COLOR_BAYER_BG2RGB) + data_arrays = [np.atleast_1d(rgb_image[..., ind]) for ind in range(3)] + else: + if 'monochrome' in self.settings['sensor'].lower(): + data_arrays = [np.atleast_1d(frame)] + else: + data_arrays = [np.atleast_1d(cv2.cvtColor(frame, cv2.COLOR_BAYER_BG2GRAY))] + + self.data_grabed_signal.emit([DataFromPlugins(name='Thorlabs Camera', + data=data_arrays, + dim=self.data_shape, + labels=[f'ThorCam_{self.data_shape}'], + axes=[self.x_axis, self.y_axis])]) + if self.settings.child('timing_opts', 'fps_on').value(): + self.update_fps() + + # To make sure that timed events are executed in continuous grab mode + QtWidgets.QApplication.processEvents() + + except Exception as e: + self.emit_status(ThreadCommand('Update_Status', [str(e), 'log'])) + + def update_fps(self): + current_tick = perf_counter() + frame_time = current_tick-self.last_tick + + if self.last_tick != 0.0 and frame_time != 0.0: + # We don't update FPS for the first frame, and we also avoid divisions by zero + + if self.fps == 0.0: + self.fps = 1 / frame_time + else: + # If we already have an FPS calculated, we smooth its evolution + self.fps = 0.9 * self.fps + 0.1 / frame_time + + self.last_tick = current_tick + + # Update reading + self.settings.child('timing_opts', 'fps').setValue(round(self.fps, 1)) + + def close(self): + """ + Terminate the communication protocol + """ + # Terminate the communication + + self.stop() + + self.callback_thread.quit() + self.callback_thread.wait() + + self.controller.close() + self.controller = None # Garbage collect the controller + self.status.initialized = False + self.status.controller = None + self.status.info = "" + + def stop(self): + """Stop the acquisition.""" + self.callback_signal.emit(False) + QtWidgets.QApplication.processEvents() + + self.controller.clear_acquisition() + return '' + + +class ThorlabsCallback(QtCore.QObject): + """Callback object """ + data_sig = QtCore.Signal() + + def __init__(self, wait_fn): + super().__init__() + # Set the wait function + self.wait_fn = wait_fn + self.do_grab = True + + def set_do_grab(self, do_grab=True): + self.do_grab = do_grab + if do_grab: + self.wait_for_acquisition() + + def wait_for_acquisition(self): + while self.do_grab: + try: + new_data = self.wait_fn() + if new_data is not False: # will be returned if the main thread called CancelWait + self.data_sig.emit() + except Exception as e: + pass + QtWidgets.QApplication.processEvents() + + + From accfdf3c335cce03b95b9f3b4cbf4b25c13ecaf2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Weber?= Date: Tue, 5 May 2026 09:38:35 +0200 Subject: [PATCH 2/2] changed frame wait mode to 'now' make the plugin more stable --- .../plugins_2D/daq_2Dviewer_Thorlabs_TSI.py | 13 ++-------- .../plugins_2D/daq_2Dviewer_UC480.py | 3 ++- .../hardware/camera_base.py | 25 +++++++++++-------- 3 files changed, 19 insertions(+), 22 deletions(-) diff --git a/src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_Thorlabs_TSI.py b/src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_Thorlabs_TSI.py index 60747e9..7e90111 100644 --- a/src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_Thorlabs_TSI.py +++ b/src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_Thorlabs_TSI.py @@ -17,21 +17,12 @@ class DAQ_2DViewer_Thorlabs_TSI(CameraBase): """ - Plugin for either Thorlabs cameras uc480type or IDS µeye. + Plugin for TSI SCMOS Thorlabs cameras - This is the interface used in multiple cameras, including many simple Thorlabs and IDS cameras. It has been tested with IDS SC2592R12M and Thorlabs DCC1545M. - Essentially identical interface is available under two different implementations: - either as Thorlabs uc480 or as IDS uEye. Both of these seem to cover exactly the same cameras, - both are freely available from the manufacturers, and both implement exactly the same functionality. - However, these interfaces are not interchangeable, and each camera will only interact with one of - them depending on which driver it happens to use (usually based on which of the software packages - was installed last). Hence, if you have both ThorCam and IDS Software Suite installed, you would - need to check both interfaces. Normally, the interface should correspond to the software which can - connect to the camera (either ThorCam or uEye Cockpit). Building on pylablib driver, information about it can be found here: - https://pylablib.readthedocs.io/en/latest/devices/uc480.html#cameras-uc480 + https://pylablib.readthedocs.io/en/latest/devices/ The plugin provides binning functionality as well as ROI (region of interest) selection, which are on handled the hardware side. To use ROIs, click on "Show/Hide ROI selection area" in the viewer panel (icon with dashed rectangle). diff --git a/src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_UC480.py b/src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_UC480.py index eacfa2e..6a887f9 100644 --- a/src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_UC480.py +++ b/src/pymodaq_plugins_thorlabs/daq_viewer_plugins/plugins_2D/daq_2Dviewer_UC480.py @@ -10,7 +10,8 @@ class DAQ_2DViewer_UC480(CameraBase): """ Plugin for either Thorlabs cameras uc480type or IDS µeye. - This is the interface used in multiple cameras, including many simple Thorlabs and IDS cameras. It has been tested with IDS SC2592R12M and Thorlabs DCC1545M. + This is the interface used in multiple cameras, including many simple Thorlabs and IDS cameras. + It has been tested with IDS SC2592R12M and Thorlabs DCC1545M. Essentially identical interface is available under two different implementations: either as Thorlabs uc480 or as IDS uEye. Both of these seem to cover exactly the same cameras, diff --git a/src/pymodaq_plugins_thorlabs/hardware/camera_base.py b/src/pymodaq_plugins_thorlabs/hardware/camera_base.py index 40ea14a..abb8516 100644 --- a/src/pymodaq_plugins_thorlabs/hardware/camera_base.py +++ b/src/pymodaq_plugins_thorlabs/hardware/camera_base.py @@ -33,7 +33,12 @@ [{'title': 'Exposure Time (ms)', 'name': 'exposure_time', 'type': 'int', 'value': 1}, {'title': 'Compute FPS', 'name': 'fps_on', 'type': 'bool', 'value': True}, {'title': 'FPS', 'name': 'fps', 'type': 'float', 'value': 0.0, 'readonly': True}] - } + }, + {'title': 'Buffer', 'name': 'buffer', 'type': 'group', 'children': [ + {'title': 'Size:', 'name': 'size', 'type': 'int', 'value': 10}, + {'title': 'mode:', 'name': 'mode', 'type': 'list', 'value': 'now', + 'limits': ['now', 'lastread', 'lastwait', 'start']}, + ]}, ] @@ -218,8 +223,11 @@ def get_set_main_parameters(self): def setup_callback_thread(self): # Way to define a wait function with arguments - wait_func = lambda: self.controller.wait_for_frame(since='lastread', nframes=1, timeout=20.0) + wait_func = lambda: self.controller.wait_for_frame(since=self.settings['buffer', 'mode'], + nframes=1, timeout=20.0) callback = ThorlabsCallback(wait_func) + self.settings.child('buffer', 'mode').setReadonly(True) + self.callback_thread = QtCore.QThread() # creation of a Qt5 thread callback.moveToThread(self.callback_thread) # callback object will live within this thread @@ -271,7 +279,7 @@ def grab_data(self, Naverage=1, **kwargs): else: if not self.controller.acquisition_in_progress(): self.controller.clear_acquisition() - self.controller.start_acquisition(nframes=10) + self.controller.start_acquisition(nframes=self.settings['buffer', 'size']) #Then start the acquisition self.callback_signal.emit(True) # will trigger the wait for acquisition @@ -344,15 +352,12 @@ def close(self): # Terminate the communication self.stop() - - self.callback_thread.quit() - self.callback_thread.wait() + if self.callback_thread is not None: + self.callback_thread.quit() + self.callback_thread.wait() self.controller.close() - self.controller = None # Garbage collect the controller - self.status.initialized = False - self.status.controller = None - self.status.info = "" + self.settings.child('buffer', 'mode').setReadonly(False) def stop(self): """Stop the acquisition."""