From 56eb2848e0aa5c793bbe72254817f57431e297b3 Mon Sep 17 00:00:00 2001 From: Tom Unger Date: Mon, 27 Feb 2023 08:18:08 -0800 Subject: [PATCH 1/7] Remove relative imports from tests. --- tests/test_construction.py | 2 +- tests/test_containers.py | 2 +- tests/test_parse.py | 2 +- tests/test_util.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_construction.py b/tests/test_construction.py index f6af481..6c5ff95 100644 --- a/tests/test_construction.py +++ b/tests/test_construction.py @@ -3,7 +3,7 @@ import hl7 -from .samples import rep_sample_hl7 +from samples import rep_sample_hl7 SEP = r"|^~\&" CR_SEP = "\r" diff --git a/tests/test_containers.py b/tests/test_containers.py index b79d9e2..a3dc296 100644 --- a/tests/test_containers.py +++ b/tests/test_containers.py @@ -4,7 +4,7 @@ import hl7 from hl7 import Field, Segment -from .samples import sample_hl7 +from samples import sample_hl7 class ContainerTest(TestCase): diff --git a/tests/test_parse.py b/tests/test_parse.py index 0b5c251..fae9840 100644 --- a/tests/test_parse.py +++ b/tests/test_parse.py @@ -4,7 +4,7 @@ import hl7 from hl7 import Accessor, Component, Field, Message, ParseException, Repetition, Segment -from .samples import ( +from samples import ( rep_sample_hl7, sample_bad_batch, sample_bad_batch1, diff --git a/tests/test_util.py b/tests/test_util.py index 8c9de22..639aebb 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -3,7 +3,7 @@ import hl7 -from .samples import ( +from samples import ( sample_batch, sample_batch1, sample_batch2, From 33a133fd94d4431d1b3a659d988d48aed75065eb Mon Sep 17 00:00:00 2001 From: Tom Unger Date: Mon, 27 Feb 2023 08:21:37 -0800 Subject: [PATCH 2/7] Rewrite HL7 fields with new structure. (test_assign_fields.py/EscapeTest is known to fail. Where the escape of a field is done needs to be resolved.) --- hl7/containers.py | 31 +++++++- tests/test_assign_field.py | 155 +++++++++++++++++++++++++++++++++++++ 2 files changed, 183 insertions(+), 3 deletions(-) create mode 100644 tests/test_assign_field.py diff --git a/hl7/containers.py b/hl7/containers.py index 1620b38..a9fedf6 100644 --- a/hl7/containers.py +++ b/hl7/containers.py @@ -497,7 +497,7 @@ def escape(self, field, app_map=None): To process this correctly, the full set of separators (MSH.1/MSH.2) needs to be known. - Pass through the message. Replace recognised characters with their escaped + Pass through the message. Replace recognized characters with their escaped version. Return an ascii encoded string. Functionality: @@ -563,11 +563,11 @@ def create_ack( msh.assign_field(str(source_msh(1)), 1) msh.assign_field(str(source_msh(2)), 2) - # Sending application is source receving application + # Sending application is source receiving application msh.assign_field( str(application) if application is not None else str(source_msh(5)), 3 ) - # Sending facility is source receving facility + # Sending facility is source receiving facility msh.assign_field( str(facility) if facility is not None else str(source_msh(6)), 4 ) @@ -739,28 +739,53 @@ def assign_field( http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing """ + # Extend the segment as needed. while len(self) <= field_num: self.append(self.create_field([])) field = self(field_num) + + # Assign at the field level. if repeat_num is None: field[:] = [value] return + + # Field is never a string so we don't need to test that. + + # Extend the field repeat as needed. while len(field) < repeat_num: field.append(self.create_repetition([])) repetition = field(repeat_num) + + if isinstance(repetition, str): + # If the Field was a leaf (string) convert it to a repetition + repetition = self.create_repetition([]) + field(repeat_num, value=repetition) + + # Assign at the repetition level. if component_num is None: repetition[:] = [value] return + while len(repetition) < component_num: repetition.append(self.create_component([])) component = repetition(component_num) + + if isinstance(component, str): + # if the repetition was a leaf (string), convert it to a component. + component = self.create_component([]) + repetition(component_num, value=component) + + # Assign at the component level if subcomponent_num is None: component[:] = [value] return + + # Assign at the subcomponent level while len(component) < subcomponent_num: component.append("") component(subcomponent_num, value) + def _adjust_index(self, index): # First element is the segment name, so we don't need to adjust to get 1-based return index diff --git a/tests/test_assign_field.py b/tests/test_assign_field.py new file mode 100644 index 0000000..e28321f --- /dev/null +++ b/tests/test_assign_field.py @@ -0,0 +1,155 @@ +from unittest import TestCase + +import hl7 + +sample_hl7_1 = "\r".join( + [ + "MSH|^~\\&|field|rep1~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r", + ] +) + +sample_hl7_2 = "\r".join( + [ + "MSH|^~\\&|f1r1c1s1&f1r1c1s2^f1r1c2s1&f1r1c2s2~f1r2c1s1&f1r2c1s2^f1r2c2s1&f1r2c2s\r" + ] +) + +SEP = r"|^~\&" +CR_SEP = "\r" + + +class AssignFieldTest(TestCase): + def test_assign_field(self): + msg = hl7.parse(sample_hl7_1) + seg = msg[0] + seg.assign_field("NewField", 3) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewField|rep1~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") + + seg.assign_field("NewField", 4) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewField|NewField|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") + + seg.assign_field("NewField", 5) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewField|NewField|NewField|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") + + seg.assign_field("NewField", 6) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewField|NewField|NewField|NewField\r") + + seg.assign_field("NewField", 7) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewField|NewField|NewField|NewField|NewField\r") + + + def test_assign_repeat(self): + msg = hl7.parse(sample_hl7_1) + seg = msg[0] + + seg.assign_field("NewRep", 3, 1) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewRep|rep1~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") + + seg.assign_field("NewRep2", 3, 2) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewRep~NewRep2|rep1~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") + + seg.assign_field("NewRep", 4, 1) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewRep~NewRep2|NewRep~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") + + seg.assign_field("NewRep2", 4, 2) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewRep~NewRep2|NewRep~NewRep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") + + + seg.assign_field("NewRep", 5, 1) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewRep~NewRep2|NewRep~NewRep2|NewRep|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") + + seg.assign_field("NewRep2", 5, 2) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewRep~NewRep2|NewRep~NewRep2|NewRep~NewRep2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") + + def test_assign_component(self): + msg = hl7.parse(sample_hl7_1) + seg = msg[0] + + seg.assign_field("NewComp", 3, 1, 1) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewComp|rep1~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") + + seg.assign_field("NewComp2", 3, 1, 2) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewComp^NewComp2|rep1~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") + + seg.assign_field("NewComp3", 3, 2, 2) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewComp^NewComp2~^NewComp3|rep1~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") + + seg.assign_field("NewComp", 4, 1, 1) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewComp^NewComp2~^NewComp3|NewComp~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") + + seg.assign_field("NewComp2", 4, 1, 2) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewComp^NewComp2~^NewComp3|NewComp^NewComp2~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") + + seg.assign_field("NewComp", 5, 1, 1) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewComp^NewComp2~^NewComp3|NewComp^NewComp2~rep2|NewComp^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") + + seg.assign_field("NewComp2", 5, 1, 2) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewComp^NewComp2~^NewComp3|NewComp^NewComp2~rep2|NewComp^NewComp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") + + + seg.assign_field("NewComp2", 6, 1, 2) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewComp^NewComp2~^NewComp3|NewComp^NewComp2~rep2|NewComp^NewComp2|rep1comp1^NewComp2~rep2comp1^rep1comp2\r") + + + def test_assign_subcomponent(self): + msg = hl7.parse(sample_hl7_2) + seg = msg[0] + + seg.assign_field("NewSub", 3, 1, 2, 2) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|f1r1c1s1&f1r1c1s2^f1r1c2s1&NewSub~f1r2c1s1&f1r2c1s2^f1r2c2s1&f1r2c2s\r") + + seg.assign_field("NewComp", 3, 2, 1) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|f1r1c1s1&f1r1c1s2^f1r1c2s1&NewSub~NewComp^f1r2c2s1&f1r2c2s\r") + + seg.assign_field("NewRep", 3, 2) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|f1r1c1s1&f1r1c1s2^f1r1c2s1&NewSub~NewRep\r") + + seg.assign_field("NewField", 3) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|NewField\r") + + for field in [3,4]: + for rep in [1,2]: + for comp in [1,2]: + for sub in [1,2]: + seg.assign_field(f"f{field}r{rep}c{comp}s{sub}", field, rep, comp, sub) + new_msg = str(msg) + self.assertEqual(new_msg, 'MSH|^~\\&|f3r1c1s1&f3r1c1s2^f3r1c2s1&f3r1c2s2~f3r2c1s1&f3r2c1s2^f3r2c2s1&f3r2c2s2|f4r1c1s1&f4r1c1s2^f4r1c2s1&f4r1c2s2~f4r2c1s1&f4r2c1s2^f4r2c2s1&f4r2c2s2\r') + + + + +class EscapeTest(TestCase): + def test_escape_separators(self): + """Test assigning a field with separators in it""" + # 2/27/2023: This is known to fail. assign_field does not escape the incoming data. + # however, to re-code it to do so breaks some of the parsing code that expects to write + # the separators. + msg = hl7.parse(sample_hl7_1) + seg = msg[0] + + seg.assign_field("New with Field | rep ~ sub & and escape \\", 4) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|New with Field \\F\\ rep \\R\\ sub \\T\\ and escape \\E\\|rep1~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") \ No newline at end of file From 6a7e035d49947ef471014d87b4e6ea27eab790e7 Mon Sep 17 00:00:00 2001 From: Tom Unger Date: Thu, 9 Mar 2023 07:04:06 -0800 Subject: [PATCH 3/7] Add write_field() to Message and Segment. Writes an escaped field value. --- hl7/containers.py | 32 ++++++++++++++++++++++++++++++++ tests/test_assign_field.py | 15 --------------- tests/test_write_field.py | 29 +++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 15 deletions(-) create mode 100644 tests/test_write_field.py diff --git a/hl7/containers.py b/hl7/containers.py index a9fedf6..d83c8e8 100644 --- a/hl7/containers.py +++ b/hl7/containers.py @@ -470,6 +470,27 @@ def extract_field( segment_num, field_num, repeat_num, component_num, subcomponent_num ) + def write_field( + self, + value, + segment, + segment_num=1, + field_num=None, + repeat_num=None, + component_num=None, + subcomponent_num=None, + ): + """ + Write a value, with escaping, into a message using the tree based assignment notation. + The segment must exist. + + Extract a field using a future proofed approach, based on rules in: + http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing + """ + self.segments(segment)(segment_num).write_field( + value, field_num, repeat_num, component_num, subcomponent_num + ) + def assign_field( self, value, @@ -723,6 +744,17 @@ def extract_field( else: return "" # Assume non-present optional value + def write_field( + self, + value, + field_num=None, + repeat_num=None, + component_num=None, + subcomponent_num=None, + ): + """Write a field with escaping.""" + self.assign_field(escape(self, value), field_num, repeat_num, component_num, subcomponent_num) + def assign_field( self, value, diff --git a/tests/test_assign_field.py b/tests/test_assign_field.py index e28321f..e8e6a34 100644 --- a/tests/test_assign_field.py +++ b/tests/test_assign_field.py @@ -138,18 +138,3 @@ def test_assign_subcomponent(self): new_msg = str(msg) self.assertEqual(new_msg, 'MSH|^~\\&|f3r1c1s1&f3r1c1s2^f3r1c2s1&f3r1c2s2~f3r2c1s1&f3r2c1s2^f3r2c2s1&f3r2c2s2|f4r1c1s1&f4r1c1s2^f4r1c2s1&f4r1c2s2~f4r2c1s1&f4r2c1s2^f4r2c2s1&f4r2c2s2\r') - - - -class EscapeTest(TestCase): - def test_escape_separators(self): - """Test assigning a field with separators in it""" - # 2/27/2023: This is known to fail. assign_field does not escape the incoming data. - # however, to re-code it to do so breaks some of the parsing code that expects to write - # the separators. - msg = hl7.parse(sample_hl7_1) - seg = msg[0] - - seg.assign_field("New with Field | rep ~ sub & and escape \\", 4) - new_msg = str(msg) - self.assertEqual(new_msg, "MSH|^~\\&|New with Field \\F\\ rep \\R\\ sub \\T\\ and escape \\E\\|rep1~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") \ No newline at end of file diff --git a/tests/test_write_field.py b/tests/test_write_field.py new file mode 100644 index 0000000..df84dcd --- /dev/null +++ b/tests/test_write_field.py @@ -0,0 +1,29 @@ +from unittest import TestCase + +import hl7 + +sample_hl7_1 = "\r".join( + [ + "MSH|^~\\&|field|rep1~rep2|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r", + ] +) + + + +class WriteFieldTest(TestCase): + """Test the write_field() function, which escapes the value to be written.""" + def test_escape_separators(self): + """Test writing a field with separators in it""" + + msg = hl7.parse(sample_hl7_1) + seg = msg[0] + + # Write to field via the Segment object with escaping. + seg.write_field("New with Field | rep ~ sub & and escape \\", 4) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|field|New with Field \\F\\ rep \\R\\ sub \\T\\ and escape \\E\\|comp1^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") + + # Write to field via the Message object with escaping. + msg.write_field("New with Field | rep ~ sub & and escape \\", "MSH", 1, 5, 1, 1) + new_msg = str(msg) + self.assertEqual(new_msg, "MSH|^~\\&|field|New with Field \\F\\ rep \\R\\ sub \\T\\ and escape \\E\\|New with Field \\F\\ rep \\R\\ sub \\T\\ and escape \\E\\^comp2|rep1comp1^rep1comp2~rep2comp1^rep1comp2\r") \ No newline at end of file From 7f4764d2936f2bdc60558a4d666e51bfde5c8a1d Mon Sep 17 00:00:00 2001 From: Tom Unger Date: Mon, 10 Apr 2023 07:08:26 -0700 Subject: [PATCH 4/7] Change logger names from __file__ to __name__ --- hl7/containers.py | 2 +- hl7/util.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hl7/containers.py b/hl7/containers.py index d83c8e8..8e96970 100644 --- a/hl7/containers.py +++ b/hl7/containers.py @@ -10,7 +10,7 @@ ) from .util import escape, generate_message_control_id, unescape -logger = logging.getLogger(__file__) +logger = logging.getLogger(__name__) _SENTINEL = object() diff --git a/hl7/util.py b/hl7/util.py index 9418122..2254517 100644 --- a/hl7/util.py +++ b/hl7/util.py @@ -4,7 +4,7 @@ import random import string -logger = logging.getLogger(__file__) +logger = logging.getLogger(__name__) def ishl7(line): From 650c6cfe68dc50156769f31b192c8fc887f78c64 Mon Sep 17 00:00:00 2001 From: Tom Unger Date: Mon, 10 Apr 2023 07:59:28 -0700 Subject: [PATCH 5/7] Instead of logging escape errors - and there are many - simply write the un-parsable escape to the output. An optional parameter allows calling for errors to be logged. --- hl7/util.py | 76 ++++++++++++++++++++++++++++++++------------- tests/test_parse.py | 14 +++++++++ 2 files changed, 69 insertions(+), 21 deletions(-) diff --git a/hl7/util.py b/hl7/util.py index 2254517..41d80bd 100644 --- a/hl7/util.py +++ b/hl7/util.py @@ -137,7 +137,8 @@ def escape(container, field, app_map=None): return "".join(rv) -def unescape(container, field, app_map=None): # noqa: C901 +def unescape(container, field, app_map=None, is_log_error=False): # noqa: C901 + """ See: http://www.hl7standards.com/blog/2006/11/02/hl7-escape-sequences/ @@ -162,6 +163,9 @@ def unescape(container, field, app_map=None): # noqa: C901 It cannot: * switch code pages / ISO IR character sets + + If there is an error decoding an escape set, the original text is appended to + the returned text. """ if not field or field.find(container.esc) == -1: return field @@ -184,6 +188,10 @@ def unescape(container, field, app_map=None): # noqa: C901 ".ce": "\r", } + max_escape = max(len(x) for x in DEFAULT_MAP.keys()) + if app_map: + max_escape = max(max_escape, max(len(x) for x in app_map.keys())) + rv = [] collecting = [] in_seq = False @@ -219,41 +227,67 @@ def unescape(container, field, app_map=None): # noqa: C901 value[0] == "C" ): # Convert to new Single Byte character set : 2.10.2 # Two HEX values, first value chooses the character set (ISO-IR), second gives the value - logger.warn( - "Error inline character sets [%s] not implemented, field [%s], offset [%s]", - value, - field, - offset, - ) + if is_log_error: + logger.warn( + "Error inline character sets [%s] not implemented, field [%s], offset [%s]", + value, + field, + offset, + ) + rv.append(container.esc) + rv.append(value) + rv.append(container.esc) elif value[0] == "M": # Switch to new Multi Byte character set : 2.10.2 # Three HEX values, first value chooses the character set (ISO-IR), rest give the value - logger.warn( - "Error inline character sets [%s] not implemented, field [%s], offset [%s]", - value, - field, - offset, - ) + if is_log_error: + logger.warn( + "Error inline character sets [%s] not implemented, field [%s], offset [%s]", + value, + field, + offset, + ) + rv.append(container.esc) + rv.append(value) + rv.append(container.esc) elif value[0] == "X": # Hex encoded Bytes: 2.10.5 value = value[1:] try: for off in range(0, len(value), 2): rv.append(chr(int(value[off : off + 2], 16))) except Exception: + if is_log_error: + logger.exception( + "Error decoding hex value [%s], field [%s], offset [%s]", + value, + field, + offset, + ) + rv.append(container.esc) + rv.append("X") + rv.append(value) + rv.append(container.esc) + else: + if is_log_error: logger.exception( - "Error decoding hex value [%s], field [%s], offset [%s]", + "Error decoding value [%s], field [%s], offset [%s]", value, field, offset, ) - else: - logger.exception( - "Error decoding value [%s], field [%s], offset [%s]", - value, - field, - offset, - ) + rv.append(container.esc) + rv.append(value) + rv.append(container.esc) else: collecting.append(c) + if (len(collecting) > max_escape and collecting[0] not in "XZMC") \ + or (len(collecting) > 10 and collecting[0] in "XZMC"): + # We have collected beyond the maximum number of characters in an escape sequence + # Assume the message is badly formed and append the initial escape plus collected + # characters to the output + rv.append(container.esc) + rv.extend(collecting) + collecting = [] + in_seq = False elif c == container.esc: in_seq = True else: diff --git a/tests/test_parse.py b/tests/test_parse.py index fae9840..e5e3ee2 100644 --- a/tests/test_parse.py +++ b/tests/test_parse.py @@ -377,6 +377,20 @@ def test_unescape(self): self.assertEqual(msg.unescape("\\X20202020\\"), " ") self.assertEqual(msg.unescape("\\Xe1\\\\Xe9\\\\Xed\\\\Xf3\\\\Xfa\\"), "áéíóú") + + def test_unescape_broken(self): + msg = hl7.parse(rep_sample_hl7) + + self.assertEqual(msg.unescape("Undefined \\A\\"), "Undefined \\A\\") + self.assertEqual(msg.unescape("Undefined \\A\\", app_map={"A": "*"}), "Undefined *") + + self.assertEqual(msg.unescape(".br\\ but the text goes on beyond limit"), ".br\\ but the text goes on beyond limit") + + self.assertEqual(msg.unescape("not implemented \\C2842\\"), "not implemented \\C2842\\") + self.assertEqual(msg.unescape("not implemented \\M2442\\"), "not implemented \\M2442\\") + + self.assertEqual(msg.unescape("part convert \\X30no31\\"), "part convert 0\\X30no31\\") + def test_escape(self): msg = hl7.parse(rep_sample_hl7) From 3e97909ca7dbe33c25c9ddf4dd0f2f20fe859f19 Mon Sep 17 00:00:00 2001 From: Tom Unger Date: Mon, 10 Apr 2023 08:53:36 -0700 Subject: [PATCH 6/7] set a minimum escape length of 10 to cover /Mdddddd/ --- hl7/util.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hl7/util.py b/hl7/util.py index 41d80bd..27ad527 100644 --- a/hl7/util.py +++ b/hl7/util.py @@ -188,7 +188,7 @@ def unescape(container, field, app_map=None, is_log_error=False): # noqa: C901 ".ce": "\r", } - max_escape = max(len(x) for x in DEFAULT_MAP.keys()) + max_escape = max(10, max(len(x) for x in DEFAULT_MAP.keys())) if app_map: max_escape = max(max_escape, max(len(x) for x in app_map.keys())) @@ -279,8 +279,8 @@ def unescape(container, field, app_map=None, is_log_error=False): # noqa: C901 rv.append(container.esc) else: collecting.append(c) - if (len(collecting) > max_escape and collecting[0] not in "XZMC") \ - or (len(collecting) > 10 and collecting[0] in "XZMC"): + if (len(collecting) > max_escape and collecting[0] not in "XZ") \ + or (len(collecting) > 10 and collecting[0] in "XZ"): # We have collected beyond the maximum number of characters in an escape sequence # Assume the message is badly formed and append the initial escape plus collected # characters to the output From f94144bc0a1e51805165b52754e1975d278e4878 Mon Sep 17 00:00:00 2001 From: Tom Unger Date: Mon, 10 Apr 2023 13:29:58 -0700 Subject: [PATCH 7/7] Handle empty escape case. Don't log the field value. In the case of reports, the field maybe very long --- hl7/util.py | 24 +++++++++++------------- tests/test_parse.py | 4 +++- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/hl7/util.py b/hl7/util.py index 27ad527..c1220fb 100644 --- a/hl7/util.py +++ b/hl7/util.py @@ -202,11 +202,13 @@ def unescape(container, field, app_map=None, is_log_error=False): # noqa: C901 value = "".join(collecting) collecting = [] if not value: - logger.warn( - "Error unescaping value [%s], empty sequence found at %d", - field, - offset, - ) + if is_log_error: + logger.warn( + "Error unescaping empty sequence found at %d", + offset, + ) + rv.append(container.esc) + rv.append(container.esc) continue if app_map and value in app_map: rv.append(app_map[value]) @@ -229,9 +231,8 @@ def unescape(container, field, app_map=None, is_log_error=False): # noqa: C901 # Two HEX values, first value chooses the character set (ISO-IR), second gives the value if is_log_error: logger.warn( - "Error inline character sets [%s] not implemented, field [%s], offset [%s]", + "Error inline character sets [%s] not implemented, offset [%s]", value, - field, offset, ) rv.append(container.esc) @@ -241,9 +242,8 @@ def unescape(container, field, app_map=None, is_log_error=False): # noqa: C901 # Three HEX values, first value chooses the character set (ISO-IR), rest give the value if is_log_error: logger.warn( - "Error inline character sets [%s] not implemented, field [%s], offset [%s]", + "Error inline character sets [%s] not implemented, offset [%s]", value, - field, offset, ) rv.append(container.esc) @@ -257,9 +257,8 @@ def unescape(container, field, app_map=None, is_log_error=False): # noqa: C901 except Exception: if is_log_error: logger.exception( - "Error decoding hex value [%s], field [%s], offset [%s]", + "Error decoding hex value [%s], offset [%s]", value, - field, offset, ) rv.append(container.esc) @@ -269,9 +268,8 @@ def unescape(container, field, app_map=None, is_log_error=False): # noqa: C901 else: if is_log_error: logger.exception( - "Error decoding value [%s], field [%s], offset [%s]", + "Error decoding value [%s], offset [%s]", value, - field, offset, ) rv.append(container.esc) diff --git a/tests/test_parse.py b/tests/test_parse.py index e5e3ee2..7df1197 100644 --- a/tests/test_parse.py +++ b/tests/test_parse.py @@ -27,7 +27,6 @@ sample_hl7, ) - class ParseTest(TestCase): def test_parse(self): msg = hl7.parse(sample_hl7) @@ -381,6 +380,7 @@ def test_unescape(self): def test_unescape_broken(self): msg = hl7.parse(rep_sample_hl7) + self.assertEqual(msg.unescape("Empty \\\\"), "Empty \\\\") self.assertEqual(msg.unescape("Undefined \\A\\"), "Undefined \\A\\") self.assertEqual(msg.unescape("Undefined \\A\\", app_map={"A": "*"}), "Undefined *") @@ -391,6 +391,8 @@ def test_unescape_broken(self): self.assertEqual(msg.unescape("part convert \\X30no31\\"), "part convert 0\\X30no31\\") + + def test_escape(self): msg = hl7.parse(rep_sample_hl7)