@@ -158,18 +158,19 @@ namespace sparrow_ipc
158158 using compress_func = std::function<std::vector<uint8_t >(std::span<const uint8_t >)>;
159159 using decompress_func = std::function<std::vector<uint8_t >(std::span<const uint8_t >, int64_t )>;
160160
161- std::vector<std::uint8_t > lz4_compress (std::span<const std::uint8_t > data)
161+ std::vector<std::uint8_t > lz4_compress_with_header (std::span<const std::uint8_t > data)
162162 {
163163 const std::int64_t uncompressed_size = data.size ();
164164 const size_t max_compressed_size = LZ4F_compressFrameBound (uncompressed_size, nullptr );
165- std::vector<std::uint8_t > compressed_data ( max_compressed_size);
166- const size_t compressed_size = LZ4F_compressFrame (compressed_data .data (), max_compressed_size, data.data (), uncompressed_size, nullptr );
165+ std::vector<std::uint8_t > result (details::CompressionHeaderSize + max_compressed_size);
166+ const size_t compressed_size = LZ4F_compressFrame (result .data () + details::CompressionHeaderSize , max_compressed_size, data.data (), uncompressed_size, nullptr );
167167 if (LZ4F_isError (compressed_size))
168168 {
169169 throw std::runtime_error (" Failed to compress data with LZ4 frame format" );
170170 }
171- compressed_data.resize (compressed_size);
172- return compressed_data;
171+ memcpy (result.data (), &uncompressed_size, sizeof (uncompressed_size));
172+ result.resize (details::CompressionHeaderSize + compressed_size);
173+ return result;
173174 }
174175
175176 std::vector<std::uint8_t > lz4_decompress (std::span<const std::uint8_t > data, const std::int64_t decompressed_size)
@@ -188,18 +189,19 @@ namespace sparrow_ipc
188189 return decompressed_data;
189190 }
190191
191- std::vector<std::uint8_t > zstd_compress (std::span<const std::uint8_t > data)
192+ std::vector<std::uint8_t > zstd_compress_with_header (std::span<const std::uint8_t > data)
192193 {
193194 const std::int64_t uncompressed_size = data.size ();
194195 const size_t max_compressed_size = ZSTD_compressBound (uncompressed_size);
195- std::vector<std::uint8_t > compressed_data ( max_compressed_size);
196- const size_t compressed_size = ZSTD_compress (compressed_data .data (), max_compressed_size, data.data (), uncompressed_size, 1 );
196+ std::vector<std::uint8_t > result (details::CompressionHeaderSize + max_compressed_size);
197+ const size_t compressed_size = ZSTD_compress (result .data () + details::CompressionHeaderSize , max_compressed_size, data.data (), uncompressed_size, 1 );
197198 if (ZSTD_isError (compressed_size))
198199 {
199200 throw std::runtime_error (" Failed to compress data with ZSTD" );
200201 }
201- compressed_data.resize (compressed_size);
202- return compressed_data;
202+ memcpy (result.data (), &uncompressed_size, sizeof (uncompressed_size));
203+ result.resize (details::CompressionHeaderSize + compressed_size);
204+ return result;
203205 }
204206
205207 std::vector<std::uint8_t > zstd_decompress (std::span<const std::uint8_t > data, const std::int64_t decompressed_size)
@@ -213,14 +215,6 @@ namespace sparrow_ipc
213215 return decompressed_data;
214216 }
215217
216- void insert_compressed_data (std::vector<uint8_t >& result, std::int64_t original_size, std::vector<uint8_t >&& compressed_body)
217- {
218- result.reserve (details::CompressionHeaderSize + compressed_body.size ());
219- result.insert (result.end (), reinterpret_cast <const uint8_t *>(&original_size), reinterpret_cast <const uint8_t *>(&original_size) + sizeof (original_size));
220- // TODO Think about avoid copying here (on every uint8_t), maybe use a list of vectors (header + body) and serialize separately on top level code instead of including header at this point?
221- result.insert (result.end (), std::make_move_iterator (compressed_body.begin ()), std::make_move_iterator (compressed_body.end ()));
222- }
223-
224218 void insert_uncompressed_data (std::vector<uint8_t >& result, const std::span<const uint8_t >& data)
225219 {
226220 const std::int64_t header = -1 ;
@@ -244,24 +238,18 @@ namespace sparrow_ipc
244238 }
245239
246240 // Not in cache, compress and store
247- const std::int64_t original_size = data.size ();
248-
249- std::vector<std::uint8_t > compressed_body;
250241 if (comp_func)
251242 {
252- compressed_body = comp_func (data);
243+ auto compressed_with_header = comp_func (data);
244+ // Compression is effective
245+ if (compressed_with_header.size () - details::CompressionHeaderSize < data.size ())
246+ {
247+ return cache.store (buffer_ptr, buffer_size, std::move (compressed_with_header));
248+ }
253249 }
254250
255251 std::vector<uint8_t > result_vec;
256- if (comp_func && compressed_body.size () < static_cast <size_t >(original_size))
257- {
258- insert_compressed_data (result_vec, original_size, std::move (compressed_body));
259- }
260- else
261- {
262- insert_uncompressed_data (result_vec, data);
263- }
264-
252+ insert_uncompressed_data (result_vec, data);
265253 return cache.store (buffer_ptr, buffer_size, std::move (result_vec));
266254 }
267255
@@ -301,11 +289,11 @@ namespace sparrow_ipc
301289 {
302290 case CompressionType::LZ4_FRAME:
303291 {
304- return compress_with_header (data, lz4_compress , cache);
292+ return compress_with_header (data, lz4_compress_with_header , cache);
305293 }
306294 case CompressionType::ZSTD:
307295 {
308- return compress_with_header (data, zstd_compress , cache);
296+ return compress_with_header (data, zstd_compress_with_header , cache);
309297 }
310298 }
311299 assert (false && " Unhandled compression type" );
0 commit comments