Skip to content

Commit 3024ac2

Browse files
Merge pull request #339 from neo4j/4.0-bolt-4.0
Adds Bolt 4.0 protocol support at connector level
2 parents 22e56db + d66d324 commit 3024ac2

File tree

9 files changed

+1067
-89
lines changed

9 files changed

+1067
-89
lines changed

neo4j/io/__init__.py

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ def open(cls, address, *, auth=None, timeout=None, **config):
124124
if config.protocol_version == (3, 0):
125125
from neo4j.io._bolt3 import Bolt3
126126
connection = Bolt3(address, s, auth=auth, **config)
127+
elif config.protocol_version == (4, 0):
128+
from neo4j.io._bolt4x0 import Bolt4x0
129+
connection = Bolt4x0(address, s, auth=auth, **config)
127130
else:
128131
log.debug("[#%04X] S: <CLOSE>", s.getpeername()[1])
129132
s.shutdown(SHUT_RDWR)
@@ -160,17 +163,50 @@ def __enter__(self):
160163
def __exit__(self, exc_type, exc_value, traceback):
161164
self.close()
162165

163-
def run(self, statement, parameters=None, mode=None, bookmarks=None, metadata=None, timeout=None, **handlers):
164-
raise NotImplementedError
166+
def run(self, statement, parameters=None, mode=None, bookmarks=None, metadata=None,
167+
timeout=None, db=None, **handlers):
168+
""" Appends a RUN message to the output stream.
169+
170+
:param statement: Cypher query string
171+
:param parameters: dictionary of Cypher parameters
172+
:param mode: access mode for routing - "READ" or "WRITE" (default)
173+
:param bookmarks: iterable of bookmark values after which this transaction should begin
174+
:param metadata: custom metadata dictionary to attach to the transaction
175+
:param timeout: timeout for transaction execution (seconds)
176+
:param db: name of the database against which to begin the transaction
177+
:param handlers: handler functions passed into the returned Response object
178+
:return: Response object
179+
"""
165180

166-
def discard_all(self, **handlers):
167-
raise NotImplementedError
181+
def discard(self, n=-1, qid=-1, **handlers):
182+
""" Appends a DISCARD message to the output stream.
168183
169-
def pull_all(self, **handlers):
170-
raise NotImplementedError
184+
:param n: number of records to discard, default = -1 (ALL)
185+
:param qid: query ID to discard for, default = -1 (last query)
186+
:param handlers: handler functions passed into the returned Response object
187+
:return: Response object
188+
"""
171189

172-
def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, **handlers):
173-
raise NotImplementedError
190+
def pull(self, n=-1, qid=-1, **handlers):
191+
""" Appends a PULL message to the output stream.
192+
193+
:param n: number of records to pull, default = -1 (ALL)
194+
:param qid: query ID to pull for, default = -1 (last query)
195+
:param handlers: handler functions passed into the returned Response object
196+
:return: Response object
197+
"""
198+
199+
def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, db=None, **handlers):
200+
""" Appends a BEGIN message to the output stream.
201+
202+
:param mode: access mode for routing - "READ" or "WRITE" (default)
203+
:param bookmarks: iterable of bookmark values after which this transaction should begin
204+
:param metadata: custom metadata dictionary to attach to the transaction
205+
:param timeout: timeout for transaction execution (seconds)
206+
:param db: name of the database against which to begin the transaction
207+
:param handlers: handler functions passed into the returned Response object
208+
:return: Response object
209+
"""
174210

175211
def commit(self, **handlers):
176212
raise NotImplementedError
@@ -462,7 +498,7 @@ def fail(md):
462498
log.debug("[#%04X] C: <ROUTING> query=%r", cx.local_port, self.routing_context or {})
463499
cx.run("CALL dbms.cluster.routing.getRoutingTable($context)",
464500
{"context": self.routing_context}, on_success=metadata.update, on_failure=fail)
465-
cx.pull_all(on_success=metadata.update, on_records=records.extend)
501+
cx.pull(on_success=metadata.update, on_records=records.extend)
466502
cx.send_all()
467503
cx.fetch_all()
468504
routing_info = [dict(zip(metadata.get("fields", ()), values)) for values in records]

neo4j/io/_bolt3.py

