Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ set(ICEBERG_SOURCES
transform_function.cc
type.cc
update/pending_update.cc
update/snapshot_update.cc
update/update_partition_spec.cc
update/update_properties.cc
update/update_schema.cc
Expand Down
10 changes: 2 additions & 8 deletions src/iceberg/avro/avro_schema_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
* under the License.
*/

#include <charconv>
#include <format>
#include <mutex>
#include <sstream>
Expand All @@ -40,6 +39,7 @@
#include "iceberg/schema_util_internal.h"
#include "iceberg/util/formatter.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/string_util.h"
#include "iceberg/util/visit_type.h"

namespace iceberg::avro {
Expand Down Expand Up @@ -471,13 +471,7 @@ Result<int32_t> GetId(const ::avro::NodePtr& node, const std::string& attr_name,
return InvalidSchema("Missing avro attribute: {}", attr_name);
}

int32_t id;
const auto& id_value = id_str.value();
auto [_, ec] = std::from_chars(id_value.data(), id_value.data() + id_value.size(), id);
if (ec != std::errc()) {
return InvalidSchema("Invalid {}: {}", attr_name, id_value);
}
return id;
return StringUtils::ParseInt<int32_t>(id_str.value());
}

Result<int32_t> GetElementId(const ::avro::NodePtr& node) {
Expand Down
14 changes: 11 additions & 3 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1553,9 +1553,17 @@ Result<std::unique_ptr<TableUpdate>> TableUpdateFromJson(const nlohmann::json& j
GetJsonValueOptional<int64_t>(json, kMaxSnapshotAgeMs));
ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age,
GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs));
return std::make_unique<table::SetSnapshotRef>(std::move(ref_name), snapshot_id, type,
min_snapshots, max_snapshot_age,
max_ref_age);
if (type == SnapshotRefType::kTag) {
ICEBERG_ASSIGN_OR_RAISE(auto tag, SnapshotRef::MakeTag(snapshot_id, max_ref_age));
return std::make_unique<table::SetSnapshotRef>(std::move(ref_name), *tag);
} else {
ICEBERG_CHECK(type == SnapshotRefType::kBranch,
"Expected branch type for snapshot ref");
ICEBERG_ASSIGN_OR_RAISE(auto branch,
SnapshotRef::MakeBranch(snapshot_id, min_snapshots,
max_snapshot_age, max_ref_age));
return std::make_unique<table::SetSnapshotRef>(std::move(ref_name), *branch);
}
}
if (action == kActionSetProperties) {
using StringMap = std::unordered_map<std::string, std::string>;
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ iceberg_sources = files(
'transform_function.cc',
'type.cc',
'update/pending_update.cc',
'update/snapshot_update.cc',
'update/update_partition_spec.cc',
'update/update_properties.cc',
'update/update_schema.cc',
Expand Down
91 changes: 91 additions & 0 deletions src/iceberg/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/manifest/manifest_reader.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/string_util.h"

namespace iceberg {

Expand All @@ -49,6 +50,78 @@ SnapshotRefType SnapshotRef::type() const noexcept {
retention);
}

Status SnapshotRef::MinSnapshotsToKeep(std::optional<int32_t> value) {
ICEBERG_PRECHECK(this->type() != SnapshotRefType::kTag,
"Tags do not support setting minSnapshotsToKeep");
ICEBERG_PRECHECK(!value.has_value() || value.value() > 0,
"Min snapshots to keep must be greater than 0");
std::get<Branch>(this->retention).min_snapshots_to_keep = value;
return {};
}

Status SnapshotRef::MaxSnapshotAgeMs(std::optional<int64_t> value) {
ICEBERG_PRECHECK(this->type() != SnapshotRefType::kTag,
"Tags do not support setting maxSnapshotAgeMs");
ICEBERG_PRECHECK(!value.has_value() || value.value() > 0,
"Max snapshot age must be greater than 0 ms");
std::get<Branch>(this->retention).max_snapshot_age_ms = value;
return {};
}

Status SnapshotRef::MaxRefAgeMs(std::optional<int64_t> value) {
ICEBERG_PRECHECK(!value.has_value() || value.value() > 0,
"Max reference age must be greater than 0");
if (this->type() == SnapshotRefType::kBranch) {
std::get<Branch>(this->retention).max_ref_age_ms = value;
} else {
std::get<Tag>(this->retention).max_ref_age_ms = value;
}
return {};
}

Result<std::unique_ptr<SnapshotRef>> SnapshotRef::MakeBranch(
int64_t snapshot_id, std::optional<int32_t> min_snapshots_to_keep,
std::optional<int64_t> max_snapshot_age_ms, std::optional<int64_t> max_ref_age_ms) {
// Validate optional parameters
if (min_snapshots_to_keep.has_value() && min_snapshots_to_keep.value() <= 0) {
return InvalidArgument("Min snapshots to keep must be greater than 0");
}
if (max_snapshot_age_ms.has_value() && max_snapshot_age_ms.value() <= 0) {
return InvalidArgument("Max snapshot age must be greater than 0 ms");
}
if (max_ref_age_ms.has_value() && max_ref_age_ms.value() <= 0) {
return InvalidArgument("Max reference age must be greater than 0");
}

auto ref = std::make_unique<SnapshotRef>();
ref->snapshot_id = snapshot_id;
ref->retention = Branch{.min_snapshots_to_keep = min_snapshots_to_keep,
.max_snapshot_age_ms = max_snapshot_age_ms,
.max_ref_age_ms = max_ref_age_ms};
return ref;
}

Result<std::unique_ptr<SnapshotRef>> SnapshotRef::MakeTag(
int64_t snapshot_id, std::optional<int64_t> max_ref_age_ms) {
// Validate optional parameter
if (max_ref_age_ms.has_value() && max_ref_age_ms.value() <= 0) {
return InvalidArgument("Max reference age must be greater than 0");
}

auto ref = std::make_unique<SnapshotRef>();
ref->snapshot_id = snapshot_id;
ref->retention = Tag{.max_ref_age_ms = max_ref_age_ms};
return ref;
}

std::unique_ptr<SnapshotRef> SnapshotRef::Clone(
std::optional<int64_t> new_snapshot_id) const {
auto ref = std::make_unique<SnapshotRef>();
ref->snapshot_id = new_snapshot_id.value_or(snapshot_id);
ref->retention = retention;
return ref;
}

bool SnapshotRef::Equals(const SnapshotRef& other) const {
if (this == &other) {
return true;
Expand All @@ -75,6 +148,24 @@ std::optional<std::string_view> Snapshot::operation() const {
return std::nullopt;
}

Result<std::optional<int64_t>> Snapshot::FirstRowId() const {
auto it = summary.find(SnapshotSummaryFields::kFirstRowId);
if (it == summary.end()) {
return std::nullopt;
}

return StringUtils::ParseInt<int64_t>(it->second);
}

Result<std::optional<int64_t>> Snapshot::AddedRows() const {
auto it = summary.find(SnapshotSummaryFields::kAddedRows);
if (it == summary.end()) {
return std::nullopt;
}

return StringUtils::ParseInt<int64_t>(it->second);
}

bool Snapshot::Equals(const Snapshot& other) const {
if (this == &other) {
return true;
Expand Down
75 changes: 74 additions & 1 deletion src/iceberg/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <variant>

#include "iceberg/iceberg_export.h"
Expand Down Expand Up @@ -114,6 +113,51 @@ struct ICEBERG_EXPORT SnapshotRef {

SnapshotRefType type() const noexcept;

/// \brief Set the minimum number of snapshots to keep (branch only)
/// \param value The minimum number of snapshots to keep, or nullopt for default
/// \return Status indicating success or failure
Status MinSnapshotsToKeep(std::optional<int32_t> value);

/// \brief Set the maximum snapshot age in milliseconds (branch only)
/// \param value The maximum snapshot age in milliseconds, or nullopt for default
/// \return Status indicating success or failure
Status MaxSnapshotAgeMs(std::optional<int64_t> value);

/// \brief Set the maximum reference age in milliseconds
/// \param value The maximum reference age in milliseconds, or nullopt for default
/// \return Status indicating success or failure
Status MaxRefAgeMs(std::optional<int64_t> value);

/// \brief Create a branch reference
///
/// \param snapshot_id The snapshot ID for the branch
/// \param min_snapshots_to_keep Optional minimum number of snapshots to keep
/// \param max_snapshot_age_ms Optional maximum snapshot age in milliseconds
/// \param max_ref_age_ms Optional maximum reference age in milliseconds
/// \return A Result containing a unique_ptr to the SnapshotRef, or an error if
/// validation failed
static Result<std::unique_ptr<SnapshotRef>> MakeBranch(
int64_t snapshot_id, std::optional<int32_t> min_snapshots_to_keep = std::nullopt,
std::optional<int64_t> max_snapshot_age_ms = std::nullopt,
std::optional<int64_t> max_ref_age_ms = std::nullopt);

/// \brief Create a tag reference
///
/// \param snapshot_id The snapshot ID for the tag
/// \param max_ref_age_ms Optional maximum reference age in milliseconds
/// \return A Result containing a unique_ptr to the SnapshotRef, or an error if
/// validation failed
static Result<std::unique_ptr<SnapshotRef>> MakeTag(
int64_t snapshot_id, std::optional<int64_t> max_ref_age_ms = std::nullopt);

/// \brief Clone this SnapshotRef with an optional new snapshot ID
///
/// \param new_snapshot_id Optional new snapshot ID. If not provided, uses the current
/// snapshot_id
/// \return A unique_ptr to the cloned SnapshotRef
std::unique_ptr<SnapshotRef> Clone(
std::optional<int64_t> new_snapshot_id = std::nullopt) const;

/// \brief Compare two snapshot refs for equality
friend bool operator==(const SnapshotRef& lhs, const SnapshotRef& rhs) {
return lhs.Equals(rhs);
Expand All @@ -129,6 +173,12 @@ struct SnapshotSummaryFields {
/// \brief The operation field key
inline static const std::string kOperation = "operation";

/// \brief The first row id field key
inline static const std::string kFirstRowId = "first-row-id";

/// \brief The added rows field key
inline static const std::string kAddedRows = "added-rows";

/// Metrics, see https://iceberg.apache.org/spec/#metrics

/// \brief Number of data files added in the snapshot
Expand Down Expand Up @@ -253,6 +303,29 @@ struct ICEBERG_EXPORT Snapshot {
/// unknown.
std::optional<std::string_view> operation() const;

/// \brief The row-id of the first newly added row in this snapshot.
///
/// All rows added in this snapshot will have a row-id assigned to them greater than
/// this value. All rows with a row-id less than this value were created in a snapshot
/// that was added to the table (but not necessarily committed to this branch) in the
/// past.
///
/// \return the first row-id to be used in this snapshot or nullopt when row lineage
/// is not supported
Result<std::optional<int64_t>> FirstRowId() const;

/// \brief The upper bound of number of rows with assigned row IDs in this snapshot.
///
/// It can be used safely to increment the table's `next-row-id` during a commit. It
/// can be more than the number of rows added in this snapshot and include some
/// existing rows.
///
/// This field is optional but is required when the table version supports row lineage.
///
/// \return the upper bound of number of rows with assigned row IDs in this snapshot
/// or nullopt if the value was not stored.
Result<std::optional<int64_t>> AddedRows() const;

/// \brief Compare two snapshots for equality.
friend bool operator==(const Snapshot& lhs, const Snapshot& rhs) {
return lhs.Equals(rhs);
Expand Down
20 changes: 10 additions & 10 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {

virtual ~Table();

/// \brief Return the identifier of this table
/// \brief Returns the identifier of this table
const TableIdentifier& name() const { return identifier_; }

/// \brief Returns the UUID of the table
Expand All @@ -59,40 +59,40 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
/// \brief Return the schema for this table, return NotFoundError if not found
Result<std::shared_ptr<Schema>> schema() const;

/// \brief Return a map of schema for this table
/// \brief Returns a map of schema for this table
Result<
std::reference_wrapper<const std::unordered_map<int32_t, std::shared_ptr<Schema>>>>
schemas() const;

/// \brief Return the partition spec for this table, return NotFoundError if not found
/// \brief Returns the partition spec for this table, return NotFoundError if not found
Result<std::shared_ptr<PartitionSpec>> spec() const;

/// \brief Return a map of partition specs for this table
/// \brief Returns a map of partition specs for this table
Result<std::reference_wrapper<
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>>
specs() const;

/// \brief Return the sort order for this table, return NotFoundError if not found
/// \brief Returns the sort order for this table, return NotFoundError if not found
Result<std::shared_ptr<SortOrder>> sort_order() const;

/// \brief Return a map of sort order IDs to sort orders for this table
/// \brief Returns a map of sort order IDs to sort orders for this table
Result<std::reference_wrapper<
const std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>>
sort_orders() const;

/// \brief Return a map of string properties for this table
/// \brief Returns the properties of this table
const TableProperties& properties() const;

/// \brief Return the table's metadata file location
/// \brief Returns the table's metadata file location
std::string_view metadata_file_location() const;

/// \brief Return the table's base location
/// \brief Returns the table's base location
std::string_view location() const;

/// \brief Returns the time when this table was last updated
TimePointMs last_updated_ms() const;

/// \brief Return the table's current snapshot, return NotFoundError if not found
/// \brief Returns the table's current snapshot, return NotFoundError if not found
Result<std::shared_ptr<Snapshot>> current_snapshot() const;

/// \brief Get the snapshot of this table with the given id
Expand Down
Loading
Loading