Skip to content

Commit d12f951

Browse files
committed
Added db to BEGIN and introduced unit test tool
1 parent e8dc358 commit d12f951

File tree

8 files changed

+228
-130
lines changed

8 files changed

+228
-130
lines changed

neo4j/io/__init__.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,17 @@ def discard_all(self, **handlers):
172172
def pull_all(self, **handlers):
173173
raise NotImplementedError
174174

175-
def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, **handlers):
176-
raise NotImplementedError
175+
def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, db=None, **handlers):
176+
""" Appends a BEGIN message to the output stream.
177+
178+
:param mode: access mode for routing - "READ" or "WRITE" (default)
179+
:param bookmarks: iterable of bookmark values after which this transaction should begin
180+
:param metadata: custom metadata dictionary to attach to the transaction
181+
:param timeout: timeout for transaction execution (seconds)
182+
:param db: name of the database against which to begin the transaction
183+
:param handlers: handler functions passed into the returned Response object
184+
:return: :class:`neo4j.io._bolt3.Response` object
185+
"""
177186

178187
def commit(self, **handlers):
179188
raise NotImplementedError

neo4j/io/_bolt3.py

Lines changed: 13 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from neo4j.api import (
2727
Version,
2828
)
29+
from neo4j.io._courier import MessageInbox
2930
from neo4j.meta import get_user_agent
3031
from neo4j.exceptions import (
3132
ProtocolError,
@@ -39,7 +40,6 @@
3940
SessionExpired,
4041
)
4142
from neo4j.packstream import (
42-
UnpackableBuffer,
4343
Unpacker,
4444
Packer,
4545
)
@@ -178,7 +178,10 @@ def pull_all(self, **handlers):
178178
log.debug("[#%04X] C: PULL_ALL", self.local_port)
179179
self._append(b"\x3F", (), Response(self, **handlers))
180180

181-
def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, **handlers):
181+
def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None,
182+
db=None, **handlers):
183+
if db is not None:
184+
raise ValueError("Database selection is not supported in Bolt 3")
182185
extra = {}
183186
if mode:
184187
extra["mode"] = mode
@@ -538,56 +541,16 @@ def recv_into(self, buffer, n_bytes=0, flags=0):
538541
return n_bytes
539542

540543

541-
class Inbox:
542-
543-
def __init__(self, s, on_error):
544-
super(Inbox, self).__init__()
545-
self.on_error = on_error
546-
self._messages = self._yield_messages(s)
547-
548-
def __iter__(self):
549-
return self
544+
class Inbox(MessageInbox):
550545

551546
def __next__(self):
552-
return next(self._messages)
553-
554-
@classmethod
555-
def _load_chunks(cls, sock, buffer):
556-
chunk_size = 0
557-
while True:
558-
if chunk_size == 0:
559-
buffer.receive(sock, 2)
560-
chunk_size = buffer.pop_u16()
561-
if chunk_size > 0:
562-
buffer.receive(sock, chunk_size + 2)
563-
yield chunk_size
564-
565-
def _yield_messages(self, sock):
566-
try:
567-
buffer = UnpackableBuffer()
568-
chunk_loader = self._load_chunks(sock, buffer)
569-
unpacker = Unpacker(buffer)
570-
details = []
571-
while True:
572-
unpacker.reset()
573-
details[:] = ()
574-
chunk_size = -1
575-
while chunk_size != 0:
576-
chunk_size = next(chunk_loader)
577-
summary_signature = None
578-
summary_metadata = None
579-
size, signature = unpacker.unpack_structure_header()
580-
if size > 1:
581-
raise ProtocolError("Expected one field")
582-
if signature == b"\x71":
583-
data = unpacker.unpack()
584-
details.append(data)
585-
else:
586-
summary_signature = signature
587-
summary_metadata = unpacker.unpack_map()
588-
yield details, summary_signature, summary_metadata
589-
except OSError as error:
590-
self.on_error(error)
547+
tag, fields = self.pop()
548+
if tag == b"\x71":
549+
return fields, None, None
550+
elif fields:
551+
return [], tag, fields[0]
552+
else:
553+
return [], tag, None
591554

