#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Date : 2025-12-20
# Author : Lancelot PINCET
# GitHub : https://github.com/LancelotPincet
# Library : funcLP
# Module : Fit
"""
Class defining fitting algorithms.
"""
# %% Libraries
from corelp import prop, selfkwargs
from funclp import CudaReference, use_inputs, use_shapes, use_cuda, use_broadcasting
from abc import ABC, abstractmethod
import numpy as np
import numba as nb
# %% Class
[docs]
class Fit(ABC, CudaReference) :
'''
Class defining fitting algorithms.
Parameters
----------
kwargs : dict
Attributes to change.
Attributes
----------
function : Function
function instance to fit
estimator : Estimator
estimator instance to use to fit
fit : method
Core function used defining the fitting algorithm.
max_iterations : int
Maximum number of loops in the algorithm used.
'''
@prop()
def name(self) :
return self.__class__.__name__
def __init__(self, function, estimator, **kwargs) :
self._function = function # Function object
self._estimator = estimator # Estimator object
self.cuda_reference = self.function
self.estimator.cuda_reference = self.function
selfkwargs(self, kwargs)
# Function and estimator
@property
def function(self) :
return self._function
@property
def estimator(self) :
return self._estimator
# Attributs
max_iterations = 200 #Maximum number of iterations
max_retries = 10 # Maximum of step retries for a given step change
ftol = 1e-8 # Loop stops when chi2 change is lower than ftol.
xtol = 1e-8 # Loop stops when parameter step is lower than xtol.
gtol = 0 # Loop stops when gradient maximum is lower than gtol.
# Parameters to fit
@property
def index2fit(self) :
parameters2fit = self.parameters2fit
return [i for i, param in enumerate(self.function.parameters.keys()) if param in parameters2fit]
@property
def bool2fit(self) :
return [getattr(self.function, f'{param}_fit') for param in self.function.parameters.keys()]
@property
def parameters2fit(self) :
return [key for key in self.function.parameters.keys() if getattr(self.function, f'{key}_fit')]
@property
def nparameters2fit(self) :
n = len(self.parameters2fit)
assert n <= 8, "Cannot fit more than 8 parameters"
return n
@property
def lower_bounds(self) :
bounds = [self.xp.asarray(getattr(self.function, f'{key}_min')) for key in self.parameters2fit]
if all(bound.ndim == 0 for bound in bounds):
return self.xp.asarray(bounds)
normalized = []
for bound in bounds:
if bound.ndim == 0:
normalized.append(self.xp.full(self.nmodels, bound, dtype=self.dtype))
else:
normalized.append(bound.reshape(self.nmodels).astype(self.dtype, copy=False))
return self.xp.stack(normalized, axis=1)
@property
def upper_bounds(self) :
bounds = [self.xp.asarray(getattr(self.function, f'{key}_max')) for key in self.parameters2fit]
if all(bound.ndim == 0 for bound in bounds):
return self.xp.asarray(bounds)
normalized = []
for bound in bounds:
if bound.ndim == 0:
normalized.append(self.xp.full(self.nmodels, bound, dtype=self.dtype))
else:
normalized.append(bound.reshape(self.nmodels).astype(self.dtype, copy=False))
return self.xp.stack(normalized, axis=1)
#ABC
[docs]
@abstractmethod
def fit_init(self) :
''' Fit initialization specific to optimizer '''
pass
[docs]
@abstractmethod
def fit_optimize(self) :
''' Fit main logic optimizer '''
pass
def __call__(self, raw_data, *args, weights=np.float32(1.)) :
''' Fitting function '''
# Start
cache_cuda = self.cuda
if hasattr(self.function, 'prepare_fit_inputs'):
raw_data, args, weights = self.function.prepare_fit_inputs(raw_data, args, weights)
fit_ufunc = self.function.fit_ufunc
else:
fit_ufunc = self.function.__class__.function
inputs = use_inputs(fit_ufunc, args, self.function.parameters) # variables, data, parameters
(nomodel, nopoint), (self.nmodels, self.npoints), in_shapes = use_shapes(*inputs) # (nomodel, nopoint), (nmodels, npoints), (variables_shapes, data_shapes, parameters_shapes)
self.cuda, self.xp, transfer_back, blocks_per_grid, threads_per_block = use_cuda(self.function, (self.nmodels, self.npoints), inputs)
self.variables, self.data, self.parameters, self.dtype = use_broadcasting(self.xp, *inputs, *in_shapes, (self.nmodels, self.npoints))
self.parameters = self.xp.asarray(self.parameters)
self.constants = [self.xp.asarray(arr) for arr in self.function.constants.values()]
self.jitted = self.function.gpu_function[blocks_per_grid, threads_per_block] if self.cuda else self.function.cpu_function
self.parameters_indices = self.xp.asarray(self.index2fit)
self.parameters_bools = self.xp.asarray(self.bool2fit)
self.bounds_min = self.lower_bounds
self.bounds_max = self.upper_bounds
self.assembly_kernel = self.function.get_assembly(self.estimator, self.cuda, self.nmodels)
weights = self.xp.asarray(weights)
# Allocate memory
self.raw_data = self.xp.asarray(raw_data).reshape((self.nmodels, self.npoints)) # Data to fit
self.weights = weights if weights.size > 1 else self.xp.full(shape=(self.nmodels, self.npoints), fill_value=weights) # weights vector
self.parameters_steps = self.xp.empty(shape=(self.nmodels, self.nparameters2fit), dtype=self.dtype) # Step to apply on each parameter
self.chi2_data = self.xp.empty(shape=self.nmodels, dtype=self.dtype) # chi2 current vector
self.chi2_cache = self.xp.empty_like(self.chi2_data) # chi2 reference vector to define improvement
self.gradient_data = self.xp.empty(shape=(self.nmodels, self.nparameters2fit), dtype=self.dtype) # gradient matrix
self.hessian_data = self.xp.empty(shape=(self.nmodels, self.nparameters2fit, self.nparameters2fit), dtype=self.dtype) # hessian matrix
self.hessian_cache = self.xp.empty_like(self.hessian_data) # hessian cache
self.converged = self.xp.zeros(shape=self.nmodels, dtype=self.xp.int8) # -4: reserved for parameter clamped to bounds (used by downstream apps), -3: optimization fail, -2: optimization terminated and failed, -1: optimization termination to test, 0: not converged yet, 1: gtol (gradient), 2: ftol (chi2), 3: xtol (steps)
# Initialize
self.fit_init()
# Iterations
for _ in range(self.max_iterations) :
# chi2, gradient, hessian
self.assembly_kernel(self.raw_data, *self.variables, *self.data, *self.parameters, *self.constants, self.weights, self.chi2_data, self.gradient_data, self.hessian_data, self.parameters_bools, self.converged)
# Reset caches
self.hessian_cache[:] = self.hessian_data
self.chi2_cache[:] = self.chi2_data
# Fit main logic [depends on optimizer]
self.fit_optimize()
# Calculate convergence
self.convergence(self.ftol, self.chi2_cache, self.chi2_data, self.gtol, self.gradient_data, self.xtol, self.parameters.T, self.parameters_indices, self.parameters_steps, self.converged)
all_converged = self.converged.all()
if all_converged :
break
# End
if transfer_back :
self.parameters = [self.xp.asnumpy(param) for param in self.parameters]
self.converged = self.xp.asnumpy(self.converged)
if nomodel : self.parameters = [param.item() for param in self.parameters]
self.cuda = cache_cuda
self.function.parameters = {key: self.parameters[pos] if getattr(self.function, f'{key}_fit') else self.function.parameters[key] for pos, key in enumerate(self.function.parameters.keys())}
return {key: self.function.parameters[key] for key in self.parameters2fit}
# --- Convergence ---
[docs]
@staticmethod
@nb.njit(parallel=True, cache=True)
def cpu_convergence(ftol, old_chi2, new_chi2, gtol, gradient, xtol, parameters, indices, parameters_steps, converged) :
nmodels, nparams = gradient.shape
for model in nb.prange(nmodels) :
if converged[model] > 0 or converged[model] < -1 : continue # test if 0 or -1
# gtol
if gtol:
maxi = 0.0
for param in range(nparams) :
g = abs(gradient[model, param])
if maxi < g : maxi = g
if maxi <= gtol :
converged[model] = 1
continue
# ftol
if ftol and converged[model] == 0: # Do not test on -1 status
if abs(new_chi2[model] - old_chi2[model]) <= ftol * max(1.0, abs(old_chi2[model])) :
converged[model] = 2
continue
# xtol
if xtol:
small = True
for param in range(nparams):
x = parameters[model, indices[param]]
p = parameters_steps[model, param]
if abs(p) > xtol * (abs(x) + 1.0):
small = False
break
if small:
converged[model] = 3
continue
# Failure
if converged[model] == -1 :
converged[model] = -2
[docs]
@staticmethod
@nb.cuda.jit(cache=True)
def gpu_convergence(ftol, old_chi2, new_chi2, gtol, gradient, xtol, parameters, indices, parameters_steps, converged) :
model = nb.cuda.grid(1) # 1D grid of threads
nmodels, nparams = gradient.shape
if model >=nmodels or converged[model] > 0 or converged[model] < -1 : return # test if 0 or -1
# gtol
if gtol:
maxi = 0.0
for param in range(nparams) :
g = abs(gradient[model, param])
if maxi < g : maxi = g
if maxi <= gtol :
converged[model] = 1
return
# ftol
if ftol and converged[model] == 0: # Do not test on -1 status
if abs(new_chi2[model] - old_chi2[model]) <= ftol * max(1.0, abs(old_chi2[model])) :
converged[model] = 2
return
# xtol
if xtol:
small = True
for param in range(nparams):
x = parameters[model, indices[param]]
p = parameters_steps[model, param]
if abs(p) > xtol * (abs(x) + 1.0):
small = False
break
if small:
converged[model] = 3
return
# Failure
if converged[model] == -1 :
converged[model] = -2
@prop(cache=True)
def convergence(self) :
if not self.cuda : return self.cpu_convergence
threads_per_block = 32
blocks_per_grid = (self.nmodels + threads_per_block - 1) // threads_per_block
return self.gpu_convergence[blocks_per_grid, threads_per_block]
# %% Test function run
if __name__ == "__main__":
from corelp import test
test(__file__)