Source code for funclp.modules.Function_LP.Function

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Date          : 2025-12-17
# Author        : Lancelot PINCET
# GitHub        : https://github.com/LancelotPincet
# Library       : funcLP
# Module        : Function

"""
Function class defining a function model.
"""



# %% Libraries
from corelp import prop, selfkwargs
from funclp import CudaReference, kernel_caching 
from abc import ABC, abstractmethod
import numpy as np



# %% Class
[docs] class Function(ABC, CudaReference) : ''' Function class defining a function model. Parameters ---------- kwargs : dict Attributes to change. Attributes ---------- parameters : dict dictionnary with eah parameter current value(s) nmodels : int Number of models according to current parameters of the function. ''' @prop() def name(self) : return self.__class__.__name__ def __init__(self, **kwargs) : selfkwargs(self, kwargs) for constant in self.constants.keys() : if not hasattr(self, constant) : raise SyntaxError(f'Please define {constant} constant to initialise this function')
[docs] @abstractmethod def function(self) : pass
def __call__(self, *args, **kwargs) : return self._function(*args, **kwargs) # Parameters @property def nmodels(self) : shape = np.broadcast_shapes(*[np.shape(getattr(self, param, [])) for param in self.parameters]) if len(shape) > 1 : raise ValueError('Parameters cannot have more than 1 dimension') return shape[0] if len(shape) == 1 else 0 # Set main ufunc parameters def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) main_ufunc = cls.function classname = cls.__name__ # Inputs cls.variables, cls.data, cls.constants = main_ufunc.variables, main_ufunc.data, main_ufunc.constants @property def prop(instance) : return {parameter : getattr(instance, parameter) for parameter in main_ufunc.parameters} @prop.setter def prop(instance, dic) : for key, value in dic.items() : setattr(instance, key, value) cls.parameters = prop @property def prop(instance) : return {constant : getattr(instance, constant) for constant in main_ufunc.constants} @prop.setter def prop(instance, dic) : for key, value in dic.items() : setattr(instance, key, value) cls.constants = prop # Looping on constants for pname in main_ufunc.constants: @property def prop(instance, pname=pname) : _param = getattr(instance, f'_{pname}', None) if _param is not None : return _param raise SyntaxError('This constant should be defined at function init time') @prop.setter def prop(instance, value, pname=pname) : setattr(instance, f'_{pname}', convert(value)) setattr(cls, pname, prop) # Looping on parameters for pname in main_ufunc.parameters: spec = main_ufunc.parameter_specs.get(pname, None) if spec is None: raise SyntaxError(f'{pname} parameter should be declared in @ufunc(parameters=...)') #parameter property @property def prop(instance, pname=pname) : _param = getattr(instance, f'_{pname}', None) if _param is not None : return _param return getattr(instance, f'_{pname}0') @prop.setter def prop(instance, value, pname=pname) : if value is None : setattr(instance, f'_{pname}', None) else : setattr(instance, f'_{pname}', convert(value)) setattr(cls, pname, prop) # derivative default dname = f'd_{pname}' if dname not in cls.__dict__: module_name = f'_{classname}_ufunc_{dname}' inputs_plus = main_ufunc.inputs.replace(pname, f'{pname} + eps') inputs_minus = main_ufunc.inputs.replace(pname, f'{pname} - eps') parameters = ', '.join([parameter_code(main_ufunc.parameter_specs[key]) for key in main_ufunc.parameters]) string = f''' from ._{classname}_cpukernel_function import _{classname}_cpukernel_function as kernel from funclp import Parameter, ufunc import math @ufunc(variables={main_ufunc.variables!r}, data={main_ufunc.data!r}, parameters=[{parameters}], constants={main_ufunc.constants!r}, fastmath=False) def {dname}({main_ufunc.d_inputs}): eps = 1e-3 * max(1.0, abs({pname})) f_plus = kernel({inputs_plus}) f_minus = kernel({inputs_minus}) if math.isfinite(f_plus) and math.isfinite(f_minus): return (f_plus - f_minus) / (2.0 * eps) f_x = kernel({main_ufunc.inputs}) if math.isfinite(f_plus) and math.isfinite(f_x): return (f_plus - f_x) / eps if math.isfinite(f_minus) and math.isfinite(f_x): return (f_x - f_minus) / eps return math.nan ''' d_ufunc = kernel_caching(module_name, string, object_name=dname) d_ufunc.__set_name__(cls, dname) setattr(cls, dname, d_ufunc) setattr(cls, f'_{pname}0', convert(spec.default)) #Estimate parameters function estimate = spec.estimate if estimate is None : setattr(cls, f'{pname}0', None) setattr(cls, f'{pname}_min', -np.float32(np.inf)) setattr(cls, f'{pname}_max', +np.float32(np.inf)) setattr(cls, f'{pname}_fit', bool(spec.fit)) else : setattr(cls, f'{pname}0', staticmethod(estimate)) setattr(cls, f'{pname}_fit', bool(spec.fit)) mini, maxi = spec.bounds if mini is None : mini = -np.float32(np.inf) if maxi is None : maxi = +np.float32(np.inf) setattr(cls, f'{pname}_min', mini) setattr(cls, f'{pname}_max', maxi) def _cpu_assembly_extra_imports_source(self, estimator, function_name, estimator_name, distribution_name, parameters): return "" def _gpu_assembly_extra_imports_source(self, estimator, function_name, estimator_name, distribution_name, parameters): return "" def _cpu_assembly_model_setup_source(self, model_params, parameters): return model_params def _gpu_assembly_model_setup_source(self, block_params, parameters): return block_params def _cpu_assembly_model_eval_source(self, inputs_scalar): return f''' mod = model_scalar({inputs_scalar}) dev = deviance_scalar(point_raw_data, mod, point_weight) los = loss_scalar(point_raw_data, mod, point_weight) fis = fisher_scalar(point_raw_data, mod, point_weight) chi_local += dev''' def _gpu_assembly_model_eval_source(self, inputs_threads): return f''' mod = model_scalar({inputs_threads}) dev = deviance_scalar(thread_raw_data, mod, thread_weight) los = loss_scalar(thread_raw_data, mod, thread_weight) fis = fisher_scalar(thread_raw_data, mod, thread_weight) chi_local += dev''' def _cpu_assembly_derivatives_source(self, parameters, inputs_scalar): return '\n'.join([ f''' if bool2fit[{pos}]: jacob_local[count] = d_{key}({inputs_scalar}) count += 1''' for pos, key in enumerate(parameters) ]) def _gpu_assembly_derivatives_source(self, parameters, inputs_threads): return '\n'.join([ f''' if bool2fit[{pos}]: jacob_local[count] = d_{key}({inputs_threads}) count += 1''' for pos, key in enumerate(parameters) ]) def _cpu_assembly_source(self, estimator): function_name, estimator_name, distribution_name = self._assembly_cache_suffix(estimator) variables = [key for key in self.variables] data = [key for key in self.data] parameters = [key for key in self.parameters.keys()] constants = [key for key in self.constants] inputs = ', '.join(variables + data + parameters + constants) inputs_scalar = ', '.join( [f'point_{key}' for key in variables] + [f'point_{key}' for key in data] + [f'model_{key}' for key in parameters] + constants ) point_variables = '\n '.join([f'point_{key} = {key}[point]' for key in variables]) point_data = '\n '.join([f'point_{key} = {key}[model, point]' for key in data]) model_params = '\n '.join([f'model_{key} = {key}[model]' for key in parameters]) model_setup = self._cpu_assembly_model_setup_source(model_params, parameters) point_setup = '\n '.join([s for s in (point_variables, point_data) if s]) model_eval = self._cpu_assembly_model_eval_source(inputs_scalar) derivatives = self._cpu_assembly_derivatives_source(parameters, inputs_scalar) imports = [ "import numpy as np", "import numba as nb", ( f"from ._{function_name}_cpukernel_function import " f"_{function_name}_cpukernel_function as model_scalar" ), ( f"from ._{estimator_name}_{distribution_name}_cpukernel_deviance " f"import _{estimator_name}_{distribution_name}_cpukernel_deviance as deviance_scalar" ), ( f"from ._{estimator_name}_{distribution_name}_cpukernel_loss " f"import _{estimator_name}_{distribution_name}_cpukernel_loss as loss_scalar" ), ( f"from ._{estimator_name}_{distribution_name}_cpukernel_fisher " f"import _{estimator_name}_{distribution_name}_cpukernel_fisher as fisher_scalar" ), ] for key in parameters: imports.append( f"from ._{function_name}_cpukernel_d_{key} " f"import _{function_name}_cpukernel_d_{key} as d_{key}" ) extra_imports = self._cpu_assembly_extra_imports_source(estimator, function_name, estimator_name, distribution_name, parameters) body = f''' MAX_PARAMS = 8 NHESS = int(8 * (8 + 1) // 2) @nb.njit(parallel=True, nogil=True, fastmath=True, cache=True) def _{function_name}_{estimator_name}_{distribution_name}_cpu_assembly( raw_data, {inputs}, weights, chi2, gradient, hessian, bool2fit, ignore ): nmodels, npoints = raw_data.shape nparams = gradient.shape[1] for model in nb.prange(nmodels): if ignore[model]: continue chi_local = nb.float32(0.0) grad_local = np.zeros(MAX_PARAMS, dtype=np.float32) hess_local = np.zeros(NHESS, dtype=np.float32) jacob_local = np.empty(MAX_PARAMS, dtype=np.float32) {model_setup} for point in range(npoints): {point_setup} point_raw_data = raw_data[model, point] point_weight = weights[model, point] {model_eval} count = 0 {derivatives} for p in range(nparams): Jp = jacob_local[p] grad_local[p] += Jp * los for q in range(p, nparams): idx = p * MAX_PARAMS - (p * (p - 1)) // 2 + (q - p) hess_local[idx] += Jp * jacob_local[q] * fis chi2[model] = chi_local for p in range(nparams): gradient[model, p] = grad_local[p] for p in range(nparams): for q in range(p, nparams): idx = p * MAX_PARAMS - (p * (p - 1)) // 2 + (q - p) v = hess_local[idx] hessian[model, p, q] = v hessian[model, q, p] = v ''' source = '\n'.join(imports) if extra_imports and extra_imports.strip(): source += '\n' + extra_imports.strip('\n') return source + '\n\n' + body def _gpu_assembly_source(self, estimator): function_name, estimator_name, distribution_name = self._assembly_cache_suffix(estimator) variables = [key for key in self.variables] data = [key for key in self.data] parameters = [key for key in self.parameters.keys()] constants = [key for key in self.constants] inputs = ', '.join(variables + data + parameters + constants) inputs_threads = ', '.join( [f'thread_{key}' for key in variables] + [f'thread_{key}' for key in data] + [f'block_{key}' for key in parameters] + constants ) thread_variables = '\n '.join([f'thread_{key} = {key}[point]' for key in variables]) thread_data = '\n '.join([f'thread_{key} = {key}[model, point]' for key in data]) block_params = '\n '.join([f'block_{key} = {key}[model]' for key in parameters]) model_setup = self._gpu_assembly_model_setup_source(block_params, parameters) point_setup = '\n '.join([s for s in (thread_variables, thread_data) if s]) model_eval = self._gpu_assembly_model_eval_source(inputs_threads) derivatives = self._gpu_assembly_derivatives_source(parameters, inputs_threads) imports = [ "import numba as nb", "from numba import cuda", ( f"from ._{function_name}_gpukernel_function import " f"_{function_name}_gpukernel_function as model_scalar" ), ( f"from ._{estimator_name}_{distribution_name}_gpukernel_deviance " f"import _{estimator_name}_{distribution_name}_gpukernel_deviance as deviance_scalar" ), ( f"from ._{estimator_name}_{distribution_name}_gpukernel_loss " f"import _{estimator_name}_{distribution_name}_gpukernel_loss as loss_scalar" ), ( f"from ._{estimator_name}_{distribution_name}_gpukernel_fisher " f"import _{estimator_name}_{distribution_name}_gpukernel_fisher as fisher_scalar" ), ] for key in parameters: imports.append( f"from ._{function_name}_gpukernel_d_{key} " f"import _{function_name}_gpukernel_d_{key} as d_{key}" ) extra_imports = self._gpu_assembly_extra_imports_source(estimator, function_name, estimator_name, distribution_name, parameters) body = f''' TPB = 128 MAX_PARAMS = 8 NHESS = int(8 * (8 + 1) // 2) @nb.cuda.jit(cache=True, fastmath=True) def _{function_name}_{estimator_name}_{distribution_name}_gpu_assembly( raw_data, {inputs}, weights, chi2, gradient, hessian, bool2fit, ignore ): model = nb.cuda.blockIdx.x tid = nb.cuda.threadIdx.x bdim = nb.cuda.blockDim.x nmodels, npoints = raw_data.shape nparams = gradient.shape[1] if model >= nmodels or ignore[model]: return chi_local = nb.float32(0.0) grad_local = nb.cuda.local.array(MAX_PARAMS, nb.float32) hess_local = nb.cuda.local.array(NHESS, nb.float32) jacob_local = nb.cuda.local.array(MAX_PARAMS, nb.float32) for p in range(MAX_PARAMS): grad_local[p] = 0.0 for idx in range(NHESS): hess_local[idx] = 0.0 {model_setup} for point in range(tid, npoints, bdim): {point_setup} thread_raw_data = raw_data[model, point] thread_weight = weights[model, point] {model_eval} count = 0 {derivatives} for p in range(nparams): Jp = jacob_local[p] grad_local[p] += Jp * los for q in range(p, nparams): idx = p * MAX_PARAMS - (p * (p - 1)) // 2 + (q - p) hess_local[idx] += Jp * jacob_local[q] * fis s_chi = nb.cuda.shared.array(TPB, nb.float32) s_grad = nb.cuda.shared.array((TPB, MAX_PARAMS), nb.float32) s_hess = nb.cuda.shared.array((TPB, NHESS), nb.float32) s_chi[tid] = chi_local for p in range(MAX_PARAMS): s_grad[tid, p] = grad_local[p] for idx in range(NHESS): s_hess[tid, idx] = hess_local[idx] nb.cuda.syncthreads() stride = bdim // 2 while stride > 0: if tid < stride: s_chi[tid] += s_chi[tid + stride] for p in range(nparams): s_grad[tid, p] += s_grad[tid + stride, p] for p in range(nparams): for q in range(p, nparams): idx = p * MAX_PARAMS - (p * (p - 1)) // 2 + (q - p) s_hess[tid, idx] += s_hess[tid + stride, idx] nb.cuda.syncthreads() stride //= 2 if tid == 0: chi2[model] = s_chi[0] for p in range(nparams): gradient[model, p] = s_grad[0, p] for p in range(nparams): for q in range(p, nparams): idx = p * MAX_PARAMS - (p * (p - 1)) // 2 + (q - p) v = s_hess[0, idx] hessian[model, p, q] = v hessian[model, q, p] = v ''' source = '\n'.join(imports) if extra_imports and extra_imports.strip(): source += '\n' + extra_imports.strip('\n') return source + '\n\n' + body
[docs] def cpu_assembly_kernel(self, estimator): function_name, estimator_name, distribution_name = self._assembly_cache_suffix(estimator) _ = estimator.cpukernel_deviance _ = estimator.cpukernel_loss _ = estimator.cpukernel_fisher module_name = f'_{function_name}_{estimator_name}_{distribution_name}_cpu_assembly' cache = getattr(self.__class__, '_cpu_assembly_cache', {}) if module_name not in cache: source = self._cpu_assembly_source(estimator) cache[module_name] = kernel_caching(module_name, source, object_name=module_name) setattr(self.__class__, '_cpu_assembly_cache', cache) return cache[module_name]
[docs] def gpu_assembly_kernel(self, estimator): function_name, estimator_name, distribution_name = self._assembly_cache_suffix(estimator) _ = estimator.gpukernel_deviance _ = estimator.gpukernel_loss _ = estimator.gpukernel_fisher module_name = f'_{function_name}_{estimator_name}_{distribution_name}_gpu_assembly' cache = getattr(self.__class__, '_gpu_assembly_cache', {}) if module_name not in cache: source = self._gpu_assembly_source(estimator) cache[module_name] = kernel_caching(module_name, source, object_name=module_name) setattr(self.__class__, '_gpu_assembly_cache', cache) return cache[module_name]
[docs] def get_assembly(self, estimator, cuda, nmodels): if not cuda: return self.cpu_assembly_kernel(estimator) threads_per_block = 128 blocks_per_grid = nmodels return self.gpu_assembly_kernel(estimator)[blocks_per_grid, threads_per_block]
def _assembly_cache_suffix(self, estimator): function_name = self.__class__.__name__ estimator_name = estimator.__class__.__name__ distribution = getattr(estimator, "distribution", None) distribution_name = distribution.__class__.__name__ if distribution is not None else "None" return function_name, estimator_name, distribution_name
def convert(value) : try : dtype = value.dtype if np.issubdtype(dtype, np.bool_) : return value.astype(np.bool_) elif np.issubdtype(dtype, np.integer) : return value.astype(np.int32) elif np.issubdtype(dtype, np.floating) : return value.astype(np.float32) else : raise TypeError(f'Parameter cannot have {dtype} dtype') except AttributeError: if isinstance(value, bool) or isinstance(value, np.bool_) : return np.bool_(value) elif isinstance(value, int) or isinstance(value, np.integer) : return np.int32(value) elif isinstance(value, float) or isinstance(value, np.floating) : return np.float32(value) else : raise TypeError(f'Parameter cannot have {type(value)} dtype') def parameter_code(parameter) : return f'Parameter({parameter.name!r}, {value_code(parameter.default)})' def value_code(value) : if isinstance(value, np.generic) : return repr(value.item()) return repr(value) # %% Test function run if __name__ == "__main__": from corelp import test test(__file__)