diff --git a/pom.xml b/pom.xml
index 58b294b..67e88d5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,6 +10,7 @@
1.0.0
csv-file-adapter
+ redis-adapter
diff --git a/redis-adapter/pom.xml b/redis-adapter/pom.xml
new file mode 100644
index 0000000..2d97f2c
--- /dev/null
+++ b/redis-adapter/pom.xml
@@ -0,0 +1,26 @@
+
+
+
+ gateway-examples
+ com.diffusiondata.gateway.adapter
+ 1.0.0
+
+ 4.0.0
+
+ redis-adapter
+ pom
+
+ redis-sink
+ redis-source
+
+
+
+
+ io.lettuce
+ lettuce-core
+ 6.2.1.RELEASE
+
+
+
\ No newline at end of file
diff --git a/redis-adapter/redis-sink/README.md b/redis-adapter/redis-sink/README.md
new file mode 100644
index 0000000..7704994
--- /dev/null
+++ b/redis-adapter/redis-sink/README.md
@@ -0,0 +1,29 @@
+# Redis Sink Adapter
+
+## Introduction
+This adapter can be used to get updates from Diffusion JSON topics and write the update to a Redis instance. It supports only one type of service.
+
+### REDIS_SINK
+This sink service supports getting Diffusion JSON topic updates and publishing them to a Redis instance specified in the configuration. This service will correctly function only for Diffusion topics of JSON topic type. If any other type of topic selector is used in its configuration, or the topic selector matches any non JSON topic type, when an update is received for this topic, Payload convertor exception will be thrown. This service type requires the following configuration to be declared in each defined service in the configuration file:
+
+ "application": {
+ "redisUrl": "redisUrl"
+ }
+
+Below is an example of an overall configuration of a service of type `REDIS_SINK`:
+
+ {
+ "serviceName": "dataSelectorSink",
+ "serviceType": "REDIS_SINK",
+ "description": "Subscribes to JSON diffusion topic and writes its content to Redis instance",
+ "config": {
+ "framework": {
+ "diffusionTopicSelector": "?data//"
+ },
+ "application": {
+ "redisUrl": "redis://password@localhost:6379/"
+ }
+ }
+ }
+
+With this configuration, this service will subscribe to all topics that match the selectors passed in `diffusionTopicSelectors` field. For each topic path update, a Redis entry will be created with tthe topic path as the key and the content, in the form a JSON string, as the value.
\ No newline at end of file
diff --git a/redis-adapter/redis-sink/pom.xml b/redis-adapter/redis-sink/pom.xml
new file mode 100644
index 0000000..f258da9
--- /dev/null
+++ b/redis-adapter/redis-sink/pom.xml
@@ -0,0 +1,35 @@
+
+
+
+ redis-adapter
+ com.diffusiondata.gateway.adapter
+ 1.0.0
+
+ 4.0.0
+
+ redis-sink
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+
+
+
+ com.diffusiondata.gateway.adapter.redis.sink.Runner
+
+
+ redis-sink-${project.version}
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/redis-adapter/redis-sink/src/main/java/com/diffusiondata/gateway/adapter/redis/sink/RedisSinkApplication.java b/redis-adapter/redis-sink/src/main/java/com/diffusiondata/gateway/adapter/redis/sink/RedisSinkApplication.java
new file mode 100644
index 0000000..5207b08
--- /dev/null
+++ b/redis-adapter/redis-sink/src/main/java/com/diffusiondata/gateway/adapter/redis/sink/RedisSinkApplication.java
@@ -0,0 +1,67 @@
+package com.diffusiondata.gateway.adapter.redis.sink;
+
+import static com.diffusiondata.gateway.framework.DiffusionGatewayFramework.newApplicationDetailsBuilder;
+
+import com.diffusiondata.gateway.framework.GatewayApplication;
+import com.diffusiondata.gateway.framework.ServiceDefinition;
+import com.diffusiondata.gateway.framework.ServiceMode;
+import com.diffusiondata.gateway.framework.StateHandler;
+import com.diffusiondata.gateway.framework.exceptions.ApplicationConfigurationException;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Main Gateway Application implementation for Redis sink application.
+ *
+ * @author Diffusion Data
+ */
+final public class RedisSinkApplication implements GatewayApplication {
+
+ @Override public ApplicationDetails getApplicationDetails()
+ throws ApplicationConfigurationException {
+ return newApplicationDetailsBuilder()
+ .addServiceType(
+ "REDIS_SINK",
+ ServiceMode.SINK,
+ "A sink service which writes received string updates from " +
+ "configured Diffusion topics to a Redis instance",
+ "{\n" +
+ " \"$schema\": \"http://json-schema" +
+ ".org/draft-07/schema#\",\n" +
+ " \"$ref\": \"#/definitions/application\",\n" +
+ " \"definitions\": {\n" +
+ " \"application\": {\n" +
+ " \"type\": \"object\",\n" +
+ " \"additionalProperties\": false,\n" +
+ " \"properties\": {\n" +
+ " \"redisUrl\": {\n" +
+ " \"type\": \"string\"\n" +
+ " }\n" +
+ " },\n" +
+ " \"required\": [\n" +
+ " \"redisUrl\"\n" +
+ " ]\n" +
+ " }\n" +
+ " }\n" +
+ "}")
+ .build("REDIS_SINK", 1);
+ }
+
+ @Override
+ public RedisSinkHandler addSink(
+ ServiceDefinition serviceDefinition,
+ StateHandler stateHandler) {
+
+ final Map parameters =
+ serviceDefinition.getParameters();
+
+ final String redisUrl = (String) parameters.get("redisUrl");
+
+ return new RedisSinkHandler(redisUrl);
+ }
+
+ @Override public CompletableFuture> stop() {
+ return null;
+ }
+}
diff --git a/redis-adapter/redis-sink/src/main/java/com/diffusiondata/gateway/adapter/redis/sink/RedisSinkHandler.java b/redis-adapter/redis-sink/src/main/java/com/diffusiondata/gateway/adapter/redis/sink/RedisSinkHandler.java
new file mode 100644
index 0000000..d401f5f
--- /dev/null
+++ b/redis-adapter/redis-sink/src/main/java/com/diffusiondata/gateway/adapter/redis/sink/RedisSinkHandler.java
@@ -0,0 +1,56 @@
+package com.diffusiondata.gateway.adapter.redis.sink;
+
+import static com.diffusiondata.gateway.framework.DiffusionGatewayFramework.newSinkServicePropertiesBuilder;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import com.diffusiondata.gateway.framework.SinkHandler;
+import com.diffusiondata.gateway.framework.TopicType;
+import com.diffusiondata.gateway.framework.exceptions.InvalidConfigurationException;
+
+import java.util.concurrent.CompletableFuture;
+
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.api.async.RedisAsyncCommands;
+import net.jcip.annotations.Immutable;
+
+/**
+ * Redis sink handler to write received string update into a Redis instance.
+ *
+ * @author Diffusion Data
+ */
+@Immutable
+final class RedisSinkHandler implements SinkHandler {
+
+ private final RedisAsyncCommands redisClient;
+
+ public RedisSinkHandler(String redisUrl) {
+ redisClient = RedisClient
+ .create(redisUrl)
+ .connect()
+ .async();
+ }
+
+ @Override
+ public SinkServiceProperties getSinkServiceProperties()
+ throws InvalidConfigurationException {
+ return newSinkServicePropertiesBuilder()
+ .topicType(TopicType.JSON)
+ .payloadConvertorName("$Default_JSON")
+ .build();
+ }
+
+ @Override
+ public CompletableFuture> update(String diffusionTopic, String value) {
+ return redisClient.set(diffusionTopic, value).toCompletableFuture();
+ }
+
+ @Override
+ public CompletableFuture> pause(PauseReason pauseReason) {
+ return completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture> resume(ResumeReason resumeReason) {
+ return completedFuture(null);
+ }
+}
diff --git a/redis-adapter/redis-sink/src/main/java/com/diffusiondata/gateway/adapter/redis/sink/Runner.java b/redis-adapter/redis-sink/src/main/java/com/diffusiondata/gateway/adapter/redis/sink/Runner.java
new file mode 100644
index 0000000..e36c77f
--- /dev/null
+++ b/redis-adapter/redis-sink/src/main/java/com/diffusiondata/gateway/adapter/redis/sink/Runner.java
@@ -0,0 +1,20 @@
+package com.diffusiondata.gateway.adapter.redis.sink;
+
+import static com.diffusiondata.gateway.framework.DiffusionGatewayFramework.initialize;
+
+/**
+ * Main Runner class.
+ *
+ * @author Diffusion Data
+ */
+public class Runner {
+ public static void main(String[] args) {
+ final RedisSinkApplication redisSinkApplication =
+ new RedisSinkApplication();
+
+ initialize(redisSinkApplication)
+ .connect();
+
+
+ }
+}
diff --git a/redis-adapter/redis-sink/src/main/resources/configuration.json b/redis-adapter/redis-sink/src/main/resources/configuration.json
new file mode 100644
index 0000000..3dd3a6d
--- /dev/null
+++ b/redis-adapter/redis-sink/src/main/resources/configuration.json
@@ -0,0 +1,26 @@
+{
+ "id": "redis-adapter-1",
+ "framework-version": 1,
+ "application-version": 1,
+ "diffusion": {
+ "url": "ws://localhost:8080",
+ "principal": "admin",
+ "password": "password",
+ "reconnectIntervalMs": 5000
+ },
+ "services": [
+ {
+ "serviceName": "dataSelectorSink",
+ "serviceType": "REDIS_SINK",
+ "description": "Subscribes to 'data' diffusion topic selector and stores its content to a Redis instance",
+ "config": {
+ "framework": {
+ "diffusionTopicSelector": "?data//"
+ },
+ "application": {
+ "redisUrl": "redis://password@localhost:6379/"
+ }
+ }
+ }
+ ]
+}
\ No newline at end of file
diff --git a/redis-adapter/redis-source/README.md b/redis-adapter/redis-source/README.md
new file mode 100644
index 0000000..54e0da4
--- /dev/null
+++ b/redis-adapter/redis-source/README.md
@@ -0,0 +1,27 @@
+# Redis Source Adapter
+
+## Introduction
+This adapter can be used to read from a Redis instance and publish its contents to a Diffusion topic. It supports two types of service.
+
+- POLLING_REDIS_SOURCE
+
+ A service of this type is called by the framework in the interval set in the configuration file or at a default interval of 30 secs, if not specified in the configuration. This service type requires the following configuration to be declared in each defined service in the configuration file:
+
+ "application":
+ {
+ "redisUrl": ,
+ "diffusionTopicName":
+ }
+
+-
+STREAMING_REDIS_SOURCE
+
+A service of this type is used to subscribe to a Redis channel. When a message is received, the application will publish the value to a Diffusion topic using the channel key as the topic path. This service type requires the following configuration to be declared for each defined service in the configuration file:
+
+ "application":
+ {
+ "redisUrl": ,
+ "diffusionTopicName":
+ }
+
+
diff --git a/redis-adapter/redis-source/pom.xml b/redis-adapter/redis-source/pom.xml
new file mode 100644
index 0000000..9cdb5d6
--- /dev/null
+++ b/redis-adapter/redis-source/pom.xml
@@ -0,0 +1,36 @@
+
+
+
+ redis-adapter
+ com.diffusiondata.gateway.adapter
+ 1.0.0
+
+ 4.0.0
+
+ redis-source
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+
+
+
+ com.diffusiondata.gateway.adapter.redis.source.Runner
+
+
+ redis-source-${project.version}
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/redis-adapter/redis-source/src/main/java/com/diffusiondata/gateway/adapter/redis/source/RedisSourceApplication.java b/redis-adapter/redis-source/src/main/java/com/diffusiondata/gateway/adapter/redis/source/RedisSourceApplication.java
new file mode 100644
index 0000000..535a9a3
--- /dev/null
+++ b/redis-adapter/redis-source/src/main/java/com/diffusiondata/gateway/adapter/redis/source/RedisSourceApplication.java
@@ -0,0 +1,90 @@
+package com.diffusiondata.gateway.adapter.redis.source;
+
+import static com.diffusiondata.gateway.framework.DiffusionGatewayFramework.newApplicationDetailsBuilder;
+
+import com.diffusiondata.gateway.framework.GatewayApplication;
+import com.diffusiondata.gateway.framework.PollingSourceHandler;
+import com.diffusiondata.gateway.framework.Publisher;
+import com.diffusiondata.gateway.framework.ServiceDefinition;
+import com.diffusiondata.gateway.framework.ServiceMode;
+import com.diffusiondata.gateway.framework.StateHandler;
+import com.diffusiondata.gateway.framework.StreamingSourceHandler;
+import com.diffusiondata.gateway.framework.exceptions.InvalidConfigurationException;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Main Gateway Application implementation for Redis source application.
+ *
+ * @author Diffsion Data
+ */
+public class RedisSourceApplication implements GatewayApplication {
+
+ static final String POLLING_REDIS_SOURCE = "POLLING_REDIS_SOURCE";
+ static final String STREAMING_REDIS_SOURCE = "STREAMING_REDIS_SOURCE";
+ static final String APPLICATION_TYPE = "REDIS_SOURCE";
+
+ private final SourceConfigValidator sourceConfigValidator;
+
+ RedisSourceApplication(SourceConfigValidator sourceConfigValidator) {
+ this.sourceConfigValidator = sourceConfigValidator;
+ }
+
+ @Override
+ public ApplicationDetails getApplicationDetails() {
+ return newApplicationDetailsBuilder()
+ .addServiceType(POLLING_REDIS_SOURCE,
+ ServiceMode.POLLING_SOURCE,
+ "A polling source service which frequently polls the " +
+ "configured Redis instance for any updates and publishes to " +
+ "Diffusion server",
+ null)
+ .addServiceType(STREAMING_REDIS_SOURCE,
+ ServiceMode.STREAMING_SOURCE,
+ "A streaming source which subscribes to a Redis instance and " +
+ "publishes contents to Diffusion server.",
+ null)
+ .build(APPLICATION_TYPE, 1);
+ }
+
+ @Override
+ public StreamingSourceHandler addStreamingSource(
+ ServiceDefinition serviceDefinition,
+ Publisher publisher,
+ StateHandler stateHandler) throws InvalidConfigurationException {
+
+ final Map parameters =
+ serviceDefinition.getParameters();
+
+ final SourceConfig sourceConfig =
+ sourceConfigValidator.validateAndGet(parameters);
+
+ return new RedisSourceStreamingHandler(
+ sourceConfig.getRedisUrl(),
+ sourceConfig.getDiffusionTopicName(),
+ publisher);
+ }
+
+ @Override
+ public PollingSourceHandler addPollingSource(
+ ServiceDefinition serviceDefinition,
+ Publisher publisher,
+ StateHandler stateHandler) throws InvalidConfigurationException {
+
+ final Map parameters =
+ serviceDefinition.getParameters();
+
+ final SourceConfig sourceConfig =
+ sourceConfigValidator.validateAndGet(parameters);
+
+ return new RedisSourcePollingHandler(
+ sourceConfig.getRedisUrl(),
+ sourceConfig.getDiffusionTopicName(),
+ publisher);
+ }
+
+ @Override public CompletableFuture> stop() {
+ return CompletableFuture.completedFuture(null);
+ }
+}
\ No newline at end of file
diff --git a/redis-adapter/redis-source/src/main/java/com/diffusiondata/gateway/adapter/redis/source/RedisSourcePollingHandler.java b/redis-adapter/redis-source/src/main/java/com/diffusiondata/gateway/adapter/redis/source/RedisSourcePollingHandler.java
new file mode 100644
index 0000000..2bd22ec
--- /dev/null
+++ b/redis-adapter/redis-source/src/main/java/com/diffusiondata/gateway/adapter/redis/source/RedisSourcePollingHandler.java
@@ -0,0 +1,79 @@
+package com.diffusiondata.gateway.adapter.redis.source;
+
+import static com.diffusiondata.gateway.framework.DiffusionGatewayFramework.newSourceServicePropertiesBuilder;
+
+import com.diffusiondata.gateway.framework.PollingSourceHandler;
+import com.diffusiondata.gateway.framework.Publisher;
+import com.diffusiondata.gateway.framework.TopicType;
+import com.diffusiondata.gateway.framework.UpdateMode;
+import com.diffusiondata.gateway.framework.exceptions.InvalidConfigurationException;
+import com.diffusiondata.gateway.framework.exceptions.PayloadConversionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.api.async.RedisAsyncCommands;
+
+/**
+ * Polling source handler implementation for Redis source.
+ *
+ * @author Diffusion Data
+ */
+public class RedisSourcePollingHandler implements PollingSourceHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RedisSourcePollingHandler.class);
+
+ private final RedisAsyncCommands redisClient;
+ private final Publisher publisher;
+ private final String diffusionTopicName;
+
+ RedisSourcePollingHandler(
+ String redisUrl,
+ String diffusionTopicName,
+ Publisher publisher) {
+ this.diffusionTopicName = diffusionTopicName;
+ this.publisher = publisher;
+ redisClient = RedisClient
+ .create(redisUrl)
+ .connect()
+ .async();
+ }
+
+ @Override public CompletableFuture> poll() {
+ LOG.debug("Polled");
+ CompletableFuture> pollCf = new CompletableFuture<>();
+ try {
+ final String value = redisClient.get(diffusionTopicName).get();
+ pollCf = publisher.publish(diffusionTopicName, value);
+ }
+ catch (InterruptedException | ExecutionException |
+ PayloadConversionException ex) {
+ LOG.error("polling {} failed", diffusionTopicName, ex);
+ pollCf.completeExceptionally(ex);
+ }
+ return pollCf;
+ }
+
+ @Override public CompletableFuture> pause(PauseReason pauseReason) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override public CompletableFuture> resume(ResumeReason resumeReason) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public SourceServiceProperties getSourceServiceProperties() throws
+ InvalidConfigurationException {
+ return
+ newSourceServicePropertiesBuilder()
+ .topicType(TopicType.STRING)
+ .updateMode(UpdateMode.STREAMING)
+ .build();
+ }
+}
\ No newline at end of file
diff --git a/redis-adapter/redis-source/src/main/java/com/diffusiondata/gateway/adapter/redis/source/RedisSourceStreamingHandler.java b/redis-adapter/redis-source/src/main/java/com/diffusiondata/gateway/adapter/redis/source/RedisSourceStreamingHandler.java
new file mode 100644
index 0000000..5e72116
--- /dev/null
+++ b/redis-adapter/redis-source/src/main/java/com/diffusiondata/gateway/adapter/redis/source/RedisSourceStreamingHandler.java
@@ -0,0 +1,104 @@
+package com.diffusiondata.gateway.adapter.redis.source;
+
+import static com.diffusiondata.gateway.framework.DiffusionGatewayFramework.newSourceServicePropertiesBuilder;
+
+import com.diffusiondata.gateway.framework.Publisher;
+import com.diffusiondata.gateway.framework.StreamingSourceHandler;
+import com.diffusiondata.gateway.framework.TopicType;
+import com.diffusiondata.gateway.framework.UpdateMode;
+import com.diffusiondata.gateway.framework.exceptions.InvalidConfigurationException;
+import com.diffusiondata.gateway.framework.exceptions.PayloadConversionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.pubsub.RedisPubSubAdapter;
+import io.lettuce.core.pubsub.RedisPubSubListener;
+import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
+import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
+
+/**
+ * Implementation of {@link StreamingSourceHandler} which listens to Redis
+ * changes and publishes values to Diffusion server.
+ *
+ * @author Diffusion Data
+ */
+public class RedisSourceStreamingHandler implements StreamingSourceHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RedisSourceStreamingHandler.class);
+
+ private final RedisPubSubAsyncCommands redisClient;
+ private final Publisher publisher;
+ private final String diffusionTopicName;
+
+ RedisSourceStreamingHandler(
+ String redisUrl,
+ String diffusionTopicName,
+ Publisher publisher
+ ) {
+ this.diffusionTopicName = diffusionTopicName + "*";
+ this.publisher = publisher;
+
+ StatefulRedisPubSubConnection connection = RedisClient
+ .create(redisUrl)
+ .connectPubSub();
+
+ connection.addListener(createListener());
+ redisClient = connection.async();
+ }
+
+ @Override
+ public SourceServiceProperties getSourceServiceProperties() throws
+ InvalidConfigurationException {
+ return
+ newSourceServicePropertiesBuilder()
+ .topicType(TopicType.STRING)
+ .updateMode(UpdateMode.STREAMING)
+ .build();
+ }
+
+ @Override
+ public CompletableFuture> start() {
+ return redisClient.psubscribe(diffusionTopicName).toCompletableFuture();
+ }
+
+ @Override
+ public CompletableFuture> stop() {
+ return redisClient.punsubscribe(diffusionTopicName).toCompletableFuture();
+ }
+
+ @Override public CompletableFuture> pause(PauseReason pauseReason) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override public CompletableFuture> resume(ResumeReason resumeReason) {
+ return start();
+ }
+
+ private void update(String path, String value) {
+ LOG.debug("Streaming update to server from {}", path);
+ try {
+ publisher.publish(path, value).get();
+ }
+ catch (InterruptedException | ExecutionException |
+ PayloadConversionException ex) {
+ LOG.error("updating {} failed", path, ex);
+ }
+ }
+
+ private RedisPubSubListener createListener() {
+ return new RedisPubSubAdapter<>() {
+ @Override public void message(String pattern, String channel, String message) {
+ LOG.debug("Message received from a pattern subscription {} {}", pattern, channel);
+ update(channel, message);
+ }
+ };
+ }
+
+
+}
diff --git a/redis-adapter/redis-source/src/main/java/com/diffusiondata/gateway/adapter/redis/source/Runner.java b/redis-adapter/redis-source/src/main/java/com/diffusiondata/gateway/adapter/redis/source/Runner.java
new file mode 100644
index 0000000..1593940
--- /dev/null
+++ b/redis-adapter/redis-source/src/main/java/com/diffusiondata/gateway/adapter/redis/source/Runner.java
@@ -0,0 +1,20 @@
+package com.diffusiondata.gateway.adapter.redis.source;
+
+import static com.diffusiondata.gateway.framework.DiffusionGatewayFramework.initialize;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Main Runner class.
+ *
+ * @author Diffusion Data
+ */
+public class Runner {
+ public static void main(String[] args) {
+ final RedisSourceApplication redisSourceApplication =
+ new RedisSourceApplication(new SourceConfigValidator(new ObjectMapper()));
+
+ initialize(redisSourceApplication)
+ .connect();
+ }
+}
diff --git a/redis-adapter/redis-source/src/main/java/com/diffusiondata/gateway/adapter/redis/source/SourceConfig.java b/redis-adapter/redis-source/src/main/java/com/diffusiondata/gateway/adapter/redis/source/SourceConfig.java
new file mode 100644
index 0000000..998a357
--- /dev/null
+++ b/redis-adapter/redis-source/src/main/java/com/diffusiondata/gateway/adapter/redis/source/SourceConfig.java
@@ -0,0 +1,20 @@
+package com.diffusiondata.gateway.adapter.redis.source;
+
+/**
+ * Sample configuration required for source application.
+ *
+ * @author Diffusion Data
+ */
+public class SourceConfig {
+ private String redisUrl;
+
+ private String diffusionTopicName;
+
+ public String getRedisUrl() {
+ return redisUrl;
+ }
+
+ public String getDiffusionTopicName() {
+ return diffusionTopicName;
+ }
+}
diff --git a/redis-adapter/redis-source/src/main/java/com/diffusiondata/gateway/adapter/redis/source/SourceConfigValidator.java b/redis-adapter/redis-source/src/main/java/com/diffusiondata/gateway/adapter/redis/source/SourceConfigValidator.java
new file mode 100644
index 0000000..b584daa
--- /dev/null
+++ b/redis-adapter/redis-source/src/main/java/com/diffusiondata/gateway/adapter/redis/source/SourceConfigValidator.java
@@ -0,0 +1,39 @@
+package com.diffusiondata.gateway.adapter.redis.source;
+
+import com.diffusiondata.gateway.framework.exceptions.InvalidConfigurationException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.Map;
+
+/**
+ * Validator for {@link SourceConfig} instance.
+ *
+ * @author Diffusion Data
+ */
+public class SourceConfigValidator {
+
+ private final ObjectMapper objectMapper;
+
+ SourceConfigValidator(ObjectMapper objectMapper) {
+ this.objectMapper = objectMapper;
+ }
+
+ SourceConfig validateAndGet(Map parameters) throws
+ InvalidConfigurationException {
+ final SourceConfig sourceConfig =
+ objectMapper.convertValue(parameters, SourceConfig.class);
+
+ final String redisUrl = sourceConfig.getRedisUrl();
+ final String diffusionTopicName = sourceConfig.getDiffusionTopicName();
+
+ if (redisUrl == null ||
+ redisUrl.isEmpty() ||
+ diffusionTopicName == null ||
+ diffusionTopicName.isEmpty()) {
+
+ throw new InvalidConfigurationException(
+ "Invalid config value");
+ }
+ return sourceConfig;
+ }
+}
diff --git a/redis-adapter/redis-source/src/main/resources/configuration.json b/redis-adapter/redis-source/src/main/resources/configuration.json
new file mode 100644
index 0000000..8b9191c
--- /dev/null
+++ b/redis-adapter/redis-source/src/main/resources/configuration.json
@@ -0,0 +1,39 @@
+{
+ "id": "redis-source-adapter-1",
+ "framework-version": 1,
+ "application-version": 1,
+ "diffusion": {
+ "url": "ws://localhost:8080",
+ "principal": "admin",
+ "password": "password",
+ "reconnectIntervalMs": 5000
+ },
+ "services": [
+ {
+ "serviceName": "redisPollingSource",
+ "serviceType": "POLLING_REDIS_SOURCE",
+ "description": "Polls data from a Redis instance and publishes to server",
+ "config": {
+ "framework": {
+ "pollIntervalMs": 2000,
+ "pollTimeoutMs": 4000
+ },
+ "application": {
+ "redisUrl": "redis://password@localhost:6379/",
+ "diffusionTopicName": "polling/redis"
+ }
+ }
+ },
+ {
+ "serviceName": "redisStreamingSource",
+ "serviceType": "STREAMING_REDIS_SOURCE",
+ "description": "Streaming changes on a Redis instance and publishes to server",
+ "config": {
+ "application": {
+ "redisUrl": "redis://password@localhost:6379/",
+ "diffusionTopicName": "streaming/redis"
+ }
+ }
+ }
+ ]
+}
\ No newline at end of file