From 8dc48db03f36ccc7283e49e6c9f8fa16aeeea2b8 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Tue, 29 Jul 2025 16:06:11 +0200 Subject: [PATCH 1/6] MINIFICPP-2596 Add XMLRecordSetWriter controller service --- CONTROLLERS.md | 21 ++ .../controllers/XMLRecordSetWriter.cpp | 145 ++++++++ .../controllers/XMLRecordSetWriter.h | 129 +++++++ .../tests/unit/XMLRecordSetWriterTests.cpp | 339 ++++++++++++++++++ 4 files changed, 634 insertions(+) create mode 100644 extensions/standard-processors/controllers/XMLRecordSetWriter.cpp create mode 100644 extensions/standard-processors/controllers/XMLRecordSetWriter.h create mode 100644 extensions/standard-processors/tests/unit/XMLRecordSetWriterTests.cpp diff --git a/CONTROLLERS.md b/CONTROLLERS.md index 5e325f999b..71a6de95c1 100644 --- a/CONTROLLERS.md +++ b/CONTROLLERS.md @@ -33,6 +33,7 @@ limitations under the License. - [UpdatePolicyControllerService](#UpdatePolicyControllerService) - [VolatileMapStateStorage](#VolatileMapStateStorage) - [XMLReader](#XMLReader) +- [XMLRecordSetWriter](#XMLRecordSetWriter) ## AWSCredentialsService @@ -351,3 +352,23 @@ In the list below, the names of required properties appear in bold. Any other pr | **Parse XML Attributes** | false | true
false | When this property is 'true' then XML attributes are parsed and added to the record as new fields, otherwise XML attributes and their values are ignored. | | Attribute Prefix | | | If this property is set, the name of attributes will be prepended with a prefix when they are added to a record. | | **Expect Records as Array** | false | true
false | This property defines whether the reader expects a FlowFile to consist of a single Record or a series of Records with a "wrapper element". Because XML does not provide for a way to read a series of XML documents from a stream directly, it is common to combine many XML documents by concatenating them and then wrapping the entire XML blob with a "wrapper element". This property dictates whether the reader expects a FlowFile to consist of a single Record or a series of Records with a "wrapper element" that will be ignored. | + + +## XMLRecordSetWriter + +### Description + +Writes a RecordSet to XML. The records are wrapped by a root tag. + +### Properties + +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. + +| Name | Default Value | Allowable Values | Description | +|-----------------------------|---------------|-----------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| Array Tag Name | | | Name of the tag used by property "Wrap Elements of Arrays" to write arrays | +| **Wrap Elements of Arrays** | No Wrapping | Use Property as Wrapper
Use Property for Elements
No Wrapping | Specifies how the writer wraps elements of fields of type array | +| **Omit XML Declaration** | false | true
false | Specifies whether or not to include XML declaration | +| **Pretty Print XML** | false | true
false | Specifies whether or not the XML should be pretty printed | +| **Name of Record Tag** | | | Specifies the name of the XML record tag wrapping the record fields. | +| **Name of Root Tag** | | | Specifies the name of the XML root tag wrapping the record set. This property has to be defined if the writer is supposed to write multiple records in a single FlowFile. | diff --git a/extensions/standard-processors/controllers/XMLRecordSetWriter.cpp b/extensions/standard-processors/controllers/XMLRecordSetWriter.cpp new file mode 100644 index 0000000000..64c5ee1755 --- /dev/null +++ b/extensions/standard-processors/controllers/XMLRecordSetWriter.cpp @@ -0,0 +1,145 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "XMLRecordSetWriter.h" + +#include "core/Resource.h" +#include "Exception.h" +#include "utils/TimeUtil.h" + +namespace org::apache::nifi::minifi::standard { + +void XMLRecordSetWriter::onEnable() { + if (auto wrap_elements_of_arrays = magic_enum::enum_cast(getProperty(WrapElementsOfArrays.name).value_or("No Wrapping"))) { + wrap_elements_of_arrays_ = *wrap_elements_of_arrays; + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid value for Wrap Elements of Arrays property: " + getProperty(WrapElementsOfArrays.name).value_or("")); + } + + array_tag_name_ = getProperty(ArrayTagName.name).value_or(""); + if (array_tag_name_.empty() && + (wrap_elements_of_arrays_ == WrapElementsOfArraysOptions::UsePropertyAsWrapper || + wrap_elements_of_arrays_ == WrapElementsOfArraysOptions::UsePropertyForElements)) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Array Tag Name property must be set when Wrap Elements of Arrays is set to Use Property as Wrapper or Use Property for Elements"); + } + + omit_xml_declaration_ = getProperty(OmitXMLDeclaration.name).value_or("false") == "true"; + pretty_print_xml_ = getProperty(PrettyPrintXML.name).value_or("false") == "true"; + + name_of_record_tag_ = getProperty(NameOfRecordTag.name).value_or(""); + if (name_of_record_tag_.empty()) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Name of Record Tag property must be set"); + } + + name_of_root_tag_ = getProperty(NameOfRootTag.name).value_or(""); + if (name_of_root_tag_.empty()) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Name of Root Tag property must be set"); + } +} + +std::string XMLRecordSetWriter::formatXmlOutput(pugi::xml_document& xml_doc) const { + std::ostringstream xml_string_stream; + uint64_t xml_formatting_flags = 0; + if (pretty_print_xml_) { + xml_formatting_flags |= pugi::format_indent; + } else { + xml_formatting_flags |= pugi::format_raw; + } + if (omit_xml_declaration_) { + xml_formatting_flags |= pugi::format_no_declaration; + } + xml_doc.save(xml_string_stream, " ", gsl::narrow(xml_formatting_flags)); + return xml_string_stream.str(); +} + +void XMLRecordSetWriter::convertRecordArrayField(const std::string& field_name, const core::RecordField& field, pugi::xml_node& parent_node) const { + const auto& record_array = std::get(field.value_); + pugi::xml_node array_node; + if (wrap_elements_of_arrays_ == WrapElementsOfArraysOptions::UsePropertyAsWrapper) { + array_node = parent_node.append_child(array_tag_name_.c_str()); + } else if (wrap_elements_of_arrays_ == WrapElementsOfArraysOptions::UsePropertyForElements) { + array_node = parent_node.append_child(field_name.c_str()); + } + for (const auto& array_field : record_array) { + if (wrap_elements_of_arrays_ == WrapElementsOfArraysOptions::UsePropertyAsWrapper) { + convertRecordField(field_name, array_field, array_node); + } else if (wrap_elements_of_arrays_ == WrapElementsOfArraysOptions::UsePropertyForElements) { + convertRecordField(array_tag_name_, array_field, array_node); + } else { + convertRecordField(field_name, array_field, parent_node); + } + } +} + +void XMLRecordSetWriter::convertRecordField(const std::string& field_name, const core::RecordField& field, pugi::xml_node& parent_node) const { + if (std::holds_alternative(field.value_)) { + convertRecordArrayField(field_name, field, parent_node); + return; + } + + pugi::xml_node field_node = parent_node.append_child(field_name.c_str()); + if (std::holds_alternative(field.value_)) { + field_node.text().set(std::get(field.value_)); + } else if (std::holds_alternative(field.value_)) { + field_node.text().set(std::to_string(std::get(field.value_)).c_str()); + } else if (std::holds_alternative(field.value_)) { + field_node.text().set(std::to_string(std::get(field.value_)).c_str()); + } else if (std::holds_alternative(field.value_)) { + field_node.text().set(fmt::format("{:g}", std::get(field.value_)).c_str()); + } else if (std::holds_alternative(field.value_)) { + field_node.text().set(std::get(field.value_) ? "true" : "false"); + } else if (std::holds_alternative(field.value_)) { + auto time_point = std::get(field.value_); + auto time_str = utils::timeutils::getDateTimeStr(std::chrono::time_point_cast(time_point)); + field_node.text().set(time_str.c_str()); + } else if (std::holds_alternative(field.value_)) { + const auto& record_object = std::get(field.value_); + for (const auto& [obj_key, obj_field] : record_object) { + convertRecordField(obj_key, obj_field, field_node); + } + } +} + +std::string XMLRecordSetWriter::convertRecordSetToXml(const core::RecordSet& record_set) const { + gsl_Expects(!name_of_record_tag_.empty() && !name_of_root_tag_.empty()); + pugi::xml_document xml_doc; + auto root_node = xml_doc.append_child(name_of_root_tag_.c_str()); + + for (const auto& record : record_set) { + auto record_node = root_node.append_child(name_of_record_tag_.c_str()); + for (const auto& [key, field] : record) { + convertRecordField(key, field, record_node); + } + } + + return formatXmlOutput(xml_doc); +} + +void XMLRecordSetWriter::write(const core::RecordSet& record_set, const std::shared_ptr& flow_file, core::ProcessSession& session) { + if (!flow_file) { + logger_->log_error("FlowFile is null, cannot write RecordSet to XML"); + return; + } + + auto xml_content = convertRecordSetToXml(record_set); + session.write(flow_file, [&xml_content](const std::shared_ptr& stream) -> int64_t { + stream->write(reinterpret_cast(xml_content.data()), xml_content.size()); + return gsl::narrow(xml_content.size()); + }); +} + +REGISTER_RESOURCE(XMLRecordSetWriter, ControllerService); +} // namespace org::apache::nifi::minifi::standard diff --git a/extensions/standard-processors/controllers/XMLRecordSetWriter.h b/extensions/standard-processors/controllers/XMLRecordSetWriter.h new file mode 100644 index 0000000000..00e66284bd --- /dev/null +++ b/extensions/standard-processors/controllers/XMLRecordSetWriter.h @@ -0,0 +1,129 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "controllers/RecordSetWriter.h" +#include "core/PropertyDefinitionBuilder.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerFactory.h" +#include "pugixml.hpp" + +namespace org::apache::nifi::minifi::standard { +enum class WrapElementsOfArraysOptions { + UsePropertyAsWrapper, + UsePropertyForElements, + NoWrapping +}; +} // namespace org::apache::nifi::minifi::standard + +namespace magic_enum::customize { +using WrapElementsOfArraysOptions = org::apache::nifi::minifi::standard::WrapElementsOfArraysOptions; + +template <> +constexpr customize_t enum_name(WrapElementsOfArraysOptions value) noexcept { + switch (value) { + case WrapElementsOfArraysOptions::UsePropertyAsWrapper: + return "Use Property as Wrapper"; + case WrapElementsOfArraysOptions::UsePropertyForElements: + return "Use Property for Elements"; + case WrapElementsOfArraysOptions::NoWrapping: + return "No Wrapping"; + } + return invalid_tag; +} +} // namespace magic_enum::customize + +namespace org::apache::nifi::minifi::standard { + +class XMLRecordSetWriter final : public core::RecordSetWriterImpl { + public: + explicit XMLRecordSetWriter(const std::string_view name, const utils::Identifier& uuid = {}) : RecordSetWriterImpl(name, uuid) {} + + XMLRecordSetWriter(XMLRecordSetWriter&&) = delete; + XMLRecordSetWriter(const XMLRecordSetWriter&) = delete; + XMLRecordSetWriter& operator=(XMLRecordSetWriter&&) = delete; + XMLRecordSetWriter& operator=(const XMLRecordSetWriter&) = delete; + + ~XMLRecordSetWriter() override = default; + + EXTENSIONAPI static constexpr const char* Description = "Writes a RecordSet to XML. The records are wrapped by a root tag."; + + EXTENSIONAPI static constexpr auto ArrayTagName = core::PropertyDefinitionBuilder<>::createProperty("Array Tag Name") + .withDescription("Name of the tag used by property \"Wrap Elements of Arrays\" to write arrays") + .build(); + EXTENSIONAPI static constexpr auto WrapElementsOfArrays = core::PropertyDefinitionBuilder<3>::createProperty("Wrap Elements of Arrays") + .withDescription("Specifies how the writer wraps elements of fields of type array") + .withDefaultValue(magic_enum::enum_name(WrapElementsOfArraysOptions::NoWrapping)) + .withAllowedValues(magic_enum::enum_names()) + .isRequired(true) + .build(); + EXTENSIONAPI static constexpr auto OmitXMLDeclaration = core::PropertyDefinitionBuilder<>::createProperty("Omit XML Declaration") + .withDescription("Specifies whether or not to include XML declaration") + .isRequired(true) + .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) + .withDefaultValue("false") + .build(); + EXTENSIONAPI static constexpr auto PrettyPrintXML = core::PropertyDefinitionBuilder<>::createProperty("Pretty Print XML") + .withDescription("Specifies whether or not the XML should be pretty printed") + .isRequired(true) + .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) + .withDefaultValue("false") + .build(); + EXTENSIONAPI static constexpr auto NameOfRecordTag = core::PropertyDefinitionBuilder<>::createProperty("Name of Record Tag") + .withDescription("Specifies the name of the XML record tag wrapping the record fields.") + .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR) + .isRequired(true) + .build(); + EXTENSIONAPI static constexpr auto NameOfRootTag = core::PropertyDefinitionBuilder<>::createProperty("Name of Root Tag") + .withDescription("Specifies the name of the XML root tag wrapping the record set. This property has to be defined if the writer is supposed to write multiple records in a single FlowFile.") + .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR) + .isRequired(true) + .build(); + + EXTENSIONAPI static constexpr auto Properties = std::array{ + ArrayTagName, WrapElementsOfArrays, OmitXMLDeclaration, PrettyPrintXML, NameOfRecordTag, NameOfRootTag + }; + + EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; + ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES + + void write(const core::RecordSet& record_set, const std::shared_ptr& flow_file, core::ProcessSession& session) override; + + void initialize() override { + setSupportedProperties(Properties); + } + void onEnable() override; + void yield() override {} + bool isRunning() const override { return getState() == core::controller::ControllerServiceState::ENABLED; } + bool isWorkAvailable() override { return false; } + + private: + std::string formatXmlOutput(pugi::xml_document& xml_doc) const; + std::string convertRecordSetToXml(const core::RecordSet& record_set) const; + void convertRecordArrayField(const std::string& field_name, const core::RecordField& field, pugi::xml_node& parent_node) const; + void convertRecordField(const std::string& field_name, const core::RecordField& field, pugi::xml_node& parent_node) const; + + WrapElementsOfArraysOptions wrap_elements_of_arrays_ = WrapElementsOfArraysOptions::NoWrapping; + std::string array_tag_name_; + bool omit_xml_declaration_ = false; + bool pretty_print_xml_ = false; + std::string name_of_record_tag_; + std::string name_of_root_tag_; + std::shared_ptr logger_ = core::logging::LoggerFactory::getLogger(); +}; + +} // namespace org::apache::nifi::minifi::standard diff --git a/extensions/standard-processors/tests/unit/XMLRecordSetWriterTests.cpp b/extensions/standard-processors/tests/unit/XMLRecordSetWriterTests.cpp new file mode 100644 index 0000000000..4efc5ab5ea --- /dev/null +++ b/extensions/standard-processors/tests/unit/XMLRecordSetWriterTests.cpp @@ -0,0 +1,339 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +#include "pugixml.hpp" +#include "unit/Catch.h" +#include "unit/TestBase.h" +#include "../controllers/XMLRecordSetWriter.h" +#include "io/BufferStream.h" +#include "core/ProcessSession.h" +#include "catch2/generators/catch_generators.hpp" +#include "utils/StringUtils.h" + +namespace org::apache::nifi::minifi::test { + +class XMLRecordSetWriterTestFixture { + public: + const core::Relationship Success{"success", "everything is fine"}; + + XMLRecordSetWriterTestFixture() : xml_record_set_writer_("XMLRecordSetWriter") { + test_plan_ = test_controller_.createPlan(); + dummy_processor_ = test_plan_->addProcessor("DummyProcessor", "dummyProcessor"); + context_ = [this] { + test_plan_->runNextProcessor(); + return test_plan_->getCurrentContext(); + }(); + process_session_ = std::make_unique(context_); + } + + std::string writeRecordsAsXml(const core::RecordSet& record_set, const std::unordered_map& properties) { + xml_record_set_writer_.initialize(); + for (const auto& [key, value] : properties) { + REQUIRE(xml_record_set_writer_.setProperty(key, std::string{value})); + } + xml_record_set_writer_.onEnable(); + + auto flow_file = process_session_->create(); + xml_record_set_writer_.write(record_set, flow_file, *process_session_); + transferAndCommit(flow_file); + std::string xml_content; + process_session_->read(*flow_file, [&xml_content](const std::shared_ptr& input_stream) { + std::vector buffer(input_stream->size()); + input_stream->read(buffer); + xml_content = std::string(reinterpret_cast(buffer.data()), buffer.size()); + return gsl::narrow(input_stream->size()); + }); + return xml_content; + } + + static void verifyValuesUnderNode(const std::string& xml_content, const std::string& node_path, const std::unordered_map& expected_values) { + gsl_Expects(!expected_values.empty()); + pugi::xml_document doc; + REQUIRE(doc.load_string(xml_content.c_str())); + + pugi::xml_node node = doc.document_element(); + auto node_names = minifi::utils::string::splitAndTrimRemovingEmpty(node_path, "/"); + gsl_Assert(!node_names.empty()); + REQUIRE(std::string(node.name()) == node_names[0]); + for (size_t i = 1; i < node_names.size(); ++i) { + node = node.child(node_names[i].c_str()); + REQUIRE(node); + } + + for (const auto& [field_name, expected_value] : expected_values) { + verifyXmlValue(node, field_name, expected_value); + } + } + + static void verifyArrayValuesUnderNode(const std::string& xml_content, const std::string& node_path, const std::unordered_set& expected_values) { + gsl_Expects(!expected_values.empty()); + pugi::xml_document doc; + REQUIRE(doc.load_string(xml_content.c_str())); + + pugi::xml_node node = doc.document_element(); + auto node_names = minifi::utils::string::splitAndTrimRemovingEmpty(node_path, "/"); + gsl_Assert(!node_names.empty()); + REQUIRE(std::string(node.name()) == node_names[0]); + for (size_t i = 1; i < node_names.size() - 1; ++i) { + node = node.child(node_names[i].c_str()); + REQUIRE(node); + } + + size_t count = 0; + for (const auto& child : node.children(node_names.back().c_str())) { + ++count; + REQUIRE(child); + CHECK(expected_values.contains(std::string{child.child_value()})); + } + + REQUIRE(count == expected_values.size()); + } + + static void verifyXmlValue(const pugi::xml_node& node, const std::string& field_name, const std::string& expected_value) { + auto field_node = node.child(field_name.c_str()); + REQUIRE(field_node); + std::string child_value = field_node.child_value(); + CHECK(child_value == expected_value); + }; + + private: + void transferAndCommit(const std::shared_ptr& flow_file) { + process_session_->transfer(flow_file, Success); + process_session_->commit(); + } + + TestController test_controller_; + + std::shared_ptr test_plan_; + core::Processor* dummy_processor_; + std::shared_ptr context_; + std::unique_ptr process_session_; + standard::XMLRecordSetWriter xml_record_set_writer_; +}; + +TEST_CASE_METHOD(XMLRecordSetWriterTestFixture, "If wrap elements of arrays is set then Array Tag Name property must be set", "[XMLRecordSetWriter]") { + standard::XMLRecordSetWriter xml_record_set_writer("XMLRecordSetWriter"); + xml_record_set_writer.initialize(); + REQUIRE(xml_record_set_writer.setProperty(standard::XMLRecordSetWriter::NameOfRecordTag.name, "record")); + REQUIRE(xml_record_set_writer.setProperty(standard::XMLRecordSetWriter::NameOfRootTag.name, "root")); + std::string wrap_element_option = GENERATE("Use Property as Wrapper", "Use Property for Elements"); + REQUIRE(xml_record_set_writer.setProperty(standard::XMLRecordSetWriter::WrapElementsOfArrays.name, wrap_element_option)); + REQUIRE_THROWS_WITH(xml_record_set_writer.onEnable(), + "Process Schedule Operation: Array Tag Name property must be set when Wrap Elements of Arrays is set to Use Property as Wrapper or Use Property for Elements"); +} + +TEST_CASE_METHOD(XMLRecordSetWriterTestFixture, "Name of Record Tag must be set", "[XMLRecordSetWriter]") { + standard::XMLRecordSetWriter xml_record_set_writer("XMLRecordSetWriter"); + xml_record_set_writer.initialize(); + REQUIRE_THROWS_WITH(xml_record_set_writer.onEnable(), "Process Schedule Operation: Name of Record Tag property must be set"); +} + +TEST_CASE_METHOD(XMLRecordSetWriterTestFixture, "Name of Root Tag must be set", "[XMLRecordSetWriter]") { + standard::XMLRecordSetWriter xml_record_set_writer("XMLRecordSetWriter"); + xml_record_set_writer.initialize(); + REQUIRE(xml_record_set_writer.setProperty(standard::XMLRecordSetWriter::NameOfRecordTag.name, "record")); + REQUIRE_THROWS_WITH(xml_record_set_writer.onEnable(), "Process Schedule Operation: Name of Root Tag property must be set"); +} + +TEST_CASE_METHOD(XMLRecordSetWriterTestFixture, "Test empty record set", "[XMLRecordSetWriter]") { + core::RecordSet record_set; + + bool omit_xml_declaration = false; + std::string expected_xml; + SECTION("Use XML declaration") { + expected_xml = R"()"; + } + + SECTION("Omit XML declaration") { + omit_xml_declaration = true; + expected_xml = R"()"; + } + + auto xml_content = writeRecordsAsXml(record_set, { + {standard::XMLRecordSetWriter::OmitXMLDeclaration.name, omit_xml_declaration ? "true" : "false"}, + {standard::XMLRecordSetWriter::NameOfRecordTag.name, "record"}, + {standard::XMLRecordSetWriter::NameOfRootTag.name, "root"} + }); + REQUIRE(xml_content == expected_xml); +} + +TEST_CASE_METHOD(XMLRecordSetWriterTestFixture, "Test single record with primitive values", "[XMLRecordSetWriter]") { + core::RecordSet record_set; + core::RecordObject record_object; + record_object.emplace("string_field", core::RecordField(std::string("value1"))); + record_object.emplace("uint_field", core::RecordField(static_cast(42))); + record_object.emplace("double_field", core::RecordField(static_cast(2.3))); + record_object.emplace("bool_field", core::RecordField(true)); + record_object.emplace("time_point_field", core::RecordField(std::chrono::system_clock::time_point(std::chrono::sys_days(std::chrono::year(2025)/1/1)))); + record_set.emplace_back(std::move(record_object)); + + auto xml_content = writeRecordsAsXml(record_set, { + {standard::XMLRecordSetWriter::NameOfRecordTag.name, "record"}, + {standard::XMLRecordSetWriter::NameOfRootTag.name, "root"} + }); + + verifyValuesUnderNode(xml_content, "root/record", { + {"string_field", "value1"}, + {"uint_field", "42"}, + {"double_field", "2.3"}, + {"bool_field", "true"}, + {"time_point_field", "2025-01-01T00:00:00Z"} + }); +} + +TEST_CASE_METHOD(XMLRecordSetWriterTestFixture, "Test single record with object value", "[XMLRecordSetWriter]") { + core::RecordSet record_set; + core::RecordObject record_object; + record_object.emplace("string_field", core::RecordField(std::string("value1"))); + core::RecordObject inner_object; + inner_object.emplace("inner_field", core::RecordField(std::string("inner_value"))); + record_object.emplace("inner_object", core::RecordField(std::move(inner_object))); + record_set.emplace_back(std::move(record_object)); + + auto xml_content = writeRecordsAsXml(record_set, { + {standard::XMLRecordSetWriter::NameOfRecordTag.name, "record"}, + {standard::XMLRecordSetWriter::NameOfRootTag.name, "root"} + }); + + verifyValuesUnderNode(xml_content, "root/record", { + {"string_field", "value1"} + }); + verifyValuesUnderNode(xml_content, "root/record/inner_object", { + {"inner_field", "inner_value"} + }); +} + +TEST_CASE_METHOD(XMLRecordSetWriterTestFixture, "Test single record with object array", "[XMLRecordSetWriter]") { + core::RecordSet record_set; + core::RecordObject record_object; + record_object.emplace("string_field", core::RecordField(std::string("value1"))); + core::RecordObject inner_object; + inner_object.emplace("inner_field", core::RecordField(core::RecordArray{ + core::RecordField(std::string("inner_value1")), + core::RecordField(std::string("inner_value2")) + })); + record_object.emplace("inner_object", core::RecordField(std::move(inner_object))); + record_set.emplace_back(std::move(record_object)); + + auto xml_content = writeRecordsAsXml(record_set, { + {standard::XMLRecordSetWriter::NameOfRecordTag.name, "record"}, + {standard::XMLRecordSetWriter::NameOfRootTag.name, "root"} + }); + + verifyValuesUnderNode(xml_content, "root/record", { + {"string_field", "value1"} + }); + + verifyArrayValuesUnderNode(xml_content, "root/record/inner_object/inner_field", {"inner_value1", "inner_value2"}); +} + +TEST_CASE_METHOD(XMLRecordSetWriterTestFixture, "Test single record with array tag name used as wrapper node", "[XMLRecordSetWriter]") { + core::RecordSet record_set; + core::RecordObject record_object; + record_object.emplace("array_field", core::RecordField(core::RecordArray{ + core::RecordField(std::string("inner_value1")), + core::RecordField(std::string("inner_value2")) + })); + record_set.emplace_back(std::move(record_object)); + + auto xml_content = writeRecordsAsXml(record_set, { + {standard::XMLRecordSetWriter::NameOfRecordTag.name, "record"}, + {standard::XMLRecordSetWriter::NameOfRootTag.name, "root"}, + {standard::XMLRecordSetWriter::WrapElementsOfArrays.name, "Use Property as Wrapper"}, + {standard::XMLRecordSetWriter::ArrayTagName.name, "array"} + }); + + verifyArrayValuesUnderNode(xml_content, "root/record/array/array_field", {"inner_value1", "inner_value2"}); +} + +TEST_CASE_METHOD(XMLRecordSetWriterTestFixture, "Test single record with array tag name used as element node", "[XMLRecordSetWriter]") { + core::RecordSet record_set; + core::RecordObject record_object; + record_object.emplace("array_field", core::RecordField(core::RecordArray{ + core::RecordField(std::string("inner_value1")), + core::RecordField(std::string("inner_value2")) + })); + record_set.emplace_back(std::move(record_object)); + + auto xml_content = writeRecordsAsXml(record_set, { + {standard::XMLRecordSetWriter::NameOfRecordTag.name, "record"}, + {standard::XMLRecordSetWriter::NameOfRootTag.name, "root"}, + {standard::XMLRecordSetWriter::WrapElementsOfArrays.name, "Use Property for Elements"}, + {standard::XMLRecordSetWriter::ArrayTagName.name, "element_name"} + }); + + verifyArrayValuesUnderNode(xml_content, "root/record/array_field/element_name", {"inner_value1", "inner_value2"}); +} + +TEST_CASE_METHOD(XMLRecordSetWriterTestFixture, "Test multiple records wrapped", "[XMLRecordSetWriter]") { + core::RecordSet record_set; + core::RecordObject record_object_1; + record_object_1.emplace("string_field", core::RecordField(std::string("value1"))); + record_object_1.emplace("uint_field", core::RecordField(static_cast(42))); + record_set.emplace_back(std::move(record_object_1)); + core::RecordObject record_object_2; + record_object_2.emplace("string_field", core::RecordField(std::string("value1"))); + record_object_2.emplace("uint_field", core::RecordField(static_cast(42))); + record_set.emplace_back(std::move(record_object_2)); + + auto xml_content = writeRecordsAsXml(record_set, { + {standard::XMLRecordSetWriter::NameOfRecordTag.name, "record"}, + {standard::XMLRecordSetWriter::NameOfRootTag.name, "root"} + }); + + pugi::xml_document doc; + REQUIRE(doc.load_string(xml_content.c_str())); + auto root_node = doc.child("root"); + REQUIRE(root_node); + + size_t count = 0; + for (const auto& record_node : root_node.children("record")) { + REQUIRE(record_node); + verifyXmlValue(record_node, "string_field", "value1"); + verifyXmlValue(record_node, "uint_field", "42"); + ++count; + } + + REQUIRE(count == 2); +} + +TEST_CASE_METHOD(XMLRecordSetWriterTestFixture, "Test pretty print XML", "[XMLRecordSetWriter]") { + core::RecordSet record_set; + core::RecordObject record_object; + record_object.emplace("bool_field", core::RecordField(true)); + record_set.emplace_back(std::move(record_object)); + + auto xml_content = writeRecordsAsXml(record_set, { + {standard::XMLRecordSetWriter::NameOfRecordTag.name, "record"}, + {standard::XMLRecordSetWriter::NameOfRootTag.name, "root"}, + {standard::XMLRecordSetWriter::PrettyPrintXML.name, "true"} + }); + + REQUIRE(xml_content == +R"( + + + true + + +)"); +} + +} // namespace org::apache::nifi::minifi::test From e07773559e3cfebc159eef502484713f6be269a5 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Thu, 14 Aug 2025 13:53:00 +0200 Subject: [PATCH 2/6] Fix license --- .../standard-processors/controllers/XMLRecordSetWriter.cpp | 2 +- extensions/standard-processors/controllers/XMLRecordSetWriter.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions/standard-processors/controllers/XMLRecordSetWriter.cpp b/extensions/standard-processors/controllers/XMLRecordSetWriter.cpp index 64c5ee1755..5b531ee321 100644 --- a/extensions/standard-processors/controllers/XMLRecordSetWriter.cpp +++ b/extensions/standard-processors/controllers/XMLRecordSetWriter.cpp @@ -1,5 +1,5 @@ /** -* Licensed to the Apache Software Foundation (ASF) under one or more + * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 diff --git a/extensions/standard-processors/controllers/XMLRecordSetWriter.h b/extensions/standard-processors/controllers/XMLRecordSetWriter.h index 00e66284bd..8d0c6a316e 100644 --- a/extensions/standard-processors/controllers/XMLRecordSetWriter.h +++ b/extensions/standard-processors/controllers/XMLRecordSetWriter.h @@ -1,5 +1,5 @@ /** -* Licensed to the Apache Software Foundation (ASF) under one or more + * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 From a03562cea386aa050ec94a7d0de95a80bf739774 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Tue, 26 Aug 2025 11:04:35 +0200 Subject: [PATCH 3/6] Fix clang tidy issues --- .../standard-processors/tests/unit/XMLRecordSetWriterTests.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/standard-processors/tests/unit/XMLRecordSetWriterTests.cpp b/extensions/standard-processors/tests/unit/XMLRecordSetWriterTests.cpp index 4efc5ab5ea..dec6f7d5cf 100644 --- a/extensions/standard-processors/tests/unit/XMLRecordSetWriterTests.cpp +++ b/extensions/standard-processors/tests/unit/XMLRecordSetWriterTests.cpp @@ -179,7 +179,7 @@ TEST_CASE_METHOD(XMLRecordSetWriterTestFixture, "Test single record with primiti core::RecordObject record_object; record_object.emplace("string_field", core::RecordField(std::string("value1"))); record_object.emplace("uint_field", core::RecordField(static_cast(42))); - record_object.emplace("double_field", core::RecordField(static_cast(2.3))); + record_object.emplace("double_field", core::RecordField(2.3)); record_object.emplace("bool_field", core::RecordField(true)); record_object.emplace("time_point_field", core::RecordField(std::chrono::system_clock::time_point(std::chrono::sys_days(std::chrono::year(2025)/1/1)))); record_set.emplace_back(std::move(record_object)); From c211b83424ca4133fcb8e673a3e15d0e3a3341d3 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Tue, 9 Sep 2025 09:35:49 +0200 Subject: [PATCH 4/6] Review update --- CONTROLLERS.md | 17 ++--- .../controllers/XMLRecordSetWriter.cpp | 65 ++++++++++++------- .../controllers/XMLRecordSetWriter.h | 7 +- 3 files changed, 56 insertions(+), 33 deletions(-) diff --git a/CONTROLLERS.md b/CONTROLLERS.md index 71a6de95c1..589798a740 100644 --- a/CONTROLLERS.md +++ b/CONTROLLERS.md @@ -364,11 +364,12 @@ Writes a RecordSet to XML. The records are wrapped by a root tag. In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. -| Name | Default Value | Allowable Values | Description | -|-----------------------------|---------------|-----------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| Array Tag Name | | | Name of the tag used by property "Wrap Elements of Arrays" to write arrays | -| **Wrap Elements of Arrays** | No Wrapping | Use Property as Wrapper
Use Property for Elements
No Wrapping | Specifies how the writer wraps elements of fields of type array | -| **Omit XML Declaration** | false | true
false | Specifies whether or not to include XML declaration | -| **Pretty Print XML** | false | true
false | Specifies whether or not the XML should be pretty printed | -| **Name of Record Tag** | | | Specifies the name of the XML record tag wrapping the record fields. | -| **Name of Root Tag** | | | Specifies the name of the XML root tag wrapping the record set. This property has to be defined if the writer is supposed to write multiple records in a single FlowFile. | +| Name | Default Value | Allowable Values | Description | +|-----------------------------|---------------|-----------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| Array Tag Name | | | Name of the tag used by property "Wrap Elements of Arrays" to write arrays | +| **Wrap Elements of Arrays** | No Wrapping | Use Property as Wrapper
Use Property for Elements
No Wrapping | Specifies how the writer wraps elements of fields of type array. If 'Use Property as Wrapper' is set, the property "Array Tag Name" will be used as the tag name to wrap elements of an array. The field name of the array field will be used for the tag name of the elements. If 'Use Property for Elements' is set, the property "Array Tag Name" will be used for the tag name of the elements of an array. The field name of the array field will be used as the tag name to wrap elements. If 'No Wrapping' is set, the elements of an array will not be wrapped. | +| **Omit XML Declaration** | false | true
false | Specifies whether or not to include XML declaration | +| **Pretty Print XML** | false | true
false | Specifies whether or not the XML should be pretty printed | +| **Name of Record Tag** | | | Specifies the name of the XML record tag wrapping the record fields. | +| **Name of Root Tag** | | | Specifies the name of the XML root tag wrapping the record set. This property has to be set if the writer is supposed to write multiple records in a single FlowFile. | + diff --git a/extensions/standard-processors/controllers/XMLRecordSetWriter.cpp b/extensions/standard-processors/controllers/XMLRecordSetWriter.cpp index 5b531ee321..8241f5ab3b 100644 --- a/extensions/standard-processors/controllers/XMLRecordSetWriter.cpp +++ b/extensions/standard-processors/controllers/XMLRecordSetWriter.cpp @@ -19,6 +19,8 @@ #include "core/Resource.h" #include "Exception.h" #include "utils/TimeUtil.h" +#include "utils/ParsingUtils.h" +#include "utils/GeneralUtils.h" namespace org::apache::nifi::minifi::standard { @@ -36,8 +38,18 @@ void XMLRecordSetWriter::onEnable() { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Array Tag Name property must be set when Wrap Elements of Arrays is set to Use Property as Wrapper or Use Property for Elements"); } - omit_xml_declaration_ = getProperty(OmitXMLDeclaration.name).value_or("false") == "true"; - pretty_print_xml_ = getProperty(PrettyPrintXML.name).value_or("false") == "true"; + auto parseBoolProperty = [this](std::string_view property_name) -> bool { + if (auto property_value_str = getProperty(property_name); property_value_str && !property_value_str->empty()) { + if (auto property_value = parsing::parseBool(*property_value_str)) { + return *property_value; + } + throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Invalid value for {} property: {}", property_name, *property_value_str)); + } + return false; + }; + + omit_xml_declaration_ = parseBoolProperty(OmitXMLDeclaration.name); + pretty_print_xml_ = parseBoolProperty(PrettyPrintXML.name); name_of_record_tag_ = getProperty(NameOfRecordTag.name).value_or(""); if (name_of_record_tag_.empty()) { @@ -52,7 +64,7 @@ void XMLRecordSetWriter::onEnable() { std::string XMLRecordSetWriter::formatXmlOutput(pugi::xml_document& xml_doc) const { std::ostringstream xml_string_stream; - uint64_t xml_formatting_flags = 0; + unsigned int xml_formatting_flags = 0; if (pretty_print_xml_) { xml_formatting_flags |= pugi::format_indent; } else { @@ -61,7 +73,7 @@ std::string XMLRecordSetWriter::formatXmlOutput(pugi::xml_document& xml_doc) con if (omit_xml_declaration_) { xml_formatting_flags |= pugi::format_no_declaration; } - xml_doc.save(xml_string_stream, " ", gsl::narrow(xml_formatting_flags)); + xml_doc.save(xml_string_stream, " ", xml_formatting_flags); return xml_string_stream.str(); } @@ -91,26 +103,33 @@ void XMLRecordSetWriter::convertRecordField(const std::string& field_name, const } pugi::xml_node field_node = parent_node.append_child(field_name.c_str()); - if (std::holds_alternative(field.value_)) { - field_node.text().set(std::get(field.value_)); - } else if (std::holds_alternative(field.value_)) { - field_node.text().set(std::to_string(std::get(field.value_)).c_str()); - } else if (std::holds_alternative(field.value_)) { - field_node.text().set(std::to_string(std::get(field.value_)).c_str()); - } else if (std::holds_alternative(field.value_)) { - field_node.text().set(fmt::format("{:g}", std::get(field.value_)).c_str()); - } else if (std::holds_alternative(field.value_)) { - field_node.text().set(std::get(field.value_) ? "true" : "false"); - } else if (std::holds_alternative(field.value_)) { - auto time_point = std::get(field.value_); - auto time_str = utils::timeutils::getDateTimeStr(std::chrono::time_point_cast(time_point)); - field_node.text().set(time_str.c_str()); - } else if (std::holds_alternative(field.value_)) { - const auto& record_object = std::get(field.value_); - for (const auto& [obj_key, obj_field] : record_object) { - convertRecordField(obj_key, obj_field, field_node); + std::visit(utils::overloaded { + [&field_node](const std::string& str_val) { + field_node.text().set(str_val); + }, + [&field_node](int64_t i64_val) { + field_node.text().set(std::to_string(i64_val).c_str()); + }, + [&field_node](uint64_t u64_val) { + field_node.text().set(std::to_string(u64_val).c_str()); + }, + [&field_node](double double_val) { + field_node.text().set(fmt::format("{:g}", double_val).c_str()); + }, + [&field_node](bool bool_val) { + field_node.text().set(bool_val ? "true" : "false"); + }, + [&field_node](const std::chrono::system_clock::time_point& time_point) { + auto time_str = utils::timeutils::getDateTimeStr(std::chrono::time_point_cast(time_point)); + field_node.text().set(time_str.c_str()); + }, + [](const core::RecordArray&) {}, + [this, &field_node](const core::RecordObject& record_object) { + for (const auto& [obj_key, obj_field] : record_object) { + convertRecordField(obj_key, obj_field, field_node); + } } - } + }, field.value_); } std::string XMLRecordSetWriter::convertRecordSetToXml(const core::RecordSet& record_set) const { diff --git a/extensions/standard-processors/controllers/XMLRecordSetWriter.h b/extensions/standard-processors/controllers/XMLRecordSetWriter.h index 8d0c6a316e..b1b5d1e3d8 100644 --- a/extensions/standard-processors/controllers/XMLRecordSetWriter.h +++ b/extensions/standard-processors/controllers/XMLRecordSetWriter.h @@ -66,7 +66,10 @@ class XMLRecordSetWriter final : public core::RecordSetWriterImpl { .withDescription("Name of the tag used by property \"Wrap Elements of Arrays\" to write arrays") .build(); EXTENSIONAPI static constexpr auto WrapElementsOfArrays = core::PropertyDefinitionBuilder<3>::createProperty("Wrap Elements of Arrays") - .withDescription("Specifies how the writer wraps elements of fields of type array") + .withDescription("Specifies how the writer wraps elements of fields of type array. If 'Use Property as Wrapper' is set, the property \"Array Tag Name\" will be used as the tag name to wrap " + "elements of an array. The field name of the array field will be used for the tag name of the elements. If 'Use Property for Elements' is set, the property \"Array Tag Name\" will be " + "used for the tag name of the elements of an array. The field name of the array field will be used as the tag name to wrap elements. If 'No Wrapping' is set, the elements of an array " + "will not be wrapped.") .withDefaultValue(magic_enum::enum_name(WrapElementsOfArraysOptions::NoWrapping)) .withAllowedValues(magic_enum::enum_names()) .isRequired(true) @@ -89,7 +92,7 @@ class XMLRecordSetWriter final : public core::RecordSetWriterImpl { .isRequired(true) .build(); EXTENSIONAPI static constexpr auto NameOfRootTag = core::PropertyDefinitionBuilder<>::createProperty("Name of Root Tag") - .withDescription("Specifies the name of the XML root tag wrapping the record set. This property has to be defined if the writer is supposed to write multiple records in a single FlowFile.") + .withDescription("Specifies the name of the XML root tag wrapping the record set. This property has to be set if the writer is supposed to write multiple records in a single FlowFile.") .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR) .isRequired(true) .build(); From ce50c324ae43c61b0e589a8434db7c3211d2c1e6 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Tue, 9 Sep 2025 11:27:07 +0200 Subject: [PATCH 5/6] Review update --- CONTROLLERS.md | 2 +- extensions/standard-processors/controllers/XMLRecordSetWriter.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CONTROLLERS.md b/CONTROLLERS.md index 589798a740..52bf80d400 100644 --- a/CONTROLLERS.md +++ b/CONTROLLERS.md @@ -371,5 +371,5 @@ In the list below, the names of required properties appear in bold. Any other pr | **Omit XML Declaration** | false | true
false | Specifies whether or not to include XML declaration | | **Pretty Print XML** | false | true
false | Specifies whether or not the XML should be pretty printed | | **Name of Record Tag** | | | Specifies the name of the XML record tag wrapping the record fields. | -| **Name of Root Tag** | | | Specifies the name of the XML root tag wrapping the record set. This property has to be set if the writer is supposed to write multiple records in a single FlowFile. | +| **Name of Root Tag** | | | Specifies the name of the XML root tag wrapping the record set. | diff --git a/extensions/standard-processors/controllers/XMLRecordSetWriter.h b/extensions/standard-processors/controllers/XMLRecordSetWriter.h index b1b5d1e3d8..adaa285955 100644 --- a/extensions/standard-processors/controllers/XMLRecordSetWriter.h +++ b/extensions/standard-processors/controllers/XMLRecordSetWriter.h @@ -92,7 +92,7 @@ class XMLRecordSetWriter final : public core::RecordSetWriterImpl { .isRequired(true) .build(); EXTENSIONAPI static constexpr auto NameOfRootTag = core::PropertyDefinitionBuilder<>::createProperty("Name of Root Tag") - .withDescription("Specifies the name of the XML root tag wrapping the record set. This property has to be set if the writer is supposed to write multiple records in a single FlowFile.") + .withDescription("Specifies the name of the XML root tag wrapping the record set.") .withValidator(core::StandardPropertyValidators::NON_BLANK_VALIDATOR) .isRequired(true) .build(); From 5a0c4ee8d362ce664bb049bfdbae20640d72ae38 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Fri, 12 Sep 2025 17:31:15 +0200 Subject: [PATCH 6/6] Review update --- extensions/standard-processors/controllers/XMLRecordSetWriter.h | 1 + 1 file changed, 1 insertion(+) diff --git a/extensions/standard-processors/controllers/XMLRecordSetWriter.h b/extensions/standard-processors/controllers/XMLRecordSetWriter.h index adaa285955..e9d27b995f 100644 --- a/extensions/standard-processors/controllers/XMLRecordSetWriter.h +++ b/extensions/standard-processors/controllers/XMLRecordSetWriter.h @@ -102,6 +102,7 @@ class XMLRecordSetWriter final : public core::RecordSetWriterImpl { }; EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; + EXTENSIONAPI static constexpr auto ImplementsApis = std::array{ RecordSetWriter::ProvidesApi }; ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES void write(const core::RecordSet& record_set, const std::shared_ptr& flow_file, core::ProcessSession& session) override;