22
33namespace MongoDB \Tests \Operation ;
44
5+ use Closure ;
56use MongoDB \ChangeStream ;
67use MongoDB \BSON \TimestampInterface ;
78use MongoDB \Driver \Cursor ;
9+ use MongoDB \Driver \Exception \CommandException ;
10+ use MongoDB \Driver \Exception \ConnectionTimeoutException ;
811use MongoDB \Driver \Manager ;
912use MongoDB \Driver \ReadPreference ;
1013use MongoDB \Driver \Server ;
@@ -25,6 +28,8 @@ class WatchFunctionalTest extends FunctionalTestCase
2528{
2629 use SetUpTearDownTrait;
2730
31+ const NOT_MASTER = 10107 ;
32+
2833 private static $ wireVersionForStartAtOperationTime = 7 ;
2934
3035 private $ defaultOptions = ['maxAwaitTimeMS ' => 500 ];
@@ -890,9 +895,11 @@ public function testRewindExtractsResumeTokenAndNextResumes()
890895 $ changeStream ->next ();
891896 $ this ->assertTrue ($ changeStream ->valid ());
892897
893- $ options = ['resumeAfter ' => $ changeStream ->current ()->_id ] + $ this ->defaultOptions ;
898+ $ resumeToken = $ changeStream ->current ()->_id ;
899+ $ options = ['resumeAfter ' => $ resumeToken ] + $ this ->defaultOptions ;
894900 $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ options );
895901 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
902+ $ this ->assertSame ($ resumeToken , $ changeStream ->getResumeToken ());
896903
897904 $ changeStream ->rewind ();
898905 $ this ->assertTrue ($ changeStream ->valid ());
@@ -979,6 +986,7 @@ public function testStartAfterOption()
979986 $ options = $ this ->defaultOptions + ['startAfter ' => $ resumeToken ];
980987 $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ options );
981988 $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
989+ $ this ->assertSame ($ resumeToken , $ changeStream ->getResumeToken ());
982990
983991 $ changeStream ->rewind ();
984992 $ this ->assertTrue ($ changeStream ->valid ());
@@ -1193,6 +1201,286 @@ public function testSessionFreed()
11931201 $ this ->assertNull ($ rp ->getValue ($ changeStream ));
11941202 }
11951203
1204+ /**
1205+ * Prose test: "ChangeStream will automatically resume one time on a
1206+ * resumable error (including not master) with the initial pipeline and
1207+ * options, except for the addition/update of a resumeToken."
1208+ */
1209+ public function testResumeRepeatsOriginalPipelineAndOptions ()
1210+ {
1211+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this ->defaultOptions );
1212+
1213+ $ aggregateCommands = [];
1214+
1215+ $ this ->configureFailPoint ([
1216+ 'configureFailPoint ' => 'failCommand ' ,
1217+ 'mode ' => ['times ' => 1 ],
1218+ 'data ' => ['failCommands ' => ['getMore ' ], 'errorCode ' => self ::NOT_MASTER ],
1219+ ]);
1220+
1221+ (new CommandObserver )->observe (
1222+ function () use ($ operation ) {
1223+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
1224+
1225+ // The first next will hit the fail point, causing a resume
1226+ $ changeStream ->next ();
1227+ $ changeStream ->next ();
1228+ },
1229+ function (array $ event ) use (&$ aggregateCommands ) {
1230+ $ command = $ event ['started ' ]->getCommand ();
1231+ if ($ event ['started ' ]->getCommandName () !== 'aggregate ' ) {
1232+ return ;
1233+ }
1234+
1235+ $ aggregateCommands [] = (array ) $ command ;
1236+ }
1237+ );
1238+
1239+ $ this ->assertCount (2 , $ aggregateCommands );
1240+
1241+ $ this ->assertThat (
1242+ $ aggregateCommands [0 ]['pipeline ' ][0 ]->{'$changeStream ' },
1243+ $ this ->logicalNot (
1244+ $ this ->logicalOr (
1245+ $ this ->objectHasAttribute ('resumeAfter ' ),
1246+ $ this ->objectHasAttribute ('startAfter ' ),
1247+ $ this ->objectHasAttribute ('startAtOperationTime ' )
1248+ )
1249+ )
1250+ );
1251+
1252+ $ this ->assertThat (
1253+ $ aggregateCommands [1 ]['pipeline ' ][0 ]->{'$changeStream ' },
1254+ $ this ->logicalOr (
1255+ $ this ->objectHasAttribute ('resumeAfter ' ),
1256+ $ this ->objectHasAttribute ('startAfter ' ),
1257+ $ this ->objectHasAttribute ('startAtOperationTime ' )
1258+ )
1259+ );
1260+
1261+ $ aggregateCommands = array_map (
1262+ function (array $ aggregateCommand ) {
1263+ // Remove resume options from the changestream document
1264+ if (isset ($ aggregateCommand ['pipeline ' ][0 ]->{'$changeStream ' })) {
1265+ $ aggregateCommand ['pipeline ' ][0 ]->{'$changeStream ' } = array_diff_key (
1266+ (array ) $ aggregateCommand ['pipeline ' ][0 ]->{'$changeStream ' },
1267+ ['resumeAfter ' => false , 'startAfter ' => false , 'startAtOperationTime ' => false ]
1268+ );
1269+ }
1270+
1271+ // Remove options we don't want to compare between commands
1272+ return array_diff_key ($ aggregateCommand , ['lsid ' => false , '$clusterTime ' => false ]);
1273+ },
1274+ $ aggregateCommands
1275+ );
1276+
1277+ // Ensure options in original and resuming aggregate command match
1278+ $ this ->assertEquals ($ aggregateCommands [0 ], $ aggregateCommands [1 ]);
1279+ }
1280+
1281+ /**
1282+ * Prose test: "ChangeStream will not attempt to resume on any error
1283+ * encountered while executing an aggregate command."
1284+ */
1285+ public function testErrorDuringAggregateCommandDoesNotCauseResume ()
1286+ {
1287+ if (version_compare ($ this ->getServerVersion (), '4.0.0 ' , '< ' )) {
1288+ $ this ->markTestSkipped ('failCommand is not supported ' );
1289+ }
1290+
1291+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this ->defaultOptions );
1292+
1293+ $ commandCount = 0 ;
1294+
1295+ $ this ->configureFailPoint ([
1296+ 'configureFailPoint ' => 'failCommand ' ,
1297+ 'mode ' => ['times ' => 1 ],
1298+ 'data ' => ['failCommands ' => ['aggregate ' ], 'errorCode ' => self ::NOT_MASTER ],
1299+ ]);
1300+
1301+ $ this ->expectException (CommandException::class);
1302+
1303+ (new CommandObserver )->observe (
1304+ function () use ($ operation ) {
1305+ $ operation ->execute ($ this ->getPrimaryServer ());
1306+ },
1307+ function (array $ event ) use (&$ commandCount ) {
1308+ $ commandCount ++;
1309+ }
1310+ );
1311+
1312+ $ this ->assertSame (1 , $ commandCount );
1313+ }
1314+
1315+ /**
1316+ * Prose test: "ChangeStream will perform server selection before attempting
1317+ * to resume, using initial readPreference"
1318+ */
1319+ public function testOriginalReadPreferenceIsPreservedOnResume ()
1320+ {
1321+ $ readPreference = new ReadPreference ('secondary ' );
1322+ $ options = ['readPreference ' => $ readPreference ] + $ this ->defaultOptions ;
1323+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ options );
1324+
1325+ try {
1326+ $ secondary = $ this ->manager ->selectServer ($ readPreference );
1327+ } catch (ConnectionTimeoutException $ e ) {
1328+ $ this ->markTestSkipped ('Secondary is not available ' );
1329+ }
1330+
1331+ $ changeStream = $ operation ->execute ($ secondary );
1332+ $ previousCursorId = $ changeStream ->getCursorId ();
1333+ $ this ->killChangeStreamCursor ($ changeStream );
1334+
1335+ $ changeStream ->next ();
1336+ $ this ->assertNotSame ($ previousCursorId , $ changeStream ->getCursorId ());
1337+
1338+ $ getCursor = Closure::bind (
1339+ function () {
1340+ return $ this ->iterator ->getInnerIterator ();
1341+ },
1342+ $ changeStream ,
1343+ ChangeStream::class
1344+ );
1345+ /** @var Cursor $cursor */
1346+ $ cursor = $ getCursor ();
1347+ self ::assertTrue ($ cursor ->getServer ()->isSecondary ());
1348+ }
1349+
1350+ /**
1351+ * Prose test
1352+ * For a ChangeStream under these conditions:
1353+ * - Running against a server <4.0.7.
1354+ * - The batch is empty or has been iterated to the last document.
1355+ * Expected result:
1356+ * - getResumeToken must return the _id of the last document returned if one exists.
1357+ * - getResumeToken must return resumeAfter from the initial aggregate if the option was specified.
1358+ * - If resumeAfter was not specified, the getResumeToken result must be empty.
1359+ */
1360+ public function testGetResumeTokenReturnsOriginalResumeTokenOnEmptyBatch ()
1361+ {
1362+ if ($ this ->isPostBatchResumeTokenSupported ()) {
1363+ $ this ->markTestSkipped ('postBatchResumeToken is supported ' );
1364+ }
1365+
1366+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this ->defaultOptions );
1367+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
1368+
1369+ $ this ->assertNull ($ changeStream ->getResumeToken ());
1370+
1371+ $ this ->insertDocument (['x ' => 1 ]);
1372+
1373+ $ changeStream ->next ();
1374+ $ this ->assertTrue ($ changeStream ->valid ());
1375+ $ resumeToken = $ changeStream ->getResumeToken ();
1376+ $ this ->assertSame ($ resumeToken , $ changeStream ->current ()->_id );
1377+
1378+ $ options = ['resumeAfter ' => $ resumeToken ] + $ this ->defaultOptions ;
1379+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ options );
1380+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
1381+
1382+ $ this ->assertSame ($ resumeToken , $ changeStream ->getResumeToken ());
1383+ }
1384+
1385+ /**
1386+ * Prose test: "$changeStream stage for ChangeStream started with startAfter
1387+ * against a server >=4.1.1 that has not received any results yet MUST
1388+ * include a startAfter option and MUST NOT include a resumeAfter option
1389+ * when resuming a change stream."
1390+ */
1391+ public function testResumingChangeStreamWithoutPreviousResultsIncludesStartAfterOption ()
1392+ {
1393+ if (version_compare ($ this ->getServerVersion (), '4.1.1 ' , '< ' )) {
1394+ $ this ->markTestSkipped ('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1 ' );
1395+ }
1396+
1397+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this ->defaultOptions );
1398+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
1399+
1400+ $ this ->insertDocument (['x ' => 1 ]);
1401+
1402+ $ changeStream ->next ();
1403+ $ this ->assertTrue ($ changeStream ->valid ());
1404+ $ resumeToken = $ changeStream ->getResumeToken ();
1405+
1406+ $ options = ['startAfter ' => $ resumeToken ] + $ this ->defaultOptions ;
1407+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ options );
1408+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
1409+ $ changeStream ->rewind ();
1410+ $ this ->killChangeStreamCursor ($ changeStream );
1411+
1412+ $ aggregateCommand = null ;
1413+
1414+ (new CommandObserver )->observe (
1415+ function () use ($ changeStream ) {
1416+ $ changeStream ->next ();
1417+ },
1418+ function (array $ event ) use (&$ aggregateCommand ) {
1419+ if ($ event ['started ' ]->getCommandName () !== 'aggregate ' ) {
1420+ return ;
1421+ }
1422+
1423+ $ aggregateCommand = $ event ['started ' ]->getCommand ();
1424+ }
1425+ );
1426+
1427+ $ this ->assertNotNull ($ aggregateCommand );
1428+ $ this ->assertObjectNotHasAttribute ('resumeAfter ' , $ aggregateCommand ->pipeline [0 ]->{'$changeStream ' });
1429+ $ this ->assertObjectHasAttribute ('startAfter ' , $ aggregateCommand ->pipeline [0 ]->{'$changeStream ' });
1430+ }
1431+
1432+ /**
1433+ * Prose test: "$changeStream stage for ChangeStream started with startAfter
1434+ * against a server >=4.1.1 that has received at least one result MUST
1435+ * include a resumeAfter option and MUST NOT include a startAfter option
1436+ * when resuming a change stream."
1437+ */
1438+ public function testResumingChangeStreamWithPreviousResultsIncludesResumeAfterOption ()
1439+ {
1440+ if (version_compare ($ this ->getServerVersion (), '4.1.1 ' , '< ' )) {
1441+ $ this ->markTestSkipped ('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1 ' );
1442+ }
1443+
1444+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this ->defaultOptions );
1445+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
1446+
1447+ $ this ->insertDocument (['x ' => 1 ]);
1448+
1449+ $ changeStream ->next ();
1450+ $ this ->assertTrue ($ changeStream ->valid ());
1451+ $ resumeToken = $ changeStream ->getResumeToken ();
1452+
1453+ $ options = ['startAfter ' => $ resumeToken ] + $ this ->defaultOptions ;
1454+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ options );
1455+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
1456+ $ changeStream ->rewind ();
1457+
1458+ $ this ->insertDocument (['x ' => 2 ]);
1459+ $ changeStream ->next ();
1460+ $ this ->assertTrue ($ changeStream ->valid ());
1461+
1462+ $ this ->killChangeStreamCursor ($ changeStream );
1463+
1464+ $ aggregateCommand = null ;
1465+
1466+ (new CommandObserver )->observe (
1467+ function () use ($ changeStream ) {
1468+ $ changeStream ->next ();
1469+ },
1470+ function (array $ event ) use (&$ aggregateCommand ) {
1471+ if ($ event ['started ' ]->getCommandName () !== 'aggregate ' ) {
1472+ return ;
1473+ }
1474+
1475+ $ aggregateCommand = $ event ['started ' ]->getCommand ();
1476+ }
1477+ );
1478+
1479+ $ this ->assertNotNull ($ aggregateCommand );
1480+ $ this ->assertObjectNotHasAttribute ('startAfter ' , $ aggregateCommand ->pipeline [0 ]->{'$changeStream ' });
1481+ $ this ->assertObjectHasAttribute ('resumeAfter ' , $ aggregateCommand ->pipeline [0 ]->{'$changeStream ' });
1482+ }
1483+
11961484 private function assertNoCommandExecuted (callable $ callable )
11971485 {
11981486 $ commands = [];
0 commit comments