From 48233a4260daeefdf1b18739a6b9cd9c99bd0c1c Mon Sep 17 00:00:00 2001 From: Marc Hofmann Date: Fri, 6 Jun 2025 23:13:48 +0200 Subject: [PATCH 1/3] fixed error with PacketLowLevelChannelConfigAck result --- examples/low_level/example_low_level.py | 42 ++-- pyproject.toml | 2 +- src/__main__.py | 182 +++++++++--------- .../low_level/low_level_channel_config.py | 10 +- .../low_level/low_level_types.py | 8 - src/science_mode_4/protocol/types.py | 22 +++ 6 files changed, 145 insertions(+), 121 deletions(-) diff --git a/examples/low_level/example_low_level.py b/examples/low_level/example_low_level.py index b1aa297..5029bc7 100644 --- a/examples/low_level/example_low_level.py +++ b/examples/low_level/example_low_level.py @@ -9,31 +9,32 @@ from science_mode_4 import LayerLowLevel from science_mode_4 import Connector, Channel from science_mode_4 import LowLevelHighVoltageSource, LowLevelMode +from science_mode_4.low_level.low_level_channel_config import PacketLowLevelChannelConfigAck +from science_mode_4.protocol.commands import Commands from examples.utils.example_utils import ExampleUtils, KeyboardInputThread -def send_channel_config(low_level_layer: LayerLowLevel, connector: Connector): - """Sends channel update""" - # device can store up to 10 channel config commands - for channel in Channel: - # send_channel_config does not wait for an acknowledge - low_level_layer.send_channel_config(True, channel, connector, - [ChannelPoint(4000, 20), ChannelPoint(4000, -20), - ChannelPoint(4000, 0)]) - - async def main() -> int: """Main function""" + stim_current = 10 + # keyboard is our trigger to start specific stimulation def input_callback(input_value: str) -> bool: """Callback call from keyboard input thread""" print(f"Input value {input_value}") + nonlocal stim_current if input_value == "1": send_channel_config(low_level_layer, Connector.GREEN) elif input_value == "2": send_channel_config(low_level_layer, Connector.YELLOW) + elif input_value == "+": + stim_current += 0.5 + print(f"Current: {stim_current}") + elif input_value == "-": + stim_current -= 0.5 + print(f"Current: {stim_current}") elif input_value == "q": # end keyboard input thread return True @@ -42,7 +43,21 @@ def input_callback(input_value: str) -> bool: return False - print("Usage: press 1 or 2 to stimulate green or yellow connector or press q to quit") + + def send_channel_config(low_level_layer: LayerLowLevel, connector: Connector): + """Sends channel update""" + # device can store up to 10 channel config commands + for channel in Channel: + # send_channel_config does not wait for an acknowledge + low_level_layer.send_channel_config(True, channel, connector, + [ChannelPoint(1000, stim_current), ChannelPoint(4000, 0), + ChannelPoint(1000, -stim_current)]) + + + print("Usage:") + print("Press 1 or 2 to stimulate green or yellow connector") + print("Press + or - to increase or decrease current") + print("Press q to quit") # create keyboard input thread for non blocking console input keyboard_input_thread = KeyboardInputThread(input_callback) @@ -69,10 +84,11 @@ def input_callback(input_value: str) -> bool: while keyboard_input_thread.is_alive(): # get new packets from connection ack = low_level_layer.packet_buffer.get_packet_from_buffer() - if ack: + if ack and ack.command == Commands.LOW_LEVEL_CHANNEL_CONFIG_ACK: + cca: PacketLowLevelChannelConfigAck = ack # do something with packet ack # here we print that an acknowledge arrived - print(ack) + print(f"Connector: {cca.connector}, channel: {cca.channel}, result: {cca.result.name}") await asyncio.sleep(0.1) diff --git a/pyproject.toml b/pyproject.toml index 501e2f6..089ae16 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "science_mode_4" -version = "0.0.15" +version = "0.0.16" authors = [ { name="Marc Hofmann", email="marc-hofmann@gmx.de" }, ] diff --git a/src/__main__.py b/src/__main__.py index 2a2fb7c..7f360d5 100644 --- a/src/__main__.py +++ b/src/__main__.py @@ -1,116 +1,110 @@ """Test program how to use library without installing the library, DO NOT USE THIS FILE, USE EXAMPLES INSTEAD""" -import logging import sys import asyncio -from science_mode_4.device_i24 import DeviceI24 -from science_mode_4.dyscom.dyscom_get_file_by_name import PacketDyscomGetAckFileByName -from science_mode_4.dyscom.dyscom_get_operation_mode import PacketDyscomGetAckOperationMode -from science_mode_4.dyscom.dyscom_layer import LayerDyscom -from science_mode_4.dyscom.dyscom_send_file import PacketDyscomSendFile -from science_mode_4.dyscom.dyscom_types import DyscomGetType +from science_mode_4 import DeviceP24 +from science_mode_4.low_level.low_level_channel_config import PacketLowLevelChannelConfigAck +from science_mode_4.protocol import ChannelPoint from science_mode_4.protocol.commands import Commands -from science_mode_4.utils import logger -from science_mode_4.utils.serial_port_connection import SerialPortConnection - +from science_mode_4.utils import SerialPortConnection +from science_mode_4.low_level import LayerLowLevel +from science_mode_4.protocol import Connector, Channel +from science_mode_4.low_level import LowLevelHighVoltageSource, LowLevelMode async def main() -> int: """Main function""" - # logger().disabled = True - logger().setLevel(logging.DEBUG) - - devices = SerialPortConnection.list_science_mode_device_ports() - connection = SerialPortConnection(devices[0].device) - # devices = UsbConnection.list_science_mode_devices() - # connection = UsbConnection(devices[0]) - # connection = NullConnection() + # logger().setLevel(logging.DEBUG) + current = 70 + + # # keyboard is our trigger to start specific stimulation + # def input_callback(input_value: str) -> bool: + # """Callback call from keyboard input thread""" + # print(f"Input value {input_value}") + + # nonlocal current + # if input_value == "1": + # send_channel_config(low_level_layer, Connector.GREEN) + # elif input_value == "2": + # send_channel_config(low_level_layer, Connector.YELLOW) + # elif input_value == "+": + # current += 0.5 + # print(f"current: {current}") + # elif input_value == "-": + # current -= 0.5 + # print(f"current: {current}") + # elif input_value == "q": + # # end keyboard input thread + # return True + # else: + # print("Invalid command") + + # return False + + + def send_channel_config(low_level_layer: LayerLowLevel, connector: Connector): + """Sends channel update""" + # device can store up to 10 channel config commands + for channel in Channel: + # send_channel_config does not wait for an acknowledge + low_level_layer.send_channel_config(True, channel, connector, + [ChannelPoint(1000, current), ChannelPoint(4000, 0), + ChannelPoint(1000, -current)]) + + + print("Usage:") + print("Press 1 or 2 to stimulate green or yellow connector") + print("Press + or - to increase or decrease current") + print("Press q to quit") + # create keyboard input thread for non blocking console input + # keyboard_input_thread = KeyboardInputThread(input_callback) + + # get comport from command line argument + # com_port = ExampleUtils.get_comport_from_commandline_argument() + # create serial port connection + connection = SerialPortConnection(SerialPortConnection.list_science_mode_device_ports()[0].device) + # open connection, now we can read and write data connection.open() - device = DeviceI24(connection) + # create science mode device + device = DeviceP24(connection) + # call initialize to get basic information (serial, versions) and stop any active stimulation/measurement + # to have a defined state await device.initialize() - general = device.get_layer_general() - await general.get_version() - # get dyscom layer to call dyscom level commands - dyscom = device.get_layer_dyscom() - - device_id = await dyscom.get_device_id() - - # await dyscom.power_module(DyscomPowerModuleType.MEMORY_CARD, DyscomPowerModulePowerType.SWITCH_ON) - # await dyscom.power_module(DyscomPowerModuleType.MEASUREMENT, DyscomPowerModulePowerType.SWITCH_ON) - # params = DyscomInitParams() - # params.flags = [DyscomInitFlag.ENABLE_SD_STORAGE_MODE] - # init_ack = await dyscom.init(params) - - # dyscom.send_start() - # await asyncio.sleep(1) - # dyscom.send_get_operation_mode() - # await asyncio.sleep(4) - # dyscom.send_stop() - # await asyncio.sleep(5) - # process_ack(dyscom) - - # mmi = await dyscom.get_list_of_measurement_meta_info() - - # # get calibration file - calibration_filename = f"rehaingest_{device_id}.cal" - await dyscom.get_file_info(calibration_filename) - - # p = PacketDyscomGetFileByName(calibration_filename) - # dyscom.send_packet(p) - await dyscom.get_file_by_name(calibration_filename) - - # dyscom.send_send_file(get_file_by_name_ack.block_offset) - # for x in range(get_file_by_name_ack.number_of_blocks): - # dyscom.send_send_file(get_file_by_name_ack.block_offset + x) - - # meas_info = await dyscom.get_file_info(init_ack.measurement_file_id) - # await dyscom.get_operation_mode() - - # p = PacketDyscomGetFileByName(init_ack.measurement_file_id) - # dyscom.send_packet(p) - # dyscom.send_get_operation_mode() - # get_file_by_name_ack = await dyscom.get_file_by_name(init_ack.measurement_file_id) - # await dyscom.get_operation_mode() - - await asyncio.sleep(5) - offset = process_ack(dyscom) - dyscom.send_send_file(offset) - await asyncio.sleep(5) - process_ack(dyscom) - + # get low level layer to call low level commands + low_level_layer = device.get_layer_low_level() + + # call init low level + await low_level_layer.init(LowLevelMode.NO_MEASUREMENT, LowLevelHighVoltageSource.STANDARD) + + # now we can start stimulation + # while keyboard_input_thread.is_alive(): + for x in range(500): + if x % 100 == 0: + send_channel_config(low_level_layer, Connector.YELLOW) + # get new packets from connection + ack = low_level_layer.packet_buffer.get_packet_from_buffer() + if ack and ack.command == Commands.LOW_LEVEL_CHANNEL_CONFIG_ACK: + cca: PacketLowLevelChannelConfigAck = ack + # do something with packet ack + # here we print that an acknowledge arrived + print(cca.result.name) + + await asyncio.sleep(0.1) + + # wait until all acknowledges are received + await asyncio.sleep(0.5) + # call stop low level + await low_level_layer.stop() + + # close serial port connection connection.close() - return 0 -def process_ack(dyscom: LayerDyscom) -> int: - """Process all packets read from connection buffer""" - offset = 0 - while True: - # process all available packages - ack = dyscom.packet_buffer.get_packet_from_buffer() - print(ack) - if ack: - if ack.command == Commands.DL_SEND_FILE: - send_file: PacketDyscomSendFile = ack - data = send_file.data - print(data) - elif ack.command == Commands.DL_GET_ACK and ack.kind == DyscomGetType.OPERATION_MODE: - op_mode: PacketDyscomGetAckOperationMode = ack - print(op_mode.operation_mode.name) - elif ack.command == Commands.DL_GET_ACK and ack.kind == DyscomGetType.FILE_BY_NAME: - fbn: PacketDyscomGetAckFileByName = ack - print(fbn.block_offset) - offset = fbn.block_offset - else: - break - - return offset - if __name__ == "__main__": res = asyncio.run(main()) diff --git a/src/science_mode_4/low_level/low_level_channel_config.py b/src/science_mode_4/low_level/low_level_channel_config.py index 1f9a07d..a79f45b 100644 --- a/src/science_mode_4/low_level/low_level_channel_config.py +++ b/src/science_mode_4/low_level/low_level_channel_config.py @@ -1,11 +1,11 @@ """Provides packet classes for low level channel config""" from science_mode_4.protocol.commands import Commands -from science_mode_4.protocol.types import Connector, Channel +from science_mode_4.protocol.types import Connector, Channel, ResultAndError from science_mode_4.protocol.packet import Packet, PacketAck from science_mode_4.protocol.channel_point import ChannelPoint from science_mode_4.utils.byte_builder import ByteBuilder -from .low_level_types import LowLevelMode, LowLevelResult +from .low_level_types import LowLevelMode class PacketLowLevelChannelConfig(Packet): @@ -90,7 +90,7 @@ class PacketLowLevelChannelConfigAck(PacketAck): def __init__(self, data: bytes): super().__init__(data) self._command = Commands.LOW_LEVEL_CHANNEL_CONFIG_ACK - self._result = LowLevelResult.SUCCESSFUL + self._result = ResultAndError.NO_ERROR self._connector = 0 self._channel = 0 self._mode = LowLevelMode.NO_MEASUREMENT @@ -101,7 +101,7 @@ def __init__(self, data: bytes): if not data is None: bb = ByteBuilder() bb.append_bytes(data) - self._result = LowLevelResult(data[0]) + self._result = ResultAndError(data[0]) self._channel = bb.get_bit_from_position(8, 4) self._connector = bb.get_bit_from_position(12, 4) self._mode = LowLevelMode(data[2]) @@ -117,7 +117,7 @@ def __init__(self, data: bytes): @property - def result(self) -> LowLevelResult: + def result(self) -> ResultAndError: """Getter for result""" return self._result diff --git a/src/science_mode_4/low_level/low_level_types.py b/src/science_mode_4/low_level/low_level_types.py index 177d59d..1348b00 100644 --- a/src/science_mode_4/low_level/low_level_types.py +++ b/src/science_mode_4/low_level/low_level_types.py @@ -3,14 +3,6 @@ from enum import IntEnum -class LowLevelResult(IntEnum): - """Represent low level type for result""" - SUCCESSFUL = 0 - TRANSFER_ERROR = 1 - PARAMETER_ERROR = 2 - TIMEOUT_STIMULATION = 3 - - class LowLevelMode(IntEnum): """Represent low level type for measurement""" NO_MEASUREMENT = 0 diff --git a/src/science_mode_4/protocol/types.py b/src/science_mode_4/protocol/types.py index c875406..ade1010 100644 --- a/src/science_mode_4/protocol/types.py +++ b/src/science_mode_4/protocol/types.py @@ -22,9 +22,31 @@ class ResultAndError(IntEnum): NO_ERROR = 0 TRANSFER_ERROR = 1 PARAMETER_ERROR = 2 + PROTOCOL_ERROR = 3 + UC_STIM_TIMEOUT_ERROR = 4 + EMG_TIMEOUT_ERROR = 5 + EMG_REGISTER_ERROR = 6 NOT_INITIALIZED = 7 + HV_ERROR = 8 + DEMUX_TIMEOUT_ERROR = 9 ELECTRODE_ERROR = 10 + INVALID_CMD_ERROR = 11 + DEMUX_PARAMETER_ERROR = 12 + DEMUX_NOT_INITIALIZED_ERROR = 13 + DEMUX_TRANSFER_ERROR = 14 + DEMUX_UNKNOWN_ACK_ERROR = 15 PULSE_TIMEOUT_ERROR = 16 + FUEL_GAUGE_ERROR = 17 + LIVE_SIGNAL_ERROR = 18 + FILE_TRANSMISSION_TIMEOUT = 19 + FILE_NOT_FOUND = 20 + BUSY = 21 + FILE_ERROR = 22 + FLASH_ERASE_ERROR = 23 + FLASH_WRITE_ERROR = 24 + UNKNOWN_CONTROLLER_ERROR = 25 + FIRMWARE_TOO_LARGE_ERROR = 26 + FUEL_GAUGE_NOT_PROGRAMMED = 27 PULSE_LOW_CURRENT_ERROR = 28 From bcd69aba20acd70e00a87dbbcbee030824b8c96c Mon Sep 17 00:00:00 2001 From: Marc Hofmann Date: Fri, 6 Jun 2025 23:16:06 +0200 Subject: [PATCH 2/3] fixed import --- examples/low_level/example_low_level.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/low_level/example_low_level.py b/examples/low_level/example_low_level.py index 5029bc7..c3eb611 100644 --- a/examples/low_level/example_low_level.py +++ b/examples/low_level/example_low_level.py @@ -9,8 +9,8 @@ from science_mode_4 import LayerLowLevel from science_mode_4 import Connector, Channel from science_mode_4 import LowLevelHighVoltageSource, LowLevelMode -from science_mode_4.low_level.low_level_channel_config import PacketLowLevelChannelConfigAck -from science_mode_4.protocol.commands import Commands +from science_mode_4 import PacketLowLevelChannelConfigAck +from science_mode_4 import Commands from examples.utils.example_utils import ExampleUtils, KeyboardInputThread From 7d6821ec135f31da01816c662b5b1b633e691f03 Mon Sep 17 00:00:00 2001 From: Marc Hofmann Date: Fri, 6 Jun 2025 23:21:27 +0200 Subject: [PATCH 3/3] added changes to readme --- README.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index b8dba74..4f5dcf6 100644 --- a/README.md +++ b/README.md @@ -117,4 +117,7 @@ Python 3.11 or higher ## 0.0.15 - Clarified readme -- Changed current for ChannelPoint from int to float \ No newline at end of file +- Changed current for ChannelPoint from int to float + +## 0.0.16 +- Fixed error with PacketLowLevelChannelConfigAck result \ No newline at end of file