# Source code for ccsds_ndm.kvn_parser

# CCSDS-NDM: CCSDS Navigation Data Messages Read/Write Library
#
# Copyright (C) Egemen Imre
#
# Licensed under GNU GPL v3.0. See LICENSE for more info.
"""
KVN parser: type identification, document dispatch, and block location.

Phase 1 implements:
  1. ``identify_ndm_type``   — scan tokenised lines for ``CCSDS_*_VERS``
  2. ``dispatch_document``   — split lines into header + Segment list
                               (flat / segmented / CDM structural variants)
  3. ``locate_blocks``       — given a list of KvLine objects and a target
                               dataclass, return the keyword-boundary spans
                               for every instance of that class (DEFAULT
                               locator only — no packed/covariance yet)
"""

from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum, IntEnum

from ccsds_ndm.kvn_registry import VERSION_REGISTRY as _VERSION_REGISTRY
from ccsds_ndm.kvn_tokenizer import (
    BlankLine,
    CommentLine,
    KvLine,
    KvnLine,
    PackedDataLine,
    SectionMarkerLine,
    TdmObsLine,
)
from ccsds_ndm.mapping import _NdmDataType

# ---------------------------------------------------------------------------
# Data structures produced by the dispatcher
# ---------------------------------------------------------------------------


@dataclass
class Segment:
    """
    Container for one logical NDM segment built by the document dispatcher.

    How the three line lists are filled depends on the dispatcher used:

    * Segmented types (OEM, AEM, TDM): every ``META_START/STOP`` block
      yields one ``Segment`` (metadata lines in ``meta``, the lines that
      follow in ``data``); a ``COVARIANCE_START/STOP`` block yields a
      further ``Segment`` in which only ``covariance`` is filled.
    * Flat types (OPM, OMM, APM, RDM): a single ``Segment`` whose ``data``
      holds every non-header line.
    * CDM: three ``Segment`` objects — relative metadata in ``meta``, then
      one ``data``-only segment for each of the two objects.
    """

    # Lines collected between META_START and META_STOP (or CDM relative metadata).
    meta: list[KvnLine] = field(default_factory=list)
    # Body lines of the segment (data section / flat body / CDM object section).
    data: list[KvnLine] = field(default_factory=list)
    # Lines collected between COVARIANCE_START and COVARIANCE_STOP.
    covariance: list[KvnLine] = field(default_factory=list)
@dataclass
class KvnDocument:
    """
    Result of dispatching one tokenised KVN file.

    Attributes
    ----------
    ndm_type : _NdmDataType
        Enum member describing which NDM message type the file contains.
    header : list[KvnLine]
        Lines routed to the NDM header bucket by the dispatcher (the
        ``CCSDS_*_VERS`` line plus the header keyword lines that follow it).
    segments : list[Segment]
        Logical segments in file order: one for flat types, one per
        ``META_START/STOP`` block (plus any covariance segments) for
        segmented types, and three for CDM.
    """

    # The only mandatory field; the two lists default to fresh empty lists.
    ndm_type: _NdmDataType
    header: list[KvnLine] = field(default_factory=list)
    segments: list[Segment] = field(default_factory=list)
# ---------------------------------------------------------------------------
# Step 1 — Identify NDM type
# ---------------------------------------------------------------------------
def identify_ndm_type(lines: list[KvnLine]) -> _NdmDataType:
    """
    Determine the NDM message type of a tokenised KVN file.

    The first ``KvLine`` whose key begins with ``"CCSDS_"`` is taken to be
    the version line; its key and whitespace-stripped value are handed to
    :meth:`_NdmDataType.find_ndm_type_by_class_id`.

    Parameters
    ----------
    lines : list[KvnLine]
        Tokenised KVN lines.

    Returns
    -------
    _NdmDataType
        Whatever ``find_ndm_type_by_class_id`` returns for the version line.

    Raises
    ------
    ValueError
        If no ``KvLine`` with a ``CCSDS_``-prefixed key is present.
    """
    # Lazily search for the first version line; later matches are ignored.
    version_line = next(
        (
            candidate
            for candidate in lines
            if isinstance(candidate, KvLine) and candidate.key.startswith("CCSDS_")
        ),
        None,
    )
    if version_line is None:
        raise ValueError("No CCSDS_*_VERS header line found in KVN data.")
    return _NdmDataType.find_ndm_type_by_class_id(
        version_line.key, version_line.value.strip()
    )
