Source code for sheap.SheapModelBuilder.Utils

"""
SheapModelBuilder Utilities
=======================

Helpers for constructing parameter ties and grouping spectral lines
into composite (SPAF) profiles used by the SheapModelBuilder pipeline.

Main Features
-------------
- **Fe ties**: Build tied-parameter expressions for Fe regions based on
    element or subregion membership (:func:`fe_ties`).
- **Region ties**: Generate coherent tie maps for narrow/broad components,
    optionally merging with known relations (:func:`_maketies`).
- **Index ties resolution**: Flatten index-based dependencies into a simple
    (coefficient, free-index) mapping (:func:`flatten_index_ties`).
- **Line grouping (SPAF)**: Collapse multiple lines into grouped pseudo-lines
    with amplitude relations for compact modeling (:func:`group_lines`).

Notes
-----
- Tie expressions follow the naming scheme:
    ``{param}_{line_name}_{component}_{region}``, e.g.
    ``center_Hbeta_1_broad`` or ``fwhm_OIIIc_1_narrow``.
- Amplitude handling may depend on your parameterization (e.g., ``logamp`` vs
    linear amplitude). If you moved to logarithmic amplitudes, ensure upstream
    tie generation is consistent with that convention.
- The grouping logic supports modes ``"region"``, ``"subregion"``, and
    ``"element"``; choose according to how you want lines fused.
"""

__author__ = 'felavila'


__all__ = [
    "default_known_tied_relations",
    "fe_ties",
    "flatten_index_ties",
    "group_lines",
]

from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np

from sheap.Core import SpectralLine,SpectralLineList

#TODO check how the ties change now we have logamp and not amplitude. e.g reparametrize 

