From 1820f97e8d810019054945aff06930c23e1e16d5 Mon Sep 17 00:00:00 2001
From: Shane Harvey
Date: Fri, 22 Aug 2025 14:56:23 -0700
Subject: [PATCH 01/19] PYTHON-5506 Prototype adaptive token bucket retry
 (#2501)

Add an adaptive, token-bucket-based retry policy. Each successfully
completed command deposits 0.1 token and each failed retry attempt
consumes 1 token. A retry is only permitted when a token is available.
The bucket starts full at its maximum of 1000 tokens.
---
 test/asynchronous/test_backpressure.py | 66 ++++++++++++++++++++++++++
 test/test_backpressure.py              | 66 ++++++++++++++++++++++++++
 2 files changed, 132 insertions(+)

diff --git a/test/asynchronous/test_backpressure.py b/test/asynchronous/test_backpressure.py
index 11f8edde67..871b369ecf 100644
--- a/test/asynchronous/test_backpressure.py
+++ b/test/asynchronous/test_backpressure.py
@@ -182,6 +182,72 @@ async def test_limit_retry_command(self):
         self.assertIn("RetryableError", str(error.exception))
 
+class TestRetryPolicy(AsyncPyMongoTestCase):
+    async def test_retry_policy(self):
+        capacity = 10
+        retry_policy = _RetryPolicy(_TokenBucket(capacity=capacity))
+        self.assertEqual(retry_policy.attempts, helpers._MAX_RETRIES)
+        self.assertEqual(retry_policy.backoff_initial, helpers._BACKOFF_INITIAL)
+        self.assertEqual(retry_policy.backoff_max, helpers._BACKOFF_MAX)
+        for i in range(1, helpers._MAX_RETRIES + 1):
+            self.assertTrue(await retry_policy.should_retry(i, 0))
+        self.assertFalse(await retry_policy.should_retry(helpers._MAX_RETRIES + 1, 0))
+        for i in range(capacity - helpers._MAX_RETRIES):
+            self.assertTrue(await retry_policy.should_retry(1, 0))
+        # No tokens left, should not retry.
+        self.assertFalse(await retry_policy.should_retry(1, 0))
+        self.assertEqual(retry_policy.token_bucket.tokens, 0)
+
+        # record_success should generate tokens.
+        for _ in range(int(2 / helpers.DEFAULT_RETRY_TOKEN_RETURN)):
+            await retry_policy.record_success(retry=False)
+        self.assertAlmostEqual(retry_policy.token_bucket.tokens, 2)
+        for i in range(2):
+            self.assertTrue(await retry_policy.should_retry(1, 0))
+        self.assertFalse(await retry_policy.should_retry(1, 0))
+
+        # Recording a successful retry should return 1 additional token.
+        await retry_policy.record_success(retry=True)
+        self.assertAlmostEqual(
+            retry_policy.token_bucket.tokens, 1 + helpers.DEFAULT_RETRY_TOKEN_RETURN
+        )
+        self.assertTrue(await retry_policy.should_retry(1, 0))
+        self.assertFalse(await retry_policy.should_retry(1, 0))
+        self.assertAlmostEqual(retry_policy.token_bucket.tokens, helpers.DEFAULT_RETRY_TOKEN_RETURN)
+
+    async def test_retry_policy_csot(self):
+        retry_policy = _RetryPolicy(_TokenBucket())
+        self.assertTrue(await retry_policy.should_retry(1, 0.5))
+        with pymongo.timeout(0.5):
+            self.assertTrue(await retry_policy.should_retry(1, 0))
+            self.assertTrue(await retry_policy.should_retry(1, 0.1))
+            # Would exceed the timeout, should not retry.
+            self.assertFalse(await retry_policy.should_retry(1, 1.0))
+        self.assertTrue(await retry_policy.should_retry(1, 1.0))
+
+    @async_client_context.require_failCommand_appName
+    async def test_limit_retry_command(self):
+        client = await self.async_rs_or_single_client()
+        client._retry_policy.token_bucket.tokens = 1
+        db = client.pymongo_test
+        await db.t.insert_one({"x": 1})
+
+        # Ensure command is retried once on overload error.
+        fail_many = mock_overload_error.copy()
+        fail_many["mode"] = {"times": 1}
+        async with self.fail_point(fail_many):
+            await db.command("find", "t")
+
+        # Ensure command stops retrying when there are no tokens left.
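+        # Rough token ledger, assuming the accounting described in the commit
+        # message (0.1 token deposited per success, 1 token consumed per retry
+        # attempt): the bucket starts at 1 token, the single forced failure
+        # above spends it, and the successful retry then returns
+        # 1 + 0.1 = 1.1 tokens. The two forced failures below would need two
+        # tokens, so the second retry is denied and the error surfaces.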
+        fail_too_many = mock_overload_error.copy()
+        fail_too_many["mode"] = {"times": 2}
+        async with self.fail_point(fail_too_many):
+            with self.assertRaises(PyMongoError) as error:
+                await db.command("find", "t")
+
+        self.assertIn("Retryable", str(error.exception))
+
+
 class TestRetryPolicy(AsyncPyMongoTestCase):
     async def test_retry_policy(self):
         capacity = 10
diff --git a/test/test_backpressure.py b/test/test_backpressure.py
index fac1d6236d..0a145b32fc 100644
--- a/test/test_backpressure.py
+++ b/test/test_backpressure.py
@@ -182,6 +182,72 @@ def test_limit_retry_command(self):
         self.assertIn("RetryableError", str(error.exception))
 
+class TestRetryPolicy(PyMongoTestCase):
+    def test_retry_policy(self):
+        capacity = 10
+        retry_policy = _RetryPolicy(_TokenBucket(capacity=capacity))
+        self.assertEqual(retry_policy.attempts, helpers._MAX_RETRIES)
+        self.assertEqual(retry_policy.backoff_initial, helpers._BACKOFF_INITIAL)
+        self.assertEqual(retry_policy.backoff_max, helpers._BACKOFF_MAX)
+        for i in range(1, helpers._MAX_RETRIES + 1):
+            self.assertTrue(retry_policy.should_retry(i, 0))
+        self.assertFalse(retry_policy.should_retry(helpers._MAX_RETRIES + 1, 0))
+        for i in range(capacity - helpers._MAX_RETRIES):
+            self.assertTrue(retry_policy.should_retry(1, 0))
+        # No tokens left, should not retry.
+        self.assertFalse(retry_policy.should_retry(1, 0))
+        self.assertEqual(retry_policy.token_bucket.tokens, 0)
+
+        # record_success should generate tokens.
+        for _ in range(int(2 / helpers.DEFAULT_RETRY_TOKEN_RETURN)):
+            retry_policy.record_success(retry=False)
+        self.assertAlmostEqual(retry_policy.token_bucket.tokens, 2)
+        for i in range(2):
+            self.assertTrue(retry_policy.should_retry(1, 0))
+        self.assertFalse(retry_policy.should_retry(1, 0))
+
+        # Recording a successful retry should return 1 additional token.
+        retry_policy.record_success(retry=True)
+        self.assertAlmostEqual(
+            retry_policy.token_bucket.tokens, 1 + helpers.DEFAULT_RETRY_TOKEN_RETURN
+        )
+        self.assertTrue(retry_policy.should_retry(1, 0))
+        self.assertFalse(retry_policy.should_retry(1, 0))
+        self.assertAlmostEqual(retry_policy.token_bucket.tokens, helpers.DEFAULT_RETRY_TOKEN_RETURN)
+
+    def test_retry_policy_csot(self):
+        retry_policy = _RetryPolicy(_TokenBucket())
+        self.assertTrue(retry_policy.should_retry(1, 0.5))
+        with pymongo.timeout(0.5):
+            self.assertTrue(retry_policy.should_retry(1, 0))
+            self.assertTrue(retry_policy.should_retry(1, 0.1))
+            # Would exceed the timeout, should not retry.
+            self.assertFalse(retry_policy.should_retry(1, 1.0))
+        self.assertTrue(retry_policy.should_retry(1, 1.0))
+
+    @client_context.require_failCommand_appName
+    def test_limit_retry_command(self):
+        client = self.rs_or_single_client()
+        client._retry_policy.token_bucket.tokens = 1
+        db = client.pymongo_test
+        db.t.insert_one({"x": 1})
+
+        # Ensure command is retried once on overload error.
+        fail_many = mock_overload_error.copy()
+        fail_many["mode"] = {"times": 1}
+        with self.fail_point(fail_many):
+            db.command("find", "t")
+
+        # Ensure command stops retrying when there are no tokens left.
+        fail_too_many = mock_overload_error.copy()
+        fail_too_many["mode"] = {"times": 2}
+        with self.fail_point(fail_too_many):
+            with self.assertRaises(PyMongoError) as error:
+                db.command("find", "t")
+
+        self.assertIn("Retryable", str(error.exception))
+
+
 class TestRetryPolicy(PyMongoTestCase):
     def test_retry_policy(self):
         capacity = 10

From a99645eca58859647594caf92ca9cfdfcb79036b Mon Sep 17 00:00:00 2001
From: Steven Silvester
Date: Thu, 9 Oct 2025 11:43:03 -0500
Subject: [PATCH 02/19] PYTHON-5536 Avoid clearing the connection pool when
 the server connection rate limiter triggers (#2509)

Co-authored-by: Iris <58442094+sleepyStick@users.noreply.github.com>
Co-authored-by: Noah Stapp
Co-authored-by: Shane Harvey
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
---
 test/connection_logging/connection-logging.json | 8 ++++++--
 .../unified/auth-network-error.json             | 6 +++++-
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/test/connection_logging/connection-logging.json b/test/connection_logging/connection-logging.json
index 5799e834d7..60190c7dc0 100644
--- a/test/connection_logging/connection-logging.json
+++ b/test/connection_logging/connection-logging.json
@@ -331,7 +331,9 @@
       "uriOptions": {
         "retryReads": false,
         "appname": "clientAppName",
-        "heartbeatFrequencyMS": 10000
+        "heartbeatFrequencyMS": 10000,
+        "socketTimeoutMS": 500,
+        "connectTimeoutMS": 500
       },
       "observeLogMessages": {
         "connection": "debug"
@@ -355,7 +357,9 @@
             "failCommands": [
               "saslContinue"
            ],
-            "closeConnection": true,
+            "closeConnection": false,
+            "blockConnection": true,
+            "blockTimeMS": 1000,
             "appName": "clientAppName"
           }
         }
diff --git a/test/discovery_and_monitoring/unified/auth-network-error.json b/test/discovery_and_monitoring/unified/auth-network-error.json
index 84763af32e..656b291366 100644
--- a/test/discovery_and_monitoring/unified/auth-network-error.json
+++ b/test/discovery_and_monitoring/unified/auth-network-error.json
@@ -53,7 +53,9 @@
             "failCommands": [
               "saslContinue"
             ],
-            "closeConnection": true,
+            "closeConnection": false,
+            "blockConnection": true,
+            "blockTimeMS": 1000,
             "appName": "authNetworkErrorTest"
           }
         }
@@ -75,6 +77,8 @@
       ],
       "uriOptions": {
         "retryWrites": false,
+        "socketTimeoutMS": 500,
+        "connectTimeoutMS": 500,
         "appname": "authNetworkErrorTest"
       }
     }

From f8ce3b1ba0f7704edb0d76c08c74977b9d3c067a Mon Sep 17 00:00:00 2001
From: Iris Ho
Date: Fri, 10 Oct 2025 12:02:09 -0700
Subject: [PATCH 03/19] add withTransaction script

---
 withTransaction.py | 111 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 111 insertions(+)
 create mode 100644 withTransaction.py

diff --git a/withTransaction.py b/withTransaction.py
new file mode 100644
index 0000000000..a24f950c0c
--- /dev/null
+++ b/withTransaction.py
@@ -0,0 +1,111 @@
+from __future__ import annotations
+
+import os
+import time
+from concurrent.futures import ThreadPoolExecutor
+
+from pymongo import MongoClient
+
+
+class RunOrderTransaction:
+    def __init__(self, client):
+        super(RunOrderTransaction, self).__init__()  # noqa:UP008
+        self.retry_attempts = -1
+        self.time = 0
+        self.client = client
+
+    def run(self):
+        start = time.time()
+        with self.client.start_session() as session:
+            try:
+                session.with_transaction(self.callback)
+            finally:
+                self.time = time.time() - start
+        return self
+
+    def callback(self, session):
+        self.retry_attempts += 1
+        return callback(session, self.client)
+
+
+def callback(session, client):
+    order_id = client.test.orders1.insert_one(
+        {"sku": "foo", "qty": 1}, session=session
+    ).inserted_id
+    res = client.test.inventory1.update_one(
+        {"sku": "foo", "qty": {"$gte": 1}}, {"$inc": {"qty": -1}}, session=session
+    )
+    if not res.modified_count:
+        raise TypeError("Insufficient inventory count")
+
+    return order_id
+
+
+def run(num_threads: int, local: bool):
+    if local:
+        client = MongoClient()
+    else:
+        client = MongoClient(os.getenv("ATLAS_URI"))
+    try:
+        client.drop_database("test")
+    except Exception:  # noqa: S110
+        # fails on atlas?
+        pass
+    db = client.test
+    db.drop_collection("orders1")
+    db.create_collection("orders1")
+    db.drop_collection("inventory1")
+    inventory = db.create_collection("inventory1")
+    inventory.insert_one({"sku": "foo", "qty": 1000000})
+
+    f.write("Testing %s threads\n" % num_threads)
+    start = time.time()
+    N_TXNS = 512
+    results = []
+    ops = [RunOrderTransaction(client) for _ in range(N_TXNS)]
+    with ThreadPoolExecutor(max_workers=num_threads) as exc:
+        futures = [exc.submit(op.run) for op in ops]
+        for future in futures:
+            result = future.result()
+            results.append(result)
+
+    end = time.time()
+    total_time = end - start
+    total_attempts = sum(r.retry_attempts for r in results)
+
+    f.write("All threads completed after %s seconds\n" % (end - start))
+    f.write(f"Total number of retry attempts: {total_attempts}\n")
+    client.close()
+
+    latencies = sorted(r.time for r in results)
+    avg_latency = sum(latencies) / N_TXNS
+    p50 = latencies[int(N_TXNS * 0.5)]
+    p90 = latencies[int(N_TXNS * 0.9)]
+    p99 = latencies[int(N_TXNS * 0.99)]
+    p100 = latencies[int(N_TXNS * 1.0) - 1]
+    # print(f'avg latency: {avg_latency:.2f}s p50: {p50:.2f}s p90: {p90:.2f}s p99: {p99:.2f}s p100: {p100:.2f}s')
+    return total_time, total_attempts, avg_latency, p50, p90, p99, p100
+
+
+def main(f, local=True):
+    NUM_THREADS = [1, 2, 4, 8, 16, 32, 64, 128, 256]
+    data = {}
+    for num in NUM_THREADS:
+        times, attempts, avg_latency, p50, p90, p99, p100 = run(num, local)
+        data[num] = {
+            "avg": avg_latency,
+            "p50": p50,
+            "p90": p90,
+            "p99": p99,
+            "p100": p100,
+        }
+        f.write("\n")
+        time.sleep(10)
+    f.write("\nthreads | avg | p50 | p90 | p99 | p100\n")
+    for num in NUM_THREADS:
+        f.write(
+            f"{num:7} | {data[num]['avg']:5.2f} | {data[num]['p50']:5.2f} | {data[num]['p90']:5.2f} | {data[num]['p99']:5.2f} | {data[num]['p100']:5.2f}\n"
+        )
+
+
+if __name__ == "__main__":
+    with open("/Users/iris.ho/Github/backpressure/final/local_original_1.5.txt", "w") as f:
+        main(f, local=True)

From 0770e6c2148a47a046693e9e11a7e5f97c3c225d Mon Sep 17 00:00:00 2001
From: Iris Ho
Date: Fri, 10 Oct 2025 13:32:14 -0700
Subject: [PATCH 04/19] add summarize script

---
 summarize.py | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 65 insertions(+)
 create mode 100644 summarize.py

diff --git a/summarize.py b/summarize.py
new file mode 100644
index 0000000000..bcac39512a
--- /dev/null
+++ b/summarize.py
@@ -0,0 +1,65 @@
+from __future__ import annotations
+
+import csv
+import pprint
+import re
+from collections import defaultdict
+
+
+def testing_n_threads(f_in, data):
+    threads = re.match(r"Testing (?P<n_threads>.*) threads", f_in.readline()).group("n_threads")
+    seconds = re.match(
+        r"All threads completed after (?P<n_seconds>.*) seconds", f_in.readline()
+    ).group("n_seconds")
+    tries = re.match(r"Total number of retry attempts: (?P<n_tries>.*)", f_in.readline()).group(
+        "n_tries"
+    )
+    data[f"{threads}_sec"].append(float(seconds))
+    data[f"{threads}_try"].append(int(tries))
+    return data
+
+
+def read_table(f_in, data):
+    # Initialize the CSV reader with the pipe '|' as the delimiter
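+    # For reference, the pipe-delimited tables written by withTransaction.py
+    # above look roughly like this (values are illustrative):
+    #
+    #   threads |  avg  |  p50  |  p90  |  p99  |  p100
+    #         1 |  0.01 |  0.01 |  0.02 |  0.02 |   0.04
+    #
+    # The header row is skipped and blank rows are ignored below.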
reader = csv.reader(f_in, delimiter="|") + next(reader) # skip header + + for row in reader: + if "threads " in row: + continue + row = [col.strip() for col in row] # noqa:PLW2901 + if row == []: + continue + # Convert numbers to appropriate types (int for threads, float for statistics) + threads = int(row[0]) + avg, p50, p90, p99, p100 = map(float, row[1:]) + # Append the parsed row to the list + data[f"{threads}_avg"].append(avg) + data[f"{threads}_p50"].append(p50) + data[f"{threads}_p90"].append(p90) + data[f"{threads}_p99"].append(p99) + data[f"{threads}_p100"].append(p100) + return data + + +path = "/Users/iris.ho/Github/backpressure/final" +files = ["main", "local_original_1.5", "local_original_2", "local_server_algo"] +print_data = {} +pp = pprint.PrettyPrinter(width=80) +THREADS = [1, 2, 4, 8, 16, 32, 64, 128, 256] +for f in files: + data = defaultdict(list) + with open(f"{path}/{f}.txt") as f_in: + for _ in THREADS: + data = testing_n_threads(f_in, data) + f_in.readline() + f_in.readline() + data = read_table(f_in, data) + print_data[f] = { + "avg": [data[f"{thread}_avg"] for thread in THREADS], + "p50": [data[f"{thread}_p50"] for thread in THREADS], + "p90": [data[f"{thread}_p90"] for thread in THREADS], + "p99": [data[f"{thread}_p99"] for thread in THREADS], + "p100": [data[f"{thread}_p100"] for thread in THREADS], + } +print(print_data) # noqa: T201 From a415468f3c139495bc1f135449e7113c1f4d9ff1 Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Fri, 10 Oct 2025 13:34:54 -0700 Subject: [PATCH 05/19] Create graph.py --- graph.py | 82 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 graph.py diff --git a/graph.py b/graph.py new file mode 100644 index 0000000000..ffb278f741 --- /dev/null +++ b/graph.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +import matplotlib.cm as mplcm +import matplotlib.colors as colors +import matplotlib.pyplot as plt +import numpy as np + +threads = [1, 2, 4, 8, 16, 32, 64, 128, 256] + +data = { + "local_main": { + "avg": [[0.01], [0.03], [0.05], [0.09], [0.21], [0.38], [0.95], [4.05], [11.11]], + "p50": [[0.01], [0.02], [0.03], [0.05], [0.12], [0.25], [0.67], [3.22], [9.8]], + "p90": [[0.02], [0.05], [0.11], [0.23], [0.52], [0.94], [2.01], [8.88], [23.47]], + "p99": [[0.02], [0.05], [0.11], [0.23], [0.52], [0.94], [2.01], [8.88], [23.47]], + "p100": [[0.04], [0.21], [0.3], [0.58], [1.22], [2.59], [7.25], [17.03], [25.38]], + }, + "local_original_1.5": { + "avg": [[0.01], [0.02], [0.04], [0.09], [0.2], [0.35], [0.65], [1.14], [2.21]], + "p50": [[0.01], [0.01], [0.01], [0.02], [0.06], [0.09], [0.2], [0.45], [1.91]], + "p90": [[0.02], [0.04], [0.11], [0.22], [0.49], [0.81], [1.87], [3.72], [4.76]], + "p99": [[0.02], [0.04], [0.11], [0.22], [0.49], [0.81], [1.87], [3.72], [4.76]], + "p100": [[0.04], [0.62], [1.11], [3.33], [4.71], [6.07], [5.64], [6.05], [6.76]], + }, + "local_original_2": { + "avg": [[0.01], [0.02], [0.04], [0.09], [0.18], [0.39], [0.63], [1.23], [2.28]], + "p50": [[0.01], [0.01], [0.01], [0.02], [0.02], [0.06], [0.17], [0.41], [1.9]], + "p90": [[0.01], [0.02], [0.08], [0.21], [0.33], [1.24], [1.83], [3.82], [4.91]], + "p99": [[0.01], [0.02], [0.08], [0.21], [0.33], [1.24], [1.83], [3.82], [4.91]], + "p100": [[0.04], [1.3], [1.54], [3.07], [3.72], [5.55], [5.44], [6.42], [7.06]], + }, + "local_server_algo": { + "avg": [[0.01], [0.02], [0.05], [0.1], [0.19], [0.36], [0.73], [1.23], [2.19]], + "p50": [[0.01], [0.01], [0.01], [0.01], [0.03], [0.09], [0.19], [0.59], [2.04]], + 
"p90": [[0.02], [0.04], [0.1], [0.22], [0.51], [1.07], [2.37], [3.58], [4.74]], + "p99": [[0.02], [0.04], [0.1], [0.22], [0.51], [1.07], [2.37], [3.58], [4.74]], + "p100": [[0.09], [0.65], [1.35], [2.87], [3.31], [4.4], [6.55], [5.84], [6.88]], + }, +} + + +metrics = ["avg", "p90", "p100"] +metric_titles = { + "avg": "Average Latency", + "p50": "p50 Latency", + "p90": "p90 Latency", + "p99": "p99 Latency", + "p100": "p100 (Max) Latency", +} + +plt.figure(figsize=(16, 4 * len(metrics))) +NUM_COLORS = len(data.keys()) + 1 +cm = plt.get_cmap("gist_rainbow") +cNorm = colors.Normalize(vmin=0, vmax=NUM_COLORS - 1) +scalarMap = mplcm.ScalarMappable(norm=cNorm, cmap=cm) +for i, metric in enumerate(metrics, 1): + if metric in ["avg"]: + ax = plt.subplot(len(metrics), 2, i) + else: + ax = plt.subplot(len(metrics), 2, (i - 1) * (2) + 1) + ax.set_prop_cycle(color=[scalarMap.to_rgba(i) for i in range(NUM_COLORS)]) + order = [] + for label, vals in data.items(): + if metric not in vals: + continue + arr = np.concatenate(np.around(np.array(vals[metric]), decimals=2)) + order.append(plt.plot(threads, arr, "o-", label=label)) + + plt.title(metric_titles[metric]) + plt.xscale("log", base=2) + plt.xlabel("Threads") + plt.ylabel("Seconds") + plt.xticks(threads, threads) + plt.grid(True, which="both", axis="x", linestyle="--", alpha=0.5) + plt.axhline(y=0, color="gray", linestyle="-") + if metric != "p90": + plt.legend().set_visible(False) + else: + plt.legend(loc=(1.01, 0.5), fontsize=8) + +plt.tight_layout() +plt.show() From 0e28b77bb206f94aac866734ff2d2c2dfc038bd7 Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Thu, 16 Oct 2025 10:31:46 -0700 Subject: [PATCH 06/19] 1ms start, 1.25 rate, 500ms max --- pymongo/asynchronous/client_session.py | 6 +++--- pymongo/synchronous/client_session.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pymongo/asynchronous/client_session.py b/pymongo/asynchronous/client_session.py index 697de81b1c..2d9192954c 100644 --- a/pymongo/asynchronous/client_session.py +++ b/pymongo/asynchronous/client_session.py @@ -472,8 +472,8 @@ def _max_time_expired_error(exc: PyMongoError) -> bool: # This limit is non-configurable and was chosen to be twice the 60 second # default value of MongoDB's `transactionLifetimeLimitSeconds` parameter. _WITH_TRANSACTION_RETRY_TIME_LIMIT = 120 -_BACKOFF_MAX = 1 -_BACKOFF_INITIAL = 0.050 # 50ms initial backoff +_BACKOFF_MAX = 0.500 # 500ms max backoff +_BACKOFF_INITIAL = 0.001 # 1ms initial backoff def _within_time_limit(start_time: float) -> bool: @@ -711,7 +711,7 @@ async def callback(session, custom_arg, custom_kwarg=None): while True: if retry: # Implement exponential backoff on retry. jitter = random.random() # noqa: S311 - backoff = jitter * min(_BACKOFF_INITIAL * (2**retry), _BACKOFF_MAX) + backoff = jitter * min(_BACKOFF_INITIAL * (1.25**retry), _BACKOFF_MAX) await asyncio.sleep(backoff) retry += 1 await self.start_transaction( diff --git a/pymongo/synchronous/client_session.py b/pymongo/synchronous/client_session.py index d5a37eb108..2237645fba 100644 --- a/pymongo/synchronous/client_session.py +++ b/pymongo/synchronous/client_session.py @@ -470,8 +470,8 @@ def _max_time_expired_error(exc: PyMongoError) -> bool: # This limit is non-configurable and was chosen to be twice the 60 second # default value of MongoDB's `transactionLifetimeLimitSeconds` parameter. 
_WITH_TRANSACTION_RETRY_TIME_LIMIT = 120 -_BACKOFF_MAX = 1 -_BACKOFF_INITIAL = 0.050 # 50ms initial backoff +_BACKOFF_MAX = 0.500 # 500ms max backoff +_BACKOFF_INITIAL = 0.001 # 1ms initial backoff def _within_time_limit(start_time: float) -> bool: @@ -709,7 +709,7 @@ def callback(session, custom_arg, custom_kwarg=None): while True: if retry: # Implement exponential backoff on retry. jitter = random.random() # noqa: S311 - backoff = jitter * min(_BACKOFF_INITIAL * (2**retry), _BACKOFF_MAX) + backoff = jitter * min(_BACKOFF_INITIAL * (1.25**retry), _BACKOFF_MAX) time.sleep(backoff) retry += 1 self.start_transaction(read_concern, write_concern, read_preference, max_commit_time_ms) From 8341b71bddbaf290089ffd589e02aa9bf28f9c54 Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Thu, 23 Oct 2025 12:28:07 -0700 Subject: [PATCH 07/19] add test --- test/asynchronous/test_transactions.py | 43 +++++++++++++++++++++++++- test/test_transactions.py | 41 +++++++++++++++++++++++- 2 files changed, 82 insertions(+), 2 deletions(-) diff --git a/test/asynchronous/test_transactions.py b/test/asynchronous/test_transactions.py index 29c5d26423..4d3ebced94 100644 --- a/test/asynchronous/test_transactions.py +++ b/test/asynchronous/test_transactions.py @@ -17,6 +17,7 @@ import asyncio import sys +import time from io import BytesIO from test.asynchronous.utils_spec_runner import AsyncSpecRunner @@ -37,7 +38,11 @@ from bson.raw_bson import RawBSONDocument from pymongo import WriteConcern, _csot from pymongo.asynchronous import client_session -from pymongo.asynchronous.client_session import TransactionOptions +from pymongo.asynchronous.client_session import ( + _BACKOFF_MAX, + TransactionOptions, + _set_backoff_initial, +) from pymongo.asynchronous.command_cursor import AsyncCommandCursor from pymongo.asynchronous.cursor import AsyncCursor from pymongo.errors import ( @@ -613,6 +618,42 @@ async def callback(session): await s.with_transaction(callback) self.assertFalse(s.in_transaction) + @async_client_context.require_test_commands + @async_client_context.require_transactions + async def test_transaction_backoff(self): + client = async_client_context.client + coll = client[self.db.name].test + # set fail point to trigger transaction failure and trigger backoff + await self.set_fail_point( + { + "configureFailPoint": "failCommand", + "mode": {"times": 3}, + "data": { + "failCommands": ["commitTransaction"], + "errorCode": 24, + }, + } + ) + self.addAsyncCleanup( + self.set_fail_point, {"configureFailPoint": "failCommand", "mode": "off"} + ) + + start = time.monotonic() + + async def callback(session): + await coll.insert_one({}, session=session) + + total_backoff = 0 + async with self.client.start_session() as s: + await s.with_transaction(callback) + self.assertEqual(len(s._transaction_retry_backoffs), 3) + for backoff in s._transaction_retry_backoffs: + self.assertGreater(backoff, 0) + total_backoff += backoff + + end = time.monotonic() + self.assertGreaterEqual(end - start, total_backoff) + class TestOptionsInsideTransactionProse(AsyncTransactionsBase): @async_client_context.require_transactions diff --git a/test/test_transactions.py b/test/test_transactions.py index 37e1a249e0..f2fe843434 100644 --- a/test/test_transactions.py +++ b/test/test_transactions.py @@ -17,6 +17,7 @@ import asyncio import sys +import time from io import BytesIO from test.utils_spec_runner import SpecRunner @@ -48,7 +49,11 @@ from pymongo.read_concern import ReadConcern from pymongo.read_preferences import ReadPreference from pymongo.synchronous 
import client_session -from pymongo.synchronous.client_session import TransactionOptions +from pymongo.synchronous.client_session import ( + _BACKOFF_MAX, + TransactionOptions, + _set_backoff_initial, +) from pymongo.synchronous.command_cursor import CommandCursor from pymongo.synchronous.cursor import Cursor @@ -601,6 +606,40 @@ def callback(session): s.with_transaction(callback) self.assertFalse(s.in_transaction) + @client_context.require_test_commands + @client_context.require_transactions + def test_transaction_backoff(self): + client = client_context.client + coll = client[self.db.name].test + # set fail point to trigger transaction failure and trigger backoff + self.set_fail_point( + { + "configureFailPoint": "failCommand", + "mode": {"times": 3}, + "data": { + "failCommands": ["commitTransaction"], + "errorCode": 24, + }, + } + ) + self.addCleanup(self.set_fail_point, {"configureFailPoint": "failCommand", "mode": "off"}) + + start = time.monotonic() + + def callback(session): + coll.insert_one({}, session=session) + + total_backoff = 0 + with self.client.start_session() as s: + s.with_transaction(callback) + self.assertEqual(len(s._transaction_retry_backoffs), 3) + for backoff in s._transaction_retry_backoffs: + self.assertGreater(backoff, 0) + total_backoff += backoff + + end = time.monotonic() + self.assertGreaterEqual(end - start, total_backoff) + class TestOptionsInsideTransactionProse(TransactionsBase): @client_context.require_transactions From fe21d09d3be4adf661325c05f9098cc9975480bf Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Mon, 27 Oct 2025 13:45:22 -0700 Subject: [PATCH 08/19] change test --- pymongo/asynchronous/client_session.py | 2 ++ test/asynchronous/test_transactions.py | 13 ++++++------- test/test_transactions.py | 13 ++++++------- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pymongo/asynchronous/client_session.py b/pymongo/asynchronous/client_session.py index 2d9192954c..f771b848cf 100644 --- a/pymongo/asynchronous/client_session.py +++ b/pymongo/asynchronous/client_session.py @@ -708,10 +708,12 @@ async def callback(session, custom_arg, custom_kwarg=None): """ start_time = time.monotonic() retry = 0 + self._transaction_retry_backoffs = [] while True: if retry: # Implement exponential backoff on retry. 
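            # Worked example of this schedule, assuming the constants set in
            # PATCH 06 (_BACKOFF_INITIAL = 1ms, growth factor 1.25,
            # _BACKOFF_MAX = 500ms): before jitter, retry 1 waits at most
            # 1.25ms, retry 10 at most ~9.3ms, and the 500ms cap is first hit
            # around retry 28 (1.25**28 is roughly 517); jitter then scales
            # each wait by a uniform factor in [0, 1).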
jitter = random.random() # noqa: S311 backoff = jitter * min(_BACKOFF_INITIAL * (1.25**retry), _BACKOFF_MAX) + self._transaction_retry_backoffs.append(backoff) await asyncio.sleep(backoff) retry += 1 await self.start_transaction( diff --git a/test/asynchronous/test_transactions.py b/test/asynchronous/test_transactions.py index 4d3ebced94..314e8b209c 100644 --- a/test/asynchronous/test_transactions.py +++ b/test/asynchronous/test_transactions.py @@ -623,11 +623,15 @@ async def callback(session): async def test_transaction_backoff(self): client = async_client_context.client coll = client[self.db.name].test + # optionally set _backoff_initial to a higher value + _set_backoff_initial(client_session._BACKOFF_MAX) # set fail point to trigger transaction failure and trigger backoff await self.set_fail_point( { "configureFailPoint": "failCommand", - "mode": {"times": 3}, + "mode": { + "times": 30 + }, # sufficiently high enough such that the time effect of backoff is noticeable "data": { "failCommands": ["commitTransaction"], "errorCode": 24, @@ -643,16 +647,11 @@ async def test_transaction_backoff(self): async def callback(session): await coll.insert_one({}, session=session) - total_backoff = 0 async with self.client.start_session() as s: await s.with_transaction(callback) - self.assertEqual(len(s._transaction_retry_backoffs), 3) - for backoff in s._transaction_retry_backoffs: - self.assertGreater(backoff, 0) - total_backoff += backoff end = time.monotonic() - self.assertGreaterEqual(end - start, total_backoff) + self.assertGreaterEqual(end - start, 1.25) # 1 second class TestOptionsInsideTransactionProse(AsyncTransactionsBase): diff --git a/test/test_transactions.py b/test/test_transactions.py index f2fe843434..83767d655d 100644 --- a/test/test_transactions.py +++ b/test/test_transactions.py @@ -611,11 +611,15 @@ def callback(session): def test_transaction_backoff(self): client = client_context.client coll = client[self.db.name].test + # optionally set _backoff_initial to a higher value + _set_backoff_initial(client_session._BACKOFF_MAX) # set fail point to trigger transaction failure and trigger backoff self.set_fail_point( { "configureFailPoint": "failCommand", - "mode": {"times": 3}, + "mode": { + "times": 30 + }, # sufficiently high enough such that the time effect of backoff is noticeable "data": { "failCommands": ["commitTransaction"], "errorCode": 24, @@ -629,16 +633,11 @@ def test_transaction_backoff(self): def callback(session): coll.insert_one({}, session=session) - total_backoff = 0 with self.client.start_session() as s: s.with_transaction(callback) - self.assertEqual(len(s._transaction_retry_backoffs), 3) - for backoff in s._transaction_retry_backoffs: - self.assertGreater(backoff, 0) - total_backoff += backoff end = time.monotonic() - self.assertGreaterEqual(end - start, total_backoff) + self.assertGreaterEqual(end - start, 1.25) # 1 second class TestOptionsInsideTransactionProse(TransactionsBase): From 96c4d3fa26c42a2dd744237f75cc5a01d627e97e Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Wed, 29 Oct 2025 16:13:27 -0700 Subject: [PATCH 09/19] add second test? 
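with_transaction now remembers the most recent error and, before sleeping,
checks whether the pending backoff would push the elapsed time past the
120 second retry limit; if it would, the last error is raised instead of
sleeping. In outline (a sketch of the control flow added by this patch,
sync flavor shown; the transient-error handling around the callback and
commit is elided):

    start_time = time.monotonic()
    retry = 0
    last_error = None
    while True:
        if retry:  # Exponential backoff before each retry.
            backoff = random.random() * min(_BACKOFF_INITIAL * (1.25**retry), _BACKOFF_MAX)
            if _would_exceed_time_limit(start_time, backoff):
                raise last_error  # Give up rather than overshoot the limit.
            time.sleep(backoff)
        retry += 1
        try:
            ...  # run the user callback and commit the transaction
        except BaseException as exc:
            last_error = exc  # Remembered so it can be re-raised above.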
---
 pymongo/asynchronous/client_session.py | 11 ++++--
 pymongo/synchronous/client_session.py  |  9 +++++
 test/asynchronous/test_transactions.py | 50 ++++++++++++++++++++++++--
 test/test_transactions.py              | 48 +++++++++++++++++++++++--
 4 files changed, 110 insertions(+), 8 deletions(-)

diff --git a/pymongo/asynchronous/client_session.py b/pymongo/asynchronous/client_session.py
index f771b848cf..1c9c0663f1 100644
--- a/pymongo/asynchronous/client_session.py
+++ b/pymongo/asynchronous/client_session.py
@@ -481,6 +481,11 @@ def _within_time_limit(start_time: float) -> bool:
     return time.monotonic() - start_time < _WITH_TRANSACTION_RETRY_TIME_LIMIT
 
+def _would_exceed_time_limit(start_time: float, backoff: float) -> bool:
+    """Would the backoff exceed the with_transaction retry time limit?"""
+    return time.monotonic() + backoff - start_time >= _WITH_TRANSACTION_RETRY_TIME_LIMIT
+
+
 _T = TypeVar("_T")
 
 if TYPE_CHECKING:
@@ -708,12 +713,13 @@ async def callback(session, custom_arg, custom_kwarg=None):
         """
         start_time = time.monotonic()
         retry = 0
-        self._transaction_retry_backoffs = []
+        last_error = None
         while True:
             if retry:  # Implement exponential backoff on retry.
                 jitter = random.random()  # noqa: S311
                 backoff = jitter * min(_BACKOFF_INITIAL * (1.25**retry), _BACKOFF_MAX)
-                self._transaction_retry_backoffs.append(backoff)
+                if _would_exceed_time_limit(start_time, backoff):
+                    raise last_error
                 await asyncio.sleep(backoff)
             retry += 1
             await self.start_transaction(
@@ -723,6 +729,7 @@ async def callback(session, custom_arg, custom_kwarg=None):
                 ret = await callback(self)
             # Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
             except BaseException as exc:
+                last_error = exc
                 if self.in_transaction:
                     await self.abort_transaction()
                 if (
diff --git a/pymongo/synchronous/client_session.py b/pymongo/synchronous/client_session.py
index 2237645fba..37cbe2661f 100644
--- a/pymongo/synchronous/client_session.py
+++ b/pymongo/synchronous/client_session.py
@@ -479,6 +479,11 @@ def _within_time_limit(start_time: float) -> bool:
     return time.monotonic() - start_time < _WITH_TRANSACTION_RETRY_TIME_LIMIT
 
+def _would_exceed_time_limit(start_time: float, backoff: float) -> bool:
+    """Would the backoff exceed the with_transaction retry time limit?"""
+    return time.monotonic() + backoff - start_time >= _WITH_TRANSACTION_RETRY_TIME_LIMIT
+
+
 _T = TypeVar("_T")
 
 if TYPE_CHECKING:
@@ -706,10 +711,13 @@ def callback(session, custom_arg, custom_kwarg=None):
         """
         start_time = time.monotonic()
         retry = 0
+        last_error = None
         while True:
             if retry:  # Implement exponential backoff on retry.
                 jitter = random.random()  # noqa: S311
                 backoff = jitter * min(_BACKOFF_INITIAL * (1.25**retry), _BACKOFF_MAX)
+                if _would_exceed_time_limit(start_time, backoff):
+                    raise last_error
                 time.sleep(backoff)
             retry += 1
             self.start_transaction(read_concern, write_concern, read_preference, max_commit_time_ms)
@@ -717,6 +725,7 @@ def callback(session, custom_arg, custom_kwarg=None):
                 ret = callback(self)
             # Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
except BaseException as exc: + last_error = exc if self.in_transaction: self.abort_transaction() if ( diff --git a/test/asynchronous/test_transactions.py b/test/asynchronous/test_transactions.py index 314e8b209c..5b33ad92a4 100644 --- a/test/asynchronous/test_transactions.py +++ b/test/asynchronous/test_transactions.py @@ -16,6 +16,7 @@ from __future__ import annotations import asyncio +import random import sys import time from io import BytesIO @@ -45,6 +46,7 @@ ) from pymongo.asynchronous.command_cursor import AsyncCommandCursor from pymongo.asynchronous.cursor import AsyncCursor +from pymongo.asynchronous.helpers import anext from pymongo.errors import ( AutoReconnect, CollectionInvalid, @@ -618,13 +620,51 @@ async def callback(session): await s.with_transaction(callback) self.assertFalse(s.in_transaction) + @async_client_context.require_test_commands + @async_client_context.require_transactions + async def test_transaction_backoff_is_random(self): + client = async_client_context.client + coll = client[self.db.name].test + # set fail point to trigger transaction failure and trigger backoff + await self.set_fail_point( + { + "configureFailPoint": "failCommand", + "mode": { + "times": 30 + }, # sufficiently high enough such that the time effect of backoff is noticeable + "data": { + "failCommands": ["commitTransaction"], + "errorCode": 24, + }, + } + ) + self.addAsyncCleanup( + self.set_fail_point, {"configureFailPoint": "failCommand", "mode": "off"} + ) + + start = time.monotonic() + + async def callback(session): + await coll.insert_one({}, session=session) + + async with self.client.start_session() as s: + await s.with_transaction(callback) + + end = time.monotonic() + self.assertLess(end - start, 5) # backoff alone is ~3.5 seconds + @async_client_context.require_test_commands @async_client_context.require_transactions async def test_transaction_backoff(self): client = async_client_context.client coll = client[self.db.name].test - # optionally set _backoff_initial to a higher value - _set_backoff_initial(client_session._BACKOFF_MAX) + # patch random to make it deterministic + _original_random_random = random.random + + def always_one(): + return 1 + + random.random = always_one # set fail point to trigger transaction failure and trigger backoff await self.set_fail_point( { @@ -651,7 +691,11 @@ async def callback(session): await s.with_transaction(callback) end = time.monotonic() - self.assertGreaterEqual(end - start, 1.25) # 1 second + self.assertGreaterEqual( + end - start, 3.5629515313825695 + ) # sum of backoffs is 3.5629515313825695 + + random.random = _original_random_random class TestOptionsInsideTransactionProse(AsyncTransactionsBase): diff --git a/test/test_transactions.py b/test/test_transactions.py index 83767d655d..cc11576a81 100644 --- a/test/test_transactions.py +++ b/test/test_transactions.py @@ -16,6 +16,7 @@ from __future__ import annotations import asyncio +import random import sys import time from io import BytesIO @@ -56,6 +57,7 @@ ) from pymongo.synchronous.command_cursor import CommandCursor from pymongo.synchronous.cursor import Cursor +from pymongo.synchronous.helpers import next _IS_SYNC = True @@ -606,13 +608,49 @@ def callback(session): s.with_transaction(callback) self.assertFalse(s.in_transaction) + @client_context.require_test_commands + @client_context.require_transactions + def test_transaction_backoff_is_random(self): + client = client_context.client + coll = client[self.db.name].test + # set fail point to trigger transaction failure and trigger backoff 
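+        # errorCode 24 is LockTimeout, which surfaces as a transient
+        # transaction error here, so with_transaction retries the whole
+        # transaction (and backs off) on each forced failure.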
+ self.set_fail_point( + { + "configureFailPoint": "failCommand", + "mode": { + "times": 30 + }, # sufficiently high enough such that the time effect of backoff is noticeable + "data": { + "failCommands": ["commitTransaction"], + "errorCode": 24, + }, + } + ) + self.addCleanup(self.set_fail_point, {"configureFailPoint": "failCommand", "mode": "off"}) + + start = time.monotonic() + + def callback(session): + coll.insert_one({}, session=session) + + with self.client.start_session() as s: + s.with_transaction(callback) + + end = time.monotonic() + self.assertLess(end - start, 5) # backoff alone is ~3.5 seconds + @client_context.require_test_commands @client_context.require_transactions def test_transaction_backoff(self): client = client_context.client coll = client[self.db.name].test - # optionally set _backoff_initial to a higher value - _set_backoff_initial(client_session._BACKOFF_MAX) + # patch random to make it deterministic + _original_random_random = random.random + + def always_one(): + return 1 + + random.random = always_one # set fail point to trigger transaction failure and trigger backoff self.set_fail_point( { @@ -637,7 +675,11 @@ def callback(session): s.with_transaction(callback) end = time.monotonic() - self.assertGreaterEqual(end - start, 1.25) # 1 second + self.assertGreaterEqual( + end - start, 3.5629515313825695 + ) # sum of backoffs is 3.5629515313825695 + + random.random = _original_random_random class TestOptionsInsideTransactionProse(TransactionsBase): From 344286669897ae0f00b716ca3551dc14ca7e3c23 Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Thu, 30 Oct 2025 09:04:29 -0700 Subject: [PATCH 10/19] round seconds --- test/asynchronous/test_transactions.py | 6 ++---- test/test_transactions.py | 6 ++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/test/asynchronous/test_transactions.py b/test/asynchronous/test_transactions.py index 5b33ad92a4..8b62684e7f 100644 --- a/test/asynchronous/test_transactions.py +++ b/test/asynchronous/test_transactions.py @@ -651,7 +651,7 @@ async def callback(session): await s.with_transaction(callback) end = time.monotonic() - self.assertLess(end - start, 5) # backoff alone is ~3.5 seconds + self.assertLess(end - start, 5) # sum of backoffs is ~3.5 seconds @async_client_context.require_test_commands @async_client_context.require_transactions @@ -691,9 +691,7 @@ async def callback(session): await s.with_transaction(callback) end = time.monotonic() - self.assertGreaterEqual( - end - start, 3.5629515313825695 - ) # sum of backoffs is 3.5629515313825695 + self.assertGreaterEqual(end - start, 3.5) # sum of backoffs is 3.5629515313825695 random.random = _original_random_random diff --git a/test/test_transactions.py b/test/test_transactions.py index cc11576a81..f3072d8150 100644 --- a/test/test_transactions.py +++ b/test/test_transactions.py @@ -637,7 +637,7 @@ def callback(session): s.with_transaction(callback) end = time.monotonic() - self.assertLess(end - start, 5) # backoff alone is ~3.5 seconds + self.assertLess(end - start, 5) # sum of backoffs is ~3.5 seconds @client_context.require_test_commands @client_context.require_transactions @@ -675,9 +675,7 @@ def callback(session): s.with_transaction(callback) end = time.monotonic() - self.assertGreaterEqual( - end - start, 3.5629515313825695 - ) # sum of backoffs is 3.5629515313825695 + self.assertGreaterEqual(end - start, 3.5) # sum of backoffs is 3.5629515313825695 random.random = _original_random_random From 8ce9d7fcd42310833c0ef88aebc06d411c9e2a36 Mon Sep 17 00:00:00 2001 From: 
Iris Ho Date: Thu, 30 Oct 2025 09:04:42 -0700 Subject: [PATCH 11/19] fix typing --- pymongo/asynchronous/client_session.py | 3 ++- pymongo/synchronous/client_session.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pymongo/asynchronous/client_session.py b/pymongo/asynchronous/client_session.py index 1c9c0663f1..87b81a4f34 100644 --- a/pymongo/asynchronous/client_session.py +++ b/pymongo/asynchronous/client_session.py @@ -713,12 +713,13 @@ async def callback(session, custom_arg, custom_kwarg=None): """ start_time = time.monotonic() retry = 0 - last_error = None + last_error: Optional[BaseException] = None while True: if retry: # Implement exponential backoff on retry. jitter = random.random() # noqa: S311 backoff = jitter * min(_BACKOFF_INITIAL * (1.25**retry), _BACKOFF_MAX) if _would_exceed_time_limit(start_time, backoff): + assert last_error is not None raise last_error await asyncio.sleep(backoff) retry += 1 diff --git a/pymongo/synchronous/client_session.py b/pymongo/synchronous/client_session.py index 37cbe2661f..617d99ef67 100644 --- a/pymongo/synchronous/client_session.py +++ b/pymongo/synchronous/client_session.py @@ -711,12 +711,13 @@ def callback(session, custom_arg, custom_kwarg=None): """ start_time = time.monotonic() retry = 0 - last_error = None + last_error: Optional[BaseException] = None while True: if retry: # Implement exponential backoff on retry. jitter = random.random() # noqa: S311 backoff = jitter * min(_BACKOFF_INITIAL * (1.25**retry), _BACKOFF_MAX) if _would_exceed_time_limit(start_time, backoff): + assert last_error is not None raise last_error time.sleep(backoff) retry += 1 From 678c66f5fde70fcd3646ccd803e5491f9a89b857 Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Wed, 12 Nov 2025 12:58:17 -0800 Subject: [PATCH 12/19] fix test and make comment more specific --- test/asynchronous/test_transactions.py | 4 ++-- test/test_transactions.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/test/asynchronous/test_transactions.py b/test/asynchronous/test_transactions.py index 8b62684e7f..6d48105262 100644 --- a/test/asynchronous/test_transactions.py +++ b/test/asynchronous/test_transactions.py @@ -651,7 +651,7 @@ async def callback(session): await s.with_transaction(callback) end = time.monotonic() - self.assertLess(end - start, 5) # sum of backoffs is ~3.5 seconds + self.assertLess(end - start, 3.5) # sum of 30 backoffs are 3.5629515313825695 @async_client_context.require_test_commands @async_client_context.require_transactions @@ -691,7 +691,7 @@ async def callback(session): await s.with_transaction(callback) end = time.monotonic() - self.assertGreaterEqual(end - start, 3.5) # sum of backoffs is 3.5629515313825695 + self.assertGreaterEqual(end - start, 3.5) # sum of 30 backoffs are 3.5629515313825695 random.random = _original_random_random diff --git a/test/test_transactions.py b/test/test_transactions.py index f3072d8150..cb972ca94a 100644 --- a/test/test_transactions.py +++ b/test/test_transactions.py @@ -637,7 +637,7 @@ def callback(session): s.with_transaction(callback) end = time.monotonic() - self.assertLess(end - start, 5) # sum of backoffs is ~3.5 seconds + self.assertLess(end - start, 3.5) # sum of 30 backoffs are 3.5629515313825695 @client_context.require_test_commands @client_context.require_transactions @@ -675,7 +675,7 @@ def callback(session): s.with_transaction(callback) end = time.monotonic() - self.assertGreaterEqual(end - start, 3.5) # sum of backoffs is 3.5629515313825695 + self.assertGreaterEqual(end - 
start, 3.5) # sum of 30 backoffs are 3.5629515313825695 random.random = _original_random_random From 782aa2a7c341dfce3bdb197bbd6e91dc05c47e06 Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Thu, 13 Nov 2025 15:54:55 -0800 Subject: [PATCH 13/19] fix backoff constants and test --- pymongo/asynchronous/client_session.py | 4 +-- pymongo/synchronous/client_session.py | 4 +-- test/asynchronous/test_transactions.py | 41 ++++++++++---------------- test/test_transactions.py | 41 ++++++++++---------------- 4 files changed, 36 insertions(+), 54 deletions(-) diff --git a/pymongo/asynchronous/client_session.py b/pymongo/asynchronous/client_session.py index 87b81a4f34..27f26c35b2 100644 --- a/pymongo/asynchronous/client_session.py +++ b/pymongo/asynchronous/client_session.py @@ -473,7 +473,7 @@ def _max_time_expired_error(exc: PyMongoError) -> bool: # default value of MongoDB's `transactionLifetimeLimitSeconds` parameter. _WITH_TRANSACTION_RETRY_TIME_LIMIT = 120 _BACKOFF_MAX = 0.500 # 500ms max backoff -_BACKOFF_INITIAL = 0.001 # 1ms initial backoff +_BACKOFF_INITIAL = 0.005 # 5ms initial backoff def _within_time_limit(start_time: float) -> bool: @@ -717,7 +717,7 @@ async def callback(session, custom_arg, custom_kwarg=None): while True: if retry: # Implement exponential backoff on retry. jitter = random.random() # noqa: S311 - backoff = jitter * min(_BACKOFF_INITIAL * (1.25**retry), _BACKOFF_MAX) + backoff = jitter * min(_BACKOFF_INITIAL * (1.5**retry), _BACKOFF_MAX) if _would_exceed_time_limit(start_time, backoff): assert last_error is not None raise last_error diff --git a/pymongo/synchronous/client_session.py b/pymongo/synchronous/client_session.py index 617d99ef67..28999bcd62 100644 --- a/pymongo/synchronous/client_session.py +++ b/pymongo/synchronous/client_session.py @@ -471,7 +471,7 @@ def _max_time_expired_error(exc: PyMongoError) -> bool: # default value of MongoDB's `transactionLifetimeLimitSeconds` parameter. _WITH_TRANSACTION_RETRY_TIME_LIMIT = 120 _BACKOFF_MAX = 0.500 # 500ms max backoff -_BACKOFF_INITIAL = 0.001 # 1ms initial backoff +_BACKOFF_INITIAL = 0.005 # 5ms initial backoff def _within_time_limit(start_time: float) -> bool: @@ -715,7 +715,7 @@ def callback(session, custom_arg, custom_kwarg=None): while True: if retry: # Implement exponential backoff on retry. 
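            # Under the new constants (_BACKOFF_INITIAL = 5ms, growth factor
            # 1.5, _BACKOFF_MAX = 500ms), the pre-jitter wait for retry 1 is
            # 7.5ms and the 500ms cap is first reached at retry 12, since
            # 0.005 * 1.5**12 is roughly 0.65.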
jitter = random.random() # noqa: S311 - backoff = jitter * min(_BACKOFF_INITIAL * (1.25**retry), _BACKOFF_MAX) + backoff = jitter * min(_BACKOFF_INITIAL * (1.5**retry), _BACKOFF_MAX) if _would_exceed_time_limit(start_time, backoff): assert last_error is not None raise last_error diff --git a/test/asynchronous/test_transactions.py b/test/asynchronous/test_transactions.py index 6d48105262..18e28001f8 100644 --- a/test/asynchronous/test_transactions.py +++ b/test/asynchronous/test_transactions.py @@ -622,15 +622,25 @@ async def callback(session): @async_client_context.require_test_commands @async_client_context.require_transactions - async def test_transaction_backoff_is_random(self): + async def test_transaction_backoff(self): client = async_client_context.client coll = client[self.db.name].test + # patch random to make it deterministic + _original_random_random = random.random + + def always_one(): + return 1 + + def always_zero(): + return 0 + + random.random = always_zero # set fail point to trigger transaction failure and trigger backoff await self.set_fail_point( { "configureFailPoint": "failCommand", "mode": { - "times": 30 + "times": 13 }, # sufficiently high enough such that the time effect of backoff is noticeable "data": { "failCommands": ["commitTransaction"], @@ -642,27 +652,14 @@ async def test_transaction_backoff_is_random(self): self.set_fail_point, {"configureFailPoint": "failCommand", "mode": "off"} ) - start = time.monotonic() - async def callback(session): await coll.insert_one({}, session=session) + start = time.monotonic() async with self.client.start_session() as s: await s.with_transaction(callback) - end = time.monotonic() - self.assertLess(end - start, 3.5) # sum of 30 backoffs are 3.5629515313825695 - - @async_client_context.require_test_commands - @async_client_context.require_transactions - async def test_transaction_backoff(self): - client = async_client_context.client - coll = client[self.db.name].test - # patch random to make it deterministic - _original_random_random = random.random - - def always_one(): - return 1 + no_backoff_time = end - start random.random = always_one # set fail point to trigger transaction failure and trigger backoff @@ -670,7 +667,7 @@ def always_one(): { "configureFailPoint": "failCommand", "mode": { - "times": 30 + "times": 13 }, # sufficiently high enough such that the time effect of backoff is noticeable "data": { "failCommands": ["commitTransaction"], @@ -681,17 +678,11 @@ def always_one(): self.addAsyncCleanup( self.set_fail_point, {"configureFailPoint": "failCommand", "mode": "off"} ) - start = time.monotonic() - - async def callback(session): - await coll.insert_one({}, session=session) - async with self.client.start_session() as s: await s.with_transaction(callback) - end = time.monotonic() - self.assertGreaterEqual(end - start, 3.5) # sum of 30 backoffs are 3.5629515313825695 + self.assertLess(abs(end - start - (no_backoff_time + 2.2)), 1) # sum of 13 backoffs is 2.2 random.random = _original_random_random diff --git a/test/test_transactions.py b/test/test_transactions.py index cb972ca94a..a8969250c6 100644 --- a/test/test_transactions.py +++ b/test/test_transactions.py @@ -610,15 +610,25 @@ def callback(session): @client_context.require_test_commands @client_context.require_transactions - def test_transaction_backoff_is_random(self): + def test_transaction_backoff(self): client = client_context.client coll = client[self.db.name].test + # patch random to make it deterministic + _original_random_random = random.random + + def 
always_one(): + return 1 + + def always_zero(): + return 0 + + random.random = always_zero # set fail point to trigger transaction failure and trigger backoff self.set_fail_point( { "configureFailPoint": "failCommand", "mode": { - "times": 30 + "times": 13 }, # sufficiently high enough such that the time effect of backoff is noticeable "data": { "failCommands": ["commitTransaction"], @@ -628,27 +638,14 @@ def test_transaction_backoff_is_random(self): ) self.addCleanup(self.set_fail_point, {"configureFailPoint": "failCommand", "mode": "off"}) - start = time.monotonic() - def callback(session): coll.insert_one({}, session=session) + start = time.monotonic() with self.client.start_session() as s: s.with_transaction(callback) - end = time.monotonic() - self.assertLess(end - start, 3.5) # sum of 30 backoffs are 3.5629515313825695 - - @client_context.require_test_commands - @client_context.require_transactions - def test_transaction_backoff(self): - client = client_context.client - coll = client[self.db.name].test - # patch random to make it deterministic - _original_random_random = random.random - - def always_one(): - return 1 + no_backoff_time = end - start random.random = always_one # set fail point to trigger transaction failure and trigger backoff @@ -656,7 +653,7 @@ def always_one(): { "configureFailPoint": "failCommand", "mode": { - "times": 30 + "times": 13 }, # sufficiently high enough such that the time effect of backoff is noticeable "data": { "failCommands": ["commitTransaction"], @@ -665,17 +662,11 @@ def always_one(): } ) self.addCleanup(self.set_fail_point, {"configureFailPoint": "failCommand", "mode": "off"}) - start = time.monotonic() - - def callback(session): - coll.insert_one({}, session=session) - with self.client.start_session() as s: s.with_transaction(callback) - end = time.monotonic() - self.assertGreaterEqual(end - start, 3.5) # sum of 30 backoffs are 3.5629515313825695 + self.assertLess(abs(end - start - (no_backoff_time + 2.2)), 1) # sum of 13 backoffs is 2.2 random.random = _original_random_random From 405aed4bbffd16bfe9102c6096b3819b1303561f Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Mon, 24 Nov 2025 10:56:10 -0800 Subject: [PATCH 14/19] remove unnecessary files --- summarize.py | 65 -------------------------- withTransaction.py | 111 --------------------------------------------- 2 files changed, 176 deletions(-) delete mode 100644 summarize.py delete mode 100644 withTransaction.py diff --git a/summarize.py b/summarize.py deleted file mode 100644 index bcac39512a..0000000000 --- a/summarize.py +++ /dev/null @@ -1,65 +0,0 @@ -from __future__ import annotations - -import csv -import pprint -import re -from collections import defaultdict - - -def testing_n_threads(f_in, data): - threads = re.match(r"Testing (?P.*) threads", f_in.readline()).group("n_threads") - seconds = re.match( - r"All threads completed after (?P.*) seconds", f_in.readline() - ).group("n_seconds") - tries = re.match(r"Total number of retry attempts: (?P.*)", f_in.readline()).group( - "n_tries" - ) - data[f"{threads}_sec"].append(float(seconds)) - data[f"{threads}_try"].append(int(tries)) - return data - - -def read_table(f_in, data): - # Initialize the CSV reader with the pipe '|' as the delimiter - reader = csv.reader(f_in, delimiter="|") - next(reader) # skip header - - for row in reader: - if "threads " in row: - continue - row = [col.strip() for col in row] # noqa:PLW2901 - if row == []: - continue - # Convert numbers to appropriate types (int for threads, float for statistics) - threads = 
int(row[0]) - avg, p50, p90, p99, p100 = map(float, row[1:]) - # Append the parsed row to the list - data[f"{threads}_avg"].append(avg) - data[f"{threads}_p50"].append(p50) - data[f"{threads}_p90"].append(p90) - data[f"{threads}_p99"].append(p99) - data[f"{threads}_p100"].append(p100) - return data - - -path = "/Users/iris.ho/Github/backpressure/final" -files = ["main", "local_original_1.5", "local_original_2", "local_server_algo"] -print_data = {} -pp = pprint.PrettyPrinter(width=80) -THREADS = [1, 2, 4, 8, 16, 32, 64, 128, 256] -for f in files: - data = defaultdict(list) - with open(f"{path}/{f}.txt") as f_in: - for _ in THREADS: - data = testing_n_threads(f_in, data) - f_in.readline() - f_in.readline() - data = read_table(f_in, data) - print_data[f] = { - "avg": [data[f"{thread}_avg"] for thread in THREADS], - "p50": [data[f"{thread}_p50"] for thread in THREADS], - "p90": [data[f"{thread}_p90"] for thread in THREADS], - "p99": [data[f"{thread}_p99"] for thread in THREADS], - "p100": [data[f"{thread}_p100"] for thread in THREADS], - } -print(print_data) # noqa: T201 diff --git a/withTransaction.py b/withTransaction.py deleted file mode 100644 index a24f950c0c..0000000000 --- a/withTransaction.py +++ /dev/null @@ -1,111 +0,0 @@ -from __future__ import annotations - -import os -import time -from concurrent.futures import ThreadPoolExecutor - -from pymongo import MongoClient - - -class RunOrderTransaction: - def __init__(self, client): - super(RunOrderTransaction, self).__init__() # noqa:UP008 - self.retry_attempts = -1 - self.time = 0 - self.client = client - - def run(self): - start = time.time() - with self.client.start_session() as session: - try: - session.with_transaction(self.callback) - finally: - self.time = time.time() - start - return self - - def callback(self, session): - self.retry_attempts += 1 - return callback(session, self.client) - - -def callback(session, client): - order_id = client.test.orders1.insert_one({"sku": "foo", "qty": 1}, session=session).inserted_id - res = client.test.inventory1.update_one( - {"sku": "foo", "qty": {"$gte": 1}}, {"$inc": {"qty": -1}}, session=session - ) - if not res.modified_count: - raise TypeError("Insufficient inventory count") - - return order_id - - -def run(num_threads: int, local: bool): - if local: - client = MongoClient() - else: - client = MongoClient(os.getenv("ATLAS_URI")) - try: - client.drop_database("test") - except Exception: # noqa: S110 - # fails on atlas? 
- pass - db = client.test - db.drop_collection("orders1") - db.create_collection("orders1") - db.drop_collection("inventory1") - inventory = db.create_collection("inventory1") - inventory.insert_one({"sku": "foo", "qty": 1000000}) - - f.write("Testing %s threads\n" % num_threads) - start = time.time() - N_TXNS = 512 - results = [] - ops = [RunOrderTransaction(client) for _ in range(N_TXNS)] - with ThreadPoolExecutor(max_workers=num_threads) as exc: - futures = [exc.submit(op.run) for op in ops] - for future in futures: - result = future.result() - results.append(result) - - end = time.time() - total_time = end - start - total_attempts = sum(r.retry_attempts for r in results) - - f.write("All threads completed after %s seconds\n" % (end - start)) - f.write(f"Total number of retry attempts: {total_attempts}\n") - client.close() - - latencies = sorted(r.time for r in results) - avg_latency = sum(latencies) / N_TXNS - p50 = latencies[int(N_TXNS * 0.5)] - p90 = latencies[int(N_TXNS * 0.9)] - p99 = latencies[int(N_TXNS * 0.99)] - p100 = latencies[int(N_TXNS * 1.0) - 1] - # print(f'avg latency: {avg_latency:.2f}s p50: {p50:.2f}s p90: {p90:.2f}s p99: {p99:.2f}s p100: {p100:.2f}s') - return total_time, total_attempts, avg_latency, p50, p90, p99, p100 - - -def main(f, local=True): - NUM_THREADS = [1, 2, 4, 8, 16, 32, 64, 128, 256] - data = {} - for num in NUM_THREADS: - times, attempts, avg_latency, p50, p90, p99, p100 = run(num, local) - data[num] = { - "avg": avg_latency, - "p50": p50, - "p90": p90, - "p99": p99, - "p100": p100, - } - f.write("\n") - time.sleep(10) - f.write("\nthreads | avg | p50 | p90 | p99 | p100\n") - for num in NUM_THREADS: - f.write( - f"{num:7} | {data[num]['avg']:5.2f} | {data[num]['p50']:5.2f} | {data[num]['p90']:5.2f} | {data[num]['p90']:5.2f} | {data[num]['p100']:5.2f}\n" - ) - - -if __name__ == "__main__": - with open("/Users/iris.ho/Github/backpressure/final/local_original_1.5.txt", "w") as f: - main(f, local=True) From 1060137e01e5b864416541a0a0ed0e3fd9e0d7c4 Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Mon, 24 Nov 2025 12:41:56 -0800 Subject: [PATCH 15/19] rename tests and add reasoning --- test/asynchronous/test_transactions.py | 24 ++++++++++++------------ test/test_transactions.py | 24 ++++++++++++------------ 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/test/asynchronous/test_transactions.py b/test/asynchronous/test_transactions.py index 18e28001f8..cef0575abf 100644 --- a/test/asynchronous/test_transactions.py +++ b/test/asynchronous/test_transactions.py @@ -448,7 +448,7 @@ async def set_fail_point(self, command_args): await self.configure_fail_point(client, command_args) @async_client_context.require_transactions - async def test_callback_raises_custom_error(self): + async def test_1_callback_raises_custom_error(self): class _MyException(Exception): pass @@ -460,7 +460,7 @@ async def raise_error(_): await s.with_transaction(raise_error) @async_client_context.require_transactions - async def test_callback_returns_value(self): + async def test_2_callback_returns_value(self): async def callback(_): return "Foo" @@ -488,7 +488,7 @@ def callback(_): self.assertEqual(await s.with_transaction(callback), "Foo") @async_client_context.require_transactions - async def test_callback_not_retried_after_timeout(self): + async def test_3_1_callback_not_retried_after_timeout(self): listener = OvertCommandListener() client = await self.async_rs_client(event_listeners=[listener]) coll = client[self.db.name].test @@ -516,7 +516,7 @@ async def callback(session): 
@async_client_context.require_test_commands @async_client_context.require_transactions - async def test_callback_not_retried_after_commit_timeout(self): + async def test_3_2_callback_not_retried_after_commit_timeout(self): listener = OvertCommandListener() client = await self.async_rs_client(event_listeners=[listener]) coll = client[self.db.name].test @@ -550,7 +550,7 @@ async def callback(session): @async_client_context.require_test_commands @async_client_context.require_transactions - async def test_commit_not_retried_after_timeout(self): + async def test_3_3_commit_not_retried_after_timeout(self): listener = OvertCommandListener() client = await self.async_rs_client(event_listeners=[listener]) coll = client[self.db.name].test @@ -622,10 +622,12 @@ async def callback(session): @async_client_context.require_test_commands @async_client_context.require_transactions - async def test_transaction_backoff(self): + async def test_4_retry_backoff_is_enforced(self): client = async_client_context.client coll = client[self.db.name].test - # patch random to make it deterministic + # patch random to make it deterministic -- once to effectively have + # no backoff and the second time with "max" backoff (always waiting the longest + # possible time) _original_random_random = random.random def always_one(): @@ -639,12 +641,10 @@ def always_zero(): await self.set_fail_point( { "configureFailPoint": "failCommand", - "mode": { - "times": 13 - }, # sufficiently high enough such that the time effect of backoff is noticeable + "mode": {"times": 13}, "data": { "failCommands": ["commitTransaction"], - "errorCode": 24, + "errorCode": 251, }, } ) @@ -671,7 +671,7 @@ async def callback(session): }, # sufficiently high enough such that the time effect of backoff is noticeable "data": { "failCommands": ["commitTransaction"], - "errorCode": 24, + "errorCode": 251, }, } ) diff --git a/test/test_transactions.py b/test/test_transactions.py index a8969250c6..42796caca4 100644 --- a/test/test_transactions.py +++ b/test/test_transactions.py @@ -440,7 +440,7 @@ def set_fail_point(self, command_args): self.configure_fail_point(client, command_args) @client_context.require_transactions - def test_callback_raises_custom_error(self): + def test_1_callback_raises_custom_error(self): class _MyException(Exception): pass @@ -452,7 +452,7 @@ def raise_error(_): s.with_transaction(raise_error) @client_context.require_transactions - def test_callback_returns_value(self): + def test_2_callback_returns_value(self): def callback(_): return "Foo" @@ -480,7 +480,7 @@ def callback(_): self.assertEqual(s.with_transaction(callback), "Foo") @client_context.require_transactions - def test_callback_not_retried_after_timeout(self): + def test_3_1_callback_not_retried_after_timeout(self): listener = OvertCommandListener() client = self.rs_client(event_listeners=[listener]) coll = client[self.db.name].test @@ -508,7 +508,7 @@ def callback(session): @client_context.require_test_commands @client_context.require_transactions - def test_callback_not_retried_after_commit_timeout(self): + def test_3_2_callback_not_retried_after_commit_timeout(self): listener = OvertCommandListener() client = self.rs_client(event_listeners=[listener]) coll = client[self.db.name].test @@ -540,7 +540,7 @@ def callback(session): @client_context.require_test_commands @client_context.require_transactions - def test_commit_not_retried_after_timeout(self): + def test_3_3_commit_not_retried_after_timeout(self): listener = OvertCommandListener() client = 
self.rs_client(event_listeners=[listener]) coll = client[self.db.name].test @@ -610,10 +610,12 @@ def callback(session): @client_context.require_test_commands @client_context.require_transactions - def test_transaction_backoff(self): + def test_4_retry_backoff_is_enforced(self): client = client_context.client coll = client[self.db.name].test - # patch random to make it deterministic + # patch random to make it deterministic -- once to effectively have + # no backoff and the second time with "max" backoff (always waiting the longest + # possible time) _original_random_random = random.random def always_one(): @@ -627,12 +629,10 @@ def always_zero(): self.set_fail_point( { "configureFailPoint": "failCommand", - "mode": { - "times": 13 - }, # sufficiently high enough such that the time effect of backoff is noticeable + "mode": {"times": 13}, "data": { "failCommands": ["commitTransaction"], - "errorCode": 24, + "errorCode": 251, }, } ) @@ -657,7 +657,7 @@ def callback(session): }, # sufficiently high enough such that the time effect of backoff is noticeable "data": { "failCommands": ["commitTransaction"], - "errorCode": 24, + "errorCode": 251, }, } ) From 40d8ff6493c4e8f4e728f80cec4a6518671bf1b5 Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Tue, 25 Nov 2025 10:26:02 -0800 Subject: [PATCH 16/19] Revert "PYTHON-5536 Avoid clearing the connection pool when the server connection rate limiter triggers (#2509)" This reverts commit a99645eca58859647594caf92ca9cfdfcb79036b. --- test/connection_logging/connection-logging.json | 8 ++------ .../unified/auth-network-error.json | 6 +----- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/test/connection_logging/connection-logging.json b/test/connection_logging/connection-logging.json index 60190c7dc0..5799e834d7 100644 --- a/test/connection_logging/connection-logging.json +++ b/test/connection_logging/connection-logging.json @@ -331,9 +331,7 @@ "uriOptions": { "retryReads": false, "appname": "clientAppName", - "heartbeatFrequencyMS": 10000, - "socketTimeoutMS": 500, - "connectTimeoutMS": 500 + "heartbeatFrequencyMS": 10000 }, "observeLogMessages": { "connection": "debug" @@ -357,9 +355,7 @@ "failCommands": [ "saslContinue" ], - "closeConnection": false, - "blockConnection": true, - "blockTimeMS": 1000, + "closeConnection": true, "appName": "clientAppName" } } diff --git a/test/discovery_and_monitoring/unified/auth-network-error.json b/test/discovery_and_monitoring/unified/auth-network-error.json index 656b291366..84763af32e 100644 --- a/test/discovery_and_monitoring/unified/auth-network-error.json +++ b/test/discovery_and_monitoring/unified/auth-network-error.json @@ -53,9 +53,7 @@ "failCommands": [ "saslContinue" ], - "closeConnection": false, - "blockConnection": true, - "blockTimeMS": 1000, + "closeConnection": true, "appName": "authNetworkErrorTest" } } @@ -77,8 +75,6 @@ ], "uriOptions": { "retryWrites": false, - "socketTimeoutMS": 500, - "connectTimeoutMS": 500, "appname": "authNetworkErrorTest" } } From 9fe84d220dcbc3ae4e6c9dec25c72a4472b2c21e Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Tue, 25 Nov 2025 10:26:15 -0800 Subject: [PATCH 17/19] Revert "PYTHON-5506 Prototype adaptive token bucket retry (#2501)" This reverts commit 1820f97e8d810019054945aff06930c23e1e16d5. 
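As context for this revert: the tests deleted below pin down the token-bucket gating behavior. What follows is a minimal, self-contained sketch of that behavior, reconstructed from what the tests assert. The names TokenBucket, RetryPolicy, MAX_RETRIES, and TOKEN_RETURN are illustrative stand-ins, not the prototype's actual implementation, and the sketch omits the locking a real driver would need.

class TokenBucket:
    """Illustrative stand-in; not the driver's _TokenBucket."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.tokens = capacity  # start full so a healthy client can retry

    def consume(self, n=1.0):
        # Not thread-safe; real code would hold a lock here.
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False

    def deposit(self, n):
        self.tokens = min(self.capacity, self.tokens + n)


class RetryPolicy:
    MAX_RETRIES = 3     # stand-in for helpers._MAX_RETRIES
    TOKEN_RETURN = 0.1  # stand-in for helpers.DEFAULT_RETRY_TOKEN_RETURN

    def __init__(self, bucket, remaining_timeout=None):
        self.bucket = bucket
        # Remaining CSOT budget in seconds, or None when no deadline is set.
        self.remaining_timeout = remaining_timeout

    def should_retry(self, attempt, backoff):
        # Reject over-limit attempts before touching the bucket, so a
        # capped attempt does not also burn a token.
        if attempt > self.MAX_RETRIES:
            return False
        # Never schedule a backoff sleep that would overrun the deadline.
        if self.remaining_timeout is not None and backoff > self.remaining_timeout:
            return False
        # Each permitted retry attempt pays one full token.
        return self.bucket.consume(1.0)

    def record_success(self, retry):
        # Every success trickles a fraction of a token back; a success
        # that was itself a retry also refunds the token it spent.
        self.bucket.deposit(self.TOKEN_RETURN + (1.0 if retry else 0.0))

Under sustained overload the bucket drains and further retries are refused until successes replenish it, which is the backpressure the assertions below exercise.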
--- test/asynchronous/test_backpressure.py | 66 -------------------------- test/test_backpressure.py | 66 -------------------------- 2 files changed, 132 deletions(-) diff --git a/test/asynchronous/test_backpressure.py b/test/asynchronous/test_backpressure.py index 871b369ecf..11f8edde67 100644 --- a/test/asynchronous/test_backpressure.py +++ b/test/asynchronous/test_backpressure.py @@ -182,72 +182,6 @@ async def test_limit_retry_command(self): self.assertIn("RetryableError", str(error.exception)) -class TestRetryPolicy(AsyncPyMongoTestCase): - async def test_retry_policy(self): - capacity = 10 - retry_policy = _RetryPolicy(_TokenBucket(capacity=capacity)) - self.assertEqual(retry_policy.attempts, helpers._MAX_RETRIES) - self.assertEqual(retry_policy.backoff_initial, helpers._BACKOFF_INITIAL) - self.assertEqual(retry_policy.backoff_max, helpers._BACKOFF_MAX) - for i in range(1, helpers._MAX_RETRIES + 1): - self.assertTrue(await retry_policy.should_retry(i, 0)) - self.assertFalse(await retry_policy.should_retry(helpers._MAX_RETRIES + 1, 0)) - for i in range(capacity - helpers._MAX_RETRIES): - self.assertTrue(await retry_policy.should_retry(1, 0)) - # No tokens left, should not retry. - self.assertFalse(await retry_policy.should_retry(1, 0)) - self.assertEqual(retry_policy.token_bucket.tokens, 0) - - # record_success should generate tokens. - for _ in range(int(2 / helpers.DEFAULT_RETRY_TOKEN_RETURN)): - await retry_policy.record_success(retry=False) - self.assertAlmostEqual(retry_policy.token_bucket.tokens, 2) - for i in range(2): - self.assertTrue(await retry_policy.should_retry(1, 0)) - self.assertFalse(await retry_policy.should_retry(1, 0)) - - # Recording a successful retry should return 1 additional token. - await retry_policy.record_success(retry=True) - self.assertAlmostEqual( - retry_policy.token_bucket.tokens, 1 + helpers.DEFAULT_RETRY_TOKEN_RETURN - ) - self.assertTrue(await retry_policy.should_retry(1, 0)) - self.assertFalse(await retry_policy.should_retry(1, 0)) - self.assertAlmostEqual(retry_policy.token_bucket.tokens, helpers.DEFAULT_RETRY_TOKEN_RETURN) - - async def test_retry_policy_csot(self): - retry_policy = _RetryPolicy(_TokenBucket()) - self.assertTrue(await retry_policy.should_retry(1, 0.5)) - with pymongo.timeout(0.5): - self.assertTrue(await retry_policy.should_retry(1, 0)) - self.assertTrue(await retry_policy.should_retry(1, 0.1)) - # Would exceed the timeout, should not retry. - self.assertFalse(await retry_policy.should_retry(1, 1.0)) - self.assertTrue(await retry_policy.should_retry(1, 1.0)) - - @async_client_context.require_failCommand_appName - async def test_limit_retry_command(self): - client = await self.async_rs_or_single_client() - client._retry_policy.token_bucket.tokens = 1 - db = client.pymongo_test - await db.t.insert_one({"x": 1}) - - # Ensure command is retried once overload error. - fail_many = mock_overload_error.copy() - fail_many["mode"] = {"times": 1} - async with self.fail_point(fail_many): - await db.command("find", "t") - - # Ensure command stops retrying when there are no tokens left. 
- fail_too_many = mock_overload_error.copy() - fail_too_many["mode"] = {"times": 2} - async with self.fail_point(fail_too_many): - with self.assertRaises(PyMongoError) as error: - await db.command("find", "t") - - self.assertIn("Retryable", str(error.exception)) - - class TestRetryPolicy(AsyncPyMongoTestCase): async def test_retry_policy(self): capacity = 10 diff --git a/test/test_backpressure.py b/test/test_backpressure.py index 0a145b32fc..fac1d6236d 100644 --- a/test/test_backpressure.py +++ b/test/test_backpressure.py @@ -182,72 +182,6 @@ def test_limit_retry_command(self): self.assertIn("RetryableError", str(error.exception)) -class TestRetryPolicy(PyMongoTestCase): - def test_retry_policy(self): - capacity = 10 - retry_policy = _RetryPolicy(_TokenBucket(capacity=capacity)) - self.assertEqual(retry_policy.attempts, helpers._MAX_RETRIES) - self.assertEqual(retry_policy.backoff_initial, helpers._BACKOFF_INITIAL) - self.assertEqual(retry_policy.backoff_max, helpers._BACKOFF_MAX) - for i in range(1, helpers._MAX_RETRIES + 1): - self.assertTrue(retry_policy.should_retry(i, 0)) - self.assertFalse(retry_policy.should_retry(helpers._MAX_RETRIES + 1, 0)) - for i in range(capacity - helpers._MAX_RETRIES): - self.assertTrue(retry_policy.should_retry(1, 0)) - # No tokens left, should not retry. - self.assertFalse(retry_policy.should_retry(1, 0)) - self.assertEqual(retry_policy.token_bucket.tokens, 0) - - # record_success should generate tokens. - for _ in range(int(2 / helpers.DEFAULT_RETRY_TOKEN_RETURN)): - retry_policy.record_success(retry=False) - self.assertAlmostEqual(retry_policy.token_bucket.tokens, 2) - for i in range(2): - self.assertTrue(retry_policy.should_retry(1, 0)) - self.assertFalse(retry_policy.should_retry(1, 0)) - - # Recording a successful retry should return 1 additional token. - retry_policy.record_success(retry=True) - self.assertAlmostEqual( - retry_policy.token_bucket.tokens, 1 + helpers.DEFAULT_RETRY_TOKEN_RETURN - ) - self.assertTrue(retry_policy.should_retry(1, 0)) - self.assertFalse(retry_policy.should_retry(1, 0)) - self.assertAlmostEqual(retry_policy.token_bucket.tokens, helpers.DEFAULT_RETRY_TOKEN_RETURN) - - def test_retry_policy_csot(self): - retry_policy = _RetryPolicy(_TokenBucket()) - self.assertTrue(retry_policy.should_retry(1, 0.5)) - with pymongo.timeout(0.5): - self.assertTrue(retry_policy.should_retry(1, 0)) - self.assertTrue(retry_policy.should_retry(1, 0.1)) - # Would exceed the timeout, should not retry. - self.assertFalse(retry_policy.should_retry(1, 1.0)) - self.assertTrue(retry_policy.should_retry(1, 1.0)) - - @client_context.require_failCommand_appName - def test_limit_retry_command(self): - client = self.rs_or_single_client() - client._retry_policy.token_bucket.tokens = 1 - db = client.pymongo_test - db.t.insert_one({"x": 1}) - - # Ensure command is retried once overload error. - fail_many = mock_overload_error.copy() - fail_many["mode"] = {"times": 1} - with self.fail_point(fail_many): - db.command("find", "t") - - # Ensure command stops retrying when there are no tokens left. 
- fail_too_many = mock_overload_error.copy() - fail_too_many["mode"] = {"times": 2} - with self.fail_point(fail_too_many): - with self.assertRaises(PyMongoError) as error: - db.command("find", "t") - - self.assertIn("Retryable", str(error.exception)) - - class TestRetryPolicy(PyMongoTestCase): def test_retry_policy(self): capacity = 10 From 6665a4e570d3104702b8bc0678c1baafaee3d9b0 Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Tue, 25 Nov 2025 12:28:11 -0800 Subject: [PATCH 18/19] fix tests -- i think when i rebased i did some funky stuff? --- test/asynchronous/test_transactions.py | 7 +------ test/test_transactions.py | 7 +------ 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/test/asynchronous/test_transactions.py b/test/asynchronous/test_transactions.py index cef0575abf..18f9778463 100644 --- a/test/asynchronous/test_transactions.py +++ b/test/asynchronous/test_transactions.py @@ -39,14 +39,9 @@ from bson.raw_bson import RawBSONDocument from pymongo import WriteConcern, _csot from pymongo.asynchronous import client_session -from pymongo.asynchronous.client_session import ( - _BACKOFF_MAX, - TransactionOptions, - _set_backoff_initial, -) +from pymongo.asynchronous.client_session import TransactionOptions from pymongo.asynchronous.command_cursor import AsyncCommandCursor from pymongo.asynchronous.cursor import AsyncCursor -from pymongo.asynchronous.helpers import anext from pymongo.errors import ( AutoReconnect, CollectionInvalid, diff --git a/test/test_transactions.py b/test/test_transactions.py index 42796caca4..94d70396fc 100644 --- a/test/test_transactions.py +++ b/test/test_transactions.py @@ -50,14 +50,9 @@ from pymongo.read_concern import ReadConcern from pymongo.read_preferences import ReadPreference from pymongo.synchronous import client_session -from pymongo.synchronous.client_session import ( - _BACKOFF_MAX, - TransactionOptions, - _set_backoff_initial, -) +from pymongo.synchronous.client_session import TransactionOptions from pymongo.synchronous.command_cursor import CommandCursor from pymongo.synchronous.cursor import Cursor -from pymongo.synchronous.helpers import next _IS_SYNC = True From ef9d85c5fd5ae9a914805b8b0710671d23601ed4 Mon Sep 17 00:00:00 2001 From: Iris Ho Date: Tue, 2 Dec 2025 12:38:03 -0800 Subject: [PATCH 19/19] i thought i moved this but it turns out i didn't? 
bye graph.py --- graph.py | 82 -------------------------------------------------------- 1 file changed, 82 deletions(-) delete mode 100644 graph.py diff --git a/graph.py b/graph.py deleted file mode 100644 index ffb278f741..0000000000 --- a/graph.py +++ /dev/null @@ -1,82 +0,0 @@ -from __future__ import annotations - -import matplotlib.cm as mplcm -import matplotlib.colors as colors -import matplotlib.pyplot as plt -import numpy as np - -threads = [1, 2, 4, 8, 16, 32, 64, 128, 256] - -data = { - "local_main": { - "avg": [[0.01], [0.03], [0.05], [0.09], [0.21], [0.38], [0.95], [4.05], [11.11]], - "p50": [[0.01], [0.02], [0.03], [0.05], [0.12], [0.25], [0.67], [3.22], [9.8]], - "p90": [[0.02], [0.05], [0.11], [0.23], [0.52], [0.94], [2.01], [8.88], [23.47]], - "p99": [[0.02], [0.05], [0.11], [0.23], [0.52], [0.94], [2.01], [8.88], [23.47]], - "p100": [[0.04], [0.21], [0.3], [0.58], [1.22], [2.59], [7.25], [17.03], [25.38]], - }, - "local_original_1.5": { - "avg": [[0.01], [0.02], [0.04], [0.09], [0.2], [0.35], [0.65], [1.14], [2.21]], - "p50": [[0.01], [0.01], [0.01], [0.02], [0.06], [0.09], [0.2], [0.45], [1.91]], - "p90": [[0.02], [0.04], [0.11], [0.22], [0.49], [0.81], [1.87], [3.72], [4.76]], - "p99": [[0.02], [0.04], [0.11], [0.22], [0.49], [0.81], [1.87], [3.72], [4.76]], - "p100": [[0.04], [0.62], [1.11], [3.33], [4.71], [6.07], [5.64], [6.05], [6.76]], - }, - "local_original_2": { - "avg": [[0.01], [0.02], [0.04], [0.09], [0.18], [0.39], [0.63], [1.23], [2.28]], - "p50": [[0.01], [0.01], [0.01], [0.02], [0.02], [0.06], [0.17], [0.41], [1.9]], - "p90": [[0.01], [0.02], [0.08], [0.21], [0.33], [1.24], [1.83], [3.82], [4.91]], - "p99": [[0.01], [0.02], [0.08], [0.21], [0.33], [1.24], [1.83], [3.82], [4.91]], - "p100": [[0.04], [1.3], [1.54], [3.07], [3.72], [5.55], [5.44], [6.42], [7.06]], - }, - "local_server_algo": { - "avg": [[0.01], [0.02], [0.05], [0.1], [0.19], [0.36], [0.73], [1.23], [2.19]], - "p50": [[0.01], [0.01], [0.01], [0.01], [0.03], [0.09], [0.19], [0.59], [2.04]], - "p90": [[0.02], [0.04], [0.1], [0.22], [0.51], [1.07], [2.37], [3.58], [4.74]], - "p99": [[0.02], [0.04], [0.1], [0.22], [0.51], [1.07], [2.37], [3.58], [4.74]], - "p100": [[0.09], [0.65], [1.35], [2.87], [3.31], [4.4], [6.55], [5.84], [6.88]], - }, -} - - -metrics = ["avg", "p90", "p100"] -metric_titles = { - "avg": "Average Latency", - "p50": "p50 Latency", - "p90": "p90 Latency", - "p99": "p99 Latency", - "p100": "p100 (Max) Latency", -} - -plt.figure(figsize=(16, 4 * len(metrics))) -NUM_COLORS = len(data.keys()) + 1 -cm = plt.get_cmap("gist_rainbow") -cNorm = colors.Normalize(vmin=0, vmax=NUM_COLORS - 1) -scalarMap = mplcm.ScalarMappable(norm=cNorm, cmap=cm) -for i, metric in enumerate(metrics, 1): - if metric in ["avg"]: - ax = plt.subplot(len(metrics), 2, i) - else: - ax = plt.subplot(len(metrics), 2, (i - 1) * (2) + 1) - ax.set_prop_cycle(color=[scalarMap.to_rgba(i) for i in range(NUM_COLORS)]) - order = [] - for label, vals in data.items(): - if metric not in vals: - continue - arr = np.concatenate(np.around(np.array(vals[metric]), decimals=2)) - order.append(plt.plot(threads, arr, "o-", label=label)) - - plt.title(metric_titles[metric]) - plt.xscale("log", base=2) - plt.xlabel("Threads") - plt.ylabel("Seconds") - plt.xticks(threads, threads) - plt.grid(True, which="both", axis="x", linestyle="--", alpha=0.5) - plt.axhline(y=0, color="gray", linestyle="-") - if metric != "p90": - plt.legend().set_visible(False) - else: - plt.legend(loc=(1.01, 0.5), fontsize=8) - -plt.tight_layout() -plt.show()
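A closing note on the deleted graph.py: its p90 and p99 series are identical for every configuration because the deleted benchmark script's summary table wrote the p90 value into the p99 column. A corrected percentile summary might look like the sketch below; summarize is a hypothetical helper, not code from this series.

import random


def summarize(latencies):
    """Nearest-rank percentile summary of per-transaction timings."""
    xs = sorted(latencies)
    n = len(xs)

    def pct(p):
        # Clamp the index so p100 maps to the last sample instead of
        # running off the end of the list.
        return xs[min(int(n * p), n - 1)]

    return {
        "avg": sum(xs) / n,
        "p50": pct(0.50),
        "p90": pct(0.90),
        "p99": pct(0.99),  # computed independently of p90
        "p100": xs[-1],
    }


# Synthetic timings stand in for the benchmark's [r.time for r in results]:
stats = summarize([random.uniform(0.01, 2.0) for _ in range(512)])
print(" | ".join(f"{name}: {value:5.2f}" for name, value in stats.items()))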