Source code for ccsds_ndm.kvn_builder

# CCSDS-NDM: CCSDS Navigation Data Messages Read/Write Library
#
# Copyright (C) Egemen Imre
#
# Licensed under GNU GPL v3.0. See LICENSE for more info.
"""
KVN object builder: mapping a :class:`~ccsds_ndm.kvn_parser.KvnDocument`
onto the xsdata dataclass tree for the detected NDM type.

Registry dispatch
-----------------
The ``build_object`` entry point uses the schema registry to decide which
structural variant (flat / segmented / CDM) to use, replacing the old
approach of probing the ``KvnDocument`` structure at runtime.

``_build_sub_object`` uses the registry's per-type handler sentinels to
dispatch to special builders (rotation types, etc.) instead of hard-coding
class names.
"""

from __future__ import annotations

import dataclasses
import enum
import types as _types
import typing
from collections.abc import Sequence

from ccsds_ndm.kvn_handlers import (
    DISPATCH_CDM,
    DISPATCH_SEGMENTED,
    PARSE_ROTATION_ANGLE,
    PARSE_ROTATION_RATE,
)
from ccsds_ndm.kvn_parser import KvnDocument
from ccsds_ndm.kvn_registry import VERSION_REGISTRY
from ccsds_ndm.kvn_tokenizer import (
    BlankLine,
    CommentLine,
    CovarianceRowLine,
    KvLine,
    KvnLine,
    PackedDataLine,
    TdmObsLine,
)

# ---------------------------------------------------------------------------
# Low-level utilities
# ---------------------------------------------------------------------------


def _unwrap(t) -> typing.Any:
    """Strip ``Optional[X]`` / ``None | X`` → ``X``; return *t* unchanged otherwise."""
    origin = getattr(t, "__origin__", None)
    # On Python 3.11-3.13, ``get_type_hints()`` resolves ``None | X`` (PEP 604
    # union syntax) to a ``types.UnionType`` which has no ``__origin__``.
    # On Python 3.14+ it resolves to ``typing.Union`` (origin is typing.Union).
    # We therefore check both the origin attribute and isinstance to cover all
    # supported versions.
    if origin is typing.Union or isinstance(t, _types.UnionType):
        args = [a for a in t.__args__ if a is not type(None)]
        return args[0] if args else str
    return t


def _coerce_value(raw_value, field_type):
    """
    Convert a raw KVN string value to the Python type expected by a dataclass field.

    Handles ``enum.Enum`` subclasses (looked up by value), ``float``, and
    ``int``.  Any other type is returned as-is (already a ``str``).
    ``Optional`` wrappers are stripped first.
    """
    base = _unwrap(field_type)
    if isinstance(base, type) and issubclass(base, enum.Enum):
        return base(raw_value)
    if base is float:
        return float(raw_value)
    if base is int:
        return int(raw_value)
    return raw_value


def _lenient_class_factory(clazz, params):
    """
    Instantiate ``clazz`` from ``params``, filling any missing required field with ``None``.
    """
    for f in dataclasses.fields(clazz):
        if f.init and f.name not in params:
            if (
                f.default is dataclasses.MISSING
                and f.default_factory is dataclasses.MISSING
            ):
                params[f.name] = None
    return clazz(**params)


