Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,7 @@ Python 3.11 or higher

## 0.0.15
- Clarified readme
- Changed current for ChannelPoint from int to float
- Changed current for ChannelPoint from int to float

## 0.0.16
- Fixed error with PacketLowLevelChannelConfigAck result
42 changes: 29 additions & 13 deletions examples/low_level/example_low_level.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,32 @@
from science_mode_4 import LayerLowLevel
from science_mode_4 import Connector, Channel
from science_mode_4 import LowLevelHighVoltageSource, LowLevelMode
from science_mode_4 import PacketLowLevelChannelConfigAck
from science_mode_4 import Commands
from examples.utils.example_utils import ExampleUtils, KeyboardInputThread


def send_channel_config(low_level_layer: LayerLowLevel, connector: Connector):
"""Sends channel update"""
# device can store up to 10 channel config commands
for channel in Channel:
# send_channel_config does not wait for an acknowledge
low_level_layer.send_channel_config(True, channel, connector,
[ChannelPoint(4000, 20), ChannelPoint(4000, -20),
ChannelPoint(4000, 0)])


async def main() -> int:
"""Main function"""

stim_current = 10

# keyboard is our trigger to start specific stimulation
def input_callback(input_value: str) -> bool:
"""Callback call from keyboard input thread"""
print(f"Input value {input_value}")

nonlocal stim_current
if input_value == "1":
send_channel_config(low_level_layer, Connector.GREEN)
elif input_value == "2":
send_channel_config(low_level_layer, Connector.YELLOW)
elif input_value == "+":
stim_current += 0.5
print(f"Current: {stim_current}")
elif input_value == "-":
stim_current -= 0.5
print(f"Current: {stim_current}")
elif input_value == "q":
# end keyboard input thread
return True
Expand All @@ -42,7 +43,21 @@ def input_callback(input_value: str) -> bool:

return False

print("Usage: press 1 or 2 to stimulate green or yellow connector or press q to quit")

def send_channel_config(low_level_layer: LayerLowLevel, connector: Connector):
"""Sends channel update"""
# device can store up to 10 channel config commands
for channel in Channel:
# send_channel_config does not wait for an acknowledge
low_level_layer.send_channel_config(True, channel, connector,
[ChannelPoint(1000, stim_current), ChannelPoint(4000, 0),
ChannelPoint(1000, -stim_current)])


print("Usage:")
print("Press 1 or 2 to stimulate green or yellow connector")
print("Press + or - to increase or decrease current")
print("Press q to quit")
# create keyboard input thread for non blocking console input
keyboard_input_thread = KeyboardInputThread(input_callback)

Expand All @@ -69,10 +84,11 @@ def input_callback(input_value: str) -> bool:
while keyboard_input_thread.is_alive():
# get new packets from connection
ack = low_level_layer.packet_buffer.get_packet_from_buffer()
if ack:
if ack and ack.command == Commands.LOW_LEVEL_CHANNEL_CONFIG_ACK:
cca: PacketLowLevelChannelConfigAck = ack
# do something with packet ack
# here we print that an acknowledge arrived
print(ack)
print(f"Connector: {cca.connector}, channel: {cca.channel}, result: {cca.result.name}")

await asyncio.sleep(0.1)

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "science_mode_4"
version = "0.0.15"
version = "0.0.16"
authors = [
{ name="Marc Hofmann", email="marc-hofmann@gmx.de" },
]
Expand Down
182 changes: 88 additions & 94 deletions src/__main__.py
Original file line number Diff line number Diff line change
@@ -1,116 +1,110 @@
"""Test program how to use library without installing the library,
DO NOT USE THIS FILE, USE EXAMPLES INSTEAD"""

import logging
import sys
import asyncio

from science_mode_4.device_i24 import DeviceI24
from science_mode_4.dyscom.dyscom_get_file_by_name import PacketDyscomGetAckFileByName
from science_mode_4.dyscom.dyscom_get_operation_mode import PacketDyscomGetAckOperationMode
from science_mode_4.dyscom.dyscom_layer import LayerDyscom
from science_mode_4.dyscom.dyscom_send_file import PacketDyscomSendFile
from science_mode_4.dyscom.dyscom_types import DyscomGetType
from science_mode_4 import DeviceP24
from science_mode_4.low_level.low_level_channel_config import PacketLowLevelChannelConfigAck
from science_mode_4.protocol import ChannelPoint
from science_mode_4.protocol.commands import Commands
from science_mode_4.utils import logger
from science_mode_4.utils.serial_port_connection import SerialPortConnection

from science_mode_4.utils import SerialPortConnection
from science_mode_4.low_level import LayerLowLevel
from science_mode_4.protocol import Connector, Channel
from science_mode_4.low_level import LowLevelHighVoltageSource, LowLevelMode


async def main() -> int:
"""Main function"""

# logger().disabled = True
logger().setLevel(logging.DEBUG)

