Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ max-attributes=15

[DESIGN]
max-statements=100
max-locals = 20

[STRING]
check-quote-consistency=yes
Expand Down
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"type": "debugpy",
"request": "launch",
// "program": "src/__main__.py",
"module": "examples.low_level.example_low_level",
"module": "examples.dyscom.example_dyscom_send_file",
"justMyCode": false,
// "args": ["COM3"],
"console": "integratedTerminal"
Expand Down
16 changes: 13 additions & 3 deletions HINTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,24 @@ This page describes implementation details.
## Dyscom layer (I24)
- Contains functions for dyscom level
- This mode is used by I24 to measure EMG or BI
- Usage
- Usage for live data
- Call _power_module()_ to power on measurement module
- Call _init()_ with parameter for measurement
- Call _init()_ with parameter for measurement and DyscomInitFlag for live data
- Call _start()_ to start measurement
- Device sends now _DlSendLiveData_ packets with measurement data
- Call _stop()_ to end measurement
- Call _power_module()_ to power off measurement module
- IMPORTANT: all storage related functions are untested
- Usage for measurement data read from memory card
- Call _power_module()_ to power on measurement module and memory card
- Call _init()_ with parameter for measurement and DyscomInitFlag for storage mode
- Result contains measurement id, that is the filename used later
- Call _start()_ to start measurement
- Device sends nothing but stores measurement data on memory card
- Call _stop()_ to end measurement
- Call _power_module()_ to power off measurement module
- Call _get_meas_file_content()_ with filename from _init()_ to get measurement data
- Call _power_module()_ to power off memory card
- IMPORTANT: not all storage related functions are tested

# Platform hints

Expand Down
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ Python 3.11 or higher
- Demonstrate how to use dyscom layer to measure data and plotting values using PyPlot
- `python -m examples.dyscom.example_dyscom_write_csv`
- Demonstrate how to use dyscom layer to measure data and writing measurement data to a .csv-file
- `python -m examples.dyscom.example_dyscom_send_file`
- Demonstrate how to use dyscom layer to save measurement data on memory card and reading it afterwards

## Dependencies for examples
- Install all dependencies
Expand Down Expand Up @@ -116,8 +118,11 @@ Python 3.11 or higher
- Improved examples under Linux/MacOS

## 0.0.15
- Clarified readme
- Enhanced readme
- Changed current for ChannelPoint from int to float

## 0.0.16
- Fixed error with PacketLowLevelChannelConfigAck result
- Fixed error with PacketLowLevelChannelConfigAck result

## 0.0.17
- Added sample that demonstrates how to read measurement data files from I24 devices
7 changes: 2 additions & 5 deletions examples/dyscom/example_dyscom_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,8 @@ async def main() -> int:

####
calibration_filename = f"rehaingest_{device_id}.cal"
# get calibration file info
await dyscom.get_file_info(calibration_filename)
# get calibration file -> does not work
# there should be DL_Send_File commands afterwards
await dyscom.get_file_by_name(calibration_filename)
calibration_content = await dyscom.get_file_content(calibration_filename)
print(f"Calibration content length: {len(calibration_content)}")

