From 8d8181493197011771ebacf066d841156c69f5c4 Mon Sep 17 00:00:00 2001 From: Pratiksha Zalte Date: Wed, 3 Dec 2025 19:14:54 +0530 Subject: [PATCH] fixed TestRoutingConnectionRecvTimeout tests for feature BACKEND_ROUTING_TABLE FETCH in configuration testkit --- src/Bolt/Session.php | 133 +++++++++++++++++- src/Neo4j/Neo4jConnectionPool.php | 60 +++++++- .../src/Handlers/ForcedRoutingTableUpdate.php | 7 +- .../src/Handlers/GetRoutingTable.php | 17 ++- testkit-backend/testkit.sh | 3 + 5 files changed, 208 insertions(+), 12 deletions(-) diff --git a/src/Bolt/Session.php b/src/Bolt/Session.php index 92ced82a..c8bd0e65 100644 --- a/src/Bolt/Session.php +++ b/src/Bolt/Session.php @@ -29,6 +29,7 @@ use Laudis\Neo4j\Databags\TransactionConfiguration; use Laudis\Neo4j\Enum\AccessMode; use Laudis\Neo4j\Exception\Neo4jException; +use Laudis\Neo4j\Exception\TimeoutException; use Laudis\Neo4j\Formatter\SummarizedResultFormatter; use Laudis\Neo4j\Neo4j\Neo4jConnectionPool; use Laudis\Neo4j\Types\CypherList; @@ -74,12 +75,80 @@ public function runStatements(iterable $statements, ?TransactionConfiguration $c $this->getLogger()?->log(LogLevel::INFO, 'Running statements', ['statements' => $statements]); $config = $this->mergeTsxConfig($config); foreach ($statements as $statement) { - $tbr[] = $this->beginInstantTransaction($this->config, $config)->runStatement($statement); + $tbr[] = $this->runStatementWithRetry($statement, $config); } return new CypherList($tbr); } + /** + * Runs a single statement with retry logic to handle socket timeouts and routing errors. + * For routing drivers, this is essential to refresh the routing table on timeout. + * + * @throws Exception + */ + private function runStatementWithRetry(Statement $statement, TransactionConfiguration $config): SummarizedResult + { + while (true) { + $transaction = null; + try { + $transaction = $this->beginInstantTransaction($this->config, $config); + $result = $transaction->runStatement($statement); + // Trigger lazy loading of results to catch any timeout during result iteration + self::triggerLazyResult($result); + + return $result; + } catch (TimeoutException $e) { + // Socket timeout - clear routing table and retry + if ($transaction) { + try { + $transaction->rollback(); + } catch (Exception $rollbackException) { + // Ignore rollback errors during timeout + } + } + + // Close broken connection so it won't be reused + foreach ($this->usedConnections as $i => $usedConnection) { + try { + $usedConnection->close(); + array_splice($this->usedConnections, $i, 1); + } catch (Exception $closeException) { + // Ignore close errors + } + } + + if ($this->pool instanceof Neo4jConnectionPool) { + $this->pool->clearRoutingTable(); + } + // Continue retry loop + } catch (Neo4jException $e) { + if ($transaction && !in_array($e->getClassification(), self::ROLLBACK_CLASSIFICATIONS)) { + try { + $transaction->rollback(); + } catch (Exception $rollbackException) { + // Ignore rollback errors + } + } + + if ($this->isSocketTimeoutError($e)) { + // When socket timeout occurs, clear routing table to force re-fetch with fresh server list + if ($this->pool instanceof Neo4jConnectionPool) { + $this->pool->clearRoutingTable(); + } + // Continue retry loop + } elseif ($e->getTitle() === 'NotALeader') { + // By closing the pool, we force the connection to be re-acquired and the routing table to be refetched + $this->pool->close(); + // Continue retry loop + } elseif ($e->getClassification() !== 'TransientError') { + throw $e; + } + // For other transient errors, continue retry loop + } + } + } + /** * @param iterable|null $statements */ @@ -136,12 +205,41 @@ private function retry(callable $tsxHandler, bool $read, TransactionConfiguratio $transaction->commit(); return $tbr; + } catch (TimeoutException $e) { + // Socket timeout - clear routing table and retry + if ($transaction) { + try { + $transaction->rollback(); + } catch (Exception $rollbackException) { + // Ignore rollback errors during timeout + } + } + + // Close broken connection so it won't be reused + foreach ($this->usedConnections as $i => $usedConnection) { + try { + $usedConnection->close(); + array_splice($this->usedConnections, $i, 1); + } catch (Exception $closeException) { + // Ignore close errors + } + } + + if ($this->pool instanceof Neo4jConnectionPool) { + $this->pool->clearRoutingTable(); + } + // Continue retry loop } catch (Neo4jException $e) { if ($transaction && !in_array($e->getClassification(), self::ROLLBACK_CLASSIFICATIONS)) { $transaction->rollback(); } - if ($e->getTitle() === 'NotALeader') { + if ($this->isSocketTimeoutError($e)) { + // When socket timeout occurs, clear routing table to force re-fetch with fresh server list + if ($this->pool instanceof Neo4jConnectionPool) { + $this->pool->clearRoutingTable(); + } + } elseif ($e->getTitle() === 'NotALeader') { // By closing the pool, we force the connection to be re-acquired and the routing table to be refetched $this->pool->close(); } elseif ($e->getClassification() !== 'TransientError') { @@ -151,6 +249,37 @@ private function retry(callable $tsxHandler, bool $read, TransactionConfiguratio } } + /** + * Checks if an exception represents a socket timeout or connection-related failure + * that requires routing table refresh. + * + * @param Neo4jException $e The exception to check + * + * @return bool True if this is a socket timeout or connection failure + */ + private function isSocketTimeoutError(Neo4jException $e): bool + { + $title = $e->getTitle(); + $classification = $e->getClassification(); + + // Check if this was caused by a timeout exception in the bolt library + // Timeout exceptions are wrapped in Neo4jException with NotALeader title, + // but we can detect them by checking the previous exception message + $previous = $e->getPrevious(); + if ($previous !== null) { + $prevMessage = strtolower($previous->getMessage()); + if (str_contains($prevMessage, 'timeout') || str_contains($prevMessage, 'time out')) { + return true; + } + } + + // Socket timeout errors should be treated as transient and trigger routing table refresh + return in_array($title, [ + 'ServiceUnavailable', + 'FailedToRoute', + ], true) || $classification === 'TransientError'; + } + private static function triggerLazyResult(mixed $tbr): void { if ($tbr instanceof CypherSequence) { diff --git a/src/Neo4j/Neo4jConnectionPool.php b/src/Neo4j/Neo4jConnectionPool.php index b69b0b63..2703b1ca 100644 --- a/src/Neo4j/Neo4jConnectionPool.php +++ b/src/Neo4j/Neo4jConnectionPool.php @@ -64,6 +64,14 @@ final class Neo4jConnectionPool implements ConnectionPoolInterface /** @var array */ private static array $pools = []; + /** + * Registry of routing tables per database. + * Maps database name -> RoutingTable. + * + * @var array + */ + private array $routingTableRegistry = []; + /** * @psalm-mutation-free */ @@ -182,6 +190,50 @@ public function getLogger(): ?Neo4jLogger return $this->logger; } + /** + * Returns the routing table for a specific database, or null if not yet initialized. + * This method is intended for testkit backend access to routing table information. + * + * @param string $database The database name to retrieve routing table for + * + * @return RoutingTable|null The routing table if available, null otherwise + */ + public function getRoutingTable(string $database = 'neo4j'): ?RoutingTable + { + return $this->routingTableRegistry[$database] ?? null; + } + + /** + * Returns the complete routing table registry for all databases. + * This method is intended for testkit backend access to routing information. + * + * @return array Map of database name to RoutingTable + */ + public function getRoutingTableRegistry(): array + { + return $this->routingTableRegistry; + } + + /** + * Clears the routing table registry for a specific database or all databases. + * This is used to force a routing table refresh on the next session. + * + * @param string|null $database Database to clear, or null to clear all + */ + public function clearRoutingTable(?string $database = null): void + { + if ($database === null) { + $this->routingTableRegistry = []; + // Also clear the entire cache to force routing table refresh + $this->cache->clear(); + } else { + unset($this->routingTableRegistry[$database]); + // Also clear the specific cache key for this database + $key = $this->createKey($this->data, new SessionConfiguration(database: $database)); + $this->cache->delete($key); + } + } + /** * @throws Exception */ @@ -212,7 +264,13 @@ private function routingTable(BoltConnection $connection, SessionConfiguration $ ['servers' => $servers, 'ttl' => $ttl] = $route['rt']; $ttl += time(); - return new RoutingTable($servers, $ttl); + $table = new RoutingTable($servers, $ttl); + + // Store in routing table registry for testkit access + $database = $config->getDatabase() ?? 'neo4j'; + $this->routingTableRegistry[$database] = $table; + + return $table; } public function release(ConnectionInterface $connection): void diff --git a/testkit-backend/src/Handlers/ForcedRoutingTableUpdate.php b/testkit-backend/src/Handlers/ForcedRoutingTableUpdate.php index 43aa515a..2d71ea70 100644 --- a/testkit-backend/src/Handlers/ForcedRoutingTableUpdate.php +++ b/testkit-backend/src/Handlers/ForcedRoutingTableUpdate.php @@ -53,9 +53,10 @@ public function handle($request): TestkitResponseInterface /** @var ConnectionPoolInterface $pool */ $pool = $poolProperty->getValue($driver); - $tableProperty = (new ReflectionClass(Neo4jConnectionPool::class))->getProperty('table'); - $tableProperty->setAccessible(true); - $tableProperty->setValue($pool, null); + // Clear routing table registry to force refresh on next session + if ($pool instanceof Neo4jConnectionPool) { + $pool->clearRoutingTable(); + } } $driver->createSession()->run('RETURN 1 AS x'); diff --git a/testkit-backend/src/Handlers/GetRoutingTable.php b/testkit-backend/src/Handlers/GetRoutingTable.php index 83db5977..54699138 100644 --- a/testkit-backend/src/Handlers/GetRoutingTable.php +++ b/testkit-backend/src/Handlers/GetRoutingTable.php @@ -17,7 +17,6 @@ use Laudis\Neo4j\Enum\RoutingRoles; use Laudis\Neo4j\Neo4j\Neo4jConnectionPool; use Laudis\Neo4j\Neo4j\Neo4jDriver; -use Laudis\Neo4j\Neo4j\RoutingTable; use Laudis\Neo4j\TestkitBackend\Contracts\RequestHandlerInterface; use Laudis\Neo4j\TestkitBackend\Contracts\TestkitResponseInterface; use Laudis\Neo4j\TestkitBackend\MainRepository; @@ -55,13 +54,19 @@ public function handle($request): TestkitResponseInterface /** @var Neo4jConnectionPool $pool */ $pool = $poolProperty->getValue($driver); - $tableProperty = (new ReflectionClass(Neo4jConnectionPool::class))->getProperty('table'); - $tableProperty->setAccessible(true); - /** @var RoutingTable $table */ - $table = $tableProperty->getValue($pool); + // Use public getter to access routing table registry + $database = $request->getDatabase() ?? 'neo4j'; + $table = $pool->getRoutingTable($database); + + if ($table === null) { + return new FrontendErrorResponse(sprintf( + 'There is no routing table for database "%s". (It might not have been initialized yet)', + $database + )); + } return new RoutingTableResponse( - $request->getDatabase(), + $database, $table->getTtl(), $table->getWithRole(RoutingRoles::ROUTE()), $table->getWithRole(RoutingRoles::FOLLOWER()), diff --git a/testkit-backend/testkit.sh b/testkit-backend/testkit.sh index a6bb77b0..1ee5be84 100755 --- a/testkit-backend/testkit.sh +++ b/testkit-backend/testkit.sh @@ -146,6 +146,9 @@ python3 -m unittest -v \ tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_timeout_unmanaged_tx_should_fail_subsequent_usage_after_timeout \ tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_in_time_unmanaged_tx \ tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_in_time_managed_tx_retry \ + tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestRoutingConnectionRecvTimeout.test_in_time \ + tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestRoutingConnectionRecvTimeout.test_in_time_unmanaged_tx \ + tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestRoutingConnectionRecvTimeout.test_in_time_managed_tx_retry EXIT_CODE=$?