devices = SerialPortConnection.list_science_mode_device_ports()
connection = SerialPortConnection(devices[0].device)
# devices = UsbConnection.list_science_mode_devices()
# connection = UsbConnection(devices[0])
# connection = NullConnection()
# logger().setLevel(logging.DEBUG)
current = 70

# # keyboard is our trigger to start specific stimulation
# def input_callback(input_value: str) -> bool:
# """Callback call from keyboard input thread"""
# print(f"Input value {input_value}")

# nonlocal current
# if input_value == "1":
# send_channel_config(low_level_layer, Connector.GREEN)
# elif input_value == "2":
# send_channel_config(low_level_layer, Connector.YELLOW)
# elif input_value == "+":
# current += 0.5
# print(f"current: {current}")
# elif input_value == "-":
# current -= 0.5
# print(f"current: {current}")
# elif input_value == "q":
# # end keyboard input thread
# return True
# else:
# print("Invalid command")

# return False


def send_channel_config(low_level_layer: LayerLowLevel, connector: Connector):
"""Sends channel update"""
# device can store up to 10 channel config commands
for channel in Channel:
# send_channel_config does not wait for an acknowledge
low_level_layer.send_channel_config(True, channel, connector,
[ChannelPoint(1000, current), ChannelPoint(4000, 0),
ChannelPoint(1000, -current)])


print("Usage:")
print("Press 1 or 2 to stimulate green or yellow connector")
print("Press + or - to increase or decrease current")
print("Press q to quit")
# create keyboard input thread for non blocking console input
# keyboard_input_thread = KeyboardInputThread(input_callback)

# get comport from command line argument
# com_port = ExampleUtils.get_comport_from_commandline_argument()
# create serial port connection
connection = SerialPortConnection(SerialPortConnection.list_science_mode_device_ports()[0].device)
# open connection, now we can read and write data
connection.open()

device = DeviceI24(connection)
# create science mode device
device = DeviceP24(connection)
# call initialize to get basic information (serial, versions) and stop any active stimulation/measurement
# to have a defined state
await device.initialize()

general = device.get_layer_general()
await general.get_version()
# get dyscom layer to call dyscom level commands
dyscom = device.get_layer_dyscom()

device_id = await dyscom.get_device_id()

# await dyscom.power_module(DyscomPowerModuleType.MEMORY_CARD, DyscomPowerModulePowerType.SWITCH_ON)
# await dyscom.power_module(DyscomPowerModuleType.MEASUREMENT, DyscomPowerModulePowerType.SWITCH_ON)
# params = DyscomInitParams()
# params.flags = [DyscomInitFlag.ENABLE_SD_STORAGE_MODE]
# init_ack = await dyscom.init(params)

# dyscom.send_start()
# await asyncio.sleep(1)
# dyscom.send_get_operation_mode()
# await asyncio.sleep(4)
# dyscom.send_stop()
# await asyncio.sleep(5)
# process_ack(dyscom)

# mmi = await dyscom.get_list_of_measurement_meta_info()

# # get calibration file
calibration_filename = f"rehaingest_{device_id}.cal"
await dyscom.get_file_info(calibration_filename)

# p = PacketDyscomGetFileByName(calibration_filename)
# dyscom.send_packet(p)
await dyscom.get_file_by_name(calibration_filename)

# dyscom.send_send_file(get_file_by_name_ack.block_offset)
# for x in range(get_file_by_name_ack.number_of_blocks):
# dyscom.send_send_file(get_file_by_name_ack.block_offset + x)

# meas_info = await dyscom.get_file_info(init_ack.measurement_file_id)
# await dyscom.get_operation_mode()

# p = PacketDyscomGetFileByName(init_ack.measurement_file_id)
# dyscom.send_packet(p)
# dyscom.send_get_operation_mode()
# get_file_by_name_ack = await dyscom.get_file_by_name(init_ack.measurement_file_id)
# await dyscom.get_operation_mode()

await asyncio.sleep(5)
offset = process_ack(dyscom)
dyscom.send_send_file(offset)
await asyncio.sleep(5)
process_ack(dyscom)

# get low level layer to call low level commands
low_level_layer = device.get_layer_low_level()

# call init low level
await low_level_layer.init(LowLevelMode.NO_MEASUREMENT, LowLevelHighVoltageSource.STANDARD)

# now we can start stimulation
# while keyboard_input_thread.is_alive():
for x in range(500):
if x % 100 == 0:
send_channel_config(low_level_layer, Connector.YELLOW)
# get new packets from connection
ack = low_level_layer.packet_buffer.get_packet_from_buffer()
if ack and ack.command == Commands.LOW_LEVEL_CHANNEL_CONFIG_ACK:
cca: PacketLowLevelChannelConfigAck = ack
# do something with packet ack
# here we print that an acknowledge arrived
print(cca.result.name)

await asyncio.sleep(0.1)

