from __future__ import annotations
import re
import warnings
from collections import namedtuple
from functools import lru_cache
from typing import NamedTuple
import h5py
import numpy as np
from numba import njit, prange
from numpy.typing import NDArray
from jungfrau_utils.geometry import DetectorGeometry, detector_geometry
warnings.filterwarnings("default", category=DeprecationWarning)
CHIP_SIZE_X: int = 256
CHIP_SIZE_Y: int = 256
CHIP_NUM_X: int = 4
CHIP_NUM_Y: int = 2
MODULE_SIZE_X: int = CHIP_NUM_X * CHIP_SIZE_X
MODULE_SIZE_Y: int = CHIP_NUM_Y * CHIP_SIZE_Y
CHIP_GAP_X: int = 2
CHIP_GAP_Y: int = 2
MODULE_FULL_SIZE_X: int = MODULE_SIZE_X + (CHIP_NUM_X - 1) * CHIP_GAP_X
MODULE_FULL_SIZE_Y: int = MODULE_SIZE_Y + (CHIP_NUM_Y - 1) * CHIP_GAP_Y
# 256 not divisible by 3, so we round up to 86
# the last 4 pixels can be omitted, so the final height is (256 - 4) / 3 = 84
# 18 since we have 6 more pixels in H per gap
STRIPSEL_SIZE_X: int = 1024 * 3 + 18 # = 3090
STRIPSEL_SIZE_Y: int = 84
# a vertical stripsel detector
STRIPSEL_JF18_SIZE_X: int = 165
STRIPSEL_JF18_SIZE_Y: int = 1488
[docs]
class JFDataHandler:
"""A class to perform jungfrau detector data handling like pedestal correction,
gain conversion, pixel mask, module map, etc.
Args:
detector_name (str): name of a detector in the form ``JF<id>T<nmod>V<version>``
"""
def __init__(self, detector_name: str) -> None:
# detector_name needs to be a valid string
if not (isinstance(detector_name, str) and re.match(r"^JF\d+T\d+V\d+$", detector_name)):
raise ValueError("detector_name must be a string in the form 'JF<id>T<nmod>V<version>'")
if detector_name == "JF02T09V01":
warnings.warn(
"Support for JF02T09V01 is deprecated and will be removed in jungfrau_utils/4.0",
DeprecationWarning,
)
detector_name_noVer = detector_name.split("V")[0]
# Search for a direct match between a detector_name and one of the detector_geometry keys
if detector_name in detector_geometry:
self._detector_geometry = detector_geometry[detector_name]
# If no direct match, search for a detector_name without the version number
elif detector_name_noVer in detector_geometry:
self._detector_geometry = detector_geometry[detector_name_noVer]
else:
raise ValueError(f"Geometry for '{detector_name}' detector is not present.")
self._detector_name = detector_name
self._gain_file: str = ""
self._pedestal_file: str = ""
# these values store the original gains/pedestal values
self._gain: NDArray | None = None
self._pedestal: NDArray | None = None
self._pixel_mask: NDArray | None = None
self._factor: float | None = None
self._highgain: bool = False
# gain and pedestal arrays with better memory layout for the actual data conversion
self._g_all: dict[bool, NDArray | None] = {True: None, False: None}
self._p_all: dict[bool, NDArray | None] = {True: None, False: None}
self._module_map: NDArray = np.arange(self.detector.n_modules)
@property
def detector_name(self) -> str:
"""Detector name (readonly)."""
return self._detector_name
@property
def detector_geometry(self) -> DetectorGeometry:
"""Detector geometry configuration (readonly)."""
return self._detector_geometry
[docs]
def is_stripsel(self) -> bool:
"""Return true if detector is a stripsel."""
warnings.warn(
"is_stripsel() is deprecated and will be removed in jungfrau_utils/4.0",
DeprecationWarning,
)
return self.detector_geometry.is_stripsel
@property
def detector(self) -> NamedTuple:
"""A namedtuple of detector parameters extracted from its name (readonly)."""
det = namedtuple("Detector", ["id", "n_modules", "version"])
return det(*(int(d) for d in re.findall(r"\d+", self.detector_name)))
@property
def gain_file(self) -> str:
"""Return current gain filepath."""
return self._gain_file
@gain_file.setter
def gain_file(self, filepath: str) -> None:
if not filepath:
self._gain_file = ""
self.gain = None
return
if filepath == self._gain_file:
return
with h5py.File(filepath, "r") as h5f:
gains = h5f["/gains"][:]
self._gain_file = filepath
self.gain = gains
@property
def gain(self) -> NDArray | None:
"""Current gain values."""
return self._gain
@gain.setter
def gain(self, value: NDArray | None) -> None:
if value is None:
self._gain = None
return
if value.ndim != 3:
raise ValueError(f"Expected gain dimensions 3, provided {value.ndim}.")
g_shape = (4, *self._shape_in_full)
g_shape_v2 = (6, *self._shape_in_full)
if value.shape != g_shape and value.shape != g_shape_v2:
raise ValueError(
f"Expected gain shape {g_shape} or {g_shape_v2}, provided {value.shape}."
)
# convert _gain values to float32
self._gain = value.astype(np.float32, copy=False)
self._update_g_all()
def _update_g_all(self) -> None:
if self.factor is None:
_g = 1 / self._gain
else:
# self.factor is one number and self.gain is a large array, so this order of division
# will avoid double broadcasting
_g = 1 / self.factor / self._gain
self._g_all[False] = np.stack((_g[0], _g[1], _g[2], _g[2]))
g_shape = (4, *self._shape_in_full) # g_shape_v2 = (6, *self._shape_in_full)
if _g.shape == g_shape:
self._g_all[True] = np.stack((_g[3], _g[3], _g[3], _g[3]))
else: # _g.shape == g_shape_v2
self._g_all[True] = np.stack((_g[3], _g[4], _g[5], _g[5]))
@property
def _g(self) -> NDArray | None:
return self._g_all[self.highgain]
@property
def pedestal_file(self) -> str:
"""Return current pedestal filepath."""
return self._pedestal_file
@pedestal_file.setter
def pedestal_file(self, filepath: str) -> None:
if not filepath:
self._pedestal_file = ""
self.pedestal = None
self.pixel_mask = None
return
if filepath == self._pedestal_file:
return
with h5py.File(filepath, "r") as h5f:
pedestal = h5f["/gains"][:]
pixel_mask = h5f["/pixel_mask"][:]
self._pedestal_file = filepath
self.pedestal = pedestal
self.pixel_mask = pixel_mask
@property
def pedestal(self) -> NDArray | None:
"""Current pedestal values."""
return self._pedestal
@pedestal.setter
def pedestal(self, value: NDArray | None) -> None:
if value is None:
self._pedestal = None
return
if value.ndim != 3:
raise ValueError(f"Expected pedestal dimensions 3, provided {value.ndim}.")
p_shape = (4, *self._shape_in_full)
p_shape_v2 = (3, *self._shape_in_full)
if value.shape != p_shape and value.shape != p_shape_v2:
raise ValueError(
f"Expected pedestal shape {p_shape} or {p_shape_v2}, provided {value.shape}."
)
# convert _pedestal values to float32
self._pedestal = value.astype(np.float32, copy=False)
_p = self._pedestal
self._p_all[False] = np.stack((_p[0], _p[1], _p[2], _p[2]))
if _p.shape == p_shape:
self._p_all[True] = np.stack((_p[3], _p[3], _p[3], _p[3]))
else: # _p.shape == p_shape_v2
self._p_all[True] = np.stack((_p[0], _p[1], _p[2], _p[2]))
@property
def _p(self) -> NDArray | None:
return self._p_all[self.highgain]
@property
def pixel_mask(self) -> NDArray | None:
"""Current raw pixel mask values."""
return self._pixel_mask
@pixel_mask.setter
def pixel_mask(self, value: NDArray | None) -> None:
if value is not None:
if value.ndim != 2:
raise ValueError(f"Expected pixel_mask dimensions 2, provided {value.ndim}.")
pm_shape = self._shape_in_full
if value.shape != pm_shape:
raise ValueError(f"Expected pixel_mask shape {pm_shape}, provided {value.shape}.")
self._pixel_mask = value
self.get_pixel_mask.cache_clear()
self._mask.cache_clear()
@lru_cache(maxsize=8)
def _mask(self, double_pixels: str, module_edge_pixels: str) -> NDArray | None:
if self.pixel_mask is None:
return None
mask = np.invert(self.pixel_mask.astype(bool))
if double_pixels == "mask":
for m in range(self.detector.n_modules):
module_mask = self._get_module_slice(mask, m)
for n in range(1, CHIP_NUM_X):
module_mask[:, CHIP_SIZE_X * n - 1] = False
module_mask[:, CHIP_SIZE_X * n] = False
for n in range(1, CHIP_NUM_Y):
module_mask[CHIP_SIZE_Y * n - 1, :] = False
module_mask[CHIP_SIZE_Y * n, :] = False
if module_edge_pixels == "mask":
for m in range(self.detector.n_modules):
module_mask = self._get_module_slice(mask, m)
module_mask[0, :] = False
module_mask[-1, :] = False
module_mask[:, 0] = False
module_mask[:, -1] = False
return mask
@property
def factor(self) -> float | None:
"""A factor value.
If conversion is True, use this factor to divide converted values. The output values are
also rounded and casted to np.int32 dtype. Keep the original values if None.
"""
return self._factor
@factor.setter
def factor(self, value: float | None) -> None:
if value is not None:
value = float(value)
if self._factor == value:
return
self._factor = value
if self.gain is not None:
self._update_g_all()
@property
def highgain(self) -> bool:
"""Current flag for highgain."""
return self._highgain
@highgain.setter
def highgain(self, value: bool) -> None:
if not isinstance(value, bool):
value = bool(value)
self._highgain = value
@property
def module_map(self) -> NDArray:
"""Current module map."""
return self._module_map
@module_map.setter
def module_map(self, value: NDArray) -> None:
n_modules = self.detector.n_modules
if value is None:
# support legacy data by emulating 'all modules are present'
value = np.arange(n_modules)
if np.array_equal(self._module_map, value):
return
if len(value) != n_modules:
raise ValueError(f"Expected module_map length {n_modules}, provided {len(value)}.")
if min(value) < -1 or n_modules <= max(value):
raise ValueError(f"Valid module_map values are integers between -1 and {n_modules-1}.")
self._module_map = value
self.get_pixel_mask.cache_clear()
def _get_shape_in_n_modules(self, n: int) -> tuple[int, int]:
if self.detector_name == "JF02T09V01": # a special case
shape_y, shape_x = MODULE_SIZE_Y, MODULE_SIZE_X * n
else:
shape_y, shape_x = MODULE_SIZE_Y * n, MODULE_SIZE_X
return shape_y, shape_x
@property
def _shape_in_full(self) -> tuple[int, int]:
return self._get_shape_in_n_modules(self.detector.n_modules)
@property
def _shape_in(self) -> tuple[int, int]:
return self._get_shape_in_n_modules(self._n_active_modules)
@property
def _n_active_modules(self) -> int:
return np.sum(self.module_map != -1)
def _get_shape_out(self, gap_pixels: bool, geometry: bool) -> tuple[int, int]:
"""Return the image shape of a detector without a full-detector rotation step.
Args:
gap_pixels (bool, optional): Add gap pixels between detector chips. Defaults to True.
geometry (bool, optional): Apply detector geometry corrections. Defaults to True.
Returns:
tuple: Height and width of a resulting image.
"""
if self.detector_geometry.is_stripsel:
return self._get_stripsel_shape_out(geometry=geometry)
if geometry:
module_size_x = MODULE_FULL_SIZE_X if gap_pixels else MODULE_SIZE_X
module_size_y = MODULE_FULL_SIZE_Y if gap_pixels else MODULE_SIZE_Y
origin_y = self.detector_geometry.origin_y
origin_x = self.detector_geometry.origin_x
mod_rot90 = self.detector_geometry.mod_rot90
oy_end_all = []
ox_end_all = []
for oy, ox, mr90 in zip(origin_y, origin_x, mod_rot90):
if mr90 == 1:
oy -= module_size_x
elif mr90 == 2:
oy -= module_size_y
ox -= module_size_x
elif mr90 == 3:
ox -= module_size_y
if mr90 % 2 == 0:
oy_end = oy + module_size_y
ox_end = ox + module_size_x
else:
oy_end = oy + module_size_x
ox_end = ox + module_size_y
oy_end_all.append(oy_end)
ox_end_all.append(ox_end)
shape_y = max(oy_end_all)
shape_x = max(ox_end_all)
else:
shape_y, shape_x = self._shape_in
if gap_pixels:
if self.detector_name == "JF02T09V01":
shape_y += (CHIP_NUM_Y - 1) * CHIP_GAP_Y
shape_x += (CHIP_NUM_X - 1) * CHIP_GAP_X * self._n_active_modules
else:
shape_y += (CHIP_NUM_Y - 1) * CHIP_GAP_Y * self._n_active_modules
shape_x += (CHIP_NUM_X - 1) * CHIP_GAP_X
return shape_y, shape_x
[docs]
def get_shape_out(self, *, gap_pixels: bool = True, geometry: bool = True) -> tuple[int, int]:
"""Return the final image shape of a detector.
Args:
gap_pixels (bool, optional): Add gap pixels between detector chips. Defaults to True.
geometry (bool, optional): Apply detector geometry corrections. Defaults to True.
Returns:
tuple: Height and width of a resulting image.
"""
shape_y, shape_x = self._get_shape_out(gap_pixels, geometry)
if geometry and self.detector_geometry.det_rot90 % 2:
shape_y, shape_x = shape_x, shape_y
return shape_y, shape_x
def _get_stripsel_shape_out(self, geometry: bool) -> tuple[int, int]:
if geometry:
if self.detector_name.startswith("JF18"):
shape_x = max(self.detector_geometry.origin_x) + STRIPSEL_JF18_SIZE_X
shape_y = max(self.detector_geometry.origin_y) + STRIPSEL_JF18_SIZE_Y
else:
shape_x = max(self.detector_geometry.origin_x) + STRIPSEL_SIZE_X
shape_y = max(self.detector_geometry.origin_y) + STRIPSEL_SIZE_Y
else:
shape_y, shape_x = self._shape_in
return shape_y, shape_x
[docs]
def get_dtype_out(self, dtype_in: np.dtype, *, conversion: bool = True) -> np.dtype:
"""Return resulting image dtype of a detector.
Args:
dtype_in (dtype): dtype of an input data.
conversion (bool, optional): Whether data is expected to be converted to keV (apply gain
and pedestal corrections). Defaults to True.
Returns:
dtype: dtype of a resulting image.
"""
if conversion:
if dtype_in != np.uint16:
raise TypeError(f"Only images of dtype {np.dtype(np.uint16)} can be converted.")
if self.factor is None:
dtype_out = np.dtype(np.float32)
else:
dtype_out = np.dtype(np.int32)
else:
dtype_out = dtype_in
return dtype_out
def _get_module_coords(self, m: int, i: int, geometry: bool, gap_pixels: bool) -> tuple:
if self.detector_geometry.is_stripsel and geometry:
if self.detector_name.startswith("JF18"):
module_size_y = STRIPSEL_JF18_SIZE_Y
module_size_x = STRIPSEL_JF18_SIZE_X
else:
module_size_y = STRIPSEL_SIZE_Y
module_size_x = STRIPSEL_SIZE_X
else:
module_size_y = MODULE_FULL_SIZE_Y if gap_pixels else MODULE_SIZE_Y
module_size_x = MODULE_FULL_SIZE_X if gap_pixels else MODULE_SIZE_X
if geometry: # gap pixels are already accounted for in module geometry coordinates
oy = self.detector_geometry.origin_y[i]
ox = self.detector_geometry.origin_x[i]
det_rot90 = self.detector_geometry.det_rot90
if det_rot90:
shape_y, shape_x = self._get_shape_out(gap_pixels, geometry)
if det_rot90 == 1: # (x, y) -> (y, -x)
ox, oy = oy, shape_x - ox
elif det_rot90 == 2: # (x, y) -> (-x, -y)
ox, oy = shape_x - ox, shape_y - oy
elif det_rot90 == 3: # (x, y) -> (-y, x)
ox, oy = shape_y - oy, ox
mod_rot90 = self.detector_geometry.mod_rot90[i]
mod_rot90 = (mod_rot90 + det_rot90) % 4
if mod_rot90 == 1:
oy -= module_size_x
elif mod_rot90 == 2:
oy -= module_size_y
ox -= module_size_x
elif mod_rot90 == 3:
ox -= module_size_y
else:
mod_rot90 = 0
if self.detector_name == "JF02T09V01":
oy = 0
ox = m * module_size_x
else:
oy = m * module_size_y
ox = 0
if mod_rot90 % 2 == 0:
oy_end = oy + module_size_y
ox_end = ox + module_size_x
else:
oy_end = oy + module_size_x
ox_end = ox + module_size_y
return oy, oy_end, ox, ox_end, mod_rot90
def _get_module_slice(self, data: NDArray, m: int) -> NDArray:
if self.detector_name == "JF02T09V01":
out = data[..., :, m * MODULE_SIZE_X : (m + 1) * MODULE_SIZE_X]
else:
out = data[..., m * MODULE_SIZE_Y : (m + 1) * MODULE_SIZE_Y, :]
return out
[docs]
def process(
self,
images: NDArray,
*,
conversion: bool = True,
mask: bool = True,
gap_pixels: bool = True,
double_pixels: str = "keep",
module_edge_pixels: str = "keep",
geometry: bool = True,
parallel: bool = False,
out: NDArray | None = None,
) -> NDArray:
"""Perform jungfrau detector data processing like pedestal correction, gain conversion,
applying pixel mask, module map, etc.
Args:
images (ndarray): Image stack or single image to be processed
conversion (bool, optional): Convert to keV (apply gain and pedestal corrections).
Defaults to True.
mask (bool, optional): Perform masking of bad pixels (set those values to 0).
Defaults to True.
gap_pixels (bool, optional): Add gap pixels between detector chips. Defaults to True.
double_pixels (str, optional): A method to handle double pixels in-between ASICs. Can be
"keep", "mask", or "interp". Defaults to "keep".
module_edge_pixels (str, optional): A method to handle pixels at the module edges. Can
be "keep" or "mask". Defaults to "keep".
geometry (bool, optional): Apply detector geometry corrections. Defaults to True.
parallel (bool, optional): Parallelize image stack processing. Defaults to False.
out (ndarray, optional): If provided, the destination to place the result. The shape
must be correct, matching that of what the function would have returned if no out
argument were specified. Defaults to None.
Returns:
ndarray: Resulting image stack or single image
"""
# handle 2D data by adding a singleton dimension, making it 3D
if images.ndim == 2:
is_2darray = True
images = images[np.newaxis]
else:
is_2darray = False
n_images, image_shape = images.shape[0], images.shape[1:]
if image_shape not in (self._shape_in, self._shape_in_full):
raise ValueError(
f"Expected image shape {self._shape_in} or {self._shape_in_full}, "
f"provided {image_shape}."
)
if conversion and not self.can_convert():
raise RuntimeError("Gain and/or pedestal values are not set.")
if mask and self._pixel_mask is None:
raise RuntimeError("Pixel mask values are not set.")
if double_pixels not in ("keep", "mask", "interp"):
raise ValueError("'double_pixels' can only be 'keep', 'mask', or 'interp'")
if not mask and double_pixels == "mask":
warnings.warn(
'\'double_pixels="mask"\' has no effect when "mask"=False', RuntimeWarning
)
if double_pixels == "interp" and not gap_pixels:
raise RuntimeError("Double pixel interpolation requires 'gap_pixels' to be True.")
if double_pixels == "interp" and self.factor is not None:
raise ValueError("Unsupported mode: double_pixels='interp' with a factor value.")
if self.detector_geometry.is_stripsel and gap_pixels:
warnings.warn("'gap_pixels' flag has no effect on stripsel detectors", RuntimeWarning)
gap_pixels = False
if self.detector_geometry.is_stripsel and double_pixels != "keep":
warnings.warn(
"Handling double pixels has no effect on stripsel detectors", RuntimeWarning
)
double_pixels = "keep"
if module_edge_pixels not in ("keep", "mask"):
raise ValueError("'module_edge_pixels' can only be 'keep' or 'mask'")
if not mask and module_edge_pixels == "mask":
warnings.warn(
'\'module_edge_pixels="mask"\' has no effect when "mask"=False', RuntimeWarning
)
out_shape = self.get_shape_out(gap_pixels=gap_pixels, geometry=geometry)
stack_out_shape = (n_images, *out_shape)
if out is None:
out_dtype = self.get_dtype_out(images.dtype, conversion=conversion)
out = np.zeros(stack_out_shape, dtype=out_dtype)
else:
if out.shape != stack_out_shape:
raise ValueError(f"Expected out shape {stack_out_shape}, provided {out.shape}.")
if parallel:
adc_to_energy = _adc_to_energy_par_jit
if self.detector_name.startswith("JF18"):
reshape_stripsel = _reshape_stripsel_jf18_par_jit
else:
reshape_stripsel = _reshape_stripsel_par_jit
inplace_interp_dp = _inplace_interp_dp_par_jit
else:
adc_to_energy = _adc_to_energy_jit
if self.detector_name.startswith("JF18"):
reshape_stripsel = _reshape_stripsel_jf18_jit
else:
reshape_stripsel = _reshape_stripsel_jit
inplace_interp_dp = _inplace_interp_dp_jit
_mask = self._mask(double_pixels, module_edge_pixels)
ry = np.arange(MODULE_SIZE_Y, dtype=np.uint32)
ry += ry // CHIP_SIZE_Y * CHIP_GAP_Y * gap_pixels
rx = np.arange(MODULE_SIZE_X, dtype=np.uint32)
rx += rx // CHIP_SIZE_X * CHIP_GAP_X * gap_pixels
for i, m in enumerate(self.module_map):
if m == -1:
continue
if image_shape == self._shape_in_full:
mod = self._get_module_slice(images, i)
else:
mod = self._get_module_slice(images, m)
if conversion:
mod_g = self._get_module_slice(self._g, i)
mod_p = self._get_module_slice(self._p, i)
else:
mod_g = None
mod_p = None
if mask:
mod_mask = self._get_module_slice(_mask, i)
else:
mod_mask = None
oy, oy_end, ox, ox_end, rot90 = self._get_module_coords(m, i, geometry, gap_pixels)
mod_out = out[:, oy:oy_end, ox:ox_end]
if self.detector_geometry.is_stripsel and geometry:
mod_tmp_shape = (n_images, MODULE_SIZE_Y, MODULE_SIZE_X)
mod_tmp_dtype = self.get_dtype_out(images.dtype, conversion=conversion)
mod_tmp = np.zeros(shape=mod_tmp_shape, dtype=mod_tmp_dtype)
adc_to_energy(mod_tmp, mod, mod_g, mod_p, mod_mask, self.factor, ry, rx)
reshape_stripsel(mod_out, mod_tmp)
else:
mod_out = np.rot90(mod_out, k=-rot90, axes=(1, 2))
adc_to_energy(mod_out, mod, mod_g, mod_p, mod_mask, self.factor, ry, rx)
if double_pixels == "interp":
inplace_interp_dp(mod_out)
# remove the singleton dimension if input was 2D
if is_2darray:
out = out[0]
return out
[docs]
def can_convert(self) -> bool:
"""Whether all data for gain/pedestal conversion is present.
Returns:
bool: Return true if all data for gain/pedestal conversion is present.
"""
return (self.gain is not None) and (self.pedestal is not None)
[docs]
@lru_cache(maxsize=8)
def get_pixel_mask(
self,
*,
gap_pixels: bool = True,
double_pixels: str = "keep",
module_edge_pixels: str = "keep",
geometry: bool = True,
) -> NDArray | None:
"""Return pixel mask.
Args:
gap_pixels (bool, optional): Add gap pixels between detector chips. Defaults to True.
double_pixels (str, optional): A method to handle double pixels in-between ASICs. Can be
"keep", "mask", or "interp". Defaults to "keep".
geometry (bool, optional): Apply detector geometry corrections. Defaults to True.
Returns:
ndarray: Resulting pixel mask, where True values correspond to valid pixels.
"""
if double_pixels not in ("keep", "mask", "interp"):
raise ValueError("'double_pixels' can only be 'keep', 'mask', or 'interp'")
if double_pixels == "interp" and not gap_pixels:
raise RuntimeError("Double pixel interpolation requires 'gap_pixels' to be True.")
if self.detector_geometry.is_stripsel and gap_pixels:
warnings.warn("'gap_pixels' flag has no effect on stripsel detectors", RuntimeWarning)
gap_pixels = False
if self.detector_geometry.is_stripsel and double_pixels != "keep":
warnings.warn(
"Handling double pixels has no effect on stripsel detectors", RuntimeWarning
)
double_pixels = "keep"
if module_edge_pixels not in ("keep", "mask"):
raise ValueError("'module_edge_pixels' can only be 'keep' or 'mask'")
if self._pixel_mask is None:
return None
if self.detector_name.startswith("JF18"):
reshape_stripsel = _reshape_stripsel_jf18_jit
else:
reshape_stripsel = _reshape_stripsel_jit
_mask = self._mask(double_pixels, module_edge_pixels)[np.newaxis]
res_shape = self.get_shape_out(gap_pixels=gap_pixels, geometry=geometry)
res = np.zeros((1, *res_shape), dtype=bool)
ry = np.arange(MODULE_SIZE_Y, dtype=np.uint32)
ry += ry // CHIP_SIZE_Y * CHIP_GAP_Y * gap_pixels
rx = np.arange(MODULE_SIZE_X, dtype=np.uint32)
rx += rx // CHIP_SIZE_X * CHIP_GAP_X * gap_pixels
for i, m in enumerate(self.module_map):
if m == -1:
continue
mod = self._get_module_slice(_mask, i)
oy, oy_end, ox, ox_end, rot90 = self._get_module_coords(m, i, geometry, gap_pixels)
mod_res = res[:, oy:oy_end, ox:ox_end]
if self.detector_geometry.is_stripsel and geometry:
reshape_stripsel(mod_res, mod)
else:
mod_res = np.rot90(mod_res, k=-rot90, axes=(1, 2))
# this will just copy data to the correct place
_adc_to_energy_jit(mod_res, mod, None, None, None, None, ry, rx)
if double_pixels == "interp":
_inplace_mask_dp_jit(mod_res)
res = res[0]
return res
[docs]
def get_pixel_coordinates(self) -> tuple:
"""Return arrays (x, y, z) of final coordinates for pixels in raw data.
The shape of the result is the same as the raw input data (equivalently, gap_pixels=False,
geometry=False), but the coordinates represent pixel positions after gap_pixel and geometry
corrections (gap_pixels=True, double_pixels="keep", geometry=True).
"""
warnings.warn(
"get_pixel_coordinates() is deprecated and will be removed in jungfrau_utils/4.0",
DeprecationWarning,
)
if self.detector_geometry.is_stripsel:
raise RuntimeError("Stripsel detectors are currently unsupported.")
if any(self.detector_geometry.mod_rot90):
raise RuntimeError("Detectors with rotated modules are currently unsupported.")
if self.detector_geometry.det_rot90:
raise RuntimeError("Detectors with rotated geometry are currently unsupported.")
_y = np.arange(MODULE_SIZE_Y, dtype=np.float64)
for n in range(1, CHIP_NUM_Y):
_y[n * CHIP_SIZE_Y :] += CHIP_GAP_Y
# shift for double pixels
_y[n * CHIP_SIZE_Y - 1] += 0.5
_y[n * CHIP_SIZE_Y] -= 0.5
_x = np.arange(MODULE_SIZE_X, dtype=np.float64)
for n in range(1, CHIP_NUM_X):
_x[n * CHIP_SIZE_X :] += CHIP_GAP_X
# shift for double pixels
_x[n * CHIP_SIZE_X - 1] += 0.5
_x[n * CHIP_SIZE_X] -= 0.5
y_mod_grid, x_mod_grid = np.meshgrid(_y, _x, indexing="ij")
shape_out = self.get_shape_out(gap_pixels=False, geometry=False)
x_coord = np.zeros(shape=shape_out)
y_coord = np.zeros(shape=shape_out)
z_coord = np.zeros(shape=shape_out)
for i, m in enumerate(self.module_map):
if m == -1:
continue
y_mod = self._get_module_slice(y_coord, i)
x_mod = self._get_module_slice(x_coord, i)
oy, _, ox, _, _ = self._get_module_coords(m, i, geometry=True, gap_pixels=True)
y_mod[:] = y_mod_grid + oy
x_mod[:] = x_mod_grid + ox
# apply final detector rotation
if self.detector_geometry.det_rot90 == 1: # (x, y) -> (y, -x)
x_coord, y_coord = y_coord, np.max(x_coord) - x_coord
elif self.detector_geometry.det_rot90 == 2: # (x, y) -> (-x, -y)
x_coord, y_coord = np.max(x_coord) - x_coord, np.max(y_coord) - y_coord
elif self.detector_geometry.det_rot90 == 3: # (x, y) -> (-y, x)
x_coord, y_coord = np.max(y_coord) - y_coord, x_coord
return x_coord, y_coord, z_coord
[docs]
def get_gains(
self,
images: NDArray,
*,
mask: bool = True,
gap_pixels: bool = True,
double_pixels: str = "keep",
module_edge_pixels: str = "keep",
geometry: bool = True,
) -> NDArray:
"""Return gain values of images.
Args:
images (ndarray): Images to be processed.
mask (bool, optional): Perform masking of bad pixels (set those values to 0).
Defaults to True.
gap_pixels (bool, optional): Add gap pixels between detector chips. Defaults to True.
double_pixels (str, optional): A method to handle double pixels in-between ASICs. Can be
"keep", "mask", or "interp" (resolves into "keep"). Defaults to "keep".
geometry (bool, optional): Apply detector geometry corrections. Defaults to True.
Returns:
ndarray: Gain values of pixels.
"""
if images.dtype != np.uint16:
raise TypeError(f"Expected image type {np.uint16}, provided {images.dtype}.")
if double_pixels == "interp":
# interpolation makes sense only for final keV values
double_pixels = "keep"
gains = images >> 14
gains = self.process(
gains,
conversion=False,
mask=mask,
gap_pixels=gap_pixels,
double_pixels=double_pixels,
module_edge_pixels=module_edge_pixels,
geometry=geometry,
)
return gains
[docs]
def get_saturated_pixels(
self,
images: NDArray,
*,
mask: bool = True,
gap_pixels: bool = True,
double_pixels: str = "keep",
module_edge_pixels: str = "keep",
geometry: bool = True,
) -> tuple:
"""Return coordinates of saturated pixels.
Args:
images (ndarray): Images to be processed.
mask (bool, optional): Perform masking of bad pixels (set those values to 0).
Defaults to True.
gap_pixels (bool, optional): Add gap pixels between detector chips. Defaults to True.
double_pixels (str, optional): A method to handle double pixels in-between ASICs. Can be
"keep", "mask", or "interp" (resolves into "keep"). Defaults to "keep".
geometry (bool, optional): Apply detector geometry corrections. Defaults to True.
Returns:
tuple: Indices of saturated pixels.
"""
if images.dtype != np.uint16:
raise TypeError(f"Expected image type {np.uint16}, provided {images.dtype}.")
if double_pixels == "interp":
# interpolation makes sense only for final keV values
double_pixels = "keep"
if self.highgain:
saturated_value = 0b0011111111111111 # = 16383
else:
saturated_value = 0b1100000000000000 # = 49152
saturated_pixels = images == saturated_value
saturated_pixels = self.process(
saturated_pixels,
conversion=False,
mask=mask,
gap_pixels=gap_pixels,
double_pixels=double_pixels,
module_edge_pixels=module_edge_pixels,
geometry=geometry,
)
saturated_pixels_coordinates = np.nonzero(saturated_pixels)
return saturated_pixels_coordinates
def _adc_to_energy(
res: NDArray,
image: NDArray,
gain: NDArray | None,
pedestal: NDArray | None,
mask: NDArray | None,
factor: float | None,
ry: NDArray,
rx: NDArray,
) -> None:
# TODO: remove after issue is fixed: https://github.com/PyCQA/pylint/issues/2910
for i1 in prange(image.shape[0]): # pylint: disable=not-an-iterable
for i2, ri2 in enumerate(ry):
for i3, ri3 in enumerate(rx):
if mask is not None and not mask[i2, i3]:
continue
if gain is None or pedestal is None:
res[i1, ri2, ri3] = image[i1, i2, i3]
else:
gm = np.uint32(np.right_shift(image[i1, i2, i3], 14))
val = np.float32(image[i1, i2, i3] & 0x3FFF)
tmp_res = (val - pedestal[gm, i2, i3]) * gain[gm, i2, i3]
if factor is None:
res[i1, ri2, ri3] = tmp_res
else:
res[i1, ri2, ri3] = round(tmp_res)
_adc_to_energy_jit = njit()(_adc_to_energy)
_adc_to_energy_par_jit = njit(parallel=True)(_adc_to_energy)
def _reshape_stripsel(res: NDArray, image: NDArray) -> None:
# TODO: remove after issue is fixed: https://github.com/PyCQA/pylint/issues/2910
for ind in prange(image.shape[0]): # pylint: disable=not-an-iterable
# first we fill the normal pixels, the gap ones will be overwritten later
for yin in range(252):
for xin in range(1024):
ichip = xin // 256
xout = (ichip * 774) + (xin % 256) * 3 + yin % 3
# 774 is the chip period, 256*3+6
yout = yin // 3
res[ind, yout, xout] = image[ind, yin, xin]
# now the gap pixels
for igap in range(3):
for yin in range(252):
yout = (yin // 6) * 2
# if we want a proper normalization (the area of those pixels is double,
# so they see 2x the signal)
# first the left side of gap
xin = igap * 256 + 255
xout = igap * 774 + 765 + yin % 6
res[ind, yout, xout] = image[ind, yin, xin] / 2
res[ind, yout + 1, xout] = image[ind, yin, xin] / 2
# then the right side is mirrored
xin = igap * 256 + 255 + 1
xout = igap * 774 + 765 + 11 - yin % 6
res[ind, yout, xout] = image[ind, yin, xin] / 2
res[ind, yout + 1, xout] = image[ind, yin, xin] / 2
_reshape_stripsel_jit = njit()(_reshape_stripsel)
_reshape_stripsel_par_jit = njit(parallel=True)(_reshape_stripsel)
def _reshape_stripsel_jf18(res: NDArray, image: NDArray) -> None:
image = image[:, :, 256 : 256 * 3]
offset_y = 9
offset_x_r = 9
offset_x_l = 11
# TODO: remove after issue is fixed: https://github.com/PyCQA/pylint/issues/2910
for ind in prange(image.shape[0]): # pylint: disable=not-an-iterable
image_tmp = np.rot90(image[ind], k=2)
for xin in range(offset_x_l, 512 - offset_x_r):
for yin in range(offset_y, 255):
yout = (xin - offset_x_l) % 3 + (yin - offset_y) * 3
xout = (xin - offset_x_l) // 3 + xin // 257
res[ind, yout, xout] = image_tmp[yin, xin]
for yin in range(257, 512 - offset_y):
yout = 2 - (xin - offset_x_l) % 3 + (yin - offset_y) * 3 + 6
xout = (xin - offset_x_l) // 3 + xin // 257
res[ind, yout, xout] = image_tmp[yin, xin]
res[ind] = np.rot90(res[ind], k=2)
_reshape_stripsel_jf18_jit = njit()(_reshape_stripsel_jf18)
_reshape_stripsel_jf18_par_jit = njit(parallel=True)(_reshape_stripsel_jf18)
def _inplace_interp_dp(res: NDArray) -> None:
for i1 in prange(res.shape[0]): # pylint: disable=not-an-iterable
# corner quad pixels
for ri2 in (255, 257):
for ri3 in (255, 257, 513, 515, 771, 773):
shift_y = 0 if ri2 == 255 else 1
shift_x = 0 if ri3 == 255 or ri3 == 513 or ri3 == 771 else 1
shared_val = res[i1, ri2 + shift_y, ri3 + shift_x] / 4
res[i1, ri2, ri3] = shared_val
res[i1, ri2 + 1, ri3] = shared_val
res[i1, ri2, ri3 + 1] = shared_val
res[i1, ri2 + 1, ri3 + 1] = shared_val
# rows of double pixels
ri2 = 255
for x_start, x_end in ((0, 255), (259, 513), (517, 771), (775, 1030)):
for ri3 in range(x_start, x_end):
v1 = res[i1, ri2 - 1, ri3]
v2 = res[i1, ri2, ri3]
v3 = res[i1, ri2 + 3, ri3]
v4 = res[i1, ri2 + 4, ri3]
div = 6 * v1 + 3 * v3
if div == 0:
shared_val = v2 / 2
res[i1, ri2, ri3] = shared_val
res[i1, ri2 + 1, ri3] = shared_val
else:
res[i1, ri2, ri3] *= (4 * v1 + v3) / div
res[i1, ri2 + 1, ri3] = v2 - res[i1, ri2, ri3]
div = 6 * v4 + 3 * v2
if div == 0:
shared_val = v3 / 2
res[i1, ri2 + 3, ri3] = shared_val
res[i1, ri2 + 2, ri3] = shared_val
else:
res[i1, ri2 + 3, ri3] *= (4 * v4 + v2) / div
res[i1, ri2 + 2, ri3] = v3 - res[i1, ri2 + 3, ri3]
# columns of double pixels
for ri3 in (255, 513, 771):
for y_start, y_end in ((0, 255), (259, 514)):
for ri2 in range(y_start, y_end):
v1 = res[i1, ri2, ri3 - 1]
v2 = res[i1, ri2, ri3]
v3 = res[i1, ri2, ri3 + 3]
v4 = res[i1, ri2, ri3 + 4]
div = 6 * v1 + 3 * v3
if div == 0:
shared_val = v2 / 2
res[i1, ri2, ri3] = shared_val
res[i1, ri2, ri3 + 1] = shared_val
else:
res[i1, ri2, ri3] *= (4 * v1 + v3) / div
res[i1, ri2, ri3 + 1] = v2 - res[i1, ri2, ri3]
div = 6 * v4 + 3 * v2
if div == 0:
shared_val = v3 / 2
res[i1, ri2, ri3 + 3] = shared_val
res[i1, ri2, ri3 + 2] = shared_val
else:
res[i1, ri2, ri3 + 3] *= (4 * v4 + v2) / div
res[i1, ri2, ri3 + 2] = v3 - res[i1, ri2, ri3 + 3]
_inplace_interp_dp_jit = njit()(_inplace_interp_dp)
_inplace_interp_dp_par_jit = njit(parallel=True)(_inplace_interp_dp)
@njit
def _inplace_mask_dp_jit(res: NDArray) -> None:
# gap_pixels is always True here
for row in (256,):
vals = res[:, row - 1, :1030] & res[:, row + 2, :1030]
res[:, row, :1030] = vals
res[:, row + 1, :1030] = vals
for col in (256, 514, 772):
vals = res[:, :514, col - 1] & res[:, :514, col + 2]
res[:, :514, col] = vals
res[:, :514, col + 1] = vals