diff --git a/docker-compose.yml b/docker-compose.yml index 151e707c..c3518b10 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -133,6 +133,7 @@ services: TEST_DRIVER_REPO: /opt/project TEST_BACKEND_HOST: testkit_backend TEST_STUB_HOST: testkit + BOLT_LISTEN_ADDR: "0.0.0.0:9001" depends_on: - testkit_backend diff --git a/src/Bolt/BoltConnection.php b/src/Bolt/BoltConnection.php index f1fdd6f0..0d1f9aab 100644 --- a/src/Bolt/BoltConnection.php +++ b/src/Bolt/BoltConnection.php @@ -419,6 +419,10 @@ public function assertNoFailure(Response $response): void */ public function discardUnconsumedResults(): void { + if (!$this->isOpen()) { + return; + } + if (!in_array($this->protocol()->serverState, [ServerState::STREAMING, ServerState::TX_STREAMING], true)) { return; } diff --git a/src/Bolt/BoltResult.php b/src/Bolt/BoltResult.php index 7d9c7408..bf8dbebd 100644 --- a/src/Bolt/BoltResult.php +++ b/src/Bolt/BoltResult.php @@ -14,6 +14,9 @@ namespace Laudis\Neo4j\Bolt; use function array_splice; + +use Bolt\error\ConnectException as BoltConnectException; + use function count; use Generator; @@ -21,7 +24,10 @@ use function in_array; use Iterator; +use Laudis\Neo4j\Databags\Neo4jError; +use Laudis\Neo4j\Exception\Neo4jException; use Laudis\Neo4j\Formatter\SummarizedResultFormatter; +use Throwable; /** * @psalm-import-type BoltCypherStats from SummarizedResultFormatter @@ -100,7 +106,17 @@ public function consume(): array private function fetchResults(): void { - $meta = $this->connection->pull($this->qid, $this->fetchSize); + try { + $meta = $this->connection->pull($this->qid, $this->fetchSize); + } catch (BoltConnectException $e) { + // Close connection on socket errors + try { + $this->connection->close(); + } catch (Throwable) { + // Ignore errors when closing + } + throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Cluster.NotALeader', 'Connection error: '.$e->getMessage())], $e); + } /** @var list $rows */ $rows = array_splice($meta, 0, count($meta) - 1); @@ -154,6 +170,11 @@ public function __destruct() public function discard(): void { - $this->connection->discard($this->qid === -1 ? null : $this->qid); + try { + $this->connection->discard($this->qid === -1 ? null : $this->qid); + } catch (BoltConnectException $e) { + // Ignore connection errors during discard - connection is already broken + // The Neo4jException will be thrown when the next operation is attempted + } } } diff --git a/src/Bolt/BoltUnmanagedTransaction.php b/src/Bolt/BoltUnmanagedTransaction.php index a4108d87..0de6c262 100644 --- a/src/Bolt/BoltUnmanagedTransaction.php +++ b/src/Bolt/BoltUnmanagedTransaction.php @@ -149,7 +149,7 @@ public function runStatement(Statement $statement): SummarizedResult $this->database, $this->tsxConfig->getTimeout(), $this->isInstantTransaction ? $this->bookmarkHolder : null, // let the begin transaction pass the bookmarks if it is a managed transaction - $this->isInstantTransaction ? $this->config->getAccessMode() : null, // let the begin transaction decide if it is a managed transaction + null, // mode is never sent in RUN messages - it comes from session configuration $this->tsxConfig->getMetaData() ); } catch (Throwable $e) { diff --git a/src/BoltFactory.php b/src/BoltFactory.php index afe1d744..715c2ecf 100644 --- a/src/BoltFactory.php +++ b/src/BoltFactory.php @@ -79,6 +79,11 @@ public function createConnection(ConnectionRequestData $data, SessionConfigurati $config->setServerAgent($response['server']); + // Apply recv_timeout hint if present + if (array_key_exists('hints', $response) && array_key_exists('connection.recv_timeout_seconds', $response['hints'])) { + $connection->setTimeout((float) $response['hints']['connection.recv_timeout_seconds']); + } + return $connection; } @@ -92,7 +97,7 @@ public function canReuseConnection(ConnectionInterface $connection, SessionConfi $database = $databaseInfo?->getName(); return $connection->getAccessMode() === $config->getAccessMode() - && $database === $config->getDatabase(); + && $database === $config->getDatabase(); } public function reuseConnection(BoltConnection $connection, SessionConfiguration $sessionConfig): BoltConnection diff --git a/src/Contracts/BoltMessage.php b/src/Contracts/BoltMessage.php index ba7d7984..c3828de6 100644 --- a/src/Contracts/BoltMessage.php +++ b/src/Contracts/BoltMessage.php @@ -16,6 +16,9 @@ use Bolt\protocol\Response; use Iterator; use Laudis\Neo4j\Bolt\BoltConnection; +use Laudis\Neo4j\Databags\Neo4jError; +use Laudis\Neo4j\Exception\Neo4jException; +use Throwable; abstract class BoltMessage { @@ -31,13 +34,54 @@ abstract public function send(): BoltMessage; public function getResponse(): Response { - $response = $this->connection->protocol()->getResponse(); + try { + $response = $this->connection->protocol()->getResponse(); + } catch (Throwable $e) { + if ($this->isTimeoutException($e)) { + $timeoutMsg = 'Connection timeout reached'; + if (preg_match('/(\d+)\s*(?:milliseconds?|ms|seconds?|s)/', $e->getMessage(), $matches) && array_key_exists(1, $matches)) { + $timeoutMsg = 'Connection timeout reached after '.$matches[1].' seconds'; + } + try { + $this->connection->close(); + } catch (Throwable) { + } + // Use DriverError so the driver treats this as a failure + throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Cluster.NotALeader', $timeoutMsg)], $e); + } elseif ($this->isSocketException($e)) { + try { + $this->connection->close(); + } catch (Throwable) { + } + throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Cluster.NotALeader', 'Connection error: '.$e->getMessage())], $e); + } + + throw $e; + } $this->connection->assertNoFailure($response); return $response; } + private function isTimeoutException(Throwable $e): bool + { + $message = strtolower($e->getMessage()); + + return str_contains($message, 'timeout') || str_contains($message, 'time out'); + } + + private function isSocketException(Throwable $e): bool + { + $message = strtolower($e->getMessage()); + + return str_contains($message, 'broken pipe') + || str_contains($message, 'connection reset') + || str_contains($message, 'connection refused') + || str_contains($message, 'interrupted system call') + || str_contains($message, 'i/o error'); + } + /** * @return Iterator */ diff --git a/testkit-backend/testkit.sh b/testkit-backend/testkit.sh index 78521237..a6bb77b0 100755 --- a/testkit-backend/testkit.sh +++ b/testkit-backend/testkit.sh @@ -139,6 +139,13 @@ python3 -m unittest -v \ tests.stub.connectivity_check.test_get_server_info.TestGetServerInfo.test_routing_no_server \ tests.stub.connectivity_check.test_get_server_info.TestGetServerInfo.test_routing_raises_error \ tests.stub.connectivity_check.test_get_server_info.TestGetServerInfo.test_routing \ +\ + tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_in_time \ + tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_timeout \ + tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_timeout_unmanaged_tx \ + tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_timeout_unmanaged_tx_should_fail_subsequent_usage_after_timeout \ + tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_in_time_unmanaged_tx \ + tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_in_time_managed_tx_retry \ EXIT_CODE=$?