# wait until all acknowledges are received
await asyncio.sleep(0.5)
# call stop low level
await low_level_layer.stop()

# close serial port connection
connection.close()

return 0

def process_ack(dyscom: LayerDyscom) -> int:
"""Process all packets read from connection buffer"""
offset = 0
while True:
# process all available packages
ack = dyscom.packet_buffer.get_packet_from_buffer()
print(ack)
if ack:
if ack.command == Commands.DL_SEND_FILE:
send_file: PacketDyscomSendFile = ack
data = send_file.data
print(data)
elif ack.command == Commands.DL_GET_ACK and ack.kind == DyscomGetType.OPERATION_MODE:
op_mode: PacketDyscomGetAckOperationMode = ack
print(op_mode.operation_mode.name)
elif ack.command == Commands.DL_GET_ACK and ack.kind == DyscomGetType.FILE_BY_NAME:
fbn: PacketDyscomGetAckFileByName = ack
print(fbn.block_offset)
offset = fbn.block_offset
else:
break

return offset


if __name__ == "__main__":
res = asyncio.run(main())
Expand Down
10 changes: 5 additions & 5 deletions src/science_mode_4/low_level/low_level_channel_config.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""Provides packet classes for low level channel config"""

from science_mode_4.protocol.commands import Commands
from science_mode_4.protocol.types import Connector, Channel
from science_mode_4.protocol.types import Connector, Channel, ResultAndError
from science_mode_4.protocol.packet import Packet, PacketAck
from science_mode_4.protocol.channel_point import ChannelPoint
from science_mode_4.utils.byte_builder import ByteBuilder
from .low_level_types import LowLevelMode, LowLevelResult
from .low_level_types import LowLevelMode


class PacketLowLevelChannelConfig(Packet):
Expand Down Expand Up @@ -90,7 +90,7 @@ class PacketLowLevelChannelConfigAck(PacketAck):
def __init__(self, data: bytes):
super().__init__(data)
self._command = Commands.LOW_LEVEL_CHANNEL_CONFIG_ACK
self._result = LowLevelResult.SUCCESSFUL
self._result = ResultAndError.NO_ERROR
self._connector = 0
self._channel = 0
self._mode = LowLevelMode.NO_MEASUREMENT
Expand All @@ -101,7 +101,7 @@ def __init__(self, data: bytes):
if not data is None:
bb = ByteBuilder()
bb.append_bytes(data)
self._result = LowLevelResult(data[0])
self._result = ResultAndError(data[0])
self._channel = bb.get_bit_from_position(8, 4)
self._connector = bb.get_bit_from_position(12, 4)
self._mode = LowLevelMode(data[2])
Expand All @@ -117,7 +117,7 @@ def __init__(self, data: bytes):


@property
def result(self) -> LowLevelResult:
def result(self) -> ResultAndError:
"""Getter for result"""
return self._result

Expand Down
8 changes: 0 additions & 8 deletions src/science_mode_4/low_level/low_level_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,6 @@
from enum import IntEnum


class LowLevelResult(IntEnum):
"""Represent low level type for result"""
SUCCESSFUL = 0
TRANSFER_ERROR = 1
PARAMETER_ERROR = 2
TIMEOUT_STIMULATION = 3


class LowLevelMode(IntEnum):
"""Represent low level type for measurement"""
NO_MEASUREMENT = 0
Expand Down
22 changes: 22 additions & 0 deletions src/science_mode_4/protocol/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,31 @@ class ResultAndError(IntEnum):
NO_ERROR = 0
TRANSFER_ERROR = 1
PARAMETER_ERROR = 2
PROTOCOL_ERROR = 3
UC_STIM_TIMEOUT_ERROR = 4
EMG_TIMEOUT_ERROR = 5
EMG_REGISTER_ERROR = 6
NOT_INITIALIZED = 7
HV_ERROR = 8
DEMUX_TIMEOUT_ERROR = 9
ELECTRODE_ERROR = 10
INVALID_CMD_ERROR = 11
DEMUX_PARAMETER_ERROR = 12
DEMUX_NOT_INITIALIZED_ERROR = 13
DEMUX_TRANSFER_ERROR = 14
DEMUX_UNKNOWN_ACK_ERROR = 15
PULSE_TIMEOUT_ERROR = 16
FUEL_GAUGE_ERROR = 17
LIVE_SIGNAL_ERROR = 18
FILE_TRANSMISSION_TIMEOUT = 19
FILE_NOT_FOUND = 20
BUSY = 21
FILE_ERROR = 22
FLASH_ERASE_ERROR = 23
FLASH_WRITE_ERROR = 24
UNKNOWN_CONTROLLER_ERROR = 25
FIRMWARE_TOO_LARGE_ERROR = 26
FUEL_GAUGE_NOT_PROGRAMMED = 27
PULSE_LOW_CURRENT_ERROR = 28


Expand Down