Skip to content

Commit 6bdd0f5

Browse files
committed
Added n and qid to PULL (renamed from PULL_ALL)
1 parent af1bd0b commit 6bdd0f5

File tree

7 files changed

+88
-12
lines changed

7 files changed

+88
-12
lines changed

neo4j/io/__init__.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ def run(self, statement, parameters=None, mode=None, bookmarks=None, metadata=No
175175
:param timeout: timeout for transaction execution (seconds)
176176
:param db: name of the database against which to begin the transaction
177177
:param handlers: handler functions passed into the returned Response object
178-
:return: :class:`neo4j.io._bolt3.Response` object
178+
:return: Response object
179179
"""
180180

181181
def discard(self, n=-1, qid=-1, **handlers):
@@ -184,11 +184,17 @@ def discard(self, n=-1, qid=-1, **handlers):
184184
:param n: number of records to discard, default = -1 (ALL)
185185
:param qid: query ID to discard for, default = -1 (last query)
186186
:param handlers: handler functions passed into the returned Response object
187-
:return: :class:`neo4j.io._bolt3.Response` object
187+
:return: Response object
188188
"""
189189

190-
def pull_all(self, **handlers):
191-
raise NotImplementedError
190+
def pull(self, n=-1, qid=-1, **handlers):
191+
""" Appends a PULL message to the output stream.
192+
193+
:param n: number of records to pull, default = -1 (ALL)
194+
:param qid: query ID to pull for, default = -1 (last query)
195+
:param handlers: handler functions passed into the returned Response object
196+
:return: Response object
197+
"""
192198

193199
def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, db=None, **handlers):
194200
""" Appends a BEGIN message to the output stream.
@@ -199,7 +205,7 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, db=None,
199205
:param timeout: timeout for transaction execution (seconds)
200206
:param db: name of the database against which to begin the transaction
201207
:param handlers: handler functions passed into the returned Response object
202-
:return: :class:`neo4j.io._bolt3.Response` object
208+
:return: Response object
203209
"""
204210

205211
def commit(self, **handlers):
@@ -492,7 +498,7 @@ def fail(md):
492498
log.debug("[#%04X] C: <ROUTING> query=%r", cx.local_port, self.routing_context or {})
493499
cx.run("CALL dbms.cluster.routing.getRoutingTable($context)",
494500
{"context": self.routing_context}, on_success=metadata.update, on_failure=fail)
495-
cx.pull_all(on_success=metadata.update, on_records=records.extend)
501+
cx.pull(on_success=metadata.update, on_records=records.extend)
496502
cx.send_all()
497503
cx.fetch_all()
498504
routing_info = [dict(zip(metadata.get("fields", ()), values)) for values in records]

neo4j/io/_bolt3.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,11 @@ def discard(self, n=-1, qid=-1, **handlers):
181181
log.debug("[#%04X] C: DISCARD_ALL", self.local_port)
182182
self._append(b"\x2F", (), Response(self, **handlers))
183183

184-
def pull_all(self, **handlers):
184+
def pull(self, n=-1, qid=-1, **handlers):
185+
if n != -1:
186+
raise ValueError("Incremental pull is not supported in Bolt 3")
187+
if qid != -1:
188+
raise ValueError("Query selection on pull is not supported in Bolt 3")
185189
log.debug("[#%04X] C: PULL_ALL", self.local_port)
186190
self._append(b"\x3F", (), Response(self, **handlers))
187191

neo4j/io/_bolt4x0.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,14 @@ def discard(self, n=-1, qid=-1, **handlers):
182182
log.debug("[#%04X] C: DISCARD %r", self.local_port, extra)
183183
self._append(b"\x2F", (extra,), Response(self, **handlers))
184184

185-
def pull_all(self, **handlers):
186-
log.debug("[#%04X] C: PULL_ALL", self.local_port)
187-
self._append(b"\x3F", (), Response(self, **handlers))
185+
def pull(self, n=-1, qid=-1, **handlers):
186+
# TODO: find out whether qid is optional, and optimise if so
187+
extra = {
188+
"n": n,
189+
"qid": qid,
190+
}
191+
log.debug("[#%04X] C: PULL %r", self.local_port, extra)
192+
self._append(b"\x3F", (extra,), Response(self, **handlers))
188193

