-
Notifications
You must be signed in to change notification settings - Fork 1
16 blocking with threading #21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
29 commits
Select commit
Hold shift + click to select a range
863ccd9
A threaded implemented with blocking connection
thompson318 9ca869e
Use source stream ID for file name, sourceSystem is not very interesting
thompson318 72f3293
Add the sourceSystemId to the csv
thompson318 3416fa8
Update query to use location visit admission times
thompson318 5cb051b
Check location discharge datetime too
thompson318 40f2f49
Added failing test
thompson318 41d9dce
Added ci
thompson318 b4b5b20
Fix with string substition
thompson318 702d818
Also change % to percent
thompson318 e2d88ea
With uv
thompson318 338af4e
Tidy workflow
thompson318 7b81b05
Tidied up module layout. Implemented a connection pool to avoid creat…
thompson318 8ec74b8
Added location string to output to help with debugging. config for my…
thompson318 bd835ab
Merge branch '17-file-writer-fix' into 16-blocking_with_threading
thompson318 45d542a
Tidied variable names
thompson318 f4c8e2d
Handle thread start runtime error
thompson318 e98b7ea
Rough queue
thompson318 f6d7ba3
Try single thread with a queue
thompson318 5a25da7
Should only need one connection
thompson318 9a8be8b
Simplified SQL and improved error handling
thompson318 7dbb0dd
pytest is a dev dependency
thompson318 e1e48b8
Log error if missing data in waveform message
thompson318 b1b6f7e
Kill the worker thread if there is a no database error
thompson318 1d86954
Check rows after we've left the connection context, so the we don't e…
thompson318 d4d4899
Context doesn't seem to handle putconn
thompson318 8fe2751
If we can't match patient remove message from queue but add to an uma…
thompson318 28502f2
Return as tuple
thompson318 ee2f9a5
Remove callbacks for ack and reject as this is now a single threaded …
thompson318 268bd77
Avoid double message acknol=wlement
thompson318 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| name: Run pytest | ||
|
|
||
| on: | ||
| push: | ||
| branches: [main, dev] | ||
| pull_request: | ||
| branches: [main, dev] | ||
| types: ["opened", "reopened", "synchronize", "ready_for_review", "draft"] | ||
| workflow_dispatch: | ||
|
|
||
| jobs: | ||
| build: | ||
| name: Run pytest | ||
| runs-on: ubuntu-latest | ||
|
|
||
| steps: | ||
| - uses: actions/checkout@v5 | ||
| - name: Install uv | ||
| uses: astral-sh/setup-uv@v7 | ||
|
|
||
| - name: Install dependencies | ||
| run: uv sync --locked --all-extras --dev | ||
|
|
||
| - name: Run the tests | ||
| run: uv run pytest tests |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,129 @@ | ||
| """ | ||
| A script to receive messages in the waveform queue and write them to stdout, | ||
| based on https://www.rabbitmq.com/tutorials/tutorial-one-python | ||
| """ | ||
|
|
||
| import json | ||
| from datetime import datetime | ||
| import threading | ||
| import queue | ||
| import logging | ||
| import pika | ||
| import db as db # type:ignore | ||
| import settings as settings # type:ignore | ||
| import csv_writer as writer # type:ignore | ||
|
|
||
| max_threads = 1 # this needs to stay at 1 as pika is not thread safe. | ||
| logging.basicConfig(format="%(levelname)s:%(asctime)s: %(message)s") | ||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| worker_queue: queue.Queue = queue.Queue(maxsize=max_threads) | ||
|
|
||
|
|
||
| class waveform_message: | ||
| def __init__(self, ch, delivery_tag, body): | ||
| self.ch = ch | ||
| self.delivery_tag = delivery_tag | ||
| self.body = body | ||
|
|
||
|
|
||
| def ack_message(ch, delivery_tag): | ||
| """Note that `ch` must be the same pika channel instance via which the | ||
| message being ACKed was retrieved (AMQP protocol constraint).""" | ||
| if ch.is_open: | ||
| ch.basic_ack(delivery_tag) | ||
| else: | ||
| logger.warning("Attempting to acknowledge a message on a closed channel.") | ||
|
|
||
|
|
||
| def reject_message(ch, delivery_tag, requeue): | ||
| if ch.is_open: | ||
| ch.basic_reject(delivery_tag, requeue) | ||
| else: | ||
| logger.warning("Attempting to not acknowledge a message on a closed channel.") | ||
|
|
||
|
|
||
| def waveform_callback(): | ||
| emap_db = db.starDB() | ||
| emap_db.init_query() | ||
| emap_db.connect() | ||
| while True: | ||
| message = worker_queue.get() | ||
| if message is not None: | ||
| data = json.loads(message.body) | ||
| try: | ||
| location_string = data["mappedLocationString"] | ||
| observation_time = data["observationTime"] | ||
| except IndexError as e: | ||
| reject_message(message.ch, message.delivery_tag, False) | ||
| logger.error( | ||
| f"Waveform message {message.delivery_tag} is missing required data {e}." | ||
| ) | ||
| worker_queue.task_done() | ||
| continue | ||
|
|
||
| observation_time = datetime.fromtimestamp(observation_time) | ||
| lookup_success = True | ||
| try: | ||
| matched_mrn = emap_db.get_row(location_string, observation_time) | ||
| except ValueError as e: | ||
| lookup_success = False | ||
| logger.error(f"Ambiguous or non existent match: {e}") | ||
| matched_mrn = ("unmatched_mrn", "unmatched_nhs", "unmatched_csn") | ||
|
|
||
| if writer.write_frame(data, matched_mrn[2], matched_mrn[0]): | ||
| if lookup_success: | ||
| ack_message(message.ch, message.delivery_tag) | ||
| else: | ||
| reject_message(message.ch, message.delivery_tag, False) | ||
|
|
||
| worker_queue.task_done() | ||
| else: | ||
| logger.warning("No message in queue.") | ||
|
|
||
|
|
||
| def on_message(ch, method_frame, _header_frame, body): | ||
| wf_message = waveform_message(ch, method_frame.delivery_tag, body) | ||
| if not worker_queue.full(): | ||
| worker_queue.put(wf_message) | ||
| else: | ||
| logger.warning("Working queue is full.") | ||
| reject_message(ch, method_frame.delivery_tag, True) | ||
|
|
||
|
|
||
| def receiver(): | ||
| # set up database connection | ||
| rabbitmq_credentials = pika.PlainCredentials( | ||
| username=settings.RABBITMQ_USERNAME, password=settings.RABBITMQ_PASSWORD | ||
| ) | ||
| connection_parameters = pika.ConnectionParameters( | ||
| credentials=rabbitmq_credentials, | ||
| host=settings.RABBITMQ_HOST, | ||
| port=settings.RABBITMQ_PORT, | ||
| ) | ||
| connection = pika.BlockingConnection(connection_parameters) | ||
| channel = connection.channel() | ||
| channel.basic_qos(prefetch_count=1) | ||
|
|
||
| threads = [] | ||
| # I just want on thread, but in theory this should work for more | ||
| worker_thread = threading.Thread(target=waveform_callback) | ||
| worker_thread.start() | ||
| threads.append(worker_thread) | ||
|
|
||
| channel.basic_consume( | ||
| queue=settings.RABBITMQ_QUEUE, | ||
| auto_ack=False, | ||
| on_message_callback=on_message, | ||
| ) | ||
| try: | ||
| channel.start_consuming() | ||
| except KeyboardInterrupt: | ||
| channel.stop_consuming() | ||
|
|
||
| # Wait for all to complete | ||
| for thread in threads: | ||
| thread.join() | ||
|
|
||
| connection.close() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| from datetime import datetime | ||
| import psycopg2 | ||
| from psycopg2 import sql, pool | ||
| import logging | ||
|
|
||
| import settings as settings # type:ignore | ||
|
|
||
| logging.basicConfig(format="%(levelname)s:%(asctime)s: %(message)s") | ||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class starDB: | ||
| sql_query: str = "" | ||
| connection_string: str = "dbname={} user={} password={} host={} port={}".format( | ||
| settings.UDS_DBNAME, # type:ignore | ||
| settings.UDS_USERNAME, # type:ignore | ||
| settings.UDS_PASSWORD, # type:ignore | ||
| settings.UDS_HOST, # type:ignore | ||
| settings.UDS_PORT, # type:ignore | ||
| ) | ||
| connection_pool: pool.ThreadedConnectionPool | ||
|
|
||
| def connect(self): | ||
| self.connection_pool = pool.ThreadedConnectionPool(1, 1, self.connection_string) | ||
|
|
||
| def init_query(self): | ||
| with open("src/sql/mrn_based_on_bed_and_datetime.sql", "r") as file: | ||
| self.sql_query = sql.SQL(file.read()) | ||
| self.sql_query = self.sql_query.format( | ||
| schema_name=sql.Identifier(settings.SCHEMA_NAME) | ||
| ) | ||
|
|
||
| def get_row(self, location_string: str, observation_datetime: datetime): | ||
| parameters = { | ||
| "location_string": location_string, | ||
| "observation_datetime": observation_datetime, | ||
| } | ||
| try: | ||
| with self.connection_pool.getconn() as db_connection: | ||
| with db_connection.cursor() as curs: | ||
| curs.execute(self.sql_query, parameters) | ||
| rows = curs.fetchall() | ||
| self.connection_pool.putconn(db_connection) | ||
| except psycopg2.errors.UndefinedTable as e: | ||
| self.connection_pool.putconn(db_connection) | ||
| raise ConnectionError(f"Missing tables in database: {e}") | ||
|
|
||
| if len(rows) != 1: | ||
| raise ValueError( | ||
| f"Wrong number of rows returned from database. {len(rows)} != 1, for {location_string}:{observation_datetime}" | ||
| ) | ||
|
|
||
| return rows[0] |
File renamed without changes.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| import pytest | ||
| from src.csv_writer import create_file_name | ||
| from datetime import datetime, timezone | ||
|
|
||
|
|
||
| @pytest.mark.parametrize( | ||
| "units, expected_filename", | ||
| [ | ||
| ("uV", "2025-01-01.12345678.11.uV.csv"), | ||
| ("mL/s", "2025-01-01.12345678.11.mLps.csv"), | ||
| ("%", "2025-01-01.12345678.11.percent.csv"), | ||
| ], | ||
| ) | ||
| def test_create_file_name_handles_units(units, expected_filename, tmp_path): | ||
| sourceSystem = "11" | ||
| observationTime = datetime(2025, 1, 1, tzinfo=timezone.utc) | ||
| csn = "12345678" | ||
|
|
||
| filename = create_file_name(sourceSystem, observationTime, csn, units) | ||
|
|
||
| assert filename == expected_filename | ||
|
|
||
| # check we can write to it | ||
| with open(f"{tmp_path}/{filename}", "w") as fileout: | ||
| fileout.write("Test string") |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's an issue for this #6, so I think it's not necessary to add a comment to code as well.