diff --git a/space_packet_parser/xtce/containers.py b/space_packet_parser/xtce/containers.py index e12d1e8..c29a001 100644 --- a/space_packet_parser/xtce/containers.py +++ b/space_packet_parser/xtce/containers.py @@ -1,5 +1,6 @@ """Module with XTCE models related to SequenceContainers""" +import warnings from dataclasses import dataclass, field from typing import Any, Union @@ -12,6 +13,171 @@ from space_packet_parser.xtce import comparisons, parameter_types, parameters +@dataclass +class ParameterRefEntry(common.Parseable, common.XmlObject): + """ + + Represents a reference to a parameter in an EntryList, with optional conditional inclusion + and repeat logic. + + Parameters + ---------- + parameter_ref : str + Name reference to the Parameter + include_condition : Optional[comparisons.MatchCriteria] + Condition for inclusion. If None, parameter is always included. + repeat_entry : Optional[Any] + Repeat information. Not currently supported - will raise NotImplementedError during parse. + """ + + parameter_ref: str + include_condition: Optional[comparisons.MatchCriteria] = None + repeat_entry: Optional[Any] = None + + def parse(self, packet: spp.SpacePacket, parameter_lookup: dict[str, parameters.Parameter]) -> None: + """Parse the parameter reference entry, handling conditional inclusion. + + Parameters + ---------- + packet : spp.SpacePacket + The packet being parsed + parameter_lookup : dict[str, parameters.Parameter] + Dictionary to look up parameter objects by name + + Raises + ------ + NotImplementedError + If RepeatEntry is specified + KeyError + If the referenced parameter is not found in parameter_lookup + """ + # Check if repeat_entry is specified + if self.repeat_entry is not None: + raise NotImplementedError("RepeatEntry is not currently supported in parsing") + + # Evaluate include condition if it exists + if self.include_condition is not None: + if not self.include_condition.evaluate(packet): + # Condition is False, skip this parameter + return + + # Parse the parameter + try: + parameter = parameter_lookup[self.parameter_ref] + except KeyError as err: + raise KeyError( + f"Parameter '{self.parameter_ref}' referenced in ParameterRefEntry not found in parameter lookup. " + f"Available parameters: {list(parameter_lookup.keys())}" + ) from err + parameter.parse(packet) + + @classmethod + def from_xml( + cls, + element: ElementTree.Element, + *, + tree: Optional[ElementTree.ElementTree] = None, + parameter_lookup: Optional[dict[str, parameters.Parameter]] = None, + parameter_type_lookup: Optional[dict[str, parameter_types.ParameterType]] = None, + container_lookup: Optional[dict[str, Any]] = None, + ) -> "ParameterRefEntry": + """Create a ParameterRefEntry from an XML element. + + Parameters + ---------- + element : ElementTree.Element + The ParameterRefEntry XML element + tree : Optional[ElementTree.ElementTree] + Full XTCE tree + parameter_lookup : Optional[dict[str, parameters.Parameter]] + Ignored + parameter_type_lookup : Optional[dict[str, parameter_types.ParameterType]] + Ignored + container_lookup : Optional[dict[str, Any]] + Ignored + + Returns + ------- + : ParameterRefEntry + """ + parameter_ref = element.attrib["parameterRef"] + + # Parse optional IncludeCondition + include_condition = None + if (include_cond_elem := element.find("IncludeCondition")) is not None: + # IncludeCondition contains a single MatchCriteria element (Comparison, ComparisonList, or BooleanExpression) + if (comparison_list_elem := include_cond_elem.find("ComparisonList")) is not None: + # ComparisonList contains multiple Comparison elements with AND semantics per XTCE spec + # Note: The existing restriction_criteria uses the same pattern (iterfind("*")) + # to iterate over Comparison elements in a ComparisonList + comparisons_list = [ + comparisons.Comparison.from_xml(comp) for comp in comparison_list_elem.iterfind("*") + ] + # LIMITATION: MatchCriteria interface expects a single object, but ComparisonList + # requires AND logic across multiple comparisons. For now, we only support single + # comparisons in ComparisonList. Full AND logic would require a new MatchCriteria + # subclass or modifying include_condition to accept a list of MatchCriteria. + # This follows the same limitation pattern used in restriction_criteria where + # multiple MatchCriteria are stored in a list on the SequenceContainer itself. + if len(comparisons_list) == 1: + include_condition = comparisons_list[0] + else: + # For multiple comparisons, warn and use only the first one + warnings.warn( + f"ComparisonList with {len(comparisons_list)} comparisons in IncludeCondition " + f"for parameter '{parameter_ref}'. Only the first comparison will be evaluated. " + f"Full AND logic for ComparisonList in IncludeCondition requires architectural " + f"changes to support multiple MatchCriteria conditions.", + UserWarning, + ) + include_condition = comparisons_list[0] if comparisons_list else None + elif (comparison_elem := include_cond_elem.find("Comparison")) is not None: + include_condition = comparisons.Comparison.from_xml(comparison_elem) + elif (bool_expr_elem := include_cond_elem.find("BooleanExpression")) is not None: + include_condition = comparisons.BooleanExpression.from_xml(bool_expr_elem) + + # Parse optional RepeatEntry + repeat_entry = None + if element.find("RepeatEntry") is not None: + # We'll store a placeholder to indicate it was present + repeat_entry = True # Will cause NotImplementedError during parse + + return cls( + parameter_ref=parameter_ref, + include_condition=include_condition, + repeat_entry=repeat_entry, + ) + + def to_xml(self, *, elmaker: ElementMaker) -> ElementTree.Element: + """Create a ParameterRefEntry XML element. + + Parameters + ---------- + elmaker : ElementMaker + Element factory with predefined namespace + + Returns + ------- + : ElementTree.Element + """ + entry = elmaker.ParameterRefEntry(parameterRef=self.parameter_ref) + + if self.include_condition is not None: + include_cond = elmaker.IncludeCondition(self.include_condition.to_xml(elmaker=elmaker)) + entry.append(include_cond) + + if self.repeat_entry is not None: + # Placeholder for RepeatEntry serialization + # Since we don't fully parse it, we can't fully serialize it either + warnings.warn( + "RepeatEntry serialization is not fully implemented. " + "The RepeatEntry element will not be included in the XML output.", + UserWarning, + ) + + return entry + + @dataclass class SequenceContainer(common.Parseable, common.XmlObject): """ @@ -56,7 +222,11 @@ def parse(self, packet: spp.SpacePacket) -> None: This could be recursive if the entry list contains SequenceContainers. """ for entry in self.entry_list: - entry.parse(packet=packet) + if isinstance(entry, ParameterRefEntry): + # ParameterRefEntry needs parameter_lookup to resolve references + entry.parse(packet=packet, parameter_lookup=self._parameter_lookup) + else: + entry.parse(packet=packet) @classmethod def from_xml( @@ -111,8 +281,18 @@ def from_xml( for entry in element.find("EntryList").iterfind("*"): entry_tag_name = ElementTree.QName(entry).localname if entry_tag_name == "ParameterRefEntry": - parameter_name = entry.attrib["parameterRef"] - entry_list.append(parameter_lookup[parameter_name]) # KeyError if parameter is not in the lookup + # Check if this ParameterRefEntry has IncludeCondition or RepeatEntry child elements + has_include_condition = entry.find("IncludeCondition") is not None + has_repeat_entry = entry.find("RepeatEntry") is not None + + if has_include_condition or has_repeat_entry: + # Create a ParameterRefEntry object to handle conditional/repeated parsing + param_ref_entry = ParameterRefEntry.from_xml(entry, tree=tree) + entry_list.append(param_ref_entry) + else: + # No special handling needed, use the parameter directly for backward compatibility + parameter_name = entry.attrib["parameterRef"] + entry_list.append(parameter_lookup[parameter_name]) # KeyError if parameter is not in the lookup elif entry_tag_name == "ContainerRefEntry": # This container may not have been parsed yet. We need to parse it now so we might as well @@ -129,6 +309,14 @@ def from_xml( ) container_lookup[nested_container.name] = nested_container entry_list.append(nested_container) + else: + warnings.warn( + f"Unrecognized entry type '{entry_tag_name}' in EntryList for container " + f"'{element.attrib['name']}'. Supported types: ParameterRefEntry, ContainerRefEntry. " + f"Skipping this entry.", + category=UserWarning, + ) + continue short_description = element.attrib.get("shortDescription", None) @@ -137,7 +325,7 @@ def from_xml( else: long_description = None - return cls( + container = cls( name=element.attrib["name"], entry_list=entry_list, base_container_name=base_container_name, @@ -146,6 +334,9 @@ def from_xml( short_description=short_description, long_description=long_description, ) + # Store parameter lookup for use during parsing + container._parameter_lookup = parameter_lookup + return container def to_xml(self, *, elmaker: ElementMaker) -> ElementTree.Element: """Create a SequenceContainer XML element @@ -188,7 +379,9 @@ def to_xml(self, *, elmaker: ElementMaker) -> ElementTree.Element: entry_list = em.EntryList() for entry in self.entry_list: - if isinstance(entry, parameters.Parameter): + if isinstance(entry, ParameterRefEntry): + entry_element = entry.to_xml(elmaker=elmaker) + elif isinstance(entry, parameters.Parameter): entry_element = em.ParameterRefEntry(parameterRef=entry.name) elif isinstance(entry, SequenceContainer): entry_element = em.ContainerRefEntry(containerRef=entry.name) diff --git a/tests/integration/test_include_condition_integration.py b/tests/integration/test_include_condition_integration.py new file mode 100644 index 0000000..5ae5c21 --- /dev/null +++ b/tests/integration/test_include_condition_integration.py @@ -0,0 +1,150 @@ +"""Integration tests for IncludeCondition support using full XTCE documents""" + +import struct +from pathlib import Path + +import pytest + +from space_packet_parser import SpacePacket +from space_packet_parser.xtce import definitions + + +@pytest.fixture +def xtce_definition(): + """Load the XTCE definition for IncludeCondition testing""" + test_data_dir = Path(__file__).parent.parent / "test_data" + xtce_file = test_data_dir / "test_include_condition.xml" + return definitions.XtcePacketDefinition.from_xtce(xtce_file) + + +def test_include_condition_both_true(xtce_definition): + """Test parsing when both conditional parameters should be included (Mode=1, Flag=1)""" + # Create packet data: Mode=1, Flag=1, OptionalData1=111, OptionalData2=222, MandatoryData=333 + packet_data = struct.pack(">BBHHI", 1, 1, 111, 222, 333) + packet = SpacePacket(binary_data=packet_data) + + # Parse the packet + xtce_definition.containers["ConditionalTestPacket"].parse(packet) + + # Verify all parameters were parsed + assert packet["Mode"] == 1 + assert packet["Flag"] == 1 + assert packet["OptionalData1"] == 111 + assert packet["OptionalData2"] == 222 + assert packet["MandatoryData"] == 333 + + +def test_include_condition_first_true_second_false(xtce_definition): + """Test parsing when only first conditional parameter should be included (Mode=1, Flag=0)""" + # Create packet data: Mode=1, Flag=0, OptionalData1=444, MandatoryData=555 + # OptionalData2 should be skipped + packet_data = struct.pack(">BBHI", 1, 0, 444, 555) + packet = SpacePacket(binary_data=packet_data) + + # Parse the packet + xtce_definition.containers["ConditionalTestPacket"].parse(packet) + + # Verify correct parameters were parsed + assert packet["Mode"] == 1 + assert packet["Flag"] == 0 + assert packet["OptionalData1"] == 444 + assert "OptionalData2" not in packet # Should be skipped + assert packet["MandatoryData"] == 555 + + +def test_include_condition_first_false_second_true(xtce_definition): + """Test parsing when only second conditional parameter should be included (Mode=0, Flag=1)""" + # Create packet data: Mode=0, Flag=1, OptionalData2=666, MandatoryData=777 + # OptionalData1 should be skipped + packet_data = struct.pack(">BBHI", 0, 1, 666, 777) + packet = SpacePacket(binary_data=packet_data) + + # Parse the packet + xtce_definition.containers["ConditionalTestPacket"].parse(packet) + + # Verify correct parameters were parsed + assert packet["Mode"] == 0 + assert packet["Flag"] == 1 + assert "OptionalData1" not in packet # Should be skipped + assert packet["OptionalData2"] == 666 + assert packet["MandatoryData"] == 777 + + +def test_include_condition_both_false(xtce_definition): + """Test parsing when both conditional parameters should be skipped (Mode=0, Flag=0)""" + # Create packet data: Mode=0, Flag=0, MandatoryData=888 + # Both OptionalData1 and OptionalData2 should be skipped + packet_data = struct.pack(">BBI", 0, 0, 888) + packet = SpacePacket(binary_data=packet_data) + + # Parse the packet + xtce_definition.containers["ConditionalTestPacket"].parse(packet) + + # Verify correct parameters were parsed + assert packet["Mode"] == 0 + assert packet["Flag"] == 0 + assert "OptionalData1" not in packet # Should be skipped + assert "OptionalData2" not in packet # Should be skipped + assert packet["MandatoryData"] == 888 + + +def test_include_condition_with_generator(xtce_definition): + """Test parsing multiple packets using a generator""" + # Create multiple test packets + packets_data = [ + struct.pack(">BBHHI", 1, 1, 100, 200, 300), # Both included + struct.pack(">BBHI", 1, 0, 400, 500), # Only OptionalData1 + struct.pack(">BBHI", 0, 1, 600, 700), # Only OptionalData2 + struct.pack(">BBI", 0, 0, 800), # Neither included + ] + + # Parse all packets + parsed_packets = [] + for packet_data in packets_data: + packet = SpacePacket(binary_data=packet_data) + xtce_definition.containers["ConditionalTestPacket"].parse(packet) + parsed_packets.append(packet) + + # Verify first packet (both included) + assert parsed_packets[0]["Mode"] == 1 + assert parsed_packets[0]["Flag"] == 1 + assert parsed_packets[0]["OptionalData1"] == 100 + assert parsed_packets[0]["OptionalData2"] == 200 + assert parsed_packets[0]["MandatoryData"] == 300 + + # Verify second packet (only OptionalData1) + assert parsed_packets[1]["Mode"] == 1 + assert parsed_packets[1]["Flag"] == 0 + assert parsed_packets[1]["OptionalData1"] == 400 + assert "OptionalData2" not in parsed_packets[1] + assert parsed_packets[1]["MandatoryData"] == 500 + + # Verify third packet (only OptionalData2) + assert parsed_packets[2]["Mode"] == 0 + assert parsed_packets[2]["Flag"] == 1 + assert "OptionalData1" not in parsed_packets[2] + assert parsed_packets[2]["OptionalData2"] == 600 + assert parsed_packets[2]["MandatoryData"] == 700 + + # Verify fourth packet (neither included) + assert parsed_packets[3]["Mode"] == 0 + assert parsed_packets[3]["Flag"] == 0 + assert "OptionalData1" not in parsed_packets[3] + assert "OptionalData2" not in parsed_packets[3] + assert parsed_packets[3]["MandatoryData"] == 800 + + +def test_include_condition_bit_position_tracking(xtce_definition): + """Test that bit position tracking remains accurate when parameters are skipped""" + # Create packet with Mode=0, Flag=0 (both optional params skipped) + packet_data = struct.pack(">BBI", 0, 0, 12345678) + packet = SpacePacket(binary_data=packet_data) + + # Parse the packet + xtce_definition.containers["ConditionalTestPacket"].parse(packet) + + # Verify that MandatoryData was parsed correctly despite skipped parameters + assert packet["MandatoryData"] == 12345678 + + # Verify bit position tracking is correct (2 bytes for Mode+Flag, 4 bytes for MandatoryData = 48 bits) + assert packet._parsing_pos == 48 diff --git a/tests/test_data/test_include_condition.xml b/tests/test_data/test_include_condition.xml new file mode 100644 index 0000000..568d400 --- /dev/null +++ b/tests/test_data/test_include_condition.xml @@ -0,0 +1,67 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/unit/test_xtce/test_include_condition.py b/tests/unit/test_xtce/test_include_condition.py new file mode 100644 index 0000000..d120c8b --- /dev/null +++ b/tests/unit/test_xtce/test_include_condition.py @@ -0,0 +1,414 @@ +"""Tests for IncludeCondition support in ParameterRefEntry""" + +import struct + +import pytest + +from space_packet_parser import SpacePacket +from space_packet_parser.xtce import comparisons, containers, encodings, parameter_types, parameters + + +@pytest.fixture +def parameter_types_fixture(): + """Create common parameter types for testing""" + return { + "uint8": parameter_types.IntegerParameterType( + name="UINT8_Type", encoding=encodings.IntegerDataEncoding(size_in_bits=8, encoding="unsigned") + ), + "uint16": parameter_types.IntegerParameterType( + name="UINT16_Type", encoding=encodings.IntegerDataEncoding(size_in_bits=16, encoding="unsigned") + ), + "uint32": parameter_types.IntegerParameterType( + name="UINT32_Type", encoding=encodings.IntegerDataEncoding(size_in_bits=32, encoding="unsigned") + ), + } + + +def test_parameter_ref_entry_no_condition(): + """Test ParameterRefEntry without any condition - should always parse""" + # Create parameter type and parameter + uint8_type = parameter_types.IntegerParameterType( + name="UINT8_Type", encoding=encodings.IntegerDataEncoding(size_in_bits=8, encoding="unsigned") + ) + param = parameters.Parameter(name="TestParam", parameter_type=uint8_type) + + # Create ParameterRefEntry without condition + param_ref_entry = containers.ParameterRefEntry(parameter_ref="TestParam", include_condition=None) + + # Create packet with test data + test_data = struct.pack(">B", 42) + packet = SpacePacket(binary_data=test_data) + + # Parse the entry + param_ref_entry.parse(packet, parameter_lookup={"TestParam": param}) + + # Verify parameter was parsed + assert "TestParam" in packet + assert packet["TestParam"] == 42 + + +def test_parameter_ref_entry_with_simple_condition_true(parameter_types_fixture): + """Test ParameterRefEntry with simple condition that evaluates to True""" + # Create parameters + flag_param = parameters.Parameter(name="CSFlag", parameter_type=parameter_types_fixture["uint8"]) + checksum_param = parameters.Parameter(name="CheckSum", parameter_type=parameter_types_fixture["uint16"]) + + # Create condition: CheckSum is included if CSFlag == 1 + include_condition = comparisons.Comparison(required_value="1", referenced_parameter="CSFlag", operator="==") + + # Create ParameterRefEntry with condition + checksum_entry = containers.ParameterRefEntry( + parameter_ref="CheckSum", include_condition=include_condition, repeat_entry=None + ) + + # Create packet with CSFlag=1, CheckSum=1234 + test_data = struct.pack(">BH", 1, 1234) + packet = SpacePacket(binary_data=test_data) + + # Parse the flag first + flag_param.parse(packet) + assert packet["CSFlag"] == 1 + + # Now parse the conditional checksum entry + checksum_entry.parse(packet, parameter_lookup={"CheckSum": checksum_param}) + + # Verify checksum was parsed + assert "CheckSum" in packet + assert packet["CheckSum"] == 1234 + + +def test_parameter_ref_entry_with_simple_condition_false(parameter_types_fixture): + """Test ParameterRefEntry with simple condition that evaluates to False""" + # Create parameters + flag_param = parameters.Parameter(name="CSFlag", parameter_type=parameter_types_fixture["uint8"]) + checksum_param = parameters.Parameter(name="CheckSum", parameter_type=parameter_types_fixture["uint16"]) + + # Create condition: CheckSum is included if CSFlag == 1 + include_condition = comparisons.Comparison(required_value="1", referenced_parameter="CSFlag", operator="==") + + # Create ParameterRefEntry with condition + checksum_entry = containers.ParameterRefEntry( + parameter_ref="CheckSum", include_condition=include_condition, repeat_entry=None + ) + + # Create packet with CSFlag=0, CheckSum=1234 (but checksum should be skipped) + test_data = struct.pack(">BH", 0, 1234) + packet = SpacePacket(binary_data=test_data) + + # Parse the flag first + flag_param.parse(packet) + assert packet["CSFlag"] == 0 + + # Now parse the conditional checksum entry + checksum_entry.parse(packet, parameter_lookup={"CheckSum": checksum_param}) + + # Verify checksum was NOT parsed + assert "CheckSum" not in packet + # Verify parsing position didn't advance (still at bit 8) + assert packet._parsing_pos == 8 + + +def test_sequence_container_with_include_condition_true(parameter_types_fixture): + """Test SequenceContainer with ParameterRefEntry that has IncludeCondition evaluating to True""" + # Create parameters + flag_param = parameters.Parameter(name="CSFlag", parameter_type=parameter_types_fixture["uint8"]) + checksum_param = parameters.Parameter(name="CheckSum", parameter_type=parameter_types_fixture["uint16"]) + data_param = parameters.Parameter(name="Data", parameter_type=parameter_types_fixture["uint32"]) + + # Create condition + include_condition = comparisons.Comparison(required_value="1", referenced_parameter="CSFlag", operator="==") + + # Create ParameterRefEntry with condition + checksum_entry = containers.ParameterRefEntry(parameter_ref="CheckSum", include_condition=include_condition) + + # Create container + param_lookup = {"CSFlag": flag_param, "CheckSum": checksum_param, "Data": data_param} + test_container = containers.SequenceContainer( + name="TestContainer", + entry_list=[flag_param, checksum_entry, data_param], + _parameter_lookup=param_lookup, + ) + + # Create packet: CSFlag=1, CheckSum=5678, Data=123456 + test_data = struct.pack(">BHI", 1, 5678, 123456) + packet = SpacePacket(binary_data=test_data) + + # Parse the container + test_container.parse(packet) + + # Verify all parameters were parsed + assert packet["CSFlag"] == 1 + assert packet["CheckSum"] == 5678 + assert packet["Data"] == 123456 + + +def test_sequence_container_with_include_condition_false(parameter_types_fixture): + """Test SequenceContainer with ParameterRefEntry that has IncludeCondition evaluating to False""" + # Create parameters + flag_param = parameters.Parameter(name="CSFlag", parameter_type=parameter_types_fixture["uint8"]) + checksum_param = parameters.Parameter(name="CheckSum", parameter_type=parameter_types_fixture["uint16"]) + data_param = parameters.Parameter(name="Data", parameter_type=parameter_types_fixture["uint32"]) + + # Create condition + include_condition = comparisons.Comparison(required_value="1", referenced_parameter="CSFlag", operator="==") + + # Create ParameterRefEntry with condition + checksum_entry = containers.ParameterRefEntry(parameter_ref="CheckSum", include_condition=include_condition) + + # Create container + param_lookup = {"CSFlag": flag_param, "CheckSum": checksum_param, "Data": data_param} + test_container = containers.SequenceContainer( + name="TestContainer", + entry_list=[flag_param, checksum_entry, data_param], + _parameter_lookup=param_lookup, + ) + + # Create packet: CSFlag=0, CheckSum should be skipped, Data=123456 + # Since checksum is 2 bytes but skipped, we only need CSFlag + Data + test_data = struct.pack(">BI", 0, 123456) + packet = SpacePacket(binary_data=test_data) + + # Parse the container + test_container.parse(packet) + + # Verify flag and data were parsed, but checksum was skipped + assert packet["CSFlag"] == 0 + assert "CheckSum" not in packet + assert packet["Data"] == 123456 + + +def test_parameter_ref_entry_with_repeat_entry_raises(): + """Test that ParameterRefEntry with RepeatEntry raises NotImplementedError""" + # Create parameter + uint8_type = parameter_types.IntegerParameterType( + name="UINT8_Type", encoding=encodings.IntegerDataEncoding(size_in_bits=8, encoding="unsigned") + ) + param = parameters.Parameter(name="TestParam", parameter_type=uint8_type) + + # Create ParameterRefEntry with repeat_entry + param_ref_entry = containers.ParameterRefEntry(parameter_ref="TestParam", include_condition=None, repeat_entry=True) + + # Create packet + test_data = struct.pack(">B", 42) + packet = SpacePacket(binary_data=test_data) + + # Parse should raise NotImplementedError + with pytest.raises(NotImplementedError, match="RepeatEntry is not currently supported"): + param_ref_entry.parse(packet, parameter_lookup={"TestParam": param}) + + +def test_parameter_ref_entry_multiple_conditions(parameter_types_fixture): + """Test ParameterRefEntry with different comparison operators""" + # Create parameters + value_param = parameters.Parameter(name="Value", parameter_type=parameter_types_fixture["uint8"]) + result_param = parameters.Parameter(name="Result", parameter_type=parameter_types_fixture["uint16"]) + + # Test greater than condition + include_condition = comparisons.Comparison(required_value="10", referenced_parameter="Value", operator=">") + result_entry = containers.ParameterRefEntry(parameter_ref="Result", include_condition=include_condition) + + # Test with value > 10 (should include) + test_data = struct.pack(">BH", 15, 999) + packet = SpacePacket(binary_data=test_data) + value_param.parse(packet) + result_entry.parse(packet, parameter_lookup={"Result": result_param}) + assert packet["Result"] == 999 + + # Test with value <= 10 (should skip) + test_data = struct.pack(">BH", 5, 888) + packet = SpacePacket(binary_data=test_data) + value_param.parse(packet) + result_entry.parse(packet, parameter_lookup={"Result": result_param}) + assert "Result" not in packet + + +def test_parameter_ref_entry_to_xml_without_condition(): + """Test ParameterRefEntry.to_xml() without condition""" + from lxml.builder import ElementMaker + + em = ElementMaker(namespace="http://www.omg.org/space/xtce", nsmap={"xtce": "http://www.omg.org/space/xtce"}) + + param_ref_entry = containers.ParameterRefEntry(parameter_ref="TestParam") + xml_element = param_ref_entry.to_xml(elmaker=em) + + # Verify XML structure + assert xml_element.tag == "{http://www.omg.org/space/xtce}ParameterRefEntry" + assert xml_element.attrib["parameterRef"] == "TestParam" + assert len(xml_element) == 0 # No child elements + + +def test_parameter_ref_entry_to_xml_with_condition(): + """Test ParameterRefEntry.to_xml() with IncludeCondition""" + from lxml.builder import ElementMaker + + em = ElementMaker(namespace="http://www.omg.org/space/xtce", nsmap={"xtce": "http://www.omg.org/space/xtce"}) + + include_condition = comparisons.Comparison(required_value="1", referenced_parameter="CSFlag", operator="==") + param_ref_entry = containers.ParameterRefEntry(parameter_ref="TestParam", include_condition=include_condition) + xml_element = param_ref_entry.to_xml(elmaker=em) + + # Verify XML structure + assert xml_element.tag == "{http://www.omg.org/space/xtce}ParameterRefEntry" + assert xml_element.attrib["parameterRef"] == "TestParam" + assert len(xml_element) == 1 # Has IncludeCondition child + + # Verify IncludeCondition + include_cond_elem = xml_element[0] + assert include_cond_elem.tag == "{http://www.omg.org/space/xtce}IncludeCondition" + assert len(include_cond_elem) == 1 # Has Comparison child + + # Verify Comparison + comparison_elem = include_cond_elem[0] + assert comparison_elem.tag == "{http://www.omg.org/space/xtce}Comparison" + assert comparison_elem.attrib["parameterRef"] == "CSFlag" + assert comparison_elem.attrib["value"] == "1" + + +def test_sequence_container_with_multiple_conditional_parameters(parameter_types_fixture): + """Test SequenceContainer with multiple conditional parameters""" + # Create parameters + mode_param = parameters.Parameter(name="Mode", parameter_type=parameter_types_fixture["uint8"]) + flag_param = parameters.Parameter(name="Flag", parameter_type=parameter_types_fixture["uint8"]) + data1_param = parameters.Parameter(name="Data1", parameter_type=parameter_types_fixture["uint16"]) + data2_param = parameters.Parameter(name="Data2", parameter_type=parameter_types_fixture["uint16"]) + data3_param = parameters.Parameter(name="Data3", parameter_type=parameter_types_fixture["uint32"]) + + # Create conditions + condition1 = comparisons.Comparison(required_value="1", referenced_parameter="Mode", operator="==") + condition2 = comparisons.Comparison(required_value="1", referenced_parameter="Flag", operator="==") + + # Create ParameterRefEntries with conditions + data1_entry = containers.ParameterRefEntry(parameter_ref="Data1", include_condition=condition1) + data2_entry = containers.ParameterRefEntry(parameter_ref="Data2", include_condition=condition2) + + # Create container + param_lookup = { + "Mode": mode_param, + "Flag": flag_param, + "Data1": data1_param, + "Data2": data2_param, + "Data3": data3_param, + } + test_container = containers.SequenceContainer( + name="TestContainer", + entry_list=[mode_param, flag_param, data1_entry, data2_entry, data3_param], + _parameter_lookup=param_lookup, + ) + + # Test case 1: Mode=1, Flag=1 - both Data1 and Data2 should be parsed + test_data = struct.pack(">BBHHI", 1, 1, 111, 222, 333) + packet = SpacePacket(binary_data=test_data) + test_container.parse(packet) + assert packet["Mode"] == 1 + assert packet["Flag"] == 1 + assert packet["Data1"] == 111 + assert packet["Data2"] == 222 + assert packet["Data3"] == 333 + + # Test case 2: Mode=1, Flag=0 - only Data1 should be parsed + test_data = struct.pack(">BBHI", 1, 0, 444, 555) + packet = SpacePacket(binary_data=test_data) + test_container.parse(packet) + assert packet["Mode"] == 1 + assert packet["Flag"] == 0 + assert packet["Data1"] == 444 + assert "Data2" not in packet + assert packet["Data3"] == 555 + + # Test case 3: Mode=0, Flag=1 - only Data2 should be parsed + test_data = struct.pack(">BBHI", 0, 1, 666, 777) + packet = SpacePacket(binary_data=test_data) + test_container.parse(packet) + assert packet["Mode"] == 0 + assert packet["Flag"] == 1 + assert "Data1" not in packet + assert packet["Data2"] == 666 + assert packet["Data3"] == 777 + + # Test case 4: Mode=0, Flag=0 - neither Data1 nor Data2 should be parsed + test_data = struct.pack(">BBI", 0, 0, 888) + packet = SpacePacket(binary_data=test_data) + test_container.parse(packet) + assert packet["Mode"] == 0 + assert packet["Flag"] == 0 + assert "Data1" not in packet + assert "Data2" not in packet + assert packet["Data3"] == 888 + + +def test_parameter_ref_entry_from_xml_with_condition(): + """Test parsing ParameterRefEntry from XML with IncludeCondition""" + from lxml import etree + + xml_str = """ + + + + + + """ + element = etree.fromstring(xml_str) + + # Parse the ParameterRefEntry + param_ref_entry = containers.ParameterRefEntry.from_xml(element) + + # Verify structure + assert param_ref_entry.parameter_ref == "CheckSum" + assert param_ref_entry.include_condition is not None + assert isinstance(param_ref_entry.include_condition, comparisons.Comparison) + assert param_ref_entry.include_condition.referenced_parameter == "CSFlag" + assert param_ref_entry.include_condition.required_value == "1" + assert param_ref_entry.include_condition.operator == "==" + assert param_ref_entry.repeat_entry is None + + +def test_parameter_ref_entry_from_xml_with_repeat_entry(): + """Test parsing ParameterRefEntry from XML with RepeatEntry""" + from lxml import etree + + xml_str = """ + + + 5 + + + """ + element = etree.fromstring(xml_str) + + # Parse the ParameterRefEntry + param_ref_entry = containers.ParameterRefEntry.from_xml(element) + + # Verify structure + assert param_ref_entry.parameter_ref == "TestParam" + assert param_ref_entry.include_condition is None + assert param_ref_entry.repeat_entry is True # Placeholder value + + +def test_parameter_ref_entry_from_xml_without_condition(): + """Test parsing ParameterRefEntry from XML without IncludeCondition""" + from lxml import etree + + xml_str = '' + element = etree.fromstring(xml_str) + + # Parse the ParameterRefEntry + param_ref_entry = containers.ParameterRefEntry.from_xml(element) + + # Verify structure + assert param_ref_entry.parameter_ref == "TestParam" + assert param_ref_entry.include_condition is None + assert param_ref_entry.repeat_entry is None + + +def test_parameter_ref_entry_undefined_parameter_error(): + """Test that ParameterRefEntry raises informative KeyError for undefined parameter""" + # Create ParameterRefEntry referencing a non-existent parameter + param_ref_entry = containers.ParameterRefEntry(parameter_ref="NonExistentParam") + + # Create packet + test_data = struct.pack(">B", 42) + packet = SpacePacket(binary_data=test_data) + + # Parse should raise KeyError with helpful message + with pytest.raises(KeyError, match="NonExistentParam.*not found in parameter lookup"): + param_ref_entry.parse(packet, parameter_lookup={})