Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ services:
TEST_DRIVER_REPO: /opt/project
TEST_BACKEND_HOST: testkit_backend
TEST_STUB_HOST: testkit
BOLT_LISTEN_ADDR: "0.0.0.0:9001"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment where this config variable is being used?

depends_on:
- testkit_backend

Expand Down
4 changes: 4 additions & 0 deletions src/Bolt/BoltConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,10 @@ public function assertNoFailure(Response $response): void
*/
public function discardUnconsumedResults(): void
{
if (!$this->isOpen()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this into the same if statement.

It is worth inspecting why we have to make this check, as if the conneciton is closed, the serverstate should not be in STREAMING or TX_STREAMING state in the first place

return;
}

if (!in_array($this->protocol()->serverState, [ServerState::STREAMING, ServerState::TX_STREAMING], true)) {
return;
}
Expand Down
25 changes: 23 additions & 2 deletions src/Bolt/BoltResult.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,20 @@
namespace Laudis\Neo4j\Bolt;

use function array_splice;

use Bolt\error\ConnectException as BoltConnectException;

use function count;

use Generator;

use function in_array;

use Iterator;
use Laudis\Neo4j\Databags\Neo4jError;
use Laudis\Neo4j\Exception\Neo4jException;
use Laudis\Neo4j\Formatter\SummarizedResultFormatter;
use Throwable;

/**
* @psalm-import-type BoltCypherStats from SummarizedResultFormatter
Expand Down Expand Up @@ -100,7 +106,17 @@ public function consume(): array

private function fetchResults(): void
{
$meta = $this->connection->pull($this->qid, $this->fetchSize);
try {
$meta = $this->connection->pull($this->qid, $this->fetchSize);
} catch (BoltConnectException $e) {
// Close connection on socket errors
try {
$this->connection->close();
} catch (Throwable) {
// Ignore errors when closing
}
throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Cluster.NotALeader', 'Connection error: '.$e->getMessage())], $e);
}

/** @var list<list> $rows */
$rows = array_splice($meta, 0, count($meta) - 1);
Expand Down Expand Up @@ -154,6 +170,11 @@ public function __destruct()

public function discard(): void
{
$this->connection->discard($this->qid === -1 ? null : $this->qid);
try {
$this->connection->discard($this->qid === -1 ? null : $this->qid);
} catch (BoltConnectException $e) {
// Ignore connection errors during discard - connection is already broken
// The Neo4jException will be thrown when the next operation is attempted
}
}
}
2 changes: 1 addition & 1 deletion src/Bolt/BoltUnmanagedTransaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public function runStatement(Statement $statement): SummarizedResult
$this->database,
$this->tsxConfig->getTimeout(),
$this->isInstantTransaction ? $this->bookmarkHolder : null, // let the begin transaction pass the bookmarks if it is a managed transaction
$this->isInstantTransaction ? $this->config->getAccessMode() : null, // let the begin transaction decide if it is a managed transaction
null, // mode is never sent in RUN messages - it comes from session configuration
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure? $this->config is a session configuration object.

$this->tsxConfig->getMetaData()
);
} catch (Throwable $e) {
Expand Down
7 changes: 6 additions & 1 deletion src/BoltFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ public function createConnection(ConnectionRequestData $data, SessionConfigurati

$config->setServerAgent($response['server']);

// Apply recv_timeout hint if present
if (array_key_exists('hints', $response) && array_key_exists('connection.recv_timeout_seconds', $response['hints'])) {
$connection->setTimeout((float) $response['hints']['connection.recv_timeout_seconds']);
}

return $connection;
}

Expand All @@ -92,7 +97,7 @@ public function canReuseConnection(ConnectionInterface $connection, SessionConfi
$database = $databaseInfo?->getName();

return $connection->getAccessMode() === $config->getAccessMode()
&& $database === $config->getDatabase();
&& $database === $config->getDatabase();
}

public function reuseConnection(BoltConnection $connection, SessionConfiguration $sessionConfig): BoltConnection
Expand Down
46 changes: 45 additions & 1 deletion src/Contracts/BoltMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
use Bolt\protocol\Response;
use Iterator;
use Laudis\Neo4j\Bolt\BoltConnection;
use Laudis\Neo4j\Databags\Neo4jError;
use Laudis\Neo4j\Exception\Neo4jException;
use Throwable;

abstract class BoltMessage
{
Expand All @@ -31,13 +34,54 @@ abstract public function send(): BoltMessage;

public function getResponse(): Response
{
$response = $this->connection->protocol()->getResponse();
try {
$response = $this->connection->protocol()->getResponse();
} catch (Throwable $e) {
if ($this->isTimeoutException($e)) {
$timeoutMsg = 'Connection timeout reached';
if (preg_match('/(\d+)\s*(?:milliseconds?|ms|seconds?|s)/', $e->getMessage(), $matches) && array_key_exists(1, $matches)) {
$timeoutMsg = 'Connection timeout reached after '.$matches[1].' seconds';
}
try {
$this->connection->close();
} catch (Throwable) {
}
// Use DriverError so the driver treats this as a failure
throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Cluster.NotALeader', $timeoutMsg)], $e);
} elseif ($this->isSocketException($e)) {
try {
$this->connection->close();
} catch (Throwable) {
}
throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Cluster.NotALeader', 'Connection error: '.$e->getMessage())], $e);
}

throw $e;
}

$this->connection->assertNoFailure($response);

return $response;
}

private function isTimeoutException(Throwable $e): bool
{
$message = strtolower($e->getMessage());

return str_contains($message, 'timeout') || str_contains($message, 'time out');
}

private function isSocketException(Throwable $e): bool
{
$message = strtolower($e->getMessage());

return str_contains($message, 'broken pipe')
|| str_contains($message, 'connection reset')
|| str_contains($message, 'connection refused')
|| str_contains($message, 'interrupted system call')
|| str_contains($message, 'i/o error');
}

/**
* @return Iterator<Response>
*/
Expand Down
7 changes: 7 additions & 0 deletions testkit-backend/testkit.sh
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ python3 -m unittest -v \
tests.stub.connectivity_check.test_get_server_info.TestGetServerInfo.test_routing_no_server \
tests.stub.connectivity_check.test_get_server_info.TestGetServerInfo.test_routing_raises_error \
tests.stub.connectivity_check.test_get_server_info.TestGetServerInfo.test_routing \
\
tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_in_time \
tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_timeout \
tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_timeout_unmanaged_tx \
tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_timeout_unmanaged_tx_should_fail_subsequent_usage_after_timeout \
tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_in_time_unmanaged_tx \
tests.stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_in_time_managed_tx_retry \

EXIT_CODE=$?

Expand Down
Loading