#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Date : 2026-01-01
# Author : Lancelot PINCET
# GitHub : https://github.com/LancelotPincet
# Library : funcLP
# %% Libraries
import numpy as np
from scipy.interpolate import RectBivariateSpline, LSQBivariateSpline, InterpolatedUnivariateSpline
from funclp import Function, Parameter, ufunc
from corelp import rfrom
bspline3d, bspline3d_dx, bspline3d_dy, bspline3d_dz, get_mean, get_amp, get_offset = rfrom("._splines", "bspline3d", "bspline3d_dx", "bspline3d_dy", "bspline3d_dz", "get_mean", "get_amp", "get_offset")
bspline3d, get_mean, get_amp, get_offset = rfrom("._splines", "bspline3d", "get_mean", "get_amp", "get_offset")
# %% Parameters
def mux(res, *vars) :
return get_mean(res, vars[0])
def muy(res, *vars) :
return get_mean(res, vars[1])
def muz(res, *vars) :
return get_mean(res, vars[2])
def amp(res, *vars) :
return get_amp(res)
def offset(res, *vars) :
return get_offset(res)
# %% Function
[docs]
class Spline3D(Function):
def __init__(self, model2interp=None, x2interp=None, y2interp=None, z2interp=None, kx=3, ky=3, kz=3, /, **kwargs):
if kx > 5 or ky > 5 or kz > 5:
raise ValueError('Spline of order higher than 5 is not implemented')
if model2interp is not None and x2interp is not None and y2interp is not None and z2interp is not None:
kx, ky, kz = int(kx), int(ky), int(kz)
# Flatten coordinate arrays — accepts any shape (broadcastable inputs)
model2interp = np.asarray(model2interp)
if model2interp.size == x2interp.size and model2interp.size == y2interp.size :
x2interp = np.asarray(x2interp)[0:1, 0:1, :]
y2interp = np.asarray(y2interp)[0:1, :, 0:1]
z2interp = np.asarray(z2interp)[:, 0:1, 0:1]
x2interp = np.asarray(x2interp).ravel()
y2interp = np.asarray(y2interp).ravel()
z2interp = np.asarray(z2interp).ravel()
nx, ny, nz = len(x2interp), len(y2interp), len(z2interp)
# model2interp must be (nx, ny, nz) after squeezing broadcast dims
if model2interp.shape != (nx, ny, nz):
raise ValueError(
f'model2interp must have shape (nx, ny, nz) = '
f'({nx}, {ny}, {nz}), got {model2interp.shape}'
)
# Step 1: fit each z-slice (nx, ny) with RectBivariateSpline
sp0 = RectBivariateSpline(x2interp, y2interp, model2interp[:, :, 0], kx=kx, ky=ky)
tx, ty, _ = sp0.tck
tx = tx.astype(np.float32)
ty = ty.astype(np.float32)
nx_b = len(tx) - kx - 1
ny_b = len(ty) - ky - 1
all_coeffs = np.zeros((nz, ny_b, nx_b), dtype=np.float32)
all_coeffs[0] = sp0.tck[2][:nx_b * ny_b].reshape(ny_b, nx_b)
for i in range(1, nz):
spi = RectBivariateSpline(x2interp, y2interp, model2interp[:, :, i], kx=kx, ky=ky)
all_coeffs[i] = spi.tck[2][:nx_b * ny_b].reshape(ny_b, nx_b)
# Step 2: fit 1D splines along z for each (iy, ix) coefficient position
sp_z0 = InterpolatedUnivariateSpline(z2interp, all_coeffs[:, 0, 0], k=kz)
tz = sp_z0._eval_args[0].astype(np.float32)
nz_b = len(tz) - kz - 1
coeffs_cube = np.zeros((nz_b, ny_b, nx_b), dtype=np.float32)
for iy in range(ny_b):
for ix in range(nx_b):
sp_z = InterpolatedUnivariateSpline(z2interp, all_coeffs[:, iy, ix], k=kz)
coeffs_cube[:, iy, ix] = sp_z.get_coeffs()
kwargs.update(dict(tx=tx, ty=ty, tz=tz, coeffs=coeffs))
kwargs.update(dict(kx=kx, ky=ky, kz=kz))
super().__init__(**kwargs)
@ufunc(
variables=["x", "y", "z"],
parameters=[
Parameter("mux", 0., estimate=mux),
Parameter("muy", 0., estimate=muy),
Parameter("muz", 0., estimate=muz),
Parameter("amp", 1., estimate=amp),
Parameter("offset", 0., estimate=offset),
Parameter("kx", 3),
Parameter("ky", 3),
Parameter("kz", 3),
],
constants=["tx", "ty", "tz", "coeffs"],
)
def function(x, y, z, /, mux=0., muy=0., muz=0., amp=1., offset=0., kx=3, ky=3, kz=3, tx=None, ty=None, tz=None, coeffs=None) :
return amp * bspline3d(tx, ty, tz, coeffs, kx, ky, kz, x-mux, y-muy, z-muz) + offset
@ufunc(constants=["tx", "ty", "tz", "coeffs"])
def d_mux(x, y, z, /, mux, muy, muz, amp, offset, kx=3, ky=3, kz=3, tx=None, ty=None, tz=None, coeffs=None):
return -amp * bspline3d_dx(tx, ty, tz, coeffs, kx, ky, kz, x - mux, y - muy, z - muz)
@ufunc(constants=["tx", "ty", "tz", "coeffs"])
def d_muy(x, y, z, /, mux, muy, muz, amp, offset, kx=3, ky=3, kz=3, tx=None, ty=None, tz=None, coeffs=None):
return -amp * bspline3d_dy(tx, ty, tz, coeffs, kx, ky, kz, x - mux, y - muy, z - muz)
@ufunc(constants=["tx", "ty", "tz", "coeffs"])
def d_muz(x, y, z, /, mux, muy, muz, amp, offset, kx=3, ky=3, kz=3, tx=None, ty=None, tz=None, coeffs=None):
return -amp * bspline3d_dz(tx, ty, tz, coeffs, kx, ky, kz, x - mux, y - muy, z - muz)
@ufunc(constants=["tx", "ty", "tz", "coeffs"])
def d_amp(x, y, z, /, mux, muy, muz, amp, offset, kx=3, ky=3, kz=3, tx=None, ty=None, tz=None, coeffs=None):
return bspline3d(tx, ty, tz, coeffs, kx, ky, kz, x - mux, y - muy, z - muz)
@ufunc(constants=["tx", "ty", "tz", "coeffs"])
def d_offset(x, y, z, /, mux, muy, muz, amp, offset, kx=3, ky=3, kz=3, tx=None, ty=None, tz=None, coeffs=None):
return np.float32(1.0)
def _cpu_assembly_extra_imports_source(self, estimator, function_name, estimator_name, distribution_name, parameters):
return "from funclp.modules.Function_LP._functions.splines._splines import bspline3d, bspline3d_dx, bspline3d_dy, bspline3d_dz"
def _cpu_assembly_model_setup_source(self, model_params, parameters):
return '''block_mux = mux[model]
block_muy = muy[model]
block_muz = muz[model]
block_amp = amp[model]
block_offset = offset[model]
block_kx = kx[model]
block_ky = ky[model]
block_kz = kz[model]'''
def _cpu_assembly_model_eval_source(self, inputs_scalar):
return ''' base = bspline3d(tx, ty, tz, coeffs, block_kx, block_ky, block_kz, point_x - block_mux, point_y - block_muy, point_z - block_muz)
mod = block_amp * base + block_offset
dev = deviance_scalar(point_raw_data, mod, point_weight)
los = loss_scalar(point_raw_data, mod, point_weight)
fis = fisher_scalar(point_raw_data, mod, point_weight)
chi_local += dev'''
def _cpu_assembly_derivatives_source(self, parameters, inputs_scalar):
return ''' if bool2fit[0]:
jacob_local[count] = -block_amp * bspline3d_dx(tx, ty, tz, coeffs, block_kx, block_ky, block_kz, point_x - block_mux, point_y - block_muy, point_z - block_muz)
count += 1
if bool2fit[1]:
jacob_local[count] = -block_amp * bspline3d_dy(tx, ty, tz, coeffs, block_kx, block_ky, block_kz, point_x - block_mux, point_y - block_muy, point_z - block_muz)
count += 1
if bool2fit[2]:
jacob_local[count] = -block_amp * bspline3d_dz(tx, ty, tz, coeffs, block_kx, block_ky, block_kz, point_x - block_mux, point_y - block_muy, point_z - block_muz)
count += 1
if bool2fit[3]:
jacob_local[count] = base
count += 1
if bool2fit[4]:
jacob_local[count] = 1.0
count += 1
if bool2fit[5]:
jacob_local[count] = d_kx(point_x, point_y, point_z, block_mux, block_muy, block_muz, block_amp, block_offset, block_kx, block_ky, block_kz, tx, ty, tz, coeffs)
count += 1
if bool2fit[6]:
jacob_local[count] = d_ky(point_x, point_y, point_z, block_mux, block_muy, block_muz, block_amp, block_offset, block_kx, block_ky, block_kz, tx, ty, tz, coeffs)
count += 1
if bool2fit[7]:
jacob_local[count] = d_kz(point_x, point_y, point_z, block_mux, block_muy, block_muz, block_amp, block_offset, block_kx, block_ky, block_kz, tx, ty, tz, coeffs)
count += 1'''
def _gpu_assembly_derivatives_source(self, parameters, inputs_threads):
return ''' if bool2fit[0]:
jacob_local[count] = d_mux(thread_x, thread_y, thread_z, block_mux, block_muy, block_muz, block_amp, block_offset, block_kx, block_ky, block_kz, tx, ty, tz, coeffs)
count += 1
if bool2fit[1]:
jacob_local[count] = d_muy(thread_x, thread_y, thread_z, block_mux, block_muy, block_muz, block_amp, block_offset, block_kx, block_ky, block_kz, tx, ty, tz, coeffs)
count += 1
if bool2fit[2]:
jacob_local[count] = d_muz(thread_x, thread_y, thread_z, block_mux, block_muy, block_muz, block_amp, block_offset, block_kx, block_ky, block_kz, tx, ty, tz, coeffs)
count += 1
if bool2fit[3]:
jacob_local[count] = d_amp(thread_x, thread_y, thread_z, block_mux, block_muy, block_muz, block_amp, block_offset, block_kx, block_ky, block_kz, tx, ty, tz, coeffs)
count += 1
if bool2fit[4]:
jacob_local[count] = d_offset(thread_x, thread_y, thread_z, block_mux, block_muy, block_muz, block_amp, block_offset, block_kx, block_ky, block_kz, tx, ty, tz, coeffs)
count += 1
if bool2fit[5]:
jacob_local[count] = d_kx(thread_x, thread_y, thread_z, block_mux, block_muy, block_muz, block_amp, block_offset, block_kx, block_ky, block_kz, tx, ty, tz, coeffs)
count += 1
if bool2fit[6]:
jacob_local[count] = d_ky(thread_x, thread_y, thread_z, block_mux, block_muy, block_muz, block_amp, block_offset, block_kx, block_ky, block_kz, tx, ty, tz, coeffs)
count += 1
if bool2fit[7]:
jacob_local[count] = d_kz(thread_x, thread_y, thread_z, block_mux, block_muy, block_muz, block_amp, block_offset, block_kx, block_ky, block_kz, tx, ty, tz, coeffs)
count += 1'''
# %% Test function run
if __name__ == "__main__":
from corelp import debug
from funclp import plot
import numpy as np
debug_folder = debug(__file__)
# Inputs
variables = (
np.linspace(-1, 1, 100).reshape((1,1,100)),
np.linspace(-1, 1, 100).reshape((1,100,1)),
np.linspace(-1, 1, 100).reshape((100,1,1)),
)
parameters = dict()
# Get interpolation
from funclp import Gaussian3D
gausfunction = Gaussian3D()
model = gausfunction(*variables)
# Plot function
instance = Spline3D(model, *variables)
instance(variables[0] + 0.5, variables[1] - 0.5, variables[2], **parameters)