Skip to content

Commit 209bdff

Browse files
authored
Type DecodePDU. (#2392)
1 parent b907133 commit 209bdff

File tree

5 files changed

+34
-43
lines changed

5 files changed

+34
-43
lines changed

pymodbus/client/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ async def connect(self) -> bool:
6666
)
6767
return await self.ctx.connect()
6868

69-
def register(self, custom_response_class: ModbusPDU) -> None:
69+
def register(self, custom_response_class: type[ModbusPDU]) -> None:
7070
"""Register a custom response class with the decoder (call **sync**).
7171
7272
:param custom_response_class: (optional) Modbus response class.
@@ -197,7 +197,7 @@ def __init__(
197197
# ----------------------------------------------------------------------- #
198198
# Client external interface
199199
# ----------------------------------------------------------------------- #
200-
def register(self, custom_response_class: ModbusPDU) -> None:
200+
def register(self, custom_response_class: type[ModbusPDU]) -> None:
201201
"""Register a custom response class with the decoder.
202202
203203
:param custom_response_class: (optional) Modbus response class.

pymodbus/pdu/decoders.py

Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""Modbus Request/Response Decoders."""
2-
from collections.abc import Callable
2+
from __future__ import annotations
33

44
import pymodbus.pdu.bit_read_message as bit_r_msg
55
import pymodbus.pdu.bit_write_message as bit_w_msg
@@ -17,7 +17,7 @@
1717
class DecodePDU:
1818
"""Decode pdu requests/responses (server/client)."""
1919

20-
_pdu_class_table = {
20+
_pdu_class_table: set[tuple[type[base.ModbusPDU], type[base.ModbusPDU]]] = {
2121
(reg_r_msg.ReadHoldingRegistersRequest, reg_r_msg.ReadHoldingRegistersResponse),
2222
(bit_r_msg.ReadDiscreteInputsRequest, bit_r_msg.ReadDiscreteInputsResponse),
2323
(reg_r_msg.ReadInputRegistersRequest, reg_r_msg.ReadInputRegistersResponse),
@@ -39,7 +39,7 @@ class DecodePDU:
3939
(mei_msg.ReadDeviceInformationRequest, mei_msg.ReadDeviceInformationResponse),
4040
}
4141

42-
_pdu_sub_class_table = [
42+
_pdu_sub_class_table: set[tuple[type[base.ModbusPDU], type[base.ModbusPDU]]] = {
4343
(diag_msg.ReturnQueryDataRequest, diag_msg.ReturnQueryDataResponse),
4444
(diag_msg.RestartCommunicationsOptionRequest, diag_msg.RestartCommunicationsOptionResponse),
4545
(diag_msg.ReturnDiagnosticRegisterRequest, diag_msg.ReturnDiagnosticRegisterResponse),
@@ -58,57 +58,51 @@ class DecodePDU:
5858
(diag_msg.ClearOverrunCountRequest, diag_msg.ClearOverrunCountResponse),
5959
(diag_msg.GetClearModbusPlusRequest, diag_msg.GetClearModbusPlusResponse),
6060
(mei_msg.ReadDeviceInformationRequest, mei_msg.ReadDeviceInformationResponse),
61-
]
61+
}
6262

6363
def __init__(self, is_server: bool) -> None:
6464
"""Initialize function_tables."""
6565
inx = 0 if is_server else 1
66-
self.lookup: dict[int, Callable] = {cl[inx].function_code: cl[inx] for cl in self._pdu_class_table} # type: ignore[attr-defined]
67-
self.sub_lookup: dict[int, dict[int, Callable]] = {f: {} for f in self.lookup}
66+
self.lookup: dict[int, type[base.ModbusPDU]] = {cl[inx].function_code: cl[inx] for cl in self._pdu_class_table}
67+
self.sub_lookup: dict[int, dict[int, type[base.ModbusPDU]]] = {f: {} for f in self.lookup}
6868
for f in self._pdu_sub_class_table:
69-
self.sub_lookup[f[inx].function_code][f[inx].sub_function_code] = f[inx] # type: ignore[attr-defined]
69+
self.sub_lookup[f[inx].function_code][f[inx].sub_function_code] = f[inx]
7070

71-
def lookupPduClass(self, function_code):
71+
def lookupPduClass(self, function_code: int) -> type[base.ModbusPDU]:
7272
"""Use `function_code` to determine the class of the PDU."""
7373
return self.lookup.get(function_code, base.ExceptionResponse)
7474

75-
def register(self, function):
75+
def register(self, custom_class: type[base.ModbusPDU]) -> None:
7676
"""Register a function and sub function class with the decoder."""
77-
if not issubclass(function, base.ModbusPDU):
77+
if not issubclass(custom_class, base.ModbusPDU):
7878
raise MessageRegisterException(
79-
f'"{function.__class__.__name__}" is Not a valid Modbus Message'
79+
f'"{custom_class.__class__.__name__}" is Not a valid Modbus Message'
8080
". Class needs to be derived from "
8181
"`pymodbus.pdu.ModbusPDU` "
8282
)
83-
self.lookup[function.function_code] = function
84-
if hasattr(function, "sub_function_code"):
85-
if function.function_code not in self.sub_lookup:
86-
self.sub_lookup[function.function_code] = {}
87-
self.sub_lookup[function.function_code][
88-
function.sub_function_code
89-
] = function
83+
self.lookup[custom_class.function_code] = custom_class
84+
if custom_class.sub_function_code >= 0:
85+
if custom_class.function_code not in self.sub_lookup:
86+
self.sub_lookup[custom_class.function_code] = {}
87+
self.sub_lookup[custom_class.function_code][
88+
custom_class.sub_function_code
89+
] = custom_class
9090

91-
def decode(self, frame):
91+
def decode(self, frame: bytes) -> base.ModbusPDU | None:
9292
"""Decode a frame."""
9393
try:
9494
if (function_code := int(frame[0])) > 0x80:
95-
pdu = base.ExceptionResponse(function_code & 0x7F)
96-
pdu.decode(frame[1:])
97-
return pdu
98-
if (pdu := self.lookup.get(function_code, lambda: None)()):
99-
fc_string = "{}: {}".format( # pylint: disable=consider-using-f-string
100-
str(self.lookup[function_code]) # pylint: disable=use-maxsplit-arg
101-
.split(".")[-1]
102-
.rstrip('">"'),
103-
function_code,
104-
)
105-
Log.debug("decode PDU for {}", fc_string)
106-
else:
95+
pdu_exp = base.ExceptionResponse(function_code & 0x7F)
96+
pdu_exp.decode(frame[1:])
97+
return pdu_exp
98+
if not (pdu_type := self.lookup.get(function_code, None)):
10799
Log.debug("decode PDU failed for function code {}", function_code)
108100
raise ModbusException(f"Unknown response {function_code}")
101+
pdu = pdu_type(0, 0, False)
102+
Log.debug("decode PDU for {}", function_code)
109103
pdu.decode(frame[1:])
110104

111-
if hasattr(pdu, "sub_function_code"):
105+
if pdu.sub_function_code >= 0:
112106
lookup = self.sub_lookup.get(pdu.function_code, {})
113107
if subtype := lookup.get(pdu.sub_function_code, None):
114108
pdu.__class__ = subtype

pymodbus/pdu/diag_message.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@ def decode(self, data):
6161
6262
:param data: The data to decode into the function code
6363
"""
64-
(
65-
self.sub_function_code, # pylint: disable=attribute-defined-outside-init
66-
) = struct.unpack(">H", data[:2])
64+
(self.sub_function_code, ) = struct.unpack(">H", data[:2])
6765
if self.sub_function_code == ReturnQueryDataRequest.sub_function_code:
6866
self.message = data[2:]
6967
else:
@@ -123,9 +121,7 @@ def decode(self, data):
123121
124122
:param data: The data to decode into the function code
125123
"""
126-
(
127-
self.sub_function_code, # pylint: disable=attribute-defined-outside-init
128-
) = struct.unpack(">H", data[:2])
124+
(self.sub_function_code, ) = struct.unpack(">H", data[:2])
129125
data = data[2:]
130126
if self.sub_function_code == ReturnQueryDataRequest.sub_function_code:
131127
self.message = data

pymodbus/pdu/pdu.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class ModbusPDU:
1313
"""Base class for all Modbus messages."""
1414

1515
function_code: int = 0
16+
sub_function_code: int = -1
1617
_rtu_frame_size: int = 0
1718
_rtu_byte_count_pos: int = 0
1819

pymodbus/server/simulator/http_server.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ def build_html_calls(self, params: dict, html: str) -> str:
390390
for function in self.request_lookup.values():
391391
selected = (
392392
"selected"
393-
if function.function_code == self.call_monitor.function #type: ignore[attr-defined]
393+
if function.function_code == self.call_monitor.function
394394
else ""
395395
)
396396
function_codes += f"<option value={function.function_code} {selected}>{function.function_code_name}</option>" #type: ignore[attr-defined]
@@ -556,9 +556,9 @@ def build_json_calls(self, params: dict) -> dict:
556556
function_codes = []
557557
for function in self.request_lookup.values():
558558
function_codes.append({
559-
"value": function.function_code, # type: ignore[attr-defined]
559+
"value": function.function_code,
560560
"text": function.function_code_name, # type: ignore[attr-defined]
561-
"selected": function.function_code == self.call_monitor.function # type: ignore[attr-defined]
561+
"selected": function.function_code == self.call_monitor.function
562562
})
563563

564564
simulation_action = "ACTIVE" if self.call_response.active != RESPONSE_INACTIVE else ""

0 commit comments

Comments
 (0)