diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 427575c..4d29491 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,6 +20,7 @@ jobs: - run: uv sync --all-extras --dev - run: uv run ruff check - run: uv run ruff format --check + - run: uv run pyrefly check docs: runs-on: ubuntu-latest diff --git a/monitorcontrol/monitorcontrol.py b/monitorcontrol/monitorcontrol.py index ebbb5cb..166f40c 100644 --- a/monitorcontrol/monitorcontrol.py +++ b/monitorcontrol/monitorcontrol.py @@ -458,7 +458,7 @@ def set_input_source(self, value: Union[int, str, InputSource]): self._set_vcp_feature(vcp_codes.input_select, mode_value) -def get_vcps() -> List[Type[vcp.VCP]]: +def get_vcps() -> List[vcp.VCP]: """ Discovers virtual control panels. @@ -587,7 +587,8 @@ def _parse_capabilities(caps_str: str) -> dict: """ Converts the capabilities string into a nice dict """ - caps_dict = { + + caps_dict: dict[str, str | list | dict[int, dict]] = { # Used to specify the protocol class "prot": "", # Identifies the type of display @@ -595,10 +596,10 @@ def _parse_capabilities(caps_str: str) -> dict: # The display model number "model": "", # A list of supported VCP codes. Somehow not the same as "vcp" - "cmds": "", + "cmds": {}, # A list of supported VCP codes with a list of supported values # for each nc code - "vcp": "", + "vcp": {}, # undocumented "mswhql": "", # undocumented @@ -612,12 +613,13 @@ def _parse_capabilities(caps_str: str) -> dict: # Alternate name to be used for control "vcpname": "", # Parsed input sources into text. Not part of capabilities string. - "inputs": "", + "inputs": [], # Parsed color presets into text. Not part of capabilities string. "color_presets": "", } for key in caps_dict: + # The "cmds" and "vcp" caps can be a mapping if key in ["cmds", "vcp"]: caps_dict[key] = _convert_to_dict(_extract_a_cap(caps_str, key)) else: @@ -625,6 +627,10 @@ def _parse_capabilities(caps_str: str) -> dict: # Parse the input sources into a text list for readability input_source_cap = vcp_codes.input_select.value + + # Put this check here to appease the type checker + if isinstance(caps_dict["vcp"], str): + raise ValueError("VCP capabilities dictionary is the wrong type!") if input_source_cap in caps_dict["vcp"]: caps_dict["inputs"] = [] input_val_list = list(caps_dict["vcp"][input_source_cap].keys()) diff --git a/monitorcontrol/vcp/vcp_abc.py b/monitorcontrol/vcp/vcp_abc.py index 7b0a1c5..b50013c 100644 --- a/monitorcontrol/vcp/vcp_abc.py +++ b/monitorcontrol/vcp/vcp_abc.py @@ -21,9 +21,9 @@ class VCPPermissionError(VCPError): pass -class VCP(abc.ABC): +class VCP[T](abc.ABC): @abc.abstractmethod - def __enter__(self): + def __enter__(self) -> T: pass @abc.abstractmethod @@ -64,3 +64,7 @@ def get_vcp_feature(self, code: int) -> Tuple[int, int]: VCPError: Failed to get VCP feature. """ pass + + @abc.abstractmethod + def get_vcp_capabilities(self) -> str: + pass diff --git a/monitorcontrol/vcp/vcp_linux.py b/monitorcontrol/vcp/vcp_linux.py index 983bd37..c90e979 100644 --- a/monitorcontrol/vcp/vcp_linux.py +++ b/monitorcontrol/vcp/vcp_linux.py @@ -55,7 +55,7 @@ def __init__(self, bus_number: int): """ self.logger = logging.getLogger(__name__) self.bus_number = bus_number - self.fd: Optional[str] = None + self.fd: Optional[int] = None self.fp: str = f"/dev/i2c-{self.bus_number}" # time of last feature set call self.last_set: Optional[float] = None @@ -64,7 +64,7 @@ def __enter__(self): def cleanup(fd: Optional[int]): if fd is not None: try: - os.close(self.fd) + os.close(fd) except OSError: pass @@ -90,7 +90,8 @@ def __exit__( exception_traceback: Optional[TracebackType], ) -> Optional[bool]: try: - os.close(self.fd) + if self.fd: + os.close(self.fd) except OSError as e: raise VCPIOError("unable to close descriptor") from e self.fd = None @@ -359,11 +360,14 @@ def read_bytes(self, num_bytes: int) -> bytes: VCPIOError: unable to read data """ try: - return os.read(self.fd, num_bytes) + if self.fd: + return os.read(self.fd, num_bytes) + else: + raise VCPIOError("unable read from I2C bus: no open file descriptor") except OSError as e: raise VCPIOError("unable to read from I2C bus") from e - def write_bytes(self, data: bytes): + def write_bytes(self, data: bytes | bytearray): """ Writes bytes to the I2C bus. @@ -374,12 +378,15 @@ def write_bytes(self, data: bytes): VCPIOError: unable to write data """ try: - os.write(self.fd, data) + if self.fd: + os.write(self.fd, data) + else: + VCPIOError("unable write to I2C bus: no open file descriptor found") except OSError as e: raise VCPIOError("unable write to I2C bus") from e -def get_vcps() -> List[LinuxVCP]: +def get_vcps() -> List[VCP]: """ Interrogates I2C buses to determine if they are DDC-CI capable. diff --git a/monitorcontrol/vcp/vcp_windows.py b/monitorcontrol/vcp/vcp_windows.py index c15abf1..d03f944 100644 --- a/monitorcontrol/vcp/vcp_windows.py +++ b/monitorcontrol/vcp/vcp_windows.py @@ -16,6 +16,14 @@ WCHAR, ) +# Move some aliases here so we can have our type +# ignoring in one place + +# pyrefly: ignore[missing-attribute] +c_windll = ctypes.windll +# pyrefly: ignore[missing-attribute] +c_formaterror = ctypes.FormatError + # structure type for a physical monitor class PhysicalMonitor(ctypes.Structure): @@ -37,11 +45,12 @@ def __init__(self, handle: HANDLE, description: str): description: Text description of the physical monitor. """ self.logger = logging.getLogger(__name__) - self.handle = handle + self.handle_p = handle + self.handle = int(handle) self.description = description def __del__(self): - WindowsVCP._destroy_physical_monitor(self.handle) + WindowsVCP._destroy_physical_monitor(self.handle_p) def __enter__(self): pass @@ -70,10 +79,10 @@ def set_vcp_feature(self, code: int, value: int): extra=dict(code=code, value=value), ) try: - if not ctypes.windll.dxva2.SetVCPFeature( - HANDLE(self.handle), BYTE(code), DWORD(value) + if not c_windll.dxva2.SetVCPFeature( + HANDLE(int(self.handle)), BYTE(code), DWORD(value) ): - raise VCPError("failed to set VCP feature: " + ctypes.FormatError()) + raise VCPError("failed to set VCP feature: " + c_formaterror()) except OSError as e: raise VCPError("failed to close handle") from e @@ -97,14 +106,14 @@ def get_vcp_feature(self, code: int) -> Tuple[int, int]: extra=dict(code=code), ) try: - if not ctypes.windll.dxva2.GetVCPFeatureAndVCPFeatureReply( + if not c_windll.dxva2.GetVCPFeatureAndVCPFeatureReply( HANDLE(self.handle), BYTE(code), None, ctypes.byref(feature_current), ctypes.byref(feature_max), ): - raise VCPError("failed to get VCP feature: " + ctypes.FormatError()) + raise VCPError("failed to get VCP feature: " + c_formaterror()) except OSError as e: raise VCPError("failed to get VCP feature") from e self.logger.debug( @@ -131,20 +140,16 @@ def get_vcp_capabilities(self): cap_length = DWORD() self.logger.debug("GetCapabilitiesStringLength") try: - if not ctypes.windll.dxva2.GetCapabilitiesStringLength( + if not c_windll.dxva2.GetCapabilitiesStringLength( HANDLE(self.handle), ctypes.byref(cap_length) ): - raise VCPError( - "failed to get VCP capabilities: " + ctypes.FormatError() - ) + raise VCPError("failed to get VCP capabilities: " + c_formaterror()) cap_string = (ctypes.c_char * cap_length.value)() self.logger.debug("CapabilitiesRequestAndCapabilitiesReply") - if not ctypes.windll.dxva2.CapabilitiesRequestAndCapabilitiesReply( + if not c_windll.dxva2.CapabilitiesRequestAndCapabilitiesReply( HANDLE(self.handle), cap_string, cap_length ): - raise VCPError( - "failed to get VCP capabilities: " + ctypes.FormatError() - ) + raise VCPError("failed to get VCP capabilities: " + c_formaterror()) except OSError as e: raise VCPError("failed to get VCP capabilities") from e return cap_string.value.decode("ascii") @@ -167,7 +172,7 @@ def _get_hmonitors() -> List[HMONITOR]: """ Calls the Windows `EnumDisplayMonitors` API in Python-friendly form. """ - hmonitors = [] # type: List[HMONITOR] + hmonitors: List[HMONITOR] = [] try: def _callback(hmonitor, hdc, lprect, lparam): @@ -175,11 +180,12 @@ def _callback(hmonitor, hdc, lprect, lparam): del hmonitor, hdc, lprect, lparam return True # continue enumeration + # pyrefly: ignore[missing-attribute] MONITORENUMPROC = ctypes.WINFUNCTYPE( # noqa: N806 BOOL, HMONITOR, HDC, ctypes.POINTER(RECT), LPARAM ) callback = MONITORENUMPROC(_callback) - if not ctypes.windll.user32.EnumDisplayMonitors(0, 0, callback, 0): + if not c_windll.user32.EnumDisplayMonitors(0, 0, callback, 0): raise VCPError("Call to EnumDisplayMonitors failed") except OSError as e: raise VCPError("failed to enumerate VCPs") from e @@ -194,12 +200,13 @@ def _physical_monitors_from_hmonitor( """ num_physical = DWORD() try: - if not ctypes.windll.dxva2.GetNumberOfPhysicalMonitorsFromHMONITOR( + # pyrefly: ignore[missing-attribute] + if not c_windll.dxva2.GetNumberOfPhysicalMonitorsFromHMONITOR( hmonitor, ctypes.byref(num_physical) ): raise VCPError( "Call to GetNumberOfPhysicalMonitorsFromHMONITOR failed: " - + ctypes.FormatError() + + c_formaterror() ) except OSError as e: raise VCPError( @@ -208,17 +215,16 @@ def _physical_monitors_from_hmonitor( physical_monitors = (PhysicalMonitor * num_physical.value)() try: - if not ctypes.windll.dxva2.GetPhysicalMonitorsFromHMONITOR( + if not c_windll.dxva2.GetPhysicalMonitorsFromHMONITOR( hmonitor, num_physical.value, physical_monitors ): raise VCPError( - "Call to GetPhysicalMonitorsFromHMONITOR failed: " - + ctypes.FormatError() + "Call to GetPhysicalMonitorsFromHMONITOR failed: " + c_formaterror() ) except OSError as e: raise VCPError("failed to open physical monitor handle") from e return ( - [physical_monitor.handle, physical_monitor.description] + (physical_monitor.handle, physical_monitor.description) for physical_monitor in physical_monitors ) @@ -228,9 +234,9 @@ def _destroy_physical_monitor(handle: HANDLE) -> None: Calls the Windows `DestroyPhysicalMonitor` API in Python-friendly form. """ try: - if not ctypes.windll.dxva2.DestroyPhysicalMonitor(handle): + if not c_windll.dxva2.DestroyPhysicalMonitor(handle): raise VCPError( - "Call to DestroyPhysicalMonitor failed: " + ctypes.FormatError() + "Call to DestroyPhysicalMonitor failed: " + c_formaterror() ) except OSError as e: raise VCPError("failed to close handle") from e diff --git a/pyproject.toml b/pyproject.toml index bcd4295..ef9d2ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,9 +10,17 @@ dependencies = [ "pyudev>=0.23.3 ; sys_platform != 'win32'", ] +[tool.pyrefly] +project-includes = ["**/*"] +project-excludes = [ + "**/__pycache__", + "**/*venv/**/*", +] + [dependency-groups] dev = [ "coveralls>=4.0.1", + "pyrefly>=v0.37.0", "pytest>=8.3.4", "pytest-cov>=6.0.0", "ruff>=0.11.11", diff --git a/tests/test_cli.py b/tests/test_cli.py index e804690..eca2db7 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,4 +1,5 @@ from .test_monitorcontrol import UnitTestVCP +import monitorcontrol.__main__ from monitorcontrol import Monitor from monitorcontrol.__main__ import main, count_to_level from unittest import mock diff --git a/tests/test_monitorcontrol.py b/tests/test_monitorcontrol.py index 7255ea2..13e8a14 100644 --- a/tests/test_monitorcontrol.py +++ b/tests/test_monitorcontrol.py @@ -52,7 +52,8 @@ def __exit__( def test_context_manager_assert(): - m = Monitor(None) + vcps = get_test_vcps() + m = Monitor(vcps[0]) with pytest.raises(AssertionError): m.get_power_mode() @@ -65,7 +66,7 @@ def test_get_monitors(): get_monitors() -def get_test_vcps() -> List[Type[vcp.VCP]]: +def get_test_vcps() -> List[vcp.VCP]: if USE_ATTACHED_MONITORS: return get_vcps() else: @@ -222,6 +223,7 @@ def test_input_source_issue_59(monitor: Monitor): def test_input_source_type_error(monitor: Monitor): with pytest.raises(TypeError): + # pyrefly: ignore[bad-argument-type] monitor.set_input_source([]) diff --git a/tests/test_windows_vcp.py b/tests/test_windows_vcp.py index 3bddc92..b6833e2 100644 --- a/tests/test_windows_vcp.py +++ b/tests/test_windows_vcp.py @@ -1,6 +1,9 @@ +import sys import pytest from unittest.mock import patch +if not sys.platform.startswith("win"): + pytest.skip("skipping windows-only tests", allow_module_level=True) from monitorcontrol.vcp.vcp_windows import WindowsVCP