diff --git a/hl7/containers.py b/hl7/containers.py
index 1620b38..8e96970 100644
--- a/hl7/containers.py
+++ b/hl7/containers.py
@@ -10,7 +10,7 @@
 )
 from .util import escape, generate_message_control_id, unescape
 
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
 
 _SENTINEL = object()
 
@@ -470,6 +470,27 @@ def extract_field(
             segment_num, field_num, repeat_num, component_num, subcomponent_num
         )
 
+    def write_field(
+        self,
+        value,
+        segment,
+        segment_num=1,
+        field_num=None,
+        repeat_num=None,
+        component_num=None,
+        subcomponent_num=None,
+    ):
+        """
+        Write a value, with escaping, into a message using the tree based assignment notation.
+        The segment must exist.
+
+        The value is assigned using a future proofed approach, based on rules in:
+        http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing
+        """
+        self.segments(segment)(segment_num).write_field(
+            value, field_num, repeat_num, component_num, subcomponent_num
+        )
+
     def assign_field(
         self,
         value,
@@ -497,7 +518,7 @@ def escape(self, field, app_map=None):
 
         To process this correctly, the full set of separators (MSH.1/MSH.2) needs to be known.
 
-        Pass through the message. Replace recognised characters with their escaped
+        Pass through the message. Replace recognized characters with their escaped
         version. Return an ascii encoded string.
 
         Functionality:
@@ -563,11 +584,11 @@ def create_ack(
 
         msh.assign_field(str(source_msh(1)), 1)
         msh.assign_field(str(source_msh(2)), 2)
-        # Sending application is source receving application
+        # Sending application is source receiving application
         msh.assign_field(
             str(application) if application is not None else str(source_msh(5)), 3
         )
-        # Sending facility is source receving facility
+        # Sending facility is source receiving facility
         msh.assign_field(
             str(facility) if facility is not None else str(source_msh(6)), 4
         )
@@ -723,6 +744,17 @@ def extract_field(
         else:
             return ""  # Assume non-present optional value
 
+    def write_field(
+        self,
+        value,
+        field_num=None,
+        repeat_num=None,
+        component_num=None,
+        subcomponent_num=None,
+    ):
+        """Escape ``value`` with this segment's separators, then assign it via assign_field()."""
+        self.assign_field(escape(self, value), field_num, repeat_num, component_num, subcomponent_num)
+
     def assign_field(
         self,
         value,
@@ -739,28 +771,53 @@ def assign_field(
         http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing
         """
 
+        # Extend the segment as needed.
         while len(self) <= field_num:
             self.append(self.create_field([]))
         field = self(field_num)
+
+        # Assign at the field level.
         if repeat_num is None:
             field[:] = [value]
             return
+
+        # Field is never a string so we don't need to test that.
+
+        # Extend the field repeat as needed.
         while len(field) < repeat_num:
             field.append(self.create_repetition([]))
         repetition = field(repeat_num)
+
+        if isinstance(repetition, str):
+            # Leaf (str) repetition: replace it with an empty repetition (old text is dropped).
+            repetition = self.create_repetition([])
+            field(repeat_num, value=repetition)
+
+        # Assign at the repetition level.
         if component_num is None:
             repetition[:] = [value]
             return
+
         while len(repetition) < component_num:
             repetition.append(self.create_component([]))
         component = repetition(component_num)
+
+        if isinstance(component, str):
+            # Leaf (str) component: replace it with an empty component (old text is dropped).
+            component = self.create_component([])
+            repetition(component_num, value=component)
+
+        # Assign at the component level
         if subcomponent_num is None:
             component[:] = [value]
             return
+
+        # Assign at the subcomponent level
         while len(component) < subcomponent_num:
             component.append("")
         component(subcomponent_num, value)
 
+
     def _adjust_index(self, index):
         # First element is the segment name, so we don't need to adjust to get 1-based
         return index
diff --git a/hl7/util.py b/hl7/util.py
index 9418122..c1220fb 100644
--- a/hl7/util.py
+++ b/hl7/util.py
@@ -4,7 +4,7 @@
 import random
 import string
 
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
 
 
 def ishl7(line):
@@ -137,7 +137,8 @@ def escape(container, field, app_map=None):
     return "".join(rv)
 
 
-def unescape(container, field, app_map=None):  # noqa: C901
+def unescape(container, field, app_map=None, is_log_error=False):  # noqa: C901
+
     """
     See: http://www.hl7standards.com/blog/2006/11/02/hl7-escape-sequences/
 
@@ -162,6 +163,9 @@ def unescape(container, field, app_map=None):  # noqa: C901
     It cannot:
 
     *   switch code pages / ISO IR character sets
+
+    If an escape sequence cannot be decoded, its original text is appended to the
+    returned text; pass is_log_error=True to also log each such sequence.
     """
     if not field or field.find(container.esc) == -1:
         return field
@@ -184,6 +188,10 @@ def unescape(container, field, app_map=None):  # noqa: C901
         ".ce": "\r",
     }
 
+    max_escape = max(10, max(len(x) for x in DEFAULT_MAP))
+    if app_map:
+        max_escape = max(max_escape, max(len(x) for x in app_map))
+
     rv = []
     collecting = []
     in_seq = False
@@ -194,11 +202,13 @@ def unescape(container, field, app_map=None):  # noqa: C901
                 value = "".join(collecting)
                 collecting = []
                 if not value:
-                    logger.warn(
-                        "Error unescaping value [%s], empty sequence found at %d",
-                        field,
-                        offset,
-                    )
+                    if is_log_error:
+                        logger.warning(
+                            "Error unescaping empty sequence found at %d",
+                            offset,
+                        )
+                    rv.append(container.esc)
+                    rv.append(container.esc)
                     continue
                 if app_map and value in app_map:
                     rv.append(app_map[value])
@@ -219,41 +229,63 @@ def unescape(container, field, app_map=None):  # noqa: C901
                     value[0] == "C"
                 ):  # Convert to new Single Byte character set : 2.10.2
                     # Two HEX values, first value chooses the character set (ISO-IR), second gives the value
-                    logger.warn(
-                        "Error inline character sets [%s] not implemented, field [%s], offset [%s]",
-                        value,
-                        field,
-                        offset,
-                    )
+                    if is_log_error:
+                        logger.warning(
+                            "Error inline character sets [%s] not implemented, offset [%s]",
+                            value,
+                            offset,
+                        )
+                    rv.append(container.esc)
+                    rv.append(value)
+                    rv.append(container.esc)
                 elif value[0] == "M":  # Switch to new Multi Byte character set : 2.10.2
                     # Three HEX values, first value chooses the character set (ISO-IR), rest give the value
-                    logger.warn(
-                        "Error inline character sets [%s] not implemented, field [%s], offset [%s]",
-                        value,
-                        field,
-                        offset,
-                    )
+                    if is_log_error:
+                        logger.warning(
+                            "Error inline character sets [%s] not implemented, offset [%s]",
+                            value,
+                            offset,
+                        )
+                    rv.append(container.esc)
+                    rv.append(value)
+                    rv.append(container.esc)
                 elif value[0] == "X":  # Hex encoded Bytes: 2.10.5
                     value = value[1:]
                     try:
                         for off in range(0, len(value), 2):
                             rv.append(chr(int(value[off : off + 2], 16)))
                     except Exception:
+                        if is_log_error:
+                            logger.exception(
+                                "Error decoding hex value [%s], offset [%s]",
+                                value,
+                                offset,
+                            )
+                        rv.append(container.esc)
+                        rv.append("X")
+                        rv.append(value)
+                        rv.append(container.esc)
+                else:
+                    if is_log_error:
-                        logger.exception(
-                            "Error decoding hex value [%s], field [%s], offset [%s]",
+                        logger.warning(
+                            "Error decoding value [%s], offset [%s]",
                             value,
-                            field,
                             offset,
                         )
-                else:
-                    logger.exception(
-                        "Error decoding value [%s], field [%s], offset [%s]",
-                        value,
-                        field,
-                        offset,
-                    )
+                    rv.append(container.esc)
+                    rv.append(value)
+                    rv.append(container.esc)
             else:
                 collecting.append(c)
+                if (len(collecting) > max_escape and collecting[0] not in "XZ") \
+                    or (len(collecting) > 10 and collecting[0] in "XZ"):
+                    # We have collected beyond the maximum number of characters in an escape sequence
+                    # Assume the message is badly formed and append the initial escape plus collected
+                    # characters to the output
+                    rv.append(container.esc)
+                    rv.extend(collecting)
+                    collecting = []
+                    in_seq = False
         elif c == container.esc:
             in_seq = True
         else:
diff --git a/tests/test_assign_field.py b/tests/test_assign_field.py
new file mode 100644
index 0000000..e8e6a34
--- /dev/null
+++ b/tests/test_assign_field.py
@@ -0,0 +1,140 @@
+from unittest import TestCase
+
+import hl7
+
+sample_hl7_1 = "\r".join(
+    [
+        "MSH|^~\\&|field|rep1~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r",
+    ]
+)
+
+sample_hl7_2 = "\r".join(
+    [
+        "MSH|^~\\&|f1r1c1s1&f1r1c1s2^f1r1c2s1&f1r1c2s2~f1r2c1s1&f1r2c1s2^f1r2c2s1&f1r2c2s\r"
+    ]
+)
+
+SEP = r"|^~\&"
+CR_SEP = "\r"
+
+
+class AssignFieldTest(TestCase):
+    def test_assign_field(self):
+        msg = hl7.parse(sample_hl7_1)
+        seg = msg[0]
+        seg.assign_field("NewField", 3)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewField|rep1~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r")
+
+        seg.assign_field("NewField", 4)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewField|NewField|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r")
+
+        seg.assign_field("NewField", 5)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewField|NewField|NewField|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r")
+
+        seg.assign_field("NewField", 6)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewField|NewField|NewField|NewField\r")
+
+        seg.assign_field("NewField", 7)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewField|NewField|NewField|NewField|NewField\r")
+
+
+    def test_assign_repeat(self):
+        msg = hl7.parse(sample_hl7_1)
+        seg = msg[0]
+
+        seg.assign_field("NewRep", 3, 1)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewRep|rep1~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r")
+
+        seg.assign_field("NewRep2", 3, 2)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewRep~NewRep2|rep1~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r")
+
+        seg.assign_field("NewRep", 4, 1)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewRep~NewRep2|NewRep~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r")
+
+        seg.assign_field("NewRep2", 4, 2)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewRep~NewRep2|NewRep~NewRep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r")
+
+
+        seg.assign_field("NewRep", 5, 1)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewRep~NewRep2|NewRep~NewRep2|NewRep|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r")
+
+        seg.assign_field("NewRep2", 5, 2)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewRep~NewRep2|NewRep~NewRep2|NewRep~NewRep2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r")
+
+    def test_assign_component(self):
+        msg = hl7.parse(sample_hl7_1)
+        seg = msg[0]
+
+        seg.assign_field("NewComp", 3, 1, 1)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewComp|rep1~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r")
+
+        seg.assign_field("NewComp2", 3, 1, 2)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewComp^NewComp2|rep1~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r")
+
+        seg.assign_field("NewComp3", 3, 2, 2)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewComp^NewComp2~^NewComp3|rep1~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r")
+
+        seg.assign_field("NewComp", 4, 1, 1)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewComp^NewComp2~^NewComp3|NewComp~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r")
+
+        seg.assign_field("NewComp2", 4, 1, 2)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewComp^NewComp2~^NewComp3|NewComp^NewComp2~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r")
+
+        seg.assign_field("NewComp", 5, 1, 1)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewComp^NewComp2~^NewComp3|NewComp^NewComp2~rep2|NewComp^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r")
+
+        seg.assign_field("NewComp2", 5, 1, 2)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewComp^NewComp2~^NewComp3|NewComp^NewComp2~rep2|NewComp^NewComp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r")
+
+
+        seg.assign_field("NewComp2", 6, 1, 2)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewComp^NewComp2~^NewComp3|NewComp^NewComp2~rep2|NewComp^NewComp2|rep1comp1^NewComp2~rep2comp1^rep1comp2\r")
+
+
+    def test_assign_subcomponent(self):
+        msg = hl7.parse(sample_hl7_2)
+        seg = msg[0]
+
+        seg.assign_field("NewSub", 3, 1, 2, 2)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|f1r1c1s1&f1r1c1s2^f1r1c2s1&NewSub~f1r2c1s1&f1r2c1s2^f1r2c2s1&f1r2c2s\r")
+
+        seg.assign_field("NewComp", 3, 2, 1)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|f1r1c1s1&f1r1c1s2^f1r1c2s1&NewSub~NewComp^f1r2c2s1&f1r2c2s\r")
+
+        seg.assign_field("NewRep", 3, 2)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|f1r1c1s1&f1r1c1s2^f1r1c2s1&NewSub~NewRep\r")
+
+        seg.assign_field("NewField", 3)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|NewField\r")
+
+        for field in [3,4]:
+            for rep in [1,2]:
+                for comp in [1,2]:
+                    for sub in [1,2]:
+                        seg.assign_field(f"f{field}r{rep}c{comp}s{sub}", field, rep, comp, sub)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, 'MSH|^~\\&|f3r1c1s1&f3r1c1s2^f3r1c2s1&f3r1c2s2~f3r2c1s1&f3r2c1s2^f3r2c2s1&f3r2c2s2|f4r1c1s1&f4r1c1s2^f4r1c2s1&f4r1c2s2~f4r2c1s1&f4r2c1s2^f4r2c2s1&f4r2c2s2\r')
+
diff --git a/tests/test_construction.py b/tests/test_construction.py
index f6af481..6c5ff95 100644
--- a/tests/test_construction.py
+++ b/tests/test_construction.py
@@ -3,7 +3,7 @@
 
 import hl7
 
-from .samples import rep_sample_hl7
+from samples import rep_sample_hl7
 
 SEP = r"|^~\&"
 CR_SEP = "\r"
diff --git a/tests/test_containers.py b/tests/test_containers.py
index b79d9e2..a3dc296 100644
--- a/tests/test_containers.py
+++ b/tests/test_containers.py
@@ -4,7 +4,7 @@
 import hl7
 from hl7 import Field, Segment
 
-from .samples import sample_hl7
+from samples import sample_hl7
 
 
 class ContainerTest(TestCase):
diff --git a/tests/test_parse.py b/tests/test_parse.py
index 0b5c251..7df1197 100644
--- a/tests/test_parse.py
+++ b/tests/test_parse.py
@@ -4,7 +4,7 @@
 import hl7
 from hl7 import Accessor, Component, Field, Message, ParseException, Repetition, Segment
 
-from .samples import (
+from samples import (
     rep_sample_hl7,
     sample_bad_batch,
     sample_bad_batch1,
@@ -27,7 +27,6 @@
     sample_hl7,
 )
 
-
 class ParseTest(TestCase):
     def test_parse(self):
         msg = hl7.parse(sample_hl7)
@@ -377,6 +376,23 @@ def test_unescape(self):
         self.assertEqual(msg.unescape("\\X20202020\\"), "    ")
         self.assertEqual(msg.unescape("\\Xe1\\\\Xe9\\\\Xed\\\\Xf3\\\\Xfa\\"), "áéíóú")
 
+
+    def test_unescape_broken(self):
+        msg = hl7.parse(rep_sample_hl7)
+
+        self.assertEqual(msg.unescape("Empty \\\\"), "Empty \\\\")
+        self.assertEqual(msg.unescape("Undefined \\A\\"), "Undefined \\A\\")
+        self.assertEqual(msg.unescape("Undefined \\A\\", app_map={"A": "*"}), "Undefined *")
+
+        self.assertEqual(msg.unescape(".br\\ but the text goes on beyond limit"), ".br\\ but the text goes on beyond limit")
+
+        self.assertEqual(msg.unescape("not implemented \\C2842\\"), "not implemented \\C2842\\")
+        self.assertEqual(msg.unescape("not implemented \\M2442\\"), "not implemented \\M2442\\")
+
+        self.assertEqual(msg.unescape("part convert \\X30no31\\"), "part convert 0\\X30no31\\")
+
+
+
     def test_escape(self):
         msg = hl7.parse(rep_sample_hl7)
 
diff --git a/tests/test_util.py b/tests/test_util.py
index 8c9de22..639aebb 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -3,7 +3,7 @@
 
 import hl7
 
-from .samples import (
+from samples import (
     sample_batch,
     sample_batch1,
     sample_batch2,
diff --git a/tests/test_write_field.py b/tests/test_write_field.py
new file mode 100644
index 0000000..df84dcd
--- /dev/null
+++ b/tests/test_write_field.py
@@ -0,0 +1,29 @@
+from unittest import TestCase
+
+import hl7
+
+sample_hl7_1 = "\r".join(
+    [
+        "MSH|^~\\&|field|rep1~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r",
+    ]
+)
+
+
+
+class WriteFieldTest(TestCase):
+    """Test the write_field() function, which escapes the value to be written."""
+    def test_escape_separators(self):
+        """Test writing a field with separators in it"""
+
+        msg = hl7.parse(sample_hl7_1)
+        seg = msg[0]
+
+        # Write to field via the Segment object with escaping.
+        seg.write_field("New with Field | rep ~ sub & and escape \\", 4)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|field|New with Field \\F\\ rep \\R\\ sub \\T\\ and escape \\E\\|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r")
+
+        # Write to field via the Message object with escaping.
+        msg.write_field("New with Field | rep ~ sub & and escape \\", "MSH", 1, 5, 1, 1)
+        new_msg = str(msg)
+        self.assertEqual(new_msg, "MSH|^~\\&|field|New with Field \\F\\ rep \\R\\ sub \\T\\ and escape \\E\\|New with Field \\F\\ rep \\R\\ sub \\T\\ and escape \\E\\^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r")