From 08a46e7f38acc354fbb4de27ad3be88bbb2b79a7 Mon Sep 17 00:00:00 2001 From: eight04 Date: Wed, 28 Feb 2024 16:24:10 +0800 Subject: [PATCH 1/4] Add: ByteScreen --- .gitignore | 1 + pyte/__init__.py | 2 +- pyte/screens.py | 46 ++++++++++++++++++++++++++++++++++++++------ tests/test_screen.py | 10 ++++++++++ 4 files changed, 52 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index d0fa950..c2851c7 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ dist/ *.egg-info/ .eggs/ .pytest_cache/ +.venv/ diff --git a/pyte/__init__.py b/pyte/__init__.py index 226e62b..571ca18 100644 --- a/pyte/__init__.py +++ b/pyte/__init__.py @@ -28,7 +28,7 @@ import io from typing import Union -from .screens import Screen, DiffScreen, HistoryScreen, DebugScreen +from .screens import Screen, DiffScreen, HistoryScreen, DebugScreen, ByteScreen from .streams import Stream, ByteStream diff --git a/pyte/screens.py b/pyte/screens.py index ebbac7c..80a67c5 100644 --- a/pyte/screens.py +++ b/pyte/screens.py @@ -47,8 +47,6 @@ ) from .streams import Stream -wcwidth: Callable[[str], int] = lru_cache(maxsize=4096)(_wcwidth) - KT = TypeVar("KT") VT = TypeVar("VT") @@ -135,7 +133,7 @@ def __missing__(self, key: KT) -> VT: _DEFAULT_MODE = set([mo.DECAWM, mo.DECTCEM]) - +_DEFAULT_WCWIDTH: Callable[[str], int] = lru_cache(maxsize=4096)(_wcwidth) class Screen: """ @@ -222,6 +220,7 @@ def __init__(self, columns: int, lines: int) -> None: self.reset() self.mode = _DEFAULT_MODE.copy() self.margins: Optional[Margins] = None + self.wcwidth = _DEFAULT_WCWIDTH def __repr__(self) -> str: return ("{0}({1}, {2})".format(self.__class__.__name__, @@ -237,8 +236,8 @@ def render(line: StaticDefaultDict[int, Char]) -> Generator[str, None, None]: is_wide_char = False continue char = line[x].data - assert sum(map(wcwidth, char[1:])) == 0 - is_wide_char = wcwidth(char[0]) == 2 + assert sum(map(self.wcwidth, char[1:])) == 0 + is_wide_char = self.wcwidth(char[0]) == 2 yield char return ["".join(render(self.buffer[y])) for y in range(self.lines)] @@ -479,7 +478,7 @@ def draw(self, data: str) -> None: self.g1_charset if self.charset else self.g0_charset) for char in data: - char_width = wcwidth(char) + char_width = self.wcwidth(char) # If this was the last column in a line and auto wrap mode is # enabled, move the cursor to the beginning of the next line, @@ -1337,3 +1336,38 @@ def __getattribute__(self, attr: str) -> Callable[..., None]: return self.only_wrapper(attr) else: return lambda *args, **kwargs: None + +def byte_screen_wcwidth(text: str): + # FIXME: should we always return 1? + n = _DEFAULT_WCWIDTH(text) + if n <= 0 and text <= "\xff": + return 1 + return n + +class ByteScreen(Screen): + """A screen that draws bytes and stores byte-string in the buffer, including un-printable/zero-length chars.""" + def __init__(self, *args, encoding: str | None=None, **kwargs): + """ + :param encoding: The encoding of the screen. If set, the byte-string will be decoded when calling :meth:`ByteScreen.display`. + """ + super().__init__(*args, **kwargs) + self.encoding = encoding + self.wcwidth = byte_screen_wcwidth + + def draw(self, data: str | bytes): + if isinstance(data, bytes): + data = data.decode("latin-1") + return super().draw(data) + + @property + def display(self) -> List[str]: + if not self.encoding: + return super().display + + def render(line: StaticDefaultDict[int, Char]) -> Generator[str, None, None]: + for x in range(self.columns): + char = line[x].data + yield char + + return ["".join(render(self.buffer[y])).encode("latin-1").decode(self.encoding) for y in range(self.lines)] + diff --git a/tests/test_screen.py b/tests/test_screen.py index b6ba90d..140ba35 100644 --- a/tests/test_screen.py +++ b/tests/test_screen.py @@ -1583,3 +1583,13 @@ def test_screen_set_icon_name_title(): screen.set_title(text) assert screen.title == text + + +def test_byte_screen() -> None: + screen = pyte.ByteScreen(10, 1, encoding="big5") + + text = "限".encode("big5") + screen.draw(text) + assert screen.display[0].strip() == "限" + assert screen.buffer[0][0].data == "\xad" + From b81be31b4277dacd501e8f6e340b73b2cf0ce544 Mon Sep 17 00:00:00 2001 From: eight04 Date: Wed, 28 Feb 2024 16:57:44 +0800 Subject: [PATCH 2/4] Add: use_c1 option to Stream --- pyte/streams.py | 7 ++++--- tests/test_stream.py | 10 ++++++++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/pyte/streams.py b/pyte/streams.py index cd0f51c..8417042 100644 --- a/pyte/streams.py +++ b/pyte/streams.py @@ -139,10 +139,11 @@ class Stream: "[^" + "".join(map(re.escape, _special)) + "]+") del _special - def __init__(self, screen: Optional[Screen] = None, strict: bool = True) -> None: + def __init__(self, screen: Optional[Screen] = None, strict: bool = True, use_c1: bool = True) -> None: self.listener: Optional[Screen] = None self.strict = strict self.use_utf8: bool = True + self.use_c1: bool = use_c1 self._taking_plain_text: Optional[bool] = None @@ -304,7 +305,7 @@ def create_dispatcher(mapping: Mapping[str, str]) -> Dict[str, Callable[..., Non continue basic_dispatch[char]() - elif char == CSI_C1: + elif char == CSI_C1 and self.use_c1: # All parameters are unsigned, positive decimal integers, with # the most significant digit sent first. Any parameter greater # than 9999 is set to 9999. If you do not specify a value, a 0 @@ -354,7 +355,7 @@ def create_dispatcher(mapping: Mapping[str, str]) -> Dict[str, Callable[..., Non else: csi_dispatch[char](*params) break # CSI is finished. - elif char == OSC_C1: + elif char == OSC_C1 and self.use_c1: code = yield None if code == "R": continue # Reset palette. Not implemented. diff --git a/tests/test_stream.py b/tests/test_stream.py index 7a3ad92..79b6a57 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -332,3 +332,13 @@ def test_byte_stream_select_other_charset(): # c) enable utf-8 stream.select_other_charset("G") assert stream.use_utf8 + + +def test_byte_stream_without_c1() -> None: + screen = pyte.ByteScreen(3, 3) + stream = pyte.ByteStream(screen, use_c1=False) + stream.select_other_charset("@") + b = '𡶐'.encode("big5-hkscs") + stream.feed(b) + assert screen.display[0] == "\x9b\xf3 " + assert stream.use_c1 == False From 6faa91ad3809d1aa386c0d21aff56082f456172b Mon Sep 17 00:00:00 2001 From: eight04 Date: Wed, 28 Feb 2024 17:13:14 +0800 Subject: [PATCH 3/4] Fix: typehint --- pyte/screens.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyte/screens.py b/pyte/screens.py index 80a67c5..5ae2d47 100644 --- a/pyte/screens.py +++ b/pyte/screens.py @@ -1337,7 +1337,7 @@ def __getattribute__(self, attr: str) -> Callable[..., None]: else: return lambda *args, **kwargs: None -def byte_screen_wcwidth(text: str): +def byte_screen_wcwidth(text: str) -> int: # FIXME: should we always return 1? n = _DEFAULT_WCWIDTH(text) if n <= 0 and text <= "\xff": @@ -1346,7 +1346,7 @@ def byte_screen_wcwidth(text: str): class ByteScreen(Screen): """A screen that draws bytes and stores byte-string in the buffer, including un-printable/zero-length chars.""" - def __init__(self, *args, encoding: str | None=None, **kwargs): + def __init__(self, *args: Any, encoding: str|None = None, **kwargs: Any): """ :param encoding: The encoding of the screen. If set, the byte-string will be decoded when calling :meth:`ByteScreen.display`. """ @@ -1354,7 +1354,7 @@ def __init__(self, *args, encoding: str | None=None, **kwargs): self.encoding = encoding self.wcwidth = byte_screen_wcwidth - def draw(self, data: str | bytes): + def draw(self, data: str|bytes) -> None: if isinstance(data, bytes): data = data.decode("latin-1") return super().draw(data) From c50465c9c2fc335b8f3dc243bea446636fba8c75 Mon Sep 17 00:00:00 2001 From: eight04 Date: Wed, 28 Feb 2024 17:52:49 +0800 Subject: [PATCH 4/4] Fix: c0 is broken --- pyte/streams.py | 13 +++++++++++-- tests/test_stream.py | 9 +++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/pyte/streams.py b/pyte/streams.py index 8417042..262cbe0 100644 --- a/pyte/streams.py +++ b/pyte/streams.py @@ -238,6 +238,9 @@ def _parser_fsm(self) -> ParserGenerator: draw = listener.draw debug = listener.debug + use_c1 = self.use_c1 + in_control = False + ESC, CSI_C1 = ctrl.ESC, ctrl.CSI_C1 OSC_C1 = ctrl.OSC_C1 SP_OR_GT = ctrl.SP + ">" @@ -275,6 +278,7 @@ def create_dispatcher(mapping: Mapping[str, str]) -> Dict[str, Callable[..., Non # recognizes ``ESC % C`` sequences for selecting control # character set. However, in the current version these # are noop. + in_control = True char = yield None if char == "[": char = CSI_C1 # Go to CSI. @@ -305,7 +309,7 @@ def create_dispatcher(mapping: Mapping[str, str]) -> Dict[str, Callable[..., Non continue basic_dispatch[char]() - elif char == CSI_C1 and self.use_c1: + elif char == CSI_C1 and (use_c1 or in_control): # All parameters are unsigned, positive decimal integers, with # the most significant digit sent first. Any parameter greater # than 9999 is set to 9999. If you do not specify a value, a 0 @@ -355,11 +359,14 @@ def create_dispatcher(mapping: Mapping[str, str]) -> Dict[str, Callable[..., Non else: csi_dispatch[char](*params) break # CSI is finished. - elif char == OSC_C1 and self.use_c1: + in_control = False + elif char == OSC_C1 and (use_c1 or in_control): code = yield None if code == "R": + in_control = False continue # Reset palette. Not implemented. elif code == "P": + in_control = False continue # Set palette. Not implemented. param = "" @@ -377,6 +384,8 @@ def create_dispatcher(mapping: Mapping[str, str]) -> Dict[str, Callable[..., Non listener.set_icon_name(param) if code in "02": listener.set_title(param) + + in_control = False elif char not in NUL_OR_DEL: draw(char) diff --git a/tests/test_stream.py b/tests/test_stream.py index 79b6a57..59f0fed 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -342,3 +342,12 @@ def test_byte_stream_without_c1() -> None: stream.feed(b) assert screen.display[0] == "\x9b\xf3 " assert stream.use_c1 == False + +def test_byte_stream_without_c1_with_c0() -> None: + screen = pyte.ByteScreen(3, 3) + stream = pyte.ByteStream(screen, use_c1=False) + stream.select_other_charset("@") + stream.feed(b"\x1b[1;36m") + assert screen.display[0] == " " + +