diff --git a/nottcontrol/camera/scify.py b/nottcontrol/camera/scify.py index 3bf37d1..56a1438 100644 --- a/nottcontrol/camera/scify.py +++ b/nottcontrol/camera/scify.py @@ -159,6 +159,7 @@ def process_frame(self): def load_roi_config(self, config): self.roi_config = [] + i = 1 for roi_widget in self.roi_widgets: try: roi_config = self.load_roi_from_config(config, roi_widget.name) @@ -166,6 +167,7 @@ def load_roi_config(self, config): print(f'Failed to load roi configuration for {roi_widget.name}, using default') roi_config = Roi(i*100, 600, 50,50) roi_widget.setConfig(roi_config) + i += 1 def load_roi_from_config(self, config, adr): roi_string = config['CAMERA'][adr] diff --git a/nottcontrol/config.py b/nottcontrol/config.py index 4404018..4cfe1f2 100644 --- a/nottcontrol/config.py +++ b/nottcontrol/config.py @@ -4,6 +4,7 @@ class Config: def __init__(self, path:str): self._path = path self.config_parser = ConfigParser() + self.config_parser.optionxform = str # Preserve case sensitivity self.config_parser.read(path) def __getitem__(self, key): diff --git a/nottcontrol/lucid/__init__.py b/nottcontrol/lucid/__init__.py new file mode 100644 index 0000000..653f8a8 --- /dev/null +++ b/nottcontrol/lucid/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +""" +Created on Wed Dec 17 17:13:16 2025 + +@author: Thomas +""" + +from pathlib import Path +import os +from nottcontrol.config import Config + +_parent = Path(__file__).parent +_config_path_lucid = os.path.join(_parent, "cfg/config.cfg") +config_lucid = Config(_config_path_lucid) diff --git a/nottcontrol/lucid/cfg/config.cfg b/nottcontrol/lucid/cfg/config.cfg new file mode 100644 index 0000000..ea3c9ee --- /dev/null +++ b/nottcontrol/lucid/cfg/config.cfg @@ -0,0 +1,114 @@ +[connection] +im_ip = 10.33.179.61 +pup_ip = 10.33.179.45 + +[stream_im] +StreamAutoNegotiatePacketSize = True +StreamPacketResendEnable = True + +[stream_pup] +StreamAutoNegotiatePacketSize = True +StreamPacketResendEnable = True + +[readout_im] +ExposureAuto = Off +AcquisitionFrameRateEnable = True +AcquisitionFrameRate = 1.4 +ExposureTimeSelector = Common +ExposureTime = 56318.0 +GainAuto = Off +Gain = 12.0 +BlackLevel = 0.0 +GammaEnable = True +Gamma = 1.0 +Width = 200 +Height = 300 +OffsetX = 1800 +OffsetY = 1000 + +[readout_pup] +ExposureAuto = Off +AcquisitionFrameRateEnable = True +AcquisitionFrameRate = 0.5 +ExposureTimeSelector = Common +ExposureTime = 1959860.944 +GainAuto = Off +Gain = 48.0 +BlackLevel = 0.0 +GammaEnable = True +Gamma = 1.0 +Width = 3072 +Height = 2048 +OffsetX = 0 +OffsetY = 0 + +[fit_pup] +# Standard deviation of Gaussian kernel used for image smoothening +sigma = 50 +# Amounts of pixels per bin +mybinx = 4 +mybiny = 4 +# Percentile thresholds for gradient-based edge detection +perc_grad_low = 80 +perc_grad_high = 95 +# Percentile threshold for intensity-based filtering of high gradient pixels +perc_int = 80 + +[fit_im] +# Amount of pixels per bin +mybinx = 2 +mybiny = 2 +# Percentile thresholds for gradient-based edge detection +perc_grad_low = 99.9 +perc_grad_high = 100 +# Percentile threshold for intensity-based filtering of high gradient pixels +perc_int = 99.9 + +[ref_pup] +# (pos_x,pos_y,radius) in px : TO BE COMPLETED +beam1 = (1482,734,621) +beam2 = (1412,718,677) +beam3 = (1474,696,666) +beam4 = (1291,624,696) + +[ref_im] +# (pos_x,pos_y,radius) in px : TO BE COMPLETED +beam1 = (92,86,4) +beam2 = (88,136,4) +beam3 = (84,184,5) +beam4 = (76,236,4) + +[convert_dict] +im_ip = String +pup_ip = String +StreamAutoNegotiatePacketSize = Bool +StreamPacketResendEnable = Bool +ExposureAuto = String +AcquisitionFrameRateEnable = Bool +AcquisitionFrameRate = Float +ExposureTimeSelector = String +ExposureTime = Float +GainAuto = String +Gain = Float +BlackLevel = Float +GammaEnable = Bool +Gamma = Float +Width = Int +Height = Int +OffsetX = Int +OffsetY = Int +sigma = Int +mybinx = Int +mybiny = Int +perc_grad_low = Float +perc_grad_high = Float +perc_int = Float +N = Int +rfit = Int +beam1 = IntTuple +beam2 = IntTuple +beam3 = IntTuple +beam4 = IntTuple + + + diff --git a/nottcontrol/lucid/lib/lucid_utils.py b/nottcontrol/lucid/lib/lucid_utils.py new file mode 100644 index 0000000..b2b99be --- /dev/null +++ b/nottcontrol/lucid/lib/lucid_utils.py @@ -0,0 +1,615 @@ +# -*- coding: utf-8 -*- +""" +Created on Wed Dec 17 13:36:18 2025 + +@author: Thomas +""" + +#---------# +# Imports # +#---------# + +# General +import time +import ast +import numpy as np +import matplotlib.pyplot as plt +from matplotlib.patches import Circle +# Imports for visible camera (lucid) interfacing +from arena_api.system import system +# Imports for centroid fitting +from scipy.ndimage import gaussian_filter, sobel +from lmfit import Parameters, minimize +from astropy.modeling import models, fitting + +#---------------# +# Configuration # +#---------------# + +# Loading config +from nottcontrol.lucid import config_lucid +# Dictionary for type conversions +convert_dict = dict(config_lucid['convert_dict']) + +def convert(dic,conversion_dic): + """ + The .cfg file format does not allow for specification of parameter type. + This function converts the parameter values (retrieved from the .cfg file as strings) to the type that is specified in the input conversion_dic. + """ + # Converting to correct types + for key in dic.keys(): + if conversion_dic[key] == "Bool": + dic[key] = bool(dic[key]) + if conversion_dic[key] == "Int": + dic[key] = int(dic[key]) + if conversion_dic[key] == "Float": + dic[key] = float(dic[key]) + if conversion_dic[key] == "String": + dic[key] = str(dic[key]) + if conversion_dic[key] == "IntTuple": + dic[key] = ast.literal_eval(dic[key]) + return dic + +# Connectivity parameters +im_ip = str(config_lucid['connection']['im_ip']) +pup_ip = str(config_lucid['connection']['pup_ip']) +# Camera parameters +stream_im = convert(dict(config_lucid['stream_im']),convert_dict) +stream_pup = convert(dict(config_lucid['stream_pup']),convert_dict) +readout_im = convert(dict(config_lucid['readout_im']),convert_dict) +readout_pup = convert(dict(config_lucid['readout_pup']),convert_dict) +stream_params = {"im_cam":stream_im, "pup_cam":stream_pup} +readout_params = {"im_cam":readout_im, "pup_cam":readout_pup} +# Fitting parameters +fit_im = convert(dict(config_lucid['fit_im']),convert_dict) +fit_pup = convert(dict(config_lucid['fit_pup']),convert_dict) +fit_params = {"im_cam":fit_im, "pup_cam":fit_pup} +# Beam centroid positions and radius in reference, injecting state +ref_im = convert(dict(config_lucid['ref_im']),convert_dict) +ref_pup = convert(dict(config_lucid['ref_pup']),convert_dict) +ref_state = {"im_cam":ref_im, "pup_cam":ref_pup} + +class Utils: + ''' + Class that bundles functionalities related to the lucid visible cameras installed in the pupil and image plane. + Functionalities include: + > Creating and managing camera connections via the lucid-provided "Arena API" + > Changing the configuration (frame size, exposure time, ...) of connected cameras + > Streaming frames from the cameras by exchange of buffers + > Fitting camera frames for beam centroid positions + > Providing visual feedback + + Example use case: + + import lucid_utils + with lucid_utils.Utils() as utils: + + utils.start_streaming("im_cam") + frame,width,height = utils.get_frame("im_cam") + utils.stop_streaming("im_cam") + + ''' + + def __init__(self): + + self.devices = {} + self.streaming = {"im_cam": False, "pup_cam": False} + + def __enter__(self): + """Connect to both cameras and create associated devices for interfacing. Install default streaming configuration parameters.""" + + # Connecting to cameras and creating devices + tries = 0 + tries_max = 6 + sleep_time_secs = 10 + while tries < tries_max: # Wait no longer than 60 seconds for the devices to be connected via Ethernet + # Seek for visible cameras on the network + cam_device_infos = [] + for device_info in system.device_infos: + if device_info["model"] == "PHX064S-M": + cam_device_infos.append(device_info) + + # Both cameras have been detected on network: + if len(cam_device_infos) == 2: + # Create devices + try: + for cam_device_info in cam_device_infos: + if cam_device_info["ip"] == im_ip: + im_cam_device = system.create_device(cam_device_info)[0] + self.devices["im_cam"] = im_cam_device + print("Device used for image plane:",im_cam_device) + elif cam_device_info["ip"] == pup_ip: + pup_cam_device = system.create_device(cam_device_info)[0] + self.devices["pup_cam"] = pup_cam_device + print("Device used for pupil plane:",pup_cam_device) + + # Installing default readout and streaming configuration + for name in self.devices.keys(): + self.configure_camera_readout(name,**readout_params[name]) + self.configure_camera_stream(name,**stream_params[name]) + + # Return self to user to allow for utils access. + return self + + except Exception as e: + self._clean() + raise e + + else: + print(f'Try {tries+1} of {tries_max}: waiting for {sleep_time_secs} seconds for both visible cameras to be connected via Ethernet.') + for sec_count in range(sleep_time_secs): + time.sleep(1) + print(f'{sec_count+1 } seconds passed ', '.' * sec_count, end='\r') + tries += 1 + + raise Exception('Could not find both visible cameras on the network. Please check their Ethernet connection and try again.') + + def __exit__(self,exc_type,exc_value,traceback): + """Stop any ongoing buffer streaming, close all devices.""" + for name in self.devices.keys(): + self.stop_streaming(name) + self._clean() + # Returning False as to not suppress any error messages. + return False + + def _clean(self): + """Destroy all opened devices.""" + for device in self.devices.values(): + system.destroy_device(device) + print("All devices closed.") + self.devices = {} + + #------------------------------# + # Camera configuration control # + #------------------------------# + + def configure_camera_readout(self,name,**params): + """Set the readout configuration parameters for camera "name.""" + + if not isinstance(name,str): + name = str(name) + + if name not in self.devices.keys(): + raise Exception(f"A camera device with name {name} does not exist.") + + device = self.devices[name] + nodemap = device.nodemap + + for param, value in params.items(): + + if not isinstance(param,str): + param = str(param) + + if param in nodemap.feature_names: + nodemap[param].value = value + else: + print(f"Parameter {param} not found in the nodemap of camera {name}.") + + print(f"Camera {name} readout parameters configured.") + + def configure_camera_stream(self,name,**params): + """Set the streaming configuration parameters for camera "name".""" + + if not isinstance(name,str): + name = str(name) + + if name not in self.devices.keys(): + raise Exception(f"A camera device with name {name} does not exist.") + + device = self.devices[name] + stream_nodemap = device.tl_stream_nodemap + + for param, value in params.items(): + + if not isinstance(param,str): + param = str(param) + + if param in stream_nodemap.feature_names: + stream_nodemap[param].value = value + else: + print(f"Parameter {param} not found in the stream nodemap of camera {name}.") + + print(f"Camera {name} streaming parameters configured.") + + def get_camera_info(self,name,param): + """Return the value corresponding to the given "param" for camera "name", in either stream or readout nodemaps.""" + + if not isinstance(name,str): + name = str(name) + if not isinstance(param,str): + param = str(param) + + device = self.devices[name] + nodemap = device.nodemap + stream_nodemap = device.tl_stream_nodemap + try: + val = nodemap[param].value + except: + val = stream_nodemap[param].value + + print(f'Camera {name} has parameter {param} set to {val}.') + + + #-----------------------------------------# + # Streaming control and frame acquisition # + #-----------------------------------------# + + def start_streaming(self,name): + """Start streaming on camera "name".""" + + if not isinstance(name,str): + name = str(name) + + if self.streaming[name]: + print(f"Camera {name} is already streaming.") + else: + device = self.devices[name] + device.start_stream() + print(f"Camera {name} started streaming.") + + self.streaming[name] = True + + def stop_streaming(self,name): + """Stop streaming on camera "name".""" + + if not isinstance(name,str): + name = str(name) + + if self.streaming[name]: + device = self.devices[name] + device.stop_stream() + print(f"Camera {name} stopped streaming.") + else: + print(f"Camera {name} is not streaming.") + + self.streaming[name] = False + + def get_frame(self,name): # To be checked : Is "with device.start_stream()" superior? + """Retrieve a frame (and its width & height) from camera "name".""" + + if not isinstance(name,str): + name = str(name) + + device = self.devices[name] + nodemap = device.nodemap + + if not self.streaming[name]: + raise Exception(f'Camera {name} is not streaming, cannot read frames.') + + buffer = device.get_buffer() + frame = np.array(buffer.data, dtype=np.uint8) + frame = frame.reshape(buffer.height, buffer.width) + # Width and height + w = nodemap["Width"].value + h = nodemap["Height"].value + # Requeue to release buffer memory + device.requeue_buffer(buffer) + + return frame,w,h + + def get_fit(self,name,beam_nr,visual_feedback): + """Fit for the centroid position and radius of a single beam that is visible on camera "name". + beam_nr is either 1,2,3 or 4. Beams are numbered counting towards the bench edge; beam 1 is the innermost one, beam 4 the outermost one. + If "visual_feedback" is True, the frame and identified centroid / beam size are plotted. + """ + + if not isinstance(name,str): + name = str(name) + + if name == "im_cam": + return self.get_fit_im(beam_nr,visual_feedback,**fit_params[name]) + if name == "pup_cam": + return self.get_fit_pup(beam_nr,visual_feedback,**fit_params[name]) + else: + raise Exception(f"Camera with name {name} not recognized. Please specify either 'im_cam' or 'pup_cam' as name.") + + def get_fit_im(self,beam_nr,visual_feedback,**params): + """ + Fit for the centroid position and radius of a single beam that is visible on the image camera. + beam_nr is either 1,2,3 or 4. Beams are numbered counting towards the bench edge; beam 1 is the innermost one, beam 4 the outermost one. + If "visual_feedback" is True, the frame and identified centroid / beam size are plotted. + """ + # Unpack parameters + mybinx,mybiny,perc_grad_low,perc_grad_high,perc_int = params["mybinx"],params["mybiny"],params["perc_grad_low"],params["perc_grad_high"],params["perc_int"] + # Name of considered beam + beam_name = "beam"+str(beam_nr) + # Reference state of considered beam + ref = ref_state["im_cam"][beam_name] + # Binning function + def bin_frame(data, binning_x, binning_y): + h, w = data.shape + bins_x = h // binning_x + bins_y = w // binning_y + a = data.reshape(bins_x, binning_x, bins_y, binning_y).mean(axis=(1,3)) + return a + + #--------------# + # Taking frame # + #--------------# + myframe,w,h = self.get_frame("im_cam") + myframe_bin = bin_frame(myframe,mybinx,mybiny) + #------------------------------------------------------------# + # Detecting the beam edge by gradient and intensity criteria # + #------------------------------------------------------------# + # Calculating gradients (Sobel operator) + grad_x = sobel(myframe_bin,axis=1) + grad_y = sobel(myframe_bin,axis=0) + grad_mag = np.sqrt(grad_x**2 + grad_y**2) + # Determining the pixels with the highest gradient + thresh_grad_low = np.percentile(grad_mag,perc_grad_low) + thresh_grad_high = np.percentile(grad_mag,perc_grad_high) + mask_grad_low = grad_mag >= thresh_grad_low + mask_grad_high = grad_mag <= thresh_grad_high + mask_grad = mask_grad_low & mask_grad_high + # Determining the pixels with the highest intensity + thresh_int = np.percentile(myframe_bin,perc_int) + mask_int = myframe_bin >= thresh_int + # Determining the pixels that belong to the beam edge by convolving above two criteria + mask_edge = mask_grad & mask_int + #-----------------# + # Initial guesses # + #-----------------# + # Guess for centroid position + rows, cols = np.where(mask_edge) + centroid_x = np.mean(cols) + centroid_y = np.mean(rows) + print("Centroid guess xy: ", centroid_x*mybinx, centroid_y*mybiny) + # Guess for beam radius + radius = np.hypot(np.std(rows),np.std(cols)) + print("Radius guess: ", radius*mybinx) + # Guess for total flux in binned beam + x, y = np.meshgrid(np.arange(w/mybinx), np.arange(h/mybiny), ) + mask_circle = np.hypot(x-centroid_x,y-centroid_y) < radius + flux = np.sum(myframe_bin[mask_circle]) + print("Flux guess: ", flux*mybinx*mybiny) + # Detector noise estimate (TBD) + amin, amax = np.percentile(myframe, 99.5.), np.percentile(myframe, 99.9) + amask = (myframe <= amax) * (myframe >= amin) + noise = np.std(myframe[amask]) + print("Noise estimation: ", noise) + #---------# + # Fitting # + #---------# + param = Parameters() + param.add("x_loc", centroid_x, min=0, max=w) + param.add("y_loc", centroid_y, min=0, max=h) + param.add("radius", radius, min=0.01*radius, max=5*radius, vary=True) + param.add("flux", flux, min = 0.01*flux, max = 5*flux, vary=True) + + refearray = np.zeros_like(myframe_bin) + + def disc(aparam): + model = np.zeros(refearray.shape) + xx, yy = np.meshgrid(np.arange(model.shape[1]), np.arange(model.shape[0]), ) + rad = np.hypot(xx-aparam["x_loc"], yy-aparam["y_loc"]) + mod = np.exp(-(rad**2/aparam["radius"]**2)**8) + mod = aparam["flux"] / mod.sum() * mod + return mod + + def residual(aparam, data, error): + model = disc(aparam) + myresid = (data - model)**2/error**2 + return myresid.flatten() + + res = minimize(residual, param, args=[myframe_bin, noise]) + + #-------------# + # Fit results # + #-------------# + # Fitted centroid & radius + centroid_x_fit = res.params["x_loc"]*mybinx + centroid_y_fit = res.params["y_loc"]*mybiny + # Only considering circular beams + radius_fit = res.params["radius"]*mybinx + + #-----------------# + # Visual feedback # + #-----------------# + if visual_feedback: + + fig = plt.figure(figsize=(15,10)) + ax = fig.add_subplot(111) + img = ax.imshow(myframe) + + fit_beam = Circle((centroid_x_fit, centroid_y_fit), radius_fit, + color='blue', fill=False, linewidth=2,ls=":",label="Current") + ref_beam = Circle((ref[0], ref[1]), ref[2], + color='red', fill=False, linewidth=2,label="Reference") + + ax.add_patch(fit_beam) + ax.add_patch(ref_beam) + + ax.scatter(centroid_x_fit,centroid_y_fit,color="blue",s=8) + ax.scatter(ref[0],ref[1],color="red",s=8) + + # Set tick labels + Nticks = 10 + xticks = np.linspace(0,w-1,Nticks) + yticks = np.linspace(0,h-1,Nticks) + labelsx = np.round(np.linspace(0,w-1,Nticks)*2.4,0) + labelsy = np.round(np.linspace(0,h-1,Nticks)*2.4,0) + ax.axes.get_xaxis().set_ticks(xticks) + ax.axes.get_yaxis().set_ticks(yticks) + ax.set_xticklabels(labelsx) + ax.set_yticklabels(labelsy) + + # Add axis labels + ax.set_xlabel('Relative Position (um)', fontsize=14) + ax.set_ylabel('Relative Position (um)', fontsize=14) + + clb = plt.colorbar(img) + clb.ax.set_title('Counts',fontsize=12) + ax.legend() + ax.grid(color="white",linestyle="--",linewidth=0.5) + + fig.suptitle("Image camera view", fontsize=24) + fig.canvas.draw() + fig.canvas.flush_events() + fig.show() + + return centroid_x_fit,centroid_y_fit,radius_fit + + def get_fit_pup(self,beam_nr,visual_feedback,**params): + """ + Fit for the centroid position and radius of a single beam that is visible on the pupil camera. + beam_nr is either 1,2,3 or 4. Beams are numbered counting towards the bench edge; beam 1 is the innermost one, beam 4 the outermost one. + If "visual_feedback" is True, the frame and identified centroid / beam size are plotted. + """ + # Unpack parameters + sigma,mybinx,mybiny,perc_grad_low,perc_grad_high,perc_int = params["sigma"],params["mybinx"],params["mybiny"],params["perc_grad_low"],params["perc_grad_high"],params["perc_int"] + # Name of considered beam + beam_name = "beam"+str(beam_nr) + # Reference state of considered beam + ref = ref_state["pup_cam"][beam_name] + # Binning function + def bin_frame(data, binning_x, binning_y): + h, w = data.shape + bins_x = h // binning_x + bins_y = w // binning_y + a = data.reshape(bins_x, binning_x, bins_y, binning_y).mean(axis=(1,3)) + return a + + #---------------------------------------# + # Taking frame, smoothening and binning # + #---------------------------------------# + myframe,w,h = self.get_frame("pup_cam") + myframe_smooth = gaussian_filter(myframe,sigma) + myframe_smooth_bin = bin_frame(myframe_smooth,mybinx,mybiny) + myframe_bin = bin_frame(myframe,mybinx,mybiny) + #------------------------------------------------------------# + # Detecting the beam edge by gradient and intensity criteria # + #------------------------------------------------------------# + # Calculating gradients (Sobel operator) + grad_x = sobel(myframe_smooth_bin,axis=1) + grad_y = sobel(myframe_smooth_bin,axis=0) + grad_mag = np.sqrt(grad_x**2 + grad_y**2) + # Determining the pixels with the highest gradient + thresh_grad_low = np.percentile(grad_mag,perc_grad_low) + thresh_grad_high = np.percentile(grad_mag,perc_grad_high) + mask_grad_low = grad_mag >= thresh_grad_low + mask_grad_high = grad_mag <= thresh_grad_high + mask_grad = mask_grad_low & mask_grad_high + # Determining the pixels with the highest intensity + thresh_int = np.percentile(myframe_smooth_bin,perc_int) + mask_int = myframe_smooth_bin >= thresh_int + # Determining the pixels that belong to the beam edge by convolving above two criteria + mask_edge = mask_grad & mask_int + #-----------------# + # Initial guesses # + #-----------------# + # Guess for centroid position + rows, cols = np.where(mask_edge) + centroid_x = np.mean(cols) + centroid_y = np.mean(rows) + print("Centroid guess xy: ", centroid_x*mybinx, centroid_y*mybiny) + # Guess for beam radius + radius = np.hypot(np.std(rows),np.std(cols)) + print("Radius guess: ", radius*mybinx) + # Guess for total flux in binned beam + x, y = np.meshgrid(np.arange(w/mybinx), np.arange(h/mybiny), ) + mask_circle = np.hypot(x-centroid_x,y-centroid_y) < radius + flux = np.sum(myframe_bin[mask_circle]) + print("Flux guess: ", flux*mybinx*mybiny) + # Detector noise estimate (TBD) + amin, amax = np.percentile(myframe, 55.), np.percentile(myframe, 65.) + amask = (myframe <= amax) * (myframe >= amin) + noise = np.std(myframe[amask]) + print("Noise estimation: ", noise) + #---------# + # Fitting # + #---------# + param = Parameters() + param.add("x_loc", centroid_x, min=0, max=w) + param.add("y_loc", centroid_y, min=0, max=h) + param.add("radius", radius, min=0.5*radius, max=1.5*radius, vary=True) + param.add("flux", flux, min = 0.5*flux, max = 1.5*flux, vary=True) + + refearray = np.zeros_like(myframe_bin) + + def disc(aparam): + model = np.zeros(refearray.shape) + xx, yy = np.meshgrid(np.arange(model.shape[1]), np.arange(model.shape[0]), ) + rad = np.hypot(xx-aparam["x_loc"], yy-aparam["y_loc"]) + mod = np.exp(-(rad**2/aparam["radius"]**2)**8) + mod = aparam["flux"] / mod.sum() * mod + return mod + + def residual(aparam, data, error): + model = disc(aparam) + myresid = (data - model)**2/error**2 + return myresid.flatten() + + res = minimize(residual, param, args=[myframe_bin, noise]) + + #-------------# + # Fit results # + #-------------# + # Fitted centroid & radius + centroid_x_fit = res.params["x_loc"]*mybinx + centroid_y_fit = res.params["y_loc"]*mybiny + # Only considering circular beams + radius_fit = res.params["radius"]*mybinx + + #-----------------# + # Visual feedback # + #-----------------# + if visual_feedback: + + fig = plt.figure(figsize=(15,10)) + ax = fig.add_subplot(111) + img = ax.imshow(myframe) + + fit_beam = Circle((centroid_x_fit, centroid_y_fit), radius_fit, + color='blue', fill=False, linewidth=2,ls=":",label="Current") + ref_beam = Circle((ref[0], ref[1]), ref[2], + color='red', fill=False, linewidth=2,label="Reference") + + ax.add_patch(fit_beam) + ax.add_patch(ref_beam) + + ax.scatter(centroid_x_fit,centroid_y_fit,color="blue") + ax.scatter(ref[0],ref[1],color="red") + + # Set tick labels + Nticks = 10 + xticks = np.linspace(0,w-1,Nticks) + yticks = np.linspace(0,h-1,Nticks) + labelsx = np.round(np.linspace(0,w-1,Nticks)*2.4,0) + labelsy = np.round(np.linspace(0,h-1,Nticks)*2.4,0) + ax.axes.get_xaxis().set_ticks(xticks) + ax.axes.get_yaxis().set_ticks(yticks) + ax.set_xticklabels(labelsx) + ax.set_yticklabels(labelsy) + + # Add axis labels + ax.set_xlabel('Relative Position (um)', fontsize=14) + ax.set_ylabel('Relative Position (um)', fontsize=14) + + clb = plt.colorbar(img) + clb.ax.set_title('Counts',fontsize=12) + ax.legend() + ax.grid(color="white",linestyle="--",linewidth=0.5) + + fig.suptitle("Pupil camera view", fontsize=24) + fig.canvas.draw() + fig.canvas.flush_events() + fig.show() + + return centroid_x_fit,centroid_y_fit,radius_fit + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/nottcontrol/script/cfg/config.cfg b/nottcontrol/script/cfg/config.cfg index fb0a641..c75b304 100644 --- a/nottcontrol/script/cfg/config.cfg +++ b/nottcontrol/script/cfg/config.cfg @@ -12,15 +12,17 @@ t_write = 20 # ------- # # bool_UT is True when NOTT is connected to the UTs. bool_UT = True +# bool_offset is True when actuator motions should incorporate empirical offsets, retrieved from the simulated grids. +bool_offset = False # --------# # Spirals | # ------- # # If (photo - noise_mean) / noise_std > fac_loc, the spiral is not started (localization spiral). fac_loc = 200 # How much should the SNR improvement be for injection to be claimed (localization spiraling)? -SNR_inj = 3 +SNR_inj = 10 # How much samples of dt_sample should have a SNR improvement > SNR_inj for injection to be called (localization spiral) -Ncrit = 3 +Ncrit = 2 # Boundary stop condition for on-sky spiraling (localization spiral). Nsteps_skyb = 10 # One measurement (noise/photometric) should consist of N exposures (localization & optimization spiraling). @@ -29,11 +31,11 @@ Nexp = 1 # Actuators | # --------- # # An actuator motion smaller than disp_double (mm) is carried out by a double-step motion. -disp_double = 0.002 +disp_double = 0.0005 # The overshoot stepsize of the double-step motion (mm). -step_double = 0.003 +step_double = 0.001 # The speed of the double-step motion (mm/s). -speed_double = 0.0018 +speed_double = 0.002 [tip_tilt_control] #units are in mm diff --git a/nottcontrol/script/lib/nott_TTM_alignment.py b/nottcontrol/script/lib/nott_TTM_alignment.py index d1e8ec5..8b80c83 100644 --- a/nottcontrol/script/lib/nott_TTM_alignment.py +++ b/nottcontrol/script/lib/nott_TTM_alignment.py @@ -11,24 +11,37 @@ # TO DO # #-------# -# A) Complete act_pos_align (actuator positions in state of alignment) for each config (once more actuators are installed). -# --> TO BE COMPLETED, one all actuators are installed and spiraling algorithms are fully functional and performant. +# ! A) Complete act_pos_align, the actuator positions in a state of alignment, for each configuration. +# --> Motorize all four beams +# --> First positions after manual optimization of injection +# --> Second positions once flexure mounts are in & localization/optimization algorithms are (hopefully) fully functional and performant. -# B) Generalize sky-ttm angle relations to account for different AT aperture size. -# --> DONE, no change as Heimdallr brings all beams to 12 mm regardless of AT pupil diameter. +# ! B) Add tool to align the beams on the visible cameras +# --> Implement Arena_API functionalities: connecting to cameras, streaming frames and fitting centroids +# --> Implement visual feedback: beam position and size in state of optimal injection vs. current beam position and size +# --> Implement motion control: generalize TTM control functions to all four beams +# --> Implement optimization algorithm -# C) Add function that calculates the rotation angle between on-sky cartesian and NOTT image/pupil plane cartesian frames. -# --> TO BE COMPLETED, got VLTI maps from M.A.M. and still need Asgard 3D models to have a grasp of the complete sequence of passed mirrors from post-switchyard to NOTT image plane. +# ! C) Add tool to calculate dispersed null depths +# --> Change ROIs accordingly +# --> Output data frames -# D) Rerun actuator performance grids -# --> The unability to recover optimal injection configurations is likely due to sub-par actuator (TTM2) resolution. -# --> Rerun actuator grids, close to aligned positions, extending to smaller displacements (down to 0.5 um) +# D) Add a function that calculates the rotation angle between on-sky cartesian and NOTT image/pupil plane cartesian frames. +# --> Got VLTI maps from M.A.M., still need Asgard 3D models to have a grasp of the complete sequence of passed mirrors from post-switchyard to NOTT image plane. -# E) General efficiency of code & documentation where relevant (i.e. non-testing functions) +# E) Revise the actuator-angle relations once flexure mounts are in. -# H) Write and run framework performance -# I) Optimize efficiency based on previous two (decide on spiral steps&speeds) -# J) Write and run algorithm performance +# F) Check symbolic & numeric framework setup. +# --> What changes were made to the bench in the past year? How to change the framework to account for this? +# --> Translation distances changed? Effect of flexure mounts on distances? Need re-simulation in Zemax? +# --> Specifications of any optics changed? OAP2: Horizontal diameter increased but shape and focal length should be unchanged. + +# G) Improve general code efficiency. +# H) Complete documentation. + +# I) Re-run framework performance with the flexure mounts. Compare to thesis results. +# J) Optimize the efficiency of localization & optimization algorithms - i.e., decide on spiral steps and speeds. +# K) Write and run algorithm performance. #---------# # Imports # @@ -44,6 +57,13 @@ import time import logging +# Arena API (Visible cameras) +import arena_api +from arena_api.system import system +# Scipy/Astropy (visible camera beam fitting) +from astropy.modeling import models, fitting +import scipy + # OPCUA / redis import redis from nottcontrol.opcua import OPCUAConnection @@ -61,6 +81,7 @@ from nottcontrol import config as nott_config from nottcontrol.script import config_alignment from nottcontrol.script import data_files + #-----------------------------# # Parameters from config file # #-----------------------------# @@ -69,6 +90,7 @@ # Global parameters t_write = int(config_alignment['redis']['t_write']) bool_UT = (config_alignment['injection']['bool_UT'] == "True") +bool_offset = (config_alignment['injection']['bool_offset'] == "True") fac_loc = int(config_alignment['injection']['fac_loc']) SNR_inj = int(config_alignment['injection']['SNR_inj']) Ncrit = int(config_alignment['injection']['Ncrit']) @@ -77,11 +99,8 @@ disp_double = float(config_alignment['injection']['disp_double']) step_double = float(config_alignment['injection']['step_double']) speed_double = float(config_alignment['injection']['speed_double']) -print("Read configuration [t_write,bool_UT,fac_loc,SNR_inj,Ncrit,Nsteps_skyb,Nexp,disp_double,step_double,speed_double] : ",[t_write,bool_UT,fac_loc,SNR_inj,Ncrit,Nsteps_skyb,Nexp,disp_double,step_double,speed_double]) +print("Read configuration [t_write,bool_UT,bool_offset,fac_loc,SNR_inj,Ncrit,Nsteps_skyb,Nexp,disp_double,step_double,speed_double] : ",[t_write,bool_UT,bool_offset,fac_loc,SNR_inj,Ncrit,Nsteps_skyb,Nexp,disp_double,step_double,speed_double]) -#-----------------# -# Auxiliary Grids # -#-----------------# class alignment: def __init__(self): @@ -113,7 +132,8 @@ def __init__(self): (4) Define vector N globally, comprising the four symbolic equations. (5) Translate the obtained four equations into one single matrix equation b=Ma, with b the shifts and a the angular offsets. (6) Define matrix M and vector of symbolic shifts b globally. - (7) Prepare all actuators for use. + (7) Define the actuator positions, corresponding to a state of optimized injection, globally. + (8) Prepare all actuators for use. Defines ------- @@ -221,6 +241,9 @@ def ThickLens(p1, p2, dv, nv, nprimv, n2v, M): self.b = bloc.copy() self.N = eqns_.copy() + # Defining actuator positions corresponding to an aligned & injecting state. + self.act_pos_align = np.array([[4.172522,4.7416215,4.7712065,3.676958],[3.0494205,3.4115165,4.4507735,3.7884355],[4.3811115,4.759281,4.7679125,3.1275005],[4.8730765,4.678456,4.819549,3.992511]],dtype=np.float64) + ''' # Opening all shutters all_shutters_open(4) @@ -259,6 +282,8 @@ def ThickLens(p1, p2, dv, nv, nprimv, n2v, M): actuators[i].enable() time.sleep(0.050) + self.align() + # Closing OPCUA connection opcua_conn.disconnect() ''' @@ -762,7 +787,7 @@ def _get_actuator_pos(self,config): return [pos,timestamp] - def _actuator_position_to_ttm_angle(self,pos,config): + def _actuator_position_to_ttm_angle(self,pos,config): # TBC """ Description ----------- @@ -798,17 +823,16 @@ def _actuator_position_to_ttm_angle(self,pos,config): ttm_angles_optim = np.array([[0.10,32,-0.11,-41],[4.7,-98,4.9,30],[-2.9,134,-3.1,-107],[3.7,115,3.3,-141]],dtype=np.float64)*10**(-6) ttm_config = ttm_angles_optim[config] # Actuator positions in a state of alignment (mm) - act_pos_align = np.array([[0,0,0,0],[5.219526,5.4300675,3.4311585,3.94609],[0,0,0,0],[0,0,0,0]],dtype=np.float64) #TBC - act_config = act_pos_align[config] + act_config = self.act_pos_align[config] # TTM1X xsum_align = act_config[0]+act_config[1] xsum_input = pos[0]+pos[1] - TTM1X = ttm_config[0] - np.arcsin((xsum_align-xsum_input)/(2*d1_ca)) + TTM1X = ttm_config[0] - np.arcsin((xsum_align-xsum_input)/(2*d1_ca)) # TTM1Y xdiff_align = act_config[1]-act_config[0] xdiff_input = pos[1]-pos[0] - TTM1Y = +ttm_config[1] + np.arcsin((xdiff_input-xdiff_align)/(2*d1_ca)) + TTM1Y = +ttm_config[1] + np.arcsin((xdiff_input-xdiff_align)/(2*d1_ca)) # TTM2X TTM2X = ttm_config[2] - np.arcsin((pos[3]-act_config[3])/d2_ca) # TTM2Y @@ -818,7 +842,7 @@ def _actuator_position_to_ttm_angle(self,pos,config): return ttm_angles - def _ttm_angle_to_actuator_position(self,ttm_angles,config): + def _ttm_angle_to_actuator_position(self,ttm_angles,config): # TBC """ Description ----------- @@ -853,16 +877,15 @@ def _ttm_angle_to_actuator_position(self,ttm_angles,config): ttm_angles_optim = np.array([[0.10,32,-0.11,-41],[4.7,-98,4.9,30],[-2.9,134,-3.1,-107],[3.7,115,3.3,-141]],dtype=np.float64)*10**(-6) ttm_config = ttm_angles_optim[config] # Actuator positions in a state of alignment (mm) - act_pos_align = np.array([[0,0,0,0],[5.219526,5.4300675,3.4311585,3.94609],[0,0,0,0],[0,0,0,0]],dtype=np.float64) #TBC - act_config = act_pos_align[config] + act_config = self.act_pos_align[config] # TTM1 xsum_align = (act_config[0]+act_config[1])/2 xdiff_align = (act_config[1]-act_config[0])/2 xsum = xsum_align - d1_ca*np.sin(ttm_config[0]-ttm_angles[0]) - xdiff = xdiff_align - d1_ca*np.sin(ttm_config[1]-ttm_angles[1]) - x1 = xsum-xdiff - x2 = xsum+xdiff + xdiff = xdiff_align - d1_ca*np.sin(ttm_config[1]-ttm_angles[1]) + x1 = xsum-xdiff #TBD + x2 = xsum+xdiff #TBD # TTM2 x3 = act_config[2] - d2_ca*np.sin(ttm_config[3]-ttm_angles[3]) @@ -961,8 +984,13 @@ def _actoffset(self,act_speed,act_disp): """ - # Snap accuracies - accur_snap = np.array(self._snap_accuracy_grid(act_speed,act_disp),dtype=np.float64) + # Only return an empirical offset if the global variable is set to incorporate offsets. + if bool_offset: + # Snap accuracies + accur_snap = np.array(self._snap_accuracy_grid(act_speed,act_disp),dtype=np.float64) + else: + # No empirical offsets to be incorporated + accur_snap = np.zeros(4,dtype=np.float64) return accur_snap @@ -1040,8 +1068,7 @@ def _get_noise(self,N,t,dt): ''' Description ----------- - Function returns the average noise value (=ROI9), derived from "N" exposures of duration "t" each. - Shutters are not closed during this procedure. + Function returns the average background and noise values (=ROI9), derived as the average of "N" exposures of duration "dt" each. Parameters ---------- @@ -1062,7 +1089,9 @@ def _get_noise(self,N,t,dt): ''' # Background measurements exps = [] - # Gathering five background exposures + # Noise measurements + noises = [] + # Gathering five exposures for j in range(0, N): if (j!=0): time.sleep(dt) @@ -1071,10 +1100,11 @@ def _get_noise(self,N,t,dt): exp_av = get_field("roi9_avg",t_start,t_stop,True) exp_full = get_field("roi9_avg",t_start,t_stop,False) exps.append(exp_av[1]) - # Taking the std - noise = exp_full.std(0)[1] + noises.append(exp_full.std(0)[1]) + # Taking the mean mean = np.mean(exps) + noise = np.mean(noises) return mean,noise @@ -1082,8 +1112,8 @@ def _get_photo(self,N,t,dt,config): ''' Description ----------- - Function returns the average background value (=ROI9), derived from "N" exposures of duration "t" each. - Shutters are closed during this procedure. + Function returns the photometric output value, for a certain beam channel (config), derived as the average of "N" exposures of duration "dt" each. + Parameters ---------- @@ -1094,12 +1124,13 @@ def _get_photo(self,N,t,dt,config): N : single integer Amount of exposures. config : single integer - Configuration parameter. - + Configuration number (= VLTI input beam) (0,1,2,3). + Nr. 0 corresponds to the innermost beam, Nr. 3 to the outermost one (see figure 3 in Garreau et al. 2024 for reference). + Returns ------- - back : single float - Background average value + photo : single float + Photometric output average value ''' # REDIS field names of photometric outputs' ROIs @@ -1252,9 +1283,9 @@ def _move_abs_ttm_act(self,init_pos,disp,speeds,pos_offset,config,sample=False,d ----------- The function moves all actuators (1,2,3,4) in a configuration "config" (=beam), initially at positions "init_pos", by given displacements "disp", at speeds "speeds", taking into account offsets "pos_offset". - Actuator positions are sampled during the motion, by timestep "dt_sample", as well as the timestamps at which they were read out. + Actuator positions are sampled during the motion, by timestep "dt_sample". The timestamps at which they were read out are also registered. If "sample" is True, real-time photometric ROI values are also sampled, the timestamps of which lack behind by t_delay. - In the context of spiraling, actuator errors "err_prev" of the previous step are taken into account. + In the context of spiraling, actuator errors "err_prev" of the previous step can be taken into account. Actuator naming convention within a configuration : 1 : TTM1 actuator that is closest to the bench edge 2 : TTM1 actuator that is furthest from the bench edge @@ -1296,7 +1327,7 @@ def _move_abs_ttm_act(self,init_pos,disp,speeds,pos_offset,config,sample=False,d roi : list of floats List of ROI photometric output values, for the specified config, sampled throughout the actuator motion (if sample == true). err : List of floats (mm) - Errors made upon actuator motions. Positive values indicate overshoot. + Actuator errors made upon actuator motions. Positive values indicate overshoot. """ @@ -1517,7 +1548,7 @@ def individual_step(self,bool_slicer,sky,steps,speeds,config,sample,dt_sample=0. t_delay : single float (ms) Amount of time that the internal Infratec clock lacks behind the Windows lab pc clock. err_prev : numpy array of float values (mm) - Errors made upon a previous spiral step, to be carried over to the next one for purpose of accuracy. + Actuator errors made upon a previous spiral step, to be carried over to the next one for purpose of accuracy. act_disp_prev : numpy array of float values (mm) Actuator motions made in a previous step. In the context of spiraling, it is often useful (efficiency-wise) to not explicitly recompute actuator motions that are similar to the previous step. @@ -1619,11 +1650,12 @@ def localization_spiral(self,sky,step,speed,config,dt_sample): sky : single boolean If True : spiral by given dimension on-sky If False : spiral by given dimension in image plane - step : single float value (mm) + step : single float value The dimension by which the spiral should make its steps. If sky == True : on-sky angular step (radian) Note : It is recommended to take the apparent on-sky angular radius of the source as step. - If sky == False : dummy parameter, 20 micron (waveguide dimension) is taken by default. + If sky == False : image plane step (mm) + Note : Dummy parameter, 20 micron (waveguide dimension) is taken by default. speed : single float value (mm/s) Actuator speed by which a spiral step should occur Note: Parameter to be removed once an optimal speed is recovered (which balances efficiency and accuracy) @@ -1767,7 +1799,7 @@ def _update_plot(indplotpar,val): # Carrying out step(s) for i in range(0,Nsteps): # Step - speeds = np.array([speed,speed,speed/10,speed/10], dtype=np.float64) # TBD + speeds = np.array([speed,speed,speed,speed], dtype=np.float64) # TBD _,_,acts,act_times,rois,err,act_disp = self.individual_step(True,sky,moves[move],speeds,config,True,dt_sample,t_delay,err_prev,act_disp_prev) # Saving errors for next step err_prev = np.array(err,dtype=np.float64) @@ -1809,7 +1841,7 @@ def _update_plot(indplotpar,val): # Necessary displacements act_disp = ACT_final-act_curr #print("Necessary displacements to bring the bench to injecting state : ", act_disp, " mm.") - speeds = np.array([0.0001,0.0001,0.0001,0.0001],dtype=np.float64) #TBD + speeds = np.array([0.0011,0.0011,0.0011,0.0011],dtype=np.float64) #TBD pos_offset = self._actoffset(speeds,act_disp) print("Bringing to injecting actuator position at ", act_curr+act_disp, " mm.") # Push bench to configuration of optimal found injection. @@ -1840,7 +1872,8 @@ def _update_plot(indplotpar,val): # Implementing boundary stop condition for on-sky spiralling if (sky and Nsteps >= Nsteps_skyb): raise TimeoutError("The on-sky spiral scanning algorithm timed out. Consider repointing closer to source.") - + + plt.close(fig) return def optimization_spiral(self,sky,step,speed,config,dt_sample): @@ -1890,10 +1923,7 @@ def optimization_spiral(self,sky,step,speed,config,dt_sample): #if (speed > 1.22*10**(-3) or speed <= 0): # raise ValueError("Given actuator speed is beyond the accepted range (0,1.22] um/s") - if sky : - d = step - else: - d = 5*10**(-3) #(mm) + d = step # Actuator configs and times ACT = [] @@ -1962,18 +1992,11 @@ def _update_plot(indplotpar,val): return indplotpar - # STOP - # x - # | - # x---x---x---x x - # | | | - # x x---x x x - ########## | | | | | - # Spiral # x x x x x - ########## | | | | - # x x---x---x x - # | | - # x---x---x---x---x + # x---x x + ########## | | | + # Spiral # x x x + ########## | | + # x---x---x # Possible moves if sky: @@ -2008,7 +2031,7 @@ def _update_plot(indplotpar,val): # Carrying out step(s) for i in range(0,Nsteps): # Step - speeds = np.array([speed,speed,speed/10,speed/10], dtype=np.float64) # TBD + speeds = np.array([speed,speed,speed,speed], dtype=np.float64) # TBD _,_,acts,act_times,rois,err,act_disp = self.individual_step(True,sky,moves[move],speeds,config,True,dt_sample,t_delay,err_prev,act_disp_prev) # Saving error for next step. err_prev = np.array(err,dtype=np.float64) @@ -2051,18 +2074,415 @@ def _update_plot(indplotpar,val): # Necessary displacements act_disp = ACT_final-act_curr #speeds = np.array(np.abs(act_disp/100),dtype=np.float64) - speeds = np.array([0.0001,0.0001,0.0001,0.0001],dtype=np.float64) #TBD + speeds = np.array([0.0011,0.0011,0.0011,0.0011],dtype=np.float64) #TBD pos_offset = self._actoffset(speeds,act_disp) print("Bringing to optimized actuator position : ", np.max(SNR_samples), "SNR improvement at ", act_curr+act_disp, " mm.") # Push bench to configuration of optimal found injection. _,_,_,_,_,_ = self._move_abs_ttm_act(act_curr,act_disp,speeds,pos_offset,config,False,0.010,self._get_delay(100,True)-t_write) + + plt.close(fig) + return + + def optimization_spiral_gradient(self,sky,step,speed,config,dt_sample,SNR_opt=5): + """ + Description + ----------- + The function traces a square spiral in the user-specified plane (either image or on-sky plane). + Throughout each step along the spiral, corresponding actuator configurations and camera averages are retrieved via opcua and stored. + If a single sample along a step shows an improvement in SNR, compared to the pre-spiral photometric output, that is larger than SNR_opt, the spiral is stopped. + The actuator configuration corresponding to this sample is then pushed to the bench. + + Parameters + ---------- + sky : single boolean + If True : spiral on-sky. + If False : spiral in image plane. + step : single float value (mm) + The dimension by which the spiral should make its steps. + If sky == True : on-sky angular step (radian) + If sky == False : dummy parameter, 5 micron is taken by default. + speed : single float value (mm/s) + Actuator speed by which a spiral step should occur. + Note: Parameter to be removed once optimal speed is obtained. + config : single integer + Configuration number (= VLTI input beam) (0,1,2,3). + Nr. 0 corresponds to the innermost beam, Nr. 3 to the outermost one (see figure 3 in Garreau et al. 2024 for reference). + dt_sample : single float (s) + Amount of time a sample should span. + + Remarks + ------- + The function is expected to be called after the "localization_spiral" function has been called. It is thus expected that a first, broad-scope alignment has already been performed. + If sky == True : Before calling this function, it is expected that the TTMs have already been aligned such that the on-sky source is imaged onto the chip input. + If sky == False : Before calling this function, it is expected that the TTMs have already been aligned such that the internal VLTI beam is injected into the chip input. + + Returns + ------- + None. + + """ + print("----------------------------------") + print("Spiraling for optimization...") + print("----------------------------------") + if (config < 0 or config > 3): + raise ValueError("Please enter a valid configuration number (0,1,2,3)") + + #if (speed > 1.22*10**(-3) or speed <= 0): + # raise ValueError("Given actuator speed is beyond the accepted range (0,1.22] um/s") + + d = step + + # Actuator configs and times + ACT = [] + ACT_times = [] + + # Delay time (total delay minus writing time) + t_delay = self._get_delay(100,True)-t_write + # Exposure time for first exposure (ms) + dt_exp_opt = 200 + # Start time for initial exposure + t_start = self._get_time(1000*time.time(),t_delay) + # Sleep + time.sleep((dt_exp_opt+t_write)*10**(-3)) + # Initial position noise measurement + _,noise = self._get_noise(Nexp,t_start,dt_exp_opt) + # Initial position photometric output measurement + photo_init = self._get_photo(Nexp,t_start,dt_exp_opt,config) + # Storing initial actuator configuration and timestamp. + act_init = self._get_actuator_pos(config)[0] + ACT.append(act_init) + ACT_times.append(self._get_time(1000*time.time(),t_delay)) + + # Container for average SNR values (for spiraling plot) + dim = 7 + SNR_max = -10*np.ones((dim,dim)) + # Appending initial exposure - defined to be zero - at initial indices k,l (indplot = [k,l]) + indplot = np.array([dim//2,dim//2]) + SNR_max[indplot[0]][indplot[1]] = 0 + + # Initializing Plot + fig = plt.figure(figsize=(10,10)) + ax = fig.add_subplot(111) + img = ax.imshow(SNR_max) + # Set limits + img.set_clim(vmin=-50, vmax=50) + # Set tick labels + xticks = np.linspace(0,dim-1,dim) + yticks = np.linspace(0,dim-1,dim) + labels = np.arange(-1000*d*(dim//2),1000*d*(dim//2+1),1000*d) + ax.axes.get_xaxis().set_ticks(xticks) + ax.axes.get_yaxis().set_ticks(yticks) + ax.set_xticklabels(labels) + ax.set_yticklabels(-labels) + # Plotting initial SNR improvement value (=0) as label + ax.text(indplot[1],indplot[0],np.round(SNR_max[indplot[0]][indplot[1]],2),ha='center',va='center',fontsize=14) + # Title + fig.suptitle("Optimization spiral", fontsize=24) + # Showing + fig.canvas.draw() + fig.canvas.flush_events() + + def _update_plot(indplotpar,val): + + # Changing the indices according to recent spiral step + indplot_change = np.array([[-1,0],[0,-1],[1,0],[0,1]]) + indplotpar += indplot_change[move] + # Storing average SNR improvement in container + SNR_max[indplotpar[0]][indplotpar[1]] = val + # Updating spiraling plot + img.set_data(SNR_max) + # Updating plot + ax.text(indplotpar[1],indplotpar[0],np.round(SNR_max[indplotpar[0]][indplotpar[1]],2),ha='center',va='center',fontsize=14) + fig.canvas.draw() + fig.canvas.flush_events() + plt.pause(0.0001) + return indplotpar + + # x---x x + ########## | | | + # Spiral # x x x + ########## | | + # x---x---x + + # Possible moves + if sky: + up=np.array([0,d,0,0],dtype=np.float64) + left=np.array([-d,0,0,0],dtype=np.float64) + down=np.array([0,-d,0,0],dtype=np.float64) + right=np.array([d,0,0,0],dtype=np.float64) + moves = np.array([up,left,down,right]) + else: + up=np.array([0,0,0,d],dtype=np.float64) + left=np.array([0,0,-d,0],dtype=np.float64) + down=np.array([0,0,0,-d],dtype=np.float64) + right=np.array([0,0,d,0],dtype=np.float64) + moves = np.array([up,left,down,right]) + + # Stop criterion + stop = False + # What move is next (index in moves array)? + move = 0 + # How many times has the move type switched? + Nswitch = 0 + # How much consequent moves are being made in a direction at the moment? + Nsteps = 1 + + while not stop: + + # Initializing err_prev + err_prev = np.zeros(4,dtype=np.float64) + # Initializing act_disp_prev + act_disp_prev = np.zeros(4,dtype=np.float64) + + # Carrying out step(s) + for i in range(0,Nsteps): + # Step + speeds = np.array([speed,speed,speed,speed], dtype=np.float64) # TBD + _,_,acts,act_times,rois,err,act_disp = self.individual_step(True,sky,moves[move],speeds,config,True,dt_sample,t_delay,err_prev,act_disp_prev) + # Saving error for next step. + err_prev = np.array(err,dtype=np.float64) + # Saving actuator steps for next step. + act_disp_prev = act_disp + + # Container for sampled ROI exposures + exps = [] + + # Storing camera values and actuator configurations + # 1) Saving photometric readout values (SNR) sampled throughout the step + for j in range(0, len(rois)): + exps.append((rois[j]-photo_init)/noise) + # 2) Saving actuator configurations and timestamps sampled throughout step. + for j in range(0, len(acts)): + ACT.append(acts[j]) + ACT_times.append(act_times[j]) + + # Check whether a sample along the step exceeds the pre-imposed SNR improvement threshold. + exps_arr = np.array(exps,dtype=np.float64) + if ((exps_arr > SNR_opt).any()): + print("A sample along the step is above the SNR improvement threshold.") + print("") + + # Update plot + indplot = _update_plot(indplot,np.max(exps)) + + # Safety sleep + time.sleep(10*t_write*10**(-3)) # TBD + # Finding optimal injection index + i_max = np.argmax(exps_arr) + #print("Index, SNR and actuator configuration of found injecting state :", i_max, SNR_samples[i_max], ACT[i_max]) + # Corresponding actuator positions + ACT_final = ACT[i_max] + # Current configuration + act_curr = self._get_actuator_pos(config)[0] + # Necessary displacements + act_disp = ACT_final-act_curr + #print("Necessary displacements to bring the bench to injecting state : ", act_disp, " mm.") + speeds = np.array([0.0011,0.0011,0.0011,0.0011],dtype=np.float64) #TBD + pos_offset = self._actoffset(speeds,act_disp) + print("Bringing to actuator position above SNR improvement threshold : ", act_curr+act_disp, " mm.") + # Push bench to configuration of optimal found injection. + _,_,_,_,_,_ = self._move_abs_ttm_act(act_curr,act_disp,speeds,pos_offset,config,False,0.010,self._get_delay(100,True)-t_write) + return + + # Updating plot + indplot = _update_plot(indplot, np.max((rois-photo_init)/noise)) + + # Reset actuator configs and times + ACT = [] + ACT_times = [] + + # Setting up next move + if move < 3: + move += 1 + else: + move = 0 + + # Stop condition + if (Nsteps == 3): + stop = True + + # Counting the amount of performed move type switches + Nswitch += 1 + + if (Nswitch % 2 == 0): + Nsteps += 1 + + # If no sample above the SNR improvement threshold is found, return to initial state. + # Current configuration + act_curr = self._get_actuator_pos(config)[0] + # Necessary displacements + act_disp = act_init-act_curr + #print("Necessary displacements to bring the bench to injecting state : ", act_disp, " mm.") + speeds = np.array([0.0011,0.0011,0.0011,0.0011],dtype=np.float64) #TBD + pos_offset = self._actoffset(speeds,act_disp) + print("No sample above SNR improvement threshold found, returning to pre-spiral state : ", act_curr+act_disp, " mm.") + # Push bench to configuration of optimal found injection. + _,_,_,_,_,_ = self._move_abs_ttm_act(act_curr,act_disp,speeds,pos_offset,config,False,0.010,self._get_delay(100,True)-t_write) + + plt.close(fig) + return + + def optimization_cross(self,sky,CS,d,speed=1.1*10**(-3),config=1,dt_sample=0.050,k=10,l=5): + """ + Description + ----------- + This function brings the beam centroid to a state of optimized injection by shaping a cross in the pupil or image plane, controlled by parameter "CS". + Motion is continued in each of the four cartesian directions as long as there is monotonuous improvement in sampled ROI values. + The sampled ROI values are averaged by a sliding window approach, with size k and stepsize l between subsequent windows. + This sliding window approach is adopted to robustly probe the general ROI trend. + When a step does not show monotonuous improvement, the actuator configuration, sampled along the step, corresponding to maximal ROI readout is pushed to the bench. + + Parameters + ---------- + sky : single boolean + If True : trace a cross on-sky. + If False : trace a cross internally. + CS : single boolean + If True, the CS position is kept fixed throughout the cross motion, which is then traced in the image plane. + If False, the IM position is kept fixed throughout the cross motion, which is then traced in the pupil plane. + d : single float value + The dimension by which the cross should make its steps. + If sky : on-sky angular stepsize (rad) + else : displacement stepsize in CS/IM plane (mm) + speed : single float value (mm/s) + Actuator speed by which a step should occur. + config : single integer + Configuration number (= VLTI input beam) (0,1,2,3). + Nr. 0 corresponds to the innermost beam, Nr. 3 to the outermost one (see figure 3 in Garreau et al. 2024 for reference). + dt_sample : single float (s) + Amount of time a sample, made along a step, should span. + k : single integer + Window size for sample averaging + l : single integer + Step size between windows + + Remarks + ------- + The function is expected to be called after the "localization_spiral" function has been called. It is thus expected that a first, broad-scope alignment has already been performed. + If sky == True : Before calling this function, it is expected that the TTMs have already been aligned such that the on-sky source is imaged onto the chip input. + If sky == False : Before calling this function, it is expected that the TTMs have already been aligned such that the internal VLTI beam is injected into the chip input. + + Returns + ------- + None. + + """ + print("----------------------------------") + print("Optimizing by cross-motion...") + print("----------------------------------") + if (config < 0 or config > 3): + raise ValueError("Please enter a valid configuration number (0,1,2,3)") + + # Delay time (total delay minus writing time) + t_delay = self._get_delay(100,True)-t_write + + # Possible moves + if sky: + if CS: + sky = 1 + else: + sky = -1 + up=np.array([0,d,0,0],dtype=np.float64) + left=np.array([-d,0,0,0],dtype=np.float64) + down=np.array([0,-d,0,0],dtype=np.float64) + right=np.array([d,0,0,0],dtype=np.float64) + moves = np.array([up,left,down,right]) + else: + sky = 0 + if CS: + up=np.array([0,0,0,d],dtype=np.float64) + left=np.array([0,0,-d,0],dtype=np.float64) + down=np.array([0,0,0,-d],dtype=np.float64) + right=np.array([0,0,d,0],dtype=np.float64) + else: + up=np.array([0,d,0,0],dtype=np.float64) + left=np.array([-d,0,0,0],dtype=np.float64) + down=np.array([0,-d,0,0],dtype=np.float64) + right=np.array([d,0,0,0],dtype=np.float64) + moves = np.array([up,left,down,right]) + + # A) Storing characteristics of initial configuration + + # Exposure time for first exposure (ms) + dt_exp_opt = 1000 + # Start time for initial exposure + t_start = self._get_time(1000*time.time(),t_delay) + # Sleep + time.sleep((dt_exp_opt+t_write)*10**(-3)) + # Initial position noise measurement + _,noise = self._get_noise(Nexp,t_start,dt_exp_opt) + # Initial position photometric output measurement + photo_init = self._get_photo(Nexp,t_start,dt_exp_opt,config) + print("Initial noise level : ", noise) + # B) Probe all four cartesian directions + dirs = ["up","left","down","right"] + for i in range(0,4): + + print(dirs[i]) + + stop = False + + while not stop: + print("Step") + + # Step + speeds = np.array([speed,speed,speed,speed], dtype=np.float64) # TBD + _,_,acts,_,rois,_,_ = self.individual_step(True,sky,moves[i],speeds,config,True,dt_sample,t_delay) + # Registering post-motion actuator configuration + act_post = self._get_actuator_pos(config)[0] + + # Set stop to True + stop = True + + # Take an average for each sliding window of size k + + n_windows = (len(rois)-k)//l + 1 + + rois_slide = np.array([rois[i:i+k] for i in range(0,n_windows*l,l)]) + rois_slide_av = np.mean(rois_slide,axis=1) + + i_max_av = np.argmax(rois_slide_av) + + # snr + snr = (rois_slide_av-photo_init)/noise + print(rois) + print(rois_slide_av) + + # State of optimal injection + i_max = np.argmax(snr[i_max_av:i_max_av+k]) + act_max = acts[i_max] + + # Only continue when there is improvement in the ROI sampled readouts. + if (np.all(np.diff(rois_slide_av) > 0)): + stop = False + + if stop: + # If no improvement, stop is True, push back to the state of optimal injection sampled throughout the motion. + act_disp = act_max-act_post + speeds_return = np.array([0.0011,0.0011,0.0011,0.0011],dtype=np.float64) #TBD + pos_offset = self._actoffset(speeds_return,act_disp) + _,_,_,_,_,_ = self._move_abs_ttm_act(act_post,act_disp,speeds_return,pos_offset,config,False,0.010,t_delay-t_write) + return ########################################## # Performance characterization / Testing # ########################################## + def cam_read(self,dt): + # Function that retrieves the average ROI values - registered in the REDIS database - from the past dt seconds. + + # REDIS field names + names = ["roi1_avg","roi2_avg","roi3_avg","roi4_avg","roi5_avg","roi6_avg","roi7_avg","roi8_avg","roi9_avg","roi10_avg"] + + # Readout "dt" seconds back in time + t_start,t_stop = define_time(dt) + + output = [get_field(names[i],t_start,t_stop,False) for i in range(0,len(names))] + + return output + def cam_read_test(self,config): # Function to test the readout of the camera ROIs from the REDIS database @@ -2226,9 +2646,9 @@ def act_response_test_single(self,act_displacement,speed,act_name,act_index,offs def act_response_test_multi(self,act_displacements,len_speeds,act_name,act_index,offset,config=1): # Function to probe the actuator response for a range of displacements and speeds - # !!! To be used for displacements in ONE CONSISTENT DIRECTION (i.e. only positive / only negative displacements) + # To be used for displacements in ONE CONSISTENT DIRECTION (i.e. only positive / only negative displacements) - act_pos_align = [5.219526,5.4300675,3.4311585,3.94609] + act_pos_align = self.act_pos_align[config] if (act_index < 2): align_pos = (act_pos_align[0]+act_pos_align[1])/2 else: @@ -2276,12 +2696,12 @@ def act_response_test_multi(self,act_displacements,len_speeds,act_name,act_index print(i) return matrix_acc,times,positions - def act_backlash_test_multi(self,act_displacements,len_speeds,act_name,act_index,config=1): + def act_backlash_test_multi(self,act_displacements,len_speeds,act_name,act_index,offset,config=1): # Function to probe the backlash, remaining after incorporation of the empirical actuator offsets, for a range of displacements and speeds. # For each speed v and displacement dx, the actuator is moved by \pm dx at speed v and then the same displacement is reversed. The backlash is # characterised by how well the initial position (before any displacement) and the final position (after two displacements) agree. - act_pos_align = [5.219526,5.4300675,3.4311585,3.94609] + act_pos_align = self.act_pos_align[config] if (act_index < 2): align_pos = (act_pos_align[0]+act_pos_align[1])/2 else: @@ -2301,9 +2721,9 @@ def act_backlash_test_multi(self,act_displacements,len_speeds,act_name,act_index # Current position init_pos = self._get_actuator_pos(config)[0][act_index] # Step 1 - arr1,_,_ = self.act_response_test_single(disp,speeds[j],act_name,act_index,True,align_pos) + arr1,_,_ = self.act_response_test_single(disp,speeds[j],act_name,act_index,offset,align_pos) # Step 2 - arr2,_,_ = self.act_response_test_single(-disp,speeds[j],act_name,act_index,True,align_pos) + arr2,_,_ = self.act_response_test_single(-disp,speeds[j],act_name,act_index,offset,align_pos) # Final achieved position final_pos = arr2[2] # Backlash @@ -2336,33 +2756,278 @@ def actuator_performance(self): displacements_pos = np.geomspace(0.0005,0.025,grid_size) displacements_neg = np.geomspace(-0.0005,-0.025,grid_size) - for i in range(0,5): - # Backlash grids - matrix_back = self.act_backlash_test_multi(displacements_pos,grid_size,act_name,3) - np.save("backpos"+str(i+1),matrix_back) - matrix_back = self.act_backlash_test_multi(displacements_neg,grid_size,act_name,3) - np.save("backneg"+str(i+1),matrix_back) + for i in range(0,3): # Offset-accounted grids matrix_acc,times,positions = self.act_response_test_multi(displacements_pos,grid_size,act_name,3,True) np.save("offpos"+str(i+1),matrix_acc) matrix_acc,times,positions = self.act_response_test_multi(displacements_neg,grid_size,act_name,3,True) np.save("offneg"+str(i+1),matrix_acc) + for i in range(0,4): + # Backlash grids + matrix_back = self.act_backlash_test_multi(displacements_pos,grid_size,act_name,3,False) + np.save("backpos"+str(i+1),matrix_back) + matrix_back = self.act_backlash_test_multi(displacements_neg,grid_size,act_name,3,False) + np.save("backneg"+str(i+1),matrix_back) + # Backlash grids + matrix_back = self.act_backlash_test_multi(displacements_pos,grid_size,act_name,3,True) + np.save("backposoff"+str(i+1),matrix_back) + matrix_back = self.act_backlash_test_multi(displacements_neg,grid_size,act_name,3,True) + np.save("backnegoff"+str(i+1),matrix_back) return + def visible_camera_performance(self,N,pupilpar=False): + """ + + Parameters + ---------- + pupilpar : single boolean + True when you wish to study the performance of shifts imposed in the pupil plane. + N : single integer + How much iterations, i.e. random shifts, to go through. + + """ + # Selecting devices + tries = 0 + tries_max = 6 + sleep_time_secs = 10 + while tries < tries_max: # Wait for device for 60 seconds + # Only select the visible cameras (not the InfraTec): + device_list = [] + for device_info in system.device_infos: + if device_info["model"] == "PHX064S-M": + device_list.append(device_info) + # Connect to visible cameras + devices = system.create_device(device_list) + if not devices: + print( + f'Try {tries+1} of {tries_max}: waiting for {sleep_time_secs} ' + f'secs for a device to be connected!') + for sec_count in range(sleep_time_secs): + time.sleep(1) + print(f'{sec_count + 1 } seconds passed ', + '.' * sec_count, end='\r') + tries += 1 + else: + print(f'Created {len(devices)} device(s)\n') + break + else: + raise Exception(f'No device found! Please connect a device and run ' + f'the example again.') + + device_IM = system.select_device(devices) + device_PUPIL = system.select_device(devices) + print(f'Device used for image plane:\n\t{device_IM}') + print(f'Device used for pupil plane:\n\t{device_PUPIL}') + + # Get stream nodemap to set features before streaming + stream_nodemap_IM = device_IM.tl_stream_nodemap + stream_nodemap_PUPIL = device_PUPIL.tl_stream_nodemap + + # Enable stream auto negotiate packet size + stream_nodemap_IM['StreamAutoNegotiatePacketSize'].value = True + stream_nodemap_PUPIL['StreamAutoNegotiatePacketSize'].value = True + # Enable stream packet resend + stream_nodemap_IM['StreamPacketResendEnable'].value = True + stream_nodemap_PUPIL['StreamPacketResendEnable'].value = True + + nodemap_IM = device_IM.nodemap + nodemap_PUPIL = device_PUPIL.nodemap + + # Setting width, height, x/y offsets to limit the buffer size + + # Setting IMAGE plane nodemap config + nodemap_IM["Width"].value = 148 + nodemap_IM["Height"].value = 150 + nodemap_IM["OffsetX"].value = 1500 + nodemap_IM["OffsetY"].value = 1250 + # Setting PUPIL plane nodemap config TBC + nodemap_PUPIL["Width"].value = 3072 + nodemap_PUPIL["Height"].value = 2048 + nodemap_PUPIL["OffsetX"].value = 0 + nodemap_PUPIL["OffsetY"].value = 0 + + # Setting exposure mode and exposure time (lowest possible for the green laser) + nodemap_IM["ExposureAuto"].value = "Off" + nodemap_IM["ExposureTime"].value = 27.216 + nodemap_PUPIL["ExposureAuto"].value = "Off" + nodemap_PUPIL["ExposureTime"].value = 25000.0 + + # Gaussian PSF fitting radius estimates + rfit_im = 10 + rfit_pup = 500 + + def retrieve_pos(devicepar,nodemappar,rfit): + + with devicepar.start_stream(): + + # PREPARATION # + #-------------# + # Create buffer + buffer = devicepar.get_buffer() + # Numpy matrix containing image data + nparray = np.array(buffer.data, dtype=np.uint8) + nparray = nparray.reshape(buffer.height, buffer.width) + # Width + w = nodemappar["Width"].value + h = nodemappar["Height"].value + # Data + x = np.linspace(1,w,w) + y = np.linspace(1,h,h) + x,y = np.meshgrid(x,y) + # Indices of maximum + i = np.argmax(nparray)//w + j = np.argmax(nparray)%w + # FITTING # + #---------# + # Airy disk model + airy = models.AiryDisk2D(amplitude=np.max(nparray),x_0=j,y_0=i,radius=rfit,bounds={"amplitude":(0,1.5*np.max(nparray)),"x_0":(0,w),"y_0":(0,h),"radius":(0.1*rfit,2*rfit)}) + # Performing least squares fitting procedure + fit_ent = fitting.LevMarLSQFitter(calc_uncertainties=True) + pix = fit_ent(airy,x,y,nparray) + xfit,yfit=pix.x_0.value,pix.y_0.value + # PLOTTING + #fig = plt.figure(figsize=(10,10)) + #ax = fig.add_subplot(111) + #img = ax.imshow(nparray) + #ax.scatter(j,i, color="red",label="Max") + #ax.scatter(xfit,yfit,color="blue",label="Fit") + #ax.legend() + #fig.canvas.draw() + #fig.canvas.flush_events() + #fig.show() + # Requeue to release buffer memory + devicepar.requeue_buffer(buffer) + print("Fitted position : ", xfit,yfit) + return [xfit,yfit] + + def step(xstep,ystep): + # xstep : mm + # ystep : mm + def deproject(pos,theta): + x = pos[0] + y = pos[1] + x_prim = x*np.cos(theta)-y*np.sin(theta) + y_prim = x*np.sin(theta)+y*np.cos(theta) + return [x_prim,y_prim] + # Deprojection angles (FROM BENCH XY AXES TO CAMERA X'Y' AXES!) + angle_IM = 0 + angle_PUPIL = 0 + # 1) Retrieve initial position + pos_init_IM = retrieve_pos(device_IM,nodemap_IM,rfit_im) + pos_init_PUPIL = retrieve_pos(device_PUPIL,nodemap_PUPIL,rfit_pup) + # 2) Perform an individual step by the given dimensions, in the plane specified; with random speed. + speed=random.uniform(0.005,25)*10**(-3) + speeds = np.array([speed,speed,speed,speed],dtype=np.float64) # mm/s TBC + speeds[speeds<0.005*10**(-3)] = 0.005*10**(-3) + if pupilpar: + steps = np.array([xstep,ystep,0,0],dtype=np.float64) + else: + steps = np.array([0,0,xstep,ystep],dtype=np.float64) + # Performing the step + _,_,_,_,_,act_err,_ = self.individual_step(False,0,steps,speeds,1,False) + # 3) Retrieve final position + pos_final_IM = retrieve_pos(device_IM,nodemap_IM,rfit_im) + pos_final_PUPIL = retrieve_pos(device_PUPIL,nodemap_PUPIL,rfit_pup) + # Deproject the coordinates + pos_init_IM = deproject(pos_init_IM,angle_IM) + pos_init_PUPIL = deproject(pos_init_PUPIL,angle_PUPIL) + pos_final_IM = deproject(pos_final_IM,angle_IM) + pos_final_PUPIL = deproject(pos_final_PUPIL,angle_PUPIL) + # Calculating the shifts + xshift_IM = pos_final_IM[0]-pos_init_IM[0] + yshift_IM = pos_final_IM[1]-pos_init_IM[1] + xshift_PUPIL = pos_final_PUPIL[0]-pos_init_PUPIL[0] + yshift_PUPIL = pos_final_PUPIL[1]-pos_init_PUPIL[1] + # 4) Calculating the image x and y displacements, predicted by the framework from the actuator errors. + curr_pos = self._get_actuator_pos(1)[0] + ttm_acc = self._actuator_displacement_to_ttm_shift(curr_pos,act_err,1) + curr_ttm = self._actuator_position_to_ttm_angle(curr_pos,1) + Darr = self._snap_distance_grid(curr_ttm,1) + shifts = self._framework_numeric_int_reverse(ttm_acc,Darr,1) + # Returning x and y shifts in physical dimensions (micrometer) - each pixel is 2.40 um + return [np.array([pos_init_PUPIL[0],pos_init_PUPIL[1],pos_final_PUPIL[0],pos_final_PUPIL[1],pos_init_IM[0],pos_init_IM[1],pos_final_IM[0],pos_final_IM[1]],dtype=np.float64)*2.40*10**(-3),np.array([shifts[2],shifts[3],shifts[0],shifts[1]],dtype=np.float64),speed] #mm(/s) + + def rand_sign(): + return 1 if random.random() < 0.5 else -1 + + # Framework performance + # Step 1 : Align + self.align() + # Step 2 : Drawing random shifts and imposing the steps to the bench, note: only if the imposed state is valid (f.e. not off the slicer) + def step_validcheck(): + valid = False + while not valid: + valid = True + if pupilpar: # TBC + dx=rand_sign()*random.uniform(0.5,25)*10**(-3) + dy=rand_sign()*random.uniform(0.5,25)*10**(-3) + else: + dx=rand_sign()*random.uniform(0.5,25)*10**(-3) + dy=rand_sign()*random.uniform(0.5,25)*10**(-3) + try: + coordsx,errx,speed = step(dx,0) + coordsy,erry,speed = step(0,dy) + # If individual_step throws exception (state not valid), stay in while loop. + except ValueError: + valid = False + return coordsx,coordsy,errx,erry,dx,dy,speed + + # Shifts data container + acc = [] + pos = [] + for i in range(0,N): + coordsx,coordsy,errx,erry,dx,dy,speed = step_validcheck() + if pupilpar: + dx_err = np.abs(dx)-np.abs(shifts[2]) + dy_err = np.abs(dy)-np.abs(shifts[3]) + acc.append(np.array([dx,dy,dx_err,dy_err,shifts_iter[0],shifts_iter[1],speed,pupilpar],dtype=np.float64)) + else: + pos_init_x = coordsx[4:6] + pos_final_x = coordsx[6:8] + pos_init_y = coordsy[4:6] + pos_final_y = coordsy[6:8] + pos_init_x_PUP = coordsx[0:2] + pos_final_x_PUP = coordsx[2:4] + pos_init_y_PUP = coordsy[0:2] + pos_final_y_PUP = coordsy[2:4] + + err = errx+erry + + dx_ach = pos_final_y[0]-pos_init_x[0] + dy_ach = pos_final_y[1]-pos_init_x[1] + dx_pup = pos_final_y_PUP[0]-pos_init_x_PUP[0] + dy_pup = pos_final_y_PUP[1]-pos_init_x_PUP[1] + + dx_err = np.abs(dx)-np.abs(dx_ach) + dy_err = np.abs(dy)-np.abs(dy_ach) + dx_act_err = np.abs(dx+err[0])-np.abs(dx_ach) + dy_act_err = np.abs(dy+err[1])-np.abs(dy_ach) + print("Iteration "+str(i)+" : "+str([1000*dx_err,1000*dy_err])+" um errors on imposed displacements "+str([1000*dx,1000*dy])+" um.") + acc.append(np.array([dx,dy,dx_err,dy_err,dx_pup,dy_pup,speed,pupilpar,dx_act_err,dy_act_err],dtype=np.float64)) + pos.append(np.array([pos_init_x,pos_final_x,pos_init_y,pos_final_y,pos_init_x_PUP,pos_final_x_PUP,pos_init_y_PUP,pos_final_y_PUP],dtype=np.float64)) + + # Destroy the devices before returning + system.destroy_device(device=device_IM) + system.destroy_device(device=device_PUPIL) + # Saving result + np.save("Acc_frame", np.array(acc)) + np.save("Pos_frame",np.array(pos)) + return acc + def align(self,config=1): print("---------------------------------------------------------------------------") print("Bring beam to visual aligned state.") print("---------------------------------------------------------------------------") - pos_arr = np.array([5.219526,5.4300675,3.4311585,3.94609],dtype=np.float64) + pos_arr = np.array(self.act_pos_align[config],dtype=np.float64) # Necessary Displacements for Alignment curr_pos = self._get_actuator_pos(config)[0] disp_arr = pos_arr-curr_pos - speed_arr = np.abs(disp_arr.copy()/10) + speed_arr = np.array([0.021,0.021,0.021,0.021],dtype=np.float64) off = self._actoffset(speed_arr,disp_arr) self._move_abs_ttm_act(curr_pos,disp_arr,speed_arr,off,config,False,0.010,self._get_delay(100,True)-t_write) return - def algorithm_test(self): + def algorithm_test(self,K,step_opt,speed_opt,dt_opt,k_opt,l_opt): def kick_loc_opt(obj,config): def rand_sign(): @@ -2371,14 +3036,37 @@ def rand_sign(): dy=rand_sign()*random.uniform(20,50)*10**(-3) print("Kicking the beam away from aligned state, by random \pm 50 um distances in image plane (dx,dy) = ", (dx,dy)) steps = np.array([0,0,dx,dy]) - speed_arr = np.array([0.001,0.001,0.005/10,0.005/10],dtype=np.float64) + speed_arr = np.array([0.01,0.01,0.01,0.01],dtype=np.float64) # Kick away obj.individual_step(True,0,steps,speed_arr,1,False,0.010,self._get_delay(100,True)-t_write) + # Registering start time + t_start = time.time() # Spiraling to return - obj.localization_spiral(False,20,0.010,config,0.15) - obj.optimization_spiral(False,5,0.0025,config,0.15) - obj.optimization_spiral(False,1,0.0005,config,0.15) - return + obj.localization_spiral(False,20,0.010,config,0.10) + obj.optimization_cross(False,True,step_opt,speed_opt,1,dt_opt,k_opt,l_opt) + + #obj.optimization_spiral_gradient(False,5*10**(-3),0.0011,config,0.05,8) + #obj.optimization_spiral_gradient(False,3*10**(-3),0.0011,config,0.05,4) + #obj.optimization_spiral_gradient(False,1*10**(-3),0.0011,config,0.05,2) + #obj.optimization_spiral_gradient(False,0.2*10**(-3),0.0003,config,0.05,1) + + # Measuring end time + t_end = time.time() + # Time spent localizing and optimizing + t_spent = t_end-t_start + # Achieved ROI outputs + + # Delay time (total delay minus writing time) + t_delay = self._get_delay(100,True)-t_write + # Start time for exposure + t_start = self._get_time(1000*time.time(),t_delay) + # Sleep + time.sleep((500+t_write)*10**(-3)) + # Photometric output measurement + photo_ach = self._get_photo(Nexp,t_start,500,1) + print("Initial photometric output : ", photo_ach) + + return dx,dy,t_spent,photo_ach # Configuration parameters configpar = 1 # second beam @@ -2393,7 +3081,16 @@ def rand_sign(): act_name='NTPB2' curr_pos = self._get_actuator_pos(configpar)[0] print("Current actuator positions :", self._get_actuator_pos(configpar)[0]) - self.align(configpar) - kick_loc_opt(self,configpar) + + data = [] + + for i in range(0,K): + self.align(configpar) + dx,dy,t_spent,photo_ach = kick_loc_opt(self,configpar) + data.append([dx,dy,t_spent,photo_ach]) + + data_arr = np.array(data,dtype=np.float64) + np.save("AlgorithmTest",data_arr) + return \ No newline at end of file