Source code for toupy.tomo.tv_recons

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Total-Variation (TV) regularised tomographic reconstruction.

Implements the Chambolle-Pock primal-dual algorithm (CP) to solve:

    min_{x >= 0}  (1/2) ||A x - b||^2  +  lambda * TV(x)

where A is the Radon forward operator, b is the (possibly noisy)
sinogram, and TV is the isotropic discrete total-variation.

The algorithm is an exact first-order method and handles the non-smooth
TV term via proximal splitting.  It is well-suited to ptychographic
tomography, where the phase sinogram is noisy or has limited angular
sampling.

Reference:
    Chambolle, A., & Pock, T. (2011). A first-order primal-dual
    algorithm for convex problems with applications to imaging. Journal
    of Mathematical Imaging and Vision, 40(1), 120–145.
    https://doi.org/10.1007/s10107-010-0436-y

    Sidky, E. Y., & Pan, X. (2008). Image reconstruction in circular
    cone-beam computed tomography by constrained, total-variation
    minimization. Physics in Medicine & Biology, 53(17), 4777.
    https://doi.org/10.1088/0031-9155/53/17/014

GPU notes
---------
Both public functions accept ``cuda=True``.  In GPU mode all primal and
dual variables reside on the device as CuPy float32 arrays throughout
the iteration; only the final reconstruction is transferred back to the
host.  The Radon forward and backward operators reuse the CUDA kernels
compiled in :mod:`toupy.tomo.iradon`; no additional compilation is
required.

