Source code for tomobar.regularisersCuPy

import cupy as cp
from tomobar.cuda_kernels import load_cuda_module


[docs] def prox_regul(self, X: cp.ndarray, _regularisation_: dict) -> cp.ndarray: """Enabling proximal operators (regularisers) in iterative reconstruction. Args: X (cp.ndarray): 2D or 3D CuPy array. _regularisation_ (dict): Regularisation dictionary with parameters, see :mod:`tomobar.supp.dicts`. Returns: cp.ndarray: Filtered 2D or 3D CuPy array. """ # The proximal operator of the chosen regulariser if "ROF_TV" in _regularisation_["method"]: # Rudin - Osher - Fatemi Total variation method X_prox = ROF_TV_cupy( X, _regularisation_["regul_param"], _regularisation_["iterations"], _regularisation_["time_marching_step"], self.Atools.device_index, _regularisation_.get("half_precision", False), ) elif "PD_TV" in _regularisation_["method"]: X_prox = PD_TV_cupy( X, _regularisation_["regul_param"], _regularisation_["iterations"], _regularisation_["methodTV"], self.nonneg_regul, _regularisation_["PD_LipschitzConstant"], self.Atools.device_index, _regularisation_.get("half_precision", False), ) return X_prox
[docs] def ROF_TV_cupy( data: cp.ndarray, regularisation_parameter: float = 1e-05, iterations: int = 3000, time_marching_parameter: float = 0.001, gpu_id: int = 0, half_precision: bool = False, ) -> cp.ndarray: """Total Variation using Rudin-Osher-Fatemi (ROF) explicit iteration scheme to perform edge-preserving image denoising. This is a gradient-based algorithm for a smoothed TV term which requires a small time marching parameter and a significant number of iterations. Ref: Rudin, Osher, Fatemi, "Nonlinear Total Variation based noise removal algorithms", 1992. Args: data (cp.ndarray): A 2d or 3d CuPy array. regularisation_parameter (float): Regularisation parameter to control the level of smoothing. Defaults to 1e-05. iterations (int): The number of iterations. Defaults to 3000. time_marching_parameter (float): Time marching parameter, needs to be small to ensure convergance. Defaults to 0.001. gpu_id (int): A GPU device index to perform operation on. Defaults to 0. half_precision (bool): Perform some computations in half-precision, potentially leads to better performance. Defaults to False. Returns: cp.ndarray: ROF-TV filtered CuPy array. """ if gpu_id >= 0: cp.cuda.Device(gpu_id).use() else: raise ValueError("The gpu_device must be a positive integer or zero") cp.get_default_memory_pool().free_all_blocks() input_type = data.dtype if input_type != "float32": raise ValueError("The input data should be float32 data type") dtype_of_D = cp.float16 if half_precision else cp.float32 # initialise CuPy arrays here out_arrays = [data.copy(), cp.zeros(data.shape, dtype=cp.float32, order="C")] d_D1 = cp.empty(data.shape, dtype=dtype_of_D, order="C") d_D2 = cp.empty(data.shape, dtype=dtype_of_D, order="C") type_of_P = "__half" if half_precision else "float" name_expressions = [ f"divergence_kernel_2D<{type_of_P}>", f"TV_kernel2D<{type_of_P}>", f"divergence_kernel_3D<{type_of_P}>", f"TV_kernel3D<{type_of_P}>", ] module = load_cuda_module("rudin_osher_fatemi_total_variation", name_expressions) (dz, dy, dx) = data.shape + (0,) * (3 - data.ndim) block_x = 128 block_dims = (block_x, 1) grid_x = (dx + block_x - 1) // block_x grid_y = dy grid_dims = (grid_x, grid_y) data_dims = (dx, dy) if data.ndim == 2: divergence_kernel = module.get_function(name_expressions[0]) TV_kernel = module.get_function(name_expressions[1]) elif data.ndim == 3: d_D3 = cp.empty(data.shape, dtype=dtype_of_D, order="C") block_dims = block_dims + (1,) grid_dims = grid_dims + (dz,) data_dims = data_dims + (dz,) divergence_kernel = module.get_function(name_expressions[2]) TV_kernel = module.get_function(name_expressions[3]) # perform algorithm iterations input_index = 0 output_index = 1 for _ in range(iterations): if data.ndim == 2: params = ( out_arrays[input_index], d_D1, d_D2, dx, dy, ) elif data.ndim == 3: params = ( out_arrays[input_index], d_D1, d_D2, d_D3, dx, dy, dz, ) divergence_kernel(grid_dims, block_dims, params) if data.ndim == 2: params = ( out_arrays[input_index], out_arrays[output_index], data, d_D1, d_D2, cp.float32(regularisation_parameter), cp.float32(time_marching_parameter), dx, dy, ) elif data.ndim == 3: params = ( out_arrays[input_index], out_arrays[output_index], data, d_D1, d_D2, d_D3, cp.float32(regularisation_parameter), cp.float32(time_marching_parameter), dx, dy, dz, ) TV_kernel(grid_dims, block_dims, params) input_index = 1 - input_index output_index = 1 - output_index return out_arrays[input_index]
[docs] def PD_TV_cupy( data: cp.ndarray, regularisation_parameter: float = 1e-05, iterations: int = 1000, methodTV: int = 0, nonneg: int = 0, lipschitz_const: float = 8.0, gpu_id: int = 0, half_precision: bool = False, ) -> cp.ndarray: """Primal Dual algorithm for non-smooth convex Total Variation functional. Ref: Chambolle, Pock, "A First-Order Primal-Dual Algorithm for Convex Problems with Applications to Imaging", 2010. Args: data (cp.ndarray): A 2d or 3d CuPy array. regularisation_parameter (float): Regularisation parameter to control the level of smoothing. Defaults to 1e-05. iterations (int): The number of iterations. Defaults to 1000. methodTV (int): Choose between isotropic (0) or anisotropic (1) case for TV norm. nonneg (int): Enable non-negativity in updates by selecting 1. Defaults to 0. lipschitz_const (float): Lipschitz constant to control convergence. gpu_id (int): A GPU device index to perform operation on. Defaults to 0. half_precision (bool): Perform some computations in half-precision, potentially leads to better performance. Defaults to False. Returns: cp.ndarray: PD-TV filtered CuPy array. """ if gpu_id >= 0: cp.cuda.Device(gpu_id).use() else: raise ValueError("The gpu_device must be a positive integer or zero") # with cp.cuda.Device(gpu_id): cp.get_default_memory_pool().free_all_blocks() input_type = data.dtype if input_type != "float32": raise ValueError("The input data should be float32 data type") dtype_of_P = cp.float16 if half_precision else cp.float32 # prepare some parameters: tau = cp.float32(regularisation_parameter * 0.1) sigma = cp.float32(1.0 / (lipschitz_const * tau)) theta = cp.float32(1.0) lt = cp.float32(tau / regularisation_parameter) # initialise CuPy arrays here: U_arrays = [data.copy(), cp.zeros(data.shape, dtype=cp.float32, order="C")] P1_arrays = [cp.zeros(data.shape, dtype=dtype_of_P, order="C") for _ in range(2)] P2_arrays = [cp.zeros(data.shape, dtype=dtype_of_P, order="C") for _ in range(2)] # loading and compiling CUDA kernels: type_of_P = "__half" if half_precision else "float" nonneg_kernel_param = "true" if bool(nonneg) else "false" methodTV_kernel_param = "true" if bool(methodTV) else "false" name_expressions = [ f"primal_dual_for_total_variation_2D<{type_of_P}, {nonneg_kernel_param}, {methodTV_kernel_param}>", f"primal_dual_for_total_variation_3D<{type_of_P}, {nonneg_kernel_param}, {methodTV_kernel_param}>", ] module = load_cuda_module("primal_dual_for_total_variation", name_expressions) (dz, dy, dx) = data.shape + (0,) * (3 - data.ndim) block_x = 128 block_dims = (block_x, 1) grid_x = (dx + block_x - 1) // block_x grid_y = dy grid_dims = (grid_x, grid_y) data_dims = (dx, dy) if data.ndim == 2: primal_dual_for_total_variation = module.get_function(name_expressions[0]) elif data.ndim == 3: P3_arrays = [ cp.zeros(data.shape, dtype=dtype_of_P, order="C") for _ in range(2) ] block_dims = block_dims + (1,) grid_dims = grid_dims + (dz,) data_dims = data_dims + (dz,) primal_dual_for_total_variation = module.get_function(name_expressions[1]) # perform algorithm iterations input_index = 0 output_index = 1 for _ in range(iterations): if data.ndim == 2: params = ( data, U_arrays[input_index], U_arrays[output_index], P1_arrays[input_index], P2_arrays[input_index], P1_arrays[output_index], P2_arrays[output_index], sigma, tau, lt, theta, *data_dims, ) elif data.ndim == 3: params = ( data, U_arrays[input_index], U_arrays[output_index], P1_arrays[input_index], P2_arrays[input_index], P3_arrays[input_index], P1_arrays[output_index], P2_arrays[output_index], P3_arrays[output_index], sigma, tau, lt, theta, *data_dims, ) primal_dual_for_total_variation(grid_dims, block_dims, params) input_index = 1 - input_index output_index = 1 - output_index return U_arrays[input_index]