diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 8a5fc6cd..c9d15d44 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -59,6 +59,14 @@ jobs: working-directory: build run: cmake --build . --target run_example + - name: Build deserializer example + working-directory: build + run: cmake --build . --target deserializer_example + + - name: Run deserializer example + working-directory: build + run: cmake --build . --target run_deserializer_example + - name: Install working-directory: build run: cmake --install . @@ -103,6 +111,14 @@ jobs: working-directory: build run: cmake --build . --target run_example + - name: Build deserializer example + working-directory: build + run: cmake --build . --target deserializer_example + + - name: Run deserializer example + working-directory: build + run: cmake --build . --target run_deserializer_example + - name: Install working-directory: build run: sudo cmake --install . diff --git a/.github/workflows/osx.yml b/.github/workflows/osx.yml index 15dfab08..9517dedd 100644 --- a/.github/workflows/osx.yml +++ b/.github/workflows/osx.yml @@ -64,6 +64,14 @@ jobs: working-directory: build run: cmake --build . --target run_example + - name: Build deserializer example + working-directory: build + run: cmake --build . --target deserializer_example + + - name: Run deserializer example + working-directory: build + run: cmake --build . --target run_deserializer_example + - name: Install working-directory: build run: cmake --install . @@ -113,6 +121,14 @@ jobs: working-directory: build run: cmake --build . --target run_example + - name: Build deserializer example + working-directory: build + run: cmake --build . --target deserializer_example + + - name: Run deserializer example + working-directory: build + run: cmake --build . --target run_deserializer_example + - name: Install working-directory: build run: sudo cmake --install . diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml index 50b0f581..08866975 100644 --- a/.github/workflows/windows.yml +++ b/.github/workflows/windows.yml @@ -64,6 +64,14 @@ jobs: working-directory: build run: cmake --build . --config ${{ matrix.build_type }} --target run_example + - name: Build deserializer example + working-directory: build + run: cmake --build . --config ${{ matrix.build_type }} --target deserializer_example + + - name: Run deserializer example + working-directory: build + run: cmake --build . --config ${{ matrix.build_type }} --target run_deserializer_example + - name: Install working-directory: build run: cmake --install . --config ${{ matrix.build_type }} @@ -113,6 +121,14 @@ jobs: working-directory: build run: cmake --build . --config ${{ matrix.build_type }} --target run_example + - name: Build deserializer example + working-directory: build + run: cmake --build . --config ${{ matrix.build_type }} --target deserializer_example + + - name: Run deserializer example + working-directory: build + run: cmake --build . --config ${{ matrix.build_type }} --target run_deserializer_example + - name: Install working-directory: build run: cmake --install . --config ${{ matrix.build_type }} diff --git a/CMakeLists.txt b/CMakeLists.txt index f40fbfaf..bee18194 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -114,6 +114,7 @@ set(SPARROW_IPC_HEADERS ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_utils.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize.hpp + ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserializer.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/encapsulated_message.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/flatbuffer_utils.hpp ${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/magic_values.hpp diff --git a/README.md b/README.md index ffc0fb93..8dfbf109 100644 --- a/README.md +++ b/README.md @@ -107,21 +107,91 @@ void stream_record_batches(std::ostream& os, record_batch_source& source) ### Deserialize a stream into record batches +#### Using the function API + ```cpp #include -#include +#include #include namespace sp = sparrow; namespace sp_ipc = sparrow_ipc; -std::vector deserialize_stream_to_batches(const std::vector& stream_data) +std::vector deserialize_stream_example(const std::vector& stream_data) { + // Deserialize the entire stream at once auto batches = sp_ipc::deserialize_stream(stream_data); return batches; } ``` +#### Using the deserializer class + +The deserializer class allows you to accumulate record batches into an existing container as you deserialize data: + +```cpp +#include +#include +#include +#include +#include + +namespace sp = sparrow; +namespace sp_ipc = sparrow_ipc; + +void deserializer_basic_example(const std::vector& stream_data) +{ + // Create a container to hold the deserialized batches + std::vector batches; + + // Create a deserializer that will append to our container + sp_ipc::deserializer deser(batches); + + // Deserialize the stream data + deser.deserialize(std::span(stream_data)); + + // Process the accumulated batches + for (const auto& batch : batches) + { + std::cout << "Batch with " << batch.nb_rows() << " rows and " << batch.nb_columns() << " columns\n"; + } +} +``` + +#### Incremental deserialization + +The deserializer class is particularly useful for streaming scenarios where data arrives in chunks: + +```cpp +#include +#include +#include +#include +#include + +namespace sp = sparrow; +namespace sp_ipc = sparrow_ipc; + +void deserializer_incremental_example(const std::vector>& stream_chunks) +{ + // Container to accumulate all deserialized batches + std::vector batches; + + // Create a deserializer + sp_ipc::deserializer deser(batches); + + // Deserialize chunks as they arrive using the streaming operator + for (const auto& chunk : stream_chunks) + { + deser << std::span(chunk); + std::cout << "After chunk: " << batches.size() << " batches accumulated\n"; + } + + // All batches are now available in the container + std::cout << "Total batches deserialized: " << batches.size() << "\n"; +} +``` + ## Documentation The documentation (currently being written) can be found at https://quantstack.github.io/sparrow-ipc/index.html diff --git a/docs/source/main_page.md b/docs/source/main_page.md index f5109d60..53db3d39 100644 --- a/docs/source/main_page.md +++ b/docs/source/main_page.md @@ -21,3 +21,41 @@ Sparrow-IPC requires a modern C++ compiler supporting C++20: | MSVC | 19.41 or higher | This software is licensed under the BSD-3-Clause license. See the [LICENSE](https://github.com/QuantStack/sparrow-ipc/blob/main/LICENSE) file for details. + +Getting Started +--------------- + +### Quick Example + +```cpp +#include +#include +#include +#include +#include + +namespace sp = sparrow; +namespace sp_ipc = sparrow_ipc; + +// Serialize record batches +std::vector serialize(const std::vector& batches) +{ + std::vector stream_data; + sp_ipc::memory_output_stream stream(stream_data); + sp_ipc::serializer serializer(stream); + serializer << batches << sp_ipc::end_stream; + return stream_data; +} + +// Deserialize record batches +std::vector deserialize(const std::vector& stream_data) +{ + return sp_ipc::deserialize_stream(stream_data); +} +``` + +Documentation +------------- + +- @ref serialization "Serialization and Deserialization" - How to serialize and deserialize record batches +- @ref dev_build "Development Build" - How to build the project for development diff --git a/docs/source/serialization.md b/docs/source/serialization.md new file mode 100644 index 00000000..3763f58e --- /dev/null +++ b/docs/source/serialization.md @@ -0,0 +1,55 @@ +# Serialization and Deserialization {#serialization} + +This page describes how to serialize and deserialize record batches using `sparrow-ipc`. + +## Overview + +`sparrow-ipc` provides two main approaches for both serialization and deserialization: + +- **Function API**: Simple one-shot operations for serializing/deserializing complete data +- **Class API**: Streaming-oriented classes (`serializer` and `deserializer`) for incremental operations + +## Serialization + +### Serialize record batches to a memory stream + +The simplest way to serialize record batches is to use the `serializer` class with a `memory_output_stream`: + +\snippet write_and_read_streams.cpp example_serialize_to_stream + +### Serialize individual record batches + +You can also serialize record batches one at a time: + +\snippet write_and_read_streams.cpp example_serialize_individual + +## Deserialization + +### Using the function API + +The simplest way to deserialize a complete Arrow IPC stream is using `deserialize_stream`: + +\snippet deserializer_example.cpp example_deserialize_stream + +### Using the deserializer class + +The `deserializer` class provides more control over deserialization and is useful when you want to: +- Accumulate batches into an existing container +- Deserialize data incrementally as it arrives +- Process multiple streams into a single container + +#### Basic usage + +\snippet deserializer_example.cpp example_deserializer_basic + +#### Incremental deserialization + +The `deserializer` class is particularly useful for streaming scenarios where data arrives in chunks: + +\snippet deserializer_example.cpp example_deserializer_incremental + +#### Chaining deserializations + +The streaming operator can be chained for fluent API usage: + +\snippet deserializer_example.cpp example_deserializer_chaining diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index c7efee09..d78a52cf 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -11,8 +11,18 @@ target_link_libraries(write_and_read_streams arrow-testing-data ) +# Create executable for the deserializer_example +add_executable(deserializer_example deserializer_example.cpp) + +# Link against sparrow-ipc and its dependencies +target_link_libraries(deserializer_example + PRIVATE + sparrow-ipc + sparrow::sparrow +) + # Set C++ standard to match the main project -set_target_properties(write_and_read_streams +set_target_properties(write_and_read_streams deserializer_example PROPERTIES CXX_STANDARD 20 CXX_STANDARD_REQUIRED ON @@ -26,8 +36,15 @@ target_include_directories(write_and_read_streams ${CMAKE_BINARY_DIR}/generated ) +target_include_directories(deserializer_example + PRIVATE + ${CMAKE_SOURCE_DIR}/include + ${CMAKE_BINARY_DIR}/generated +) + # Ensure generated flatbuffer headers are available add_dependencies(write_and_read_streams generate_flatbuffers_headers) +add_dependencies(deserializer_example generate_flatbuffers_headers) # Optional: Copy to build directory for easy execution if(WIN32) @@ -38,7 +55,7 @@ if(WIN32) set(ZSTD_DLL_TARGET libzstd_static) endif() - # On Windows, copy required DLLs + # On Windows, copy required DLLs for write_and_read_streams set(DLL_COPY_COMMANDS "") # Initialize a list to hold all copy commands # Add unconditional copy commands list(APPEND DLL_COPY_COMMANDS @@ -66,6 +83,31 @@ if(WIN32) ${DLL_COPY_COMMANDS} COMMENT "Copying required DLLs to example executable directory" ) + + # On Windows, copy required DLLs for deserializer_example + set(DLL_COPY_COMMANDS_DESER "") # Initialize a list to hold all copy commands + list(APPEND DLL_COPY_COMMANDS_DESER + COMMAND ${CMAKE_COMMAND} -E copy_if_different + "$" + "$" + COMMAND ${CMAKE_COMMAND} -E copy_if_different + "$" + "$" + ) + + if(ZSTD_DLL_TARGET) + list(APPEND DLL_COPY_COMMANDS_DESER + COMMAND ${CMAKE_COMMAND} -E copy_if_different + "$" + "$" + ) + endif() + + add_custom_command( + TARGET deserializer_example POST_BUILD + ${DLL_COPY_COMMANDS_DESER} + COMMENT "Copying required DLLs to deserializer_example executable directory" + ) endif() # Create a custom target to easily run the example @@ -77,3 +119,13 @@ add_custom_target(run_example ) set_target_properties(run_example PROPERTIES FOLDER "Examples") + +# Create a custom target to run the deserializer example +add_custom_target(run_deserializer_example + COMMAND deserializer_example + DEPENDS deserializer_example + COMMENT "Running deserializer_example" + USES_TERMINAL +) + +set_target_properties(run_deserializer_example PROPERTIES FOLDER "Examples") diff --git a/examples/deserializer_example.cpp b/examples/deserializer_example.cpp new file mode 100644 index 00000000..28499f8c --- /dev/null +++ b/examples/deserializer_example.cpp @@ -0,0 +1,199 @@ +/** + * @file deserializer_example.cpp + * @brief Examples demonstrating deserialization of Arrow IPC streams using sparrow-ipc + * + * This file shows different ways to deserialize Arrow IPC streams: + * 1. Using the function API (deserialize_stream) + * 2. Using the deserializer class for incremental deserialization + */ + +#include +#include +#include + +#include + +#include +#include +#include +#include + +namespace sp = sparrow; +namespace sp_ipc = sparrow_ipc; + +/** + * Helper function to create sample record batches for demonstration + */ +std::vector create_sample_batches(size_t count) +{ + std::vector batches; + for (size_t i = 0; i < count; ++i) + { + auto int_array = sp::primitive_array( + {static_cast(i * 10), static_cast(i * 10 + 1), static_cast(i * 10 + 2)} + ); + auto string_array = sp::string_array(std::vector{ + "batch_" + std::to_string(i) + "_a", + "batch_" + std::to_string(i) + "_b", + "batch_" + std::to_string(i) + "_c" + }); + batches.push_back(sp::record_batch( + {{"id", sp::array(std::move(int_array))}, {"name", sp::array(std::move(string_array))}} + )); + } + return batches; +} + +/** + * Helper function to serialize batches to a byte buffer + */ +std::vector serialize_batches(const std::vector& batches) +{ + std::vector buffer; + sp_ipc::memory_output_stream stream(buffer); + sp_ipc::serializer ser(stream); + ser << batches << sp_ipc::end_stream; + return buffer; +} + +// [example_deserialize_stream] +/** + * Example: Deserialize a stream using the function API + * + * This is the simplest way to deserialize an Arrow IPC stream. + * Use this when you have the complete stream data available. + */ +std::vector deserialize_stream_example(const std::vector& stream_data) +{ + // Deserialize the entire stream at once + auto batches = sp_ipc::deserialize_stream(stream_data); + return batches; +} + +// [example_deserialize_stream] + +// [example_deserializer_basic] +/** + * Example: Basic usage of the deserializer class + * + * The deserializer class allows you to accumulate record batches + * into an existing container as you deserialize data. + */ +void deserializer_basic_example(const std::vector& stream_data) +{ + // Create a container to hold the deserialized batches + std::vector batches; + + // Create a deserializer that will append to our container + sp_ipc::deserializer deser(batches); + + // Deserialize the stream data + deser.deserialize(std::span(stream_data)); + + // Process the accumulated batches + for (const auto& batch : batches) + { + std::cout << "Batch with " << batch.nb_rows() << " rows and " << batch.nb_columns() << " columns\n"; + } +} + +// [example_deserializer_basic] + +// [example_deserializer_incremental] +/** + * Example: Incremental deserialization with the deserializer class + * + * This example shows how to deserialize data incrementally as it arrives, + * which is useful for streaming scenarios where data comes in chunks. + */ +void deserializer_incremental_example(const std::vector>& stream_chunks) +{ + // Container to accumulate all deserialized batches + std::vector batches; + + // Create a deserializer + sp_ipc::deserializer deser(batches); + + // Deserialize chunks as they arrive using the streaming operator + for (const auto& chunk : stream_chunks) + { + deser << std::span(chunk); + std::cout << "After chunk: " << batches.size() << " batches accumulated\n"; + } + + // All batches are now available in the container + std::cout << "Total batches deserialized: " << batches.size() << "\n"; +} + +// [example_deserializer_incremental] + +// [example_deserializer_chaining] +/** + * Example: Chaining multiple deserializations + * + * The streaming operator can be chained for fluent API usage. + */ +void deserializer_chaining_example( + const std::vector& chunk1, + const std::vector& chunk2, + const std::vector& chunk3 +) +{ + std::vector batches; + sp_ipc::deserializer deser(batches); + + // Chain multiple deserializations in a single expression + deser << std::span(chunk1) << std::span(chunk2) + << std::span(chunk3); + + std::cout << "Deserialized " << batches.size() << " batches from 3 chunks\n"; +} + +// [example_deserializer_chaining] + +int main() +{ + std::cout << "=== Sparrow IPC Deserializer Examples ===\n\n"; + + try + { + // Create sample data + auto original_batches = create_sample_batches(3); + auto stream_data = serialize_batches(original_batches); + + std::cout << "1. Function API Example (deserialize_stream)\n"; + std::cout << " ----------------------------------------\n"; + auto deserialized = deserialize_stream_example(stream_data); + std::cout << " Deserialized " << deserialized.size() << " batches\n\n"; + + std::cout << "2. Basic Deserializer Class Example\n"; + std::cout << " ---------------------------------\n"; + deserializer_basic_example(stream_data); + std::cout << "\n"; + + std::cout << "3. Incremental Deserialization Example\n"; + std::cout << " ------------------------------------\n"; + // Create multiple chunks (each containing different batches) + std::vector> chunks; + for (size_t i = 0; i < 3; ++i) + { + auto batch = create_sample_batches(1); + chunks.push_back(serialize_batches(batch)); + } + deserializer_incremental_example(chunks); + std::cout << "\n"; + + std::cout << "4. Chaining Example\n"; + std::cout << " -----------------\n"; + deserializer_chaining_example(chunks[0], chunks[1], chunks[2]); + + std::cout << "\n=== All examples completed successfully! ===\n"; + } + catch (const std::exception& e) + { + std::cerr << "Error: " << e.what() << "\n"; + return 1; + } + + return 0; +} diff --git a/examples/write_and_read_streams.cpp b/examples/write_and_read_streams.cpp index 104d4964..ace47bfc 100644 --- a/examples/write_and_read_streams.cpp +++ b/examples/write_and_read_streams.cpp @@ -182,6 +182,7 @@ std::vector create_record_batches(size_t num_batches, size_t r return batches; } +// [example_serialize_to_stream] /** * Serialize record batches to a stream */ @@ -200,7 +201,9 @@ std::vector serialize_batches_to_stream(const std::vector deserialize_stream_to_batches(const std::vector +#include +#include +#include + +#include + +#include "deserialize.hpp" +#include "sparrow_ipc/deserialize.hpp" + +namespace sparrow_ipc +{ + template + requires std::same_as, sparrow::record_batch> + class deserializer + { + public: + + deserializer(R& data) + : m_data(&data) + { + } + + void deserialize(std::span data) + { + // Insert at the end of m_data container the deserialized record batches + auto& container = *m_data; + auto deserialized_batches = sparrow_ipc::deserialize_stream(data); + container.insert( + std::end(container), + std::make_move_iterator(std::begin(deserialized_batches)), + std::make_move_iterator(std::end(deserialized_batches)) + ); + } + + deserializer& operator<<(std::span data) + { + deserialize(data); + return *this; + } + + private: + + R* m_data; + }; +} diff --git a/src/deserialize.cpp b/src/deserialize.cpp index 436cc97d..b91ad86d 100644 --- a/src/deserialize.cpp +++ b/src/deserialize.cpp @@ -31,8 +31,13 @@ namespace sparrow_ipc { size_t buffer_index = 0; + const size_t num_fields = schema.fields() == nullptr ? 0 : static_cast(schema.fields()->size()); std::vector arrays; - arrays.reserve(schema.fields()->size()); + if (num_fields == 0) + { + return arrays; + } + arrays.reserve(num_fields); size_t field_idx = 0; for (const auto field : *(schema.fields())) { @@ -207,18 +212,24 @@ namespace sparrow_ipc case org::apache::arrow::flatbuf::MessageHeader::Schema: { schema = message->header_as_Schema(); - const size_t size = static_cast(schema->fields()->size()); + const size_t size = schema->fields() == nullptr + ? 0 + : static_cast(schema->fields()->size()); field_names.reserve(size); fields_nullable.reserve(size); fields_metadata.reserve(size); - + if (schema->fields() == nullptr) + { + break; + } for (const auto field : *(schema->fields())) { - if(field != nullptr && field->name() != nullptr) + if (field != nullptr && field->name() != nullptr) { - field_names.emplace_back(field->name()->str()); + field_names.emplace_back(field->name()->str()); } - else { + else + { field_names.emplace_back("_unnamed_"); } fields_nullable.push_back(field->nullable()); @@ -249,7 +260,8 @@ namespace sparrow_ipc encapsulated_message, fields_metadata ); - auto names_copy = field_names; // TODO: Remove when issue with the to_vector of record_batch is fixed + auto names_copy = field_names; // TODO: Remove when issue with the to_vector of + // record_batch is fixed sparrow::record_batch sp_record_batch(std::move(names_copy), std::move(arrays)); record_batches.emplace_back(std::move(sp_record_batch)); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index d36e9aa3..a516d1e8 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -12,6 +12,7 @@ set(SPARROW_IPC_TESTS_SRC test_chunk_memory_serializer.cpp test_compression.cpp test_de_serialization_with_files.cpp + test_deserializer.cpp $<$>:test_flatbuffer_utils.cpp> test_memory_output_streams.cpp test_serialize_utils.cpp diff --git a/tests/test_deserializer.cpp b/tests/test_deserializer.cpp new file mode 100644 index 00000000..2600e60d --- /dev/null +++ b/tests/test_deserializer.cpp @@ -0,0 +1,680 @@ +#include +#include +#include +#include + +#include +#include + +#include "sparrow_ipc/deserializer.hpp" +#include "sparrow_ipc/memory_output_stream.hpp" +#include "sparrow_ipc/serializer.hpp" +#include "sparrow_ipc_tests_helpers.hpp" + +namespace sparrow_ipc +{ + namespace sp = sparrow; + + // Helper function to serialize record batches to a byte buffer + std::vector serialize_record_batches(const std::vector& batches) + { + std::vector buffer; + memory_output_stream stream(buffer); + serializer ser(stream); + ser << batches << end_stream; + return buffer; + } + + // Helper function to create multiple compatible record batches + std::vector create_test_record_batches(size_t count) + { + std::vector batches; + for (size_t i = 0; i < count; ++i) + { + auto int_array = sp::primitive_array({static_cast(i * 10), + static_cast(i * 10 + 1), + static_cast(i * 10 + 2)}); + auto string_array = sp::string_array( + std::vector{"batch_" + std::to_string(i) + "_a", + "batch_" + std::to_string(i) + "_b", + "batch_" + std::to_string(i) + "_c"} + ); + batches.push_back(sp::record_batch({{"int_col", sp::array(std::move(int_array))}, + {"string_col", sp::array(std::move(string_array))}})); + } + return batches; + } + + // Helper function to verify test_record_batch content (from create_test_record_batch) + void verify_test_record_batch_content(const sp::record_batch& batch) + { + CHECK_EQ(batch.nb_columns(), 2); + CHECK_EQ(batch.nb_rows(), 5); + + // Verify int column content + const auto& int_col = batch.get_column(0); + int_col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), 1); + CHECK_EQ(impl[1].value(), 2); + CHECK_EQ(impl[2].value(), 3); + CHECK_EQ(impl[3].value(), 4); + CHECK_EQ(impl[4].value(), 5); + } + }); + + // Verify string column content + const auto& string_col = batch.get_column(1); + string_col.visit([](const auto& impl) { + if constexpr (sp::is_string_array_v>) + { + CHECK_EQ(impl[0].value(), "hello"); + CHECK_EQ(impl[1].value(), "world"); + CHECK_EQ(impl[2].value(), "test"); + CHECK_EQ(impl[3].value(), "data"); + CHECK_EQ(impl[4].value(), "batch"); + } + }); + } + + // Helper function to verify content of batches created by create_test_record_batches + void verify_test_record_batches_content(const sp::record_batch& batch, size_t batch_index) + { + CHECK_EQ(batch.nb_columns(), 2); + CHECK_EQ(batch.nb_rows(), 3); + + // Verify int column content + const auto& int_col = batch.get_column(0); + int_col.visit([batch_index](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), static_cast(batch_index * 10)); + CHECK_EQ(impl[1].value(), static_cast(batch_index * 10 + 1)); + CHECK_EQ(impl[2].value(), static_cast(batch_index * 10 + 2)); + } + }); + + // Verify string column content + const auto& string_col = batch.get_column(1); + string_col.visit([batch_index](const auto& impl) { + if constexpr (sp::is_string_array_v>) + { + CHECK_EQ(impl[0].value(), "batch_" + std::to_string(batch_index) + "_a"); + CHECK_EQ(impl[1].value(), "batch_" + std::to_string(batch_index) + "_b"); + CHECK_EQ(impl[2].value(), "batch_" + std::to_string(batch_index) + "_c"); + } + }); + } + + TEST_SUITE("deserializer") + { + TEST_CASE("construction with empty vector") + { + SUBCASE("Construct with empty vector reference") + { + std::vector batches; + deserializer deser(batches); + CHECK_EQ(batches.size(), 0); + } + } + + TEST_CASE("deserialize single record batch") + { + SUBCASE("Deserialize one batch into empty vector") + { + std::vector batches; + deserializer deser(batches); + + // Create and serialize a single record batch + auto original_batch = create_test_record_batch(); + auto serialized_data = serialize_record_batches({original_batch}); + + // Deserialize + deser.deserialize(std::span(serialized_data)); + + // Verify + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_columns(), original_batch.nb_columns()); + CHECK_EQ(batches[0].nb_rows(), original_batch.nb_rows()); + verify_test_record_batch_content(batches[0]); + } + + SUBCASE("Deserialize batch with different data types") + { + std::vector batches; + deserializer deser(batches); + + auto int_array = sp::primitive_array({1, 2, 3}); + auto double_array = sp::primitive_array({1.5, 2.5, 3.5}); + auto float_array = sp::primitive_array({1.0f, 2.0f, 3.0f}); + + auto rb = sp::record_batch({{"int_col", sp::array(std::move(int_array))}, + {"double_col", sp::array(std::move(double_array))}, + {"float_col", sp::array(std::move(float_array))}}); + + auto serialized_data = serialize_record_batches({rb}); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_columns(), 3); + CHECK_EQ(batches[0].nb_rows(), 3); + + // Verify int column content + const auto& int_col = batches[0].get_column(0); + int_col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), 1); + CHECK_EQ(impl[1].value(), 2); + CHECK_EQ(impl[2].value(), 3); + } + }); + + // Verify double column content + const auto& double_col = batches[0].get_column(1); + double_col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), doctest::Approx(1.5)); + CHECK_EQ(impl[1].value(), doctest::Approx(2.5)); + CHECK_EQ(impl[2].value(), doctest::Approx(3.5)); + } + }); + + // Verify float column content + const auto& float_col = batches[0].get_column(2); + float_col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), doctest::Approx(1.0f)); + CHECK_EQ(impl[1].value(), doctest::Approx(2.0f)); + CHECK_EQ(impl[2].value(), doctest::Approx(3.0f)); + } + }); + } + + SUBCASE("Deserialize empty record batch") + { + std::vector batches; + deserializer deser(batches); + + auto empty_batch = sp::record_batch({}); + auto serialized_data = serialize_record_batches({empty_batch}); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_columns(), 0); + } + } + + TEST_CASE("deserialize multiple record batches") + { + SUBCASE("Deserialize multiple batches at once") + { + std::vector batches; + deserializer deser(batches); + + // Create multiple compatible batches + auto original_batches = create_test_record_batches(3); + auto serialized_data = serialize_record_batches(original_batches); + + // Deserialize + deser.deserialize(std::span(serialized_data)); + + // Verify + REQUIRE_EQ(batches.size(), 3); + for (size_t i = 0; i < batches.size(); ++i) + { + CHECK_EQ(batches[i].nb_columns(), original_batches[i].nb_columns()); + CHECK_EQ(batches[i].nb_rows(), original_batches[i].nb_rows()); + verify_test_record_batches_content(batches[i], i); + } + } + + SUBCASE("Deserialize large number of batches") + { + std::vector batches; + deserializer deser(batches); + + const size_t num_batches = 100; + auto original_batches = create_test_record_batches(num_batches); + auto serialized_data = serialize_record_batches(original_batches); + + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), num_batches); + } + } + + TEST_CASE("incremental deserialization") + { + SUBCASE("Deserialize in multiple calls") + { + std::vector batches; + deserializer deser(batches); + + // First deserialization + auto batch1 = create_test_record_batches(2); + auto serialized_data1 = serialize_record_batches(batch1); + deser.deserialize(std::span(serialized_data1)); + + CHECK_EQ(batches.size(), 2); + + // Second deserialization - should append to existing batches + auto batch2 = create_test_record_batches(3); + auto serialized_data2 = serialize_record_batches(batch2); + deser.deserialize(std::span(serialized_data2)); + + CHECK_EQ(batches.size(), 5); + } + + SUBCASE("Multiple incremental deserializations") + { + std::vector batches; + deserializer deser(batches); + + for (size_t i = 0; i < 5; ++i) + { + auto new_batches = create_test_record_batches(2); + auto serialized_data = serialize_record_batches(new_batches); + deser.deserialize(std::span(serialized_data)); + + CHECK_EQ(batches.size(), (i + 1) * 2); + } + } + + SUBCASE("Deserialize into non-empty vector") + { + // Start with existing batches + std::vector batches = {create_test_record_batch()}; + CHECK_EQ(batches.size(), 1); + + deserializer deser(batches); + + // Add more batches + auto new_batches = create_test_record_batches(2); + auto serialized_data = serialize_record_batches(new_batches); + deser.deserialize(std::span(serialized_data)); + + CHECK_EQ(batches.size(), 3); + } + } + + TEST_CASE("operator<< for deserialization") + { + SUBCASE("Single deserialization with <<") + { + std::vector batches; + deserializer deser(batches); + + auto original_batches = create_test_record_batches(1); + auto serialized_data = serialize_record_batches(original_batches); + + deser << std::span(serialized_data); + + REQUIRE_EQ(batches.size(), 1); + } + + SUBCASE("Chain multiple deserializations with <<") + { + std::vector batches; + deserializer deser(batches); + + auto batch1 = create_test_record_batches(1); + auto serialized_data1 = serialize_record_batches(batch1); + + auto batch2 = create_test_record_batches(2); + auto serialized_data2 = serialize_record_batches(batch2); + + auto batch3 = create_test_record_batches(1); + auto serialized_data3 = serialize_record_batches(batch3); + + deser << std::span(serialized_data1) + << std::span(serialized_data2) + << std::span(serialized_data3); + + CHECK_EQ(batches.size(), 4); + } + + SUBCASE("Mix deserialization methods") + { + std::vector batches; + deserializer deser(batches); + + auto batch1 = create_test_record_batches(1); + auto serialized_data1 = serialize_record_batches(batch1); + deser.deserialize(std::span(serialized_data1)); + + CHECK_EQ(batches.size(), 1); + + auto batch2 = create_test_record_batches(2); + auto serialized_data2 = serialize_record_batches(batch2); + deser << std::span(serialized_data2); + + CHECK_EQ(batches.size(), 3); + } + } + + TEST_CASE("deserialize with different container types") + { + SUBCASE("std::deque") + { + std::deque batches; + deserializer deser(batches); + + auto original_batches = create_test_record_batches(2); + auto serialized_data = serialize_record_batches(original_batches); + + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 2); + } + + SUBCASE("std::list") + { + std::list batches; + deserializer deser(batches); + + auto original_batches = create_test_record_batches(3); + auto serialized_data = serialize_record_batches(original_batches); + + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 3); + } + } + + TEST_CASE("round-trip serialization and deserialization") + { + SUBCASE("Single batch round-trip") + { + auto original_batch = create_test_record_batch(); + auto serialized_data = serialize_record_batches({original_batch}); + + std::vector deserialized_batches; + deserializer deser(deserialized_batches); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(deserialized_batches.size(), 1); + CHECK_EQ(deserialized_batches[0].nb_columns(), original_batch.nb_columns()); + CHECK_EQ(deserialized_batches[0].nb_rows(), original_batch.nb_rows()); + + // Verify column names match + for (size_t i = 0; i < original_batch.nb_columns(); ++i) + { + CHECK_EQ(deserialized_batches[0].names()[i], original_batch.names()[i]); + } + verify_test_record_batch_content(deserialized_batches[0]); + } + + SUBCASE("Multiple batches round-trip") + { + auto original_batches = create_test_record_batches(5); + auto serialized_data = serialize_record_batches(original_batches); + + std::vector deserialized_batches; + deserializer deser(deserialized_batches); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(deserialized_batches.size(), original_batches.size()); + + for (size_t i = 0; i < original_batches.size(); ++i) + { + CHECK_EQ(deserialized_batches[i].nb_columns(), original_batches[i].nb_columns()); + CHECK_EQ(deserialized_batches[i].nb_rows(), original_batches[i].nb_rows()); + } + } + + SUBCASE("Double round-trip") + { + // First round-trip + auto original_batches = create_test_record_batches(2); + auto serialized_data1 = serialize_record_batches(original_batches); + + std::vector deserialized_batches1; + deserializer deser1(deserialized_batches1); + deser1.deserialize(std::span(serialized_data1)); + + // Second round-trip + auto serialized_data2 = serialize_record_batches(deserialized_batches1); + + std::vector deserialized_batches2; + deserializer deser2(deserialized_batches2); + deser2.deserialize(std::span(serialized_data2)); + + // Verify both results match + REQUIRE_EQ(deserialized_batches2.size(), original_batches.size()); + for (size_t i = 0; i < original_batches.size(); ++i) + { + CHECK_EQ(deserialized_batches2[i].nb_columns(), original_batches[i].nb_columns()); + CHECK_EQ(deserialized_batches2[i].nb_rows(), original_batches[i].nb_rows()); + } + } + } + + TEST_CASE("deserialize with complex data types") + { + SUBCASE("Mixed primitive types") + { + std::vector batches; + deserializer deser(batches); + + auto int8_array = sp::primitive_array({1, 2, 3}); + auto int16_array = sp::primitive_array({100, 200, 300}); + auto int32_array = sp::primitive_array({1000, 2000, 3000}); + auto int64_array = sp::primitive_array({10000, 20000, 30000}); + + auto rb = sp::record_batch({{"int8_col", sp::array(std::move(int8_array))}, + {"int16_col", sp::array(std::move(int16_array))}, + {"int32_col", sp::array(std::move(int32_array))}, + {"int64_col", sp::array(std::move(int64_array))}}); + + auto serialized_data = serialize_record_batches({rb}); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_columns(), 4); + + // Verify int8 column + const auto& int8_col = batches[0].get_column(0); + int8_col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), 1); + CHECK_EQ(impl[1].value(), 2); + CHECK_EQ(impl[2].value(), 3); + } + }); + + // Verify int16 column + const auto& int16_col = batches[0].get_column(1); + int16_col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), 100); + CHECK_EQ(impl[1].value(), 200); + CHECK_EQ(impl[2].value(), 300); + } + }); + + // Verify int32 column + const auto& int32_col = batches[0].get_column(2); + int32_col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), 1000); + CHECK_EQ(impl[1].value(), 2000); + CHECK_EQ(impl[2].value(), 3000); + } + }); + + // Verify int64 column + const auto& int64_col = batches[0].get_column(3); + int64_col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), 10000); + CHECK_EQ(impl[1].value(), 20000); + CHECK_EQ(impl[2].value(), 30000); + } + }); + } + + SUBCASE("String arrays") + { + std::vector batches; + deserializer deser(batches); + + auto string_array = sp::string_array( + std::vector{"hello", "world", "test", "data"} + ); + auto rb = sp::record_batch({{"string_col", sp::array(std::move(string_array))}}); + + auto serialized_data = serialize_record_batches({rb}); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_rows(), 4); + + // Verify string column content + const auto& string_col = batches[0].get_column(0); + string_col.visit([](const auto& impl) { + if constexpr (sp::is_string_array_v>) + { + CHECK_EQ(impl[0].value(), "hello"); + CHECK_EQ(impl[1].value(), "world"); + CHECK_EQ(impl[2].value(), "test"); + CHECK_EQ(impl[3].value(), "data"); + } + }); + } + } + + TEST_CASE("edge cases") + { + SUBCASE("Deserialize empty data") + { + std::vector batches; + deserializer deser(batches); + + // Create an empty batch + auto empty_batch = sp::record_batch({}); + auto serialized_data = serialize_record_batches({empty_batch}); + + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_columns(), 0); + } + + SUBCASE("Very large batch") + { + std::vector batches; + deserializer deser(batches); + + // Create a large array + std::vector large_data(10000); + std::iota(large_data.begin(), large_data.end(), 0); + auto large_array = sp::primitive_array(large_data); + auto rb = sp::record_batch({{"large_col", sp::array(std::move(large_array))}}); + + auto serialized_data = serialize_record_batches({rb}); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_rows(), 10000); + + // Verify some sample values + const auto& col = batches[0].get_column(0); + col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), 0); + CHECK_EQ(impl[100].value(), 100); + CHECK_EQ(impl[1000].value(), 1000); + CHECK_EQ(impl[5000].value(), 5000); + CHECK_EQ(impl[9999].value(), 9999); + } + }); + } + + SUBCASE("Single row batches") + { + std::vector batches; + deserializer deser(batches); + + auto int_array = sp::primitive_array({42}); + auto string_array = sp::string_array(std::vector{"single"}); + auto rb = sp::record_batch({{"int_col", sp::array(std::move(int_array))}, + {"string_col", sp::array(std::move(string_array))}}); + + auto serialized_data = serialize_record_batches({rb}); + deser.deserialize(std::span(serialized_data)); + + REQUIRE_EQ(batches.size(), 1); + CHECK_EQ(batches[0].nb_rows(), 1); + + // Verify content + const auto& int_col = batches[0].get_column(0); + int_col.visit([](const auto& impl) { + if constexpr (sp::is_primitive_array_v>) + { + CHECK_EQ(impl[0].value(), 42); + } + }); + + const auto& string_col = batches[0].get_column(1); + string_col.visit([](const auto& impl) { + if constexpr (sp::is_string_array_v>) + { + CHECK_EQ(impl[0].value(), "single"); + } + }); + } + } + + TEST_CASE("workflow example") + { + SUBCASE("Typical streaming deserialization workflow") + { + std::vector batches; + deserializer deser(batches); + + // Simulate receiving data in chunks + for (size_t i = 0; i < 3; ++i) + { + auto chunk_batches = create_test_record_batches(2); + auto serialized_chunk = serialize_record_batches(chunk_batches); + deser << std::span(serialized_chunk); + } + + // Verify all batches accumulated + CHECK_EQ(batches.size(), 6); + + // Add one more batch using deserialize method + auto final_batch = create_test_record_batches(1); + auto serialized_final = serialize_record_batches(final_batch); + deser.deserialize(std::span(serialized_final)); + + CHECK_EQ(batches.size(), 7); + } + } + + TEST_CASE("deserializer with const container") + { + SUBCASE("Works with non-const reference") + { + std::vector batches; + deserializer deser(batches); + + auto original_batches = create_test_record_batches(1); + auto serialized_data = serialize_record_batches(original_batches); + + deser.deserialize(std::span(serialized_data)); + + CHECK_EQ(batches.size(), 1); + } + } + } +}