Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 131 additions & 2 deletions src/Bolt/Session.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
use Laudis\Neo4j\Databags\TransactionConfiguration;
use Laudis\Neo4j\Enum\AccessMode;
use Laudis\Neo4j\Exception\Neo4jException;
use Laudis\Neo4j\Exception\TimeoutException;
use Laudis\Neo4j\Formatter\SummarizedResultFormatter;
use Laudis\Neo4j\Neo4j\Neo4jConnectionPool;
use Laudis\Neo4j\Types\CypherList;
Expand Down Expand Up @@ -74,12 +75,80 @@ public function runStatements(iterable $statements, ?TransactionConfiguration $c
$this->getLogger()?->log(LogLevel::INFO, 'Running statements', ['statements' => $statements]);
$config = $this->mergeTsxConfig($config);
foreach ($statements as $statement) {
$tbr[] = $this->beginInstantTransaction($this->config, $config)->runStatement($statement);
$tbr[] = $this->runStatementWithRetry($statement, $config);
}

return new CypherList($tbr);
}

/**
* Runs a single statement with retry logic to handle socket timeouts and routing errors.
* For routing drivers, this is essential to refresh the routing table on timeout.
*
* @throws Exception
*/
private function runStatementWithRetry(Statement $statement, TransactionConfiguration $config): SummarizedResult
{
while (true) {
$transaction = null;
try {
$transaction = $this->beginInstantTransaction($this->config, $config);
$result = $transaction->runStatement($statement);
// Trigger lazy loading of results to catch any timeout during result iteration
self::triggerLazyResult($result);

return $result;
} catch (TimeoutException $e) {
// Socket timeout - clear routing table and retry
if ($transaction) {
try {
$transaction->rollback();
} catch (Exception $rollbackException) {
// Ignore rollback errors during timeout
}
}

// Close broken connection so it won't be reused
foreach ($this->usedConnections as $i => $usedConnection) {
try {
$usedConnection->close();
array_splice($this->usedConnections, $i, 1);
} catch (Exception $closeException) {
// Ignore close errors
}
}

if ($this->pool instanceof Neo4jConnectionPool) {
$this->pool->clearRoutingTable();
}
// Continue retry loop
} catch (Neo4jException $e) {
if ($transaction && !in_array($e->getClassification(), self::ROLLBACK_CLASSIFICATIONS)) {
try {
$transaction->rollback();
} catch (Exception $rollbackException) {
// Ignore rollback errors
}
}

if ($this->isSocketTimeoutError($e)) {
// When socket timeout occurs, clear routing table to force re-fetch with fresh server list
if ($this->pool instanceof Neo4jConnectionPool) {
$this->pool->clearRoutingTable();
}
// Continue retry loop
} elseif ($e->getTitle() === 'NotALeader') {
// By closing the pool, we force the connection to be re-acquired and the routing table to be refetched
$this->pool->close();
// Continue retry loop
} elseif ($e->getClassification() !== 'TransientError') {
throw $e;
}
// For other transient errors, continue retry loop
}
}
}