# close serial port connection
connection.close()
Expand Down
68 changes: 68 additions & 0 deletions examples/dyscom/example_dyscom_send_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""Provides an example how to use dyscom level layer to read stored data from device"""

import asyncio

from science_mode_4 import DeviceI24
from science_mode_4 import SerialPortConnection
from science_mode_4.dyscom.dyscom_types import DyscomFilterType, DyscomInitFlag, DyscomInitParams, DyscomPowerModulePowerType,\
DyscomPowerModuleType, DyscomSignalType
from examples.utils.example_utils import ExampleUtils


async def main() -> int:
"""Main function"""

# get comport from command line argument
com_port = ExampleUtils.get_comport_from_commandline_argument()
# create serial port connection
connection = SerialPortConnection(com_port)
# open connection, now we can read and write data
connection.open()

# create science mode device
device = DeviceI24(connection)
# call initialize to get basic information (serial, versions) and stop any active stimulation/measurement
# to have a defined state
await device.initialize()

# get dyscom layer to call low level commands
dyscom = device.get_layer_dyscom()

# call enable measurement power module and memory card for measurement
await dyscom.power_module(DyscomPowerModuleType.MEASUREMENT, DyscomPowerModulePowerType.SWITCH_ON)
await dyscom.power_module(DyscomPowerModuleType.MEMORY_CARD, DyscomPowerModulePowerType.SWITCH_ON)
# call init with 1k sample rate
init_params = DyscomInitParams()
init_params.signal_type = [DyscomSignalType.BI, DyscomSignalType.EMG_1]
init_params.filter = DyscomFilterType.PREDEFINED_FILTER_1
# we want no live data and write all data to memory card
init_params.flags = [DyscomInitFlag.ENABLE_SD_STORAGE_MODE]
init_result = await dyscom.init(init_params)

# start dyscom measurement
await dyscom.start()

# wait 10s to have some measurement data
await asyncio.sleep(10)

# stop measurement
await dyscom.stop()
# turn power module off
await dyscom.power_module(DyscomPowerModuleType.MEASUREMENT, DyscomPowerModulePowerType.SWITCH_OFF)

# get all meas data
measurement = await dyscom.get_meas_file_content(init_result.measurement_file_id)
print(f"Sample rate: {measurement[0].name}")
for key, value in measurement[1].items():
print(f"Signal type: {key.name}, sample count: {len(value)}")

# turn memory card off
await dyscom.power_module(DyscomPowerModuleType.MEMORY_CARD, DyscomPowerModulePowerType.SWITCH_OFF)
# close serial port connection
connection.close()

return 0


if __name__ == "__main__":
asyncio.run(main())
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "science_mode_4"
version = "0.0.16"
version = "0.0.17"
authors = [
{ name="Marc Hofmann", email="marc-hofmann@gmx.de" },
]
Expand Down
121 changes: 39 additions & 82 deletions src/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,105 +4,62 @@
import sys
import asyncio

from science_mode_4 import DeviceP24
from science_mode_4.low_level.low_level_channel_config import PacketLowLevelChannelConfigAck
from science_mode_4.protocol import ChannelPoint
from science_mode_4.protocol.commands import Commands
from science_mode_4.utils import SerialPortConnection
from science_mode_4.low_level import LayerLowLevel
from science_mode_4.protocol import Connector, Channel
from science_mode_4.low_level import LowLevelHighVoltageSource, LowLevelMode
from science_mode_4.device_i24 import DeviceI24
from science_mode_4.dyscom.dyscom_types import DyscomFilterType, DyscomInitFlag, DyscomInitParams, DyscomPowerModulePowerType,\
DyscomPowerModuleType, DyscomSignalType
from science_mode_4.utils.serial_port_connection import SerialPortConnection


async def main() -> int:
"""Main function"""

# logger().setLevel(logging.DEBUG)
current = 70

# # keyboard is our trigger to start specific stimulation
# def input_callback(input_value: str) -> bool:
# """Callback call from keyboard input thread"""
# print(f"Input value {input_value}")

# nonlocal current
# if input_value == "1":
# send_channel_config(low_level_layer, Connector.GREEN)
# elif input_value == "2":
# send_channel_config(low_level_layer, Connector.YELLOW)
# elif input_value == "+":
# current += 0.5
# print(f"current: {current}")
# elif input_value == "-":
# current -= 0.5
# print(f"current: {current}")
# elif input_value == "q":
# # end keyboard input thread
# return True
# else:
# print("Invalid command")

# return False


def send_channel_config(low_level_layer: LayerLowLevel, connector: Connector):
"""Sends channel update"""
# device can store up to 10 channel config commands
for channel in Channel:
# send_channel_config does not wait for an acknowledge
low_level_layer.send_channel_config(True, channel, connector,
[ChannelPoint(1000, current), ChannelPoint(4000, 0),
ChannelPoint(1000, -current)])


print("Usage:")
print("Press 1 or 2 to stimulate green or yellow connector")
print("Press + or - to increase or decrease current")
print("Press q to quit")
# create keyboard input thread for non blocking console input
# keyboard_input_thread = KeyboardInputThread(input_callback)

# get comport from command line argument
# com_port = ExampleUtils.get_comport_from_commandline_argument()
# create serial port connection
connection = SerialPortConnection(SerialPortConnection.list_science_mode_device_ports()[0].device)
# open connection, now we can read and write data
connection.open()

# create science mode device
device = DeviceP24(connection)
device = DeviceI24(connection)
# call initialize to get basic information (serial, versions) and stop any active stimulation/measurement
# to have a defined state
await device.initialize()

# get low level layer to call low level commands
low_level_layer = device.get_layer_low_level()

# call init low level
await low_level_layer.init(LowLevelMode.NO_MEASUREMENT, LowLevelHighVoltageSource.STANDARD)

# now we can start stimulation
# while keyboard_input_thread.is_alive():
for x in range(500):
if x % 100 == 0:
send_channel_config(low_level_layer, Connector.YELLOW)
# get new packets from connection
ack = low_level_layer.packet_buffer.get_packet_from_buffer()
if ack and ack.command == Commands.LOW_LEVEL_CHANNEL_CONFIG_ACK:
cca: PacketLowLevelChannelConfigAck = ack
# do something with packet ack
# here we print that an acknowledge arrived
print(cca.result.name)

await asyncio.sleep(0.1)

# wait until all acknowledges are received
await asyncio.sleep(0.5)
# call stop low level
await low_level_layer.stop()

# get dyscom layer to call low level commands
dyscom = device.get_layer_dyscom()

# call enable measurement power module and memory card for measurement
await dyscom.power_module(DyscomPowerModuleType.MEASUREMENT, DyscomPowerModulePowerType.SWITCH_ON)
await dyscom.power_module(DyscomPowerModuleType.MEMORY_CARD, DyscomPowerModulePowerType.SWITCH_ON)
# call init with lowest sample rate (because of performance issues with plotting values)
init_params = DyscomInitParams()
init_params.signal_type = [DyscomSignalType.BI, DyscomSignalType.EMG_1]
init_params.filter = DyscomFilterType.PREDEFINED_FILTER_1
# we want no live data and write all data to memory card
init_params.flags = [DyscomInitFlag.ENABLE_SD_STORAGE_MODE]
init_result = await dyscom.init(init_params)

# start dyscom measurement
await dyscom.start()

# wait 10s to have some measurement data
await asyncio.sleep(10)

# stop measurement
await dyscom.stop()
# turn power module off
await dyscom.power_module(DyscomPowerModuleType.MEASUREMENT, DyscomPowerModulePowerType.SWITCH_OFF)

# get all meas data
measurement = await dyscom.get_meas_file_content(init_result.measurement_file_id)
print(f"Sample rate: {measurement[0].name}")
for key, value in measurement[1].items():
print(f"Signal type: {key.name}, sample count: {len(value)}")

# turn memory card off
await dyscom.power_module(DyscomPowerModuleType.MEMORY_CARD, DyscomPowerModulePowerType.SWITCH_OFF)
# close serial port connection
connection.close()

return 0


Expand Down
4 changes: 3 additions & 1 deletion src/science_mode_4/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ async def initialize(self):
if operation_mode in [DyscomGetOperationModeType.LIVE_MEASURING_PRE,
DyscomGetOperationModeType.LIVE_MEASURING,
DyscomGetOperationModeType.RECORD_PRE,
DyscomGetOperationModeType.RECORD]:
DyscomGetOperationModeType.RECORD,
DyscomGetOperationModeType.DATATRANSFER_PRE,
DyscomGetOperationModeType.DATATRANSFER]:
await self.get_layer_dyscom().stop()

await self.get_layer_general().initialize()
Expand Down
23 changes: 22 additions & 1 deletion src/science_mode_4/dyscom/dyscom_get_file_by_name.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,38 @@ class PacketDyscomGetFileByName(PacketDyscomGet):
"""Packet for dyscom get with type file by name"""


def __init__(self, filename: str = ""):
def __init__(self, filename: str = "", mode: DyscomFileByNameMode = DyscomFileByNameMode.MULTI_BLOCK):
super().__init__()
self._type = DyscomGetType.FILE_BY_NAME
self._kind = int(self._type)
self._filename = filename
self._mode = mode


@property
def filename(self) -> str:
"""Getter for filename"""
return self._filename


@property
def mode(self) -> DyscomFileByNameMode:
"""Getter for mode"""
return self._mode


def get_data(self) -> bytes:
bb = ByteBuilder()
bb.append_bytes(super().get_data())
bb.append_bytes(DyscomHelper.str_to_bytes(self._filename, 128))
# block offset
bb.append_value(0, 4, True)
# file size
bb.append_value(0, 8, True)
# number of blocks
bb.append_value(0, 4, True)
# mode
bb.append_byte(self._mode)
# maybe more parameters are necessary here
# block_offset, file_size, n_blocks, mode
return bb.get_bytes()
Expand Down
Loading