diff --git a/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttQueueTopicFilterImpl.java b/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttQueueTopicFilterImpl.java new file mode 100644 index 000000000..9e545068b --- /dev/null +++ b/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttQueueTopicFilterImpl.java @@ -0,0 +1,177 @@ +/* + * Copyright 2018-present HiveMQ and the HiveMQ Community + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hivemq.client.internal.mqtt.datatypes; + +import com.hivemq.client.annotations.Immutable; +import com.hivemq.client.internal.util.Checks; +import com.hivemq.client.mqtt.datatypes.MqttQueueTopicFilter; +import com.hivemq.client.mqtt.datatypes.MqttTopicFilterBuilder; +import org.jetbrains.annotations.Contract; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * @author Silvio Giebl + * @see MqttQueueTopicFilter + * @see MqttUtf8StringImpl + */ +@Immutable +public class MqttQueueTopicFilterImpl extends MqttTopicFilterImpl implements MqttQueueTopicFilter { + + private static final int QUEUE_PREFIX_LENGTH = QUEUE_PREFIX.length(); + + /** + * Checks if the given UTF-16 encoded Java string represents a Queue Topic Filter. This method does not validate + * whether it represents a valid Queue Topic Filter but only whether it starts with {@value #QUEUE_PREFIX}. + * + * @param string the given UTF-16 encoded Java string. + * @return whether the string represents a Queue Topic Filter. + */ + static boolean isQueue(final @NotNull String string) { + return string.startsWith(QUEUE_PREFIX); + } + + /** + * Checks if the given byte array with UTF-8 encoded data represents a Queue Topic Filter. This method does not + * validate whether it represents a valid Queue Topic Filter but only whether it starts with {@value + * #QUEUE_PREFIX}. + * + * @param binary the byte array with UTF-8 encoded data. + * @return whether the byte array represents a Queue Topic Filter. + */ + static boolean isQueue(final byte @NotNull [] binary) { + if (binary.length < QUEUE_PREFIX_LENGTH) { + return false; + } + for (int i = 0; i < QUEUE_PREFIX_LENGTH; i++) { + if (binary[i] != QUEUE_PREFIX.charAt(i)) { + return false; + } + } + return true; + } + + /** + * Validates and creates a Queue Topic Filter of the given UTF-16 encoded Java string. + *

+ * This method does not validate {@link MqttUtf8StringImpl#checkLength(String, String) length}, {@link + * MqttUtf8StringImpl#checkWellFormed(String, String) if well-formed} and {@link #isQueue(String) if queue}. + * + * @param string the UTF-16 encoded Java string starting with {@value #QUEUE_PREFIX}. + * @return the created Queue Topic Filter. + * @throws IllegalArgumentException if the string is not a valid Queue Topic Filter. + */ + static @NotNull MqttQueueTopicFilterImpl ofInternal(final @NotNull String string) { + if (string.length() <= QUEUE_PREFIX_LENGTH) { + throw new IllegalArgumentException("Topic filter must be at least one character long."); + } + final int wildcardFlags = validateWildcards(string, QUEUE_PREFIX_LENGTH); + return new MqttQueueTopicFilterImpl(string, wildcardFlags); + } + + /** + * Validates and creates a Queue Topic Filter of the given byte array with UTF-8 encoded data. + *

+ * This method does not validate length, {@link MqttUtf8StringImpl#isWellFormed(byte[]) if well-formed} and {@link + * #isQueue(byte[]) if queue}. + * + * @param binary the byte array with UTF-8 encoded data starting with {@value #QUEUE_PREFIX}. + * @return the created Queue Topic Filter or null if the byte array is not a valid Queue Topic + * Filter. + */ + static @Nullable MqttQueueTopicFilterImpl ofInternal(final byte @NotNull [] binary) { + if (binary.length <= QUEUE_PREFIX_LENGTH) { + return null; + } + final int wildcardFlags = validateWildcards(binary, QUEUE_PREFIX_LENGTH); + if (wildcardFlags == WILDCARD_CHECK_FAILURE) { + return null; + } + return new MqttQueueTopicFilterImpl(binary, wildcardFlags); + } + + /** + * Validates and creates a Queue Topic Filter of the given Topic Filter. + * + * @param topicFilter the Topic Filter string. + * @return the created Queue Topic Filter. + * @throws IllegalArgumentException if the Topic Filter string is not a valid Topic Filter. + */ + @Contract("null -> fail") + public static @NotNull MqttQueueTopicFilterImpl of(final @Nullable String topicFilter) { + Checks.notEmpty(topicFilter, "Topic filter"); + checkWellFormed(topicFilter, "Topic filter"); + final String queueTopicFilter = QUEUE_PREFIX + topicFilter; + checkLength(queueTopicFilter, "Queue topic filter"); + final int wildcardFlags = validateWildcards(topicFilter, 0); + return new MqttQueueTopicFilterImpl(queueTopicFilter, wildcardFlags); + } + + /** + * Validates and creates a Queue Topic Filter of the given Topic Filter. + * + * @param topicFilter the Topic Filter. + * @return the created Queue Topic Filter. + */ + public static @NotNull MqttQueueTopicFilterImpl of(final @NotNull MqttTopicFilterImpl topicFilter) { + final String queueTopicFilter = QUEUE_PREFIX + topicFilter; + checkLength(queueTopicFilter, "Queue topic filter"); + return new MqttQueueTopicFilterImpl(queueTopicFilter, topicFilter.wildcardFlags); + } + + private final int filterByteStart; + private final int filterCharStart; + + private MqttQueueTopicFilterImpl(final byte @NotNull [] binary, final int wildcardFlags) { + super(binary, wildcardFlags); + this.filterByteStart = QUEUE_PREFIX_LENGTH; + this.filterCharStart = -1; + } + + private MqttQueueTopicFilterImpl(final @NotNull String string, final int wildcardFlags) { + super(string, wildcardFlags); + this.filterByteStart = -1; + this.filterCharStart = QUEUE_PREFIX_LENGTH; + } + + @Override + public @NotNull String getTopicFilterString() { + return toString().substring(getFilterCharStart()); + } + + @Override + public @NotNull MqttTopicFilterImpl getTopicFilter() { + return MqttTopicFilterImpl.of(this); + } + + @Override + int getFilterByteStart() { + return filterByteStart; + } + + private int getFilterCharStart() { + if (filterCharStart == -1) { + return QUEUE_PREFIX_LENGTH; + } + return filterCharStart; + } + + @Override + public MqttTopicFilterBuilder.@NotNull Complete extendQueue() { + return new MqttTopicFilterImplBuilder.Default(this); + } +} diff --git a/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicFilterImpl.java b/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicFilterImpl.java index 1a37b0429..a028f9607 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicFilterImpl.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicFilterImpl.java @@ -60,6 +60,9 @@ public class MqttTopicFilterImpl extends MqttUtf8StringImpl implements MqttTopic if (MqttSharedTopicFilterImpl.isShared(string)) { return MqttSharedTopicFilterImpl.ofInternal(string); } + if (MqttQueueTopicFilterImpl.isQueue(string)) { + return MqttQueueTopicFilterImpl.ofInternal(string); + } final int wildcardFlags = validateWildcards(string, 0); return new MqttTopicFilterImpl(string, wildcardFlags); } @@ -84,6 +87,16 @@ public class MqttTopicFilterImpl extends MqttUtf8StringImpl implements MqttTopic return new MqttTopicFilterImpl(sharedTopicFilter.getTopicFilterString(), sharedTopicFilter.wildcardFlags); } + /** + * Creates a Topic Filter of the given Queue Topic Filter. + * + * @param queueTopicFilter the Queue Topic Filter. + * @return the created Topic Filter. + */ + public static @NotNull MqttTopicFilterImpl of(final @NotNull MqttQueueTopicFilterImpl queueTopicFilter) { + return new MqttTopicFilterImpl(queueTopicFilter.getTopicFilterString(), queueTopicFilter.wildcardFlags); + } + /** * Validates and creates a Topic Filter of the given byte array with UTF-8 encoded data. * @@ -97,6 +110,9 @@ public class MqttTopicFilterImpl extends MqttUtf8StringImpl implements MqttTopic if (MqttSharedTopicFilterImpl.isShared(binary)) { return MqttSharedTopicFilterImpl.ofInternal(binary); } + if (MqttQueueTopicFilterImpl.isQueue(binary)) { + return MqttQueueTopicFilterImpl.ofInternal(binary); + } final int wildcardFlags = validateWildcards(binary, 0); if (wildcardFlags == WILDCARD_CHECK_FAILURE) { return null; diff --git a/src/main/java/com/hivemq/client/mqtt/datatypes/MqttQueueTopicFilter.java b/src/main/java/com/hivemq/client/mqtt/datatypes/MqttQueueTopicFilter.java new file mode 100644 index 000000000..129240d60 --- /dev/null +++ b/src/main/java/com/hivemq/client/mqtt/datatypes/MqttQueueTopicFilter.java @@ -0,0 +1,64 @@ +/* + * Copyright 2018-present HiveMQ and the HiveMQ Community + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hivemq.client.mqtt.datatypes; + +import com.hivemq.client.annotations.DoNotImplement; +import com.hivemq.client.internal.mqtt.datatypes.MqttQueueTopicFilterImpl; +import org.jetbrains.annotations.NotNull; + +/** + * MQTT Queue Topic Filter according to the MQTT specification. + *

+ * A Queue Topic Filter consists of the Queue prefix ({@value #QUEUE_PREFIX}) and a Topic Filter. + *

+ * The Topic Filter has the same restrictions as a {@link MqttTopicFilter Topic Filter}. + * + * @author Jean-François Côté + * @since 1.0 + */ +@DoNotImplement +public interface MqttQueueTopicFilter extends MqttTopicFilter { + + /** + * The prefix of a Queue Topic Filter. + */ + @NotNull String QUEUE_PREFIX = "$q" + MqttTopic.TOPIC_LEVEL_SEPARATOR; + + /** + * Validates and creates a Queue Topic Filter of the given Topic Filter. + * + * @param topicFilter the string representation of the Topic Filter. + * @return the created Queue Topic Filter. + * @throws IllegalArgumentException if the Topic Filter string is + * not a valid Topic Filter. + */ + static @NotNull MqttQueueTopicFilter of(final @NotNull String topicFilter) { + return MqttQueueTopicFilterImpl.of(topicFilter); + } + + /** + * @return the Topic Filter of this Queue Topic Filter. + */ + @NotNull MqttTopicFilter getTopicFilter(); + + /** + * Creates a builder for extending this Queue Topic Filter. + * + * @return the created builder. + */ + MqttTopicFilterBuilder.@NotNull Complete extendQueue(); +} diff --git a/src/test/java/com/hivemq/client/internal/mqtt/datatypes/MqttQueueTopicFilterImplTest.java b/src/test/java/com/hivemq/client/internal/mqtt/datatypes/MqttQueueTopicFilterImplTest.java new file mode 100644 index 000000000..181866ce6 --- /dev/null +++ b/src/test/java/com/hivemq/client/internal/mqtt/datatypes/MqttQueueTopicFilterImplTest.java @@ -0,0 +1,275 @@ +/* + * Copyright 2018-present HiveMQ and the HiveMQ Community + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hivemq.client.internal.mqtt.datatypes; + +import com.hivemq.client.internal.util.collections.ImmutableList; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * @author Jean-François Côté + */ +class MqttQueueTopicFilterImplTest { + + enum QueueTopicFilterSource { + STRING, + BYTE_BUF, + TOPIC_FILTER + } + + private @Nullable MqttTopicFilterImpl from( + final @NotNull QueueTopicFilterSource source, + final @NotNull String topicFilter) { + + if (source == QueueTopicFilterSource.TOPIC_FILTER) { + return MqttQueueTopicFilterImpl.of(topicFilter); + } + + final String queueTopicFilter = "$q/" + topicFilter; + + switch (source) { + case BYTE_BUF: + final ByteBuf byteBuf = Unpooled.buffer(); + final byte[] binary = queueTopicFilter.getBytes(StandardCharsets.UTF_8); + byteBuf.writeShort(binary.length); + byteBuf.writeBytes(binary); + final MqttTopicFilterImpl mqtt5TopicFilter = MqttTopicFilterImpl.decode(byteBuf); + byteBuf.release(); + return mqtt5TopicFilter; + case STRING: + return MqttTopicFilterImpl.of(queueTopicFilter); + default: + throw new IllegalStateException(); + } + } + + private @Nullable MqttTopicFilterImpl fromFullString( + final @NotNull QueueTopicFilterSource source, final @NotNull String fullQueueTopicFilter) { + + switch (source) { + case BYTE_BUF: + final ByteBuf byteBuf = Unpooled.buffer(); + final byte[] binary = fullQueueTopicFilter.getBytes(StandardCharsets.UTF_8); + byteBuf.writeShort(binary.length); + byteBuf.writeBytes(binary); + final MqttTopicFilterImpl mqtt5TopicFilter = MqttTopicFilterImpl.decode(byteBuf); + byteBuf.release(); + return mqtt5TopicFilter; + case STRING: + return MqttTopicFilterImpl.of(fullQueueTopicFilter); + default: + throw new IllegalStateException(); + } + } + + @ParameterizedTest + @EnumSource(QueueTopicFilterSource.class) + void from_simple(final @NotNull QueueTopicFilterSource source) { + final String topicFilter = "abc/def"; + final MqttTopicFilterImpl mqtt5TopicFilter = from(source, topicFilter); + assertNotNull(mqtt5TopicFilter); + assertInstanceOf(MqttQueueTopicFilterImpl.class, mqtt5TopicFilter); + + final MqttQueueTopicFilterImpl mqtt5QueueTopicFilter = (MqttQueueTopicFilterImpl) mqtt5TopicFilter; + assertEquals(topicFilter, mqtt5QueueTopicFilter.getTopicFilter().toString()); + } + + private static @NotNull List invalidQueueSubscriptions() { + final List testSpecs = new LinkedList<>(); + // queueTopicFilter, message + testSpecs.add(Arguments.of("$q/", "Topic filter must be at least one character long")); + return testSpecs; + } + + @ParameterizedTest + @MethodSource("invalidQueueSubscriptions") + void from_invalidQueueSubscriptionByteBuf_returnsNull( + final @NotNull String queueTopicFilter, @SuppressWarnings("unused") final @NotNull String message) { + + final MqttTopicFilterImpl mqtt5TopicFilter = fromFullString(QueueTopicFilterSource.BYTE_BUF, queueTopicFilter); + assertNull(mqtt5TopicFilter); + } + + @ParameterizedTest + @MethodSource("invalidQueueSubscriptions") + void from_invalidQueueSubscriptionString_throws( + final @NotNull String queueTopicFilter, final @NotNull String message) { + + final IllegalArgumentException exception = Assertions.assertThrows(IllegalArgumentException.class, + () -> fromFullString(QueueTopicFilterSource.STRING, queueTopicFilter)); + assertTrue(exception.getMessage().contains(message), "IllegalArgumentException must give hint that " + message); + } + + private static @NotNull List invalidTopicFilter( + final @NotNull QueueTopicFilterSource source) { + + final List testSpecs = new LinkedList<>(); + // source, topicFilter, errorMsg + testSpecs.add(Arguments.of(source, "abc/def/ghi/#/", + "Topic filter [abc/def/ghi/#/] contains misplaced wildcard characters. " + + "Multi level wildcard (#) must be the last character.")); + testSpecs.add(Arguments.of(source, "abc/def/ghi#", + "Topic filter [abc/def/ghi#] contains misplaced wildcard characters. " + + "Wildcard (#) at index 11 must follow a topic level separator.")); + testSpecs.add(Arguments.of(source, "abc+/def/ghi", + "Topic filter [abc+/def/ghi] contains misplaced wildcard characters. " + + "Wildcard (+) at index 3 must follow a topic level separator.")); + testSpecs.add(Arguments.of(source, "abc/+def/ghi", + "Topic filter [abc/+def/ghi] contains misplaced wildcard characters. " + + "Single level wildcard (+) at index 4 must be followed by a topic level separator.")); + testSpecs.add(Arguments.of(source, "abc/def/ghi/+#", + "Topic filter [abc/def/ghi/+#] contains misplaced wildcard characters. " + + "Single level wildcard (+) at index 12 must be followed by a topic level separator.")); + testSpecs.add(Arguments.of(source, "abc/++/def/ghi", + "Topic filter [abc/++/def/ghi] contains misplaced wildcard characters. " + + "Single level wildcard (+) at index 4 must be followed by a topic level separator.")); + testSpecs.add(Arguments.of(source, "", "Topic filter must be at least one character long")); + return testSpecs; + } + + private static @NotNull List invalidTopicFilterFromStringAndFromTopicFilter() { + final List testSpecs = invalidTopicFilter(QueueTopicFilterSource.STRING); + testSpecs.addAll(invalidTopicFilter(QueueTopicFilterSource.TOPIC_FILTER)); + return testSpecs; + } + + private static @NotNull List invalidTopicFilterFromByteBuf() { + return invalidTopicFilter(QueueTopicFilterSource.BYTE_BUF); + } + + @ParameterizedTest + @MethodSource("invalidTopicFilterFromByteBuf") + void from_invalidTopicFilterByteBuf_returnsNull( + final @NotNull QueueTopicFilterSource source, + final @NotNull String topicFilter, + @SuppressWarnings("unused") final @NotNull String message) { + + final MqttTopicFilterImpl mqtt5TopicFilter = from(source, topicFilter); + assertNull(mqtt5TopicFilter); + } + + @ParameterizedTest + @MethodSource("invalidTopicFilterFromStringAndFromTopicFilter") + void from_invalidTopicFilterString_throws( + final @NotNull QueueTopicFilterSource source, + final @NotNull String topicFilter, + final @NotNull String message) { + + final IllegalArgumentException exception = + Assertions.assertThrows(IllegalArgumentException.class, () -> from(source, topicFilter)); + assertTrue(exception.getMessage().contains(message), "IllegalArgumentException must give hint that " + message); + } + + @Test + void test_queue_topic_filter_must_be_case_sensitive() { + final String topicFilter1 = "abc"; + final String topicFilter2 = "ABC"; + final MqttQueueTopicFilterImpl mqtt5TopicFilter1 = MqttQueueTopicFilterImpl.of(topicFilter1); + final MqttQueueTopicFilterImpl mqtt5TopicFilter2 = MqttQueueTopicFilterImpl.of(topicFilter2); + assertNotNull(mqtt5TopicFilter1); + assertNotNull(mqtt5TopicFilter2); + assertNotEquals(mqtt5TopicFilter1, mqtt5TopicFilter2); + assertNotEquals(mqtt5TopicFilter1.toString(), mqtt5TopicFilter2.toString()); + + assertEquals(topicFilter1, mqtt5TopicFilter1.getTopicFilter().toString()); + assertEquals(topicFilter2, mqtt5TopicFilter2.getTopicFilter().toString()); + } + + private static @NotNull List validTopicFilter() { + final List testSpecs = new LinkedList<>(); + for (final QueueTopicFilterSource source : QueueTopicFilterSource.values()) { + // source, testDescription, topicFilter + testSpecs.add(Arguments.of(source, "topic filter with space", "ab c/def")); + testSpecs.add(Arguments.of(source, "topic filter is single space", " ")); + testSpecs.add(Arguments.of(source, "topic filter contains multi level wildcard", "abc/def/ghi/#")); + testSpecs.add(Arguments.of(source, "topic filter is multi level wildcard", "#")); + testSpecs.add(Arguments.of(source, "topic filter with single level wildcard", "abc/+/def/ghi")); + testSpecs.add(Arguments.of(source, "topic filter is single level wildcard", "+")); + testSpecs.add(Arguments.of(source, "topic filter with multiple single level wildcards", + "+/abc/+/def/+/+/ghi/+")); + testSpecs.add(Arguments.of(source, "topic filter with multi and single level wildcards", + "abc/+/def/+/ghi/#")); + } + return testSpecs; + } + + @ParameterizedTest + @MethodSource("validTopicFilter") + void from_validTopicFilter( + final @NotNull QueueTopicFilterSource source, + @SuppressWarnings("unused") final @NotNull String testDescription, + final @NotNull String topicFilter) { + + final MqttTopicFilterImpl mqtt5TopicFilter = from(source, topicFilter); + assertNotNull(mqtt5TopicFilter); + assertInstanceOf(MqttQueueTopicFilterImpl.class, mqtt5TopicFilter); + + final MqttQueueTopicFilterImpl mqtt5QueueTopicFilter = (MqttQueueTopicFilterImpl) mqtt5TopicFilter; + assertEquals(topicFilter, mqtt5QueueTopicFilter.getTopicFilter().toString()); + } + + @ParameterizedTest + @EnumSource(QueueTopicFilterSource.class) + void getLevels_onlyTopicLevelSeparator(final @NotNull QueueTopicFilterSource source) { + final String topicFilter = "/"; + final MqttTopicFilterImpl mqtt5TopicFilter = from(source, topicFilter); + assertNotNull(mqtt5TopicFilter); + assertInstanceOf(MqttQueueTopicFilterImpl.class, mqtt5TopicFilter); + final ImmutableList levels = mqtt5TopicFilter.getLevels(); + assertEquals(levels, Arrays.asList("", "")); + + final MqttQueueTopicFilterImpl mqtt5QueueTopicFilter = (MqttQueueTopicFilterImpl) mqtt5TopicFilter; + assertEquals(topicFilter, mqtt5QueueTopicFilter.getTopicFilter().toString()); + } + + @ParameterizedTest + @EnumSource(QueueTopicFilterSource.class) + void getLevels_simple(final @NotNull QueueTopicFilterSource source) { + final String topicFilter = "abc/def/ghi"; + final MqttTopicFilterImpl mqtt5TopicFilter = from(source, topicFilter); + assertNotNull(mqtt5TopicFilter); + assertInstanceOf(MqttQueueTopicFilterImpl.class, mqtt5TopicFilter); + final ImmutableList levels = mqtt5TopicFilter.getLevels(); + assertEquals(levels, Arrays.asList("abc", "def", "ghi")); + } + + @ParameterizedTest + @EnumSource(QueueTopicFilterSource.class) + void getLevels_multipleEmptyLevels(final @NotNull QueueTopicFilterSource source) { + final String topicFilter = "/abc//def///ghi/"; + final MqttTopicFilterImpl mqtt5TopicFilter = from(source, topicFilter); + assertNotNull(mqtt5TopicFilter); + assertInstanceOf(MqttQueueTopicFilterImpl.class, mqtt5TopicFilter); + final ImmutableList levels = mqtt5TopicFilter.getLevels(); + assertEquals(levels, Arrays.asList("", "abc", "", "def", "", "", "ghi", "")); + } +} \ No newline at end of file diff --git a/src/test/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicFilterImplTest.java b/src/test/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicFilterImplTest.java index c4f410a34..4a47354b2 100644 --- a/src/test/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicFilterImplTest.java +++ b/src/test/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicFilterImplTest.java @@ -227,6 +227,71 @@ void isShared_false( assertFalse(mqtt5TopicFilter instanceof MqttSharedTopicFilterImpl); } + private static @NotNull List invalidQueueTopicFilterProvider() { + final List testSpecs = new LinkedList<>(); + // Test cases where the string should NOT be recognized as a queue topic + testSpecs.add(Arguments.of("$q without slash is NOT queue topic", "$qabc/def")); + testSpecs.add(Arguments.of("just $q does not define a queue topic", "$q")); + return testSpecs; + } + + @ParameterizedTest + @MethodSource("invalidQueueTopicFilterProvider") + void isQueue_false_notRecognizedAsQueue( + final @NotNull String description, + final @NotNull String topicFilterString) { + // These should be treated as regular topic filters, not queue topics + for (final Function method : topicFilterFactoryMethods) { + final MqttTopicFilterImpl mqtt5TopicFilter = method.apply(topicFilterString); + assertNotNull(mqtt5TopicFilter, description + " with " + method); + assertFalse(mqtt5TopicFilter instanceof MqttQueueTopicFilterImpl, description + " with " + method); + } + } + + @Test + void isQueue_false_emptyTopicFilter_byteBuf_returnsNull() { + // "$q/" has no topic filter after the prefix, should return null for ByteBuf + final MqttTopicFilterImpl mqtt5TopicFilter = createFromByteBuf("$q/"); + assertNull(mqtt5TopicFilter, "$q/ without topic filter should return null from ByteBuf"); + } + + @Test + void isQueue_false_emptyTopicFilter_string_throws() { + // "$q/" has no topic filter after the prefix, should throw for String + final IllegalArgumentException exception = Assertions.assertThrows( + IllegalArgumentException.class, + () -> MqttTopicFilterImpl.of("$q/")); + assertTrue(exception.getMessage().contains("must be at least one character long"), + "IllegalArgumentException must give hint that topic filter must not be empty"); + } + + private static @NotNull List validQueueTopicFilterProvider() { + final List testSpecs = new LinkedList<>(); + for (final Function method : topicFilterFactoryMethods) { + testSpecs.add(Arguments.of(method, "simple queue topic", "$q/abc/def")); + testSpecs.add(Arguments.of(method, "queue topic with wildcard", "$q/abc/+/def")); + testSpecs.add(Arguments.of(method, "queue topic with multi-level wildcard", "$q/abc/def/#")); + testSpecs.add(Arguments.of(method, "queue topic with single char", "$q/a")); + } + return testSpecs; + } + + @ParameterizedTest + @MethodSource("validQueueTopicFilterProvider") + void isQueue_true( + final @NotNull Function topicFilterFactoryMethod, + final @NotNull String ignored, + final @NotNull String topicFilterString) { + final MqttTopicFilterImpl mqtt5TopicFilter = topicFilterFactoryMethod.apply(topicFilterString); + assertNotNull(mqtt5TopicFilter); + assertInstanceOf(MqttQueueTopicFilterImpl.class, mqtt5TopicFilter); + + final MqttQueueTopicFilterImpl queueTopicFilter = (MqttQueueTopicFilterImpl) mqtt5TopicFilter; + // Extract the topic filter part (everything after "$q/") + final String expectedTopicFilter = topicFilterString.substring(3); // Remove "$q/" + assertEquals(expectedTopicFilter, queueTopicFilter.getTopicFilter().toString()); + } + /** * Extension of Function<T, R> used to make test results more readable. */