Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/iceberg/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ class ICEBERG_EXPORT Catalog {
/// \param identifier a table identifier
/// \return instance of Table implementation referred to by identifier or
/// ErrorKind::kNoSuchTable if the table does not exist
virtual Result<std::shared_ptr<Table>> LoadTable(const TableIdentifier& identifier) = 0;
virtual Result<std::shared_ptr<Table>> LoadTable(
const TableIdentifier& identifier) const = 0;

/// \brief Register a table with the catalog if it does not exist
///
Expand Down
5 changes: 3 additions & 2 deletions src/iceberg/catalog/memory/in_memory_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ Status InMemoryCatalog::RenameTable(const TableIdentifier& from,
}

Result<std::shared_ptr<Table>> InMemoryCatalog::LoadTable(
const TableIdentifier& identifier) {
const TableIdentifier& identifier) const {
if (!file_io_) [[unlikely]] {
return InvalidArgument("file_io is not set for catalog {}", catalog_name_);
}
Expand All @@ -480,8 +480,9 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::LoadTable(

ICEBERG_ASSIGN_OR_RAISE(auto metadata,
TableMetadataUtil::Read(*file_io_, metadata_location));
auto non_const_catalog = std::const_pointer_cast<InMemoryCatalog>(shared_from_this());
return Table::Make(identifier, std::move(metadata), std::move(metadata_location),
file_io_, shared_from_this());
file_io_, non_const_catalog);
}

Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
Expand Down
3 changes: 2 additions & 1 deletion src/iceberg/catalog/memory/in_memory_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ class ICEBERG_EXPORT InMemoryCatalog

Status RenameTable(const TableIdentifier& from, const TableIdentifier& to) override;

Result<std::shared_ptr<Table>> LoadTable(const TableIdentifier& identifier) override;
Result<std::shared_ptr<Table>> LoadTable(
const TableIdentifier& identifier) const override;

Result<std::shared_ptr<Table>> RegisterTable(
const TableIdentifier& identifier,
Expand Down
18 changes: 9 additions & 9 deletions src/iceberg/catalog/rest/http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,8 @@ Status HandleFailureResponse(const cpr::Response& response,
} // namespace

void HttpClient::PrepareSession(
const std::string& path,
const std::unordered_map<std::string, std::string>& request_headers,
const std::unordered_map<std::string, std::string>& params) {
const std::string& path, const std::unordered_map<std::string, std::string>& params,
const std::unordered_map<std::string, std::string>& request_headers) {
session_->SetUrl(cpr::Url{path});
session_->SetParameters(GetParameters(params));
session_->RemoveContent();
Expand All @@ -164,7 +163,7 @@ Result<HttpResponse> HttpClient::Get(
cpr::Response response;
{
std::lock_guard guard(session_mutex_);
PrepareSession(path, headers, params);
PrepareSession(path, params, headers);
response = session_->Get();
}

Expand All @@ -181,7 +180,7 @@ Result<HttpResponse> HttpClient::Post(
cpr::Response response;
{
std::lock_guard guard(session_mutex_);
PrepareSession(path, headers);
PrepareSession(path, /*params=*/{}, headers);
session_->SetBody(cpr::Body{body});
response = session_->Post();
}
Expand All @@ -206,7 +205,7 @@ Result<HttpResponse> HttpClient::PostForm(
auto form_headers = headers;
form_headers[kHeaderContentType] = kMimeTypeFormUrlEncoded;

PrepareSession(path, form_headers);
PrepareSession(path, /*params=*/{}, form_headers);
std::vector<cpr::Pair> pair_list;
pair_list.reserve(form_data.size());
for (const auto& [key, val] : form_data) {
Expand All @@ -229,7 +228,7 @@ Result<HttpResponse> HttpClient::Head(
cpr::Response response;
{
std::lock_guard guard(session_mutex_);
PrepareSession(path, headers);
PrepareSession(path, /*params=*/{}, headers);
response = session_->Head();
}

Expand All @@ -240,12 +239,13 @@ Result<HttpResponse> HttpClient::Head(
}

Result<HttpResponse> HttpClient::Delete(
const std::string& path, const std::unordered_map<std::string, std::string>& headers,
const std::string& path, const std::unordered_map<std::string, std::string>& params,
const std::unordered_map<std::string, std::string>& headers,
const ErrorHandler& error_handler) {
cpr::Response response;
{
std::lock_guard guard(session_mutex_);
PrepareSession(path, headers);
PrepareSession(path, params, headers);
response = session_->Delete();
}

Expand Down
7 changes: 4 additions & 3 deletions src/iceberg/catalog/rest/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,14 @@ class ICEBERG_REST_EXPORT HttpClient {

/// \brief Sends a DELETE request.
Result<HttpResponse> Delete(const std::string& path,
const std::unordered_map<std::string, std::string>& params,
const std::unordered_map<std::string, std::string>& headers,
const ErrorHandler& error_handler);

private:
void PrepareSession(const std::string& path,
const std::unordered_map<std::string, std::string>& request_headers,
const std::unordered_map<std::string, std::string>& params = {});
void PrepareSession(
const std::string& path, const std::unordered_map<std::string, std::string>& params,
const std::unordered_map<std::string, std::string>& request_headers);

std::unordered_map<std::string, std::string> default_headers_;

Expand Down
72 changes: 58 additions & 14 deletions src/iceberg/catalog/rest/rest_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,10 @@ Status RestCatalog::DropNamespace(const Namespace& ns) {
ICEBERG_RETURN_UNEXPECTED(
CheckEndpoint(supported_endpoints_, Endpoint::DropNamespace()));
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns));
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Delete(path, /*headers=*/{}, *DropNamespaceErrorHandler::Instance()));

ICEBERG_ASSIGN_OR_RAISE(const auto response,
client_->Delete(path, /*params=*/{}, /*headers=*/{},
*DropNamespaceErrorHandler::Instance()));
return {};
}

Expand All @@ -204,17 +205,16 @@ Result<bool> RestCatalog::NamespaceExists(const Namespace& ns) const {
return false;
}
ICEBERG_RETURN_UNEXPECTED(result);
// GET succeeded, namespace exists
// GetNamespaceProperties succeeded, namespace exists
return true;
}

ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns));
auto response_or_error =
client_->Head(path, /*headers=*/{}, *NamespaceErrorHandler::Instance());
if (!response_or_error.has_value()) {
const auto& error = response_or_error.error();
// catch NoSuchNamespaceException/404 and return false
if (error.kind == ErrorKind::kNoSuchNamespace) {
if (response_or_error.error().kind == ErrorKind::kNoSuchNamespace) {
return false;
}
ICEBERG_RETURN_UNEXPECTED(response_or_error);
Expand Down Expand Up @@ -294,14 +294,44 @@ Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
return NotImplemented("Not implemented");
}

Status RestCatalog::DropTable([[maybe_unused]] const TableIdentifier& identifier,
[[maybe_unused]] bool purge) {
return NotImplemented("Not implemented");
Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
ICEBERG_RETURN_UNEXPECTED(CheckEndpoint(supported_endpoints_, Endpoint::DeleteTable()));
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));

std::unordered_map<std::string, std::string> params;
if (purge) {
params["purgeRequested"] = "true";
}
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Delete(path, params, /*headers=*/{}, *TableErrorHandler::Instance()));
return {};
}

Result<bool> RestCatalog::TableExists(
[[maybe_unused]] const TableIdentifier& identifier) const {
return NotImplemented("Not implemented");
Result<bool> RestCatalog::TableExists(const TableIdentifier& identifier) const {
auto check = CheckEndpoint(supported_endpoints_, Endpoint::TableExists());
if (!check.has_value()) {
// Fall back to LoadTable endpoint (GET)
auto result = LoadTable(identifier);
if (!result.has_value() && result.error().kind == ErrorKind::kNoSuchTable) {
return false;
}
ICEBERG_RETURN_UNEXPECTED(result);
// LoadTable succeeded, table exists
return true;
}

ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
auto response_or_error =
client_->Head(path, /*headers=*/{}, *TableErrorHandler::Instance());
if (!response_or_error.has_value()) {
// catch NoSuchTableException/404 and return false
if (response_or_error.error().kind == ErrorKind::kNoSuchTable) {
return false;
}
ICEBERG_RETURN_UNEXPECTED(response_or_error);
}
return true;
}

Status RestCatalog::RenameTable([[maybe_unused]] const TableIdentifier& from,
Expand All @@ -310,8 +340,22 @@ Status RestCatalog::RenameTable([[maybe_unused]] const TableIdentifier& from,
}

Result<std::shared_ptr<Table>> RestCatalog::LoadTable(
[[maybe_unused]] const TableIdentifier& identifier) {
return NotImplemented("Not implemented");
const TableIdentifier& identifier) const {
ICEBERG_RETURN_UNEXPECTED(CheckEndpoint(supported_endpoints_, Endpoint::LoadTable()));
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));

ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Get(path, /*params=*/{}, /*headers=*/{}, *TableErrorHandler::Instance()));

// TODO(Feiyang Li): support load metadata table
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
// Cast away const since Table needs non-const Catalog pointer for mutations
auto non_const_catalog = std::const_pointer_cast<RestCatalog>(shared_from_this());
return Table::Make(identifier, load_result.metadata,
std::move(load_result.metadata_location), file_io_,
non_const_catalog);
}

Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
Expand Down
3 changes: 2 additions & 1 deletion src/iceberg/catalog/rest/rest_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,

Status DropTable(const TableIdentifier& identifier, bool purge) override;

Result<std::shared_ptr<Table>> LoadTable(const TableIdentifier& identifier) override;
Result<std::shared_ptr<Table>> LoadTable(
const TableIdentifier& identifier) const override;

Result<std::shared_ptr<Table>> RegisterTable(
const TableIdentifier& identifier,
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/test/mock_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class MockCatalog : public Catalog {
(override));

MOCK_METHOD((Result<std::shared_ptr<Table>>), LoadTable, (const TableIdentifier&),
(override));
(const, override));

MOCK_METHOD((Result<std::shared_ptr<Table>>), RegisterTable,
(const TableIdentifier&, const std::string&), (override));
Expand Down
Loading
Loading