[docs] def fe_ties(entries: SpectralLineList, routine_fe_tied) -> List[List[str]]: """ Generate tied parameter expressions for Fe emission lines. Parameters ---------- entries : list of SpectralLine Spectral line entries to be checked for Fe region membership. routine_fe_tied : dict Dictionary with tie rules, e.g. {'by': 'element', 'tied_params': ('center', 'fwhm')}. Returns ------- list of list of str Each inner list is a pair of tied parameter names [dependent, reference]. """ tied_params = routine_fe_tied.get("tied_params", ('center', 'fwhm')) by = routine_fe_tied.get("by", "element") #fe_tied_params = {"by":"subregion","tied_params": ('center', 'fwhm')} subregion, centers, element,_ = np.array([[e.subregion, e.center, e.element,e.region] for e in entries]).T mask_fe = np.char.find(subregion.astype(str), "fe") >= 0 subregion, centers, element, entries = ( subregion[mask_fe], centers[mask_fe], element[mask_fe], [entries[i] for i in np.where(mask_fe)[0]],) ties: List[List[str]] = [] if by == "element": for reg in np.unique(element): idx_region = np.where(element == reg)[0] entries_region = [entries[i] for i in idx_region] centers_region = np.array([e.center for e in entries_region]) idx_center = int(np.argmin(np.abs(centers_region - np.median(centers_region)))) for i, e in enumerate(entries_region): if i == idx_center or 'fe' not in e.region: continue for p in tied_params: # ties.append( [ f"{p}_{e.line_name}_{e.component}_{e.region}", f"{p}_{entries_region[idx_center].line_name}_{entries_region[idx_center].component}_{entries_region[idx_center].region}", ] ) elif by == "subregion": for reg in np.unique(subregion): idx_region = np.where(subregion == reg)[0] entries_region = [entries[i] for i in idx_region] centers_region = np.array([e.center for e in entries_region]) idx_center = int(np.argmin(np.abs(centers_region - np.median(centers_region)))) for i, e in enumerate(entries_region): if i == idx_center or 'fe' not in e.region: continue for p in tied_params: # ties.append( [ f"{p}_{e.line_name}_{e.component}_{e.region}", f"{p}_{entries_region[idx_center].line_name}_{entries_region[idx_center].component}_{entries_region[idx_center].region}", ] ) else: centers = np.array([e.center for e in entries]) idx_center = int(np.argmin(np.abs(centers - np.median(centers)))) for i, e in enumerate(entries): if i == idx_center or 'fe' not in e.region: continue for p in tied_params: ties.append( [ f"{p}_{e.line_name}_{e.component}_{e.region}", f"{p}_{entries[idx_center].line_name}_{entries[idx_center].component}_{entries[idx_center].region}", ] ) return ties
def _maketies( sheapmodel, tied_narrow_to: Optional[Union[str, Dict[int, Dict[str, Any]]]] = None, tied_broad_to: Optional[Union[str, Dict[int, Dict[str, Any]]]] = None, known_tied_relations: Optional[List[Tuple[Tuple[str, ...], List[str]]]] = None, only_known: bool = False, ) -> List[List[str]]: """ Generate parameter ties for narrow and broad components in a spectral region. Parameters ---------- sheapmodel : ComplexRegion Region containing spectral line definitions. tied_narrow_to : str or dict, optional Line name or component mapping to tie narrow lines to. tied_broad_to : str or dict, optional Line name or component mapping to tie broad lines to. known_tied_relations : list, optional Predefined tie relations to enforce. only_known : bool, optional If True, return only known tied relations (ignore mainline ties). Returns ------- list of list of str List of parameter name ties [dependent, reference]. """ # Determine mainline #print(known_tied_relations) mainline_candidates_broad = ["Halpha","Hbeta","MgII","CIVb","Lyalpha","Pad",] # this can be disscuss in the future mainline_candidates_narrow = ["OIIIc","Halpha","NIIb","MgII","CIII]","SIIb","OIIa",] # this can be disscuss in the future n_components_per_region = sheapmodel.characteristics()["n_components_per_region"] n_broad = n_components_per_region["broad"] n_narrow = n_components_per_region["narrow"] dict_region = sheapmodel.group_by("region") local_region_list = dict_region["broad"].lines + dict_region["narrow"].lines if isinstance(mainline_candidates_broad, (list, tuple)): available = {e.line_name for e in local_region_list if isinstance(e.line_name, str)} mainline_broad = next( (name for name in mainline_candidates_broad if name in available), mainline_candidates_broad[0] if mainline_candidates_broad else '', ) if isinstance(mainline_candidates_narrow, (list, tuple)): available = {e.line_name for e in local_region_list if isinstance(e.line_name, str)} mainline_narrow = next( (name for name in mainline_candidates_narrow if name in available), mainline_candidates_narrow[0] if mainline_candidates_narrow else '',) ties: List[List[str]] = [] for name, mapping in ( ('tied_narrow_to', tied_narrow_to), ('tied_broad_to', tied_broad_to), ): if mapping and not isinstance(mapping, (str, dict)): raise TypeError(f"{name} must be str or dict, got {type(mapping).__name__}") tied_narrow_to = tied_narrow_to or mainline_narrow tied_broad_to = tied_broad_to or mainline_broad def _to_map(target, count): if isinstance(target, str): return {k: {"line_name": target, "component": k} for k in range(1, count + 1)} return { k: { "line_name": target.get(k, {}).get("line_name", tied_broad_to), "component": target.get(k, {}).get("component", k), } for k in range(1, count + 1) } narrow_map = _to_map(tied_narrow_to, n_narrow) broad_map = _to_map(tied_broad_to,n_broad) def add_tie_if_different(source, target): if source != target: ties.append([source, target]) for e in local_region_list: comp = e.component if e.region == "narrow": target = narrow_map[comp] suffix = "narrow" elif e.region == "broad": target = broad_map[comp] suffix = "broad" else: continue # unknown region for p in ("center", "fwhm"): source_name = f"{p}_{e.line_name}_{comp}_{suffix}" target_name = f"{p}_{target['line_name']}_{target['component']}_{suffix}" add_tie_if_different(source_name, target_name) if known_tied_relations: _known_ties = [] present = {e.line_name for e in local_region_list if isinstance(e.line_name, str)} #print(known_tied_relations) for pair, factor in known_tied_relations: if all(name in present for name in pair): for k in range(1, n_narrow + 1): #_ties.append([f.replace("component", str(k)) for f in factor]) #print([f.replace("component", str(k)) for f in factor]) _known_ties.append([f.replace("component", str(k)) for f in factor]) # if only_known: # return local_ties # ties_ = [] # for t in ties: # if t not in ties_: # ties_.append(t) #print(_known_ties) return ties,_known_ties
[docs] def flatten_index_ties(index_ties: List[Tuple[int, int, str, float]]) -> Dict[int, Tuple[float, int]]: """ Resolve index-based tie dependencies into coefficient + free parameter mapping. Parameters ---------- index_ties : list of tuple Each entry is (target_idx, source_idx, operation, value). Returns ------- dict Mapping from target index to (coefficient, source index). """ resolved: Dict[int, Tuple[float, int]] = {} free_indices = set() sources = {src for _, src, _, _ in index_ties} targets = {tgt for tgt, _, _, _ in index_ties} free_indices.update(sources - targets) for i in free_indices: resolved[i] = (1.0, i) for tgt, src, op, val in index_ties: if src not in resolved: resolved[src] = (1.0, src) coef_src, free_idx = resolved[src] if op == '*': coef = coef_src * val elif op == '/': coef = coef_src / val else: raise ValueError(f"Unsupported operation: {op}") resolved[tgt] = (coef, free_idx) return resolved
#?
[docs] def group_lines( lines: SpectralLineList, region: str, profile: str = "gaussian", mode: str = "region", known_tied_relations: List[Tuple[Tuple[str, ...], List[str]]] = [], ) -> SpectralLineList: """ Collapse multiple spectral lines into grouped pseudo-lines with SPAF profiles. Parameters ---------- lines : list of SpectralLine Full list of lines in the region. region : str Region name to group (e.g., "fe", "narrow"). profile : str, default "gaussian" Base profile to assign to grouped line. mode : {"region", "subregion", "element"} Grouping logic applied. known_tied_relations : list, optional Amplitude tie relations to apply. Returns ------- list of SpectralLine Lines with some collapsed into SPAF composite profiles. """ grouped = defaultdict(list) collapsed_lines = [] lines_to_remove = set() for line in lines: if line.region != region: continue if mode == "region": key_base = line.region elif mode == "subregion": key_base = line.subregion elif mode == "element": key_base = line.element else: continue key = (key_base, line.component) grouped[key].append(line) for (region_key, comp), group in grouped.items(): name_to_idx = {line.line_name: i for i, line in enumerate(group)} name_to_comp = {line.line_name: line.component for line in group} index_ties = [] for pair, factor in known_tied_relations: if all(name in name_to_idx for name in pair): if len(factor) < 3: factor += ["*1"] target_str, source_str, op_val = factor if "amplitude" in target_str: target_name = target_str.split("_")[1] source_name = source_str.split("_")[1] if name_to_comp[target_name] != name_to_comp[source_name]: continue target_idx = name_to_idx[target_name] source_idx = name_to_idx[source_name] op, val = op_val[0], float(op_val[1:]) index_ties.append((target_idx, source_idx, op, val)) resolved_map = flatten_index_ties(index_ties) full_rules: List[Tuple[int, float, int]] = [] dependent_list = [] for dx in range(len(group)): if dx in resolved_map: coef, idx = resolved_map[dx] if idx != dx: dependent_list.append(int(dx)) full_rules.append((dx, float(coef) , idx)) else: full_rules.append((dx, 1.0, dx)) #print("dependent_list",dependent_list) amplitudes = np.array([line.amplitude for i, line in enumerate(group) if i not in dependent_list]) arg_max = np.argmax(amplitudes) if region == "fe" and mode == "element": full_rules = [] dependent_list = [] for n, coef in enumerate(amplitudes): full_rules.append((n, float(coef/ np.max(amplitudes)), int(arg_max))) if n != arg_max: dependent_list.append(n) amplitudes = np.array([ float(line.amplitude) for i, line in enumerate(group) if i not in dependent_list]) centers = [line.center for line in group] region_lines = [line.line_name for line in group] elements = [line.element for line in group] subregions = [line.subregion for line in group] line_name = f"{region_key}{comp}" if mode=="region" else f"{region}{region_key}{comp}" #print(line_name) collapsed = SpectralLine( center=centers, line_name=line_name, region=region, component=comp, profile=profile, region_lines=region_lines, element=elements, subregion = subregions, amplitude=amplitudes.tolist(), amplitude_relations=full_rules, ) collapsed_lines.append(collapsed) lines_to_remove.update(id(line) for line in group) new_lines = [line for line in lines if id(line) not in lines_to_remove] new_lines.extend(collapsed_lines) return new_lines
#TODO check how this affect the combination e.g what happends when hbeta depends on halpha and hg and hd depends on hbeta
[docs] def default_known_tied_relations( include_balmer: bool = True, include_forbidden: bool = True, ) -> List[Tuple[Tuple[str, ...], List[str]]]: ties: List[Tuple[Tuple[str, ...], List[str]]] = [] """ Return default tied relations for common line doublets and multiplets. Parameters ---------- include_balmer : bool If True, include Balmer series constraints. include_forbidden : bool If True, include [O III], [N II], [S III], [O I], and [O II] constraints. Returns ------- list of tuple Each tuple contains (line_name pair, constraint expression list). """ ties += [ (("OIIIb", "OIIIc"), ["amplitude_OIIIb_component_narrow", "amplitude_OIIIc_component_narrow", "*0.33"]), (("OIIIb", "OIIIc"), ["center_OIIIb_component_narrow", "center_OIIIc_component_narrow"]), (("NIIa", "NIIb"), ["amplitude_NIIa_component_narrow", "amplitude_NIIb_component_narrow", "*0.33"]), (("NIIa", "NIIb"), ["center_NIIa_component_narrow", "center_NIIb_component_narrow"]), ] if include_forbidden: ties += [ (("OIa", "OIb"), ["amplitude_OIa_component_narrow", "amplitude_OIb_component_narrow", "*3.0"]), (("OIa", "OIb"), ["center_OIa_component_narrow", "center_OIb_component_narrow"]), (("SIIIa", "SIIIb"), ["amplitude_SIIIa_component_narrow", "amplitude_SIIIb_component_narrow", "*0.4"]), (("OIIa", "OIIb"), ["amplitude_OIIa_component_narrow", "amplitude_OIIb_component_narrow", "*0.77"]), (("OIIa", "OIIb"), ["center_OIIa_component_narrow", "center_OIIb_component_narrow"]), ] if include_balmer: ties += [ (("Hbeta", "Halpha"), ["amplitude_Hbeta_component_narrow", "amplitude_Halpha_component_narrow", "*0.35"]), (("Hg", "Hbeta"), ["amplitude_Hgamma_component_narrow", "amplitude_Hbeta_component_narrow", "*0.47"]), (("Hd", "Hbeta"), ["amplitude_Hdelta_component_narrow", "amplitude_Hbeta_component_narrow", "*0.26"]), ] # if include_helium: # ties += [ # (("HeId", "HeI5876"), ["amplitude_HeI4471_component_narrow", "amplitude_HeI5876_component_narrow", "*0.3"]), # (("HeI6678", "HeI5876"), ["amplitude_HeI6678_component_narrow", "amplitude_HeI5876_component_narrow", "*0.2"]), # ] return ties