# ---------------------------------------------------------------------------
# Step 2 — Document-level dispatch
# ---------------------------------------------------------------------------
class ParserState(Enum):
    """States of the document-level dispatch state machine (segmented types)."""

    # Initial state; also re-entered after DATA_STOP and COVARIANCE_STOP.
    HEADER = "HEADER"
    # Between META_START and META_STOP.
    IN_META = "IN_META"
    # After META_STOP, before any further section marker.
    AFTER_META = "AFTER_META"
    # After DATA_START.
    IN_DATA = "IN_DATA"
    # After COVARIANCE_START.
    IN_COVARIANCE = "IN_COVARIANCE"
class CdmBucket(IntEnum):
    """
    Bucket indices for the CDM document structure.

    Being an ``IntEnum``, each member can be used directly as a list index
    into the CDM dispatcher's ``[header, rel_meta, obj1, obj2]`` bucket list.
    """

    HEADER = 0  # top-level header lines
    REL_META = 1  # relative metadata lines
    OBJECT_1 = 2  # lines following ``OBJECT = OBJECT1``
    OBJECT_2 = 3  # lines following ``OBJECT = OBJECT2``
def dispatch_document(lines: list[KvnLine]) -> KvnDocument:
    """
    Turn a tokenised KVN line list into a :class:`KvnDocument`.

    The NDM type is identified from the version line, the matching entry of
    the version registry is looked up, and the lines are routed to one of
    three structural dispatchers:

    * **Segmented** — used when the message id is listed in the registry
      entry's ``segmented_ids``; a state machine over the ``META_*``,
      ``DATA_*`` and ``COVARIANCE_*`` section markers.
    * **CDM** — used when the message id is listed in ``cdm_ids``; the file
      is split into header, relative metadata and two object sections, the
      latter delimited by ``OBJECT = OBJECT1`` / ``OBJECT = OBJECT2`` lines.
    * **Flat** — everything else; header keyword lines go to the header and
      all remaining lines form a single segment.

    Parameters
    ----------
    lines : list[KvnLine]
        Tokenised KVN lines.

    Returns
    -------
    KvnDocument

    Raises
    ------
    ValueError
        Propagated from :func:`identify_ndm_type` when no version line exists.
    """
    ndm_type = identify_ndm_type(lines)
    message_id = ndm_type.ndm_id
    registry_entry = _VERSION_REGISTRY[ndm_type.req_combi_version]

    # Pick the structural dispatcher first, then run it exactly once.
    if message_id in registry_entry.segmented_ids:
        dispatcher = _dispatch_segmented
    elif message_id in registry_entry.cdm_ids:
        dispatcher = _dispatch_cdm
    else:
        dispatcher = _dispatch_flat

    header, segments = dispatcher(lines)
    return KvnDocument(ndm_type=ndm_type, header=header, segments=segments)
# -- Segmented dispatch (OEM, AEM, TDM) -------------------------------------


def _dispatch_segmented(lines: list[KvnLine]) -> tuple[list[KvnLine], list[Segment]]:
    """
    Run the section-marker state machine over *lines*.

    Section markers only drive the state machine and are not stored; blank
    lines and all other lines are routed to the header or to the segment
    that is currently open.

    Returns
    -------
    (header, segments)
    """
    state = ParserState.HEADER
    header: list[KvnLine] = []
    segments: list[Segment] = []
    current: Segment | None = None

    for line in lines:
        if isinstance(line, SectionMarkerLine):
            # Markers change state (and may open/close a segment) but are dropped.
            state, current = _handle_section_marker(line, state, current, segments)
        elif isinstance(line, BlankLine):
            _route_blank(line, state, header, current)
        else:
            state, current = _route_line(line, state, header, current, segments)

    # A segment left open after META_STOP (no closing marker) is the last
    # one in the file and still has to be emitted.
    if state == ParserState.AFTER_META and current is not None:
        segments.append(current)

    return header, segments


