Skip to content
148 changes: 122 additions & 26 deletions src/abel/classes/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,25 @@ class Runnable(ABC):

# shot tracking function (to be repeated)
def perform_shot(self, shot):

self.shot = shot
self.step = self.steps[shot]



# apply scan function if it exists
if self.scan_fcn is not None:
vals_all = np.repeat(self.vals,self.num_shots_per_step)
vals_all = np.repeat(self.vals, self.num_shots_per_step)

#print('inside perform_shot') #TODO: delete
self.scan_fcn(self, vals_all[shot])

print('self.scan_fcn(self, vals_all[shot]): ', self.scan_fcn(self, vals_all[shot]))

print(self.vals)
print(self.plasma_lens.offset_x) # TODO:delete

#print('inside perform_shot') #TODO: delete

# check if object exists
if not self.overwrite and os.path.exists(self.object_path(shot)):
Expand All @@ -46,7 +57,7 @@ def perform_shot(self, shot):
## SCAN FUNCTIONALITY

def is_scan(self):
return self.scan_fcn is not None
return hasattr(self, 'scan_fcn') and self.scan_fcn is not None

# scan function
def scan(self, run_name=None, fcn=None, vals=[None], label=None, scale=1, num_shots_per_step=1, step_filter=None, shot_filter=None, savedepth=2, verbose=None, overwrite=False, parallel=False, max_cores=16):
Expand Down Expand Up @@ -105,6 +116,8 @@ def scan(self, run_name=None, fcn=None, vals=[None], label=None, scale=1, num_sh

# recalculate number of cores used
num_cores = min(max_cores, len(shots_to_perform))

#print('inside scan') #TODO: delete

# perform parallel tracking
with joblib_progress('Tracking shots ('+str(num_cores)+' in parallel)', len(shots_to_perform)):
Expand Down Expand Up @@ -256,8 +269,12 @@ def final_beam(self):
## Support for Bayesian optimisation through Ax.optimize

def set_parameters(self, params):
for key in params.keys():
self.set_attr(key, params[key])
print(type(params)) # TODO: delete
print(params[0]) # TODO: delete

for param in params:
for key in param.keys():
self.set_attr(key, param[key])

# Setting attribute
def set_attr(self, attr, val):
Expand Down Expand Up @@ -299,6 +316,9 @@ def evaluation_function(params):

# set the parameters
opt_fcn = lambda obj, _: obj.set_parameters(params)


print('hello inside evaluation_function') # TODO: delete

# run the simulations
self.scan(run_name=self.run_name, fcn=opt_fcn, vals=np.arange(num_steps), step_filter=self.step, num_shots_per_step=num_shots_per_step, savedepth=savedepth, verbose=verbose, overwrite=overwrite, parallel=parallel, max_cores=max_cores, label=label)
Expand All @@ -321,22 +341,76 @@ def evaluation_function(params):

return val_mean


# Make the list of dict parameters compatible with ax.api.client.configure_experiment()
from ax.api.client import Client
from ax.api.configs import RangeParameterConfig

def config_parameters(parameters):
exp_params = [None for _ in range(len(parameters))]
for i in range(len(parameters)):

if 'step_size' not in parameters[i]:
step_size = None
if 'scaling' not in parameters[i]:
scaling = None

exp_params[i] = [
RangeParameterConfig(
name=parameters[i]['name'],
bounds=parameters[i]['bounds'],
parameter_type=parameters[i]['type'],
step_size=step_size,
scaling=scaling
)
]
return exp_params



# perform optimization
from ax import optimize as bayes_opt
best_parameters, best_values, experiment, model = bayes_opt(
parameters = parameters,
evaluation_function = evaluation_function,
minimize = True,
total_trials = num_steps
)

# from ax.service.managed_loop import optimize as bayes_opt
# best_parameters, best_values, experiment, model = bayes_opt(
# parameters = parameters,
# evaluation_function = evaluation_function,
# minimize = True,
# total_trials = num_steps
# )


client = Client()

out = evaluation_function(parameters)
print(config_parameters(parameters)) #TODO: delete


client.configure_experiment(parameters=config_parameters(parameters))

client.configure_optimization(objective="-merit_fcn")
for _ in range(num_steps):
for trial_index, parameters in client.get_next_trials(max_trials=1).items():

result = evaluation_function(parameters)

# Set raw_data as a dictionary with metric names as keys and results as values
raw_data = {merit_fcn: result}

client.complete_trial(
trial_index=trial_index,
raw_data=raw_data,
)

best_parameters, best_values, index, name = client.get_best_parameterization()

return best_parameters, best_values


def extract_beam_function(self, beam_fcn, index=-1, clean=False):
"""
Extract mean and standard deviation value of beam parameters across a
scan.
scan. For each scan step, the mean and standard deviation are calculated
across all shots in the scan step.

Parameters
----------
Expand All @@ -356,9 +430,13 @@ def extract_beam_function(self, beam_fcn, index=-1, clean=False):

Returns
----------
vals : 2D ndarray
Each column in ``vals`` corresponds to a scan step, each element in the
column corresponds to a shot.
val_mean : float or 1D ndarray
The mean values of ``beam_fcn``. Each column in ``val_mean``
corresponds to a scan step.

val_std : float or 1D ndarray
The standard deviations of ``beam_fcn``. Each column in ``val_std``
corresponds to a scan step.
"""

import inspect
Expand All @@ -374,35 +452,53 @@ def extract_beam_function(self, beam_fcn, index=-1, clean=False):
def extract_function(self, fcn, clean=False):

import inspect

# Prepare the arrays
val_mean = np.empty(self.num_steps)
val_std = np.empty(self.num_steps)
input_list = inspect.signature(fcn).parameters

# Extract values
if self.is_scan():

# Prepare the arrays
val_mean = np.empty(self.num_steps)
val_std = np.empty(self.num_steps)

for step in range(self.num_steps):

# Get values for this step
val_output = np.empty(self.num_shots_per_step)
for shot_in_step in range(self.num_shots_per_step):

input_list = inspect.signature(fcn).parameters
if 'clean' in input_list: # Check if the input list contains clean.
val_output[shot_in_step] = fcn(self[step, shot_in_step], clean=clean)
else:
val_output[shot_in_step] = fcn(self[step, shot_in_step])

# Get step mean and error
val_mean[step] = np.mean(val_output)
val_std[step] = np.std(val_output)

# Get step mean and error
val_mean[step] = np.mean(val_output)
val_std[step] = np.std(val_output)

elif not self.is_scan() and self.num_shots > 0:
val_output = np.empty(self.num_shots)
for shot in range(self.num_shots):

if 'clean' in input_list: # Check if the input list contains clean.
val_output[shot] = fcn(self[shot], clean=clean)
else:
val_output[shot] = fcn(self[shot])

# Get shot mean and error
val_mean = np.mean(val_output)
val_std = np.std(val_output)

else:
raise ValueError('Shots not found.')

return val_mean, val_std


def scan_steps_beam_params(self, beam_fcn, clean=False):
"""
Extract beam parameter data across all scan steps and shots.
Extract output beam parameter data for all shots in each scan step and
store these separately.

Parameters
----------
Expand Down
Loading