Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/sparrow_ipc/flatbuffer_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ namespace sparrow_ipc
{
std::vector<org::apache::arrow::flatbuf::Buffer> buffers;
int64_t offset = 0;
for (const auto& column : record_batch.columns())
for (size_t i = 0; i < record_batch.nb_columns(); ++i)
{
const auto& column = record_batch.get_column(i);
const auto& arrow_proxy = sparrow::detail::array_access::get_arrow_proxy(column);
fill_buffers_func(arrow_proxy, buffers, offset);
}
Expand Down
31 changes: 15 additions & 16 deletions src/flatbuffer_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,13 +468,13 @@ namespace sparrow_ipc
::flatbuffers::Offset<::flatbuffers::Vector<::flatbuffers::Offset<org::apache::arrow::flatbuf::Field>>>
create_children(flatbuffers::FlatBufferBuilder& builder, const sparrow::record_batch& record_batch)
{
const auto& columns = record_batch.columns();
std::vector<flatbuffers::Offset<org::apache::arrow::flatbuf::Field>> children_vec;
children_vec.reserve(columns.size());
children_vec.reserve(record_batch.nb_columns());
const auto names = record_batch.names();
for (size_t i = 0; i < columns.size(); ++i)
for (size_t i = 0; i < record_batch.nb_columns(); ++i)
{
const auto& arrow_schema = sparrow::detail::array_access::get_arrow_proxy(columns[i]).schema();
const auto& column = record_batch.get_column(i);
const auto& arrow_schema = sparrow::detail::array_access::get_arrow_proxy(column).schema();
flatbuffers::Offset<org::apache::arrow::flatbuf::Field> field = create_field(
builder,
arrow_schema,
Expand Down Expand Up @@ -523,9 +523,10 @@ namespace sparrow_ipc
create_fieldnodes(const sparrow::record_batch& record_batch)
{
std::vector<org::apache::arrow::flatbuf::FieldNode> nodes;
nodes.reserve(record_batch.columns().size());
for (const auto& column : record_batch.columns())
nodes.reserve(record_batch.nb_columns());
for (size_t i = 0; i < record_batch.nb_columns(); ++i)
{
const auto& column = record_batch.get_column(i);
fill_fieldnodes(sparrow::detail::array_access::get_arrow_proxy(column), nodes);
}
return nodes;
Expand Down Expand Up @@ -608,16 +609,14 @@ namespace sparrow_ipc
std::optional<CompressionType> compression,
std::optional<std::reference_wrapper<CompressionCache>> cache)
{
return std::accumulate(
record_batch.columns().begin(),
record_batch.columns().end(),
int64_t{0},
[&](int64_t acc, const sparrow::array& arr)
{
const auto& arrow_proxy = sparrow::detail::array_access::get_arrow_proxy(arr);
return acc + calculate_body_size(arrow_proxy, compression, cache);
}
);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it work to instantiate the temporary view returned by record_batch.columns() only once before using it in std::accumulate? I.e., doing something like:

auto cols = record_batch.columns();
return std::accumulate(cols.begin(), cols.end(), ....);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also introduce a version of accumulate that accepts a range instead of a pair of iterators.

int64_t acc = 0;
for (size_t i = 0; i < record_batch.nb_columns(); ++i)
{
const auto& arr = record_batch.get_column(i);
const auto& arrow_proxy = sparrow::detail::array_access::get_arrow_proxy(arr);
acc += calculate_body_size(arrow_proxy, compression, cache);
}
return acc;
}

flatbuffers::FlatBufferBuilder get_record_batch_message_builder(const sparrow::record_batch& record_batch,
Expand Down
2 changes: 1 addition & 1 deletion src/metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ namespace sparrow_ipc
);
return sparrow_metadata;
}
}
}
18 changes: 8 additions & 10 deletions src/serialize_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ namespace sparrow_ipc
std::optional<CompressionType> compression,
std::optional<std::reference_wrapper<CompressionCache>> cache)
{
std::for_each(record_batch.columns().begin(), record_batch.columns().end(), [&](const auto& column) {
for (size_t i = 0; i < record_batch.nb_columns(); ++i)
{
const auto& column = record_batch.get_column(i);
const auto& arrow_proxy = sparrow::detail::array_access::get_arrow_proxy(column);
fill_body(arrow_proxy, stream, compression, cache);
});
}
}

std::size_t calculate_schema_message_size(const sparrow::record_batch& record_batch)
Expand Down Expand Up @@ -85,14 +87,10 @@ namespace sparrow_ipc
{
std::vector<sparrow::data_type> dtypes;
dtypes.reserve(rb.nb_columns());
std::ranges::transform(
rb.columns(),
std::back_inserter(dtypes),
[](const auto& col)
{
return col.data_type();
}
);
for (size_t i = 0; i < rb.nb_columns(); ++i)
{
dtypes.push_back(rb.get_column(i).data_type());
}
return dtypes;
}
}
4 changes: 2 additions & 2 deletions src/serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ namespace sparrow_ipc
{
std::vector<sparrow::data_type> dtypes;
dtypes.reserve(rb.nb_columns());
for (const auto& col : rb.columns())
for (size_t i = 0; i < rb.nb_columns(); ++i)
{
dtypes.push_back(col.data_type());
dtypes.push_back(rb.get_column(i).data_type());
}
return dtypes;
}
Expand Down
Loading