def _handle_section_marker(
    line: SectionMarkerLine,
    state: ParserState,
    current: Segment | None,
    segments: list[Segment],
) -> tuple[ParserState, Segment | None]:
    """
    Advance the state machine for one section-marker line.

    May append the open segment to *segments* (mutated in place) and
    returns the new ``(state, current segment)`` pair.
    """
    marker = line.key

    # A segment still open after META_STOP is complete as soon as a new
    # metadata or covariance section begins.
    if (
        state == ParserState.AFTER_META
        and current is not None
        and marker in ("META_START", "COVARIANCE_START")
    ):
        segments.append(current)
        current = None

    if marker == "META_START":
        return ParserState.IN_META, Segment()
    if marker == "META_STOP":
        return ParserState.AFTER_META, current
    if marker == "DATA_START":
        # Reuse the segment opened by the preceding metadata block, if any.
        if current is None:
            current = Segment()
        return ParserState.IN_DATA, current
    if marker == "COVARIANCE_START":
        return ParserState.IN_COVARIANCE, Segment()
    if marker in ("DATA_STOP", "COVARIANCE_STOP"):
        # Closing markers emit the open segment and fall back to HEADER.
        if current is not None:
            segments.append(current)
        return ParserState.HEADER, None

    # Unrecognised marker: leave everything untouched.
    return state, current


def _route_blank(
    line: BlankLine,
    state: ParserState,
    header: list[KvnLine],
    current: Segment | None,
) -> None:
    """Store a blank line in the header or in the open segment's ``data``."""
    if state == ParserState.HEADER:
        header.append(line)
        return
    if current is None:
        return
    # NOTE(review): blanks seen while IN_META are stored in ``data`` (not
    # ``meta``), and blanks seen while IN_COVARIANCE are dropped — confirm
    # that both are intended.
    if state in (ParserState.IN_META, ParserState.AFTER_META, ParserState.IN_DATA):
        current.data.append(line)


def _route_line(
    line: KvnLine,
    state: ParserState,
    header: list[KvnLine],
    current: Segment | None,
    segments: list[Segment],
) -> tuple[ParserState, Segment | None]:
    """
    Store a non-blank, non-marker line in the bucket selected by *state*.

    Raises
    ------
    ValueError
        If *state* requires an open segment but *current* is ``None``.
    """
    if state == ParserState.HEADER:
        header.append(line)
        return state, current

    # Select the target list on the segment and the matching error text.
    if state == ParserState.IN_META:
        bucket_name = "meta"
        problem = "Parser state IN_META but no current segment exists."
    elif state in (ParserState.AFTER_META, ParserState.IN_DATA):
        # Lines after META_STOP count as data even without DATA_START.
        bucket_name = "data"
        problem = f"Parser state {state.value} but no current segment exists."
    elif state == ParserState.IN_COVARIANCE:
        bucket_name = "covariance"
        problem = "Parser state IN_COVARIANCE but no current segment exists."
    else:
        return state, current

    if current is None:
        raise ValueError(problem)
    getattr(current, bucket_name).append(line)
    return state, current


# -- Flat dispatch (OPM, OMM, APM, RDM) ------------------------------------

# Keywords that belong to the header (flat types have no META_START/STOP)
_HEADER_KWS = frozenset(
    {
        "CREATION_DATE",
        "ORIGINATOR",
        "MESSAGE_ID",
        "MESSAGE_FOR",
    }
)


