55
66namespace sparrow_ipc
77{
8- void common_serialize (
9- const flatbuffers::FlatBufferBuilder& builder,
10- any_output_stream& stream
11- )
8+ constexpr int64_t arrow_alignment = 8 ;
9+
10+ // Aligns a value to the next multiple of 8, as required by the Arrow IPC format for message bodies.
11+ int64_t align_to_8 ( int64_t n )
1212 {
13- stream.write (continuation);
14- const flatbuffers::uoffset_t size = builder.GetSize ();
15- const std::span<const uint8_t > size_span (reinterpret_cast <const uint8_t *>(&size), sizeof (uint32_t ));
16- stream.write (size_span);
17- stream.write (std::span (builder.GetBufferPointer (), size));
18- stream.add_padding ();
13+ return (n + arrow_alignment - 1 ) & -arrow_alignment;
1914 }
2015
2116 void serialize_schema_message (const sparrow::record_batch& record_batch, any_output_stream& stream)
@@ -27,7 +22,233 @@ namespace sparrow_ipc
2722 std::optional<CompressionType> compression,
2823 std::optional<std::reference_wrapper<CompressionCache>> cache)
2924 {
30- common_serialize (get_record_batch_message_builder (record_batch, compression, cache), stream);
31- generate_body (record_batch, stream, compression, cache);
25+ // Create a new builder for the Schema message's metadata
26+ flatbuffers::FlatBufferBuilder schema_builder;
27+
28+ // Create the Field metadata, which describes a single column (or array)
29+ flatbuffers::Offset<flatbuffers::String> fb_name_offset = 0 ;
30+ if (arrow_schema.name )
31+ {
32+ fb_name_offset = schema_builder.CreateString (arrow_schema.name );
33+ }
34+
35+ // Determine the Flatbuffer type information from the C schema's format string
36+ auto [type_enum, type_offset] = get_flatbuffer_type (schema_builder, arrow_schema.format );
37+
38+ // Handle metadata
39+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>>>
40+ fb_metadata_offset = 0 ;
41+
42+ if (arr.metadata ())
43+ {
44+ sparrow::key_value_view metadata_view = *(arr.metadata ());
45+ std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>> kv_offsets;
46+
47+ auto mv_it = metadata_view.cbegin ();
48+ for (auto i = 0 ; i < metadata_view.size (); ++i, ++mv_it)
49+ {
50+ auto key_offset = schema_builder.CreateString (std::string ((*mv_it).first ));
51+ auto value_offset = schema_builder.CreateString (std::string ((*mv_it).second ));
52+ kv_offsets.push_back (
53+ org::apache::arrow::flatbuf::CreateKeyValue (schema_builder, key_offset, value_offset));
54+ }
55+ fb_metadata_offset = schema_builder.CreateVector (kv_offsets);
56+ }
57+
58+ // Build the Field object
59+ auto fb_field = org::apache::arrow::flatbuf::CreateField (
60+ schema_builder,
61+ fb_name_offset,
62+ (arrow_schema.flags & static_cast <int64_t >(sparrow::ArrowFlag::NULLABLE)) != 0 ,
63+ type_enum,
64+ type_offset,
65+ 0 , // dictionary
66+ 0 , // children
67+ fb_metadata_offset);
68+
69+ // A Schema contains a vector of fields. For this primitive array, there is only one
70+ std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::Field>> fields_vec = {fb_field};
71+ auto fb_fields = schema_builder.CreateVector (fields_vec);
72+
73+ // Build the Schema object from the vector of fields
74+ auto schema_offset = org::apache::arrow::flatbuf::CreateSchema (schema_builder, org::apache::arrow::flatbuf::Endianness::Little, fb_fields);
75+
76+ // Wrap the Schema in a top-level Message, which is the standard IPC envelope
77+ auto schema_message_offset = org::apache::arrow::flatbuf::CreateMessage (
78+ schema_builder,
79+ org::apache::arrow::flatbuf::MetadataVersion::V5,
80+ org::apache::arrow::flatbuf::MessageHeader::Schema,
81+ schema_offset.Union (),
82+ 0
83+ );
84+ schema_builder.Finish (schema_message_offset);
85+
86+ // Assemble the Schema message bytes
87+ uint32_t schema_len = schema_builder.GetSize (); // Get the size of the serialized metadata
88+ final_buffer.resize (sizeof (uint32_t ) + schema_len); // Resize the buffer to hold the message
89+ // Copy the metadata into the buffer, after the 4-byte length prefix
90+ memcpy (final_buffer.data () + sizeof (uint32_t ), schema_builder.GetBufferPointer (), schema_len);
91+ // Write the 4-byte metadata length at the beginning of the message
92+ memcpy (final_buffer.data (), &schema_len, sizeof (schema_len));
93+ }
94+
95+ // II - Serialize the RecordBatch message
96+ // After the Schema, we send the RecordBatch containing the actual data
97+ {
98+ // Create a new builder for the RecordBatch message's metadata
99+ flatbuffers::FlatBufferBuilder batch_builder;
100+
101+ // arrow_arr.buffers[0] is the validity bitmap
102+ // arrow_arr.buffers[1] is the data buffer
103+ const uint8_t * validity_bitmap = static_cast <const uint8_t *>(arrow_arr.buffers [0 ]);
104+ const uint8_t * data_buffer = static_cast <const uint8_t *>(arrow_arr.buffers [1 ]);
105+
106+ // Calculate the size of the validity and data buffers
107+ int64_t validity_size = (arrow_arr.length + arrow_alignment - 1 ) / arrow_alignment;
108+ int64_t data_size = arrow_arr.length * sizeof (T);
109+ int64_t body_len = validity_size + data_size; // The total size of the message body
110+
111+ // Create Flatbuffer descriptions for the data buffers
112+ org::apache::arrow::flatbuf::Buffer validity_buffer_struct (0 , validity_size);
113+ org::apache::arrow::flatbuf::Buffer data_buffer_struct (validity_size, data_size);
114+ // Create the FieldNode, which describes the layout of the array data
115+ org::apache::arrow::flatbuf::FieldNode field_node_struct (arrow_arr.length , arrow_arr.null_count );
116+
117+ // A RecordBatch contains a vector of nodes and a vector of buffers
118+ auto fb_nodes_vector = batch_builder.CreateVectorOfStructs (&field_node_struct, 1 );
119+ std::vector<org::apache::arrow::flatbuf::Buffer> buffers_vec = {validity_buffer_struct, data_buffer_struct};
120+ auto fb_buffers_vector = batch_builder.CreateVectorOfStructs (buffers_vec);
121+
122+ // Build the RecordBatch metadata object
123+ auto record_batch_offset = org::apache::arrow::flatbuf::CreateRecordBatch (batch_builder, arrow_arr.length , fb_nodes_vector, fb_buffers_vector);
124+
125+ // Wrap the RecordBatch in a top-level Message
126+ auto batch_message_offset = org::apache::arrow::flatbuf::CreateMessage (
127+ batch_builder,
128+ org::apache::arrow::flatbuf::MetadataVersion::V5,
129+ org::apache::arrow::flatbuf::MessageHeader::RecordBatch,
130+ record_batch_offset.Union (),
131+ body_len
132+ );
133+ batch_builder.Finish (batch_message_offset);
134+
135+ // III - Append the RecordBatch message to the final buffer
136+ uint32_t batch_meta_len = batch_builder.GetSize (); // Get the size of the batch metadata
137+ int64_t aligned_batch_meta_len = align_to_8 (batch_meta_len); // Calculate the padded length
138+
139+ size_t current_size = final_buffer.size (); // Get the current size (which is the end of the Schema message)
140+ // Resize the buffer to append the new message
141+ final_buffer.resize (current_size + sizeof (uint32_t ) + aligned_batch_meta_len + body_len);
142+ uint8_t * dst = final_buffer.data () + current_size; // Get a pointer to where the new message will start
143+
144+ // Write the 4-byte metadata length for the RecordBatch message
145+ memcpy (dst, &batch_meta_len, sizeof (batch_meta_len));
146+ dst += sizeof (uint32_t );
147+ // Copy the RecordBatch metadata into the buffer
148+ memcpy (dst, batch_builder.GetBufferPointer (), batch_meta_len);
149+ // Add padding to align the body to an 8-byte boundary
150+ memset (dst + batch_meta_len, 0 , aligned_batch_meta_len - batch_meta_len);
151+ dst += aligned_batch_meta_len;
152+ // Copy the actual data buffers (the message body) into the buffer
153+ if (validity_bitmap)
154+ {
155+ memcpy (dst, validity_bitmap, validity_size);
156+ }
157+ else
158+ {
159+ // If validity_bitmap is null, it means there are no nulls
160+ memset (dst, 0xFF , validity_size);
161+ }
162+ dst += validity_size;
163+ if (data_buffer)
164+ {
165+ memcpy (dst, data_buffer, data_size);
166+ }
32167 }
168+
169+ // Release the memory managed by the C structures
170+ arrow_arr.release(&arrow_arr);
171+ arrow_schema.release(&arrow_schema);
172+
173+ // Return the final buffer containing the complete IPC stream
174+ return final_buffer;
175+ }
176+
177+ template <typename T>
178+ sparrow::primitive_array<T> deserialize_primitive_array (const std::vector<uint8_t >& buffer) {
179+ const uint8_t * buf_ptr = buffer.data ();
180+ size_t current_offset = 0 ;
181+
182+ // I - Deserialize the Schema message
183+ uint32_t schema_meta_len;
184+ memcpy (&schema_meta_len, buf_ptr + current_offset, sizeof (schema_meta_len));
185+ current_offset += sizeof (uint32_t );
186+ auto schema_message = org::apache::arrow::flatbuf::GetMessage (buf_ptr + current_offset);
187+ if (schema_message->header_type () != org::apache::arrow::flatbuf::MessageHeader::Schema)
188+ {
189+ throw std::runtime_error (" Expected Schema message at the start of the buffer." );
190+ }
191+ auto flatbuffer_schema = static_cast <const org::apache::arrow::flatbuf::Schema*>(schema_message->header ());
192+ auto fields = flatbuffer_schema->fields ();
193+ if (fields->size () != 1 )
194+ {
195+ throw std::runtime_error (" Expected schema with exactly one field for primitive_array." );
196+ }
197+ bool is_nullable = fields->Get (0 )->nullable ();
198+ current_offset += schema_meta_len;
199+
200+ // II - Deserialize the RecordBatch message
201+ uint32_t batch_meta_len;
202+ memcpy (&batch_meta_len, buf_ptr + current_offset, sizeof (batch_meta_len));
203+ current_offset += sizeof (uint32_t );
204+ auto batch_message = org::apache::arrow::flatbuf::GetMessage (buf_ptr + current_offset);
205+ if (batch_message->header_type () != org::apache::arrow::flatbuf::MessageHeader::RecordBatch)
206+ {
207+ throw std::runtime_error (" Expected RecordBatch message, but got a different type." );
208+ }
209+ auto record_batch = static_cast <const org::apache::arrow::flatbuf::RecordBatch*>(batch_message->header ());
210+ current_offset += align_to_8 (batch_meta_len);
211+ const uint8_t * body_ptr = buf_ptr + current_offset;
212+
213+ // Extract metadata from the RecordBatch
214+ auto buffers_meta = record_batch->buffers ();
215+ auto nodes_meta = record_batch->nodes ();
216+ auto node_meta = nodes_meta->Get (0 );
217+
218+ // The body contains the validity bitmap and the data buffer concatenated
219+ // We need to copy this data into memory owned by the new ArrowArray
220+ int64_t validity_len = buffers_meta->Get (0 )->length ();
221+ int64_t data_len = buffers_meta->Get (1 )->length ();
222+
223+ uint8_t * validity_buffer_copy = new uint8_t [validity_len];
224+ memcpy (validity_buffer_copy, body_ptr + buffers_meta->Get (0 )->offset (), validity_len);
225+
226+ uint8_t * data_buffer_copy = new uint8_t [data_len];
227+ memcpy (data_buffer_copy, body_ptr + buffers_meta->Get (1 )->offset (), data_len);
228+
229+ // Get name
230+ std::optional<std::string_view> name;
231+ const flatbuffers::String* fb_name_flatbuffer = fields->Get (0 )->name ();
232+ if (fb_name_flatbuffer)
233+ {
234+ name = std::string_view (fb_name_flatbuffer->c_str (), fb_name_flatbuffer->size ());
235+ }
236+
237+ // Handle metadata
238+ std::optional<std::vector<sparrow::metadata_pair>> metadata;
239+ auto fb_metadata = fields->Get (0 )->custom_metadata ();
240+ if (fb_metadata && !fb_metadata->empty ())
241+ {
242+ metadata = std::vector<sparrow::metadata_pair>();
243+ metadata->reserve (fb_metadata->size ());
244+ for (const auto & kv : *fb_metadata)
245+ {
246+ metadata->emplace_back (kv->key ()->c_str (), kv->value ()->c_str ());
247+ }
248+ }
249+
250+ auto data = sparrow::u8_buffer<T>(reinterpret_cast <T*>(data_buffer_copy), node_meta->length ());
251+ auto bitmap = sparrow::validity_bitmap (validity_buffer_copy, node_meta->length ());
252+
253+ return sparrow::primitive_array<T>(std::move (data), node_meta->length (), std::move (bitmap), name, metadata);
33254}
0 commit comments