-
Notifications
You must be signed in to change notification settings - Fork 4
Beam attenuation #35
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Beam attenuation #35
Changes from all commits
5b97d4f
9489e21
2dd5b96
a4a93a8
8454b50
5fd4afa
a046034
205f2c7
981e290
52e162f
6aab1d4
3450021
61477f6
a8ef9bf
0660377
8f605d1
5135f45
7d7c92f
08a33a4
f341268
2a216ac
eba6dd9
e2f16d3
2473895
dc71ab7
e5616dc
faaf823
dcdfb3f
50f999b
f52c573
5bbb671
d0fb3d2
9667175
beffc82
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -164,3 +164,6 @@ Thumbs.db | |
| # IDEs | ||
| .vscode/ | ||
| .cursor/ | ||
|
|
||
| # Other | ||
| notes.* | ||
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,246 @@ | ||||||||||
| from __future__ import annotations | ||||||||||
|
|
||||||||||
| import asyncio | ||||||||||
| import math | ||||||||||
| from dataclasses import dataclass | ||||||||||
|
|
||||||||||
| import numpy as np | ||||||||||
| import xrayutilities as xu | ||||||||||
| from ophyd_async.core import ( | ||||||||||
| AsyncMovable, | ||||||||||
| AsyncStatus, | ||||||||||
| DeviceVector, | ||||||||||
| StandardReadable, | ||||||||||
| StrictEnum, | ||||||||||
| ) | ||||||||||
| from ophyd_async.epics.core import EpicsDevice, epics_signal_r, epics_signal_rw | ||||||||||
|
|
||||||||||
| from cditools.motors import Energy | ||||||||||
|
|
||||||||||
|
|
||||||||||
| @dataclass | ||||||||||
| class AttenuatorCombination: | ||||||||||
| transmission: float | ||||||||||
| attenuators: list[int] | ||||||||||
|
|
||||||||||
| @property | ||||||||||
| def attenuation(self): | ||||||||||
| return 1 - self.transmission | ||||||||||
|
|
||||||||||
|
|
||||||||||
| THICKNESSES = (16, 24, 66, 124) # microns | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would make each of these a combination of |
||||||||||
|
|
||||||||||
|
|
||||||||||
| class AttenuatorStatusEnum(StrictEnum): | ||||||||||
| LOW = "Low" # off / not obstructing | ||||||||||
| HIGH = "High" # on / obstructing | ||||||||||
|
|
||||||||||
|
|
||||||||||
| class Attenuator(EpicsDevice, AsyncMovable[AttenuatorStatusEnum]): | ||||||||||
| filter_material = xu.materials.Al | ||||||||||
|
|
||||||||||
| def __init__(self, prefix: str, num: int, thickness: int): | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would type this as |
||||||||||
| """ | ||||||||||
| prefix - the common prefix for the attenuator bank | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use numpydoc conventions |
||||||||||
| num - an integer denoting which attenuator within the bank this is | ||||||||||
| thickness - the thickness of the attenuator in microns | ||||||||||
|
|
||||||||||
| position - the read / write PV to open and close the attenuator | ||||||||||
| """ | ||||||||||
| self.prefix = prefix | ||||||||||
| self.num = num | ||||||||||
| self.thickness = thickness # microns | ||||||||||
|
|
||||||||||
| self.position = epics_signal_rw( | ||||||||||
| AttenuatorStatusEnum, | ||||||||||
| f"{self.prefix}:DO{self.num}-Sts", | ||||||||||
| write_pv=f"{self.prefix}:DO{self.num}-Cmd", | ||||||||||
| ) | ||||||||||
| self.mode = epics_signal_rw(bool, f"{self.prefix}:DIO{self.num}-Mode") | ||||||||||
| self.in_status = epics_signal_r( | ||||||||||
| AttenuatorStatusEnum, f"{self.prefix}:DI{self.num}-Sts" | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| super().__init__(prefix=self.prefix) | ||||||||||
|
|
||||||||||
| def __repr__(self): | ||||||||||
| return f"{self.thickness!s} microns, {self.filter_material}" | ||||||||||
|
|
||||||||||
| @property | ||||||||||
| def name(self): | ||||||||||
| return f"attenuator_{self.num}" | ||||||||||
|
|
||||||||||
| @AsyncStatus.wrap | ||||||||||
| async def set(self, value: AttenuatorStatusEnum): | ||||||||||
| await self.position.set(value) | ||||||||||
|
|
||||||||||
| async def open(self): | ||||||||||
| """Open means open to allowing the beam to pass unobstructed""" | ||||||||||
| await self.position.set(AttenuatorStatusEnum.LOW) | ||||||||||
|
|
||||||||||
| async def close(self): | ||||||||||
| """Closed means obstructing the beam""" | ||||||||||
| await self.position.set(AttenuatorStatusEnum.HIGH) | ||||||||||
|
|
||||||||||
| def transmission(self, photon_energy: float, egu: str = "KeV"): | ||||||||||
| """Transmission is the fraction of beam remaining""" | ||||||||||
| abs_len = self._absorption_length(photon_energy, egu=egu) | ||||||||||
| return np.exp(-self.thickness / abs_len) | ||||||||||
|
|
||||||||||
| def attenuation(self, photon_energy: float, egu: str = "KeV"): | ||||||||||
| """Attenuation is the fraction of the beam removed""" | ||||||||||
| return 1 - self.transmission(photon_energy, egu=egu) | ||||||||||
|
|
||||||||||
| def _absorption_length(self, photon_energy: float, egu: str = "KeV") -> float: | ||||||||||
| """ | ||||||||||
| Calculates L, the characteristic absorption length of this material, | ||||||||||
| at this beam energy. | ||||||||||
|
|
||||||||||
| photon energy: the beam energy | ||||||||||
| egu: the engineering units of the beam energy (KeV or eV) | ||||||||||
| absorption length: the characteristic absorption length of the | ||||||||||
| filter material (microns) | ||||||||||
| """ | ||||||||||
| if egu == "KeV": | ||||||||||
| photon_energy = photon_energy * 1e3 | ||||||||||
| elif egu != "eV": | ||||||||||
| msg = "Photon energy units must be eV or KeV" | ||||||||||
| raise RuntimeError(msg) | ||||||||||
| return self.filter_material.absorption_length(photon_energy) # type: ignore[reportArgumentType] | ||||||||||
|
|
||||||||||
|
|
||||||||||
| class AttenuatorBank(StandardReadable, EpicsDevice, AsyncMovable[float]): | ||||||||||
| """ | ||||||||||
| The ioc for the iologik1 lives on xf09id1-inst-ioc1.nsls2.bnl.gov | ||||||||||
| """ | ||||||||||
|
|
||||||||||
| thicknesses = THICKNESSES | ||||||||||
|
|
||||||||||
| def __init__(self, prefix: str, energy: Energy): | ||||||||||
| self.prefix = prefix | ||||||||||
| self.energy = energy | ||||||||||
|
|
||||||||||
| with self.add_children_as_readables(): | ||||||||||
| self.attenuators = DeviceVector( | ||||||||||
| { | ||||||||||
| i: Attenuator(self.prefix, i, self.thicknesses[i - 1]) | ||||||||||
| for i in range(1, len(self.thicknesses) + 1) | ||||||||||
|
Comment on lines
+126
to
+127
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Slightly more idiomatic Python |
||||||||||
| } | ||||||||||
| ) | ||||||||||
| super().__init__(prefix=self.prefix) | ||||||||||
|
|
||||||||||
| @property | ||||||||||
| def photon_energy(self): | ||||||||||
| return self.energy.energy.readback.get() | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. after a lot of back-and-forth we have avoided wrapping epics I/O in properties. The main arguments against are that it violates the expectation that properties are attributes which are fast (not arbitrarily long) and is not uniform across other devices. |
||||||||||
|
|
||||||||||
| @property | ||||||||||
| def egu(self): | ||||||||||
| return self.energy.egu | ||||||||||
|
|
||||||||||
| async def get_status(self): # type: ignore[reportUnknownParameterType] | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this be |
||||||||||
| """ | ||||||||||
| Status polls the bluesky energy object for the current beam energy, and | ||||||||||
| returns that energy, each filter position, each transmission, and | ||||||||||
| the total transmission. | ||||||||||
| """ | ||||||||||
| status = {} | ||||||||||
| active_attens = [] | ||||||||||
| en = self.photon_energy | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we are already in an async function here so
Suggested change
I may be wrong about the exact name, ophyd-async has a verb for "get me the one true number for this or explode". |
||||||||||
| egu = self.egu | ||||||||||
| positions = await asyncio.gather( | ||||||||||
| *(a.position.get_value() for _, a in self.attenuators.items()) | ||||||||||
| ) | ||||||||||
| for i, pos in zip(self.attenuators, positions): | ||||||||||
| atten = self.attenuators[i] | ||||||||||
| is_active = pos == AttenuatorStatusEnum.HIGH | ||||||||||
| if is_active: | ||||||||||
| active_attens.append(atten) | ||||||||||
| transmission = atten.transmission(en, egu) if is_active else 0 | ||||||||||
| status[atten.name] = {"active": is_active, "transmission": transmission} | ||||||||||
| status["active_attenuators"] = [a.num for a in active_attens] | ||||||||||
| status["photon_energy"] = en | ||||||||||
| status["egu"] = egu | ||||||||||
| status["total_transmission"] = self._calculate_total_transmission( | ||||||||||
| *active_attens | ||||||||||
| ) | ||||||||||
| return status | ||||||||||
|
|
||||||||||
| @AsyncStatus.wrap | ||||||||||
| async def set(self, value: float): | ||||||||||
| """Set the transmission for the attenuator bank""" | ||||||||||
| attenuation_combination = self.find_closest_transmission(value) | ||||||||||
| coros = [] | ||||||||||
| for ( | ||||||||||
| num, | ||||||||||
| atten, | ||||||||||
| ) in self.attenuators.items(): | ||||||||||
| if num in attenuation_combination.attenuators: | ||||||||||
| coros.append(atten.close()) | ||||||||||
| else: | ||||||||||
| coros.append(atten.open()) | ||||||||||
| await asyncio.gather(*coros) | ||||||||||
|
|
||||||||||
| def find_closest_transmission( | ||||||||||
| self, target_transmission: float | ||||||||||
| ) -> AttenuatorCombination: | ||||||||||
| """ | ||||||||||
| This could be faster if we implemented binary search, | ||||||||||
| but that seems like overkill for our use case. The search space | ||||||||||
| is small, so we start in the middle, and work up or down. | ||||||||||
| """ | ||||||||||
| available_attenuations = self._calculate_available_transmissions() | ||||||||||
| best_idx = len(available_attenuations) // 2 | ||||||||||
| atten = available_attenuations[best_idx].transmission | ||||||||||
| diff = float("inf") | ||||||||||
| new_diff = abs(target_transmission - atten) | ||||||||||
| inc = 1 if target_transmission > atten else -1 | ||||||||||
|
|
||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. or As there are physical attenuator banks we are always going to be in the realm of small numbers and my expectation on the relative runtimes is that computing the transmission is >> more expensive that this search. To some degree, the sort in |
||||||||||
| while new_diff < diff: | ||||||||||
| diff = new_diff | ||||||||||
| # break if we are about to check outside the list | ||||||||||
| if best_idx + inc >= len(available_attenuations) or best_idx + inc < 0: | ||||||||||
| break | ||||||||||
| atten = available_attenuations[best_idx + inc].transmission | ||||||||||
| new_diff = abs(target_transmission - atten) | ||||||||||
| if new_diff < diff: | ||||||||||
| best_idx += inc | ||||||||||
| else: # if diff did not change, then we have found the best option | ||||||||||
| break | ||||||||||
| # TODO - should return just the found attentuation? or also the | ||||||||||
| # requested attenuation and/or the difference? | ||||||||||
| return available_attenuations[best_idx] | ||||||||||
|
|
||||||||||
| def _calculate_available_transmissions(self) -> list[AttenuatorCombination]: | ||||||||||
| """ | ||||||||||
| Calculates all possible transmissions for the attenuator bank, using | ||||||||||
| the powerset of the available attenuators. | ||||||||||
| """ | ||||||||||
| available_transmissions = [] | ||||||||||
| for combination in self._powerset(): | ||||||||||
| attens = [self.attenuators[a] for a in self.attenuators if a in combination] | ||||||||||
| total_transmission = self._calculate_total_transmission(*attens) | ||||||||||
| available_transmissions.append( | ||||||||||
| AttenuatorCombination(total_transmission, combination) | ||||||||||
| ) | ||||||||||
| # We want the available attenuations sorted so we can efficiently search through them | ||||||||||
| available_transmissions.sort(key=lambda a: a.transmission) # type: ignore[attr-defined] | ||||||||||
| return available_transmissions | ||||||||||
|
|
||||||||||
| def _calculate_total_transmission(self, *attenuators: Attenuator) -> float: | ||||||||||
| transmissions = [ | ||||||||||
| a.transmission(self.photon_energy, self.egu) for a in attenuators | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rather than get the energy here, pass it in. This would both open up a path to having an extra API of "what is the best combination if I changed to a different energy" without having to actually change the energy. In both set/read you are in an async function so can do the read and push it down. |
||||||||||
| ] | ||||||||||
| return round(float(math.prod(transmissions)), 3) | ||||||||||
|
|
||||||||||
| def _powerset(self) -> list[list[int]]: | ||||||||||
| """ | ||||||||||
| This is a famously O(n*2^n) problem. | ||||||||||
| """ | ||||||||||
| powerset = [] | ||||||||||
| for i in range(1 << len(self.attenuators)): | ||||||||||
| combination = [] | ||||||||||
| for j in range(len(self.attenuators)): | ||||||||||
| if i & (1 << j): | ||||||||||
| combination.append(j + 1) # +1 because attenuators are 1-indexed | ||||||||||
| powerset.append(combination) | ||||||||||
| return powerset | ||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please put the SHA pinning back.