def _dispatch_flat(lines: list[KvnLine]) -> tuple[list[KvnLine], list[Segment]]:
    """
    Dispatch a flat message (OPM, OMM, APM, RDM).

    Every line is classified by ``_classify_flat_line`` as ``"header"``,
    ``"body"`` or ``"skip"``.  Header lines are the version line and the
    header keyword lines that directly follow it; all body lines end up in
    the ``data`` list of a single ``Segment``.

    Returns
    -------
    (header, [segment])
    """
    header: list[KvnLine] = []
    body: list[KvnLine] = []
    sinks = {"header": header, "body": body}

    vers_seen = False
    in_header_section = False
    for line in lines:
        destination, vers_seen, in_header_section = _classify_flat_line(
            line, vers_seen, in_header_section
        )
        sink = sinks.get(destination)  # "skip" has no sink
        if sink is not None:
            sink.append(line)

    return header, [Segment(data=body)]


# -- CDM dispatch -----------------------------------------------------------


def _dispatch_cdm(lines: list[KvnLine]) -> tuple[list[KvnLine], list[Segment]]:
    """
    Dispatch a CDM file into header, relative metadata and two object sections.

    The first ``CCSDS_``-prefixed ``KvLine`` goes to the header.  For every
    later ``KvLine`` the target bucket is decided by ``_cdm_advance_bucket``
    (header keywords stay in the header, the first other keyword opens the
    relative metadata, and ``OBJECT = OBJECT1`` / ``OBJECT = OBJECT2`` open
    the object sections).

    Returns
    -------
    (header, [Segment(meta=rel_meta), Segment(data=obj1), Segment(data=obj2)])
    """
    # One accumulator per CdmBucket member, indexed by its integer value.
    buckets: list[list[KvnLine]] = [[], [], [], []]
    bucket = CdmBucket.HEADER
    vers_seen = False
    # Comments are held back until the next KvLine reveals their bucket.
    pending: list[KvnLine] = []

    for line in lines:
        if isinstance(line, BlankLine):
            _cdm_route_blank(line, vers_seen, pending, buckets, bucket)
        elif isinstance(line, CommentLine):
            pending.append(line)
        elif isinstance(line, KvLine):
            if not vers_seen and line.key.startswith("CCSDS_"):
                # Version line: anything buffered before it is discarded.
                buckets[CdmBucket.HEADER].append(line)
                vers_seen = True
                pending.clear()
            else:
                bucket = _cdm_advance_bucket(line, bucket)
                # Buffered comments belong to the bucket just resolved.
                buckets[bucket].extend(pending)
                pending.clear()
                buckets[bucket].append(line)
        # Any other line type is ignored.

    # NOTE(review): comments still buffered here (i.e. after the last
    # KvLine) are dropped — confirm that this is intended.
    header, rel_meta, obj1, obj2 = buckets
    return header, [
        Segment(meta=rel_meta),
        Segment(data=obj1),
        Segment(data=obj2),
    ]


# ---------------------------------------------------------------------------
# Step 3 — Default locator
# ---------------------------------------------------------------------------
@dataclass
class BlockSpan:
    """
    One located sub-block of a ``KvnLine`` list.

    Attributes
    ----------
    start : int
        Inclusive index of the first line in the source list.
    end : int
        Exclusive index (one past the last line) in the source list.
    lines : list[KvnLine]
        The lines of the source list in the half-open range ``[start, end)``.
    """

    # All three fields are required; there are no defaults.
    start: int
    end: int
    lines: list[KvnLine]
