From 7ed66980a01f5052c84d687f7b57eeb7b70c3164 Mon Sep 17 00:00:00 2001 From: Alexis Placet Date: Tue, 21 Oct 2025 16:10:45 +0200 Subject: [PATCH 1/7] Add deserializer class --- CMakeLists.txt | 1 + include/sparrow_ipc/deserializer.hpp | 48 +++ src/deserialize.cpp | 26 +- tests/CMakeLists.txt | 1 + tests/test_deserializer.cpp | 496 +++++++++++++++++++++++++++ 5 files changed, 565 insertions(+), 7 deletions(-) create mode 100644 include/sparrow_ipc/deserializer.hpp create mode 100644 tests/test_deserializer.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 02d2f1e..a2a704b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -114,6 +114,7 @@ set(SPARROW_IPC_HEADERS ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize.hpp + ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserializer.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/encapsulated_message.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/flatbuffer_utils.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/magic_values.hpp diff --git a/include/sparrow_ipc/deserializer.hpp b/include/sparrow_ipc/deserializer.hpp new file mode 100644 index 0000000..c2cc319 --- /dev/null +++ b/include/sparrow_ipc/deserializer.hpp @@ -0,0 +1,48 @@ +#pragma once + +#include +#include +#include +#include + +#include + +#include "deserialize.hpp" +#include "sparrow_ipc/deserialize.hpp" + +namespace sparrow_ipc +{ + template + requires std::same_as, sparrow::record_batch> + class SPARROW_IPC_API deserializer + { + public: + + deserializer(R& data) + : m_data(&data) + { + } + + void deserialize(std::span data) + { + // Insert at the end of m_data container the deserialized record batches + auto& container = *m_data; + auto deserialized_batches = sparrow_ipc::deserialize_stream(data); + container.insert( + std::end(container), + std::make_move_iterator(std::begin(deserialized_batches)), + 
std::make_move_iterator(std::end(deserialized_batches)) + ); + } + + deserializer& operator<<(std::span data) + { + deserialize(data); + return *this; + } + + private: + + R* m_data; + }; +} diff --git a/src/deserialize.cpp b/src/deserialize.cpp index 5779ca9..62b76d5 100644 --- a/src/deserialize.cpp +++ b/src/deserialize.cpp @@ -52,8 +52,13 @@ namespace sparrow_ipc const size_t length = static_cast(record_batch.length()); size_t buffer_index = 0; + const size_t num_fields = schema.fields() == nullptr ? 0 : static_cast(schema.fields()->size()); std::vector arrays; - arrays.reserve(schema.fields()->size()); + if (num_fields == 0) + { + return arrays; + } + arrays.reserve(num_fields); size_t field_idx = 0; for (const auto field : *(schema.fields())) { @@ -215,18 +220,24 @@ namespace sparrow_ipc case org::apache::arrow::flatbuf::MessageHeader::Schema: { schema = message->header_as_Schema(); - const size_t size = static_cast(schema->fields()->size()); + const size_t size = schema->fields() == nullptr + ? 
0 + : static_cast(schema->fields()->size()); field_names.reserve(size); fields_nullable.reserve(size); fields_metadata.reserve(size); - + if (schema->fields() == nullptr) + { + break; + } for (const auto field : *(schema->fields())) { - if(field != nullptr && field->name() != nullptr) + if (field != nullptr && field->name() != nullptr) { - field_names.emplace_back(field->name()->str()); + field_names.emplace_back(field->name()->str()); } - else { + else + { field_names.emplace_back("_unnamed_"); } fields_nullable.push_back(field->nullable()); @@ -257,7 +268,8 @@ namespace sparrow_ipc encapsulated_message, fields_metadata ); - auto names_copy = field_names; // TODO: Remove when issue with the to_vector of record_batch is fixed + auto names_copy = field_names; // TODO: Remove when issue with the to_vector of + // record_batch is fixed sparrow::record_batch sp_record_batch(std::move(names_copy), std::move(arrays)); record_batches.emplace_back(std::move(sp_record_batch)); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 11c2f9f..bff0af2 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -11,6 +11,7 @@ set(SPARROW_IPC_TESTS_SRC test_chunk_memory_output_stream.cpp test_chunk_memory_serializer.cpp test_de_serialization_with_files.cpp + test_deserializer.cpp $<$>:test_flatbuffer_utils.cpp> test_memory_output_streams.cpp test_serialize_utils.cpp diff --git a/tests/test_deserializer.cpp b/tests/test_deserializer.cpp new file mode 100644 index 0000000..b12176c --- /dev/null +++ b/tests/test_deserializer.cpp @@ -0,0 +1,496 @@ +#include +#include +#include +#include + +#include +#include + +#include "sparrow_ipc/deserializer.hpp" +#include "sparrow_ipc/memory_output_stream.hpp" +#include "sparrow_ipc/serializer.hpp" +#include "sparrow_ipc_tests_helpers.hpp" + +namespace sparrow_ipc +{ + namespace sp = sparrow; + + // Helper function to serialize record batches to a byte buffer + std::vector serialize_record_batches(const std::vector& batches) + { + 
std::vector buffer; + memory_output_stream stream(buffer); + serializer ser(stream); + ser << batches << end_stream; + return buffer; + } + + // Helper function to create multiple compatible record batches + std::vector create_test_record_batches(size_t count) + { + std::vector batches; + for (size_t i = 0; i < count; ++i) + { + auto int_array = sp::primitive_array({static_cast(i * 10), + static_cast(i * 10 + 1), + static_cast(i * 10 + 2)}); + auto string_array = sp::string_array( + std::vector{"batch_" + std::to_string(i) + "_a", + "batch_" + std::to_string(i) + "_b", + "batch_" + std::to_string(i) + "_c"} + ); + batches.push_back(sp::record_batch({{"int_col", sp::array(std::move(int_array))}, + {"string_col", sp::array(std::move(string_array))}})); + } + return batches; + } + + TEST_SUITE("deserializer") + { + TEST_CASE("construction with empty vector") + { + SUBCASE("Construct with empty vector reference") + { + std::vector batches; + deserializer deser(batches); + CHECK_EQ(batches.size(), 0); + } + } + + TEST_CASE("deserialize single record batch") + { + SUBCASE("Deserialize one batch into empty vector") + { + std::vector batches; + deserializer deser(batches); + + // Create and serialize a single record batch + auto original_batch = create_test_record_batch(); + auto serialized_data = serialize_record_batches({original_batch}); + + // Deserialize + deser.deserialize(std::span(serialized_data)); + + // Verify + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_columns(), original_batch.nb_columns()); + CHECK_EQ(batches[0].nb_rows(), original_batch.nb_rows()); + } + + SUBCASE("Deserialize batch with different data types") + { + std::vector batches; + deserializer deser(batches); + + auto int_array = sp::primitive_array({1, 2, 3}); + auto double_array = sp::primitive_array({1.5, 2.5, 3.5}); + auto float_array = sp::primitive_array({1.0f, 2.0f, 3.0f}); + + auto rb = sp::record_batch({{"int_col", sp::array(std::move(int_array))}, + {"double_col", 
sp::array(std::move(double_array))}, + {"float_col", sp::array(std::move(float_array))}}); + + auto serialized_data = serialize_record_batches({rb}); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_columns(), 3); + CHECK_EQ(batches[0].nb_rows(), 3); + } + + SUBCASE("Deserialize empty record batch") + { + std::vector batches; + deserializer deser(batches); + + auto empty_batch = sp::record_batch({}); + auto serialized_data = serialize_record_batches({empty_batch}); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_columns(), 0); + } + } + + TEST_CASE("deserialize multiple record batches") + { + SUBCASE("Deserialize multiple batches at once") + { + std::vector batches; + deserializer deser(batches); + + // Create multiple compatible batches + auto original_batches = create_test_record_batches(3); + auto serialized_data = serialize_record_batches(original_batches); + + // Deserialize + deser.deserialize(std::span(serialized_data)); + + // Verify + REQUIRE_EQ(batches.size(), 3); + for (size_t i = 0; i < batches.size(); ++i) + { + CHECK_EQ(batches[i].nb_columns(), original_batches[i].nb_columns()); + CHECK_EQ(batches[i].nb_rows(), original_batches[i].nb_rows()); + } + } + + SUBCASE("Deserialize large number of batches") + { + std::vector batches; + deserializer deser(batches); + + const size_t num_batches = 100; + auto original_batches = create_test_record_batches(num_batches); + auto serialized_data = serialize_record_batches(original_batches); + + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), num_batches); + } + } + + TEST_CASE("incremental deserialization") + { + SUBCASE("Deserialize in multiple calls") + { + std::vector batches; + deserializer deser(batches); + + // First deserialization + auto batch1 = create_test_record_batches(2); + auto serialized_data1 = serialize_record_batches(batch1); + 
deser.deserialize(std::span(serialized_data1)); + + CHECK_EQ(batches.size(), 2); + + // Second deserialization - should append to existing batches + auto batch2 = create_test_record_batches(3); + auto serialized_data2 = serialize_record_batches(batch2); + deser.deserialize(std::span(serialized_data2)); + + CHECK_EQ(batches.size(), 5); + } + + SUBCASE("Multiple incremental deserializations") + { + std::vector batches; + deserializer deser(batches); + + for (size_t i = 0; i < 5; ++i) + { + auto new_batches = create_test_record_batches(2); + auto serialized_data = serialize_record_batches(new_batches); + deser.deserialize(std::span(serialized_data)); + + CHECK_EQ(batches.size(), (i + 1) * 2); + } + } + + SUBCASE("Deserialize into non-empty vector") + { + // Start with existing batches + std::vector batches = {create_test_record_batch()}; + CHECK_EQ(batches.size(), 1); + + deserializer deser(batches); + + // Add more batches + auto new_batches = create_test_record_batches(2); + auto serialized_data = serialize_record_batches(new_batches); + deser.deserialize(std::span(serialized_data)); + + CHECK_EQ(batches.size(), 3); + } + } + + TEST_CASE("operator<< for deserialization") + { + SUBCASE("Single deserialization with <<") + { + std::vector batches; + deserializer deser(batches); + + auto original_batches = create_test_record_batches(1); + auto serialized_data = serialize_record_batches(original_batches); + + deser << std::span(serialized_data); + + REQUIRE_EQ(batches.size(), 1); + } + + SUBCASE("Chain multiple deserializations with <<") + { + std::vector batches; + deserializer deser(batches); + + auto batch1 = create_test_record_batches(1); + auto serialized_data1 = serialize_record_batches(batch1); + + auto batch2 = create_test_record_batches(2); + auto serialized_data2 = serialize_record_batches(batch2); + + auto batch3 = create_test_record_batches(1); + auto serialized_data3 = serialize_record_batches(batch3); + + deser << std::span(serialized_data1) + << 
std::span(serialized_data2) + << std::span(serialized_data3); + + CHECK_EQ(batches.size(), 4); + } + + SUBCASE("Mix deserialization methods") + { + std::vector batches; + deserializer deser(batches); + + auto batch1 = create_test_record_batches(1); + auto serialized_data1 = serialize_record_batches(batch1); + deser.deserialize(std::span(serialized_data1)); + + CHECK_EQ(batches.size(), 1); + + auto batch2 = create_test_record_batches(2); + auto serialized_data2 = serialize_record_batches(batch2); + deser << std::span(serialized_data2); + + CHECK_EQ(batches.size(), 3); + } + } + + TEST_CASE("deserialize with different container types") + { + SUBCASE("std::deque") + { + std::deque batches; + deserializer deser(batches); + + auto original_batches = create_test_record_batches(2); + auto serialized_data = serialize_record_batches(original_batches); + + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 2); + } + + SUBCASE("std::list") + { + std::list batches; + deserializer deser(batches); + + auto original_batches = create_test_record_batches(3); + auto serialized_data = serialize_record_batches(original_batches); + + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 3); + } + } + + TEST_CASE("round-trip serialization and deserialization") + { + SUBCASE("Single batch round-trip") + { + auto original_batch = create_test_record_batch(); + auto serialized_data = serialize_record_batches({original_batch}); + + std::vector deserialized_batches; + deserializer deser(deserialized_batches); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(deserialized_batches.size(), 1); + CHECK_EQ(deserialized_batches[0].nb_columns(), original_batch.nb_columns()); + CHECK_EQ(deserialized_batches[0].nb_rows(), original_batch.nb_rows()); + + // Verify column names match + for (size_t i = 0; i < original_batch.nb_columns(); ++i) + { + CHECK_EQ(deserialized_batches[0].names()[i], original_batch.names()[i]); + } + } + + 
SUBCASE("Multiple batches round-trip") + { + auto original_batches = create_test_record_batches(5); + auto serialized_data = serialize_record_batches(original_batches); + + std::vector deserialized_batches; + deserializer deser(deserialized_batches); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(deserialized_batches.size(), original_batches.size()); + + for (size_t i = 0; i < original_batches.size(); ++i) + { + CHECK_EQ(deserialized_batches[i].nb_columns(), original_batches[i].nb_columns()); + CHECK_EQ(deserialized_batches[i].nb_rows(), original_batches[i].nb_rows()); + } + } + + SUBCASE("Double round-trip") + { + // First round-trip + auto original_batches = create_test_record_batches(2); + auto serialized_data1 = serialize_record_batches(original_batches); + + std::vector deserialized_batches1; + deserializer deser1(deserialized_batches1); + deser1.deserialize(std::span(serialized_data1)); + + // Second round-trip + auto serialized_data2 = serialize_record_batches(deserialized_batches1); + + std::vector deserialized_batches2; + deserializer deser2(deserialized_batches2); + deser2.deserialize(std::span(serialized_data2)); + + // Verify both results match + REQUIRE_EQ(deserialized_batches2.size(), original_batches.size()); + for (size_t i = 0; i < original_batches.size(); ++i) + { + CHECK_EQ(deserialized_batches2[i].nb_columns(), original_batches[i].nb_columns()); + CHECK_EQ(deserialized_batches2[i].nb_rows(), original_batches[i].nb_rows()); + } + } + } + + TEST_CASE("deserialize with complex data types") + { + SUBCASE("Mixed primitive types") + { + std::vector batches; + deserializer deser(batches); + + auto int8_array = sp::primitive_array({1, 2, 3}); + auto int16_array = sp::primitive_array({100, 200, 300}); + auto int32_array = sp::primitive_array({1000, 2000, 3000}); + auto int64_array = sp::primitive_array({10000, 20000, 30000}); + + auto rb = sp::record_batch({{"int8_col", sp::array(std::move(int8_array))}, + {"int16_col", 
sp::array(std::move(int16_array))}, + {"int32_col", sp::array(std::move(int32_array))}, + {"int64_col", sp::array(std::move(int64_array))}}); + + auto serialized_data = serialize_record_batches({rb}); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_columns(), 4); + } + + SUBCASE("String arrays") + { + std::vector batches; + deserializer deser(batches); + + auto string_array = sp::string_array( + std::vector{"hello", "world", "test", "data"} + ); + auto rb = sp::record_batch({{"string_col", sp::array(std::move(string_array))}}); + + auto serialized_data = serialize_record_batches({rb}); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_rows(), 4); + } + } + + TEST_CASE("edge cases") + { + SUBCASE("Deserialize empty data") + { + std::vector batches; + deserializer deser(batches); + + // Create an empty batch + auto empty_batch = sp::record_batch({}); + auto serialized_data = serialize_record_batches({empty_batch}); + + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_columns(), 0); + } + + SUBCASE("Very large batch") + { + std::vector batches; + deserializer deser(batches); + + // Create a large array + std::vector large_data(10000); + std::iota(large_data.begin(), large_data.end(), 0); + auto large_array = sp::primitive_array(large_data); + auto rb = sp::record_batch({{"large_col", sp::array(std::move(large_array))}}); + + auto serialized_data = serialize_record_batches({rb}); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_rows(), 10000); + } + + SUBCASE("Single row batches") + { + std::vector batches; + deserializer deser(batches); + + auto int_array = sp::primitive_array({42}); + auto string_array = sp::string_array(std::vector{"single"}); + auto rb = sp::record_batch({{"int_col", sp::array(std::move(int_array))}, + 
{"string_col", sp::array(std::move(string_array))}}); + + auto serialized_data = serialize_record_batches({rb}); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_rows(), 1); + } + } + + TEST_CASE("workflow example") + { + SUBCASE("Typical streaming deserialization workflow") + { + std::vector batches; + deserializer deser(batches); + + // Simulate receiving data in chunks + for (size_t i = 0; i < 3; ++i) + { + auto chunk_batches = create_test_record_batches(2); + auto serialized_chunk = serialize_record_batches(chunk_batches); + deser << std::span(serialized_chunk); + } + + // Verify all batches accumulated + CHECK_EQ(batches.size(), 6); + + // Add one more batch using deserialize method + auto final_batch = create_test_record_batches(1); + auto serialized_final = serialize_record_batches(final_batch); + deser.deserialize(std::span(serialized_final)); + + CHECK_EQ(batches.size(), 7); + } + } + + TEST_CASE("deserializer with const container") + { + SUBCASE("Works with non-const reference") + { + std::vector batches; + deserializer deser(batches); + + auto original_batches = create_test_record_batches(1); + auto serialized_data = serialize_record_batches(original_batches); + + deser.deserialize(std::span(serialized_data)); + + CHECK_EQ(batches.size(), 1); + } + } + } +} From 876d5555449b122e6df1e09f3b16ed33a0f61e78 Mon Sep 17 00:00:00 2001 From: Alexis Placet Date: Tue, 21 Oct 2025 16:22:05 +0200 Subject: [PATCH 2/7] fix --- include/sparrow_ipc/deserializer.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/sparrow_ipc/deserializer.hpp b/include/sparrow_ipc/deserializer.hpp index c2cc319..28738b4 100644 --- a/include/sparrow_ipc/deserializer.hpp +++ b/include/sparrow_ipc/deserializer.hpp @@ -14,7 +14,7 @@ namespace sparrow_ipc { template requires std::same_as, sparrow::record_batch> - class SPARROW_IPC_API deserializer + class deserializer { public: From 
5293ca094028b6b4e3db29f7b496da6404dca00d Mon Sep 17 00:00:00 2001 From: Alexis Placet Date: Fri, 21 Nov 2025 10:07:18 +0100 Subject: [PATCH 3/7] add record batch content comparison in tests --- tests/test_deserializer.cpp | 184 ++++++++++++++++++++++++++++++++++++ 1 file changed, 184 insertions(+) diff --git a/tests/test_deserializer.cpp b/tests/test_deserializer.cpp index b12176c..2600e60 100644 --- a/tests/test_deserializer.cpp +++ b/tests/test_deserializer.cpp @@ -45,6 +45,68 @@ namespace sparrow_ipc return batches; } + // Helper function to verify test_record_batch content (from create_test_record_batch) + void verify_test_record_batch_content(const sp::record_batch& batch) + { + CHECK_EQ(batch.nb_columns(), 2); + CHECK_EQ(batch.nb_rows(), 5); + + // Verify int column content + const auto& int_col = batch.get_column(0); + int_col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), 1); + CHECK_EQ(impl[1].value(), 2); + CHECK_EQ(impl[2].value(), 3); + CHECK_EQ(impl[3].value(), 4); + CHECK_EQ(impl[4].value(), 5); + } + }); + + // Verify string column content + const auto& string_col = batch.get_column(1); + string_col.visit([](const auto& impl) { + if constexpr (sp::is_string_array_v>) + { + CHECK_EQ(impl[0].value(), "hello"); + CHECK_EQ(impl[1].value(), "world"); + CHECK_EQ(impl[2].value(), "test"); + CHECK_EQ(impl[3].value(), "data"); + CHECK_EQ(impl[4].value(), "batch"); + } + }); + } + + // Helper function to verify content of batches created by create_test_record_batches + void verify_test_record_batches_content(const sp::record_batch& batch, size_t batch_index) + { + CHECK_EQ(batch.nb_columns(), 2); + CHECK_EQ(batch.nb_rows(), 3); + + // Verify int column content + const auto& int_col = batch.get_column(0); + int_col.visit([batch_index](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), static_cast(batch_index * 10)); + CHECK_EQ(impl[1].value(), 
static_cast(batch_index * 10 + 1)); + CHECK_EQ(impl[2].value(), static_cast(batch_index * 10 + 2)); + } + }); + + // Verify string column content + const auto& string_col = batch.get_column(1); + string_col.visit([batch_index](const auto& impl) { + if constexpr (sp::is_string_array_v>) + { + CHECK_EQ(impl[0].value(), "batch_" + std::to_string(batch_index) + "_a"); + CHECK_EQ(impl[1].value(), "batch_" + std::to_string(batch_index) + "_b"); + CHECK_EQ(impl[2].value(), "batch_" + std::to_string(batch_index) + "_c"); + } + }); + } + TEST_SUITE("deserializer") { TEST_CASE("construction with empty vector") @@ -75,6 +137,7 @@ namespace sparrow_ipc REQUIRE_EQ(batches.size(), 1); CHECK_EQ(batches[0].nb_columns(), original_batch.nb_columns()); CHECK_EQ(batches[0].nb_rows(), original_batch.nb_rows()); + verify_test_record_batch_content(batches[0]); } SUBCASE("Deserialize batch with different data types") @@ -96,6 +159,39 @@ namespace sparrow_ipc REQUIRE_EQ(batches.size(), 1); CHECK_EQ(batches[0].nb_columns(), 3); CHECK_EQ(batches[0].nb_rows(), 3); + + // Verify int column content + const auto& int_col = batches[0].get_column(0); + int_col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), 1); + CHECK_EQ(impl[1].value(), 2); + CHECK_EQ(impl[2].value(), 3); + } + }); + + // Verify double column content + const auto& double_col = batches[0].get_column(1); + double_col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), doctest::Approx(1.5)); + CHECK_EQ(impl[1].value(), doctest::Approx(2.5)); + CHECK_EQ(impl[2].value(), doctest::Approx(3.5)); + } + }); + + // Verify float column content + const auto& float_col = batches[0].get_column(2); + float_col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), doctest::Approx(1.0f)); + CHECK_EQ(impl[1].value(), doctest::Approx(2.0f)); + CHECK_EQ(impl[2].value(), doctest::Approx(3.0f)); 
+ } + }); } SUBCASE("Deserialize empty record batch") @@ -132,6 +228,7 @@ namespace sparrow_ipc { CHECK_EQ(batches[i].nb_columns(), original_batches[i].nb_columns()); CHECK_EQ(batches[i].nb_rows(), original_batches[i].nb_rows()); + verify_test_record_batches_content(batches[i], i); } } @@ -308,6 +405,7 @@ namespace sparrow_ipc { CHECK_EQ(deserialized_batches[0].names()[i], original_batch.names()[i]); } + verify_test_record_batch_content(deserialized_batches[0]); } SUBCASE("Multiple batches round-trip") @@ -377,6 +475,50 @@ namespace sparrow_ipc REQUIRE_EQ(batches.size(), 1); CHECK_EQ(batches[0].nb_columns(), 4); + + // Verify int8 column + const auto& int8_col = batches[0].get_column(0); + int8_col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), 1); + CHECK_EQ(impl[1].value(), 2); + CHECK_EQ(impl[2].value(), 3); + } + }); + + // Verify int16 column + const auto& int16_col = batches[0].get_column(1); + int16_col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), 100); + CHECK_EQ(impl[1].value(), 200); + CHECK_EQ(impl[2].value(), 300); + } + }); + + // Verify int32 column + const auto& int32_col = batches[0].get_column(2); + int32_col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), 1000); + CHECK_EQ(impl[1].value(), 2000); + CHECK_EQ(impl[2].value(), 3000); + } + }); + + // Verify int64 column + const auto& int64_col = batches[0].get_column(3); + int64_col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), 10000); + CHECK_EQ(impl[1].value(), 20000); + CHECK_EQ(impl[2].value(), 30000); + } + }); } SUBCASE("String arrays") @@ -394,6 +536,18 @@ namespace sparrow_ipc REQUIRE_EQ(batches.size(), 1); CHECK_EQ(batches[0].nb_rows(), 4); + + // Verify string column content + const auto& string_col = batches[0].get_column(0); + string_col.visit([](const auto& 
impl) { + if constexpr (sp::is_string_array_v>) + { + CHECK_EQ(impl[0].value(), "hello"); + CHECK_EQ(impl[1].value(), "world"); + CHECK_EQ(impl[2].value(), "test"); + CHECK_EQ(impl[3].value(), "data"); + } + }); } } @@ -430,6 +584,19 @@ namespace sparrow_ipc REQUIRE_EQ(batches.size(), 1); CHECK_EQ(batches[0].nb_rows(), 10000); + + // Verify some sample values + const auto& col = batches[0].get_column(0); + col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), 0); + CHECK_EQ(impl[100].value(), 100); + CHECK_EQ(impl[1000].value(), 1000); + CHECK_EQ(impl[5000].value(), 5000); + CHECK_EQ(impl[9999].value(), 9999); + } + }); } SUBCASE("Single row batches") @@ -447,6 +614,23 @@ namespace sparrow_ipc REQUIRE_EQ(batches.size(), 1); CHECK_EQ(batches[0].nb_rows(), 1); + + // Verify content + const auto& int_col = batches[0].get_column(0); + int_col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), 42); + } + }); + + const auto& string_col = batches[0].get_column(1); + string_col.visit([](const auto& impl) { + if constexpr (sp::is_string_array_v>) + { + CHECK_EQ(impl[0].value(), "single"); + } + }); } } From 79fb000525b36bef7a1f7ff41e8c89b09cd3f020 Mon Sep 17 00:00:00 2001 From: Alexis Placet Date: Fri, 21 Nov 2025 10:08:41 +0100 Subject: [PATCH 4/7] improve documentation --- README.md | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index ffc0fb9..5106df5 100644 --- a/README.md +++ b/README.md @@ -107,9 +107,11 @@ void stream_record_batches(std::ostream& os, record_batch_source& source) ### Deserialize a stream into record batches +#### Using the function API + ```cpp #include -#include +#include #include namespace sp = sparrow; @@ -122,6 +124,36 @@ std::vector deserialize_stream_to_batches(const std::vector +#include +#include +#include + +namespace sp = sparrow; +namespace sp_ipc = sparrow_ipc; 
+ +void deserialize_incremental_stream(const std::vector>& stream_chunks) +{ + std::vector batches; + sp_ipc::deserializer deser(batches); + + // Deserialize chunks incrementally as they arrive + for (const auto& chunk : stream_chunks) + { + deser << std::span(chunk); + } + + // Process accumulated batches + for (const auto& batch : batches) + { + // Process each batch... + } +} +``` + ## Documentation The documentation (currently being written) can be found at https://quantstack.github.io/sparrow-ipc/index.html From f88e168584b53d3201a50d40569694d02b837f16 Mon Sep 17 00:00:00 2001 From: Alexis Placet Date: Thu, 4 Dec 2025 10:41:50 +0100 Subject: [PATCH 5/7] Add examples --- .github/workflows/linux.yml | 16 +++ .github/workflows/osx.yml | 16 +++ .github/workflows/windows.yml | 16 +++ README.md | 54 ++++++-- examples/CMakeLists.txt | 56 ++++++++- examples/deserializer_example.cpp | 199 ++++++++++++++++++++++++++++++ 6 files changed, 347 insertions(+), 10 deletions(-) create mode 100644 examples/deserializer_example.cpp diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 8a5fc6c..c9d15d4 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -59,6 +59,14 @@ jobs: working-directory: build run: cmake --build . --target run_example + - name: Build deserializer example + working-directory: build + run: cmake --build . --target deserializer_example + + - name: Run deserializer example + working-directory: build + run: cmake --build . --target run_deserializer_example + - name: Install working-directory: build run: cmake --install . @@ -103,6 +111,14 @@ jobs: working-directory: build run: cmake --build . --target run_example + - name: Build deserializer example + working-directory: build + run: cmake --build . --target deserializer_example + + - name: Run deserializer example + working-directory: build + run: cmake --build . 
--target run_deserializer_example + - name: Install working-directory: build run: sudo cmake --install . diff --git a/.github/workflows/osx.yml b/.github/workflows/osx.yml index 15dfab0..9517ded 100644 --- a/.github/workflows/osx.yml +++ b/.github/workflows/osx.yml @@ -64,6 +64,14 @@ jobs: working-directory: build run: cmake --build . --target run_example + - name: Build deserializer example + working-directory: build + run: cmake --build . --target deserializer_example + + - name: Run deserializer example + working-directory: build + run: cmake --build . --target run_deserializer_example + - name: Install working-directory: build run: cmake --install . @@ -113,6 +121,14 @@ jobs: working-directory: build run: cmake --build . --target run_example + - name: Build deserializer example + working-directory: build + run: cmake --build . --target deserializer_example + + - name: Run deserializer example + working-directory: build + run: cmake --build . --target run_deserializer_example + - name: Install working-directory: build run: sudo cmake --install . diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml index 50b0f58..0886697 100644 --- a/.github/workflows/windows.yml +++ b/.github/workflows/windows.yml @@ -64,6 +64,14 @@ jobs: working-directory: build run: cmake --build . --config ${{ matrix.build_type }} --target run_example + - name: Build deserializer example + working-directory: build + run: cmake --build . --config ${{ matrix.build_type }} --target deserializer_example + + - name: Run deserializer example + working-directory: build + run: cmake --build . --config ${{ matrix.build_type }} --target run_deserializer_example + - name: Install working-directory: build run: cmake --install . --config ${{ matrix.build_type }} @@ -113,6 +121,14 @@ jobs: working-directory: build run: cmake --build . --config ${{ matrix.build_type }} --target run_example + - name: Build deserializer example + working-directory: build + run: cmake --build . 
--config ${{ matrix.build_type }} --target deserializer_example + + - name: Run deserializer example + working-directory: build + run: cmake --build . --config ${{ matrix.build_type }} --target run_deserializer_example + - name: Install working-directory: build run: cmake --install . --config ${{ matrix.build_type }} diff --git a/README.md b/README.md index 5106df5..8dfbf10 100644 --- a/README.md +++ b/README.md @@ -117,8 +117,9 @@ void stream_record_batches(std::ostream& os, record_batch_source& source) namespace sp = sparrow; namespace sp_ipc = sparrow_ipc; -std::vector deserialize_stream_to_batches(const std::vector& stream_data) +std::vector deserialize_stream_example(const std::vector& stream_data) { + // Deserialize the entire stream at once auto batches = sp_ipc::deserialize_stream(stream_data); return batches; } @@ -126,7 +127,10 @@ std::vector deserialize_stream_to_batches(const std::vector #include #include #include @@ -135,22 +139,56 @@ std::vector deserialize_stream_to_batches(const std::vector>& stream_chunks) +void deserializer_basic_example(const std::vector& stream_data) { + // Create a container to hold the deserialized batches std::vector batches; + + // Create a deserializer that will append to our container sp_ipc::deserializer deser(batches); - // Deserialize chunks incrementally as they arrive - for (const auto& chunk : stream_chunks) + // Deserialize the stream data + deser.deserialize(std::span(stream_data)); + + // Process the accumulated batches + for (const auto& batch : batches) { - deser << std::span(chunk); + std::cout << "Batch with " << batch.nb_rows() << " rows and " << batch.nb_columns() << " columns\n"; } +} +``` - // Process accumulated batches - for (const auto& batch : batches) +#### Incremental deserialization + +The deserializer class is particularly useful for streaming scenarios where data arrives in chunks: + +```cpp +#include +#include +#include +#include +#include + +namespace sp = sparrow; +namespace sp_ipc = 
sparrow_ipc; + +void deserializer_incremental_example(const std::vector>& stream_chunks) +{ + // Container to accumulate all deserialized batches + std::vector batches; + + // Create a deserializer + sp_ipc::deserializer deser(batches); + + // Deserialize chunks as they arrive using the streaming operator + for (const auto& chunk : stream_chunks) { - // Process each batch... + deser << std::span(chunk); + std::cout << "After chunk: " << batches.size() << " batches accumulated\n"; } + + // All batches are now available in the container + std::cout << "Total batches deserialized: " << batches.size() << "\n"; } ``` diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index c7efee0..d78a52c 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -11,8 +11,18 @@ target_link_libraries(write_and_read_streams arrow-testing-data ) +# Create executable for the deserializer_example +add_executable(deserializer_example deserializer_example.cpp) + +# Link against sparrow-ipc and its dependencies +target_link_libraries(deserializer_example + PRIVATE + sparrow-ipc + sparrow::sparrow +) + # Set C++ standard to match the main project -set_target_properties(write_and_read_streams +set_target_properties(write_and_read_streams deserializer_example PROPERTIES CXX_STANDARD 20 CXX_STANDARD_REQUIRED ON @@ -26,8 +36,15 @@ target_include_directories(write_and_read_streams ${CMAKE_BINARY_DIR}/generated ) +target_include_directories(deserializer_example + PRIVATE + ${CMAKE_SOURCE_DIR}/include + ${CMAKE_BINARY_DIR}/generated +) + # Ensure generated flatbuffer headers are available add_dependencies(write_and_read_streams generate_flatbuffers_headers) +add_dependencies(deserializer_example generate_flatbuffers_headers) # Optional: Copy to build directory for easy execution if(WIN32) @@ -38,7 +55,7 @@ if(WIN32) set(ZSTD_DLL_TARGET libzstd_static) endif() - # On Windows, copy required DLLs + # On Windows, copy required DLLs for write_and_read_streams set(DLL_COPY_COMMANDS 
"") # Initialize a list to hold all copy commands # Add unconditional copy commands list(APPEND DLL_COPY_COMMANDS @@ -66,6 +83,31 @@ if(WIN32) ${DLL_COPY_COMMANDS} COMMENT "Copying required DLLs to example executable directory" ) + + # On Windows, copy required DLLs for deserializer_example + set(DLL_COPY_COMMANDS_DESER "") # Initialize a list to hold all copy commands + list(APPEND DLL_COPY_COMMANDS_DESER + COMMAND ${CMAKE_COMMAND} -E copy_if_different + "$" + "$" + COMMAND ${CMAKE_COMMAND} -E copy_if_different + "$" + "$" + ) + + if(ZSTD_DLL_TARGET) + list(APPEND DLL_COPY_COMMANDS_DESER + COMMAND ${CMAKE_COMMAND} -E copy_if_different + "$" + "$" + ) + endif() + + add_custom_command( + TARGET deserializer_example POST_BUILD + ${DLL_COPY_COMMANDS_DESER} + COMMENT "Copying required DLLs to deserializer_example executable directory" + ) endif() # Create a custom target to easily run the example @@ -77,3 +119,13 @@ add_custom_target(run_example ) set_target_properties(run_example PROPERTIES FOLDER "Examples") + +# Create a custom target to run the deserializer example +add_custom_target(run_deserializer_example + COMMAND deserializer_example + DEPENDS deserializer_example + COMMENT "Running deserializer_example" + USES_TERMINAL +) + +set_target_properties(run_deserializer_example PROPERTIES FOLDER "Examples") diff --git a/examples/deserializer_example.cpp b/examples/deserializer_example.cpp new file mode 100644 index 0000000..28499f8 --- /dev/null +++ b/examples/deserializer_example.cpp @@ -0,0 +1,199 @@ +/** + * @file deserializer_example.cpp + * @brief Examples demonstrating deserialization of Arrow IPC streams using sparrow-ipc + * + * This file shows different ways to deserialize Arrow IPC streams: + * 1. Using the function API (deserialize_stream) + * 2. 
Using the deserializer class for incremental deserialization + */ + +#include +#include +#include + +#include + +#include +#include +#include +#include + +namespace sp = sparrow; +namespace sp_ipc = sparrow_ipc; + +/** + * Helper function to create sample record batches for demonstration + */ +std::vector create_sample_batches(size_t count) +{ + std::vector batches; + for (size_t i = 0; i < count; ++i) + { + auto int_array = sp::primitive_array( + {static_cast(i * 10), static_cast(i * 10 + 1), static_cast(i * 10 + 2)} + ); + auto string_array = sp::string_array(std::vector{ + "batch_" + std::to_string(i) + "_a", + "batch_" + std::to_string(i) + "_b", + "batch_" + std::to_string(i) + "_c" + }); + batches.push_back(sp::record_batch( + {{"id", sp::array(std::move(int_array))}, {"name", sp::array(std::move(string_array))}} + )); + } + return batches; +} + +/** + * Helper function to serialize batches to a byte buffer + */ +std::vector serialize_batches(const std::vector& batches) +{ + std::vector buffer; + sp_ipc::memory_output_stream stream(buffer); + sp_ipc::serializer ser(stream); + ser << batches << sp_ipc::end_stream; + return buffer; +} + +// [example_deserialize_stream] +/** + * Example: Deserialize a stream using the function API + * + * This is the simplest way to deserialize an Arrow IPC stream. + * Use this when you have the complete stream data available. + */ +std::vector deserialize_stream_example(const std::vector& stream_data) +{ + // Deserialize the entire stream at once + auto batches = sp_ipc::deserialize_stream(stream_data); + return batches; +} + +// [example_deserialize_stream] + +// [example_deserializer_basic] +/** + * Example: Basic usage of the deserializer class + * + * The deserializer class allows you to accumulate record batches + * into an existing container as you deserialize data. 
+ */ +void deserializer_basic_example(const std::vector& stream_data) +{ + // Create a container to hold the deserialized batches + std::vector batches; + + // Create a deserializer that will append to our container + sp_ipc::deserializer deser(batches); + + // Deserialize the stream data + deser.deserialize(std::span(stream_data)); + + // Process the accumulated batches + for (const auto& batch : batches) + { + std::cout << "Batch with " << batch.nb_rows() << " rows and " << batch.nb_columns() << " columns\n"; + } +} + +// [example_deserializer_basic] + +// [example_deserializer_incremental] +/** + * Example: Incremental deserialization with the deserializer class + * + * This example shows how to deserialize data chunk by chunk as it arrives; + * main() passes chunks that are each a complete serialized stream (ended with end_stream). + */ +void deserializer_incremental_example(const std::vector>& stream_chunks) +{ + // Container to accumulate all deserialized batches + std::vector batches; + + // Create a deserializer + sp_ipc::deserializer deser(batches); + + // Deserialize chunks as they arrive using the streaming operator + for (const auto& chunk : stream_chunks) + { + deser << std::span(chunk); + std::cout << "After chunk: " << batches.size() << " batches accumulated\n"; + } + + // All batches are now available in the container + std::cout << "Total batches deserialized: " << batches.size() << "\n"; +} + +// [example_deserializer_incremental] + +// [example_deserializer_chaining] +/** + * Example: Chaining multiple deserializations + * + * The streaming operator can be chained for fluent API usage.
+ */ +void deserializer_chaining_example( + const std::vector& chunk1, + const std::vector& chunk2, + const std::vector& chunk3 +) +{ + std::vector batches; + sp_ipc::deserializer deser(batches); + + // Chain multiple deserializations in a single expression + deser << std::span(chunk1) << std::span(chunk2) + << std::span(chunk3); + + std::cout << "Deserialized " << batches.size() << " batches from 3 chunks\n"; +} + +// [example_deserializer_chaining] + +int main() +{ + std::cout << "=== Sparrow IPC Deserializer Examples ===\n\n"; + + try + { + // Create sample data + auto original_batches = create_sample_batches(3); + auto stream_data = serialize_batches(original_batches); + + std::cout << "1. Function API Example (deserialize_stream)\n"; + std::cout << " ----------------------------------------\n"; + auto deserialized = deserialize_stream_example(stream_data); + std::cout << " Deserialized " << deserialized.size() << " batches\n\n"; + + std::cout << "2. Basic Deserializer Class Example\n"; + std::cout << " ---------------------------------\n"; + deserializer_basic_example(stream_data); + std::cout << "\n"; + + std::cout << "3. Incremental Deserialization Example\n"; + std::cout << " ------------------------------------\n"; + // Create multiple chunks (each a complete stream holding one sample batch) + std::vector> chunks; + for (size_t i = 0; i < 3; ++i) + { + auto batch = create_sample_batches(1); + chunks.push_back(serialize_batches(batch)); + } + deserializer_incremental_example(chunks); + std::cout << "\n"; + + std::cout << "4. Chaining Example\n"; + std::cout << " -----------------\n"; + deserializer_chaining_example(chunks[0], chunks[1], chunks[2]); + + std::cout << "\n=== All examples completed successfully!
===\n"; + } + catch (const std::exception& e) + { + std::cerr << "Error: " << e.what() << "\n"; + return 1; + } + + return 0; +} From 115d95bf478d828a102e29889e19492d5d4d0d5b Mon Sep 17 00:00:00 2001 From: Alexis Placet Date: Thu, 4 Dec 2025 10:48:42 +0100 Subject: [PATCH 6/7] Add documentation --- docs/source/main_page.md | 38 +++++ docs/source/serialization.md | 311 +++++++++++++++++++++++++++++++++++ 2 files changed, 349 insertions(+) create mode 100644 docs/source/serialization.md diff --git a/docs/source/main_page.md b/docs/source/main_page.md index f5109d6..53db3d3 100644 --- a/docs/source/main_page.md +++ b/docs/source/main_page.md @@ -21,3 +21,41 @@ Sparrow-IPC requires a modern C++ compiler supporting C++20: | MSVC | 19.41 or higher | This software is licensed under the BSD-3-Clause license. See the [LICENSE](https://github.com/QuantStack/sparrow-ipc/blob/main/LICENSE) file for details. + +Getting Started +--------------- + +### Quick Example + +```cpp +#include +#include +#include +#include +#include + +namespace sp = sparrow; +namespace sp_ipc = sparrow_ipc; + +// Serialize record batches +std::vector serialize(const std::vector& batches) +{ + std::vector stream_data; + sp_ipc::memory_output_stream stream(stream_data); + sp_ipc::serializer serializer(stream); + serializer << batches << sp_ipc::end_stream; + return stream_data; +} + +// Deserialize record batches +std::vector deserialize(const std::vector& stream_data) +{ + return sp_ipc::deserialize_stream(stream_data); +} +``` + +Documentation +------------- + +- @ref serialization "Serialization and Deserialization" - How to serialize and deserialize record batches +- @ref dev_build "Development Build" - How to build the project for development diff --git a/docs/source/serialization.md b/docs/source/serialization.md new file mode 100644 index 0000000..65e47f4 --- /dev/null +++ b/docs/source/serialization.md @@ -0,0 +1,311 @@ +# Serialization and Deserialization {#serialization} + +This page describes 
how to serialize and deserialize record batches using `sparrow-ipc`. + +## Overview + +`sparrow-ipc` provides two main approaches for both serialization and deserialization: + +- **Function API**: Simple one-shot operations for serializing/deserializing complete data +- **Class API**: Streaming-oriented classes (`serializer` and `deserializer`) for incremental operations + +## Serialization + +### Serialize record batches to a memory stream + +The simplest way to serialize record batches is to use the `serializer` class with a `memory_output_stream`: + +```cpp +#include +#include +#include +#include + +namespace sp = sparrow; +namespace sp_ipc = sparrow_ipc; + +std::vector serialize_batches_to_stream(const std::vector& batches) +{ + std::vector stream_data; + sp_ipc::memory_output_stream stream(stream_data); + sp_ipc::serializer serializer(stream); + + // Serialize all batches using the streaming operator + serializer << batches << sp_ipc::end_stream; + + return stream_data; +} +``` + +### Serialize individual record batches + +You can also serialize record batches one at a time: + +```cpp +#include +#include +#include +#include + +namespace sp = sparrow; +namespace sp_ipc = sparrow_ipc; + +std::vector serialize_batches_individually(const std::vector& batches) +{ + std::vector stream_data; + sp_ipc::memory_output_stream stream(stream_data); + sp_ipc::serializer serializer(stream); + + // Serialize batches one by one + for (const auto& batch : batches) + { + serializer << batch; + } + + // Don't forget to end the stream + serializer << sp_ipc::end_stream; + + return stream_data; +} +``` + +### Pipe a source of record batches to a stream + +For streaming scenarios where batches are generated on-the-fly: + +```cpp +#include +#include +#include +#include +#include + +namespace sp = sparrow; +namespace sp_ipc = sparrow_ipc; + +class record_batch_source +{ +public: + std::optional next(); +}; + +std::vector stream_from_source(record_batch_source& source) +{ + std::vector 
stream_data; + sp_ipc::memory_output_stream stream(stream_data); + sp_ipc::serializer serializer(stream); + + std::optional batch; + while ((batch = source.next())) + { + serializer << *batch; + } + serializer << sp_ipc::end_stream; + + return stream_data; +} +``` + +## Deserialization + +### Using the function API + +The simplest way to deserialize a complete Arrow IPC stream is using `deserialize_stream`: + +```cpp +#include +#include +#include + +namespace sp = sparrow; +namespace sp_ipc = sparrow_ipc; + +std::vector deserialize_stream_example(const std::vector& stream_data) +{ + // Deserialize the entire stream at once + auto batches = sp_ipc::deserialize_stream(stream_data); + return batches; +} +``` + +### Using the deserializer class + +The `deserializer` class provides more control over deserialization and is useful when you want to: +- Accumulate batches into an existing container +- Deserialize data incrementally as it arrives +- Process multiple streams into a single container + +#### Basic usage + +```cpp +#include +#include +#include +#include +#include + +namespace sp = sparrow; +namespace sp_ipc = sparrow_ipc; + +void deserializer_basic_example(const std::vector& stream_data) +{ + // Create a container to hold the deserialized batches + std::vector batches; + + // Create a deserializer that will append to our container + sp_ipc::deserializer deser(batches); + + // Deserialize the stream data + deser.deserialize(std::span(stream_data)); + + // Process the accumulated batches + for (const auto& batch : batches) + { + std::cout << "Batch with " << batch.nb_rows() << " rows and " + << batch.nb_columns() << " columns\n"; + } +} +``` + +#### Incremental deserialization + +The `deserializer` class is particularly useful for streaming scenarios where data arrives in chunks: + +```cpp +#include +#include +#include +#include +#include + +namespace sp = sparrow; +namespace sp_ipc = sparrow_ipc; + +void deserializer_incremental_example(const std::vector>& 
stream_chunks) +{ + // Container to accumulate all deserialized batches + std::vector batches; + + // Create a deserializer + sp_ipc::deserializer deser(batches); + + // Deserialize chunks as they arrive using the streaming operator + for (const auto& chunk : stream_chunks) + { + deser << std::span(chunk); + std::cout << "After chunk: " << batches.size() << " batches accumulated\n"; + } + + // All batches are now available in the container + std::cout << "Total batches deserialized: " << batches.size() << "\n"; +} +``` + +#### Chaining deserializations + +The streaming operator can be chained for fluent API usage: + +```cpp +#include +#include +#include +#include + +namespace sp = sparrow; +namespace sp_ipc = sparrow_ipc; + +void deserializer_chaining_example( + const std::vector& chunk1, + const std::vector& chunk2, + const std::vector& chunk3) +{ + std::vector batches; + sp_ipc::deserializer deser(batches); + + // Chain multiple deserializations in a single expression + deser << std::span(chunk1) + << std::span(chunk2) + << std::span(chunk3); +} +``` + +### Using different container types + +The `deserializer` class works with any container that satisfies `std::ranges::input_range` and supports `insert` at the end: + +```cpp +#include +#include +#include +#include +#include +#include + +namespace sp = sparrow; +namespace sp_ipc = sparrow_ipc; + +void different_containers_example(const std::vector& stream_data) +{ + // Using std::deque + std::deque deque_batches; + sp_ipc::deserializer deser_deque(deque_batches); + deser_deque.deserialize(std::span(stream_data)); + + // Using std::list + std::list list_batches; + sp_ipc::deserializer deser_list(list_batches); + deser_list.deserialize(std::span(stream_data)); +} +``` + +## Round-trip example + +Here's a complete example showing serialization and deserialization: + +```cpp +#include +#include +#include +#include +#include +#include +#include +#include + +namespace sp = sparrow; +namespace sp_ipc = sparrow_ipc; + 
+void round_trip_example() +{ + // Create sample data + auto int_array = sp::primitive_array({1, 2, 3, 4, 5}); + auto string_array = sp::string_array( + std::vector{"hello", "world", "test", "data", "example"} + ); + + sp::record_batch original_batch( + {{"id", sp::array(std::move(int_array))}, + {"name", sp::array(std::move(string_array))}} + ); + + // Serialize + std::vector stream_data; + sp_ipc::memory_output_stream stream(stream_data); + sp_ipc::serializer serializer(stream); + serializer << original_batch << sp_ipc::end_stream; + + // Deserialize using function API + auto deserialized_batches = sp_ipc::deserialize_stream(stream_data); + + assert(deserialized_batches.size() == 1); + assert(deserialized_batches[0].nb_rows() == original_batch.nb_rows()); + assert(deserialized_batches[0].nb_columns() == original_batch.nb_columns()); + + // Or using deserializer class + std::vector batches; + sp_ipc::deserializer deser(batches); + deser << std::span(stream_data); + + assert(batches.size() == 1); +} +``` From 516ef3f699e88c42d0c8030693bca24fd9cae905 Mon Sep 17 00:00:00 2001 From: Alexis Placet Date: Thu, 4 Dec 2025 11:01:43 +0100 Subject: [PATCH 7/7] Improve documentation --- docs/source/serialization.md | 268 +--------------------------- examples/write_and_read_streams.cpp | 6 + 2 files changed, 12 insertions(+), 262 deletions(-) diff --git a/docs/source/serialization.md b/docs/source/serialization.md index 65e47f4..3763f58 100644 --- a/docs/source/serialization.md +++ b/docs/source/serialization.md @@ -15,96 +15,13 @@ This page describes how to serialize and deserialize record batches using `sparr The simplest way to serialize record batches is to use the `serializer` class with a `memory_output_stream`: -```cpp -#include -#include -#include -#include - -namespace sp = sparrow; -namespace sp_ipc = sparrow_ipc; - -std::vector serialize_batches_to_stream(const std::vector& batches) -{ - std::vector stream_data; - sp_ipc::memory_output_stream stream(stream_data); - 
sp_ipc::serializer serializer(stream); - - // Serialize all batches using the streaming operator - serializer << batches << sp_ipc::end_stream; - - return stream_data; -} -``` +\snippet write_and_read_streams.cpp example_serialize_to_stream ### Serialize individual record batches You can also serialize record batches one at a time: -```cpp -#include -#include -#include -#include - -namespace sp = sparrow; -namespace sp_ipc = sparrow_ipc; - -std::vector serialize_batches_individually(const std::vector& batches) -{ - std::vector stream_data; - sp_ipc::memory_output_stream stream(stream_data); - sp_ipc::serializer serializer(stream); - - // Serialize batches one by one - for (const auto& batch : batches) - { - serializer << batch; - } - - // Don't forget to end the stream - serializer << sp_ipc::end_stream; - - return stream_data; -} -``` - -### Pipe a source of record batches to a stream - -For streaming scenarios where batches are generated on-the-fly: - -```cpp -#include -#include -#include -#include -#include - -namespace sp = sparrow; -namespace sp_ipc = sparrow_ipc; - -class record_batch_source -{ -public: - std::optional next(); -}; - -std::vector stream_from_source(record_batch_source& source) -{ - std::vector stream_data; - sp_ipc::memory_output_stream stream(stream_data); - sp_ipc::serializer serializer(stream); - - std::optional batch; - while ((batch = source.next())) - { - serializer << *batch; - } - serializer << sp_ipc::end_stream; - - return stream_data; -} -``` +\snippet write_and_read_streams.cpp example_serialize_individual ## Deserialization @@ -112,21 +29,7 @@ std::vector stream_from_source(record_batch_source& source) The simplest way to deserialize a complete Arrow IPC stream is using `deserialize_stream`: -```cpp -#include -#include -#include - -namespace sp = sparrow; -namespace sp_ipc = sparrow_ipc; - -std::vector deserialize_stream_example(const std::vector& stream_data) -{ - // Deserialize the entire stream at once - auto batches = 
sp_ipc::deserialize_stream(stream_data); - return batches; -} -``` +\snippet deserializer_example.cpp example_deserialize_stream ### Using the deserializer class @@ -137,175 +40,16 @@ The `deserializer` class provides more control over deserialization and is usefu #### Basic usage -```cpp -#include -#include -#include -#include -#include - -namespace sp = sparrow; -namespace sp_ipc = sparrow_ipc; - -void deserializer_basic_example(const std::vector& stream_data) -{ - // Create a container to hold the deserialized batches - std::vector batches; - - // Create a deserializer that will append to our container - sp_ipc::deserializer deser(batches); - - // Deserialize the stream data - deser.deserialize(std::span(stream_data)); - - // Process the accumulated batches - for (const auto& batch : batches) - { - std::cout << "Batch with " << batch.nb_rows() << " rows and " - << batch.nb_columns() << " columns\n"; - } -} -``` +\snippet deserializer_example.cpp example_deserializer_basic #### Incremental deserialization The `deserializer` class is particularly useful for streaming scenarios where data arrives in chunks: -```cpp -#include -#include -#include -#include -#include - -namespace sp = sparrow; -namespace sp_ipc = sparrow_ipc; - -void deserializer_incremental_example(const std::vector>& stream_chunks) -{ - // Container to accumulate all deserialized batches - std::vector batches; - - // Create a deserializer - sp_ipc::deserializer deser(batches); - - // Deserialize chunks as they arrive using the streaming operator - for (const auto& chunk : stream_chunks) - { - deser << std::span(chunk); - std::cout << "After chunk: " << batches.size() << " batches accumulated\n"; - } - - // All batches are now available in the container - std::cout << "Total batches deserialized: " << batches.size() << "\n"; -} -``` +\snippet deserializer_example.cpp example_deserializer_incremental #### Chaining deserializations The streaming operator can be chained for fluent API usage: -```cpp 
-#include -#include -#include -#include - -namespace sp = sparrow; -namespace sp_ipc = sparrow_ipc; - -void deserializer_chaining_example( - const std::vector& chunk1, - const std::vector& chunk2, - const std::vector& chunk3) -{ - std::vector batches; - sp_ipc::deserializer deser(batches); - - // Chain multiple deserializations in a single expression - deser << std::span(chunk1) - << std::span(chunk2) - << std::span(chunk3); -} -``` - -### Using different container types - -The `deserializer` class works with any container that satisfies `std::ranges::input_range` and supports `insert` at the end: - -```cpp -#include -#include -#include -#include -#include -#include - -namespace sp = sparrow; -namespace sp_ipc = sparrow_ipc; - -void different_containers_example(const std::vector& stream_data) -{ - // Using std::deque - std::deque deque_batches; - sp_ipc::deserializer deser_deque(deque_batches); - deser_deque.deserialize(std::span(stream_data)); - - // Using std::list - std::list list_batches; - sp_ipc::deserializer deser_list(list_batches); - deser_list.deserialize(std::span(stream_data)); -} -``` - -## Round-trip example - -Here's a complete example showing serialization and deserialization: - -```cpp -#include -#include -#include -#include -#include -#include -#include -#include - -namespace sp = sparrow; -namespace sp_ipc = sparrow_ipc; - -void round_trip_example() -{ - // Create sample data - auto int_array = sp::primitive_array({1, 2, 3, 4, 5}); - auto string_array = sp::string_array( - std::vector{"hello", "world", "test", "data", "example"} - ); - - sp::record_batch original_batch( - {{"id", sp::array(std::move(int_array))}, - {"name", sp::array(std::move(string_array))}} - ); - - // Serialize - std::vector stream_data; - sp_ipc::memory_output_stream stream(stream_data); - sp_ipc::serializer serializer(stream); - serializer << original_batch << sp_ipc::end_stream; - - // Deserialize using function API - auto deserialized_batches = 
sp_ipc::deserialize_stream(stream_data); - - assert(deserialized_batches.size() == 1); - assert(deserialized_batches[0].nb_rows() == original_batch.nb_rows()); - assert(deserialized_batches[0].nb_columns() == original_batch.nb_columns()); - - // Or using deserializer class - std::vector batches; - sp_ipc::deserializer deser(batches); - deser << std::span(stream_data); - - assert(batches.size() == 1); -} -``` +\snippet deserializer_example.cpp example_deserializer_chaining diff --git a/examples/write_and_read_streams.cpp b/examples/write_and_read_streams.cpp index 104d496..ace47bf 100644 --- a/examples/write_and_read_streams.cpp +++ b/examples/write_and_read_streams.cpp @@ -182,6 +182,7 @@ std::vector create_record_batches(size_t num_batches, size_t r return batches; } +// [example_serialize_to_stream] /** * Serialize record batches to a stream */ @@ -200,7 +201,9 @@ std::vector serialize_batches_to_stream(const std::vector deserialize_stream_to_batches(const std::vector