diff --git a/build.gradle b/build.gradle index eefe46b6ea..0c68d18ca1 100644 --- a/build.gradle +++ b/build.gradle @@ -52,6 +52,7 @@ ext { avroVersion = '1.12.1' awaitilityVersion = '4.3.0' camelVersion = '4.16.0' + cloudEventsVersion = '4.0.1' commonsDbcp2Version = '2.13.0' commonsIoVersion = '2.21.0' commonsNetVersion = '3.12.0' @@ -477,6 +478,16 @@ project('spring-integration-cassandra') { } } +project('spring-integration-cloudevents') { + description = 'Spring Integration CloudEvents Support' + + dependencies { + api "io.cloudevents:cloudevents-core:$cloudEventsVersion" + testImplementation "io.cloudevents:cloudevents-json-jackson:$cloudEventsVersion" + testImplementation "io.cloudevents:cloudevents-xml:$cloudEventsVersion" + } +} + project('spring-integration-core') { description = 'Spring Integration Core' diff --git a/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/package-info.java b/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/package-info.java new file mode 100644 index 0000000000..130c148285 --- /dev/null +++ b/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/package-info.java @@ -0,0 +1,6 @@ +/** + * Provides core CloudEvents support classes and components + */ + +@org.jspecify.annotations.NullMarked +package org.springframework.integration.cloudevents; diff --git a/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/transformer/ToCloudEventTransformer.java b/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/transformer/ToCloudEventTransformer.java new file mode 100644 index 0000000000..60e4ac07d4 --- /dev/null +++ b/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/transformer/ToCloudEventTransformer.java @@ -0,0 +1,426 @@ +/* + * Copyright 2026-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cloudevents.transformer; + +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.cloudevents.CloudEventExtension; +import io.cloudevents.CloudEventExtensions; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.CloudEventUtils; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.rw.CloudEventContextWriter; +import io.cloudevents.rw.CloudEventRWException; +import io.cloudevents.rw.CloudEventWriter; +import io.cloudevents.rw.CloudEventWriterFactory; +import org.jspecify.annotations.Nullable; + +import org.springframework.context.ApplicationContext; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.Expression; +import org.springframework.expression.common.LiteralExpression; +import org.springframework.integration.StaticMessageHeaderAccessor; +import org.springframework.integration.expression.ExpressionUtils; +import org.springframework.integration.expression.FunctionExpression; +import org.springframework.integration.expression.ValueExpression; +import org.springframework.integration.support.utils.PatternMatchUtils; +import org.springframework.integration.transformer.AbstractTransformer; +import org.springframework.integration.transformer.MessageTransformationException; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.Assert; +import org.springframework.util.MimeType; +import org.springframework.util.StringUtils; + +/** + * Transform messages to CloudEvent format with attribute and extension mapping. + * + * @author Glenn Renfro + * + * @since 7.1 + */ +public class ToCloudEventTransformer extends AbstractTransformer { + + private static final String DEFAULT_PREFIX = "ce-"; + + private final String[] extensionPatterns; + + private final EventFormatProvider eventFormatProvider = EventFormatProvider.getInstance(); + + private String cloudEventPrefix = DEFAULT_PREFIX; + + private boolean failOnNoFormat; + + private Expression eventIdExpression = new FunctionExpression>( + msg -> Objects.requireNonNull(msg.getHeaders().getId()).toString()); + + @SuppressWarnings("NullAway.Init") + private Expression sourceExpression; + + private Expression typeExpression = new LiteralExpression("spring.message"); + + private @Nullable Expression dataSchemaExpression; + + private @Nullable Expression subjectExpression; + + private @Nullable Expression eventFormatContentTypeExpression; + + private @Nullable EventFormat eventFormat; + + @SuppressWarnings("NullAway.Init") + private EvaluationContext evaluationContext; + + /** + * Create a ToCloudEventTransformer. + * @param eventFormat the event format to use for serialization + * @param extensionPatterns patterns to evaluate whether message headers should be added as extensions + * to the CloudEvent + */ + public ToCloudEventTransformer(@Nullable EventFormat eventFormat, String... extensionPatterns) { + this.eventFormat = eventFormat; + this.extensionPatterns = Arrays.copyOf(extensionPatterns, extensionPatterns.length); + } + + /** + * Create a ToCloudEventTransformer with no extensionPatterns. + * @param eventFormat the event format to use for serialization + */ + public ToCloudEventTransformer(@Nullable EventFormat eventFormat) { + this(eventFormat, new String[0]); + } + + /** + * Create a ToCloudEventTransformer with no extensionPatterns and no {@link EventFormat}. + */ + public ToCloudEventTransformer() { + this(null); + } + + @Override + protected void onInit() { + super.onInit(); + this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); + if (this.sourceExpression == null) { + ApplicationContext applicationContext = getApplicationContext(); + String appName = applicationContext.getEnvironment().getProperty("spring.application.name"); + if (!StringUtils.hasText(appName)) { + logger.warn("'spring.application.name' is not set. " + + "CloudEvent source URIs will use 'null' as the application name. " + + "Consider setting 'spring.application.name'"); + } + this.sourceExpression = new ValueExpression<>(URI.create("/spring/" + appName + "." + getBeanName())); + } + } + + /** + * Set the {@link Expression} to create CloudEvent ids. + * Default extracts the id from the {@link MessageHeaders} of the message. + * @param eventIdExpression the expression to create the id for each CloudEvent + */ + public void setEventIdExpression(Expression eventIdExpression) { + this.eventIdExpression = eventIdExpression; + } + + /** + * Set the {@link Expression} to create CloudEvent source. + * Default is {@code "/spring/" + appName + "." + getBeanName())}. + * @param sourceExpression the expression to create the source for each CloudEvent + */ + public void setSourceExpression(Expression sourceExpression) { + this.sourceExpression = sourceExpression; + } + + /** + * Set the {@link Expression} to extract the type for the CloudEvent. + * Default is "spring.message". + * @param typeExpression the expression to create the type for each CloudEvent + */ + public void setTypeExpression(Expression typeExpression) { + this.typeExpression = typeExpression; + } + + /** + * Set the {@link Expression} to create the dataSchema for the CloudEvent. + * @param dataSchemaExpression the expression to create the dataSchema for each CloudEvent + */ + public void setDataSchemaExpression(Expression dataSchemaExpression) { + this.dataSchemaExpression = dataSchemaExpression; + } + + /** + * Set the {@link Expression} to create the subject for the CloudEvent. + * @param subjectExpression the expression to create the subject for each CloudEvent + */ + public void setSubjectExpression(Expression subjectExpression) { + this.subjectExpression = subjectExpression; + } + + /** + * Set the {@link Expression} to create for the content type to use + * when {@link EventFormatProvider} is to be used to determine + * {@link EventFormat}. + * @param eventFormatContentTypeExpression the expression to create + * content type for the {@link EventFormatProvider} + */ + public void setEventFormatContentTypeExpression(Expression eventFormatContentTypeExpression) { + this.eventFormatContentTypeExpression = eventFormatContentTypeExpression; + } + + /** + * Set to {@code true} to fail if no {@link EventFormat} is found for message content type. + * When {@code false} and no {@link EventFormat} is found, then a {@link CloudEvent}' body is + * set as an output message's payload, and its attributes are set into headers. + * @param failOnNoFormat true to disable format serialization + */ + public void setFailOnNoFormat(boolean failOnNoFormat) { + this.failOnNoFormat = failOnNoFormat; + } + + /** + * Return whether the transformer will fail when no {@link EventFormat} + * is available for the content type. + * @return {@code true} if transformation should fail when no suitable + * {@link EventFormat} is found; {@code false} otherwise + */ + public boolean isFailOnNoFormat() { + return this.failOnNoFormat; + } + + /** + * Set the prefix for CloudEvent headers in binary content mode. + * @param cloudEventPrefix the prefix to use for CloudEvent headers + */ + public void setCloudEventPrefix(String cloudEventPrefix) { + this.cloudEventPrefix = cloudEventPrefix; + } + + /** + * Return the prefix used for CloudEvent headers in binary content mode. + * @return the CloudEvent header prefix + */ + public String getCloudEventPrefix() { + return this.cloudEventPrefix; + } + + /** + * Transform the input message into a CloudEvent message. + * @param message the input Spring Integration message to transform + * @return CloudEvent message in the specified format + * @throws RuntimeException if serialization fails + */ + @Override + protected Object doTransform(Message message) { + Object payload = message.getPayload(); + Assert.isInstanceOf(byte[].class, payload, "Message payload must be byte[]"); + + String id = this.eventIdExpression.getValue(this.evaluationContext, message, String.class); + URI source = this.sourceExpression.getValue(this.evaluationContext, message, URI.class); + String type = this.typeExpression.getValue(this.evaluationContext, message, String.class); + MessageHeaders headers = message.getHeaders(); + MimeType mimeType = StaticMessageHeaderAccessor.getContentType(message); + String contentType; + if (mimeType == null) { + contentType = "application/octet-stream"; + } + else { + contentType = mimeType.toString(); + } + + Map cloudEventExtensions = getCloudEventExtensions(headers); + + ToCloudEventTransformerExtension extensions = + new ToCloudEventTransformerExtension(cloudEventExtensions); + + CloudEventBuilder cloudEventBuilder = CloudEventBuilder.v1() + .withId(id) + .withSource(source) + .withType(type) + .withTime(OffsetDateTime.now()) + .withDataContentType(contentType); + + if (this.subjectExpression != null) { + cloudEventBuilder.withSubject( + this.subjectExpression.getValue(this.evaluationContext, message, String.class)); + } + if (this.dataSchemaExpression != null) { + cloudEventBuilder.withDataSchema( + this.dataSchemaExpression.getValue(this.evaluationContext, message, URI.class)); + } + + CloudEvent cloudEvent = cloudEventBuilder.withData((byte[]) payload) + .withExtension(extensions) + .build(); + + if (this.eventFormatContentTypeExpression != null) { + this.eventFormat = this.eventFormatProvider.resolveFormat( + this.eventFormatContentTypeExpression.getValue(this.evaluationContext, message, String.class)); + } + + if (this.eventFormat == null && this.failOnNoFormat) { + throw new MessageTransformationException("No EventFormat found for '" + contentType + "'"); + } + + if (this.eventFormat != null) { + return MessageBuilder.withPayload(this.eventFormat.serialize(cloudEvent)) + .copyHeaders(headers) + .setHeader(MessageHeaders.CONTENT_TYPE, this.eventFormat.serializedContentType()) + .build(); + } + HashMap messageMap = new HashMap<>(headers); + return CloudEventUtils.toReader(cloudEvent) + .read(new MessageBuilderMessageWriter(this.cloudEventPrefix, messageMap)); + } + + @Override + public String getComponentType() { + return "ce:to-cloudevent-transformer"; + } + + /** + * Extract CloudEvent extensions from message headers based on pattern matching. + * @param headers the message headers to extract extensions from + * @return a map of header key-value pairs that match the extension patterns; + * an empty map if no headers match the patterns + */ + private Map getCloudEventExtensions(MessageHeaders headers) { + Map cloudEventExtensions = new HashMap<>(); + for (Map.Entry header : headers.entrySet()) { + String headerKey = header.getKey(); + Boolean patternResult = PatternMatchUtils.smartMatch(headerKey, this.extensionPatterns); + if (Boolean.TRUE.equals(patternResult)) { + cloudEventExtensions.put(headerKey, header.getValue()); + } + } + return cloudEventExtensions; + } + + /** + * Custom CloudEvent extension implementation that wraps a map of headers + * as CloudEvent extension attributes. + */ + private static class ToCloudEventTransformerExtension implements CloudEventExtension { + + private final Map cloudEventExtensions; + + ToCloudEventTransformerExtension(Map headers) { + this.cloudEventExtensions = headers; + } + + @Override + public void readFrom(CloudEventExtensions extensions) { + throw new UnsupportedOperationException(); + } + + @Override + public @Nullable Object getValue(String key) throws IllegalArgumentException { + return this.cloudEventExtensions.get(key); + } + + @Override + public Set getKeys() { + return this.cloudEventExtensions.keySet(); + } + + } + + /** + * CloudEvent writer implementation that converts CloudEvent objects into + * Spring Integration {@link Message} instances with CloudEvent attributes as headers. + */ + private static class MessageBuilderMessageWriter implements CloudEventWriter>, + CloudEventWriterFactory> { + + private final String cloudEventPrefix; + + private final Map headers; + + MessageBuilderMessageWriter(String cloudEventPrefix, Map headers) { + this.headers = headers; + this.cloudEventPrefix = cloudEventPrefix; + } + + /** + * Complete the message creation with CloudEvent data. + * Create a message with the CloudEvent data as the payload. Set CloudEvent attributes + * as headers via {@link #withContextAttribute(String, String)}. + * @param value the CloudEvent data to use as the message payload + * @return the Spring Integration message with CloudEvent data and attributes + * @throws CloudEventRWException if an error occurs during message creation + */ + @Override + public Message end(CloudEventData value) throws CloudEventRWException { + return MessageBuilder + .withPayload(value.toBytes()) + .copyHeaders(this.headers) + .build(); + } + + /** + * Complete the message creation without CloudEvent data. + * Create a message with an empty payload when the CloudEvent contains no data. + * Set CloudEvent attributes as headers via {@link #withContextAttribute(String, String)}. + * @return the Spring Integration message with an empty payload and CloudEvent attributes as headers + */ + @Override + public Message end() { + return MessageBuilder + .withPayload(new byte[0]) + .copyHeaders(this.headers) + .build(); + } + + /** + * Add a CloudEvent context attribute to the message headers. + * Map the CloudEvent attribute to a message header by prepending the configured prefix + * to the attribute name (e.g., "id" becomes "ce-id" with default prefix). + * @param name the CloudEvent attribute name + * @param value the CloudEvent attribute value + * @return this writer for method chaining + * @throws CloudEventRWException if an error occurs while setting the attribute + */ + @Override + public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException { + this.headers.put(this.cloudEventPrefix + name, value); + return this; + } + + /** + * Initialize the writer with the CloudEvent specification version. + * Set the specification version as a message header using the configured version key. + * @param version the CloudEvent specification version + * @return this writer for method chaining + */ + @Override + public MessageBuilderMessageWriter create(SpecVersion version) { + this.headers.put(this.cloudEventPrefix + "specversion", version.toString()); + return this; + } + + } + +} diff --git a/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/transformer/package-info.java b/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/transformer/package-info.java new file mode 100644 index 0000000000..4809240669 --- /dev/null +++ b/spring-integration-cloudevents/src/main/java/org/springframework/integration/cloudevents/transformer/package-info.java @@ -0,0 +1,6 @@ +/** + * Provides CloudEvents transformer implementations + */ + +@org.jspecify.annotations.NullMarked +package org.springframework.integration.cloudevents.transformer; diff --git a/spring-integration-cloudevents/src/test/java/org/springframework/integration/cloudevents/transformer/ToCloudEventTransformerTests.java b/spring-integration-cloudevents/src/test/java/org/springframework/integration/cloudevents/transformer/ToCloudEventTransformerTests.java new file mode 100644 index 0000000000..53adc04614 --- /dev/null +++ b/spring-integration-cloudevents/src/test/java/org/springframework/integration/cloudevents/transformer/ToCloudEventTransformerTests.java @@ -0,0 +1,374 @@ +/* + * Copyright 2026-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.cloudevents.transformer; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import io.cloudevents.CloudEvent; +import io.cloudevents.jackson.JsonCloudEventData; +import io.cloudevents.jackson.JsonFormat; +import io.cloudevents.xml.XMLFormat; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.expression.ExpressionParser; +import org.springframework.expression.common.LiteralExpression; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.transformer.MessageTransformationException; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + + +/** + * Test {@link ToCloudEventTransformer} transformer. + * + * @author Glenn Renfro + * + * @since 7.1 + */ +@DirtiesContext +@SpringJUnitConfig +class ToCloudEventTransformerTests { + + private static final String TRACE_HEADER = "traceid"; + + private static final String SPAN_HEADER = "spanid"; + + private static final String USER_HEADER = "userid"; + + private static final byte[] TEXT_PLAIN_PAYLOAD = "\"test message\"".getBytes(StandardCharsets.UTF_8); + + private static final byte[] XML_PAYLOAD = ("" + + "testmessage").getBytes(StandardCharsets.UTF_8); + + private static final byte[] JSON_PAYLOAD = ("{\"message\":\"Hello, World!\"}").getBytes(StandardCharsets.UTF_8); + + @Autowired + private ToCloudEventTransformer xmlTransformerWithNoExtensions; + + @Autowired + private ToCloudEventTransformer jsonTransformerWithNoExtensions; + + @Autowired + private ToCloudEventTransformer jsonTransformerWithExtensions; + + @Autowired + private ToCloudEventTransformer transformerWithNoExtensionsNoFormat; + + @Autowired + private ToCloudEventTransformer transformerWithNoExtensionsNoFormatEnabled; + + @Autowired + private ToCloudEventTransformer transformerWithExtensionsNoFormat; + + @Autowired + private ToCloudEventTransformer transformerWithExtensionsNoFormatWithPrefix; + + @Autowired + private ToCloudEventTransformer xmlTransformerWithInvalidIDExpression; + + @Autowired + private ToCloudEventTransformer transformerWithNoExtensionsNoFormatEnabledWithProviderExpression; + + private final JsonFormat jsonFormat = new JsonFormat(); + + private final XMLFormat xmlFormat = new XMLFormat(); + + @Test + void transformWithPayloadBasedOnJsonFormatContentTypeWithProviderExpression() { + Message originalMessage = createBaseMessage(JSON_PAYLOAD, "text/plain") + .setHeader("customheader", "test-value") + .setHeader("otherheader", "other-value") + .build(); + ToCloudEventTransformer transformer = this.transformerWithNoExtensionsNoFormatEnabledWithProviderExpression; + + Message message = (Message) transformer.doTransform(originalMessage); + + CloudEvent cloudEvent = this.jsonFormat.deserialize(message.getPayload()); + verifyCloudEvent(cloudEvent, "transformerWithNoExtensionsNoFormatEnabledWithProviderExpression", + "text/plain"); + assertThat(cloudEvent.getData().toBytes()).isEqualTo(JSON_PAYLOAD); + } + + @Test + void transformWithPayloadBasedOnJsonFormatContentType() { + Message message = + getTransformerNoExtensions(JSON_PAYLOAD, this.jsonTransformerWithNoExtensions, JsonFormat.CONTENT_TYPE); + CloudEvent cloudEvent = this.jsonFormat.deserialize(message.getPayload()); + verifyCloudEvent(cloudEvent, "jsonTransformerWithNoExtensions", JsonFormat.CONTENT_TYPE); + assertThat(((JsonCloudEventData) cloudEvent.getData()).getNode().toString().getBytes(StandardCharsets.UTF_8)). + isEqualTo(JSON_PAYLOAD); + } + + @Test + void transformWithPayloadBasedOnApplicationJsonType() { + Message message = + getTransformerNoExtensions(JSON_PAYLOAD, this.jsonTransformerWithNoExtensions, + "application/json"); + CloudEvent cloudEvent = this.jsonFormat.deserialize(message.getPayload()); + verifyCloudEvent(cloudEvent, "jsonTransformerWithNoExtensions", "application/json"); + assertThat(new String(cloudEvent.getData().toBytes())).contains("{\"message\":\"Hello, World!\"}"); + } + + @Test + void transformWithPayloadBasedOnApplicationXMLType() throws IOException { + Message message = getTransformerNoExtensions(XML_PAYLOAD, + this.xmlTransformerWithNoExtensions, "application/xml"); + CloudEvent cloudEvent = xmlFormat.deserialize(message.getPayload()); + verifyCloudEvent(cloudEvent, "xmlTransformerWithNoExtensions", "application/xml"); + assertThat(new String(cloudEvent.getData().toBytes())).contains("testmessage"); + } + + @Test + void transformWithPayloadBasedOnContentXMLFormatType() { + Message message = getTransformerNoExtensions(XML_PAYLOAD, + this.xmlTransformerWithNoExtensions, XMLFormat.XML_CONTENT_TYPE); + CloudEvent cloudEvent = this.xmlFormat.deserialize(message.getPayload()); + verifyCloudEvent(cloudEvent, "xmlTransformerWithNoExtensions", XMLFormat.XML_CONTENT_TYPE); + assertThat(cloudEvent.getData().toBytes()).isEqualTo(XML_PAYLOAD); + } + + @Test + void convertMessageNoExtensions() { + Message message = MessageBuilder.withPayload(TEXT_PLAIN_PAYLOAD) + .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain") + .setHeader(TRACE_HEADER, "test-value") + .setHeader(SPAN_HEADER, "other-value") + .build(); + Message result = (Message) this.transformerWithNoExtensionsNoFormat.doTransform(message); + assertThat(result.getHeaders()).containsKeys(TRACE_HEADER, SPAN_HEADER); + assertThat(result.getHeaders()).doesNotContainKeys("ce-" + TRACE_HEADER, "ce-" + SPAN_HEADER); + assertThat(result.getHeaders()).containsEntry(MessageHeaders.CONTENT_TYPE, "text/plain"); + assertThat(result.getPayload()).isEqualTo(TEXT_PLAIN_PAYLOAD); + } + + @Test + void convertMessageWithExtensions() { + Message message = MessageBuilder.withPayload(TEXT_PLAIN_PAYLOAD) + .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain") + .setHeader(TRACE_HEADER, "test-value") + .setHeader(SPAN_HEADER, "other-value") + .build(); + Message result = (Message) transformerWithExtensionsNoFormat.doTransform(message); + assertThat(result.getHeaders()).containsKeys(TRACE_HEADER, SPAN_HEADER). + containsKeys("ce-" + TRACE_HEADER, "ce-" + SPAN_HEADER); + assertThat(result.getPayload()).isEqualTo(TEXT_PLAIN_PAYLOAD); + } + + @Test + void convertMessageWithExtensionsNewPrefix() { + Message message = MessageBuilder.withPayload(TEXT_PLAIN_PAYLOAD) + .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain") + .setHeader(TRACE_HEADER, "test-value") + .setHeader(SPAN_HEADER, "other-value") + .build(); + Message result = (Message) this.transformerWithExtensionsNoFormatWithPrefix.doTransform(message); + assertThat(result.getHeaders()).containsKeys(TRACE_HEADER, SPAN_HEADER, "CLOUDEVENTS-" + TRACE_HEADER, + "CLOUDEVENTS-" + SPAN_HEADER, "CLOUDEVENTS-id", "CLOUDEVENTS-specversion", + "CLOUDEVENTS-datacontenttype"); + assertThat(result.getPayload()).isEqualTo(TEXT_PLAIN_PAYLOAD); + } + + @Test + @SuppressWarnings("unchecked") + void doTransformWithObjectPayload() throws Exception { + TestRecord testRecord = new TestRecord("sample data"); + byte[] payload = convertPayloadToBytes(testRecord); + Message message = MessageBuilder.withPayload(payload).setHeader("test_id", "test-id") + .setHeader(MessageHeaders.CONTENT_TYPE, JsonFormat.CONTENT_TYPE) + .build(); + + Message resultMessage = (Message) this.jsonTransformerWithNoExtensions.doTransform(message); + CloudEvent cloudEvent = this.jsonFormat.deserialize(resultMessage.getPayload()); + verifyCloudEvent(cloudEvent, "jsonTransformerWithNoExtensions", JsonFormat.CONTENT_TYPE); + assertThat(new String(resultMessage.getPayload())).contains(new String(payload)); + } + + @Test + void noContentType() { + Message message = MessageBuilder.withPayload(TEXT_PLAIN_PAYLOAD).build(); + Message result = this.transformerWithNoExtensionsNoFormat.transform(message); + assertThat(result.getHeaders()).containsEntry("ce-datacontenttype", "application/octet-stream"); + assertThat(message.getPayload()).isEqualTo(TEXT_PLAIN_PAYLOAD); + } + + @Test + void noContentTypeNoFormatEnabled() { + Message message = MessageBuilder.withPayload(TEXT_PLAIN_PAYLOAD).build(); + assertThatThrownBy(() -> this.transformerWithNoExtensionsNoFormatEnabled.transform(message)) + .isInstanceOf(MessageTransformationException.class) + .hasMessageContaining("No EventFormat found for 'application/octet-stream'"); + } + + @Test + @SuppressWarnings({"unchecked"}) + void multipleExtensionMappingsWithJsonFormatType() { + Message message = createBaseMessage(JSON_PAYLOAD, JsonFormat.CONTENT_TYPE) + .setHeader("correlation-id", "corr-999") + .setHeader(TRACE_HEADER, "trace-123") + .build(); + + Message resultMessage = (Message) this.jsonTransformerWithExtensions.doTransform(message); + + CloudEvent cloudEvent = this.jsonFormat.deserialize(resultMessage.getPayload()); + + assertThat(resultMessage.getHeaders()).containsEntry("correlation-id", "corr-999"); + verifyCloudEvent(cloudEvent, "jsonTransformerWithExtensions", JsonFormat.CONTENT_TYPE); + assertThat(new String(resultMessage.getPayload())).contains("\"traceid\":\"trace-123\""); + } + + @Test + @SuppressWarnings({"unchecked"}) + void multipleExtensionMappingsWithApplicationJsonType() { + Message message = createBaseMessage(JSON_PAYLOAD, "application/json") + .setHeader("correlation-id", "corr-999") + .setHeader(TRACE_HEADER, "trace-123") + .build(); + + Object result = this.jsonTransformerWithExtensions.doTransform(message); + + Message resultMessage = (Message) result; + CloudEvent cloudEvent = this.jsonFormat.deserialize(resultMessage.getPayload()); + assertThat(resultMessage.getHeaders()).containsEntry("correlation-id", "corr-999"); + assertThat(cloudEvent.getExtensionNames()).containsExactly("traceid"). + doesNotContain("correlation-id"); + verifyCloudEvent(cloudEvent, "jsonTransformerWithExtensions", "application/json"); + } + + @Test + void emptyStringPayloadHandling() { + Message message = createBaseMessage("".getBytes(), "text/plain").build(); + Message resultMessage = (Message) this.jsonTransformerWithNoExtensions.doTransform(message); + CloudEvent cloudEvent = this.jsonFormat.deserialize(resultMessage.getPayload()); + assertThat(cloudEvent.getData().toBytes()).isEmpty(); + verifyCloudEvent(cloudEvent, "jsonTransformerWithNoExtensions", "text/plain"); + } + + @Test + void failWhenNoIdHeaderAndNoDefault() { + Message message = createBaseMessage(TEXT_PLAIN_PAYLOAD, JsonFormat.CONTENT_TYPE).build(); + assertThatThrownBy(() -> this.xmlTransformerWithInvalidIDExpression.transform(message)) + .isInstanceOf(MessageTransformationException.class) + .hasMessageContaining("failed to transform message"); + } + + private static void verifyCloudEvent(CloudEvent cloudEvent, String beanName, String type) { + assertThat(cloudEvent.getDataContentType()).isEqualTo(type); + assertThat(cloudEvent.getSource().toString()).isEqualTo("/spring/null." + beanName); + assertThat(cloudEvent.getType()).isEqualTo("spring.message"); + assertThat(cloudEvent.getDataSchema()).isNull(); + } + + private static Message getTransformerNoExtensions(byte[] payload, + ToCloudEventTransformer transformer, String contentType) { + Message message = createBaseMessage(payload, contentType) + .setHeader("customheader", "test-value") + .setHeader("otherheader", "other-value") + .build(); + return (Message) transformer.doTransform(message); + } + + private static byte[] convertPayloadToBytes(TestRecord testRecord) throws Exception { + ObjectMapper objectMapper = new ObjectMapper(); + ObjectWriter writer = objectMapper.writer(); + return writer.writeValueAsBytes(testRecord); + } + + private static MessageBuilder createBaseMessage(byte[] payload, String contentType) { + return MessageBuilder.withPayload(payload) + .setHeader(MessageHeaders.CONTENT_TYPE, contentType); + } + + @Configuration(proxyBeanMethods = false) + @EnableIntegration + public static class ContextConfiguration { + + private static final String[] TEST_PATTERNS = {"trace*", SPAN_HEADER, USER_HEADER}; + + private static final ExpressionParser parser = new SpelExpressionParser(); + + @Bean + public ToCloudEventTransformer xmlTransformerWithNoExtensions() { + return new ToCloudEventTransformer(new XMLFormat()); + } + + @Bean + public ToCloudEventTransformer jsonTransformerWithNoExtensions() { + return new ToCloudEventTransformer(new JsonFormat()); + } + + @Bean + public ToCloudEventTransformer jsonTransformerWithExtensions() { + return new ToCloudEventTransformer(new JsonFormat(), TEST_PATTERNS); + } + + @Bean + public ToCloudEventTransformer transformerWithNoExtensionsNoFormat() { + return new ToCloudEventTransformer(); + } + + @Bean + public ToCloudEventTransformer transformerWithExtensionsNoFormat() { + return new ToCloudEventTransformer(null, TEST_PATTERNS); + } + + @Bean + public ToCloudEventTransformer transformerWithExtensionsNoFormatWithPrefix() { + ToCloudEventTransformer toCloudEventsTransformer = new ToCloudEventTransformer(null, TEST_PATTERNS); + toCloudEventsTransformer.setCloudEventPrefix("CLOUDEVENTS-"); + return toCloudEventsTransformer; + } + + @Bean + public ToCloudEventTransformer xmlTransformerWithInvalidIDExpression() { + ToCloudEventTransformer transformer = new ToCloudEventTransformer(new XMLFormat()); + transformer.setEventIdExpression(parser.parseExpression("null")); + return transformer; + } + + @Bean + public ToCloudEventTransformer transformerWithNoExtensionsNoFormatEnabled() { + ToCloudEventTransformer toCloudEventsTransformer = new ToCloudEventTransformer(); + toCloudEventsTransformer.setFailOnNoFormat(true); + return toCloudEventsTransformer; + } + + @Bean + public ToCloudEventTransformer transformerWithNoExtensionsNoFormatEnabledWithProviderExpression() { + ToCloudEventTransformer toCloudEventsTransformer = new ToCloudEventTransformer(); + toCloudEventsTransformer.setEventFormatContentTypeExpression(new LiteralExpression(JsonFormat.CONTENT_TYPE)); + toCloudEventsTransformer.setFailOnNoFormat(true); + return toCloudEventsTransformer; + } + } + + private record TestRecord(String sampleValue) implements Serializable { } + +} diff --git a/src/reference/antora/modules/ROOT/nav.adoc b/src/reference/antora/modules/ROOT/nav.adoc index cd6d7d682d..4c7cd4a747 100644 --- a/src/reference/antora/modules/ROOT/nav.adoc +++ b/src/reference/antora/modules/ROOT/nav.adoc @@ -124,6 +124,7 @@ ** xref:amqp/amqp-1.0.adoc[] * xref:camel.adoc[] * xref:cassandra.adoc[] +* xref:cloudevents.adoc[] * xref:debezium.adoc[] * xref:event.adoc[] * xref:feed.adoc[] diff --git a/src/reference/antora/modules/ROOT/pages/cloudevents.adoc b/src/reference/antora/modules/ROOT/pages/cloudevents.adoc new file mode 100644 index 0000000000..4fe041f62b --- /dev/null +++ b/src/reference/antora/modules/ROOT/pages/cloudevents.adoc @@ -0,0 +1,171 @@ +[[cloudevents]] += CloudEvents Support + +Use Spring Integration transformers (starting with version 7.1) to transform messages into CloudEvent messages. +The implementation is fully based on the https://github.com/cloudevents/sdk-java[CloudEvents SDK] project. + +Add the following dependency to your project: + +[tabs] +====== +Maven:: ++ +[source, xml, subs="normal", role="primary"] +---- + + org.springframework.integration + spring-integration-cloudevents + {project-version} + +---- + +Gradle:: ++ +[source, groovy, subs="normal", role="secondary"] +---- +compile "org.springframework.integration:spring-integration-cloudevents:{project-version}" +---- +====== + +[[to-cloud-event-transformer]] + +=== `ToCloudEventTransformer` + +Use the `ToCloudEventTransformer` to convert Spring Integration messages into CloudEvents compliant messages. +This transformer supports the CloudEvents specification v1.0, automatically serializes CloudEvents if an EventFormat or eventFormatContentTypeExpression is specified. +The transformer supports defining attributes using Expressions, and identifies extensions in the message headers via patterns. + +NOTE: Ensure messages to be transformed have a payload of type `byte[]`. +The transformer throws an `IllegalArgumentException` if the payload is not a byte array. + +==== Attribute Expressions + +Set the CloudEvents' attributes of `id`, `source`, `type`, `dataSchema`, and `subject` through SpEL ``Expression``s. + +NOTE: The transformer sets the `time` attribute to the time when it creates the CloudEvent message. + +The following table lists the attribute names and the values the default ``Expression``s return. + +|=== +| Attribute Name | Default Value + +| `id` +| The id of the message. + +| `source` +| Prefix of "/spring/" followed by the appName, a period, and then the name of the transformer's bean (for example, `/spring/myapp.toCloudEventTransformerBean`). + +| `type` +| "spring.message" + +| `dataContentType` +| The contentType of the message, defaults to `application/octet-stream`. +Some other examples are but not limited to: `application/json`, `application/x-avro`, and `application/xml`. + +| `dataSchema` +| `null` + +| `subject` +| `null` + +| `time` +| The time the CloudEvent message is created. +|=== + +==== Extension Patterns + +Use the extensionPatterns constructor parameter (a vararg of strings) to specify pattern matching with wildcards (`*`). +The transformer includes message headers with keys matching any pattern as CloudEvent extensions. +Use a `!` prefix to explicitly exclude headers through negation. +Note that the first matching pattern wins (positive or negative). + +For example, configure patterns `"trace*", "span-id", "user-id"` to: +- Include headers starting with `trace` (e.g., `trace-id`, `traceparent`) +- Include headers with exact keys `span-id` and `user-id` +- Add all matching headers as extensions to the CloudEvent + +To exclude specific headers, use negated patterns: `"custom-*", "!custom-internal"` includes all headers starting with `custom-` except `custom-internal`. + +[[constructors]] + +==== Constructors + +Choose from three constructors for flexible configuration: + +[source,java] +---- +// Default constructor - no EventFormat, no extension patterns +new ToCloudEventTransformer() + +// Constructor with EventFormat injection +new ToCloudEventTransformer(eventFormat) + +// Constructor with EventFormat and extension patterns +new ToCloudEventTransformer(eventFormat, "trace*", "span-id", "user-id") +---- + +[[configuration-with-dsl]] + +==== Configuration With DSL + +Add the CloudEvent transformer to flows using the DSL: + +[source,java] +---- +@Bean +public ToCloudEventTransformer cloudEventTransformer() { + return new ToCloudEventTransformer(new JsonFormat(), "trace*", "correlation-id"); +} + +@Bean +public IntegrationFlow cloudEventTransformFlow(ToCloudEventTransformer toCloudEventTransformer) { + return IntegrationFlows + .from("inputChannel") + .transform(toCloudEventTransformer) + .channel("outputChannel") + .get(); +} +---- + +[[cloudevent-transformer-process]] + +==== CloudEvent Transformer Process + +Understand the transformation process: + +1. **CloudEvent Building**: Build CloudEvent attributes. +2. **Extension Extraction**: Build the CloudEvent extensions using the array of extensionPatterns passed into the constructor. +3. **Format Conversion**: Apply the specified `EventFormat` or, if not set, handle conversion via Binary Format Mode. + +A basic transformation may have the following pattern: + +[source,java] +---- +// Input message with headers +Message inputMessage = MessageBuilder + .withPayload("Hello CloudEvents".getBytes(StandardCharsets.UTF_8)) + .withHeader(MessageHeaders.CONTENT_TYPE, "text/plain") + .build(); + +ToCloudEventTransformer transformer = new ToCloudEventTransformer(); + +// Transform to CloudEvent +Object cloudEventMessage = transformer.transform(inputMessage); +---- + +[[eventformats]] + +==== EventFormats +Use ``EventFormat``s to serialize the CloudEvent into the message's payload when the EventFormat is available, or use Binary Format Mode when an `EventFormat` is not available. +Set the EventFormat in one of two ways: + +1. Set the desired `EventFormat` via the constructor +2. Set the `eventFormatContentTypeExpression` with an expression that resolves to a content type that `eventFormatProvider` can use to provide the required `EventFormat`. + +However, when you do not specify the `EventFormat` via the constructor and the type is not supported by one of the cloudevents dependencies and `failOnNoFormat` is set to `false` (for example `text/plain`), the transformer adds cloud event attributes and extensions to the message headers with the cloud event prefix (default is `ce-`) and leaves the payload unchanged (Binary Format Mode). +When `failOnNoFormat` is set to `true`, the transformer throws a `MessageTransformationException` if it cannot find an `EventFormat`. + +To utilize a specific `EventFormat`, add the associated dependency. +For example, to add the XML `EventFormat`, add the following dependency: `io.cloudevents:cloudevents-xml`. +See the https://github.com/cloudevents/sdk-java[CloudEvents SDK project's] reference docs for information on the ``EventFormat``s that are available. + diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index e46a1a2898..dbc8b85907 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -19,3 +19,9 @@ Java 17 is still the baseline, but Java 25 is supported. The Web Services Outbound Gateway now can rely on the provided `WebServiceTemplate.defaultUri`. See xref:ws.adoc[] for more information. + +[[x7.1-cloudevents]] +=== CloudEvents + +CloudEvents are now supported in the `spring-integration-cloudevents` module. +See xref:cloudevents.adoc[] for more information.