diff --git a/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttQueueTopicFilterImpl.java b/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttQueueTopicFilterImpl.java new file mode 100644 index 000000000..9e545068b --- /dev/null +++ b/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttQueueTopicFilterImpl.java @@ -0,0 +1,177 @@ +/* + * Copyright 2018-present HiveMQ and the HiveMQ Community + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hivemq.client.internal.mqtt.datatypes; + +import com.hivemq.client.annotations.Immutable; +import com.hivemq.client.internal.util.Checks; +import com.hivemq.client.mqtt.datatypes.MqttQueueTopicFilter; +import com.hivemq.client.mqtt.datatypes.MqttTopicFilterBuilder; +import org.jetbrains.annotations.Contract; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * @author Silvio Giebl + * @see MqttQueueTopicFilter + * @see MqttUtf8StringImpl + */ +@Immutable +public class MqttQueueTopicFilterImpl extends MqttTopicFilterImpl implements MqttQueueTopicFilter { + + private static final int QUEUE_PREFIX_LENGTH = QUEUE_PREFIX.length(); + + /** + * Checks if the given UTF-16 encoded Java string represents a Queue Topic Filter. This method does not validate + * whether it represents a valid Queue Topic Filter but only whether it starts with {@value #QUEUE_PREFIX}. + * + * @param string the given UTF-16 encoded Java string. + * @return whether the string represents a Queue Topic Filter. + */ + static boolean isQueue(final @NotNull String string) { + return string.startsWith(QUEUE_PREFIX); + } + + /** + * Checks if the given byte array with UTF-8 encoded data represents a Queue Topic Filter. This method does not + * validate whether it represents a valid Queue Topic Filter but only whether it starts with {@value + * #QUEUE_PREFIX}. + * + * @param binary the byte array with UTF-8 encoded data. + * @return whether the byte array represents a Queue Topic Filter. + */ + static boolean isQueue(final byte @NotNull [] binary) { + if (binary.length < QUEUE_PREFIX_LENGTH) { + return false; + } + for (int i = 0; i < QUEUE_PREFIX_LENGTH; i++) { + if (binary[i] != QUEUE_PREFIX.charAt(i)) { + return false; + } + } + return true; + } + + /** + * Validates and creates a Queue Topic Filter of the given UTF-16 encoded Java string. + *
+ * This method does not validate {@link MqttUtf8StringImpl#checkLength(String, String) length}, {@link + * MqttUtf8StringImpl#checkWellFormed(String, String) if well-formed} and {@link #isQueue(String) if queue}. + * + * @param string the UTF-16 encoded Java string starting with {@value #QUEUE_PREFIX}. + * @return the created Queue Topic Filter. + * @throws IllegalArgumentException if the string is not a valid Queue Topic Filter. + */ + static @NotNull MqttQueueTopicFilterImpl ofInternal(final @NotNull String string) { + if (string.length() <= QUEUE_PREFIX_LENGTH) { + throw new IllegalArgumentException("Topic filter must be at least one character long."); + } + final int wildcardFlags = validateWildcards(string, QUEUE_PREFIX_LENGTH); + return new MqttQueueTopicFilterImpl(string, wildcardFlags); + } + + /** + * Validates and creates a Queue Topic Filter of the given byte array with UTF-8 encoded data. + *
+ * This method does not validate length, {@link MqttUtf8StringImpl#isWellFormed(byte[]) if well-formed} and {@link
+ * #isQueue(byte[]) if queue}.
+ *
+ * @param binary the byte array with UTF-8 encoded data starting with {@value #QUEUE_PREFIX}.
+ * @return the created Queue Topic Filter or null if the byte array is not a valid Queue Topic
+ * Filter.
+ */
+ static @Nullable MqttQueueTopicFilterImpl ofInternal(final byte @NotNull [] binary) {
+ if (binary.length <= QUEUE_PREFIX_LENGTH) {
+ return null;
+ }
+ final int wildcardFlags = validateWildcards(binary, QUEUE_PREFIX_LENGTH);
+ if (wildcardFlags == WILDCARD_CHECK_FAILURE) {
+ return null;
+ }
+ return new MqttQueueTopicFilterImpl(binary, wildcardFlags);
+ }
+
+ /**
+ * Validates and creates a Queue Topic Filter of the given Topic Filter.
+ *
+ * @param topicFilter the Topic Filter string.
+ * @return the created Queue Topic Filter.
+ * @throws IllegalArgumentException if the Topic Filter string is not a valid Topic Filter.
+ */
+ @Contract("null -> fail")
+ public static @NotNull MqttQueueTopicFilterImpl of(final @Nullable String topicFilter) {
+ Checks.notEmpty(topicFilter, "Topic filter");
+ checkWellFormed(topicFilter, "Topic filter");
+ final String queueTopicFilter = QUEUE_PREFIX + topicFilter;
+ checkLength(queueTopicFilter, "Queue topic filter");
+ final int wildcardFlags = validateWildcards(topicFilter, 0);
+ return new MqttQueueTopicFilterImpl(queueTopicFilter, wildcardFlags);
+ }
+
+ /**
+ * Validates and creates a Queue Topic Filter of the given Topic Filter.
+ *
+ * @param topicFilter the Topic Filter.
+ * @return the created Queue Topic Filter.
+ */
+ public static @NotNull MqttQueueTopicFilterImpl of(final @NotNull MqttTopicFilterImpl topicFilter) {
+ final String queueTopicFilter = QUEUE_PREFIX + topicFilter;
+ checkLength(queueTopicFilter, "Queue topic filter");
+ return new MqttQueueTopicFilterImpl(queueTopicFilter, topicFilter.wildcardFlags);
+ }
+
+ private final int filterByteStart;
+ private final int filterCharStart;
+
+ private MqttQueueTopicFilterImpl(final byte @NotNull [] binary, final int wildcardFlags) {
+ super(binary, wildcardFlags);
+ this.filterByteStart = QUEUE_PREFIX_LENGTH;
+ this.filterCharStart = -1;
+ }
+
+ private MqttQueueTopicFilterImpl(final @NotNull String string, final int wildcardFlags) {
+ super(string, wildcardFlags);
+ this.filterByteStart = -1;
+ this.filterCharStart = QUEUE_PREFIX_LENGTH;
+ }
+
+ @Override
+ public @NotNull String getTopicFilterString() {
+ return toString().substring(getFilterCharStart());
+ }
+
+ @Override
+ public @NotNull MqttTopicFilterImpl getTopicFilter() {
+ return MqttTopicFilterImpl.of(this);
+ }
+
+ @Override
+ int getFilterByteStart() {
+ return filterByteStart;
+ }
+
+ private int getFilterCharStart() {
+ if (filterCharStart == -1) {
+ return QUEUE_PREFIX_LENGTH;
+ }
+ return filterCharStart;
+ }
+
+ @Override
+ public MqttTopicFilterBuilder.@NotNull Complete extendQueue() {
+ return new MqttTopicFilterImplBuilder.Default(this);
+ }
+}
diff --git a/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicFilterImpl.java b/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicFilterImpl.java
index 1a37b0429..a028f9607 100644
--- a/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicFilterImpl.java
+++ b/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicFilterImpl.java
@@ -60,6 +60,9 @@ public class MqttTopicFilterImpl extends MqttUtf8StringImpl implements MqttTopic
if (MqttSharedTopicFilterImpl.isShared(string)) {
return MqttSharedTopicFilterImpl.ofInternal(string);
}
+ if (MqttQueueTopicFilterImpl.isQueue(string)) {
+ return MqttQueueTopicFilterImpl.ofInternal(string);
+ }
final int wildcardFlags = validateWildcards(string, 0);
return new MqttTopicFilterImpl(string, wildcardFlags);
}
@@ -84,6 +87,16 @@ public class MqttTopicFilterImpl extends MqttUtf8StringImpl implements MqttTopic
return new MqttTopicFilterImpl(sharedTopicFilter.getTopicFilterString(), sharedTopicFilter.wildcardFlags);
}
+ /**
+ * Creates a Topic Filter of the given Queue Topic Filter.
+ *
+ * @param queueTopicFilter the Queue Topic Filter.
+ * @return the created Topic Filter.
+ */
+ public static @NotNull MqttTopicFilterImpl of(final @NotNull MqttQueueTopicFilterImpl queueTopicFilter) {
+ return new MqttTopicFilterImpl(queueTopicFilter.getTopicFilterString(), queueTopicFilter.wildcardFlags);
+ }
+
/**
* Validates and creates a Topic Filter of the given byte array with UTF-8 encoded data.
*
@@ -97,6 +110,9 @@ public class MqttTopicFilterImpl extends MqttUtf8StringImpl implements MqttTopic
if (MqttSharedTopicFilterImpl.isShared(binary)) {
return MqttSharedTopicFilterImpl.ofInternal(binary);
}
+ if (MqttQueueTopicFilterImpl.isQueue(binary)) {
+ return MqttQueueTopicFilterImpl.ofInternal(binary);
+ }
final int wildcardFlags = validateWildcards(binary, 0);
if (wildcardFlags == WILDCARD_CHECK_FAILURE) {
return null;
diff --git a/src/main/java/com/hivemq/client/mqtt/datatypes/MqttQueueTopicFilter.java b/src/main/java/com/hivemq/client/mqtt/datatypes/MqttQueueTopicFilter.java
new file mode 100644
index 000000000..129240d60
--- /dev/null
+++ b/src/main/java/com/hivemq/client/mqtt/datatypes/MqttQueueTopicFilter.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2018-present HiveMQ and the HiveMQ Community
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hivemq.client.mqtt.datatypes;
+
+import com.hivemq.client.annotations.DoNotImplement;
+import com.hivemq.client.internal.mqtt.datatypes.MqttQueueTopicFilterImpl;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * MQTT Queue Topic Filter according to the MQTT specification.
+ *
+ * A Queue Topic Filter consists of the Queue prefix ({@value #QUEUE_PREFIX}) and a Topic Filter. + *
+ * The Topic Filter has the same restrictions as a {@link MqttTopicFilter Topic Filter}.
+ *
+ * @author Jean-François Côté
+ * @since 1.0
+ */
+@DoNotImplement
+public interface MqttQueueTopicFilter extends MqttTopicFilter {
+
+ /**
+ * The prefix of a Queue Topic Filter.
+ */
+ @NotNull String QUEUE_PREFIX = "$q" + MqttTopic.TOPIC_LEVEL_SEPARATOR;
+
+ /**
+ * Validates and creates a Queue Topic Filter of the given Topic Filter.
+ *
+ * @param topicFilter the string representation of the Topic Filter.
+ * @return the created Queue Topic Filter.
+ * @throws IllegalArgumentException if the Topic Filter string is
+ * not a valid Topic Filter.
+ */
+ static @NotNull MqttQueueTopicFilter of(final @NotNull String topicFilter) {
+ return MqttQueueTopicFilterImpl.of(topicFilter);
+ }
+
+ /**
+ * @return the Topic Filter of this Queue Topic Filter.
+ */
+ @NotNull MqttTopicFilter getTopicFilter();
+
+ /**
+ * Creates a builder for extending this Queue Topic Filter.
+ *
+ * @return the created builder.
+ */
+ MqttTopicFilterBuilder.@NotNull Complete extendQueue();
+}
diff --git a/src/test/java/com/hivemq/client/internal/mqtt/datatypes/MqttQueueTopicFilterImplTest.java b/src/test/java/com/hivemq/client/internal/mqtt/datatypes/MqttQueueTopicFilterImplTest.java
new file mode 100644
index 000000000..181866ce6
--- /dev/null
+++ b/src/test/java/com/hivemq/client/internal/mqtt/datatypes/MqttQueueTopicFilterImplTest.java
@@ -0,0 +1,275 @@
+/*
+ * Copyright 2018-present HiveMQ and the HiveMQ Community
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hivemq.client.internal.mqtt.datatypes;
+
+import com.hivemq.client.internal.util.collections.ImmutableList;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * @author Jean-François Côté
+ */
+class MqttQueueTopicFilterImplTest {
+
+ enum QueueTopicFilterSource {
+ STRING,
+ BYTE_BUF,
+ TOPIC_FILTER
+ }
+
+ private @Nullable MqttTopicFilterImpl from(
+ final @NotNull QueueTopicFilterSource source,
+ final @NotNull String topicFilter) {
+
+ if (source == QueueTopicFilterSource.TOPIC_FILTER) {
+ return MqttQueueTopicFilterImpl.of(topicFilter);
+ }
+
+ final String queueTopicFilter = "$q/" + topicFilter;
+
+ switch (source) {
+ case BYTE_BUF:
+ final ByteBuf byteBuf = Unpooled.buffer();
+ final byte[] binary = queueTopicFilter.getBytes(StandardCharsets.UTF_8);
+ byteBuf.writeShort(binary.length);
+ byteBuf.writeBytes(binary);
+ final MqttTopicFilterImpl mqtt5TopicFilter = MqttTopicFilterImpl.decode(byteBuf);
+ byteBuf.release();
+ return mqtt5TopicFilter;
+ case STRING:
+ return MqttTopicFilterImpl.of(queueTopicFilter);
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ private @Nullable MqttTopicFilterImpl fromFullString(
+ final @NotNull QueueTopicFilterSource source, final @NotNull String fullQueueTopicFilter) {
+
+ switch (source) {
+ case BYTE_BUF:
+ final ByteBuf byteBuf = Unpooled.buffer();
+ final byte[] binary = fullQueueTopicFilter.getBytes(StandardCharsets.UTF_8);
+ byteBuf.writeShort(binary.length);
+ byteBuf.writeBytes(binary);
+ final MqttTopicFilterImpl mqtt5TopicFilter = MqttTopicFilterImpl.decode(byteBuf);
+ byteBuf.release();
+ return mqtt5TopicFilter;
+ case STRING:
+ return MqttTopicFilterImpl.of(fullQueueTopicFilter);
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(QueueTopicFilterSource.class)
+ void from_simple(final @NotNull QueueTopicFilterSource source) {
+ final String topicFilter = "abc/def";
+ final MqttTopicFilterImpl mqtt5TopicFilter = from(source, topicFilter);
+ assertNotNull(mqtt5TopicFilter);
+ assertInstanceOf(MqttQueueTopicFilterImpl.class, mqtt5TopicFilter);
+
+ final MqttQueueTopicFilterImpl mqtt5QueueTopicFilter = (MqttQueueTopicFilterImpl) mqtt5TopicFilter;
+ assertEquals(topicFilter, mqtt5QueueTopicFilter.getTopicFilter().toString());
+ }
+
+ private static @NotNull List