"""Reconstruction class for regularised iterative methods using CuPy library.
* :func:`RecToolsIRCuPy.FISTA` iterative regularised algorithm [BT2009]_, [Xu2016]_. Implemented with the help of ASTRA's DirectLink experimental feature.
* :func:`RecToolsIRCuPy.Landweber` algorithm.
* :func:`RecToolsIRCuPy.SIRT` algorithm.
* :func:`RecToolsIRCuPy.CGLS` algorithm.
"""
import numpy as np
from typing import Union
try:
import cupy as cp
except ImportError:
    print(
        "CuPy library is a required dependency for this part of the code, please install it"
    )
try:
import astra
except ImportError:
print("____! Astra-toolbox package is missing, please install !____")
from tomobar.supp.funcs import _data_dims_swapper
from tomobar.supp.suppTools import (
check_kwargs,
perform_recon_crop,
_apply_horiz_detector_padding,
)
from tomobar.supp.dicts import dicts_check, _reinitialise_atools_OS
from tomobar.regularisersCuPy import prox_regul
from tomobar.astra_wrappers.astra_tools3d import AstraTools3D
class RecToolsIRCuPy:
"""CuPy-enabled iterative reconstruction algorithms using ASTRA toolbox for forward/back projection.
Parameters for reconstruction algorithms should be provided in three dictionaries:
:data:`_data_`, :data:`_algorithm_`, and :data:`_regularisation_`. See :mod:`tomobar.supp.dicts`
function of ToMoBAR's :ref:`ref_api` for all parameters explained.
This implementation is typically several times faster than the one in :func:`RecToolsIR.FISTA` of
:mod:`tomobar.methodsIR`, but not all functionality is supported yet.
Args:
DetectorsDimH (int): Horizontal detector dimension size.
DetectorsDimH_pad (int): The amount of padding for the horizontal detector.
DetectorsDimV (int): Vertical detector dimension size.
CenterRotOffset (float): The Centre of Rotation (CoR) scalar or a vector for each angle.
AnglesVec (np.ndarray): Vector of projection angles in radians.
ObjSize (int): The size of the reconstructed object (a slice) defined as [recon_size, recon_size].
datafidelity (str, optional): Data fidelity, choose from LS and PWLS. Defaults to LS.
device_projector (int, optional): Provide a GPU index of a specific GPU device. Defaults to 0.
        cupyrun (bool, optional): Run reconstruction using CuPy modules. Defaults to True.
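
    Example:
        A minimal instantiation sketch (the parameter values are illustrative only; all
        parameters are described above):

        >>> import numpy as np
        >>> angles = np.linspace(0.0, np.pi, 180, dtype=np.float32)
        >>> RecTools = RecToolsIRCuPy(
        ...     DetectorsDimH=160,
        ...     DetectorsDimH_pad=0,
        ...     DetectorsDimV=128,
        ...     CenterRotOffset=0.0,
        ...     AnglesVec=angles,
        ...     ObjSize=160,
        ...     datafidelity="LS",
        ... )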
"""
def __init__(
self,
DetectorsDimH, # Horizontal detector dimension size.
DetectorsDimH_pad, # The amount of padding for the horizontal detector.
DetectorsDimV, # Vertical detector dimension (3D case), 0 or None for 2D case
CenterRotOffset, # The Centre of Rotation scalar or a vector
AnglesVec, # Array of projection angles in radians
ObjSize, # The size of the reconstructed object (a slice)
datafidelity="LS", # Data fidelity, choose from LS and PWLS
device_projector=0, # provide a GPU index (integer) of a specific device
cupyrun=True,
):
self.datafidelity = datafidelity
self.cupyrun = cupyrun
if DetectorsDimH_pad == 0:
self.objsize_user_given = None
else:
self.objsize_user_given = ObjSize
if DetectorsDimH_pad > 0:
            # when we pad the horizontal detector we might also need to reconstruct on a larger grid to avoid artifacts
ObjSize = DetectorsDimH + 2 * DetectorsDimH_pad
if DetectorsDimV == 0 or DetectorsDimV is None:
raise ValueError(
"2D CuPy iterative reconstruction is not supported, only 3D reconstruction is supported"
)
else:
self.geom = "3D"
self.Atools = AstraTools3D(
DetectorsDimH,
DetectorsDimH_pad,
DetectorsDimV,
AnglesVec,
CenterRotOffset,
ObjSize,
"gpu",
device_projector,
)
@property
    def datafidelity(self) -> str:
return self._datafidelity
@datafidelity.setter
def datafidelity(self, datafidelity_val):
if datafidelity_val not in ["LS", "PWLS", "SWLS", "KL"]:
raise ValueError("Unknown data fidelity type, select: LS, PWLS, SWLS or KL")
self._datafidelity = datafidelity_val
@property
    def cupyrun(self) -> bool:
return self._cupyrun
@cupyrun.setter
def cupyrun(self, cupyrun_val):
self._cupyrun = cupyrun_val
@property
    def objsize_user_given(self) -> Union[int, None]:
return self._objsize_user_given
@objsize_user_given.setter
def objsize_user_given(self, objsize_user_given_val):
self._objsize_user_given = objsize_user_given_val
def Landweber(
self, _data_: dict, _algorithm_: Union[dict, None] = None
) -> cp.ndarray:
"""Using Landweber iterative technique to reconstruct projection data given as a CuPy array.
Args:
_data_ (dict): Data dictionary, where projection data is provided.
_algorithm_ (dict, optional): Algorithm dictionary where algorithm parameters are provided.
Returns:
cp.ndarray: The Landweber-reconstructed volume as a CuPy array.
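
        Example:
            A minimal usage sketch, assuming ``RecTools`` is an instance of this class and
            ``proj_data`` is a 3D CuPy array of normalised projection data (the dictionary
            keys and the iteration count are illustrative; see :mod:`tomobar.supp.dicts`):

            >>> _data_ = {"projection_norm_data": proj_data}
            >>> _algorithm_ = {"iterations": 1500}
            >>> recon = RecTools.Landweber(_data_, _algorithm_)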
"""
cp._default_memory_pool.free_all_blocks()
######################################################################
# parameters check and initialisation
(_data_upd_, _algorithm_upd_, _regularisation_upd_) = dicts_check(
self, _data_, _algorithm_, method_run="Landweber"
)
del _data_, _algorithm_
_data_upd_["projection_norm_data"] = _apply_horiz_detector_padding(
_data_upd_["projection_norm_data"],
self.Atools.detectors_x_pad,
cupyrun=True,
)
additional_args = {
"cupyrun": True,
"recon_mask_radius": _algorithm_upd_["recon_mask_radius"],
}
######################################################################
x_rec = cp.zeros(
astra.geom_size(self.Atools.vol_geom), dtype=cp.float32
) # initialisation
for _ in range(_algorithm_upd_["iterations"]):
residual = (
self.Atools._forwprojCuPy(x_rec) - _data_upd_["projection_norm_data"]
) # Ax - b term
x_rec -= _algorithm_upd_["tau_step_lanweber"] * self.Atools._backprojCuPy(
residual
)
if _algorithm_upd_["nonnegativity"]:
x_rec[x_rec < 0.0] = 0.0
if self.objsize_user_given is not None:
return perform_recon_crop(x_rec, self.objsize_user_given)
return check_kwargs(x_rec, **additional_args)
def SIRT(self, _data_: dict, _algorithm_: Union[dict, None] = None) -> cp.ndarray:
"""Using Simultaneous Iterations Reconstruction Technique (SIRT) iterative technique to
reconstruct projection data given as a CuPy array.
See more about the method `here <https://diamondlightsource.github.io/httomolibgpu/reference/methods_list/reconstruction/SIRT3d_tomobar.html>`__.
Args:
_data_ (dict): Data dictionary, where projection data is provided.
_algorithm_ (dict, optional): Algorithm dictionary where algorithm parameters are provided.
Returns:
cp.ndarray: SIRT-reconstructed volume as a CuPy array.
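
        Example:
            A minimal usage sketch, assuming ``RecTools`` is an instance of this class and
            ``proj_data`` is a 3D CuPy array of normalised projection data:

            >>> _data_ = {"projection_norm_data": proj_data}
            >>> _algorithm_ = {"iterations": 300, "nonnegativity": True}
            >>> recon = RecTools.SIRT(_data_, _algorithm_)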
"""
######################################################################
cp._default_memory_pool.free_all_blocks()
# parameters check and initialisation
(_data_upd_, _algorithm_upd_, _regularisation_upd_) = dicts_check(
self, _data_, _algorithm_, method_run="SIRT"
)
_data_upd_["projection_norm_data"] = _apply_horiz_detector_padding(
_data_upd_["projection_norm_data"],
self.Atools.detectors_x_pad,
cupyrun=True,
)
del _data_, _algorithm_
additional_args = {
"cupyrun": True,
"recon_mask_radius": _algorithm_upd_["recon_mask_radius"],
}
######################################################################
R = 1.0 / self.Atools._forwprojCuPy(
cp.ones(astra.geom_size(self.Atools.vol_geom), dtype=np.float32)
)
R = cp.nan_to_num(R, copy=False, nan=1.0, posinf=1.0, neginf=1.0)
C = 1.0 / self.Atools._backprojCuPy(
cp.ones(astra.geom_size(self.Atools.proj_geom), dtype=np.float32)
)
C = cp.nan_to_num(C, copy=False, nan=1.0, posinf=1.0, neginf=1.0)
x_rec = cp.ones(
astra.geom_size(self.Atools.vol_geom), dtype=np.float32
) # initialisation
# perform SIRT iterations
for _ in range(_algorithm_upd_["iterations"]):
x_rec += C * self.Atools._backprojCuPy(
R
* (
_data_upd_["projection_norm_data"]
- self.Atools._forwprojCuPy(x_rec)
)
)
if _algorithm_upd_["nonnegativity"]:
x_rec[x_rec < 0.0] = 0.0
if self.objsize_user_given is not None:
return perform_recon_crop(x_rec, self.objsize_user_given)
return check_kwargs(x_rec, **additional_args)
def CGLS(self, _data_: dict, _algorithm_: Union[dict, None] = None) -> cp.ndarray:
"""Conjugate Gradients Least Squares iterative technique to reconstruct projection data
given as a CuPy array. See more about the method `here <https://diamondlightsource.github.io/httomolibgpu/reference/methods_list/reconstruction/CGLS3d_tomobar.html>`__.
Args:
_data_ (dict): Data dictionary, where projection data is provided.
_algorithm_ (dict, optional): Algorithm dictionary where algorithm parameters are provided.
Returns:
cp.ndarray: CGLS-reconstructed volume as a CuPy array.
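
        Example:
            A minimal usage sketch, assuming ``RecTools`` is an instance of this class and
            ``proj_data`` is a 3D CuPy array of normalised projection data:

            >>> _data_ = {"projection_norm_data": proj_data}
            >>> _algorithm_ = {"iterations": 20}
            >>> recon = RecTools.CGLS(_data_, _algorithm_)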
"""
cp._default_memory_pool.free_all_blocks()
######################################################################
# parameters check and initialisation
(_data_upd_, _algorithm_upd_, _regularisation_upd_) = dicts_check(
self, _data_, _algorithm_, method_run="CGLS"
)
del _data_, _algorithm_
_data_upd_["projection_norm_data"] = _apply_horiz_detector_padding(
_data_upd_["projection_norm_data"],
self.Atools.detectors_x_pad,
cupyrun=True,
)
additional_args = {
"cupyrun": True,
"recon_mask_radius": _algorithm_upd_["recon_mask_radius"],
}
######################################################################
data_shape_3d = cp.shape(_data_upd_["projection_norm_data"])
# Prepare for CG iterations.
x_rec = cp.zeros(
astra.geom_size(self.Atools.vol_geom), dtype=cp.float32
) # initialisation
x_shape_3d = cp.shape(x_rec)
x_rec = cp.ravel(x_rec, order="C") # vectorise
d = self.Atools._backprojCuPy(_data_upd_["projection_norm_data"])
d = cp.ravel(d, order="C")
normr2 = cp.inner(d, d)
r = cp.ravel(_data_upd_["projection_norm_data"], order="C")
del _data_upd_
# perform CG iterations
for _ in range(_algorithm_upd_["iterations"]):
# Update x_rec and r vectors:
Ad = self.Atools._forwprojCuPy(
cp.reshape(d, newshape=x_shape_3d, order="C")
)
Ad = cp.ravel(Ad, order="C")
alpha = normr2 / cp.inner(Ad, Ad)
x_rec += alpha * d
r -= alpha * Ad
s = self.Atools._backprojCuPy(
cp.reshape(r, newshape=data_shape_3d, order="C")
)
s = cp.ravel(s, order="C")
# Update d vector
normr2_new = cp.inner(s, s)
beta = normr2_new / normr2
normr2 = normr2_new.copy()
d = s + beta * d
if _algorithm_upd_["nonnegativity"]:
x_rec[x_rec < 0.0] = 0.0
del d, s, beta, r, alpha, Ad, normr2_new, normr2
if self.objsize_user_given is not None:
return perform_recon_crop(
cp.reshape(x_rec, newshape=x_shape_3d, order="C"),
self.objsize_user_given,
)
else:
return check_kwargs(
cp.reshape(x_rec, newshape=x_shape_3d, order="C"), **additional_args
)
def powermethod(self, _data_: dict) -> float:
"""Power iteration algorithm to calculate the eigenvalue of the operator (projection matrix).
projection_raw_data is required for the PWLS fidelity, otherwise will be ignored.
Args:
_data_ (dict): Data dictionary, where input data is provided.
Returns:
            float: The Lipschitz constant (the largest eigenvalue of the operator).
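
        Example:
            A sketch of how the estimate is typically passed to :func:`RecToolsIRCuPy.FISTA`,
            assuming ``RecTools`` is an instance of this class and ``proj_data`` is a 3D CuPy
            array of normalised projection data:

            >>> _data_ = {"projection_norm_data": proj_data}
            >>> lc = RecTools.powermethod(_data_)
            >>> _algorithm_ = {"iterations": 300, "lipschitz_const": lc}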
"""
if "data_axes_labels_order" not in _data_:
_data_["data_axes_labels_order"] = None
if (
self.datafidelity in ["PWLS", "SWLS"]
and "projection_raw_data" not in _data_
):
raise ValueError("Please provide projection_raw_data for this model")
if self.datafidelity in ["PWLS", "SWLS"]:
sqweight = _data_["projection_raw_data"]
if _data_["data_axes_labels_order"] is not None:
if self.geom == "2D":
_data_["projection_norm_data"] = _data_dims_swapper(
_data_["projection_norm_data"],
_data_["data_axes_labels_order"],
["angles", "detX"],
)
if self.datafidelity in ["PWLS", "SWLS"]:
_data_["projection_raw_data"] = _data_dims_swapper(
_data_["projection_raw_data"],
_data_["data_axes_labels_order"],
["angles", "detX"],
)
sqweight = _data_["projection_raw_data"]
else:
_data_["projection_norm_data"] = _data_dims_swapper(
_data_["projection_norm_data"],
_data_["data_axes_labels_order"],
["detY", "angles", "detX"],
)
if self.datafidelity in ["PWLS", "SWLS"]:
_data_["projection_raw_data"] = _data_dims_swapper(
_data_["projection_raw_data"],
_data_["data_axes_labels_order"],
["detY", "angles", "detX"],
)
sqweight = _data_["projection_raw_data"]
            # we need to reset the swap option here as the data has already been modified, so we do not swap it again in the method
_data_["data_axes_labels_order"] = None
if _data_.get("OS_number") is None:
_data_["OS_number"] = 1 # the classical approach (default)
else:
_data_ = _reinitialise_atools_OS(self, _data_)
power_iterations = 15
s = 1.0
        vol_size = astra.geom_size(self.Atools.vol_geom)
        x1 = cp.random.randn(*vol_size, dtype=cp.float32)
if _data_["OS_number"] == 1:
# non-OS approach
y = self.Atools._forwprojCuPy(x1)
if self.datafidelity == "PWLS":
y = cp.multiply(sqweight, y)
for _ in range(power_iterations):
x1 = self.Atools._backprojCuPy(y)
s = cp.linalg.norm(cp.ravel(x1), axis=0)
x1 = x1 / s
y = self.Atools._forwprojCuPy(x1)
if self.datafidelity == "PWLS":
y = cp.multiply(sqweight, y)
else:
# OS approach
y = self.Atools._forwprojOSCuPy(x1, 0)
if self.datafidelity == "PWLS":
if self.geom == "2D":
y = cp.multiply(sqweight[self.Atools.newInd_Vec[0, :], :], y)
else:
y = cp.multiply(sqweight[:, self.Atools.newInd_Vec[0, :], :], y)
for _ in range(power_iterations):
x1 = self.Atools._backprojOSCuPy(y, 0)
s = cp.linalg.norm(cp.ravel(x1), axis=0)
x1 = x1 / s
y = self.Atools._forwprojOSCuPy(x1, 0)
if self.datafidelity == "PWLS":
if self.geom == "2D":
y = cp.multiply(sqweight[self.Atools.newInd_Vec[0, :], :], y)
else:
y = cp.multiply(sqweight[:, self.Atools.newInd_Vec[0, :], :], y)
return s
def FISTA(
self,
_data_: dict,
_algorithm_: Union[dict, None] = None,
_regularisation_: Union[dict, None] = None,
) -> cp.ndarray:
"""A Fast Iterative Shrinkage-Thresholding Algorithm [BT2009]_ with various types of regularisation from
the regularisation toolkit [KAZ2019]_ (currently accepts ROF_TV and PD_TV only).
See more about the method `here <https://diamondlightsource.github.io/httomolibgpu/reference/methods_list/reconstruction/FISTA3d_tomobar.html>`__.
All parameters for the algorithm should be provided in three dictionaries:
:data:`_data_`, :data:`_algorithm_`, and :data:`_regularisation_`. See :mod:`tomobar.supp.dicts`
function of ToMoBAR's :ref:`ref_api` for all parameters explained.
        Please note that not all of the functionality of :func:`RecToolsIR.FISTA` from
        :mod:`tomobar.methodsIR` is supported in this CuPy implementation.
Args:
_data_ (dict): Data dictionary, where input data is provided.
_algorithm_ (dict, optional): Algorithm dictionary where algorithm parameters are provided.
_regularisation_ (dict, optional): Regularisation dictionary, currently accepts ROF_TV and PD_TV only.
Returns:
            cp.ndarray: The FISTA-reconstructed volume as a 3D CuPy array.
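
        Example:
            A minimal usage sketch, assuming ``RecTools`` is an instance of this class and
            ``proj_data`` is a 3D CuPy array of normalised projection data; the regularisation
            dictionary accepts further parameters described in :mod:`tomobar.supp.dicts`:

            >>> _data_ = {"projection_norm_data": proj_data}
            >>> lc = RecTools.powermethod(_data_)
            >>> _algorithm_ = {"iterations": 300, "lipschitz_const": lc}
            >>> _regularisation_ = {"method": "PD_TV"}
            >>> recon = RecTools.FISTA(_data_, _algorithm_, _regularisation_)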
"""
cp._default_memory_pool.free_all_blocks()
if self.geom == "2D":
# 2D reconstruction
raise ValueError("2D CuPy reconstruction is not yet supported")
# initialise the solution
X = cp.zeros(astra.geom_size(self.Atools.vol_geom), dtype=cp.float32)
(_data_upd_, _algorithm_upd_, _regularisation_upd_) = dicts_check(
self, _data_, _algorithm_, _regularisation_, method_run="FISTA"
)
del _data_, _algorithm_, _regularisation_
_data_upd_["projection_norm_data"] = _apply_horiz_detector_padding(
_data_upd_["projection_norm_data"],
self.Atools.detectors_x_pad,
cupyrun=True,
)
additional_args = {
"cupyrun": True,
"recon_mask_radius": _algorithm_upd_["recon_mask_radius"],
}
if _data_upd_["OS_number"] > 1:
_data_upd_ = _reinitialise_atools_OS(self, _data_upd_)
L_const_inv = cp.float32(
1.0 / _algorithm_upd_["lipschitz_const"]
) # inverted Lipschitz constant
t = cp.float32(1.0)
X_t = cp.copy(X)
# FISTA iterations
for _ in range(_algorithm_upd_["iterations"]):
# loop over subsets (OS)
for sub_ind in range(_data_upd_["OS_number"]):
X_old = X
t_old = t
if _data_upd_["OS_number"] > 1:
                    # select a specific set of indices for the subset (OS)
indVec = self.Atools.newInd_Vec[sub_ind, :]
if indVec[self.Atools.NumbProjBins - 1] == 0:
indVec = indVec[:-1] # shrink vector size
if self.datafidelity == "LS":
# 3D Least-squares (LS) data fidelity - OS (linear)
res = (
self.Atools._forwprojOSCuPy(X_t, sub_ind)
- _data_upd_["projection_norm_data"][:, indVec, :]
)
if self.datafidelity == "PWLS":
# 3D Penalised Weighted Least-squares - OS data fidelity (approximately linear)
                        res = cp.multiply(
_data_upd_["projection_raw_data"][:, indVec, :],
(
self.Atools._forwprojOSCuPy(X_t, sub_ind)
- _data_upd_["projection_norm_data"][:, indVec, :]
),
)
# OS-reduced gradient
grad_fidelity = self.Atools._backprojOSCuPy(res, sub_ind)
else:
# full gradient
res = (
self.Atools._forwprojCuPy(X_t)
- _data_upd_["projection_norm_data"]
)
grad_fidelity = self.Atools._backprojCuPy(res)
del res
X = X_t - L_const_inv * grad_fidelity
del X_t, grad_fidelity
if _algorithm_upd_["nonnegativity"]:
X[X < 0.0] = 0.0
if _regularisation_upd_["method"] is not None:
##### The proximal operator of the chosen regulariser #####
X = prox_regul(self, X, _regularisation_upd_)
t = cp.float32((1.0 + np.sqrt(1.0 + 4.0 * t**2)) * 0.5)
X_t = X + cp.float32((t_old - 1.0) / t) * (X - X_old)
if self.objsize_user_given is not None:
return perform_recon_crop(X, self.objsize_user_given)
return check_kwargs(X, **additional_args)