# Source code for ccsds_ndm.ndm_xml_io

# CCSDS-NDM: CCSDS Navigation Data Messages Read/Write Library
#
# Copyright (C) Egemen Imre
#
# Licensed under GNU GPL v3.0. See LICENSE for more info.
"""
CCSDS Navigation Data Messages XML File I/O.

"""

from pathlib import Path

from lxml import etree
from xsdata.formats.dataclass.parsers import XmlParser
from xsdata.formats.dataclass.parsers.config import ParserConfig
from xsdata.formats.dataclass.serializers import XmlSerializer
from xsdata.formats.dataclass.serializers.config import SerializerConfig

from ccsds_ndm.mapping import _NdmDataType


class NdmXmlIo:
    """
    Unified I/O Model for XML input and output.
    """

    def __init__(self):
        # The parser is created on the first `from_string()` call.
        self.parser = None
        self.parser_config = ParserConfig(fail_on_unknown_properties=True)
        # The serializer is rebuilt on every `to_string()` call.
        self.serializer = None

    def from_path(self, xml_read_file_path: Path | str):
        """
        Reads the file to extract contents to an object of correct type.

        Parameters
        ----------
        xml_read_file_path : Path or str
            Path of the XML file to be read

        Returns
        -------
        object
            Object tree from the file contents
        """
        # Read as UTF-8 explicitly: `from_bytes()` decodes as UTF-8 and
        # `_identify_data_type()` re-encodes as UTF-8, so relying on the
        # platform default encoding here would be inconsistent.
        file_contents = Path(xml_read_file_path).read_text(encoding="utf-8")

        # parse as `from_string()`
        return self.from_string(file_contents)

    def from_bytes(self, xml_source: bytes):
        """
        Reads the input bytes array to extract contents to an object of
        correct type.

        Parameters
        ----------
        xml_source : bytes
            input bytes array

        Returns
        -------
        object
            Object tree from the file contents
        """
        # decode bytes (UTF-8 by default) and parse as `from_string()`
        return self.from_string(xml_source.decode())

    def from_string(self, xml_source: str):
        """
        Reads the input string to extract contents to an object of correct
        type.

        Parameters
        ----------
        xml_source : str
            input string data

        Returns
        -------
        object
            Object tree from the file contents. For a Combined Instantiation
            NDM holding exactly one member, that member is returned instead
            of the enclosing NDM object.
        """
        # lazy init parser
        if self.parser is None:
            self.parser = self._init_parser(self.parser_config)

        # Identify data type of the string (Oem, Apm etc.) and parse the data.
        # The source is replaced by the (possibly repaired) text from lxml.
        data_type, xml_source = self._identify_data_type(xml_source)
        ndm = self.parser.from_string(xml_source, data_type.clazz)

        if not data_type.is_combi:
            # Usual single element file
            return ndm

        # Combined Instantiation: downcast each member to the first
        # subclass of the class it was parsed as.
        for tag, ndm_item_list in vars(ndm).items():
            if tag in ("comment", "message_id"):
                continue

            for ndm_item in ndm_item_list:
                subclazz = type(ndm_item).__subclasses__()[0]
                ndm_item.__class__ = subclazz

        # If it actually has a single element, strip the ndm tags
        return _strip_multi_ndm(ndm)

    def to_string(
        self,
        ndm_obj,
        schema_location: str | None = None,
        no_namespace_schema_location: str | None = None,
    ) -> str:
        """
        Convert and return the given object tree as xml string.

        Parameters
        ----------
        ndm_obj
            input object tree
        schema_location: str | None
            Specify the xsi:schemaLocation attribute value
        no_namespace_schema_location: str | None
            Specify the xsi:noNamespaceSchemaLocation attribute value

        Returns
        -------
        str
            given object tree as xml string
        """
        # The serializer is rebuilt on every call because its configuration
        # depends on the schema location arguments.
        self.serializer = self._init_serializer(
            no_namespace_schema_location=no_namespace_schema_location,
            schema_location=schema_location,
        )

        return self.serializer.render(ndm_obj)

    def to_file(
        self,
        ndm_obj,
        xml_write_file_path: Path | str,
        schema_location: str | None = None,
        no_namespace_schema_location: str | None = None,
    ):
        """
        Convert the given object tree as xml file.

        Parameters
        ----------
        ndm_obj
            input object tree
        xml_write_file_path : Path or str
            Path of the XML file to be written
        schema_location: str | None
            Specify the xsi:schemaLocation attribute value
        no_namespace_schema_location: str | None
            Specify the xsi:noNamespaceSchemaLocation attribute value
        """
        xml_txt = self.to_string(
            ndm_obj,
            no_namespace_schema_location=no_namespace_schema_location,
            schema_location=schema_location,
        )

        # Write as UTF-8 explicitly so the bytes on disk do not depend on the
        # platform default encoding (xsdata's default serializer config is
        # expected to declare UTF-8 in the XML prolog).
        Path(xml_write_file_path).write_text(xml_txt, encoding="utf-8")

    @staticmethod
    def _identify_data_type(xml_source: str) -> tuple[_NdmDataType, str]:
        """
        Identify the NDM XML data type from an XML string.

        The function parses the XML using lxml.etree with recover=True and
        ns_clean=True to tolerate and clean malformed input.

        Parameters
        ----------
        xml_source : str
            NDM Data as XML string

        Returns
        -------
        data_type : _NdmDataType
            The identified data type (as returned by the _NdmDataType lookup
            helpers).
        fixed_source : str
            The XML source string, possibly cleaned up by lxml during parsing.

        Raises
        ------
        ValueError
            If no root element can be recovered from the input, or if a
            Combined Instantiation NDM has no child element other than
            "comment" and "message_id".

        Notes
        -----
        - If the root element is "ndm" the function treats the document as a
          Combined NDM file: it skips child elements named "comment" and
          "message_id" (and non-element nodes such as XML comments) and uses
          the first other child to determine the internal data type via
          _NdmDataType.find_ndm_type_by_id(child.tag, version), then maps that
          to a combined NDM type via _NdmDataType.find_combi_version(...).
        - Otherwise the root element name and its "version" attribute are used
          to lookup the data type via
          _NdmDataType.find_ndm_type_by_id(root.tag, version).
        """
        parser = etree.XMLParser(recover=True, ns_clean=True)

        # parse the XML string to get the root element
        root = etree.fromstring(xml_source.encode("utf-8"), parser=parser)

        # In recover mode lxml can hand back no root at all (e.g. empty
        # input); fail with a clear message instead of an opaque error from
        # the serialisation below.
        if root is None:
            raise ValueError("No XML root element could be parsed from the input.")

        # this can feed the fixed XML back to the parser, if needed. Some
        # files have unescaped characters that cause parsing issues, but can
        # be fixed by lxml's recover mode.
        fixed_xml = etree.tostring(root, pretty_print=True, encoding="unicode")

        if root.tag != "ndm":
            # This is a usual single element file, the root tag corresponds
            # to the data type
            data_type = _NdmDataType.find_ndm_type_by_id(
                root.tag, root.attrib.get("version")
            )
            return data_type, fixed_xml

        # The root tag is "ndm": this is a Combined Instantiation file, and
        # the first child that is not "comment" or "message_id" identifies
        # the data type.
        for child in root:
            # XML comments and processing instructions have a non-string
            # `tag`; they are not NDM elements, so skip them.
            if not isinstance(child.tag, str):
                continue
            if child.tag in ("comment", "message_id"):
                continue

            internal_data_type = _NdmDataType.find_ndm_type_by_id(
                child.tag, child.attrib.get("version")
            )
            # return the first combined NDM version that supports this
            # data type
            return (
                _NdmDataType.find_combi_version(internal_data_type),
                fixed_xml,
            )

        # Reached here without a valid file inside the NDM Combi
        raise ValueError("No child found in the Combined Instantiation NDM Data.")

    @staticmethod
    def _init_parser(config: ParserConfig):
        """
        Inits the internal parser.

        Parameters
        ----------
        config : ParserConfig
            Parser configuration handed to the XML parser
        """
        return XmlParser(config=config)

    @staticmethod
    def _init_serializer(
        schema_location: str | None = None,
        no_namespace_schema_location: str | None = None,
    ):
        """
        Inits the internal serializer.

        Parameters
        ----------
        schema_location: str | None
            Specify the xsi:schemaLocation attribute value
        no_namespace_schema_location: str | None
            Specify the xsi:noNamespaceSchemaLocation attribute value
        """
        config = SerializerConfig(
            indent=" ",
            schema_location=schema_location,
            no_namespace_schema_location=no_namespace_schema_location,
        )
        return XmlSerializer(config=config)