def locate_blocks(
    lines: list[KvnLine],
    cls,
) -> list[BlockSpan]:
    """
    Find every instance of ``cls`` in *lines* with the default keyword-set
    strategy.

    The keyword set of ``cls`` is the set of all-uppercase ``"name"``
    metadata entries of its dataclass fields; the anchor keyword is the
    first such entry other than ``COMMENT``.  Each occurrence of the anchor
    as a ``KvLine`` key opens a new instance, which runs up to the next
    anchor (or end of list) and is then clipped after the last line whose
    key belongs to the keyword set, so that trailing lines of a following
    sibling block are not absorbed.

    When ``cls`` has no anchor, or the anchor does not occur in *lines*, a
    single span from the first to the last member keyword is returned
    instead (or nothing if no member keyword occurs).

    Packed-data, covariance and TDM-observation layouts are handled by the
    dedicated locators, not here.

    Parameters
    ----------
    lines : list[KvnLine]
        Source lines of one segment bucket.
    cls : type
        The dataclass whose instances are to be located.

    Returns
    -------
    list[BlockSpan]
        Located spans in source order; empty if ``cls`` declares no
        uppercase keyword.
    """
    kw_set = _kw_set_for(cls)
    if not kw_set:
        return []

    anchor = _anchor_kw_for(cls)
    starts: list[int] = []
    if anchor is not None:
        starts = [
            index
            for index, entry in enumerate(lines)
            if isinstance(entry, KvLine) and entry.key == anchor
        ]
    if not starts:
        # No usable delimiter: treat all member keywords as one instance.
        return _single_span_from_member_kws(lines, kw_set)

    # Pair every anchor with the position where its raw chunk stops.
    stops = [*starts[1:], len(lines)]

    located: list[BlockSpan] = []
    for begin, stop in zip(starts, stops):
        window = lines[begin:stop]
        last_member = _last_member_kw_offset(window, kw_set)
        if last_member == -1:
            # Defensive: nothing in the window belongs to cls.
            continue
        length = last_member + 1
        located.append(
            BlockSpan(start=begin, end=begin + length, lines=window[:length])
        )
    return located
# ---------------------------------------------------------------------------
# Step 3b — Packed-state locator (OEM state vectors)
# ---------------------------------------------------------------------------
def locate_packed_state(lines: list[KvnLine]) -> list[BlockSpan]:
    """
    Return one single-line ``BlockSpan`` per ``PackedDataLine`` in *lines*.

    Intended for OEM data sections, where each packed line is one
    epoch-led state vector record.

    Parameters
    ----------
    lines : list[KvnLine]
        Typically ``Segment.data`` of an OEM segment.

    Returns
    -------
    list[BlockSpan]
        Spans in source order, each covering ``[i, i + 1)``.
    """
    spans: list[BlockSpan] = []
    for index, entry in enumerate(lines):
        if isinstance(entry, PackedDataLine):
            spans.append(BlockSpan(start=index, end=index + 1, lines=[entry]))
    return spans
# ---------------------------------------------------------------------------
# Step 3c — Packed-attitude locator (AEM attitude states)
# ---------------------------------------------------------------------------
def locate_packed_attitude(lines: list[KvnLine]) -> list[BlockSpan]:
    """
    Return one single-line ``BlockSpan`` per ``PackedDataLine`` in *lines*.

    Structurally the same as :func:`locate_packed_state`; intended for AEM
    data sections, where each packed line is one attitude state record.
    The columns are not interpreted here.

    Parameters
    ----------
    lines : list[KvnLine]
        Typically ``Segment.data`` of an AEM segment.

    Returns
    -------
    list[BlockSpan]
        Spans in source order, each covering ``[i, i + 1)``.
    """
    packed = (
        (position, row)
        for position, row in enumerate(lines)
        if isinstance(row, PackedDataLine)
    )
    return [
        BlockSpan(start=position, end=position + 1, lines=[row])
        for position, row in packed
    ]
# ---------------------------------------------------------------------------
# Step 3d — Covariance locator (OEM covariance matrices)
# ---------------------------------------------------------------------------
def locate_covariance(lines: list[KvnLine]) -> list[BlockSpan]:
    """
    Split *lines* into covariance matrix instances.

    Every ``KvLine`` with key ``EPOCH`` opens a new instance that extends
    up to (but not including) the next ``EPOCH`` line, or to the end of the
    list.  Lines before the first ``EPOCH`` are not part of any span.

    Parameters
    ----------
    lines : list[KvnLine]
        Typically ``Segment.covariance`` of an OEM segment.

    Returns
    -------
    list[BlockSpan]
        One span per ``EPOCH`` line, in source order.
    """
    epoch_rows: list[int] = []
    for index, entry in enumerate(lines):
        if isinstance(entry, KvLine) and entry.key == "EPOCH":
            epoch_rows.append(index)
    return _spans_from_anchors(lines, epoch_rows)
