Source code for tomobar.supp.suppTools

"""Supplementary data tools:

List of functions:

* normaliser - to normalise the raw data and take the negative log (if needed). Options are: 'mean', 'median' and 'dynamic'.
* autocropper - automatically crops the 3D projection data to reduce its size.

@authors:
    Daniil Kazantsev: https://github.com/dkazanc

    Gerard Jover Pujol https://github.com/IararIV/
"""

import numpy as np
import typing
from typing import Union

try:
    import cupy as xp

    try:
        xp.cuda.Device(0).compute_capability
    except xp.cuda.runtime.CUDARuntimeError:
        import numpy as xp

        print("CuPy is installed but the GPU device is inaccessible")
except ImportError:
    import numpy as xp


try:
    from skimage.transform import downscale_local_mean
except ImportError:
    print("____! Skimage module is required for Dynamic Flat fields estimation !____")
try:
    from skimage.restoration import estimate_sigma
except ImportError:
    print("____! Skimage module is required for Dynamic Flat fields estimation !____")
try:
    import scipy
except ImportError:
    print("____! scipy module is required for Dynamic Flat fields estimation !____")
try:
    import bm3d
except ImportError:
    # "____! BM3D module is required to use for dynamic flat fields calculation !____"
    pass


[docs]def DFFC(data, flats, darks, downsample, nrPArepetions): # Load frames meanDarkfield = np.mean(darks, axis=1, dtype=np.float64) whiteVect = np.zeros( (flats.shape[1], flats.shape[0] * flats.shape[2]), dtype=np.float64 ) for i in range(flats.shape[1]): whiteVect[i] = flats[:, i, :].flatten() - meanDarkfield.flatten() mn = np.mean(whiteVect, axis=0) # Substract mean flat field M, N = whiteVect.shape Data = whiteVect - mn # ============================================================================= # Parallel Analysis (EEFs selection): # Selection of the number of components for PCA using parallel Analysis. # Each flat field is a single row of the matrix flatFields, different # rows are different observations. # ============================================================================= def cov(X): one_vector = np.ones((1, X.shape[0])) mu = np.dot(one_vector, X) / X.shape[0] X_mean_subtract = X - mu covA = np.dot(X_mean_subtract.T, X_mean_subtract) / (X.shape[0] - 1) return covA def parallelAnalysis(flatFields, repetitions): stdEFF = np.std(flatFields, axis=0, ddof=1, dtype=np.float64) H, W = flatFields.shape keepTrack = np.zeros((H, repetitions), dtype=np.float64) stdMatrix = np.tile(stdEFF, (H, 1)) for i in range(repetitions): print(f"Parallel Analysis - repetition {i}") sample = stdMatrix * np.random.randn(H, W) D1, _ = np.linalg.eig(np.cov(sample)) keepTrack[:, i] = D1.copy() mean_flat_fields_EFF = np.mean(flatFields, axis=0) F = flatFields - mean_flat_fields_EFF D1, V1 = np.linalg.eig(np.cov(F)) selection = np.zeros((1, H)) # mean + 2 * std selection[ :, D1 > (np.mean(keepTrack, axis=1) + 2 * np.std(keepTrack, axis=1, ddof=1)) ] = 1 numberPC = np.sum(selection) return V1, D1, int(numberPC) # Parallel Analysis nrEigenflatfields = 0 print("Parallel Analysis:") while nrEigenflatfields <= 0: V1, D1, nrEigenflatfields = parallelAnalysis(Data, nrPArepetions) print(f"{nrEigenflatfields} eigen flat fields selected!") idx = D1.argsort()[::-1] D1 = D1[idx] V1 = V1[:, idx] # Calculation eigen flat fields H, C, W = data.shape eig0 = mn.reshape((H, W)) EFF = np.zeros((nrEigenflatfields + 1, H, W)) # n_EFF + 1 eig0 EFF_denoised = np.zeros((nrEigenflatfields + 1, H, W)) # n_EFF + 1 eig0 print("Calculating EFFs:") EFF[0] = eig0 for i in range(nrEigenflatfields): EFF[i + 1] = (np.matmul(Data.T, V1[i]).T).reshape((H, W)) EFF_denoised = EFF.copy() # Denoise eigen flat fields print("Denoising EFFs using BM3D method:") for i in range(1, len(EFF)): print(f"Denoising EFF {i}") EFF_max, EFF_min = EFF_denoised[i, :, :].max(), EFF_denoised[i, :, :].min() EFF_denoised[i, :, :] = (EFF_denoised[i, :, :] - EFF_min) / (EFF_max - EFF_min) sigma_bm3d = estimate_sigma(EFF_denoised[i, :, :]) * 10 # print(f"Estimated sigma: {sigma_bm3d}") EFF_denoised[i, :, :] = bm3d.bm3d(EFF_denoised[i, :, :], sigma_bm3d) EFF_denoised[i, :, :] = (EFF_denoised[i, :, :] * (EFF_max - EFF_min)) + EFF_min print("Denoising completed.") # ============================================================================= # cost_func: cost funcion used to estimate the weights using TV # ============================================================================= def cost_func(x, *args): (projections, meanFF, FF, DF) = args FF_eff = np.zeros((FF.shape[1], FF.shape[2])) for i in range(len(FF)): FF_eff = FF_eff + x[i] * FF[i] logCorProj = ( (projections - DF) / (meanFF + FF_eff) * np.mean(meanFF.flatten() + FF_eff.flatten()) ) Gx, Gy = np.gradient(logCorProj) mag = (Gx**2 + Gy**2) ** (1 / 2) cost = np.sum(mag.flatten()) return cost # ============================================================================= # CondTVmean function: finds the optimal estimates of the coefficients of the # eigen flat fields. # ============================================================================= def condTVmean(projection, meanFF, FF, DF, x, DS): # Downsample image projection = downscale_local_mean(projection, (DS, DS)) meanFF = downscale_local_mean(meanFF, (DS, DS)) FF2 = np.zeros((FF.shape[0], meanFF.shape[0], meanFF.shape[1])) for i in range(len(FF)): FF2[i] = downscale_local_mean(FF[i], (DS, DS)) FF = FF2 DF = downscale_local_mean(DF, (DS, DS)) # Optimize weights (x) x = scipy.optimize.minimize( cost_func, x, args=(projection, meanFF, FF, DF), method="BFGS", tol=1e-8 ) return x.x H, C, W = data.shape print("TV optimisation for DFF coefficients:") clean_DFFC = np.zeros((H, C, W), dtype=np.float64) for i in range(C): if i % 5 == 0: print("Normalising projection", i) projection = data[:, i, :] # Estimate weights for a single projection meanFF = EFF_denoised[0] FF = EFF_denoised[1:] weights = np.zeros(nrEigenflatfields) x = condTVmean(projection, meanFF, FF, meanDarkfield, weights, downsample) # Dynamic FFC FFeff = np.zeros(meanDarkfield.shape) for j in range(nrEigenflatfields): FFeff = FFeff + x[j] * EFF_denoised[j + 1] tmp = np.divide((projection - meanDarkfield), (EFF_denoised[0] + FFeff)) clean_DFFC[:, i, :] = tmp return [clean_DFFC, EFF, EFF_denoised]
[docs]def normaliser( data: np.array, flats: np.array, darks: np.array, log: bool = True, method: str = "mean", axis: int = 0, **kwargs, ) -> np.ndarray: """Data normalisation module Args: data (np.array): 3d numpy array of raw data. flats (np.array): 2d numpy array for flat field. darks (np.array): 2d numpy array for darks field. log (bool, optional): Take negative log. Defaults to True. method (str, optional): Normalisation method, choose "mean", "median" or "dynamic". Defaults to "mean". axis (int, optional): Define the ANGLES axis. dyn_downsample (int, optional): Parameter for "dynamic" method. Defaults to 2. dyn_iterations (int, optional): Parameter for "dynamic" method. Defaults to 10. Raises: NameError: method error Returns: np.ndarray: 3d numpy array of normalised data """ if np.ndim(data) == 2: raise NameError("Normalisation is implemented for 3d data input") if darks is None: darks = np.zeros(np.shape(flats), dtype="float32") if method is None or method == "mean": flats = np.mean(flats, axis) # mean across flats darks = np.mean(darks, axis) # mean across darks elif method == "median": flats = np.median(flats, axis) # median across flats darks = np.median(darks, axis) # median across darks elif method == "dynamic": # dynamic flat field normalisation according to the paper of Vincent Van Nieuwenhove for key, value in kwargs.items(): if key == "dyn_downsample": dyn_downsample_v = value else: dyn_downsample_v = 2 if key == "dyn_iterations": dyn_iterations_v = value else: dyn_iterations_v = 10 [data_norm, EFF, EFF_filt] = DFFC( data, flats, darks, downsample=dyn_downsample_v, nrPArepetions=dyn_iterations_v, ) else: raise NameError( "Please select an appropriate method for normalisation: mean, median or dynamic" ) if method != "dynamic": denom = flats - darks denom[ (np.where(denom <= 0.0)) ] = 1.0 # remove zeros/negatives in the denominator if any if axis == 1: denom = denom[:, np.newaxis, :] darks = darks[:, np.newaxis, :] nomin = data - darks # get nominator nomin[(np.where(nomin < 0.0))] = 1.0 # remove negatives data_norm = np.true_divide(nomin, denom) if log: # calculate negative log (avoiding of log(0) (= inf) and > 1.0 (negative val)) data_norm[data_norm > 0.0] = -np.log(data_norm[data_norm > 0.0]) data_norm[data_norm < 0.0] = 0.0 # remove negative values # return [data_norm, EFF, EFF_filt] return data_norm
[docs]def autocropper(data, addbox, backgr_pix1): """The method crops 3D projection data in order to reduce the total data size. Method assumes that the object is positioned vertically around the central point of the horizontal detector. It is important since the vertical mid ROI of each projection is used to estimate the background noise levels. Args: data (np.ndarray) The required dimensions: [Projections, detectorsVertical, detectorsHoriz] ! addbox: (int) to add additional pixels in addition to automatically found cropped values, i.e. increasing the cropping region (safety option) backgr_pix1 (int): to create rectangular ROIs to collect noise statistics on both (vertical) sides of each 2D projection """ backgr_pix2 = int(2.5 * backgr_pix1) # usually enough to collect noise statistics [Projections, detectorsVertical, detectorsHoriz] = np.shape(data) horiz_left_indices = np.zeros(Projections).astype(int) horiz_right_indices = np.zeros(Projections).astype(int) vert_up_indices = np.zeros(Projections).astype(int) vert_down_indices = np.zeros(Projections).astype(int) for i in range(0, Projections): proj2D = data[i, :, :] # extract 2D projection detectorsVert_mid = (int)(0.5 * detectorsVertical) # extract two small regions which belong to the background RegionLEFT = proj2D[ detectorsVert_mid - backgr_pix2 : detectorsVert_mid + backgr_pix2, 0:backgr_pix1, ] RegionRIGHT = proj2D[ detectorsVert_mid - backgr_pix2 : detectorsVert_mid + backgr_pix2, -1 - backgr_pix1 : -1, ] ValMean = np.mean(RegionLEFT) + np.mean(RegionRIGHT) # get 1D mean vectors vert_sum = np.mean(proj2D, 1) horiz_sum = np.mean(proj2D, 0) # find the maximum values across the vectors largest_vert_index = (vert_sum == max(vert_sum)).argmax(axis=0) largest_horiz_index = (horiz_sum == max(horiz_sum)).argmax(axis=0) # now we need to find the dips of the "gaussian" moving down from the top if largest_vert_index == 0: min_vert_index = 0 else: min_vert_index = (vert_sum[largest_vert_index::-1] <= ValMean).argmax( axis=0 ) if largest_vert_index == (detectorsVertical - 1): max_vert_index = largest_vert_index + 1 else: max_vert_index = (vert_sum[largest_vert_index:-1] <= ValMean).argmax(axis=0) if largest_horiz_index == 0: min_horiz_index = 0 else: min_horiz_index = (horiz_sum[largest_horiz_index::-1] <= ValMean).argmax( axis=0 ) if largest_horiz_index == (detectorsHoriz - 1): max_horiz_index = largest_horiz_index + 1 else: max_horiz_index = (horiz_sum[largest_horiz_index:-1] <= ValMean).argmax( axis=0 ) # checking the boudaries of the selected indices if min_vert_index != 0: min_vert_index = largest_vert_index - min_vert_index if (min_vert_index - addbox) >= 0: min_vert_index -= addbox if max_vert_index != (detectorsVertical): max_vert_index = largest_vert_index + max_vert_index if (max_vert_index + addbox) < detectorsVertical: max_vert_index += addbox if min_horiz_index != 0: min_horiz_index = largest_horiz_index - min_horiz_index if (min_horiz_index - addbox) >= 0: min_horiz_index -= addbox if max_horiz_index != (detectorsHoriz): max_horiz_index = largest_horiz_index + max_horiz_index if (max_horiz_index + addbox) < detectorsHoriz: max_horiz_index += addbox horiz_left_indices[i] = min_horiz_index horiz_right_indices[i] = max_horiz_index vert_up_indices[i] = min_vert_index vert_down_indices[i] = max_vert_index crop_left_horiz = np.min(horiz_left_indices) crop_right_horiz = np.max(horiz_right_indices) crop_up_vert = np.min(vert_up_indices) crop_down_vert = np.max(vert_down_indices) # Finally time to crop the data cropped_data = data[ :, crop_up_vert:crop_down_vert, crop_left_horiz:crop_right_horiz ] return cropped_data
def _apply_circular_mask(data, recon_mask_radius, axis=2): """Applies a circular mask of a certain radius to zero the values outside tha mask Args: data (cp or np ndarray): reconstructed volume recon_mask_radius (float): radius size Returns: cp or np ndarray: recon volume after mask applied """ recon_size = data.shape[axis] Y, X = xp.ogrid[:recon_size, :recon_size] half_size = recon_size // 2 dist_from_center = xp.sqrt((X - half_size) ** 2 + (Y - half_size) ** 2) if recon_mask_radius <= 1.0: mask = dist_from_center <= half_size - abs( half_size - half_size / recon_mask_radius ) else: mask = dist_from_center <= half_size + abs( half_size - half_size / recon_mask_radius ) data *= mask return data def _check_kwargs(reconstruction, **kwargs): # Iterating over optional parameters: for key, value in kwargs.items(): if key == "recon_mask_radius" and value is not None: _apply_circular_mask(reconstruction, value) return reconstruction
[docs]def circ_mask(X, diameter): # applying a circular mask to the reconstructed image/volume # Make the 'diameter' smaller than 1.0 in order to shrink it obj_shape = np.shape(X) X_masked = np.float32(np.zeros(obj_shape)) if np.ndim(X) == 2: objsize = obj_shape[0] elif np.ndim(X) == 3: objsize = obj_shape[1] else: print("Object input size is wrong for the mask to apply to") c = np.linspace( -(objsize * (1.0 / diameter)) / 2.0, (objsize * (1.0 / diameter)) / 2.0, objsize ) x, y = np.meshgrid(c, c) mask = np.float32(np.array((x**2 + y**2 < (objsize / 2.0) ** 2))) if np.ndim(X) == 3: for z in range(0, obj_shape[0]): X_masked[z, :, :] = np.multiply(X[z, :, :], mask) else: X_masked = np.multiply(X, mask) return X_masked