Lines changed: 27 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from neo4j.api import (
2727
Version,
2828
)
29+
from neo4j.io._courier import MessageInbox
2930
from neo4j.meta import get_user_agent
3031
from neo4j.exceptions import (
3132
ProtocolError,
@@ -39,7 +40,6 @@
3940
SessionExpired,
4041
)
4142
from neo4j.packstream import (
42-
UnpackableBuffer,
4343
Unpacker,
4444
Packer,
4545
)
@@ -142,7 +142,10 @@ def hello(self):
142142
self.send_all()
143143
self.fetch_all()
144144

145-
def run(self, statement, parameters=None, mode=None, bookmarks=None, metadata=None, timeout=None, **handlers):
145+
def run(self, statement, parameters=None, mode=None, bookmarks=None, metadata=None,
146+
timeout=None, db=None, **handlers):
147+
if db is not None:
148+
raise ValueError("Database selection is not supported in Bolt 3")
146149
if not parameters:
147150
parameters = {}
148151
extra = {}
@@ -170,15 +173,26 @@ def run(self, statement, parameters=None, mode=None, bookmarks=None, metadata=No
170173
else:
171174
self._append(b"\x10", fields, Response(self, **handlers))
172175

173-
def discard_all(self, **handlers):
176+
def discard(self, n=-1, qid=-1, **handlers):
177+
if n != -1:
178+
raise ValueError("Incremental discard is not supported in Bolt 3")
179+
if qid != -1:
180+
raise ValueError("Query selection on discard is not supported in Bolt 3")
174181
log.debug("[#%04X] C: DISCARD_ALL", self.local_port)
175182
self._append(b"\x2F", (), Response(self, **handlers))
176183

177-
def pull_all(self, **handlers):
184+
def pull(self, n=-1, qid=-1, **handlers):
185+
if n != -1:
186+
raise ValueError("Incremental pull is not supported in Bolt 3")
187+
if qid != -1:
188+
raise ValueError("Query selection on pull is not supported in Bolt 3")
178189
log.debug("[#%04X] C: PULL_ALL", self.local_port)
179190
self._append(b"\x3F", (), Response(self, **handlers))
180191

181-
def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, **handlers):
192+
def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None,
193+
db=None, **handlers):
194+
if db is not None:
195+
raise ValueError("Database selection is not supported in Bolt 3")
182196
extra = {}
183197
if mode:
184198
extra["mode"] = mode
@@ -538,56 +552,16 @@ def recv_into(self, buffer, n_bytes=0, flags=0):
538552
return n_bytes
539553

540554

541-
class Inbox:
542-
543-
def __init__(self, s, on_error):
544-
super(Inbox, self).__init__()
545-
self.on_error = on_error
546-
self._messages = self._yield_messages(s)
547-
548-
def __iter__(self):
549-
return self
555+
class Inbox(MessageInbox):
550556

551557
def __next__(self):
552-
return next(self._messages)
553-
554-
@classmethod
555-
def _load_chunks(cls, sock, buffer):
556-
chunk_size = 0
557-
while True:
558-
if chunk_size == 0:
559-
buffer.receive(sock, 2)
560-
chunk_size = buffer.pop_u16()
561-
if chunk_size > 0:
562-
buffer.receive(sock, chunk_size + 2)
563-
yield chunk_size
564-
565-
def _yield_messages(self, sock):
566-
try:
567-
buffer = UnpackableBuffer()
568-
chunk_loader = self._load_chunks(sock, buffer)
569-
unpacker = Unpacker(buffer)
570-
details = []
571-
while True:
572-
unpacker.reset()
573-
details[:] = ()
574-
chunk_size = -1
575-
while chunk_size != 0:
576-
chunk_size = next(chunk_loader)
577-
summary_signature = None
578-
summary_metadata = None
579-
size, signature = unpacker.unpack_structure_header()
580-
if size > 1:
581-
raise ProtocolError("Expected one field")
582-
if signature == b"\x71":
583-
data = unpacker.unpack()
584-
details.append(data)
585-
else:
586-
summary_signature = signature
587-
summary_metadata = unpacker.unpack_map()
588-
yield details, summary_signature, summary_metadata
589-
except OSError as error:
590-
self.on_error(error)
558+
tag, fields = self.pop()
559+
if tag == b"\x71":
560+
return fields, None, None
561+
elif fields:
562+
return [], tag, fields[0]
563+
else:
564+
return [], tag, None
591565

592566

593567
class Response:

0 commit comments

Comments
 (0)