# ---------------------------------------------------------------------------
# Step 3e — TDM observation locator
# ---------------------------------------------------------------------------
def locate_tdm_obs(lines: list[KvnLine]) -> list[BlockSpan]:
    """
    Return one single-line ``BlockSpan`` per ``TdmObsLine`` in *lines*.

    Intended for TDM data sections, where every observation occupies
    exactly one line.

    Parameters
    ----------
    lines : list[KvnLine]
        Typically ``Segment.data`` of a TDM segment.

    Returns
    -------
    list[BlockSpan]
        Spans in source order, each covering ``[i, i + 1)``.
    """
    observations: list[BlockSpan] = []
    for index, entry in enumerate(lines):
        if not isinstance(entry, TdmObsLine):
            continue
        observations.append(BlockSpan(start=index, end=index + 1, lines=[entry]))
    return observations
# ---------------------------------------------------------------------------
# Step 3f — Packed-lines locator (OCM/ACM mixed blocks)
# ---------------------------------------------------------------------------
def locate_packed_lines(
    lines: list[KvnLine],
    cls,
) -> list[BlockSpan]:
    """
    Find every instance of ``cls`` in *lines* by anchor keyword, without
    clipping.

    Works like :func:`locate_blocks` in that each occurrence of the anchor
    keyword of ``cls`` opens a new instance, but every span runs all the
    way to the next anchor (or end of list).  Lines that are not
    ``KEY = VALUE`` lines and follow the keyword header therefore remain
    inside the span.

    Parameters
    ----------
    lines : list[KvnLine]
        Source lines of one segment bucket.
    cls : type
        The dataclass whose instances are to be located.

    Returns
    -------
    list[BlockSpan]
        One span per anchor occurrence; empty if ``cls`` has no anchor
        keyword or the anchor never occurs.
    """
    anchor = _anchor_kw_for(cls)
    if anchor is None:
        return []

    anchor_rows: list[int] = []
    for index, entry in enumerate(lines):
        if isinstance(entry, KvLine) and entry.key == anchor:
            anchor_rows.append(index)
    return _spans_from_anchors(lines, anchor_rows)
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _classify_flat_line(
    line: KvnLine,
    vers_seen: bool,
    in_header_section: bool,
) -> tuple[str, bool, bool]:
    """
    Decide where one line of a flat message goes.

    Returns
    -------
    (destination, vers_seen, in_header_section)
        ``destination`` is ``"skip"``, ``"header"`` or ``"body"``; the two
        flags are the updated state to pass in for the next line.
    """
    if isinstance(line, BlankLine):
        # Blank lines never change state; they follow the current section.
        if not vers_seen:
            destination = "skip"
        elif in_header_section:
            destination = "header"
        else:
            destination = "body"
        return destination, vers_seen, in_header_section

    if isinstance(line, KvLine):
        if not vers_seen and line.key.startswith("CCSDS_"):
            # The version line opens the header section.
            return "header", True, True
        if in_header_section and line.key in _HEADER_KWS:
            return "header", vers_seen, in_header_section

    # Anything else ends the header section for good.
    # NOTE(review): this includes comment lines, so a comment between the
    # version line and CREATION_DATE sends the remaining header keywords
    # to the body — confirm that this is intended.
    return "body", vers_seen, False


def _cdm_route_blank(
    line: BlankLine,
    vers_seen: bool,
    pending_comments: list[KvnLine],
    buckets: list[list[KvnLine]],
    bucket: CdmBucket,
) -> None:
    """
    Store a blank line of a CDM file (mutates its list arguments).

    Blank lines before the version line are dropped.  If comments are being
    buffered the blank line joins that buffer so that it keeps its place
    among them; otherwise it goes straight to the current bucket.
    """
    if not vers_seen:
        return
    target = pending_comments if pending_comments else buckets[bucket]
    target.append(line)


