GH-48467: [C++][Parquet] Add configure to limit the row group size #48468
Conversation
Thanks for working on this! Since I'm still new to the Arrow codebase, I reviewed the PR at a high level; it helped me understand how WriterProperties and row group configuration are implemented. I don't yet have enough experience to provide a full technical review, but the approach looks consistent with the design discussed in the issue. Thanks again for sharing this!
};

// Max number of rows allowed in a row group.
const int64_t max_row_group_length = this->properties().max_row_group_length();
Not sure whether we should validate that this config value is positive. If it's set to 0, the writer would never exit the loop.
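A minimal, self-contained sketch of the kind of validation being suggested (the helper name and free-standing form are mine, not the patch's):

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>

// Hypothetical validation helper: reject non-positive limits at set time,
// so a value of 0 fails fast instead of causing an infinite write loop later.
int64_t ValidateMaxRowGroupLength(int64_t max_row_group_length) {
  if (max_row_group_length <= 0) {
    throw std::invalid_argument("max_row_group_length must be positive, got " +
                                std::to_string(max_row_group_length));
  }
  return max_row_group_length;
}
```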
I think we should validate this in properties.h when it is being set?
HuaHuaY left a comment:
LGTM
row_group_writer_->num_rows();
chunk_size = std::min(
    chunk_size,
    static_cast<int64_t>(this->properties().max_row_group_bytes() / avg_row_size));
Will there already be rows written in row_group_writer_? The condition contains row_group_writer_->num_rows() > 0, so I guess you think the answer is yes. Then why don't we need to subtract those buffered bytes?
int64_t buffered_bytes = row_group_writer_->current_buffered_bytes();
double avg_row_bytes = buffered_bytes * 1.0 / group_rows;
chunk_size = std::min(
    chunk_size,
    static_cast<int64_t>((this->properties().max_row_group_bytes() - buffered_bytes) /
                         avg_row_bytes));
Actually, each batch is written to a new row group; we just use avg_row_bytes to estimate batch_size. The data is not appended to the existing row group.
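As an illustrative, self-contained sketch of that estimation (the function name and free-standing form are assumptions, not the patch itself):

```cpp
#include <algorithm>
#include <cstdint>

// Estimate how many rows of the next batch fit under the byte limit, using
// the average row size observed in the current writer's buffered data.
// Each batch goes to a fresh row group, so buffered bytes are not subtracted.
int64_t EstimateBatchRows(int64_t chunk_size, int64_t max_row_group_bytes,
                          int64_t buffered_bytes, int64_t group_rows) {
  if (group_rows <= 0) return chunk_size;  // no data yet, nothing to estimate
  double avg_row_bytes = static_cast<double>(buffered_bytes) / group_rows;
  return std::min(chunk_size,
                  static_cast<int64_t>(max_row_group_bytes / avg_row_bytes));
}
```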
I get it.
  return this;
}

/// Specify the max number of bytes to put in a single row group.
The size is after encoding and compression, right? It would be good to document this.
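For example, the docstring could be extended along these lines (a sketch; the exact wording and the builder signature are assumptions):

```cpp
/// Specify the max number of bytes to put in a single row group.
///
/// Note: the limit applies to the encoded and compressed size of the
/// row group's data, not to the in-memory size of the Arrow input.
Builder* max_row_group_bytes(int64_t max_bytes);
```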
cpp/src/parquet/file_writer.cc (Outdated)
  return total_compressed_bytes_written;
}

int64_t estimated_buffered_value_bytes() const override {
- int64_t estimated_buffered_value_bytes() const override {
+ int64_t EstimatedBufferedValueBytes() const override {
This is not a trivial getter, so we need to use the initial-capitalized CamelCase form. Unfortunately, functions like total_compressed_bytes() have already used the wrong form, so it looks confusing. :/
cpp/src/parquet/file_writer.cc (Outdated)
  return contents_->total_compressed_bytes_written();
}

int64_t RowGroupWriter::current_buffered_bytes() const {
The function name is a little misleading, because readers may think it is the same as contents_->estimated_buffered_value_bytes().
rename to total_buffered_bytes()
  chunk_size = this->properties().max_row_group_length();
}
// max_row_group_bytes is applied only after the row group has accumulated data.
if (row_group_writer_ != nullptr && row_group_writer_->num_rows() > 0) {
row_group_writer_->num_rows() > 0 can only happen when the current row group writer is in buffered mode. Users calling WriteTable usually never use buffered mode, so this approach seems not to work in the majority of cases.
Instead, can we gather this information from all written row groups (if available)?
@wgtmac If the user uses the static WriteTable function, the Arrow FileWriter is always recreated, and we cannot gather the previously written row groups.
arrow/cpp/src/parquet/arrow/writer.cc, lines 591 to 601 in 8040f2a:

Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
                  std::shared_ptr<::arrow::io::OutputStream> sink, int64_t chunk_size,
                  std::shared_ptr<WriterProperties> properties,
                  std::shared_ptr<ArrowWriterProperties> arrow_properties) {
  std::unique_ptr<FileWriter> writer;
  ARROW_ASSIGN_OR_RAISE(
      writer, FileWriter::Open(*table.schema(), pool, std::move(sink),
                               std::move(properties), std::move(arrow_properties)));
  RETURN_NOT_OK(writer->WriteTable(table, chunk_size));
  return writer->Close();
}
If the user uses the internal WriteTable function, we can get avg_row_bytes from the last row_group_writer_ or by gathering all previous row group writers.
arrow/cpp/src/parquet/arrow/writer.cc, line 394 in 8040f2a:

Status WriteTable(const Table& table, int64_t chunk_size) override {
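A self-contained sketch of the reviewer's suggestion, with the per-group sizes and row counts passed in explicitly (in the real writer they would come from the written row groups' metadata; the helper itself is hypothetical):

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Estimate the average row size across all previously written row groups.
// Returns 0.0 when no rows have been written yet, so callers can fall back
// to the row-count limit alone for the first row group.
double AvgRowBytesFromWrittenGroups(const std::vector<int64_t>& group_bytes,
                                    const std::vector<int64_t>& group_rows) {
  int64_t total_bytes = 0;
  int64_t total_rows = 0;
  for (std::size_t i = 0; i < group_bytes.size() && i < group_rows.size(); ++i) {
    total_bytes += group_bytes[i];
    total_rows += group_rows[i];
  }
  return total_rows > 0 ? static_cast<double>(total_bytes) / total_rows : 0.0;
}
```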
int64_t group_rows = row_group_writer_->num_rows();
int64_t batch_size =
    std::min(max_row_group_length - group_rows, batch.num_rows() - offset);
if (group_rows > 0) {
Similar to my comment above, should we consider all written row groups as well to estimate the average row size?
If we change to use all written row groups, then the first row group's size can only be determined by max_row_group_length. Is that OK, or should we just use the current row group writer's buffered data?
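A sketch of the trade-off discussed here (illustrative only): with no history, the first row group is capped only by the row-count limit; once an average row size is available, the byte limit also applies.

```cpp
#include <algorithm>
#include <cstdint>

// Pick the size of the next batch. avg_row_bytes <= 0 means "no estimate
// yet" (e.g. the very first row group), in which case only the row-count
// limit applies.
int64_t NextBatchSize(int64_t rows_remaining, int64_t max_row_group_length,
                      int64_t max_row_group_bytes, double avg_row_bytes) {
  int64_t batch = std::min(rows_remaining, max_row_group_length);
  if (avg_row_bytes > 0) {
    batch = std::min(batch,
                     static_cast<int64_t>(max_row_group_bytes / avg_row_bytes));
  }
  return batch;
}
```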
Rationale for this change
Limit the row group size.
What changes are included in this PR?
Add a new config: parquet::WriterProperties::max_row_group_bytes.
Are these changes tested?
Yes, a unit test is added.
Are there any user-facing changes?
Yes, users can use the new config to limit row group size.
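A usage sketch of the new option (assuming it lands as described in this PR; this will not compile against releases that predate it):

```cpp
#include <memory>

#include "parquet/properties.h"

// Build writer properties that cap each row group both by row count
// (existing option) and by byte size (the new option from this PR).
std::shared_ptr<parquet::WriterProperties> MakeProps() {
  parquet::WriterProperties::Builder builder;
  return builder.max_row_group_length(1024 * 1024)  // existing: rows per group
      ->max_row_group_bytes(128 * 1024 * 1024)      // new: bytes per group
      ->build();
}
```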