Closed

Changes from all commits (29 commits)
863ccd9
A threaded implementation with blocking connection
thompson318 Dec 4, 2025
9ca869e
Use source stream ID for file name, sourceSystem is not very interesting
thompson318 Dec 8, 2025
72f3293
Add the sourceSystemId to the csv
thompson318 Dec 8, 2025
3416fa8
Update query to use location visit admission times
thompson318 Dec 8, 2025
5cb051b
Check location discharge datetime too
thompson318 Dec 8, 2025
40f2f49
Added failing test
thompson318 Dec 9, 2025
41d9dce
Added ci
thompson318 Dec 9, 2025
b4b5b20
Fix with string substitution
thompson318 Dec 9, 2025
702d818
Also change % to percent
thompson318 Dec 9, 2025
e2d88ea
With uv
thompson318 Dec 9, 2025
338af4e
Tidy workflow
thompson318 Dec 9, 2025
7b81b05
Tidied up module layout. Implemented a connection pool to avoid creat…
thompson318 Dec 11, 2025
8ec74b8
Added location string to output to help with debugging. config for my…
thompson318 Dec 11, 2025
bd835ab
Merge branch '17-file-writer-fix' into 16-blocking_with_threading
thompson318 Dec 11, 2025
45d542a
Tidied variable names
thompson318 Dec 11, 2025
f4c8e2d
Handle thread start runtime error
thompson318 Dec 15, 2025
e98b7ea
Rough queue
thompson318 Dec 15, 2025
f6d7ba3
Try single thread with a queue
thompson318 Dec 15, 2025
5a25da7
Should only need one connection
thompson318 Dec 16, 2025
9a8be8b
Simplified SQL and improved error handling
thompson318 Dec 16, 2025
7dbb0dd
pytest is a dev dependency
thompson318 Dec 16, 2025
e1e48b8
Log error if missing data in waveform message
thompson318 Dec 16, 2025
b1b6f7e
Kill the worker thread if there is a no database error
thompson318 Dec 16, 2025
1d86954
Check rows after we've left the connection context, so that we don't e…
thompson318 Dec 16, 2025
d4d4899
Context doesn't seem to handle putconn
thompson318 Dec 16, 2025
8fe2751
If we can't match patient remove message from queue but add to an uma…
thompson318 Dec 16, 2025
28502f2
Return as tuple
thompson318 Dec 16, 2025
ee2f9a5
Remove callbacks for ack and reject as this is now a single threaded …
thompson318 Dec 16, 2025
268bd77
Avoid double message acknowledgement
thompson318 Dec 16, 2025
25 changes: 25 additions & 0 deletions .github/workflows/pytest.yml
@@ -0,0 +1,25 @@
name: Run pytest

on:
push:
branches: [main, dev]
pull_request:
branches: [main, dev]
types: ["opened", "reopened", "synchronize", "ready_for_review", "draft"]
workflow_dispatch:

jobs:
build:
name: Run pytest
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v5
- name: Install uv
uses: astral-sh/setup-uv@v7

- name: Install dependencies
run: uv sync --locked --all-extras --dev

- name: Run the tests
run: uv run pytest tests
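For reference, the two steps this workflow runs can be reproduced locally, assuming uv is installed: uv sync --locked --all-extras --dev, then uv run pytest tests.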
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -10,7 +10,7 @@ repos:
- id: ruff
args:
- --fix
- waveform_controller
- ./
- id: ruff-format
# Type-checking python code.
- repo: https://github.com/pre-commit/mirrors-mypy
@@ -23,7 +23,7 @@ repos:
"types-psycopg2",
"types-pika"
]
files: waveform_controller/
files: src/
# ----------
# Formats docstrings to comply with PEP257
- repo: https://github.com/PyCQA/docformatter
5 changes: 4 additions & 1 deletion pyproject.toml
@@ -10,5 +10,8 @@ dependencies = [
"psycopg2-binary>=2.9.11",
]

[project.optional-dependencies]
dev = ["pytest>=9.0.2"]

[project.scripts]
emap-extract-waveform = "waveform_controller.controller:receiver"
emap-extract-waveform = "controller:receiver"
File renamed without changes.
129 changes: 129 additions & 0 deletions src/controller.py
@@ -0,0 +1,129 @@
"""
A script to receive messages in the waveform queue and write them to stdout,
based on https://www.rabbitmq.com/tutorials/tutorial-one-python
"""

import json
from datetime import datetime
import threading
import queue
import logging
import pika
import db as db # type:ignore
import settings as settings # type:ignore
import csv_writer as writer # type:ignore

max_threads = 1 # this needs to stay at 1 as pika is not thread safe.
logging.basicConfig(format="%(levelname)s:%(asctime)s: %(message)s")
logger = logging.getLogger(__name__)


worker_queue: queue.Queue = queue.Queue(maxsize=max_threads)


class waveform_message:
def __init__(self, ch, delivery_tag, body):
self.ch = ch
self.delivery_tag = delivery_tag
self.body = body


def ack_message(ch, delivery_tag):
"""Note that `ch` must be the same pika channel instance via which the
message being ACKed was retrieved (AMQP protocol constraint)."""
if ch.is_open:
ch.basic_ack(delivery_tag)
else:
logger.warning("Attempting to acknowledge a message on a closed channel.")


def reject_message(ch, delivery_tag, requeue):
if ch.is_open:
ch.basic_reject(delivery_tag, requeue)
else:
logger.warning("Attempting to not acknowledge a message on a closed channel.")


def waveform_callback():
emap_db = db.starDB()
emap_db.init_query()
emap_db.connect()
while True:
message = worker_queue.get()
if message is not None:
data = json.loads(message.body)
try:
location_string = data["mappedLocationString"]
observation_time = data["observationTime"]
            except KeyError as e:
reject_message(message.ch, message.delivery_tag, False)
logger.error(
f"Waveform message {message.delivery_tag} is missing required data {e}."
)
worker_queue.task_done()
continue

observation_time = datetime.fromtimestamp(observation_time)
lookup_success = True
try:
matched_mrn = emap_db.get_row(location_string, observation_time)
except ValueError as e:
lookup_success = False
logger.error(f"Ambiguous or non existent match: {e}")
matched_mrn = ("unmatched_mrn", "unmatched_nhs", "unmatched_csn")

if writer.write_frame(data, matched_mrn[2], matched_mrn[0]):
if lookup_success:
ack_message(message.ch, message.delivery_tag)
else:
reject_message(message.ch, message.delivery_tag, False)

worker_queue.task_done()
else:
logger.warning("No message in queue.")


def on_message(ch, method_frame, _header_frame, body):
wf_message = waveform_message(ch, method_frame.delivery_tag, body)
if not worker_queue.full():
worker_queue.put(wf_message)
else:
logger.warning("Working queue is full.")
reject_message(ch, method_frame.delivery_tag, True)


def receiver():
    # set up RabbitMQ connection
rabbitmq_credentials = pika.PlainCredentials(
username=settings.RABBITMQ_USERNAME, password=settings.RABBITMQ_PASSWORD
)
connection_parameters = pika.ConnectionParameters(
credentials=rabbitmq_credentials,
host=settings.RABBITMQ_HOST,
port=settings.RABBITMQ_PORT,
)
connection = pika.BlockingConnection(connection_parameters)
channel = connection.channel()
channel.basic_qos(prefetch_count=1)

threads = []
    # I just want one thread, but in theory this should work for more
worker_thread = threading.Thread(target=waveform_callback)
worker_thread.start()
threads.append(worker_thread)

channel.basic_consume(
queue=settings.RABBITMQ_QUEUE,
auto_ack=False,
on_message_callback=on_message,
)
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()

# Wait for all to complete
for thread in threads:
thread.join()

connection.close()
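As an aside, the hand-off pattern above (a bounded queue of size one between the pika consumer callback and a single worker thread, rejecting and requeueing when the queue is full) looks roughly like this in isolation. This is a minimal self-contained sketch, not the PR's code; all names are illustrative.

import queue
import threading

work: queue.Queue = queue.Queue(maxsize=1)  # bounded, like worker_queue above


def worker() -> None:
    while True:
        item = work.get()
        if item is None:  # sentinel used only in this sketch to stop the worker
            work.task_done()
            break
        print(f"processed message with delivery tag {item}")  # stands in for the DB lookup + CSV write
        work.task_done()


worker_thread = threading.Thread(target=worker)
worker_thread.start()

for delivery_tag in (1, 2, 3):
    if not work.full():
        work.put(delivery_tag)  # hand the message over to the worker thread
    else:
        print(f"queue full, would reject and requeue {delivery_tag}")

work.put(None)   # shut the sketch's worker down
work.join()      # wait until everything handed over has been processed
worker_thread.join()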
12 changes: 8 additions & 4 deletions waveform_controller/csv_writer.py → src/csv_writer.py
@@ -6,12 +6,14 @@


def create_file_name(
sourceSystem: str, observationTime: datetime, csn: str, units: str
sourceStreamId: str, observationTime: datetime, csn: str, units: str
) -> str:
"""Create a unique file name based on the patient contact serial number
(csn) the date, and the source system."""
datestring = observationTime.strftime("%Y-%m-%d")
return f"{datestring}.{csn}.{sourceSystem}.{units}.csv"
units = units.replace("/", "p")
units = units.replace("%", "percent")
return f"{datestring}.{csn}.{sourceStreamId}.{units}.csv"
Review comment (Collaborator), suggested change:
    return f"{datestring}.{csn}.{sourceStreamId}.{units}.csv"
    // Will need to remove CSN from the pseudonymised version!
    return f"{datestring}.{csn}.{sourceStreamId}.{units}.csv"

Reply (Collaborator, author):
There's an issue for this #6, so I think it's not necessary to add a comment to code as well.


def write_frame(waveform_message: dict, csn: str, mrn: str) -> bool:
@@ -20,7 +22,7 @@ def write_frame(waveform_message: dict, csn: str, mrn: str) -> bool:

:return: True if write was successful.
"""
sourceSystem = waveform_message.get("sourceSystem", None)
sourceStreamId = waveform_message.get("sourceStreamId", None)
observationTime = waveform_message.get("observationTime", False)

if not observationTime:
@@ -33,7 +35,7 @@ def write_frame(waveform_message: dict, csn: str, mrn: str) -> bool:
Path(out_path).mkdir(exist_ok=True)

filename = out_path + create_file_name(
sourceSystem, observation_datetime, csn, units
sourceStreamId, observation_datetime, csn, units
)
with open(filename, "a") as fileout:
wv_writer = csv.writer(fileout, delimiter=",")
@@ -45,9 +47,11 @@ def write_frame(waveform_message: dict, csn: str, mrn: str) -> bool:
[
csn,
mrn,
sourceStreamId,
units,
waveform_message.get("samplingRate", ""),
observationTime,
waveform_message.get("mappedLocationString", ""),
waveform_data,
]
)
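A quick illustration of the unit sanitisation added above ("/" becomes "p", "%" becomes "percent"); the values and expected names mirror the parametrised test added in tests/test_file_writer.py further down.

from datetime import datetime, timezone

from src.csv_writer import create_file_name  # import path as laid out in this PR

when = datetime(2025, 1, 1, tzinfo=timezone.utc)
print(create_file_name("11", when, "12345678", "mL/s"))  # 2025-01-01.12345678.11.mLps.csv
print(create_file_name("11", when, "12345678", "%"))     # 2025-01-01.12345678.11.percent.csv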
53 changes: 53 additions & 0 deletions src/db.py
@@ -0,0 +1,53 @@
from datetime import datetime
import psycopg2
from psycopg2 import sql, pool
import logging

import settings as settings # type:ignore

logging.basicConfig(format="%(levelname)s:%(asctime)s: %(message)s")
logger = logging.getLogger(__name__)


class starDB:
sql_query: str = ""
connection_string: str = "dbname={} user={} password={} host={} port={}".format(
settings.UDS_DBNAME, # type:ignore
settings.UDS_USERNAME, # type:ignore
settings.UDS_PASSWORD, # type:ignore
settings.UDS_HOST, # type:ignore
settings.UDS_PORT, # type:ignore
)
connection_pool: pool.ThreadedConnectionPool

def connect(self):
self.connection_pool = pool.ThreadedConnectionPool(1, 1, self.connection_string)

def init_query(self):
with open("src/sql/mrn_based_on_bed_and_datetime.sql", "r") as file:
self.sql_query = sql.SQL(file.read())
self.sql_query = self.sql_query.format(
schema_name=sql.Identifier(settings.SCHEMA_NAME)
)

def get_row(self, location_string: str, observation_datetime: datetime):
parameters = {
"location_string": location_string,
"observation_datetime": observation_datetime,
}
try:
with self.connection_pool.getconn() as db_connection:
with db_connection.cursor() as curs:
curs.execute(self.sql_query, parameters)
rows = curs.fetchall()
self.connection_pool.putconn(db_connection)
except psycopg2.errors.UndefinedTable as e:
self.connection_pool.putconn(db_connection)
raise ConnectionError(f"Missing tables in database: {e}")

if len(rows) != 1:
raise ValueError(
f"Wrong number of rows returned from database. {len(rows)} != 1, for {location_string}:{observation_datetime}"
)

return rows[0]
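For context, this is how the controller drives the class (see src/controller.py above), written out as a standalone sketch; the location string below is a made-up placeholder and the settings module is assumed to provide the UDS_* connection values.

from datetime import datetime

import db

emap_db = db.starDB()
emap_db.init_query()  # read the SQL template and bind the schema name
emap_db.connect()     # open the single-connection pool

# get_row returns a single (mrn, nhs_number, csn) row, or raises ValueError when
# the match is ambiguous or missing
mrn, nhs_number, csn = emap_db.get_row("EXAMPLE^LOCATION^STRING", datetime.now())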
File renamed without changes.
@@ -14,5 +14,5 @@ INNER JOIN {schema_name}.location_visit lv
INNER JOIN {schema_name}.location loc
ON lv.location_id = loc.location_id
WHERE loc.location_string = %(location_string)s
AND hv.valid_from BETWEEN %(start_datetime)s AND %(end_datetime)s
ORDER by hv.valid_from DESC
AND lv.admission_datetime <= %(observation_datetime)s
AND ( lv.discharge_datetime >= %(observation_datetime)s OR lv.discharge_datetime IS NULL )
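In other words, an observation matches a location visit that had already started and had not yet ended at the observation time. The same predicate in Python, purely as an illustration:

from datetime import datetime
from typing import Optional


def visit_covers(admission: datetime, discharge: Optional[datetime], observed: datetime) -> bool:
    """True when the observation falls inside the visit's admission/discharge window."""
    return admission <= observed and (discharge is None or discharge >= observed)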
Empty file added tests/__init__.py
Empty file.
25 changes: 25 additions & 0 deletions tests/test_file_writer.py
@@ -0,0 +1,25 @@
import pytest
from src.csv_writer import create_file_name
from datetime import datetime, timezone


@pytest.mark.parametrize(
"units, expected_filename",
[
("uV", "2025-01-01.12345678.11.uV.csv"),
("mL/s", "2025-01-01.12345678.11.mLps.csv"),
("%", "2025-01-01.12345678.11.percent.csv"),
],
)
def test_create_file_name_handles_units(units, expected_filename, tmp_path):
sourceSystem = "11"
observationTime = datetime(2025, 1, 1, tzinfo=timezone.utc)
csn = "12345678"

filename = create_file_name(sourceSystem, observationTime, csn, units)

assert filename == expected_filename

# check we can write to it
with open(f"{tmp_path}/{filename}", "w") as fileout:
fileout.write("Test string")
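These cases can also be run on their own, for example with uv run pytest tests/test_file_writer.py.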