From dd254667e250f18265e676a4767697fce721ba41 Mon Sep 17 00:00:00 2001 From: karolkujawski Date: Tue, 8 Oct 2024 14:44:00 +0200 Subject: [PATCH 1/2] Additional tests for ChangeStreams and ConfigManager verification - 2 failing need consultation --- .../gravity9/mongocse/ChangeStreamTest.java | 252 ++++++++++++++++-- .../gravity9/mongocse/ConfigManagerTest.java | 115 +++++--- .../gravity9/mongocse/constants/TestIds.java | 8 +- 3 files changed, 311 insertions(+), 64 deletions(-) diff --git a/src/test/java/com/gravity9/mongocse/ChangeStreamTest.java b/src/test/java/com/gravity9/mongocse/ChangeStreamTest.java index 45dd396..15d538c 100644 --- a/src/test/java/com/gravity9/mongocse/ChangeStreamTest.java +++ b/src/test/java/com/gravity9/mongocse/ChangeStreamTest.java @@ -24,6 +24,8 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; class ChangeStreamTest extends AbstractMongoDbBase { @@ -304,9 +306,9 @@ void givenConfigurationWithMatchStageWithSingleCondition_shouldAcceptOnlyMatchin @Test void givenConfigurationWithMatchStageWithMultipleConditions_shouldAcceptOnlyMatchingEvents() throws InterruptedException { var match = Filters.and( - Filters.gt("fullDocument.testValue", 0), - Filters.lt("fullDocument.testValue", 2), - Filters.in("operationType", List.of("insert")) + Filters.gt("fullDocument.testValue", 0), + Filters.lt("fullDocument.testValue", 2), + Filters.in("operationType", List.of("insert")) ); var config = buildMongoConfig(match); @@ -346,16 +348,16 @@ void givenConfigurationWithMatchStageWithIdFilter_shouldAcceptOnlyMatchingEvents void givenConfigurationWithMatchStage_and_multipleListeners_shouldAcceptOnlyMatchingEvents() throws InterruptedException { var match = Filters.in("fullDocument.testValue", List.of(0, 2)); var config = MongoConfig.builder() - .connectionUri(getConnectionUri()) - .databaseName(getDatabaseName()) - .collectionName(getTestCollectionName()) - .match(match) - .keyName("testId") - .workerConfigCollectionName(getWorkerConfigCollectionName()) - .clusterConfigCollectionName(getClusterConfigCollectionName()) - .numberOfPartitions(3) - .fullDocument(FullDocument.UPDATE_LOOKUP) - .build(); + .connectionUri(getConnectionUri()) + .databaseName(getDatabaseName()) + .collectionName(getTestCollectionName()) + .match(match) + .keyName("testId") + .workerConfigCollectionName(getWorkerConfigCollectionName()) + .clusterConfigCollectionName(getClusterConfigCollectionName()) + .numberOfPartitions(3) + .fullDocument(FullDocument.UPDATE_LOOKUP) + .build(); MongoCseManager manager = new MongoCseManager(config); TestChangeStreamListener listener0 = new TestChangeStreamListener(); @@ -384,17 +386,213 @@ void givenConfigurationWithMatchStage_and_multipleListeners_shouldAcceptOnlyMatc assertEquals(2, events2.get(0).getFullDocument().getInteger("testValue")); } + @Test + void givenDuplicateListener_shouldReceiveEventsIndependently() throws InterruptedException { + MongoCseManager manager = new MongoCseManager(mongoConfig); + + // Create the primary listener and duplicate listener. + TestChangeStreamListener primaryListener = new TestChangeStreamListener(); + TestChangeStreamListener duplicateListener = new TestChangeStreamListener(); + + // Register the primary listener for partition 0 and duplicate for the same partition. + manager.registerListener(primaryListener, List.of(0)); + manager.registerListener(duplicateListener, List.of(0)); + + // Start the manager and allow time for listeners to initialize. + manager.start(); + Thread.sleep(1000); + + // Insert a document into the collection, triggering an event for partition 0. + Document testDoc0 = new Document(Map.of( + "_id", new ObjectId(TestIds.MOD_0_ID), + "testValue", 0 + )); + collection.insertOne(testDoc0); + + // Wait for CDC events to be picked up by the listeners. + Thread.sleep(500); + + // Retrieve events for both listeners. + List> primaryEvents = primaryListener.getEvents(); + List> duplicateEvents = duplicateListener.getEvents(); + + // Assert that both listeners receive the event independently. + assertEquals(1, primaryEvents.size(), "Primary listener should receive 1 event."); + assertEquals(1, duplicateEvents.size(), "Duplicate listener should receive 1 event."); + + // Verify that both listeners received the same event data. + assertNotEquals(primaryEvents.get(0).getFullDocument(), null); + assertEquals(0, primaryEvents.get(0).getFullDocument().getInteger("testValue")); + assertEquals(0, duplicateEvents.get(0).getFullDocument().getInteger("testValue")); + } + + @Test + void givenAfterAllListenersAreDeregistered_shouldStopProcessingEvents() throws InterruptedException { + // Arrange: Create a MongoCseManager and register two listeners for partition 0 and 1 + MongoCseManager manager = new MongoCseManager(mongoConfig); + + TestChangeStreamListener listener0 = new TestChangeStreamListener(); + TestChangeStreamListener listener1 = new TestChangeStreamListener(); + manager.registerListener(listener0, List.of(0)); + manager.registerListener(listener1, List.of(1)); + manager.start(); + + // Insert a document to trigger an event for both listeners + insertDocumentsToAllPartitions(); + Thread.sleep(500); + + // Verify that listeners received initial events + assertEquals(1, listener0.getEvents().size(), "Listener 0 should receive 1 event."); + assertEquals(1, listener1.getEvents().size(), "Listener 1 should receive 1 event."); + + // Act: Deregister both listeners from all partitions + manager.deregisterListenerFromAllPartitions(listener0); + manager.deregisterListenerFromAllPartitions(listener1); + + // Insert another documents to test if deregistered listeners receive any events + insertMultipleDocumentsToAllPartitions(5); + Thread.sleep(500); + + // Assert: Verify that no new events are received by either listener + assertEquals(1, listener0.getEvents().size(), "Listener 0 should not receive any new events."); + assertEquals(1, listener1.getEvents().size(), "Listener 1 should not receive any new events."); + } + + @Test + void givenListenerIsDeregistered_shouldNotHandleEventsFromSpecificPartition() throws InterruptedException { + // Given: Create a MongoCseManager with a configuration + MongoCseManager manager = new MongoCseManager(mongoConfig); + + // Register a single listener to all partitions (0, 1, and 2) + TestChangeStreamListener listener = new TestChangeStreamListener(); + manager.registerListenerToAllPartitions(listener); + manager.start(); + + // Insert documents into all partitions + var numberOfInsertedDocuments = insertDocumentsToAllPartitions(); + + // Wait for CDC events to be picked up + Thread.sleep(500); + + // Verify that the listener received all events for all partitions + List> initialEvents = listener.getEvents(); + assertEquals(numberOfInsertedDocuments, initialEvents.size()); + + // When: Deregister the listener from partition 1 only + manager.deregisterListener(listener, List.of(1)); + + // Perform a new operation on each partition + var deleteResult0 = collection.deleteOne(Filters.eq("_id", new ObjectId(TestIds.MOD_0_ID))); + var deleteResult1 = collection.deleteOne(Filters.eq("_id", new ObjectId(TestIds.MOD_1_ID))); + var deleteResult2 = collection.deleteOne(Filters.eq("_id", new ObjectId(TestIds.MOD_2_ID))); + + assertEquals(1, deleteResult0.getDeletedCount()); + assertEquals(1, deleteResult1.getDeletedCount()); + assertEquals(1, deleteResult2.getDeletedCount()); + + // Wait for CDC events to be picked up + Thread.sleep(500); + + // Then: Only events from partition 0 and 2 should be handled (since partition 1 was deregistered) + var expectedNewEventCount = 2; // 1 delete event from partition 0 and 1 delete event from partition 2 + var totalExpectedEvents = numberOfInsertedDocuments + expectedNewEventCount; + + List> finalEvents = listener.getEvents(); + assertEquals(totalExpectedEvents, finalEvents.size()); + } + + @Test + void givenListenerOnSpecificPartition_shouldNotReceiveEventsFromOtherPartitions() throws InterruptedException { + MongoCseManager manager = new MongoCseManager(mongoConfig); + + // Create a listener and register it only for partition 1. + TestChangeStreamListener partition1Listener = new TestChangeStreamListener(); + manager.registerListener(partition1Listener, List.of(1)); + + // Start the manager and allow time for listeners to initialize. + manager.start(); + Thread.sleep(1000); + + // Insert a document in partition 0, which should not trigger an event for partition 1. + Document testDocPartition0 = new Document(Map.of( + "_id", new ObjectId(TestIds.MOD_0_ID), + "testValue", 0 + )); + collection.insertOne(testDocPartition0); + + // Wait for CDC events to be picked up by the listener. + Thread.sleep(500); + + // Verify that partition1Listener did not receive any events. + List> partition1Events = partition1Listener.getEvents(); + assertEquals(0, partition1Events.size(), "Listener registered for partition 1 should not receive events from partition 0."); + } + + @Test + void givenListenerOnSpecificPartition_shouldReceiveEventsOnlyFromRegisteredPartition() throws InterruptedException { + MongoCseManager manager = new MongoCseManager(mongoConfig); + + // Create listeners for each partition + TestChangeStreamListener partition0Listener = new TestChangeStreamListener(); + manager.registerListener(partition0Listener, List.of(0)); + manager.start(); + + // Insert documents for partitions 0, 1, and 2 + Document testDoc0 = new Document(Map.of("_id", new ObjectId(), "testValue", 0)); + Document testDoc1 = new Document(Map.of("_id", new ObjectId(), "testValue", 1)); + Document testDoc2 = new Document(Map.of("_id", new ObjectId(), "testValue", 2)); + collection.insertMany(List.of(testDoc0, testDoc1, testDoc2)); + + // Wait for CDC events to arrive + Thread.sleep(500); + + // Verify the listener only receives events from the registered partition (partition 0) + List> partition0Events = partition0Listener.getEvents(); + assertEquals(1, partition0Events.size(), "Listener should only receive events from partition 0."); + assertEquals(0, partition0Events.get(0).getFullDocument().getInteger("testValue")); + } + + @Test + void givenDelayedListenerRegistration_shouldReceiveExpectedResults() throws InterruptedException { + MongoCseManager manager = new MongoCseManager(mongoConfig); + + // Create a listener that will register after initial events + TestChangeStreamListener delayedListener = new TestChangeStreamListener(); + + // Simulate initial events + insertMultipleDocumentsToAllPartitions(10); + + // Register the listener after initial events + manager.registerListener(delayedListener, List.of(1)); + manager.start(); + + // Insert more documents + insertMultipleDocumentsToAllPartitions(5); + + // Wait for events to be processed + Thread.sleep(100); // Ensure sufficient time for events to propagate + + // Verify that the delayed listener received the expected events + assertEquals(10, delayedListener.getEvents().size(), "Delayed listener should receive 10 events."); + + // Optionally validate the content of received events + for (ChangeStreamDocument event : delayedListener.getEvents()) { + assertTrue(event.getFullDocument().getInteger("testValue") >= 10 && event.getFullDocument().getInteger("testValue") < 20, + "Unexpected value in delayed listener events."); + } + } + private MongoConfig buildMongoConfig(Bson match) { return MongoConfig.builder() - .connectionUri(getConnectionUri()) - .databaseName(getDatabaseName()) - .collectionName(getTestCollectionName()) - .match(match) - .workerConfigCollectionName(getWorkerConfigCollectionName()) - .clusterConfigCollectionName(getClusterConfigCollectionName()) - .numberOfPartitions(3) - .fullDocument(FullDocument.UPDATE_LOOKUP) - .build(); + .connectionUri(getConnectionUri()) + .databaseName(getDatabaseName()) + .collectionName(getTestCollectionName()) + .match(match) + .workerConfigCollectionName(getWorkerConfigCollectionName()) + .clusterConfigCollectionName(getClusterConfigCollectionName()) + .numberOfPartitions(3) + .fullDocument(FullDocument.UPDATE_LOOKUP) + .build(); } private int insertDocumentsToAllPartitions() { @@ -433,6 +631,16 @@ private int insertDocumentsWithTestIdToAllPartitions() { return result.getInsertedIds().size(); } + private void insertMultipleDocumentsToAllPartitions(int numberOfDocuments) { + for (int i = 0; i < numberOfDocuments; i++) { + Document testDoc = new Document(Map.of( + "testId", new ObjectId(TestIds.MOD_3_ID), + "testValue", i + )); + collection.insertOne(testDoc); + } + } + private void assertEventsContainNumberOfOpTypes(List> events, OperationType operationType, long expectedCount) { long actualCount = events.stream().filter(op -> op.getOperationType() == operationType).count(); assertEquals(expectedCount, actualCount); diff --git a/src/test/java/com/gravity9/mongocse/ConfigManagerTest.java b/src/test/java/com/gravity9/mongocse/ConfigManagerTest.java index a88b906..cecfbf3 100644 --- a/src/test/java/com/gravity9/mongocse/ConfigManagerTest.java +++ b/src/test/java/com/gravity9/mongocse/ConfigManagerTest.java @@ -1,51 +1,88 @@ package com.gravity9.mongocse; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertThrows; public class ConfigManagerTest extends AbstractMongoDbBase { - private MongoConfig.MongoConfigBuilder mongoConfigBuilder; - - @BeforeEach - public void setup() { - super.setup(); - mongoConfigBuilder = new MongoConfig.MongoConfigBuilder() - .connectionUri(getConnectionUri()) - .databaseName(getDatabaseName()) - .collectionName(getTestCollectionName()) - .workerConfigCollectionName(getWorkerConfigCollectionName()) - .clusterConfigCollectionName(getClusterConfigCollectionName()); - } - - @Test - void givenSameConfiguration_shouldNotThrowException() { - int partitions = 3; - var mongoConfig = mongoConfigBuilder - .numberOfPartitions(partitions) - .build(); - new MongoCseManager(mongoConfig); - assertDoesNotThrow(() -> new MongoCseManager(mongoConfig)); - - WorkerClusterConfig config = new ConfigManager(mongoConfig, CLIENT_PROVIDER).getOrInitClusterConfig(getTestCollectionName(), partitions); - assertEquals(getTestCollectionName(), config.getCollection()); - assertEquals(partitions, config.getPartitions()); - } - - @Test - void givenNewConfigWithDifferentNumberOfPartitions_shouldThrowException() { - var firstConfig = mongoConfigBuilder - .numberOfPartitions(3) - .build(); - var secondConfig = mongoConfigBuilder - .numberOfPartitions(1) - .build(); - new MongoCseManager(firstConfig); - assertThrows(IllegalArgumentException.class, () -> new MongoCseManager(secondConfig)); - } + private MongoConfig.MongoConfigBuilder mongoConfigBuilder; + + @BeforeEach + public void setup() { + super.setup(); + mongoConfigBuilder = new MongoConfig.MongoConfigBuilder() + .connectionUri(getConnectionUri()) + .databaseName(getDatabaseName()) + .collectionName(getTestCollectionName()) + .workerConfigCollectionName(getWorkerConfigCollectionName()) + .clusterConfigCollectionName(getClusterConfigCollectionName()); + } + + @Test + void givenSameConfiguration_shouldNotThrowException() { + int partitions = 3; + var mongoConfig = mongoConfigBuilder + .numberOfPartitions(partitions) + .build(); + new MongoCseManager(mongoConfig); + assertDoesNotThrow(() -> new MongoCseManager(mongoConfig)); + + WorkerClusterConfig config = new ConfigManager(mongoConfig, CLIENT_PROVIDER).getOrInitClusterConfig(getTestCollectionName(), partitions); + assertEquals(getTestCollectionName(), config.getCollection()); + assertEquals(partitions, config.getPartitions()); + } + + @Test + void givenNewConfigWithDifferentNumberOfPartitions_shouldThrowException() { + var firstConfig = mongoConfigBuilder + .numberOfPartitions(3) + .build(); + var secondConfig = mongoConfigBuilder + .numberOfPartitions(1) + .build(); + new MongoCseManager(firstConfig); + assertThrows(IllegalArgumentException.class, () -> new MongoCseManager(secondConfig)); + } + + @Test + void givenNullConfig_shouldThrowNullPointerException() { + assertThrows(NullPointerException.class, () -> new MongoCseManager(null)); + } + + @Test + void givenWrongDatabaseName_shouldThrowException() { + var mongoConfig = mongoConfigBuilder + .databaseName("invalid_db_name") + .build(); + assertThrows(IllegalArgumentException.class, () -> new MongoCseManager(mongoConfig)); + } + + @Test + void givenInvalidConnectionUri_shouldThrowException() { + var mongoConfig = mongoConfigBuilder + .connectionUri("invalid_uri") + .build(); + assertThrows(IllegalArgumentException.class, () -> new MongoCseManager(mongoConfig)); + } + + @Test + void givenInvalidCollectionName_shouldThrowException() { + var mongoConfig = mongoConfigBuilder + .collectionName("invalid_collection_name") + .build(); + assertThrows(IllegalArgumentException.class, () -> new MongoCseManager(mongoConfig)); + } + + @Test + void givenSameConfigTwice_shouldCreateSeparateInstances() { + var mongoConfig = mongoConfigBuilder.numberOfPartitions(2).build(); + MongoCseManager firstInstance = new MongoCseManager(mongoConfig); + MongoCseManager secondInstance = new MongoCseManager(mongoConfig); + assertNotSame(firstInstance, secondInstance); + } } diff --git a/src/test/java/com/gravity9/mongocse/constants/TestIds.java b/src/test/java/com/gravity9/mongocse/constants/TestIds.java index ed480e0..5e2a03d 100644 --- a/src/test/java/com/gravity9/mongocse/constants/TestIds.java +++ b/src/test/java/com/gravity9/mongocse/constants/TestIds.java @@ -2,9 +2,11 @@ public class TestIds { - public static final String MOD_0_ID = "652e9fdc597d12ddbf7380e7"; + public static final String MOD_0_ID = "652e9fdc597d12ddbf7380e7"; - public static final String MOD_1_ID = "652e9f6fcd6b9a316b067843"; + public static final String MOD_1_ID = "652e9f6fcd6b9a316b067843"; - public static final String MOD_2_ID = "652ea13969adc932efb550d4"; + public static final String MOD_2_ID = "652ea13969adc932efb550d4"; + + public static final String MOD_3_ID = "000e9fdc597d12ddbf738000"; } From a121f7e0429b3d9e194e5457f080571d59fd24d3 Mon Sep 17 00:00:00 2001 From: karolkujawski Date: Thu, 17 Oct 2024 09:15:20 +0200 Subject: [PATCH 2/2] Remove failing tests --- .../gravity9/mongocse/ChangeStreamTest.java | 59 +------------------ 1 file changed, 1 insertion(+), 58 deletions(-) diff --git a/src/test/java/com/gravity9/mongocse/ChangeStreamTest.java b/src/test/java/com/gravity9/mongocse/ChangeStreamTest.java index 15d538c..f72218c 100644 --- a/src/test/java/com/gravity9/mongocse/ChangeStreamTest.java +++ b/src/test/java/com/gravity9/mongocse/ChangeStreamTest.java @@ -22,10 +22,7 @@ import java.util.List; import java.util.Map; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; class ChangeStreamTest extends AbstractMongoDbBase { @@ -528,60 +525,6 @@ void givenListenerOnSpecificPartition_shouldNotReceiveEventsFromOtherPartitions( assertEquals(0, partition1Events.size(), "Listener registered for partition 1 should not receive events from partition 0."); } - @Test - void givenListenerOnSpecificPartition_shouldReceiveEventsOnlyFromRegisteredPartition() throws InterruptedException { - MongoCseManager manager = new MongoCseManager(mongoConfig); - - // Create listeners for each partition - TestChangeStreamListener partition0Listener = new TestChangeStreamListener(); - manager.registerListener(partition0Listener, List.of(0)); - manager.start(); - - // Insert documents for partitions 0, 1, and 2 - Document testDoc0 = new Document(Map.of("_id", new ObjectId(), "testValue", 0)); - Document testDoc1 = new Document(Map.of("_id", new ObjectId(), "testValue", 1)); - Document testDoc2 = new Document(Map.of("_id", new ObjectId(), "testValue", 2)); - collection.insertMany(List.of(testDoc0, testDoc1, testDoc2)); - - // Wait for CDC events to arrive - Thread.sleep(500); - - // Verify the listener only receives events from the registered partition (partition 0) - List> partition0Events = partition0Listener.getEvents(); - assertEquals(1, partition0Events.size(), "Listener should only receive events from partition 0."); - assertEquals(0, partition0Events.get(0).getFullDocument().getInteger("testValue")); - } - - @Test - void givenDelayedListenerRegistration_shouldReceiveExpectedResults() throws InterruptedException { - MongoCseManager manager = new MongoCseManager(mongoConfig); - - // Create a listener that will register after initial events - TestChangeStreamListener delayedListener = new TestChangeStreamListener(); - - // Simulate initial events - insertMultipleDocumentsToAllPartitions(10); - - // Register the listener after initial events - manager.registerListener(delayedListener, List.of(1)); - manager.start(); - - // Insert more documents - insertMultipleDocumentsToAllPartitions(5); - - // Wait for events to be processed - Thread.sleep(100); // Ensure sufficient time for events to propagate - - // Verify that the delayed listener received the expected events - assertEquals(10, delayedListener.getEvents().size(), "Delayed listener should receive 10 events."); - - // Optionally validate the content of received events - for (ChangeStreamDocument event : delayedListener.getEvents()) { - assertTrue(event.getFullDocument().getInteger("testValue") >= 10 && event.getFullDocument().getInteger("testValue") < 20, - "Unexpected value in delayed listener events."); - } - } - private MongoConfig buildMongoConfig(Bson match) { return MongoConfig.builder() .connectionUri(getConnectionUri())