189194
def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None,
190195
db=None, **handlers):

neo4j/work/pipelining.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def __init__(self, pool, config):
4444

4545
def push(self, statement, parameters=None):
4646
self._connection.run(statement, parameters)
47-
self._connection.pull_all(on_records=self._data.extend)
47+
self._connection.pull(on_records=self._data.extend)
4848
output_buffer_size = len(self._connection.outbox.view())
4949
if output_buffer_size >= self._flush_every:
5050
self._connection.send_all()

neo4j/work/simple.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ def done(summary_metadata):
207207
# TODO: capture ValueError and surface as SessionError/TransactionError if
208208
# TODO: explicit database selection has been made
209209
cx.run(statement_text, parameters, **run_metadata)
210-
cx.pull_all(
210+
cx.pull(
211211
on_records=lambda records: result._records.extend(
212212
hydrant.hydrate_records(result.keys(), records)),
213213
on_success=done,

tests/unit/io/test_class_bolt3.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,28 @@ def test_qid_extra_not_supported_in_discard(fake_socket):
8080
with pytest.raises(ValueError):
8181
connection.discard(qid=666)
8282

83+
84+
def test_simple_pull(fake_socket):
85+
address = ("127.0.0.1", 7687)
86+
socket = fake_socket(address)
87+
connection = Bolt3(address, socket)
88+
connection.pull()
89+
connection.send_all()
90+
tag, fields = socket.pop_message()
91+
assert tag == b"\x3F"
92+
assert len(fields) == 0
93+
94+
95+
def test_n_extra_not_supported_in_pull(fake_socket):
96+
address = ("127.0.0.1", 7687)
97+
connection = Bolt3(address, fake_socket(address))
98+
with pytest.raises(ValueError):
99+
connection.pull(n=666)
100+
101+
102+
def test_qid_extra_not_supported_in_pull(fake_socket):
103+
address = ("127.0.0.1", 7687)
104+
connection = Bolt3(address, fake_socket(address))
105+
with pytest.raises(ValueError):
106+
connection.pull(qid=666)
107+

tests/unit/io/test_class_bolt4x0.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,3 +100,39 @@ def test_n_and_qid_extras_in_discard(fake_socket):
100100
assert tag == b"\x2F"
101101
assert len(fields) == 1
102102
assert fields[0] == {"n": 666, "qid": 777}
103+
104+
105+
def test_n_extra_in_pull(fake_socket):
106+
address = ("127.0.0.1", 7687)
107+
socket = fake_socket(address)
108+
connection = Bolt4x0(address, socket)
109+
connection.pull(n=666)
110+
connection.send_all()
111+
tag, fields = socket.pop_message()
112+
assert tag == b"\x3F"
113+
assert len(fields) == 1
114+
assert fields[0] == {"n": 666, "qid": -1}
115+
116+
117+
def test_qid_extra_in_pull(fake_socket):
118+
address = ("127.0.0.1", 7687)
119+
socket = fake_socket(address)
120+
connection = Bolt4x0(address, socket)
121+
connection.pull(qid=666)
122+
connection.send_all()
123+
tag, fields = socket.pop_message()
124+
assert tag == b"\x3F"
125+
assert len(fields) == 1
126+
assert fields[0] == {"n": -1, "qid": 666}
127+
128+
129+
def test_n_and_qid_extras_in_pull(fake_socket):
130+
address = ("127.0.0.1", 7687)
131+
socket = fake_socket(address)
132+
connection = Bolt4x0(address, socket)
133+
connection.pull(n=666, qid=777)
134+
connection.send_all()
135+
tag, fields = socket.pop_message()
136+
assert tag == b"\x3F"
137+
assert len(fields) == 1
138+
assert fields[0] == {"n": 666, "qid": 777}

0 commit comments

Comments
 (0)