/**
* @param iterable<Statement>|null $statements
*/
Expand Down Expand Up @@ -136,12 +205,41 @@ private function retry(callable $tsxHandler, bool $read, TransactionConfiguratio
$transaction->commit();

return $tbr;
} catch (TimeoutException $e) {
// Socket timeout - clear routing table and retry
if ($transaction) {
try {
$transaction->rollback();
} catch (Exception $rollbackException) {
// Ignore rollback errors during timeout
}
}

// Close broken connection so it won't be reused
foreach ($this->usedConnections as $i => $usedConnection) {
try {
$usedConnection->close();
array_splice($this->usedConnections, $i, 1);
} catch (Exception $closeException) {
// Ignore close errors
}
}

if ($this->pool instanceof Neo4jConnectionPool) {
$this->pool->clearRoutingTable();
}
// Continue retry loop
} catch (Neo4jException $e) {
if ($transaction && !in_array($e->getClassification(), self::ROLLBACK_CLASSIFICATIONS)) {
$transaction->rollback();
}

if ($e->getTitle() === 'NotALeader') {
if ($this->isSocketTimeoutError($e)) {
// When socket timeout occurs, clear routing table to force re-fetch with fresh server list
if ($this->pool instanceof Neo4jConnectionPool) {
$this->pool->clearRoutingTable();
}
} elseif ($e->getTitle() === 'NotALeader') {
// By closing the pool, we force the connection to be re-acquired and the routing table to be refetched
$this->pool->close();
} elseif ($e->getClassification() !== 'TransientError') {
Expand All @@ -151,6 +249,37 @@ private function retry(callable $tsxHandler, bool $read, TransactionConfiguratio
}
}

/**
* Checks if an exception represents a socket timeout or connection-related failure
* that requires routing table refresh.
*
* @param Neo4jException $e The exception to check
*
* @return bool True if this is a socket timeout or connection failure
*/
private function isSocketTimeoutError(Neo4jException $e): bool
{
$title = $e->getTitle();
$classification = $e->getClassification();

// Check if this was caused by a timeout exception in the bolt library
// Timeout exceptions are wrapped in Neo4jException with NotALeader title,
// but we can detect them by checking the previous exception message
$previous = $e->getPrevious();
if ($previous !== null) {
$prevMessage = strtolower($previous->getMessage());
if (str_contains($prevMessage, 'timeout') || str_contains($prevMessage, 'time out')) {
return true;
}
}

// Socket timeout errors should be treated as transient and trigger routing table refresh
return in_array($title, [
'ServiceUnavailable',
'FailedToRoute',
], true) || $classification === 'TransientError';
}

private static function triggerLazyResult(mixed $tbr): void
{
if ($tbr instanceof CypherSequence) {
Expand Down
60 changes: 59 additions & 1 deletion src/Neo4j/Neo4jConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ final class Neo4jConnectionPool implements ConnectionPoolInterface
/** @var array<string, ConnectionPool> */
private static array $pools = [];

/**
* Registry of routing tables per database.
* Maps database name -> RoutingTable.
*
* @var array<string, RoutingTable>
*/
private array $routingTableRegistry = [];

/**
* @psalm-mutation-free
*/
Expand Down Expand Up @@ -182,6 +190,50 @@ public function getLogger(): ?Neo4jLogger
return $this->logger;
}

/**
* Returns the routing table for a specific database, or null if not yet initialized.
* This method is intended for testkit backend access to routing table information.
*
* @param string $database The database name to retrieve routing table for
*
* @return RoutingTable|null The routing table if available, null otherwise
*/
public function getRoutingTable(string $database = 'neo4j'): ?RoutingTable
{
return $this->routingTableRegistry[$database] ?? null;
}

/**
* Returns the complete routing table registry for all databases.
* This method is intended for testkit backend access to routing information.
*
* @return array<string, RoutingTable> Map of database name to RoutingTable
*/
public function getRoutingTableRegistry(): array
{
return $this->routingTableRegistry;
}

/**
* Clears the routing table registry for a specific database or all databases.
* This is used to force a routing table refresh on the next session.
*
* @param string|null $database Database to clear, or null to clear all
*/
public function clearRoutingTable(?string $database = null): void
{
if ($database === null) {
$this->routingTableRegistry = [];
// Also clear the entire cache to force routing table refresh
$this->cache->clear();
} else {
unset($this->routingTableRegistry[$database]);
// Also clear the specific cache key for this database
$key = $this->createKey($this->data, new SessionConfiguration(database: $database));
$this->cache->delete($key);
}
}

/**
* @throws Exception
*/
Expand Down Expand Up @@ -212,7 +264,13 @@ private function routingTable(BoltConnection $connection, SessionConfiguration $
['servers' => $servers, 'ttl' => $ttl] = $route['rt'];
$ttl += time();

return new RoutingTable($servers, $ttl);
$table = new RoutingTable($servers, $ttl);

// Store in routing table registry for testkit access
$database = $config->getDatabase() ?? 'neo4j';
$this->routingTableRegistry[$database] = $table;

return $table;
}

public function release(ConnectionInterface $connection): void
Expand Down
7 changes: 4 additions & 3 deletions testkit-backend/src/Handlers/ForcedRoutingTableUpdate.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ public function handle($request): TestkitResponseInterface
/** @var ConnectionPoolInterface $pool */
$pool = $poolProperty->getValue($driver);

$tableProperty = (new ReflectionClass(Neo4jConnectionPool::class))->getProperty('table');
$tableProperty->setAccessible(true);
$tableProperty->setValue($pool, null);
// Clear routing table registry to force refresh on next session
if ($pool instanceof Neo4jConnectionPool) {
$pool->clearRoutingTable();
}
}

$driver->createSession()->run('RETURN 1 AS x');
Expand Down
17 changes: 11 additions & 6 deletions testkit-backend/src/Handlers/GetRoutingTable.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
use Laudis\Neo4j\Enum\RoutingRoles;
use Laudis\Neo4j\Neo4j\Neo4jConnectionPool;
use Laudis\Neo4j\Neo4j\Neo4jDriver;
use Laudis\Neo4j\Neo4j\RoutingTable;
use Laudis\Neo4j\TestkitBackend\Contracts\RequestHandlerInterface;
use Laudis\Neo4j\TestkitBackend\Contracts\TestkitResponseInterface;
use Laudis\Neo4j\TestkitBackend\MainRepository;
Expand Down Expand Up @@ -55,13 +54,19 @@ public function handle($request): TestkitResponseInterface
/** @var Neo4jConnectionPool $pool */
$pool = $poolProperty->getValue($driver);

$tableProperty = (new ReflectionClass(Neo4jConnectionPool::class))->getProperty('table');
$tableProperty->setAccessible(true);
/** @var RoutingTable $table */
$table = $tableProperty->getValue($pool);
// Use public getter to access routing table registry
$database = $request->getDatabase() ?? 'neo4j';
$table = $pool->getRoutingTable($database);

if ($table === null) {
return new FrontendErrorResponse(sprintf(
'There is no routing table for database "%s". (It might not have been initialized yet)',
$database
));
}

return new RoutingTableResponse(
$request->getDatabase(),
$database,
$table->getTtl(),
$table->getWithRole(RoutingRoles::ROUTE()),
$table->getWithRole(RoutingRoles::FOLLOWER()),
Expand Down
3 changes: 3 additions & 0 deletions testkit-backend/testkit.sh
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ python3 -m unittest -v \
tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_timeout_unmanaged_tx_should_fail_subsequent_usage_after_timeout \
tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_in_time_unmanaged_tx \
tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_in_time_managed_tx_retry \
tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestRoutingConnectionRecvTimeout.test_in_time \
tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestRoutingConnectionRecvTimeout.test_in_time_unmanaged_tx \
tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestRoutingConnectionRecvTimeout.test_in_time_managed_tx_retry

EXIT_CODE=$?

Expand Down
Loading