The step-size estimate is also computed entirely on the GPU via a
five-step power iteration, so the first call is slightly slower than
subsequent calls (kernel reuse).
"""

import warnings
import numpy as np
from skimage.transform import radon, iradon

# ---------------------------------------------------------------------------
# Optional CuPy / CUDA kernel imports — graceful CPU fallback
# ---------------------------------------------------------------------------
try:
    import cupy as cp
    from .iradon import CUDA_AVAILABLE, _fbp_kernel, _radon_kernel
except ImportError:
    CUDA_AVAILABLE = False

__all__ = ["tv_reconstruction", "chambolle_pock_tv"]


# ---------------------------------------------------------------------------
# Array-library-agnostic finite-difference operators
# Implemented with slicing (not np.roll) so the same code runs on both
# NumPy and CuPy arrays.
# ---------------------------------------------------------------------------

def _get_xp(arr):
    """Return cupy if arr is a CuPy array, else numpy."""
    if CUDA_AVAILABLE:
        return cp.get_array_module(arr)
    return np


def _grad2d(u):
    """Forward finite-difference gradient (dy, dx), Neumann BC."""
    xp = _get_xp(u)
    gy = xp.empty_like(u)
    gx = xp.empty_like(u)
    gy[:-1, :] = u[1:, :] - u[:-1, :]
    gy[-1, :]  = 0.0
    gx[:, :-1] = u[:, 1:] - u[:, :-1]
    gx[:, -1]  = 0.0
    return gy, gx


def _div2d(gy, gx):
    """Negative divergence (adjoint of _grad2d), Neumann BC."""
    xp = _get_xp(gy)
    dy = xp.empty_like(gy)
    dy[0, :]    =  gy[0, :]
    dy[1:-1, :] =  gy[1:-1, :] - gy[:-2, :]
    dy[-1, :]   = -gy[-2, :]
    dx = xp.empty_like(gx)
    dx[:, 0]    =  gx[:, 0]
    dx[:, 1:-1] =  gx[:, 1:-1] - gx[:, :-2]
    dx[:, -1]   = -gx[:, -2]
    return dy + dx


# ---------------------------------------------------------------------------
# TV proximal operators (array-library agnostic)
# ---------------------------------------------------------------------------

def _prox_tv_dual(py, px, lam):
    """Project TV dual onto the isotropic l-inf ball of radius lam.

    prox_{sigma * (lam*||·||_{2,1})*}(z) = z / max(1, |z| / lam)
    The step-size sigma is already folded into z before calling.
    """
    xp    = _get_xp(py)
    norm  = xp.sqrt(py**2 + px**2)
    scale = xp.maximum(1.0, norm / lam)
    return py / scale, px / scale


def _prox_l2_data_dual(z, b, sigma):
    """Proximal op of the conjugate of (1/2)||· - b||^2, step sigma.

    Moreau identity:  prox_{sigma * f*}(z) = (z - sigma*b) / (1 + sigma)
    """
    return (z - sigma * b) / (1.0 + sigma)


# ---------------------------------------------------------------------------
# CPU Radon / back-projection wrappers (skimage)
# ---------------------------------------------------------------------------

def _forward_cpu(x, theta):
    return radon(x, theta=theta, circle=False)


def _backward_cpu(sino, theta, n):
    return iradon(sino, theta=theta, filter_name=None, output_size=n, circle=False)


# ---------------------------------------------------------------------------
# GPU Radon / back-projection wrappers (custom CUDA kernels from iradon.py)
# ---------------------------------------------------------------------------
# Convention used here matches skimage circle=False:
#   nbins = ceil(sqrt(2) * n)
# All GPU arrays are float32.
# ---------------------------------------------------------------------------

def _make_trig_gpu(theta_deg):
    """Precompute float32 cos/sin tables on the GPU."""
    th = np.deg2rad(np.asarray(theta_deg, dtype=np.float64)).astype(np.float32)
    return cp.asarray(np.cos(th)), cp.asarray(np.sin(th))


def _forward_gpu(x_gpu, cos_th, sin_th, nbins, nsteps):
    """GPU Radon forward projection.

    Parameters
    ----------
    x_gpu : cupy.ndarray, shape (N, N), float32
    cos_th, sin_th : cupy.ndarray, shape (nangles,), float32
    nbins : int   — number of detector bins (= ceil(sqrt(2)*N) for circle=False)
    nsteps : int  — integration steps along ray

    Returns
    -------
    sino_gpu : cupy.ndarray, shape (nbins, nangles), float32
    """
    N       = x_gpu.shape[0]
    nangles = cos_th.shape[0]
    sino_flat = cp.zeros(nbins * nangles, dtype=cp.float32)

    block = (16, 16, 1)
    grid  = (int(np.ceil(nbins   / 16)),
             int(np.ceil(nangles / 16)), 1)
    _radon_kernel(
        grid, block,
        (x_gpu.ravel(), sino_flat, cos_th, sin_th,
         np.int32(N), np.int32(nbins), np.int32(nangles), np.int32(nsteps)),
    )
    cp.cuda.stream.get_current_stream().synchronize()
    # Kernel stores (nangles * nbins) in angle-major order → transpose to (nbins, nangles)
    return sino_flat.reshape(nangles, nbins).T


def _backward_gpu(sino_gpu, n, cos_th, sin_th):
    """GPU unfiltered back-projection (adjoint of _forward_gpu).

    Parameters
    ----------
    sino_gpu : cupy.ndarray, shape (nbins, nangles), float32
    n : int   — output image side length
    cos_th, sin_th : cupy.ndarray, shape (nangles,), float32

    Returns
    -------
    img_gpu : cupy.ndarray, shape (n, n), float32
    """
    nbins, nangles = sino_gpu.shape
    mid_index = nbins // 2
    img_flat  = cp.zeros(n * n, dtype=cp.float32)

    # Kernel expects sinogram in (nangles * nbins,) angle-major column-major order
    sino_cm = cp.ascontiguousarray(sino_gpu.T).ravel()

    block = (16, 16, 1)
    grid  = (int(np.ceil(n / 16)), int(np.ceil(n / 16)), 1)
    _fbp_kernel(
        grid, block,
        (sino_cm, cos_th, sin_th, img_flat,
         np.int32(n), np.int32(nbins), np.int32(nangles), np.int32(mid_index)),
    )
    cp.cuda.stream.get_current_stream().synchronize()
    return img_flat.reshape(n, n)


# ---------------------------------------------------------------------------
# Step-size estimation via power iteration
# ---------------------------------------------------------------------------

def _estimate_norm_K2_cpu(n, theta, n_iter=5):
    """Estimate ||K||^2 = ||A||^2 + 8 on CPU via power iteration."""
    rng = np.random.default_rng(0)
    v = rng.standard_normal((n, n))
    for _ in range(n_iter):
        Av   = _forward_cpu(v, theta)
        ATAv = _backward_cpu(Av, theta, n)
        norm2 = np.sqrt(np.sum(ATAv**2) / (np.sum(v**2) + 1e-30))
        v = ATAv / (np.linalg.norm(ATAv) + 1e-30)
    return float(norm2) + 8.0


def _estimate_norm_K2_gpu(n, cos_th, sin_th, nbins, nsteps, n_iter=5):
    """Estimate ||K||^2 on GPU via power iteration (all ops stay on device)."""
    v = cp.random.default_rng(0).standard_normal((n, n)).astype(cp.float32)
    for _ in range(n_iter):
        Av   = _forward_gpu(v, cos_th, sin_th, nbins, nsteps)
        ATAv = _backward_gpu(Av, n, cos_th, sin_th)
        norm2 = float(cp.sqrt(cp.sum(ATAv**2) / (cp.sum(v**2) + 1e-30)))
        ATAv_norm = float(cp.linalg.norm(ATAv))
        v = ATAv / (ATAv_norm + 1e-30)
    return norm2 + 8.0


# ---------------------------------------------------------------------------
# Core Chambolle-Pock solver
# ---------------------------------------------------------------------------

[docs] def chambolle_pock_tv( sinogram, theta, lam=1e-2, n_iter=100, output_size=None, cuda=False, verbose=False, ): """ Chambolle-Pock TV-regularised reconstruction. Solves the primal-dual saddle-point problem: .. math:: \\min_{x \\ge 0}\\; \\max_{p,q}\\; \\langle K x,\\, (p, q) \\rangle - G^*(p, q) + F(x) where :math:`K = (A,\\, \\nabla)` stacks the Radon operator and the gradient, :math:`G^*` is the convex conjugate of the data and TV terms, and :math:`F` is the non-negativity indicator. Parameters ---------- sinogram : ndarray, shape (n_pixels, n_angles) Measured sinogram with axes (detector pixel, angle). theta : array_like, shape (n_angles,) Projection angles [degrees]. lam : float, optional TV regularisation weight. Default 1e-2. n_iter : int, optional Number of Chambolle-Pock iterations. Default 100. output_size : int or None, optional Side length of the square reconstruction grid. When ``None`` the value is inferred from the sinogram as ``round(n_pixels / sqrt(2))``, matching ``skimage.transform.radon`` with ``circle=False``. cuda : bool, optional Run on GPU via CuPy. All primal/dual variables reside on the device throughout the iteration; only the final image is transferred back. Falls back to CPU with a warning when CuPy is unavailable. Default False. verbose : bool, optional Print cost every 10 iterations. Default False. Returns ------- x : ndarray, shape (output_size, output_size) Reconstructed image. Always returned as a CPU numpy array. See Also -------- tv_reconstruction : High-level wrapper with FBP warm-start. """ if cuda and not CUDA_AVAILABLE: warnings.warn( "CuPy not available — falling back to CPU TV reconstruction.", stacklevel=2, ) cuda = False theta = np.asarray(theta, dtype=float) sinogram = np.asarray(sinogram, dtype=float) n_pix, n_ang = sinogram.shape if output_size is None: n = int(np.round(n_pix / np.sqrt(2))) else: n = int(output_size) # ------------------------------------------------------------------ # GPU path # ------------------------------------------------------------------ if cuda: nbins = int(np.ceil(np.sqrt(2) * n)) nsteps = int(np.ceil(n * np.sqrt(2))) cos_th, sin_th = _make_trig_gpu(theta) norm_K2 = _estimate_norm_K2_gpu(n, cos_th, sin_th, nbins, nsteps) tau = np.float32(0.9 / np.sqrt(norm_K2)) sigma = np.float32(0.9 / np.sqrt(norm_K2)) # Allocate all variables on GPU as float32 x = cp.zeros((n, n), dtype=cp.float32) x_bar = cp.zeros((n, n), dtype=cp.float32) py = cp.zeros((n, n), dtype=cp.float32) px = cp.zeros((n, n), dtype=cp.float32) # Sinogram dual variable — shape matches forward projection output q = cp.zeros((nbins, n_ang), dtype=cp.float32) b_gpu = cp.asarray(sinogram, dtype=cp.float32) # The measured sinogram may have a different detector size than nbins # (e.g. generated with skimage circle=False giving ceil(sqrt(2)*orig_n)). # Resize q and b to match if necessary. if b_gpu.shape[0] != nbins: # Interpolate sinogram to match GPU forward operator output size from cupyx.scipy.ndimage import zoom as cp_zoom scale = nbins / b_gpu.shape[0] b_gpu = cp_zoom(b_gpu, (scale, 1.0), order=1) for it in range(n_iter): Ax_bar = _forward_gpu(x_bar, cos_th, sin_th, nbins, nsteps) q = _prox_l2_data_dual(q + sigma * Ax_bar, b_gpu, sigma) gy, gx = _grad2d(x_bar) py, px = _prox_tv_dual(py + sigma * gy, px + sigma * gx, lam) AT_q = _backward_gpu(q, n, cos_th, sin_th) div_p = _div2d(py, px) x_new = cp.maximum(x - tau * (AT_q + div_p), 0.0) x_bar = 2.0 * x_new - x x = x_new if verbose and (it % 10 == 0): gy_x, gx_x = _grad2d(x) cost = (0.5 * float(cp.sum((_forward_gpu( x, cos_th, sin_th, nbins, nsteps) - b_gpu)**2)) + lam * float(cp.sum(cp.sqrt(gy_x**2 + gx_x**2)))) print(" iter {:4d} cost = {:.6e}".format(it, cost)) return cp.asnumpy(x) # ------------------------------------------------------------------ # CPU path # ------------------------------------------------------------------ rng = np.random.default_rng(0) v = rng.standard_normal((n, n)) for _ in range(5): Av = _forward_cpu(v, theta) ATAv = _backward_cpu(Av, theta, n) norm2 = np.sqrt(np.sum(ATAv**2) / (np.sum(v**2) + 1e-30)) v = ATAv / (np.linalg.norm(ATAv) + 1e-30) tau = 0.9 / np.sqrt(float(norm2) + 8.0) sigma = tau x = np.zeros((n, n)) x_bar = x.copy() py = np.zeros_like(x) px = np.zeros_like(x) q = np.zeros_like(sinogram) for it in range(n_iter): Ax_bar = _forward_cpu(x_bar, theta) q = _prox_l2_data_dual(q + sigma * Ax_bar, sinogram, sigma) gy, gx = _grad2d(x_bar) py, px = _prox_tv_dual(py + sigma * gy, px + sigma * gx, lam) AT_q = _backward_cpu(q, theta, n) div_p = _div2d(py, px) x_new = np.maximum(x - tau * (AT_q + div_p), 0.0) x_bar = 2.0 * x_new - x x = x_new if verbose and (it % 10 == 0): gy_x, gx_x = _grad2d(x) cost = (0.5 * np.sum((_forward_cpu(x, theta) - sinogram)**2) + lam * np.sum(np.sqrt(gy_x**2 + gx_x**2))) print(" iter {:4d} cost = {:.6e}".format(it, cost)) return x
# --------------------------------------------------------------------------- # High-level wrapper # ---------------------------------------------------------------------------
[docs] def tv_reconstruction( sinogram, theta, lam=1e-2, n_iter=100, output_size=None, init="fbp", cuda=False, verbose=False, ): """ TV-regularised tomographic reconstruction. High-level wrapper around :func:`chambolle_pock_tv` with an optional FBP warm-start and the same sinogram-axis convention used throughout toupy (detector pixels along axis 0). Parameters ---------- sinogram : ndarray, shape (n_pixels, n_angles) Sinogram — same axis convention as :func:`mod_iradon`. theta : array_like, shape (n_angles,) Projection angles in degrees. lam : float, optional TV regularisation weight. Default 1e-2. n_iter : int, optional Number of Chambolle-Pock iterations. Default 100. output_size : int or None, optional Side length of the square reconstruction grid. ``None`` infers from the sinogram (matches ``skimage.transform.radon``). init : {'fbp', 'zeros'}, optional Initialisation strategy. ``'fbp'`` uses a Ram-Lak FBP as the starting point (recommended; faster convergence). ``'zeros'`` starts from the zero image. Default ``'fbp'``. cuda : bool, optional Run on GPU via CuPy. Default False. verbose : bool, optional Print cost every 10 iterations. Default False. Returns ------- recons : ndarray, shape (n, n) Reconstructed image. Notes ----- The TV weight *lam* should be tuned to the noise level. A cross-validation or L-curve approach is recommended for quantitative work. Examples -------- >>> import numpy as np >>> from skimage.transform import radon >>> from toupy.simulation import phantom >>> from toupy.tomo import tv_reconstruction >>> img = phantom(256) >>> theta = np.linspace(0, 180, 180, endpoint=False) >>> sino = radon(img, theta=theta, circle=False) >>> recons = tv_reconstruction(sino, theta, lam=0.01, n_iter=50) """ theta = np.asarray(theta, dtype=float) sinogram = np.asarray(sinogram, dtype=float) return chambolle_pock_tv( sinogram, theta, lam=lam, n_iter=n_iter, output_size=output_size, cuda=cuda, verbose=verbose, )