592555

593556
class Response:

neo4j/io/_bolt4x0.py

Lines changed: 13 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from neo4j.api import (
2727
Version,
2828
)
29+
from neo4j.io._courier import MessageInbox
2930
from neo4j.meta import get_user_agent
3031
from neo4j.exceptions import (
3132
ProtocolError,
@@ -39,7 +40,6 @@
3940
SessionExpired,
4041
)
4142
from neo4j.packstream import (
42-
UnpackableBuffer,
4343
Unpacker,
4444
Packer,
4545
)
@@ -178,10 +178,13 @@ def pull_all(self, **handlers):
178178
log.debug("[#%04X] C: PULL_ALL", self.local_port)
179179
self._append(b"\x3F", (), Response(self, **handlers))
180180

181-
def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, **handlers):
181+
def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None,
182+
db=None, **handlers):
182183
extra = {}
183184
if mode:
184185
extra["mode"] = mode
186+
if db:
187+
extra["db"] = db
185188
if bookmarks:
186189
try:
187190
extra["bookmarks"] = list(bookmarks)
@@ -538,56 +541,16 @@ def recv_into(self, buffer, n_bytes=0, flags=0):
538541
return n_bytes
539542

540543

541-
class Inbox:
542-
543-
def __init__(self, s, on_error):
544-
super(Inbox, self).__init__()
545-
self.on_error = on_error
546-
self._messages = self._yield_messages(s)
547-
548-
def __iter__(self):
549-
return self
544+
class Inbox(MessageInbox):
550545

551546
def __next__(self):
552-
return next(self._messages)
553-
554-
@classmethod
555-
def _load_chunks(cls, sock, buffer):
556-
chunk_size = 0
557-
while True:
558-
if chunk_size == 0:
559-
buffer.receive(sock, 2)
560-
chunk_size = buffer.pop_u16()
561-
if chunk_size > 0:
562-
buffer.receive(sock, chunk_size + 2)
563-
yield chunk_size
564-
565-
def _yield_messages(self, sock):
566-
try:
567-
buffer = UnpackableBuffer()
568-
chunk_loader = self._load_chunks(sock, buffer)
569-
unpacker = Unpacker(buffer)
570-
details = []
571-
while True:
572-
unpacker.reset()
573-
details[:] = ()
574-
chunk_size = -1
575-
while chunk_size != 0:
576-
chunk_size = next(chunk_loader)
577-
summary_signature = None
578-
summary_metadata = None
579-
size, signature = unpacker.unpack_structure_header()
580-
if size > 1:
581-
raise ProtocolError("Expected one field")
582-
if signature == b"\x71":
583-
data = unpacker.unpack()
584-
details.append(data)
585-
else:
586-
summary_signature = signature
587-
summary_metadata = unpacker.unpack_map()
588-
yield details, summary_signature, summary_metadata
589-
except OSError as error:
590-
self.on_error(error)
547+
tag, fields = self.pop()
548+
if tag == b"\x71":
549+
return fields, None, None
550+
elif fields:
551+
return [], tag, fields[0]
552+
else:
553+
return [], tag, None
591554

592555

593556
class Response:

neo4j/io/_courier.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#!/usr/bin/env python
2+
# -*- encoding: utf-8 -*-
3+
4+
# Copyright (c) 2002-2020 "Neo4j,"
5+
# Neo4j Sweden AB [http://neo4j.com]
6+
#
7+
# This file is part of Neo4j.
8+
#
9+
# Licensed under the Apache License, Version 2.0 (the "License");
10+
# you may not use this file except in compliance with the License.
11+
# You may obtain a copy of the License at
12+
#
13+
# http://www.apache.org/licenses/LICENSE-2.0
14+
#
15+
# Unless required by applicable law or agreed to in writing, software
16+
# distributed under the License is distributed on an "AS IS" BASIS,
17+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
# See the License for the specific language governing permissions and
19+
# limitations under the License.
20+
21+
22+
from neo4j.packstream import (
23+
UnpackableBuffer,
24+
Unpacker,
25+
)
26+
27+
28+
class MessageInbox:
29+
30+
def __init__(self, s, on_error):
31+
self.on_error = on_error
32+
self._messages = self._yield_messages(s)
33+
34+
@classmethod
35+
def _load_chunks(cls, sock, buffer):
36+
chunk_size = 0
37+
while True:
38+
if chunk_size == 0:
39+
buffer.receive(sock, 2)
40+
chunk_size = buffer.pop_u16()
41+
if chunk_size > 0:
42+
buffer.receive(sock, chunk_size + 2)
43+
yield chunk_size
44+
45+
def _yield_messages(self, sock):
46+
try:
47+
buffer = UnpackableBuffer()
48+
chunk_loader = self._load_chunks(sock, buffer)
49+
unpacker = Unpacker(buffer)
50+
while True:
51+
unpacker.reset()
52+
chunk_size = -1
53+
while chunk_size != 0:
54+
chunk_size = next(chunk_loader)
55+
size, tag = unpacker.unpack_structure_header()
56+
fields = [unpacker.unpack() for _ in range(size)]
57+
yield tag, fields
58+
except OSError as error:
59+
self.on_error(error)
60+
61+
def pop(self):
62+
return next(self._messages)

neo4j/work/simple.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,8 @@ def begin_transaction(self, bookmark=None, metadata=None, timeout=None):
308308
def _open_transaction(self, access_mode=None, metadata=None, timeout=None):
309309
self._transaction = Transaction(self, on_close=self._close_transaction)
310310
self._connect(access_mode)
311+
# TODO: capture ValueError and surface as SessionError/TransactionError if
312+
# TODO: explicit database selection has been made
311313
self._connection.begin(bookmarks=self._bookmarks_in, metadata=metadata, timeout=timeout)
312314

313315
def commit_transaction(self):

tests/unit/io/conftest.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#!/usr/bin/env python
2+
# -*- encoding: utf-8 -*-
3+
4+
# Copyright (c) 2002-2020 "Neo4j,"
5+
# Neo4j Sweden AB [http://neo4j.com]
6+
#
7+
# This file is part of Neo4j.
8+
#
9+
# Licensed under the Apache License, Version 2.0 (the "License");
10+
# you may not use this file except in compliance with the License.
11+
# You may obtain a copy of the License at
12+
#
13+
# http://www.apache.org/licenses/LICENSE-2.0
14+
#
15+
# Unless required by applicable law or agreed to in writing, software
16+
# distributed under the License is distributed on an "AS IS" BASIS,
17+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
# See the License for the specific language governing permissions and
19+
# limitations under the License.
20+
21+
22+
import pytest
23+
24+
from neo4j.io._courier import MessageInbox
25+
26+
27+
class FakeSocket:
28+
29+
def __init__(self, address):
30+
self.address = address
31+
self.captured = b""
32+
self.messages = MessageInbox(self, on_error=print)
33+
34+
def setblocking(self, flag):
35+
pass
36+
37+
def getsockname(self):
38+
return "127.0.0.1", 0xFFFF
39+
40+
def getpeername(self):
41+
return self.address
42+
43+
def recv_into(self, buffer, nbytes):
44+
data = self.captured[:nbytes]
45+
actual = len(data)
46+
buffer[:actual] = data
47+
self.captured = self.captured[actual:]
48+
return actual
49+
50+
def sendall(self, data):
51+
self.captured += data
52+
53+
def close(self):
54+
return
55+
56+
def pop_message(self):
57+
return self.messages.pop()
58+
59+
60+
@pytest.fixture
61+
def fake_socket():
62+
return FakeSocket

0 commit comments

Comments
 (0)