def _cdm_advance_bucket(line: KvLine, bucket: CdmBucket) -> CdmBucket:
    """
    Return the CDM bucket that applies after seeing a non-version ``KvLine``.

    Two rules are applied in order:

    * in ``HEADER``, a key outside the header keyword set moves on to
      ``REL_META``;
    * an ``OBJECT`` key with value ``OBJECT1`` / ``OBJECT2`` selects the
      matching object bucket, whatever the current bucket is.
    """
    if bucket == CdmBucket.HEADER and line.key not in _HEADER_KWS:
        bucket = CdmBucket.REL_META

    if line.key != "OBJECT":
        return bucket

    object_id = line.value.strip()
    if object_id == "OBJECT1":
        return CdmBucket.OBJECT_1
    if object_id == "OBJECT2":
        return CdmBucket.OBJECT_2
    return bucket


def _last_member_kw_offset(chunk: list[KvnLine], kw_set: frozenset[str]) -> int:
    """
    Return the index of the last ``KvLine`` in *chunk* whose key is in
    *kw_set*, or ``-1`` when there is none.
    """
    # Scan backwards so the first hit is the answer.
    for offset in range(len(chunk) - 1, -1, -1):
        entry = chunk[offset]
        if isinstance(entry, KvLine) and entry.key in kw_set:
            return offset
    return -1


def _single_span_from_member_kws(
    lines: list[KvnLine], kw_set: frozenset[str]
) -> list[BlockSpan]:
    """
    Return at most one ``BlockSpan`` running from the first to the last
    ``KvLine`` whose key is in *kw_set* (empty list if none matches).
    """
    hits = [
        index
        for index, entry in enumerate(lines)
        if isinstance(entry, KvLine) and entry.key in kw_set
    ]
    if not hits:
        return []
    first, stop = hits[0], hits[-1] + 1
    return [BlockSpan(start=first, end=stop, lines=lines[first:stop])]


def _spans_from_anchors(
    lines: list[KvnLine], anchor_positions: list[int]
) -> list[BlockSpan]:
    """Build one BlockSpan per anchor, each running to the next anchor (or end)."""
    # Each span stops where the following one starts; the last stops at EOF.
    stops = [*anchor_positions[1:], len(lines)]
    return [
        BlockSpan(start=begin, end=stop, lines=lines[begin:stop])
        for begin, stop in zip(anchor_positions, stops)
    ]


def _kw_set_for(cls) -> frozenset[str]:
    """
    Return the all-uppercase KVN keyword names declared by ``cls``.

    The names come from the ``"name"`` metadata entry of each dataclass
    field; entries that are not fully uppercase are left out.  Anything
    without ``__dataclass_fields__`` yields an empty set.
    """
    try:
        declared = cls.__dataclass_fields__
    except AttributeError:
        return frozenset()

    keywords = set()
    for spec in declared.values():
        if "name" not in spec.metadata:
            continue
        keyword = spec.metadata["name"]
        if keyword.isupper():
            keywords.add(keyword)
    return frozenset(keywords)


def _anchor_kw_for(cls) -> str | None:
    """
    Return the first all-uppercase KVN keyword declared by ``cls``.

    Fields are visited in declaration order.  ``COMMENT`` is never chosen:
    the locators match anchors against ``KvLine`` keys only, while the
    dispatchers treat comments as a separate ``CommentLine`` type.  Returns
    ``None`` if ``cls`` has no ``__dataclass_fields__`` or no suitable
    keyword.
    """
    try:
        declared = cls.__dataclass_fields__
    except AttributeError:
        return None

    declared_names = (spec.metadata.get("name", "") for spec in declared.values())
    return next(
        (name for name in declared_names if name.isupper() and name != "COMMENT"),
        None,
    )