#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Date : 2026-01-01
# Author : Lancelot PINCET
# GitHub : https://github.com/LancelotPincet
# Library : funcLP
# %% Libraries
from funclp import Fit
from funclp.modules.kernel_caching_LP.kernel_caching import kernel_caching
import math
from corelp import prop
import numba as nb
# %% Function
[docs]
class LM(Fit) :
# Attributs
damping_tau = 1e-3 # damping initialization
damping_min, damping_max = 1e-6, 1e6 # damping limit values
max_retries = 10 # Maximum of step retries for a given hessian
# Functions
[docs]
def fit_init(self) :
self.damping_data = self.xp.empty(shape=self.nmodels, dtype=self.dtype) # Damping data
self.nu_data = self.xp.full(shape=self.nmodels, dtype=self.dtype, fill_value=2.0) # Daing factor value
self.improved = self.xp.zeros(shape=self.nmodels, dtype=self.xp.bool_) # Bool vector: True if the inner optimizer loop improved --> Stop inner loop
self.ignore = self.xp.zeros(shape=self.nmodels, dtype=self.xp.bool_) # Bool vector: True if the inner optimizer failed or improved --> continue to next inner loop
self.damping_initialized = False
[docs]
def fit_optimize(self) :
# Status
self.xp.not_equal(self.converged, 0, out=self.improved) # Global converged are considered already improved in the inner loop
# Looping several time on a given hessian with different dampings
for _ in range(self.max_retries) :
self.ignore[:] = self.improved # If already improved, we ignore from start
# Damping, Cholseky solve, Parameter steps
self.damped_step(self.hessian_data, self.hessian_cache, self.gradient_data, self.damping_data, self.damping_min, self.damping_max, self.damping_tau, self.damping_initialized, self.parameters.T, self.parameters_indices, self.parameters_steps, self.bounds_min, self.bounds_max, self.ignore)
if not self.damping_initialized : self.damping_initialized = True
# evaluate chi2 after step and check if improving
self.trial_chi2(self.raw_data, *self.variables, *self.data, *self.parameters, *self.constants, self.weights, self.chi2_data, self.parameters.T, self.parameters_indices, self.parameters_steps, self.gradient_data, self.hessian_cache, self.damping_data, self.nu_data, self.damping_max, self.damping_min, self.converged, self.improved, self.ignore)
# End loop
if self.improved.all() :
break
self.converged[~self.improved] = -3
# Damping step
[docs]
@staticmethod
@nb.njit(parallel=True, nogil=True, cache=True, fastmath=True)
def cpu_damped_step(hessian, hessian_cache, gradient, damping, damping_min, damping_max, tau, initialized, parameters, indices, steps, bounds_min, bounds_max, ignore):
nmodels, nparams, _ = hessian.shape
for model in nb.prange(nmodels):
if ignore[model]:
continue
# Damping
# Initialization
if not initialized :
m = 0.0
for p in range(nparams) :
v = max(1e-12, abs(hessian_cache[model, p, p]))
if v > m : m = v
damping[model] = min(max(tau * m, damping_min), damping_max)
# Build damped hessian : H = H0 + lambda * diag(H0)
for i in range(nparams) :
for j in range(nparams) :
hessian[model, i, j] = hessian_cache[model, i, j]
hessian[model, i, i] += damping[model] * max(1e-12, abs(hessian_cache[model, i, i]))
# Cholesky solve
# lower-triangular part stores L
failed = False
for i in range(nparams):
for j in range(i + 1):
s = hessian[model, i, j]
for k in range(j):
s -= hessian[model, i, k] * hessian[model, j, k]
if i == j:
if s <= 0.0:
failed = True
break
hessian[model, i, j] = math.sqrt(s)
else:
hessian[model, i, j] = s / hessian[model, j, j]
if failed:
break
# Non positive-definite Hessian: keep parameters unchanged and let
# the trial stage increase damping.
if failed:
for p in range(nparams):
steps[model, p] = 0.0
continue
# Fowrward substitution: L y = -g
for i in range(nparams):
s = -gradient[model, i]
for k in range(i):
s -= hessian[model, i, k] * steps[model, k]
steps[model, i] = s / hessian[model, i, i]
# Back substitution: L^T x = y
for i in range(nparams - 1, -1, -1):
s = steps[model, i]
for k in range(i + 1, nparams):
s -= hessian[model, k, i] * steps[model, k]
steps[model, i] = s / hessian[model, i, i]
# Parameter steps
# Bounded updates
for p in range(nparams):
pidx = indices[p]
oldv = parameters[model, pidx]
newv = oldv + steps[model, p]
bound_min = bounds_min[model, p] if bounds_min.ndim == 2 else bounds_min[p]
bound_max = bounds_max[model, p] if bounds_max.ndim == 2 else bounds_max[p]
if newv < bound_min:
newv = bound_min
steps[model, p] = newv - oldv
elif newv > bound_max:
newv = bound_max
steps[model, p] = newv - oldv
parameters[model, pidx] = newv
[docs]
@staticmethod
@nb.cuda.jit(cache=True)
def gpu_damped_step(hessian, hessian_cache, gradient, damping, damping_min, damping_max, tau, initialized, parameters, indices, steps, bounds_min, bounds_max, ignore):
model = nb.cuda.grid(1)
nmodels, nparams, _ = hessian.shape
if model >= nmodels or ignore[model] : return
# DAMPING
# Initialize damping once per model if needed
if not initialized :
m = 0.0
for p in range(nparams) :
v = max(1e-12, abs(hessian_cache[model, p, p]))
if v > m : m = v
damping[model] = min(max(tau * m, damping_min), damping_max)
# Build damped Hessian: H = H0 + lambda * diag(H0)
for i in range(nparams):
for j in range(nparams):
hessian[model, i, j] = hessian_cache[model, i, j]
hessian[model, i, i] += damping[model] * max(1e-12, abs(hessian_cache[model, i, i]))
# CHOLESKY SOLVE
# Lower-triangular part stores L
failed = False
for i in range(nparams):
for j in range(i + 1):
s = hessian[model, i, j]
for k in range(j):
s -= hessian[model, i, k] * hessian[model, j, k]
if i == j:
if s <= 0.0:
failed = True
break
hessian[model, i, j] = math.sqrt(s)
else:
hessian[model, i, j] = s / hessian[model, j, j]
if failed:
break
if failed:
for p in range(nparams):
steps[model, p] = 0.0
return
# Forward substitution: L y = -g
for i in range(nparams):
s = -gradient[model, i]
for k in range(i):
s -= hessian[model, i, k] * steps[model, k]
steps[model, i] = s / hessian[model, i, i]
# Back substitution: L^T x = y
for i in range(nparams - 1, -1, -1):
s = steps[model, i]
for k in range(i + 1, nparams):
s -= hessian[model, k, i] * steps[model, k]
steps[model, i] = s / hessian[model, i, i]
# PARAM CHANGE
# Apply bounded parameter update
for p in range(nparams):
pidx = indices[p]
oldv = parameters[model, pidx]
newv = oldv + steps[model, p]
bound_min = bounds_min[model, p] if bounds_min.ndim == 2 else bounds_min[p]
bound_max = bounds_max[model, p] if bounds_max.ndim == 2 else bounds_max[p]
if newv < bound_min:
newv = bound_min
steps[model, p] = newv - oldv
elif newv > bound_max:
newv = bound_max
steps[model, p] = newv - oldv
parameters[model, pidx] = newv
@property
def damped_step(self) :
if not self.cuda : return self.cpu_damped_step
threads_per_block = 32
blocks_per_grid = (self.nmodels + threads_per_block - 1) // threads_per_block
return self.gpu_damped_step[blocks_per_grid, threads_per_block]
# Chi2 trial
def _trial_cache_suffix(self):
"""Stable suffix for cached trial kernels."""
function_name = self.function.__class__.__name__
estimator_name = self.estimator.__class__.__name__
distribution = getattr(self.estimator, "distribution", None)
distribution_name = distribution.__class__.__name__ if distribution is not None else "None"
return function_name, estimator_name, distribution_name
def _cpu_trial_source(self):
"""Build source code for cached CPU trial-chi2 kernel."""
function_name, estimator_name, distribution_name = self._trial_cache_suffix()
variables = [key for key in self.function.variables]
data = [key for key in self.function.data]
parameters = [key for key in self.function.parameters.keys()]
constants = [key for key in self.function.constants]
inputs = ', '.join(variables + data + parameters + constants)
inputs_scalar = ', '.join(
[f'point_{key}' for key in variables]
+ [f'point_{key}' for key in data]
+ [f'model_{key}' for key in parameters]
+ constants
)
point_variables = '\n '.join([f'point_{key} = {key}[point]' for key in variables])
point_data = '\n '.join([f'point_{key} = {key}[model, point]' for key in data])
model_params = '\n '.join([f'model_{key} = {key}[model]' for key in parameters])
return f'''
import numba as nb
from ._{function_name}_cpukernel_function import _{function_name}_cpukernel_function as model_scalar
from ._{estimator_name}_{distribution_name}_cpukernel_deviance import _{estimator_name}_{distribution_name}_cpukernel_deviance as deviance_scalar
@nb.njit(parallel=True, nogil=True, fastmath=True, cache=True)
def _{function_name}_{estimator_name}_{distribution_name}_cpu_trial_chi2(
raw_data, {inputs}, weights, chi2, parameters, indices, steps, gradient,
hessian, damping, nu, damping_max, damping_min, converged, improved, ignore
):
nmodels, npoints = raw_data.shape
nparams = steps.shape[1]
for model in nb.prange(nmodels):
if ignore[model]:
continue
chi_local = 0.0
{model_params}
for point in range(npoints):
{point_variables}
{point_data}
point_raw_data = raw_data[model, point]
point_weight = weights[model, point]
mod = model_scalar({inputs_scalar})
dev = deviance_scalar(point_raw_data, mod, point_weight)
chi_local += dev
new_chi2 = chi_local
old_chi2 = chi2[model]
pred = 0.0
for param in range(nparams):
pred += steps[model, param] * (
-gradient[model, param]
+ damping[model] * max(1e-12, abs(hessian[model, param, param])) * steps[model, param]
)
pred *= 0.5
ared = old_chi2 - new_chi2
rho = ared / pred if pred > 1e-12 else -1.0
rho = min(max(rho, -1e6), 1e6)
if (not ignore[model]) and (pred > 1e-12) and (ared > 0.0):
improved[model] = True
chi2[model] = new_chi2
tmp = 1.0 - (2.0 * rho - 1.0) ** 3
if tmp < 1.0 / 3.0:
tmp = 1.0 / 3.0
elif tmp > 10.0:
tmp = 10.0
damping[model] *= tmp
nu[model] = 2.0
if damping[model] < damping_min:
damping[model] = damping_min
else:
if not ignore[model]:
for param in range(nparams):
parameters[model, indices[param]] -= steps[model, param]
if damping[model] >= damping_max:
converged[model] = -1
improved[model] = True
else:
damping[model] *= nu[model]
nu[model] *= 2.0
if damping[model] > damping_max:
damping[model] = damping_max
'''
def _gpu_trial_source(self):
"""Build source code for cached GPU trial-chi2 kernel."""
function_name, estimator_name, distribution_name = self._trial_cache_suffix()
variables = [key for key in self.function.variables]
data = [key for key in self.function.data]
parameters = [key for key in self.function.parameters.keys()]
constants = [key for key in self.function.constants]
inputs = ', '.join(variables + data + parameters + constants)
inputs_threads = ', '.join(
[f'thread_{key}' for key in variables]
+ [f'thread_{key}' for key in data]
+ [f'block_{key}' for key in parameters]
+ constants
)
thread_variables = '\n '.join([f'thread_{key} = {key}[point]' for key in variables])
thread_data = '\n '.join([f'thread_{key} = {key}[model, point]' for key in data])
block_params = '\n '.join([f'block_{key} = {key}[model]' for key in parameters])
return f'''
import numba as nb
from numba import cuda
from ._{function_name}_gpukernel_function import _{function_name}_gpukernel_function as model_scalar
from ._{estimator_name}_{distribution_name}_gpukernel_deviance import _{estimator_name}_{distribution_name}_gpukernel_deviance as deviance_scalar
TPB = 128
@nb.cuda.jit(cache=True)
def _{function_name}_{estimator_name}_{distribution_name}_gpu_trial_chi2(
raw_data, {inputs}, weights, chi2, parameters, indices, steps, gradient,
hessian, damping, nu, damping_max, damping_min, converged, improved, ignore
):
model = nb.cuda.blockIdx.x
tid = nb.cuda.threadIdx.x
bdim = nb.cuda.blockDim.x
nmodels, npoints = raw_data.shape
nparams = steps.shape[1]
if model >= nmodels or ignore[model]:
return
chi_local = nb.float32(0.0)
{block_params}
for point in range(tid, npoints, bdim):
{thread_variables}
{thread_data}
thread_raw_data = raw_data[model, point]
thread_weight = weights[model, point]
pred = model_scalar({inputs_threads})
dev = deviance_scalar(thread_raw_data, pred, thread_weight)
chi_local += dev
s_chi = nb.cuda.shared.array(TPB, nb.float32)
s_chi[tid] = chi_local
nb.cuda.syncthreads()
stride = bdim // 2
while stride > 0:
if tid < stride:
s_chi[tid] += s_chi[tid + stride]
nb.cuda.syncthreads()
stride //= 2
if tid == 0:
new_chi2 = s_chi[0]
old_chi2 = chi2[model]
pred = 0.0
for param in range(nparams):
step = steps[model, param]
pred += step * (
-gradient[model, param]
+ damping[model] * max(1e-12, abs(hessian[model, param, param])) * step
)
pred *= 0.5
ared = old_chi2 - new_chi2
rho = ared / pred if pred > 1e-12 else -1.0
rho = min(max(rho, -1e6), 1e6)
if (not ignore[model]) and (pred > 1e-12) and (ared > 0.0):
improved[model] = True
chi2[model] = new_chi2
tmp = 1.0 - (2.0 * rho - 1.0) ** 3
if tmp < 1.0 / 3.0:
tmp = 1.0 / 3.0
elif tmp > 10.0:
tmp = 10.0
damping[model] *= tmp
nu[model] = 2.0
if damping[model] < damping_min:
damping[model] = damping_min
else:
if not ignore[model]:
for param in range(nparams):
parameters[model, indices[param]] -= steps[model, param]
if damping[model] >= damping_max:
converged[model] = -1
improved[model] = True
else:
damping[model] *= nu[model]
nu[model] *= 2.0
if damping[model] > damping_max:
damping[model] = damping_max
'''
@prop(cache=True)
def cpu_trial_chi2(self):
function_name, estimator_name, distribution_name = self._trial_cache_suffix()
_ = self.estimator.cpukernel_deviance
module_name = f"_{function_name}_{estimator_name}_{distribution_name}_cpu_trial_chi2"
source = self._cpu_trial_source()
return kernel_caching(module_name, source, object_name=module_name)
@prop(cache=True)
def gpu_trial_chi2(self):
function_name, estimator_name, distribution_name = self._trial_cache_suffix()
_ = self.estimator.gpukernel_deviance
module_name = f"_{function_name}_{estimator_name}_{distribution_name}_gpu_trial_chi2"
source = self._gpu_trial_source()
return kernel_caching(module_name, source, object_name=module_name)
@property
def trial_chi2(self) :
if not self.cuda : return self.cpu_trial_chi2
threads_per_block = 128
blocks_per_grid = self.nmodels
return self.gpu_trial_chi2[blocks_per_grid, threads_per_block]