Skip to content

Commit ddac30d

Browse files
committed
PYTHON-1742 add postBatchResumeToken support
PYTHON-1815 add tests for postBatchResumeToken support
PYTHON-1845 clarify resume token used in resuming and getResumeToken
1 parent 010e8d4 commit ddac30d

File tree

4 files changed: 783 additions and 397 deletions.

doc/changelog.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ Version 3.9 adds support for MongoDB 4.2. Highlights include:
6262
:meth:`~pymongo.operations.UpdateMany`.
6363
- :class:`~bson.binary.Binary` now supports any bytes-like type that implements
6464
the buffer protocol.
65+
- Resume tokens can now be accessed from a ``ChangeStream`` cursor using the
66+
:attr:`~pymongo.change_stream.ChangeStream.resume_token` attribute.
6567

6668
.. _URI options specification: https://github.com/mongodb/specifications/blob/master/source/uri-options/uri-options.rst
6769

pymongo/change_stream.py

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,16 @@ def __init__(self, target, pipeline, full_document, resume_after,
7777

7878
self._pipeline = copy.deepcopy(pipeline)
7979
self._full_document = full_document
80-
self._resume_token = copy.deepcopy(resume_after)
80+
self._uses_start_after = start_after is not None
81+
self._uses_resume_after = resume_after is not None
82+
self._resume_token = copy.deepcopy(start_after or resume_after)
8183
self._max_await_time_ms = max_await_time_ms
8284
self._batch_size = batch_size
8385
self._collation = collation
8486
self._start_at_operation_time = start_at_operation_time
8587
self._session = session
86-
self._start_after = copy.deepcopy(start_after)
88+
89+
# Initialize cursor.
8790
self._cursor = self._create_cursor()
8891

8992
@property
@@ -102,10 +105,14 @@ def _change_stream_options(self):
102105
options = {}
103106
if self._full_document is not None:
104107
options['fullDocument'] = self._full_document
105-
if self._resume_token is not None:
106-
options['resumeAfter'] = self._resume_token
107-
if self._start_after is not None:
108-
options['startAfter'] = self._start_after
108+
109+
resume_token = self.resume_token
110+
if resume_token is not None:
111+
if self._uses_start_after:
112+
options['startAfter'] = resume_token
113+
if self._uses_resume_after:
114+
options['resumeAfter'] = resume_token
115+
109116
if self._start_at_operation_time is not None:
110117
options['startAtOperationTime'] = self._start_at_operation_time
111118
return options
@@ -127,12 +134,18 @@ def _aggregation_pipeline(self):
127134
return full_pipeline
128135

129136
def _process_result(self, result, session, server, sock_info, slave_ok):
130-
"""Callback that records a change stream cursor's operationTime."""
131-
if (self._start_at_operation_time is None and
132-
self._resume_token is None and
133-
self._start_after is None and
134-
sock_info.max_wire_version >= 7):
135-
self._start_at_operation_time = result["operationTime"]
137+
"""Callback that caches the startAtOperationTime from a changeStream
138+
aggregate command response containing an empty batch of change
139+
documents.
140+
141+
This is implemented as a callback because we need access to the wire
142+
version in order to determine whether to cache this value.
143+
"""
144+
if not result['cursor']['firstBatch']:
145+
if (self._start_at_operation_time is None and
146+
self.resume_token is None and
147+
sock_info.max_wire_version >= 7):
148+
self._start_at_operation_time = result["operationTime"]
136149

137150
def _run_aggregation_cmd(self, session, explicit_session):
138151
"""Run the full aggregation pipeline for this ChangeStream and return
@@ -168,6 +181,15 @@ def close(self):
168181
def __iter__(self):
169182
return self
170183

184+
@property
185+
def resume_token(self):
186+
"""The cached resume token that will be used to resume after the most
187+
recently returned change.
188+
189+
.. versionadded:: 3.9
190+
"""
191+
return copy.deepcopy(self._resume_token)
192+
171193
def next(self):
172194
"""Advance the cursor.
173195
@@ -249,20 +271,39 @@ def try_next(self):
249271
self._resume()
250272
change = self._cursor._try_next(False)
251273

252-
# No changes are available.
274+
# If no changes are available.
253275
if change is None:
254-
return None
255-
276+
# We have either iterated over all documents in the cursor,
277+
# OR the most-recently returned batch is empty. In either case,
278+
# update the cached resume token with the postBatchResumeToken if
279+
# one was returned. We also clear the startAtOperationTime.
280+
if self._cursor._post_batch_resume_token is not None:
281+
self._resume_token = self._cursor._post_batch_resume_token
282+
self._start_at_operation_time = None
283+
return change
284+
285+
# Else, changes are available.
256286
try:
257287
resume_token = change['_id']
258288
except KeyError:
259289
self.close()
260290
raise InvalidOperation(
261291
"Cannot provide resume functionality when the resume "
262292
"token is missing.")
263-
self._resume_token = copy.copy(resume_token)
293+
294+
# If this is the last change document from the current batch, cache the
295+
# postBatchResumeToken.
296+
if (not self._cursor._has_next() and
297+
self._cursor._post_batch_resume_token):
298+
resume_token = self._cursor._post_batch_resume_token
299+
300+
# Hereafter, don't use startAfter; instead use resumeAfter.
301+
self._uses_start_after = False
302+
self._uses_resume_after = True
303+
304+
# Cache the resume token and clear startAtOperationTime.
305+
self._resume_token = resume_token
264306
self._start_at_operation_time = None
265-
self._start_after = None
266307

267308
if self._decode_custom:
268309
return _bson_to_dict(change.raw, self._orig_codec_options)

pymongo/command_cursor.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,14 @@
1414

1515
"""CommandCursor class to iterate over command results."""
1616

17-
import datetime
18-
1917
from collections import deque
2018

2119
from bson.py3compat import integer_types
22-
from pymongo import helpers
2320
from pymongo.errors import (ConnectionFailure,
2421
InvalidOperation,
2522
NotMasterError,
2623
OperationFailure)
27-
from pymongo.message import (_convert_exception,
28-
_CursorAddress,
24+
from pymongo.message import (_CursorAddress,
2925
_GetMore,
3026
_RawBatchGetMore)
3127

@@ -43,8 +39,9 @@ def __init__(self, collection, cursor_info, address, retrieved=0,
4339
"""
4440
self.__collection = collection
4541
self.__id = cursor_info['id']
46-
self.__address = address
4742
self.__data = deque(cursor_info['firstBatch'])
43+
self.__postbatchresumetoken = cursor_info.get('postBatchResumeToken')
44+
self.__address = address
4845
self.__batch_size = batch_size
4946
self.__max_await_time_ms = max_await_time_ms
5047
self.__session = session
@@ -119,6 +116,17 @@ def batch_size(self, batch_size):
119116
self.__batch_size = batch_size == 1 and 2 or batch_size
120117
return self
121118

119+
def _has_next(self):
120+
"""Returns `True` if the cursor has documents remaining from the
121+
previous batch."""
122+
return len(self.__data) > 0
123+
124+
@property
125+
def _post_batch_resume_token(self):
126+
"""Retrieve the postBatchResumeToken from the response to a
127+
changeStream aggregate or getMore."""
128+
return self.__postbatchresumetoken
129+
122130
def __send_message(self, operation):
123131
"""Send a getmore message and handle the response.
124132
"""
@@ -157,6 +165,7 @@ def kill():
157165
if from_command:
158166
cursor = docs[0]['cursor']
159167
documents = cursor['nextBatch']
168+
self.__postbatchresumetoken = cursor.get('postBatchResumeToken')
160169
self.__id = cursor['id']
161170
else:
162171
documents = docs

0 commit comments

Comments
 (0)