def _strip_multi_ndm(ndm): """ Identifies whether the Combined Instantiation NDM actually contains a single element (OMM, APM etc.) with a single member and, if so, returns this element. Otherwise returns this Combined Instantiation NDM. Parameters ---------- ndm NDM data object Returns ------- ndm_elem : NDM element Identified and stripped NDM element or the original Combi-NDM """ # Find the elements that have non-zero members (omit the "comment" # and "message_id" tags) non_zero_elem_list = _get_non_zero_elem_names(ndm) if len(non_zero_elem_list) == 1: # single element available, check number of members ndm_elem = vars(ndm)[non_zero_elem_list[0]] if len(ndm_elem) == 1: # single element available, return it return ndm_elem[0] # multiple elements available, return them return ndm else: # multiple elements available, return them return ndm # def _is_multi_ndm(ndm) -> bool: # """ # Identifies whether the Combined Instantiation NDM actually contains # a single element (OMM, APM etc.) with a single member. # Parameters # ---------- # ndm # NDM data object # Returns # ------- # bool # True if this `ndm` is a Combi-NDM, False otherwise. # """ # # Find the elements that have non-zero members (omit the "comment" # # and "message_id" tags) # non_zero_elem_list = _get_non_zero_elem_names(ndm) # if len(non_zero_elem_list) == 1: # # single element available, check number of members # ndm_elem = vars(ndm)[non_zero_elem_list[0]] # if len(ndm_elem) == 1: # # single element available, return it # return False # # multiple elements available, return them # return True # else: # # multiple elements available # return True def _get_non_zero_elem_names(ndm): """Return names of ndm attributes that are non-empty, excluding meta fields.""" return [ name for name, val in vars(ndm).items() if name not in ("comment", "message_id") and len(val) > 0 ]