# Copyright (c) Thomas Else 2023-25.
# License: MIT
from typing import Sequence
import numpy as np
from pylops import MatrixMult
from tqdm.auto import tqdm
import warnings
from .cuda_implementation import get_model as get_model_gpu_single_c
from .cuda_implementation_refraction import get_model as get_model_gpu_double_c
from .numpy_implementation import get_model as get_model_cpu_single_c
from .. import ReconstructionAlgorithm
from ...core.image_structures.pa_time_data import PATimeSeries
from ... import PAData
try:
cuda_enabled = True
import cupy as cp
except:
cuda_enabled = False
class ModelBasedReconstruction(ReconstructionAlgorithm):
"""Model based reconstruction algorithm processor."""
[docs]
def generate_model(
self, detx, dety, fs, dx, nx, x_0, nt, gpu=True, cache=False, **kwargs
):
"""
Parameters
----------
detx
dety
fs
dx
nx
x_0
nt
gpu
cache
kwargs.
Returns
-------
"""
import pylops
from pylops.optimization.leastsquares import regularized_inversion
from pylops.signalprocessing import Convolve1D, Convolve2D
if gpu:
get_model_single_c = get_model_gpu_single_c
get_model_double_c = get_model_gpu_double_c
else:
get_model_double_c = get_model_cpu_single_c
get_model_single_c = get_model_cpu_single_c
model_type = self.custom_params.get("model_type", "single_sos")
regulariser = self.custom_params.get(
"regulariser", self.custom_params.get("regularizer", None)
)
if regulariser not in ["identity", "laplacian", "TV"]:
raise ValueError("Invalid regulariser.")
reg_lambda = self.custom_params.get("reg_lambda", 1)
iter_lim = self.custom_params.get("iter_lim", 10)
# generate a model.
if model_type == "single_sos":
c = kwargs.get("c", None) or self.custom_params.get("c")
dl = c / fs
model_matrix = get_model_single_c(
detx, dety, dl, dx, nx, x_0, nt, cache=cache
)
elif model_type == "refraction":
c0 = kwargs.get("c0", None) or self.custom_params.get("c0")
c1 = kwargs.get("c1", None) or self.custom_params.get("c1")
y_cutoff = kwargs.get("y_cutoff", None) or self.custom_params.get(
"y_cutoff"
)
model_matrix = get_model_double_c(
detx, dety, c0 / fs, c1 / fs, y_cutoff, dx, nx, x_0, nt, cache=cache
)
else:
raise NotImplementedError(f"Model {model_type} is unavailable.")
self._raw_model = model_matrix
irf = kwargs.get("irf_model", None)
if irf is None:
irf = self.custom_params.get("irf_model", None)
ndet = detx.shape[0]
if irf is not None:
convolve_irf = Convolve1D(
(ndet, nt), irf, axis=1, offset=nt // 2, dtype=np.float32
)
full_model = convolve_irf @ MatrixMult(model_matrix)
else:
full_model = MatrixMult(model_matrix)
inv_args = {}
if regulariser == "identity":
reg = pylops.Identity(nx * nx)
elif regulariser == "laplacian":
if gpu:
reg_mat = cp.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
else:
reg_mat = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
reg = Convolve2D([nx, nx], reg_mat)
elif regulariser == "TV":
reg = [
pylops.FirstDerivative(
dims=(nx, nx), axis=0, edge=True, kind="backward", dtype=np.float32
),
pylops.FirstDerivative(
dims=(nx, nx), axis=1, edge=True, kind="backward", dtype=np.float32
),
]
inv_args["RegsL2"] = [pylops.Identity(nx * nx)]
inv_args["epsRL2s"] = [reg_lambda[1]]
inv_args["mu"] = kwargs.get("mu", 1.0)
inv_args["tol"] = kwargs.get("tol", 1e-10)
inv_args["tau"] = kwargs.get("tau", 1.0)
else:
raise NotImplementedError(f"Regulariser {regulariser} is not available.")
self._model_matrix = full_model, reg
if regulariser in ["identity", "laplacian"]:
if gpu:
inv_args["niter"] = iter_lim
inv_args["tol"] = 1e-6
else:
inv_args["iter_lim"] = iter_lim
return lambda y: regularized_inversion(
full_model, y, [reg], epsRs=[reg_lambda], show=False, **inv_args
)[0]
else:
self._full_model = full_model
self._reg = reg
self._niter_outer = iter_lim[0]
self._niter_inner = iter_lim[1]
self._epsRL1s = [reg_lambda[0], reg_lambda[0]]
self._inv_args = inv_args
return lambda y: pylops.optimization.sparsity.splitbregman(
full_model,
y,
reg,
niter_outer=iter_lim[0],
niter_inner=iter_lim[1],
epsRL1s=[reg_lambda[0], reg_lambda[0]],
show=False,
**inv_args,
)[0]
[docs]
def __init__(
self,
n_pixels: Sequence[int],
field_of_view: Sequence[float],
kwargs_model=None,
pa_example: "PAData" = None,
**kwargs,
):
if kwargs.get("regulariser", None) is None:
kwargs["regulariser"] = "identity"
super().__init__(n_pixels, field_of_view, **kwargs)
self._model_matrix = None
self._raw_model = None
if kwargs_model is None:
kwargs_model = {}
if pa_example is not None:
kwargs_model["geometry"] = pa_example.get_scan_geometry()
kwargs_model["fs"] = pa_example.get_sampling_frequency()
kwargs_model["nt"] = pa_example.get_time_series().shape[-1]
kwargs_model["c"] = pa_example.get_speed_of_sound()
kwargs_model["irf_model"] = pa_example.get_impulse_response()[:]
# Note that super init puts kwargs in self.custom_params
if "geometry" in kwargs_model:
# identify x and y:
detectors = kwargs_model["geometry"]
indices = []
for i in range(3):
if np.any(kwargs_model["geometry"][:, i] != 0.0):
indices.append(i)
self._indices = indices
detx = detectors[:, indices[0]]
dety = detectors[:, indices[1]]
fs = kwargs_model["fs"]
self._fs = fs
nx = n_pixels[indices[0]]
self._nx = nx
dx = field_of_view[indices[0]] / (nx - 1)
self._dx = dx
x_0 = -field_of_view[indices[0]] / 2
self._x_0 = x_0
gpu = kwargs_model.get("gpu", cuda_enabled)
cache = kwargs_model.get("cache", False)
self._model = self.generate_model(
detx, dety, dx=dx, nx=nx, x_0=x_0, gpu=gpu, cache=cache, **kwargs_model
)
else:
self._fs = None
self._nx = None
self._dx = None
self._x_0 = None
self._model = None
self._indices = None
warnings.warn(
"Model matrix has not been precomputed, as insufficient information was supplied."
)
warnings.warn(
"This class is experimental. If you would like to contribute to further development of this "
"approach, please get in touch."
)
[docs]
def pre_prepare_data(self, x: PATimeSeries):
"""
Parameters
----------
x
"""
# really just for speed of sound setting
pass
[docs]
def reconstruct(
self,
signal: np.ndarray,
fs: float,
geometry: np.ndarray,
n_pixels: Sequence[int],
field_of_view: Sequence[float],
speed_of_sound=None,
**kwargs,
) -> np.ndarray:
"""
Parameters
----------
signal
fs
geometry
n_pixels
field_of_view
speed_of_sound
kwargs.
Returns
-------
"""
gpu = self.custom_params.get("gpu", cuda_enabled)
if self._model is not None:
model = self._model
fs = self._fs
nx = self._nx
dx = self._dx
x_0 = self._x_0
indices = self._indices
else:
detectors = geometry
indices = []
for i in range(3):
if np.all(kwargs["geometry"][:, i] != 0.0):
indices.append(i)
assert len(indices) == 2
detx = detectors[:, indices[0]]
dety = detectors[:, indices[1]]
do_irf = self.custom_params.get("do_irf", True)
irf = kwargs["irf"] if do_irf else None
nx = n_pixels[indices[0]]
dx = field_of_view[indices[0]] / (nx - 1)
x_0 = -field_of_view[indices[0]] / 2
nt = signal.shape[-1]
model = self.generate_model(
detx,
dety,
fs,
dx,
nx,
x_0,
nt,
irf_model=irf,
gpu=gpu,
c=speed_of_sound,
)
if gpu:
t = cp.array(signal)
else:
t = np.array(signal)
or_shape = t.shape
output_shape = [1, 1, 1]
for i in indices:
output_shape[i] = nx
# Reverse output shape to be (z, y, x) order
output_shape = tuple(output_shape[::-1])
t = t.reshape((np.prod(t.shape[:-2]),) + t.shape[-2:])
output = np.zeros((t.shape[0],) + output_shape)
for i, t0 in enumerate(tqdm(t)):
result = model(t0.flatten()).reshape(output_shape)
if gpu:
output[i] = result.get()
else:
output[i] = result
return output.reshape(or_shape[:-2] + output.shape[-3:])
[docs]
@staticmethod
def get_algorithm_name() -> str:
"""
Returns
-------
"""
return "Model Based Reconstruction"