Source code for toupy.tomo.iradon

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Filtered Back-Projection and SART reconstruction.

GPU path  : CuPy + a custom CUDA kernel
CPU path  : pure NumPy/SciPy, identical to the original mod_iradon.

The public API is unchanged:
    compute_angle_weights, compute_filter,
    mod_iradon, mod_iradon_cuda,
    backprojector, reconsSART
"""

# standard packages
import warnings
warnings.filterwarnings("ignore")

# third party packages
import numpy as np
from scipy.fft import fft, ifft, fftfreq
from scipy.interpolate import interp1d
from skimage.transform import iradon_sart

# local package
from ..utils import create_circle
from ..utils.plot_utils import isnotebook

# ---------------------------------------------------------------------------
# Optional CuPy import — graceful degradation to CPU if unavailable
# ---------------------------------------------------------------------------
try:
    import cupy as cp
    from cupy import RawKernel
    CUDA_AVAILABLE = True
except ImportError:
    CUDA_AVAILABLE = False
    print("CuPy not found: GPU reconstruction unavailable, falling back to CPU.")

__all__ = [
    "compute_angle_weights",
    "compute_filter",
    "mod_iradon",
    "mod_iradon_cuda",
    "backprojector",
    "reconsSART",
]

# ---------------------------------------------------------------------------
# CUDA kernel for filtered back-projection
# ---------------------------------------------------------------------------
# Each CUDA thread handles one (row, col) output pixel.
# For every projection angle the kernel:
#   1. computes the detector coordinate t via dot-product with (cos θ, -sin θ)
#   2. does linear interpolation on the filtered sinogram row
#   3. accumulates into the output image
#
# The kernel is compiled once at module load time (if CuPy is present) and
# reused across all calls — compilation takes ~200 ms but happens only once
# per Python session.
# ---------------------------------------------------------------------------
_FBP_KERNEL_SRC = r"""
extern "C" __global__
void fbp_backproject(
    const float* __restrict__ sino,   // filtered sinogram (nbins x nangles), col-major
    const float* __restrict__ cos_th, // cos(theta) for each angle  (nangles,)
    const float* __restrict__ sin_th, // sin(theta) for each angle  (nangles,)
    float*       __restrict__ img,    // output image (output_size x output_size)
    const int    output_size,
    const int    nbins,
    const int    nangles,
    const int    mid_index            // detector centre index
)
{
    // pixel coordinates
    const int col = blockIdx.x * blockDim.x + threadIdx.x;
    const int row = blockIdx.y * blockDim.y + threadIdx.y;
    if (col >= output_size || row >= output_size) return;

    const int half = output_size / 2;
    const float xpr = (float)(col - half);   // centred x
    const float ypr = (float)(row - half);   // centred y

    float acc = 0.0f;
    for (int a = 0; a < nangles; ++a) {
        // detector coordinate of this pixel for angle a
        float t = ypr * cos_th[a] - xpr * sin_th[a] + (float)mid_index;

        // linear interpolation — clamp to valid range
        int   t0 = (int)floorf(t);
        float dt = t - (float)t0;
        float v0 = (t0 >= 0 && t0 < nbins)     ? sino[a * nbins + t0]     : 0.0f;
        float v1 = (t0+1 >= 0 && t0+1 < nbins) ? sino[a * nbins + (t0+1)] : 0.0f;
        acc += v0 * (1.0f - dt) + v1 * dt;
    }
    img[row * output_size + col] = acc;
}
"""

# Compile once
if CUDA_AVAILABLE:
    _fbp_kernel = RawKernel(_FBP_KERNEL_SRC, "fbp_backproject")


# ---------------------------------------------------------------------------
# CUDA kernel for forward projection (Radon transform)
# ---------------------------------------------------------------------------
# Each thread computes one (detector_bin, angle) sinogram entry.
# It walks along the projection ray and accumulates image values
# using bilinear interpolation.
# ---------------------------------------------------------------------------
_RADON_KERNEL_SRC = r"""
extern "C" __global__
void radon_project(
    const float* __restrict__ img,    // input image (N x N), row-major
    float*       __restrict__ sino,   // output sinogram (nbins x nangles), col-major
    const float* __restrict__ cos_th, // cos(theta) for each angle
    const float* __restrict__ sin_th, // sin(theta) for each angle
    const int    N,                   // image side length
    const int    nbins,               // number of detector bins
    const int    nangles,
    const int    nsteps               // number of integration steps along ray
)
{
    const int bin   = blockIdx.x * blockDim.x + threadIdx.x;
    const int angle = blockIdx.y * blockDim.y + threadIdx.y;
    if (bin >= nbins || angle >= nangles) return;

    const float half_img  = (float)(N - 1) * 0.5f;
    const float half_bins = (float)(nbins - 1) * 0.5f;
    const float step      = 1.0f;            // step size in pixels
    const float t         = (float)bin - half_bins;

    // ray direction (perpendicular to projection direction)
    const float dx =  cos_th[angle];
    const float dy = -sin_th[angle];
    // ray origin: offset along detector axis
    float ox = t * sin_th[angle];  // projection direction = (sin θ, cos θ)
    float oy = t * cos_th[angle];

    float acc = 0.0f;
    const float half_steps = (float)(nsteps - 1) * 0.5f;
    for (int s = 0; s < nsteps; ++s) {
        float sx = ox + (s - half_steps) * dx + half_img;
        float sy = oy + (s - half_steps) * dy + half_img;

        // bilinear interpolation
        int   x0 = (int)floorf(sx), y0 = (int)floorf(sy);
        float fx = sx - x0,         fy = sy - y0;
        float v  = 0.0f;
        if (x0 >= 0 && x0 < N && y0 >= 0 && y0 < N)
            v += img[y0 * N + x0] * (1.0f - fx) * (1.0f - fy);
        if (x0+1 < N && y0 >= 0 && y0 < N)
            v += img[y0 * N + (x0+1)] * fx * (1.0f - fy);
        if (x0 >= 0 && x0 < N && y0+1 < N)
            v += img[(y0+1) * N + x0] * (1.0f - fx) * fy;
        if (x0+1 < N && y0+1 < N)
            v += img[(y0+1) * N + (x0+1)] * fx * fy;
        acc += v;
    }
    sino[angle * nbins + bin] = acc * step;
}
"""

if CUDA_AVAILABLE:
    _radon_kernel = RawKernel(_RADON_KERNEL_SRC, "radon_project")


# ---------------------------------------------------------------------------
# Helper: FFT-based filtering on GPU
# ---------------------------------------------------------------------------
def _filter_sinogram_gpu(sinogram_gpu, filter_type, derivatives, freqcutoff):
    """
    Apply the ramp (or other) filter to a sinogram already on the GPU.

    Parameters
    ----------
    sinogram_gpu : cupy.ndarray  shape (nbins, nangles)
    filter_type, derivatives, freqcutoff : same semantics as compute_filter

    Returns
    -------
    filtered : cupy.ndarray  shape (nbins, nangles)  float32
    """
    nbins = sinogram_gpu.shape[0]
    # compute filter on CPU, transfer once
    h = compute_filter(nbins, filter_type=filter_type,
                       derivatives=derivatives, freqcutoff=freqcutoff)
    pad = h.shape[0]

    # pad sinogram
    sino_pad = cp.zeros((pad, sinogram_gpu.shape[1]), dtype=cp.complex64)
    sino_pad[:nbins] = sinogram_gpu.astype(cp.complex64)

    # transfer filter to GPU and broadcast over angles
    h_gpu = cp.asarray(h.astype(np.complex64))          # (pad, 1)
    filtered = cp.fft.ifft(cp.fft.fft(sino_pad, axis=0) * h_gpu, axis=0)
    return cp.real(filtered[:nbins]).astype(cp.float32)


# ---------------------------------------------------------------------------
# Public functions
# ---------------------------------------------------------------------------

[docs] def compute_angle_weights(theta): """ Compute per-angle weights for non-equally-spaced angular distributions. Parameters ---------- theta : ndarray Angles in degrees. Returns ------- weights : ndarray Weight for each angle, proportional to the angular distance to its neighbours. Forked from odtbrain.util.compute_angle_weights_1d. """ theta = theta.flatten() - theta.min() sortargs = np.argsort(theta) sorttheta = theta[sortargs] diff_theta = (np.roll(sorttheta, -1) - np.roll(sorttheta, 1)) % 180 weights = diff_theta / np.sum(diff_theta) * diff_theta.size unsortweights = np.zeros_like(weights) unsortweights[sortargs] = weights return unsortweights
[docs] def compute_filter(nbins, filter_type="ram-lak", derivatives=False, freqcutoff=1): """ Compute the frequency-domain filter for FBP reconstruction. Parameters ---------- nbins : int Number of detector bins (sinogram rows). filter_type : str, optional One of ``ram-lak``, ``shepp-logan``, ``cosine``, ``hamming``, ``hann``. Default is ``ram-lak``. derivatives : bool, optional Use a Hilbert filter suited for derivative projections. Default False. freqcutoff : float, optional Normalised frequency cutoff in [0, 1]. Default 1 (no cutoff). Returns ------- fourier_filter : ndarray shape (projection_size_padded, 1) complex64 """ projection_size_padded = max(64, int(2 ** np.ceil(np.log2(2 * nbins)))) f = fftfreq(projection_size_padded).reshape(-1, 1) omega = 2 * np.pi * f fourier_filter = ( np.ones_like(f, dtype=np.complex64) if derivatives else 2 * np.abs(f).astype(np.complex64) ) if filter_type == "ram-lak": pass elif filter_type == "shepp-logan": fourier_filter[1:] *= np.sinc(omega[1:] / (2 * freqcutoff * np.pi)) elif filter_type == "cosine": fourier_filter[1:] *= np.cos(omega[1:] / (2 * freqcutoff)) elif filter_type == "hamming": fourier_filter[1:] *= 0.54 + 0.46 * np.cos(omega[1:] / freqcutoff) elif filter_type == "hann": fourier_filter[1:] *= (1 + np.cos(omega[1:] / freqcutoff)) / 2 elif filter_type is None: fourier_filter[:] = 1 else: raise ValueError("Unknown filter: {}".format(filter_type)) fourier_filter[np.abs(omega) > np.pi * freqcutoff] = 0 if derivatives: fourier_filter = np.sign(f) * fourier_filter / (1j * np.pi) return fourier_filter
[docs] def mod_iradon( radon_image, theta=None, output_size=None, filter_type="ram-lak", derivatives=False, interpolation="linear", circle=False, freqcutoff=1, ): """ Inverse Radon transform — CPU path (pure NumPy/SciPy). Reconstruct an image from its Radon transform using filtered back-projection. Parameters ---------- radon_image : ndarray shape (nbins, nangles) Sinogram. Each column corresponds to one projection angle. theta : ndarray, optional Projection angles in degrees. Defaults to ``nangles`` evenly-spaced values in [0, 180). output_size : int, optional Side length of the square output image. filter_type : str, optional See :func:`compute_filter`. derivatives : bool, optional Set True if the sinogram contains projection derivatives. interpolation : str, optional ``linear`` (default), ``nearest``, or ``cubic``. circle : bool, optional Zero pixels outside the inscribed circle. freqcutoff : float, optional Normalised frequency cutoff. Returns ------- reconstructed : ndarray shape (output_size, output_size) """ if radon_image.ndim != 2: raise ValueError("The input image must be 2-D") if theta is None: m, n = radon_image.shape theta = np.linspace(0, 180, n, endpoint=False) else: theta = np.asarray(theta) if len(theta) != radon_image.shape[1]: raise ValueError("theta length does not match the number of projections.") if interpolation not in ("linear", "nearest", "cubic"): raise ValueError("Unknown interpolation: {}".format(interpolation)) if not output_size: output_size = (radon_image.shape[0] if circle else int(np.floor(np.sqrt(radon_image.shape[0] ** 2 / 2.0)))) th = np.deg2rad(theta) fourier_filter = compute_filter(radon_image.shape[0], filter_type=filter_type, derivatives=derivatives, freqcutoff=freqcutoff) pad_width = ((0, fourier_filter.shape[0] - radon_image.shape[0]), (0, 0)) img = np.pad(radon_image, pad_width, mode="constant", constant_values=0) projection = fft(img, axis=0) * fourier_filter radon_filtered = np.real(ifft(projection, axis=0))[:radon_image.shape[0], :] reconstructed = np.zeros((output_size, output_size)) mid_index = radon_image.shape[0] // 2 [X, Y] = np.mgrid[0:output_size, 0:output_size] xpr = X - output_size // 2 ypr = Y - output_size // 2 x = np.arange(radon_filtered.shape[0]) - mid_index for i in range(len(theta)): t = ypr * np.cos(th[i]) - xpr * np.sin(th[i]) if interpolation == "linear": backprojected = np.interp(t, x, radon_filtered[:, i], left=0, right=0) else: backprojected = interp1d(x, radon_filtered[:, i], kind=interpolation, bounds_error=False, fill_value=0)(t) reconstructed += backprojected if circle: radius = output_size // 2 mask = (xpr ** 2 + ypr ** 2) <= radius ** 2 reconstructed[~mask] = 0.0 return reconstructed * np.pi / (2 * len(th))
[docs] def mod_iradon_cuda( radon_image, theta=None, output_size=None, filter_type="ram-lak", derivatives=False, circle=False, freqcutoff=1, ): """ Inverse Radon transform — CUDA/GPU path. Falls back to :func:`mod_iradon` automatically if CuPy is unavailable. Parameters ---------- radon_image : ndarray shape (nbins, nangles) Sinogram on CPU. Converted to float32 internally. theta : ndarray, optional Projection angles in degrees. output_size : int, optional Side length of the square output image. filter_type : str, optional See :func:`compute_filter`. derivatives : bool, optional Set True if the sinogram contains projection derivatives. circle : bool, optional Zero pixels outside the inscribed circle. freqcutoff : float, optional Normalised frequency cutoff. Returns ------- reconstructed : ndarray shape (output_size, output_size) float32 """ if not CUDA_AVAILABLE: warnings.warn("CuPy unavailable — falling back to CPU reconstruction.") return mod_iradon(radon_image, theta=theta, output_size=output_size, filter_type=filter_type, derivatives=derivatives, circle=circle, freqcutoff=freqcutoff) if radon_image.ndim != 2: raise ValueError("The input image must be 2-D") if theta is None: m, n = radon_image.shape theta = np.linspace(0, 180, n, endpoint=False) else: theta = np.asarray(theta) if len(theta) != radon_image.shape[1]: raise ValueError("theta length does not match the number of projections.") if not output_size: output_size = (radon_image.shape[0] if circle else int(np.floor(np.sqrt(radon_image.shape[0] ** 2 / 2.0)))) th = np.deg2rad(theta).astype(np.float32) cos_th = np.cos(th).astype(np.float32) sin_th = np.sin(th).astype(np.float32) nbins = radon_image.shape[0] nangles = radon_image.shape[1] mid_index = nbins // 2 # --- Upload sinogram and apply filter on GPU --- sino_gpu = cp.asarray(radon_image.astype(np.float32)) # (nbins, nangles) sino_filt = _filter_sinogram_gpu(sino_gpu, filter_type, derivatives, freqcutoff) # Make column-major layout so the kernel can stride over angles efficiently sino_filt = cp.ascontiguousarray(sino_filt.T).ravel() # (nangles * nbins,) # --- Allocate output --- img_gpu = cp.zeros(output_size * output_size, dtype=cp.float32) cos_th_gpu = cp.asarray(cos_th) sin_th_gpu = cp.asarray(sin_th) # --- Launch kernel --- block = (16, 16, 1) grid = ( int(np.ceil(output_size / block[0])), int(np.ceil(output_size / block[1])), 1, ) _fbp_kernel( grid, block, (sino_filt, cos_th_gpu, sin_th_gpu, img_gpu, np.int32(output_size), np.int32(nbins), np.int32(nangles), np.int32(mid_index)), ) cp.cuda.stream.get_current_stream().synchronize() # --- Retrieve and scale --- reconstructed = cp.asnumpy(img_gpu).reshape(output_size, output_size) reconstructed *= np.pi / (2 * nangles) if circle: half = output_size // 2 Y, X = np.ogrid[-half:output_size - half, -half:output_size - half] mask = X ** 2 + Y ** 2 <= half ** 2 reconstructed[~mask] = 0.0 return reconstructed
[docs] def backprojector(sinogram, theta, **params): """ Wrapper that dispatches to the GPU or CPU FBP implementation. Parameters ---------- sinogram : ndarray shape (nbins, nangles) theta : ndarray params : dict ``params["opencl"]`` — repurposed as ``params["cuda"]``: set True to use the CUDA path. The key ``opencl`` is still accepted as an alias for backwards compatibility. All other keys are forwarded to the chosen implementation. Returns ------- recons : ndarray shape (output_size, output_size) """ params.setdefault("weight_angles", False) # accept both "cuda" and the legacy "opencl" key use_cuda = params.get("cuda", params.get("opencl", False)) if params["weight_angles"]: weights = compute_angle_weights(theta).reshape(1, -1) sinogram = sinogram * weights shared = dict( theta = theta, output_size = sinogram.shape[0], filter_type = params["filtertype"], derivatives = params["derivatives"], circle = params["circle"], freqcutoff = params["freqcutoff"], ) if use_cuda: return mod_iradon_cuda(sinogram, **shared) else: return mod_iradon(sinogram, **shared)
[docs] def reconsSART( sinogram, theta, num_iter=2, FBPinitial_guess=True, relaxation_params=0.15, **params ): """ Tomographic reconstruction with the SART algorithm. Parameters ---------- sinogram : ndarray shape (nbins, nangles) theta : ndarray num_iter : int, optional Number of SART iterations. Default 2. FBPinitial_guess : bool, optional Seed SART with an FBP reconstruction. Default True. relaxation_params : float, optional SART relaxation parameter. Default 0.15. Returns ------- recons_sart : ndarray """ theta = np.float64(theta) sinogram = np.float64(sinogram) circle = params["circle"] if params.get("weight_angles", False): weights = compute_angle_weights(theta).reshape(1, -1) sinogram = sinogram * weights if FBPinitial_guess: print("Calculating the initial guess for SART using FBP") reconsFBP = backprojector(sinogram, theta, **params) reconsFBP = np.float64(reconsFBP) print("Done. Starting SART") recons_sart = iradon_sart(sinogram, theta=theta, image=reconsFBP, relaxation=relaxation_params) else: recons_sart = iradon_sart(sinogram, theta=theta, relaxation=relaxation_params) print("Starting iterative reconstruction:") for ii in range(num_iter): print("Iteration {}".format(ii + 1)) recons_sart = iradon_sart(sinogram, theta=theta, image=recons_sart, relaxation=relaxation_params) if circle: recons_circle = create_circle(recons_sart) recons_sart = recons_sart * recons_circle return recons_sart