[docs] def init_root_ndm_object(clazz): """ Instantiate a root NDM class (e.g. ``Apm``, ``Oem``) with ``None`` placeholders. """ init_kwargs = { f.name: None for f in dataclasses.fields(clazz) if f.init and f.default is dataclasses.MISSING and f.default_factory is dataclasses.MISSING } return clazz(**init_kwargs)
[docs] def get_ccsds_kw_list(clazz): """ Returns the list of KVN keyword names recognised by ``clazz``. Handles dataclasses (field metadata ``"name"``), Enum types (member names), and edge classes (returns ``[]``). """ if "__dataclass_fields__" in vars(clazz).keys(): kw_list = [ var.metadata["name"] for var in vars(clazz)["__dataclass_fields__"].values() if "name" in var.metadata.keys() ] return kw_list elif "_member_names_" in vars(clazz).keys(): kw_list = [enum_tag for enum_tag in vars(clazz)["_member_names_"]] return kw_list else: return []
[docs] def build_ndm_object(clazz, local_lines, prefix=None): """ Build a dataclass instance directly from KVN ``local_lines``. Each entry in ``local_lines`` is a list of 2 or 3 strings: ``[key, value]`` - plain field or nested single-value dataclass ``[key, value, units]`` - nested dataclass that also carries a units attribute """ # Resolve forward-reference string annotations to actual type objects. # xsdata generates fields whose f.type is a plain string, not a type. resolved_hints = typing.get_type_hints(clazz) # Build a map from KVN keyword name → (field, resolved_type) for quick lookup. # The KVN keyword is stored in the dataclass field's metadata["name"]. name_to_field = { f.metadata.get("name"): (f, resolved_hints[f.name]) for f in dataclasses.fields(clazz) if f.name in resolved_hints } # This dictionary will hold the keyword arguments for instantiating `clazz`. params = {} # Handle the special case for USER_DEFINED parameters, which are prefixed. if prefix: # Find the dataclass field that corresponds to the prefix (e.g., "USER_DEFINED"). entry = name_to_field.get(prefix) if entry is not None: list_field, list_type = entry # The field should be a list of a specific item type (e.g., UserDefinedParameterType). item_clazz = _unwrap(list_type.__args__[0]) entries = [] # Iterate over the KVN lines to find all user-defined parameters. for item in local_lines: if len(item) < 2 or not item[0].startswith(prefix + "_"): continue param_name = item[0].replace(prefix + "_", "") entries.append(item_clazz(value=item[1], parameter=param_name)) # Assign the list of user-defined parameters to the corresponding field. params[list_field.name] = entries else: # This is the general case for all other KVN key-value pairs. for item in local_lines: key, raw_val = item[0], item[1] # Find the dataclass field corresponding to the KVN key. entry = name_to_field.get(key) if entry is None: # If the key is not defined in the dataclass, ignore it. continue field, resolved_type = entry # Unwrap Optional[T] to get T. base_type = _unwrap(resolved_type) # Case 1: The field is a list (e.g., for multiple COMMENT lines). # xsdata marks list-like fields with a default_factory. if field.default_factory is not dataclasses.MISSING: # Get the type of items in the list. elem_type = _unwrap(base_type.__args__[0]) # Append the coerced value to the list for this field. # setdefault ensures the list is created on first access. params.setdefault(field.name, []).append( _coerce_value(raw_val, elem_type) ) # Case 2: The field is a nested dataclass that represents a single value with units. elif dataclasses.is_dataclass(base_type): # These are "leaf" dataclasses like MomentType, with a 'value' and optional 'units'. leaf_hints = typing.get_type_hints(base_type) leaf_fields = dataclasses.fields(base_type) # The first field is assumed to be the main value. value_field = leaf_fields[0] leaf_params = { value_field.name: _coerce_value( raw_val, leaf_hints[value_field.name] ) } # If a unit is provided in the KVN line, find the 'units' attribute and set it. if len(item) > 2: units_field = next( ( f for f in leaf_fields if f.metadata.get("type") == "Attribute" ), None, ) if units_field is not None: leaf_params[units_field.name] = _coerce_value( item[2], leaf_hints[units_field.name] ) # Instantiate the leaf dataclass and assign it to the parent's parameter dict. params[field.name] = typing.cast(typing.Callable, base_type)( **leaf_params ) else: # Case 3: The field is a simple scalar type (str, int, float, Enum). params[field.name] = _coerce_value(raw_val, resolved_type) # Instantiate the main dataclass. _lenient_class_factory fills missing # required fields with None, which is necessary because we build the object # piece by piece. return _lenient_class_factory(clazz, params)
# --------------------------------------------------------------------------- # Builder helpers # --------------------------------------------------------------------------- # Hard-coded keywords for RotationAngleType / RotationRateType # (their fields have name=None, so get_ccsds_kw_list returns []) _ROTATION_KWS: dict[str, list[str]] = { "RotationAngleType": ["X_ANGLE", "Y_ANGLE", "Z_ANGLE"], "RotationRateType": ["X_RATE", "Y_RATE", "Z_RATE"], } _AEM_FIELD_MAP: dict[str, str] = { "QUATERNION": "quaternion_state", "QUATERNION/DERIVATIVE": "quaternion_derivative", "QUATERNION/RATE": "quaternion_euler_rate", "EULER_ANGLE": "euler_angle", "EULER_ANGLE/RATE": "euler_angle_rate", "SPIN": "spin", "SPIN/NUTATION": "spin_nutation", } def _hints(clazz) -> dict: """Return the resolved type hints for *clazz*.""" return typing.get_type_hints(clazz) def _kvlines_to_rows(lines: Sequence[KvnLine]) -> list[list[str]]: """ Convert a list of :class:`KvnLine` objects to the ``[key, value(, unit)]`` row format expected by :func:`build_ndm_object`. """ rows: list[list[str]] = [] for line in lines: if isinstance(line, KvLine): row: list[str] = [line.key, line.value] if line.unit: row.append(line.unit) rows.append(row) elif isinstance(line, CommentLine): rows.append(["COMMENT", line.text]) return rows def _collect_all_kws(clazz) -> list[str]: """ Return all KVN keywords reachable from *clazz*, recursing into no-``"name"`` sub-containers and camelCase container fields. """ if clazz.__name__ in _ROTATION_KWS: return list(_ROTATION_KWS[clazz.__name__]) kws = [] hints = _hints(clazz) for f in dataclasses.fields(clazz): meta_name = f.metadata.get("name") if meta_name is None or meta_name == "": ftype = _unwrap(hints[f.name]) if dataclasses.is_dataclass(ftype): kws.extend(_collect_all_kws(ftype)) elif not meta_name.isupper(): ftype_raw = hints[f.name] is_list_field = getattr(ftype_raw, "__origin__", None) is list if is_list_field: ftype = _unwrap(ftype_raw.__args__[0]) else: ftype = _unwrap(ftype_raw) if dataclasses.is_dataclass(ftype): kws.extend(_collect_all_kws(ftype)) else: kws.append(meta_name) return kws def _build_rotation_type(rot_clazz, inst_lines: Sequence[KvnLine]) -> object: """Build a ``RotationAngleType`` or ``RotationRateType`` from ordered KvnLines.""" rot_hints = _hints(rot_clazz) rot_fields = dataclasses.fields(rot_clazz) kv_rows = [ (ln.key, ln.value, ln.unit) for ln in inst_lines if isinstance(ln, KvLine) ] params = {} for rf, (kw, val, unit) in zip(rot_fields, kv_rows): comp_type = _unwrap(rot_hints[rf.name]) comp_hints = _hints(comp_type) comp_fields = dataclasses.fields(comp_type) value_field = comp_fields[0] angle_field = comp_fields[1] units_field = next( ( f for f in comp_fields if f.metadata.get("type") == "Attribute" and f.name == "units" ), None, ) angle_type = _unwrap(comp_hints[angle_field.name]) comp_params = { value_field.name: float(val), angle_field.name: angle_type(kw), } if units_field and unit: units_type = _unwrap(comp_hints[units_field.name]) try: comp_params[units_field.name] = units_type(unit.strip()) except (ValueError, KeyError): pass params[rf.name] = comp_type(**comp_params) return _lenient_class_factory(rot_clazz, params) def _att_column_template(meta_lines: Sequence[KvnLine]) -> list[str]: """ Derive the packed-data column template for an AEM segment from its metadata lines. Returns a list of KVN keyword strings giving the column order, e.g. ``["EPOCH", "Q1", "Q2", "Q3", "QC"]``. """ kv: dict[str, str] = {} for line in meta_lines: if isinstance(line, KvLine): kv[line.key] = line.value att_type = kv.get("ATTITUDE_TYPE", "") quat_type = kv.get("QUATERNION_TYPE", "FIRST").upper() rot_seq = kv.get("EULER_ROT_SEQ", "") _angle = {"1": "X_ANGLE", "2": "Y_ANGLE", "3": "Z_ANGLE"} _rate = {"1": "X_RATE", "2": "Y_RATE", "3": "Z_RATE"} att_upper = att_type.upper().replace(" ", "") if att_upper in ("QUATERNION", "QUATERNION_1"): if quat_type == "LAST": return ["EPOCH", "Q1", "Q2", "Q3", "QC"] return ["EPOCH", "QC", "Q1", "Q2", "Q3"] if att_upper in ("QUATERNION/DERIVATIVE", "QUATERNION_DERIVATIVE"): if quat_type == "LAST": base = [ "EPOCH", "Q1", "Q2", "Q3", "QC", "Q1_DOT", "Q2_DOT", "Q3_DOT", "QC_DOT", ] else: base = [ "EPOCH", "QC", "Q1", "Q2", "Q3", "QC_DOT", "Q1_DOT", "Q2_DOT", "Q3_DOT", ] return base if att_upper in ("QUATERNION/RATE", "QUATERNION_RATE"): rates = [_rate[d] for d in rot_seq if d in _rate] if quat_type == "LAST": return ["EPOCH", "Q1", "Q2", "Q3", "QC"] + rates return ["EPOCH", "QC", "Q1", "Q2", "Q3"] + rates if att_upper in ("EULER_ANGLE", "EULER_ANGLE_1"): angles = [_angle[d] for d in rot_seq if d in _angle] return ["EPOCH"] + angles if att_upper in ("EULER_ANGLE/RATE", "EULER_ANGLE_RATE"): angles = [_angle[d] for d in rot_seq if d in _angle] rates = [_rate[d] for d in rot_seq if d in _rate] return ["EPOCH"] + angles + rates if att_upper in ("SPIN", "SPIN_1"): return ["EPOCH", "SPIN_ALPHA", "SPIN_DELTA", "SPIN_ANGLE", "SPIN_ANGLE_VEL"] if att_upper in ("SPIN/NUTATION", "SPIN_NUTATION"): return [ "EPOCH", "SPIN_ALPHA", "SPIN_DELTA", "SPIN_ANGLE", "SPIN_ANGLE_VEL", "NUTATION", "NUTATION_PER", "NUTATION_PHASE", ] # Fallback return ["EPOCH"] def _forward_looking_pass( all_lines: Sequence[KvnLine], labels: list[str | None], fallback: str, ) -> list[str | None]: """ Forward-looking pass: assign labels to unlabelled non-blank lines. Each unlabelled line looks ahead to the next labelled line. If no blank line intervenes, it inherits that label; otherwise it falls back to *fallback*. Mutates and returns *labels*. """ n = len(all_lines) for i in range(n): if labels[i] is not None or isinstance(all_lines[i], BlankLine): continue has_blank = False next_label: str | None = None for j in range(i + 1, n): if isinstance(all_lines[j], BlankLine): has_blank = True elif labels[j] is not None: next_label = labels[j] break labels[i] = ( next_label if (next_label is not None and not has_blank) else fallback ) return labels def _label_lines( all_lines: Sequence[KvnLine], container_map: dict[str, tuple[str, type, bool]], ) -> list[str | None]: """ Assign a label to every line in *all_lines*. Returns a parallel list of label strings (or ``None`` for BlankLines). Each label is either ``"own"`` or a field name string. """ n = len(all_lines) labels: list[str | None] = [None] * n for i, ln in enumerate(all_lines): if isinstance(ln, KvLine): kw = ln.key if kw.startswith("USER_DEFINED_") or kw not in container_map: labels[i] = "own" else: labels[i] = container_map[kw][0] return _forward_looking_pass(all_lines, labels, "own") def _partition_lines( all_lines: Sequence[KvnLine], labels: list[str | None], container_map: dict[str, tuple[str, type, bool]], list_first_kw: dict[str, str], ) -> tuple[list[KvnLine], dict[str, list[list[KvnLine]]]]: """ Partition lines into "own" and per-container buckets using labels. """ own_lines: list[KvnLine] = [] container_buckets: dict[str, list[list[KvnLine]]] = {} for i, ln in enumerate(all_lines): if isinstance(ln, BlankLine): continue lbl = labels[i] if lbl == "own": own_lines.append(ln) continue if lbl is None: raise ValueError( f"Label for line index {i} is None — routing table is incomplete." ) fname = lbl if isinstance(ln, KvLine) and ln.key in container_map: _, _, is_list = container_map[ln.key] if is_list: first_kw = list_first_kw.get(fname) buckets = container_buckets.setdefault(fname, []) if ln.key == first_kw or not buckets: stolen: list[KvnLine] = [] if buckets and buckets[-1]: while buckets[-1] and isinstance(buckets[-1][-1], CommentLine): stolen.append(buckets[-1].pop()) if not buckets[-1]: buckets.pop() stolen.reverse() buckets.append(stolen + [ln]) else: buckets[-1].append(ln) else: buckets = container_buckets.setdefault(fname, [[]]) buckets[-1].append(ln) else: buckets = container_buckets.setdefault(fname, []) if not buckets: buckets.append([ln]) else: buckets[-1].append(ln) return own_lines, container_buckets def _partition_flat_type_lines( all_flat: list[KvnLine], header_kws: set[str], meta_kws: set[str] ) -> tuple[list[KvnLine], list[KvnLine], list[KvnLine]]: """ Partition flat-type document lines into header, metadata, and data sections. """ n_flat = len(all_flat) flat_labels: list[str | None] = [None] * n_flat for i, ln in enumerate(all_flat): if isinstance(ln, KvLine): if ln.key in header_kws: flat_labels[i] = "header" elif ln.key in meta_kws: flat_labels[i] = "meta" else: flat_labels[i] = "data" for i in range(n_flat): if flat_labels[i] is not None or isinstance(all_flat[i], BlankLine): continue has_blank = False next_label: str | None = None for j in range(i + 1, n_flat): if isinstance(all_flat[j], BlankLine): has_blank = True elif flat_labels[j] is not None: next_label = flat_labels[j] break if next_label is not None and not has_blank: flat_labels[i] = next_label else: flat_labels[i] = "data" # Rule 4: orphan COMMENTs above the first header keyword → header # (only applies when header keywords exist in the partition) first_hdr_pos = next( (i for i, lb in enumerate(flat_labels) if lb == "header"), None ) if first_hdr_pos is not None: for i in range(first_hdr_pos): if isinstance(all_flat[i], CommentLine) and flat_labels[i] == "data": flat_labels[i] = "header" hdr_lines: list[KvnLine] = [] meta_lines: list[KvnLine] = [] data_lines: list[KvnLine] = [] for i, ln in enumerate(all_flat): lbl = flat_labels[i] if isinstance(ln, BlankLine): data_lines.append(ln) elif lbl == "header": hdr_lines.append(ln) elif lbl == "meta": meta_lines.append(ln) else: data_lines.append(ln) return hdr_lines, meta_lines, data_lines # --------------------------------------------------------------------------- # Core recursive builders (adapted for registry dispatch) # --------------------------------------------------------------------------- def _build_sub_object(sub_clazz, inst_lines: Sequence[KvnLine], schema_reg) -> object: """ Build a sub-container xsdata object from a flat list of KvnLines. Uses the registry to dispatch to special builders (rotation types, etc.) instead of hard-coding class names. """ handler = schema_reg.get_handler(sub_clazz.__name__) if handler.parse in (PARSE_ROTATION_ANGLE, PARSE_ROTATION_RATE): return _build_rotation_type(sub_clazz, inst_lines) # PARSE_DEFAULT path has_nested = any( f.metadata.get("name", "") and not f.metadata["name"].isupper() for f in dataclasses.fields(sub_clazz) ) if has_nested: return _build_nested_data(sub_clazz, inst_lines, schema_reg) # Standard: all UPPER_CASE keywords rows = _kvlines_to_rows(inst_lines) obj = build_ndm_object(sub_clazz, rows) # Handle no-"name" transparent sub-sub-containers sub_hints = _hints(sub_clazz) for f in dataclasses.fields(sub_clazz): meta_name = f.metadata.get("name") if (meta_name is None or meta_name == "") and dataclasses.is_dataclass( _unwrap(sub_hints[f.name]) ): inner_clazz = _unwrap(sub_hints[f.name]) inner_obj = build_ndm_object(inner_clazz, rows) setattr(obj, f.name, inner_obj) return obj def _build_nested_data(data_cls, all_lines: Sequence[KvnLine], schema_reg) -> object: """ Build a data object whose fields include camelCase sub-containers. Partitions flat KvnLines into per-container buckets by keyword membership, builds each sub-container with :func:`build_ndm_object` or a specialised builder, and attaches the results to the data object. """ data_hints_local = _hints(data_cls) # --- Step 1: Build keyword → (field_name, sub_clazz, is_list) map --- container_map: dict[str, tuple[str, type, bool]] = {} list_first_kw: dict[str, str] = {} for f in dataclasses.fields(data_cls): meta_name = f.metadata.get("name", "") if not meta_name or meta_name.isupper(): continue ftype_raw = data_hints_local[f.name] is_list_field = getattr(ftype_raw, "__origin__", None) is list if is_list_field: item_type = _unwrap(ftype_raw.__args__[0]) kws = _collect_all_kws(item_type) for kw in kws: if kw and kw != "COMMENT": container_map[kw] = (f.name, item_type, True) first = next((k for k in kws if k and k != "COMMENT"), None) if first: list_first_kw[f.name] = first else: sub_clazz = _unwrap(ftype_raw) kws = _collect_all_kws(sub_clazz) for kw in kws: if kw and kw != "COMMENT": container_map[kw] = (f.name, sub_clazz, False) # --- Step 2: Label lines --- labels = _label_lines(all_lines, container_map) # --- Step 3: Partition lines --- own_lines, container_buckets = _partition_lines( all_lines, labels, container_map, list_first_kw ) # --- Step 4: Build data object from own_rows --- own_rows = _kvlines_to_rows(own_lines) seg_data = build_ndm_object(data_cls, own_rows) # --- Step 5: Handle no-"name" sub-containers --- for f in dataclasses.fields(data_cls): meta_name = f.metadata.get("name") if (meta_name is None or meta_name == "") and dataclasses.is_dataclass( _unwrap(data_hints_local[f.name]) ): inner_clazz = _unwrap(data_hints_local[f.name]) inner_obj = build_ndm_object(inner_clazz, own_rows) setattr(seg_data, f.name, inner_obj) # --- Step 6: Build and attach each camelCase sub-container --- for f in dataclasses.fields(data_cls): meta_name = f.metadata.get("name", "") if not meta_name or meta_name.isupper(): continue if f.name not in container_buckets: continue ftype_raw = data_hints_local[f.name] is_list_field = getattr(ftype_raw, "__origin__", None) is list buckets = container_buckets[f.name] if is_list_field: item_type = _unwrap(ftype_raw.__args__[0]) built_list = [ _build_sub_object(item_type, inst, schema_reg) for inst in buckets ] setattr(seg_data, f.name, built_list) else: sub_clazz = _unwrap(ftype_raw) inst = buckets[0] if buckets else [] setattr(seg_data, f.name, _build_sub_object(sub_clazz, inst, schema_reg)) # --- Step 7: USER_DEFINED parameters --- ud_field = next( ( f for f in dataclasses.fields(data_cls) if f.name == "user_defined_parameters" ), None, ) if ud_field is not None: ud_rows = [r for r in own_rows if r[0].startswith("USER_DEFINED_")] if ud_rows: ud_type = _unwrap(data_hints_local["user_defined_parameters"]) ud_obj = build_ndm_object(ud_type, ud_rows, prefix="USER_DEFINED") setattr(seg_data, "user_defined_parameters", ud_obj) return seg_data # --------------------------------------------------------------------------- # Flat-type builder (OPM, OMM, APM, RDM) # --------------------------------------------------------------------------- def _build_flat( doc: KvnDocument, root_clazz, header_clazz, body_clazz, segment_clazz, meta_clazz, data_clazz, schema_reg, ) -> object: """Build a flat-type (OPM, OMM, APM, RDM) NDM object.""" header_kws = set(get_ccsds_kw_list(header_clazz)) - {"COMMENT"} meta_kws = set(get_ccsds_kw_list(meta_clazz)) - {"COMMENT"} all_flat = doc.segments[0].data hdr_from_data, meta_lines, data_lines = _partition_flat_type_lines( all_flat, header_kws, meta_kws ) hdr_lines = list(doc.header) + hdr_from_data ndm_header = build_ndm_object(header_clazz, _kvlines_to_rows(hdr_lines)) seg_meta = build_ndm_object(meta_clazz, _kvlines_to_rows(meta_lines)) seg_data = _build_nested_data(data_clazz, data_lines, schema_reg) built_seg = _lenient_class_factory( segment_clazz, {"metadata": seg_meta, "data": seg_data} ) ndm_body = _lenient_class_factory(body_clazz, {"segment": built_seg}) return _lenient_class_factory(root_clazz, {"header": ndm_header, "body": ndm_body}) # --------------------------------------------------------------------------- # CDM builder # --------------------------------------------------------------------------- def _label_cdm_lines( all_lines: list[KvnLine], header_kws: set[str] ) -> list[str | None]: """Label CDM data lines as "header" or "other".""" n = len(all_lines) labels: list[str | None] = [None] * n for i, ln in enumerate(all_lines): if isinstance(ln, KvLine): labels[i] = "header" if ln.key in header_kws else "other" _forward_looking_pass(all_lines, labels, "other") # Rule 4: orphan COMMENTs above first header keyword → header first_hdr = next((i for i, lb in enumerate(labels) if lb == "header"), n) for i in range(first_hdr): if isinstance(all_lines[i], CommentLine) and labels[i] == "other": labels[i] = "header" return labels def _split_cdm_objects( all_lines: list[KvnLine], labels: list[str | None], doc_header: list[KvnLine] ) -> tuple[list[KvnLine], list[KvnLine], list[list[KvnLine]]]: """Split CDM data lines into header, relative metadata, and object segments.""" cdm_hdr_lines: list[KvnLine] = list(doc_header) cdm_rel_meta_lines: list[KvnLine] = [] cdm_obj_splits: list[list[KvnLine]] = [] cdm_rel_done = False current_cdm_obj: list[KvnLine] = [] for i, ln in enumerate(all_lines): if isinstance(ln, BlankLine): current_cdm_obj.append(ln) continue if labels[i] == "header": cdm_hdr_lines.append(ln) elif isinstance(ln, KvLine) and ln.key == "OBJECT": stolen_comments: list[KvnLine] = [] while current_cdm_obj and isinstance( current_cdm_obj[-1], (CommentLine, BlankLine) ): stolen_comments.append(current_cdm_obj.pop()) stolen_comments.reverse() if not cdm_rel_done: cdm_rel_meta_lines = current_cdm_obj cdm_rel_done = True else: cdm_obj_splits.append(current_cdm_obj) current_cdm_obj = stolen_comments + [ln] else: current_cdm_obj.append(ln) if current_cdm_obj: if not cdm_rel_done: cdm_rel_meta_lines = current_cdm_obj else: cdm_obj_splits.append(current_cdm_obj) return cdm_hdr_lines, cdm_rel_meta_lines, cdm_obj_splits def _build_cdm_object_segment( obj_lines: list[KvnLine], meta_kws: set[str], meta_clazz, data_clazz, segment_clazz, schema_reg, ) -> object: """Build a single CDM object segment from a list of lines.""" obj_n = len(obj_lines) obj_labels: list[str | None] = [None] * obj_n for idx, ln in enumerate(obj_lines): if isinstance(ln, KvLine): obj_labels[idx] = "meta" if ln.key in meta_kws else "data" for idx in range(obj_n): if obj_labels[idx] is not None or isinstance(obj_lines[idx], BlankLine): continue has_blank = False next_label: str | None = None for j in range(idx + 1, obj_n): if isinstance(obj_lines[j], BlankLine): has_blank = True elif obj_labels[j] is not None: next_label = obj_labels[j] break obj_labels[idx] = ( next_label if (next_label is not None and not has_blank) else "data" ) obj_meta_lines: list[KvnLine] = [] obj_data_lines: list[KvnLine] = [] for idx, ln in enumerate(obj_lines): if isinstance(ln, BlankLine): obj_data_lines.append(ln) elif obj_labels[idx] == "meta": obj_meta_lines.append(ln) else: obj_data_lines.append(ln) seg_meta = build_ndm_object(meta_clazz, _kvlines_to_rows(obj_meta_lines)) seg_data = _build_nested_data(data_clazz, obj_data_lines, schema_reg) return _lenient_class_factory( segment_clazz, {"metadata": seg_meta, "data": seg_data} ) def _build_cdm_flat_object( doc: KvnDocument, header_clazz, meta_clazz, data_clazz, body_clazz, segment_clazz, schema_reg, root_clazz, ) -> object: """ Build a CDM (Conjunction Data Message) object. The new ``_dispatch_cdm`` in ``kvn_parser.py`` already splits the document into pre-structured segments: - ``doc.header`` — top-level header lines - ``doc.segments[0].meta`` — relative metadata lines - ``doc.segments[1].data`` — OBJECT 1 lines - ``doc.segments[2].data`` — OBJECT 2 lines """ meta_kws = set(get_ccsds_kw_list(meta_clazz)) - {"COMMENT"} rel_meta_clazz = _unwrap(_hints(body_clazz)["relative_metadata_data"]) # Build header ndm_header = build_ndm_object(header_clazz, _kvlines_to_rows(doc.header)) # Build relative metadata from pre-split segment rel_meta_obj = _build_nested_data(rel_meta_clazz, doc.segments[0].meta, schema_reg) # Build object segments from pre-split segments built_segments = [ _build_cdm_object_segment( seg.data, meta_kws, meta_clazz, data_clazz, segment_clazz, schema_reg ) for seg in doc.segments[1:] ] ndm_body = _lenient_class_factory( body_clazz, { "relative_metadata_data": rel_meta_obj, "segment": built_segments, }, ) return _lenient_class_factory(root_clazz, {"header": ndm_header, "body": ndm_body}) # --------------------------------------------------------------------------- # Segment-based builders (OEM, AEM, TDM) # --------------------------------------------------------------------------- def _init_segment(segment, meta_clazz, data_clazz) -> tuple[object, typing.Any]: """Build the common seg_meta and seg_data objects for a single segment.""" meta_rows = _kvlines_to_rows(segment.meta) data_rows = _kvlines_to_rows(segment.data) seg_meta = build_ndm_object(meta_clazz, meta_rows) seg_data = init_root_ndm_object(data_clazz) for row in data_rows: if row[0] == "COMMENT": seg_data.comment.append(row[1]) return seg_meta, seg_data def _attach_oem_covariance( block_covariance: list[KvnLine], data_clazz, oem_segments: list[tuple[typing.Any, typing.Any]], ) -> None: """ Build covariance matrix objects from a covariance block and attach them to the most recent OEM state-vector segment. """ cov_clazz = _unwrap(_hints(data_clazz)["covariance_matrix"].__args__[0]) cov_fields = [ f for f in dataclasses.fields(cov_clazz) if f.metadata.get("name", "").isupper() and f.metadata.get("name") not in ("COMMENT", "EPOCH", "COV_REF_FRAME") ] cov_hints = _hints(cov_clazz) cov_groups: list[list[KvnLine]] = [] for ln in block_covariance: if isinstance(ln, KvLine) and ln.key == "EPOCH": cov_groups.append([]) if cov_groups: cov_groups[-1].append(ln) for group in cov_groups: group_rows = _kvlines_to_rows(group) group_matrix_values = [ ln.tokens for ln in group if isinstance(ln, CovarianceRowLine) ] cov_obj = build_ndm_object(cov_clazz, group_rows) flat_vals = [v for row in group_matrix_values for v in row] for cf, val_str in zip(cov_fields, flat_vals): cf_type = _unwrap(cov_hints[cf.name]) leaf_hints = _hints(cf_type) leaf_val = float(val_str) setattr( cov_obj, cf.name, cf_type(**{list(leaf_hints.keys())[0]: leaf_val}), ) last_data = oem_segments[-1][1] last_data.covariance_matrix.append(cov_obj) def _build_oem_segments( doc: KvnDocument, meta_clazz, data_clazz, segment_clazz, schema_reg=None, ) -> list[object]: """Build segments for OEM (Orbit Ephemeris Message) documents.""" oem_segments: list[tuple[typing.Any, typing.Any]] = [] for segment in doc.segments: if segment.covariance: if not oem_segments: continue _attach_oem_covariance(segment.covariance, data_clazz, oem_segments) continue seg_meta, seg_data = _init_segment(segment, meta_clazz, data_clazz) sv_clazz = _unwrap(_hints(data_clazz)["state_vector"].__args__[0]) sv_kws = [ f.metadata.get("name") for f in dataclasses.fields(sv_clazz) if f.metadata.get("name") and f.metadata.get("name") != "COMMENT" ] for ln in segment.data: if isinstance(ln, PackedDataLine): sv_rows = [[kw, tok] for kw, tok in zip(sv_kws, ln.tokens)] seg_data.state_vector.append(build_ndm_object(sv_clazz, sv_rows)) oem_segments.append((seg_meta, seg_data)) return [ _lenient_class_factory(segment_clazz, {"metadata": m, "data": d}) for m, d in oem_segments ] def _build_aem_segments( doc: KvnDocument, meta_clazz, data_clazz, segment_clazz, schema_reg=None, ) -> list[object]: """Build segments for AEM (Attitude Ephemeris Message) documents.""" built_segments: list[object] = [] att_state_clazz = _unwrap(_hints(data_clazz)["attitude_state"].__args__[0]) for segment in doc.segments: seg_meta, seg_data = _init_segment(segment, meta_clazz, data_clazz) template = _att_column_template(segment.meta) kv_meta = {ln.key: ln.value for ln in segment.meta if isinstance(ln, KvLine)} att_type_str = kv_meta.get("ATTITUDE_TYPE", "").upper() att_field_name = _AEM_FIELD_MAP.get(att_type_str, "quaternion_state") att_sub_clazz = _unwrap(_hints(att_state_clazz)[att_field_name]) for ln in segment.data: if isinstance(ln, PackedDataLine): packed_kvlines = [ KvLine(key=kw, value=tok) for kw, tok in zip(template, ln.tokens) ] sub_obj = _build_sub_object(att_sub_clazz, packed_kvlines, schema_reg) att_obj = _lenient_class_factory( att_state_clazz, {att_field_name: sub_obj} ) seg_data.attitude_state.append(att_obj) built_segments.append( _lenient_class_factory( segment_clazz, {"metadata": seg_meta, "data": seg_data} ) ) return built_segments def _build_tdm_segments( doc: KvnDocument, meta_clazz, data_clazz, segment_clazz, schema_reg=None, ) -> list[object]: """Build segments for TDM (Tracking Data Message) documents.""" built_segments: list[object] = [] obs_clazz = _unwrap(_hints(data_clazz)["observation"].__args__[0]) for segment in doc.segments: seg_meta, seg_data = _init_segment(segment, meta_clazz, data_clazz) for ln in segment.data: if isinstance(ln, TdmObsLine): obs_rows = [["EPOCH", ln.epoch], [ln.key, ln.value]] seg_data.observation.append(build_ndm_object(obs_clazz, obs_rows)) built_segments.append( _lenient_class_factory( segment_clazz, {"metadata": seg_meta, "data": seg_data} ) ) return built_segments # Dispatch table: data class name → segment builder function _SEGMENT_BUILDERS: dict[str, typing.Callable] = { "OemData": _build_oem_segments, "AemData": _build_aem_segments, "TdmData": _build_tdm_segments, } def _build_segment_based_object( doc: KvnDocument, header_clazz, meta_clazz, data_clazz, segment_clazz, schema_reg=None, ) -> tuple[object, list[object]]: """Build header and segments for segment-based types (OEM, AEM, TDM).""" header_rows = _kvlines_to_rows(doc.header) ndm_header = build_ndm_object(header_clazz, header_rows) builder = _SEGMENT_BUILDERS[data_clazz.__name__] built_segments = builder(doc, meta_clazz, data_clazz, segment_clazz, schema_reg) return ndm_header, built_segments # --------------------------------------------------------------------------- # Public entry point # ---------------------------------------------------------------------------
[docs] def build_object(doc: KvnDocument) -> object: """ Map a :class:`KvnDocument` onto the xsdata dataclass tree for the detected NDM type. Uses the schema registry to determine the document dispatch strategy (flat / segmented / CDM) instead of probing the document structure. Parameters ---------- doc : KvnDocument Output of :func:`~ccsds_ndm.kvn_parser.dispatch_document`. Returns ------- object Fully populated root xsdata dataclass instance. """ root_clazz = doc.ndm_type.clazz schema_reg = VERSION_REGISTRY[doc.ndm_type.req_combi_version] # Resolve the type chain root_hints = _hints(root_clazz) header_clazz = _unwrap(root_hints["header"]) body_clazz = _unwrap(root_hints["body"]) body_hints = _hints(body_clazz) segment_field_type_raw = body_hints["segment"] is_list_segment = getattr(segment_field_type_raw, "__origin__", None) is list if is_list_segment: segment_clazz = _unwrap(segment_field_type_raw.__args__[0]) else: segment_clazz = _unwrap(segment_field_type_raw) seg_hints = _hints(segment_clazz) meta_clazz = _unwrap(seg_hints["metadata"]) data_clazz = _unwrap(seg_hints["data"]) # Use registry dispatch sentinel dispatch = schema_reg.dispatch.get(root_clazz.__name__) if dispatch == DISPATCH_CDM: return _build_cdm_flat_object( doc, header_clazz, meta_clazz, data_clazz, body_clazz, segment_clazz, schema_reg, root_clazz, ) elif dispatch == DISPATCH_SEGMENTED: ndm_header, built_segments = _build_segment_based_object( doc, header_clazz, meta_clazz, data_clazz, segment_clazz, schema_reg ) ndm_body = _lenient_class_factory(body_clazz, {"segment": built_segments}) return _lenient_class_factory( root_clazz, {"header": ndm_header, "body": ndm_body} ) else: # DISPATCH_FLAT return _build_flat( doc, root_clazz, header_clazz, body_clazz, segment_clazz, meta_clazz, data_clazz, schema_reg, )