Skip to content

Commit 659f2ce

Browse files
authored
Support arrow file format (#62)
1 parent 5558c55 commit 659f2ce

File tree

9 files changed

+779
-21
lines changed

9 files changed

+779
-21
lines changed

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ set(SPARROW_IPC_HEADERS
122122
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/serialize_utils.hpp
123123
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/serialize.hpp
124124
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/serializer.hpp
125+
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/stream_file_serializer.hpp
125126
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/utils.hpp
126127
)
127128

@@ -143,6 +144,7 @@ set(SPARROW_IPC_SRC
143144
${SPARROW_IPC_SOURCE_DIR}/serialize_utils.cpp
144145
${SPARROW_IPC_SOURCE_DIR}/serialize.cpp
145146
${SPARROW_IPC_SOURCE_DIR}/serializer.cpp
147+
${SPARROW_IPC_SOURCE_DIR}/stream_file_serializer.cpp
146148
${SPARROW_IPC_SOURCE_DIR}/utils.cpp
147149
)
148150

include/sparrow_ipc/deserialize.hpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,46 @@
22

33
#include <optional>
44
#include <string>
5+
#include <vector>
56

67
#include <sparrow/record_batch.hpp>
78

89
#include "Message_generated.h"
10+
#include "Schema_generated.h"
911
#include "sparrow_ipc/config/config.hpp"
1012
#include "sparrow_ipc/encapsulated_message.hpp"
13+
#include "sparrow_ipc/metadata.hpp"
1114

1215
namespace sparrow_ipc
1316
{
17+
/**
18+
* @brief Deserializes arrays from an Apache Arrow RecordBatch using the provided schema.
19+
*
20+
* This function processes each field in the schema and deserializes the corresponding
21+
* data from the RecordBatch into sparrow::array objects. It handles various Arrow data
22+
* types including primitive types (bool, integers, floating point), binary data, and
23+
* string data with their respective size variants.
24+
*
25+
* @param record_batch The Apache Arrow FlatBuffer RecordBatch containing the serialized data
26+
* @param schema The Apache Arrow FlatBuffer Schema defining the structure and types of the data
27+
* @param encapsulated_message The message containing the binary data buffers
28+
* @param field_metadata Metadata for each field
29+
*
30+
* @return std::vector<sparrow::array> A vector of deserialized arrays, one for each field in the schema
31+
*
32+
* @throws std::runtime_error If an unsupported data type, integer bit width, or floating point precision
33+
* is encountered
34+
*
35+
* The function maintains a buffer index that is incremented as it processes each field
36+
* to correctly map data buffers to their corresponding arrays.
37+
*/
38+
[[nodiscard]] SPARROW_IPC_API std::vector<sparrow::array> get_arrays_from_record_batch(
39+
const org::apache::arrow::flatbuf::RecordBatch& record_batch,
40+
const org::apache::arrow::flatbuf::Schema& schema,
41+
const encapsulated_message& encapsulated_message,
42+
const std::vector<std::optional<std::vector<sparrow::metadata_pair>>>& field_metadata
43+
);
44+
1445
/**
1546
* @brief Deserializes an Arrow IPC stream from binary data into a vector of record batches.
1647
*

include/sparrow_ipc/flatbuffer_utils.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ namespace sparrow_ipc
280280
get_compressed_buffers(const sparrow::record_batch& record_batch, const CompressionType compression_type);
281281

282282
/**
283-
* @brief Calculates the total size of the body section for an Arrow array.
283+
* @brief Calculates the total aligned size in bytes of all buffers in an Arrow array structure.
284284
*
285285
* This function recursively computes the total size needed for all buffers
286286
* in an Arrow array structure, including buffers from child arrays. Each

include/sparrow_ipc/magic_values.hpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <array>
55
#include <cstdint>
66
#include <istream>
7+
#include <ranges>
78

89
namespace sparrow_ipc
910
{
@@ -19,6 +20,19 @@ namespace sparrow_ipc
1920
*/
2021
inline constexpr std::array<std::uint8_t, 8> end_of_stream = {0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00};
2122

23+
/**
24+
* Magic bytes for Arrow file format defined in the Arrow IPC specification:
25+
* https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
26+
* The magic string is "ARROW1" (6 bytes) followed by 2 padding bytes to reach 8-byte alignment
27+
*/
28+
inline constexpr std::array<std::uint8_t, 6> arrow_file_magic = {'A', 'R', 'R', 'O', 'W', '1'};
29+
inline constexpr std::size_t arrow_file_magic_size = arrow_file_magic.size();
30+
31+
/**
32+
* Magic bytes with padding for file header (8 bytes total for alignment)
33+
*/
34+
inline constexpr std::array<std::uint8_t, 8> arrow_file_header_magic = {'A', 'R', 'R', 'O', 'W', '1', 0x00, 0x00};
35+
2236
template <std::ranges::input_range R>
2337
[[nodiscard]] bool is_continuation(const R& buf)
2438
{
@@ -30,4 +44,15 @@ namespace sparrow_ipc
3044
{
3145
return std::ranges::equal(buf, end_of_stream);
3246
}
47+
48+
template <std::ranges::input_range R>
49+
[[nodiscard]] bool is_arrow_file_magic(const R& buf)
50+
{
51+
if (std::ranges::size(buf) < arrow_file_magic_size)
52+
{
53+
return false;
54+
}
55+
auto buf_begin = std::ranges::begin(buf);
56+
return std::equal(buf_begin, buf_begin + arrow_file_magic_size, arrow_file_magic.begin());
57+
}
3358
}

0 commit comments

Comments
 (0)