From a7879be51c64bbfc6a5238706aeb2781ac16dd69 Mon Sep 17 00:00:00 2001 From: Sergei Patiakin Date: Thu, 31 Jul 2025 14:56:24 +0200 Subject: [PATCH 1/2] DSN increment --- .../src/materialized_view/transaction/mod.rs | 4 +++ iceberg-rust/src/table/transaction/mod.rs | 31 ++++++++++++++++--- .../src/table/transaction/operation.rs | 12 +++++-- 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/iceberg-rust/src/materialized_view/transaction/mod.rs b/iceberg-rust/src/materialized_view/transaction/mod.rs index 9b8893d9..5129c445 100644 --- a/iceberg-rust/src/materialized_view/transaction/mod.rs +++ b/iceberg-rust/src/materialized_view/transaction/mod.rs @@ -108,6 +108,7 @@ impl<'view> Transaction<'view> { data_files: old, delete_files: _, additional_summary: old_lineage, + dsn_increment: _, } = operation { old.extend_from_slice(&files); @@ -125,6 +126,7 @@ impl<'view> Transaction<'view> { REFRESH_STATE.to_owned(), refresh_state, )])), + dsn_increment: None, }); } Ok(self) @@ -143,6 +145,7 @@ impl<'view> Transaction<'view> { data_files: _, delete_files: old, additional_summary: old_lineage, + dsn_increment: _, } = operation { old.extend_from_slice(&files); @@ -160,6 +163,7 @@ impl<'view> Transaction<'view> { REFRESH_STATE.to_owned(), refresh_state, )])), + dsn_increment: None, }); } Ok(self) diff --git a/iceberg-rust/src/table/transaction/mod.rs b/iceberg-rust/src/table/transaction/mod.rs index 66654b0a..7926e52b 100644 --- a/iceberg-rust/src/table/transaction/mod.rs +++ b/iceberg-rust/src/table/transaction/mod.rs @@ -116,15 +116,25 @@ impl<'table> TableTransaction<'table> { /// .commit() /// .await?; /// ``` - pub fn append_data(mut self, files: Vec) -> Self { + pub fn append_data(self, files: Vec) -> Self { + self.append_data_with_dsn_increment(files, None) + } + + /// Appends data files to the table, increasing the Data Sequence Number by a given amount + /// + pub fn append_data_with_dsn_increment(mut self, files: Vec, dsn_increment: Option) -> Self { let summary = append_summary(&files); if let Some(ref mut operation) = self.operations[APPEND_INDEX] { if let Operation::Append { - data_files: old, .. + data_files: old, + dsn_increment: old_dsn_increment, + .. } = operation { - old.extend_from_slice(&files); + if dsn_increment == *old_dsn_increment { + old.extend_from_slice(&files); + } } } else { self.operations[APPEND_INDEX] = Some(Operation::Append { @@ -132,6 +142,7 @@ impl<'table> TableTransaction<'table> { data_files: files, delete_files: Vec::new(), additional_summary: summary, + dsn_increment, }); } self @@ -155,16 +166,25 @@ impl<'table> TableTransaction<'table> { /// .commit() /// .await?; /// ``` - pub fn append_delete(mut self, files: Vec) -> Self { + pub fn append_delete(self, files: Vec) -> Self { + self.append_delete_with_dsn_increment(files, None) + } + + /// Appends delete files to the table, increasing the Data Sequence Number by a given amount + /// + pub fn append_delete_with_dsn_increment(mut self, files: Vec, dsn_increment: Option) -> Self { if let Some(ref mut operation) = self.operations[APPEND_INDEX] { if let Operation::Append { branch: _, data_files: _, delete_files: old, additional_summary: None, + dsn_increment: old_dsn_increment, } = operation { - old.extend_from_slice(&files); + if dsn_increment == *old_dsn_increment { + old.extend_from_slice(&files); + } } } else { self.operations[APPEND_INDEX] = Some(Operation::Append { @@ -172,6 +192,7 @@ impl<'table> TableTransaction<'table> { data_files: Vec::new(), delete_files: files, additional_summary: None, + dsn_increment, }); } self diff --git a/iceberg-rust/src/table/transaction/operation.rs b/iceberg-rust/src/table/transaction/operation.rs index 5f045dba..bb44cf30 100644 --- a/iceberg-rust/src/table/transaction/operation.rs +++ b/iceberg-rust/src/table/transaction/operation.rs @@ -58,6 +58,7 @@ pub enum Operation { data_files: Vec, delete_files: Vec, additional_summary: Option>, + dsn_increment: Option, }, // /// Quickly append new files to the table // NewFastAppend { @@ -101,6 +102,7 @@ impl Operation { data_files, delete_files, additional_summary, + dsn_increment, } => { let old_snapshot = table_metadata.current_snapshot(branch.as_deref())?; @@ -145,10 +147,16 @@ impl Operation { .into_iter() .chain(data_files.into_iter()) .map(|data_file| { - ManifestEntry::builder() + let mut builder = ManifestEntry::builder(); + builder .with_format_version(table_metadata.format_version) .with_status(Status::Added) - .with_data_file(data_file) + .with_data_file(data_file); + if let Some(dsn_increment) = dsn_increment { + builder.with_sequence_number(table_metadata.last_sequence_number + (dsn_increment as i64)); + } + + builder .build() .map_err(crate::spec::error::Error::from) .map_err(Error::from) From eb9af1a02a6cd4f13674bc5f03c8a2a372bddecc Mon Sep 17 00:00:00 2001 From: Sergei Patiakin Date: Thu, 31 Jul 2025 15:24:41 +0200 Subject: [PATCH 2/2] DataFileWithIncrement --- .../src/materialized_view/transaction/mod.rs | 31 +++++++++---- iceberg-rust/src/table/transaction/mod.rs | 46 ++++++++++++------- .../src/table/transaction/operation.rs | 28 +++++++---- 3 files changed, 71 insertions(+), 34 deletions(-) diff --git a/iceberg-rust/src/materialized_view/transaction/mod.rs b/iceberg-rust/src/materialized_view/transaction/mod.rs index 5129c445..51acd03d 100644 --- a/iceberg-rust/src/materialized_view/transaction/mod.rs +++ b/iceberg-rust/src/materialized_view/transaction/mod.rs @@ -14,7 +14,10 @@ use crate::{ error::Error, table::{ delete_all_table_files, - transaction::{operation::Operation as TableOperation, APPEND_INDEX, REPLACE_INDEX}, + transaction::{ + operation::{DataFileWithIncrement, Operation as TableOperation}, + APPEND_INDEX, REPLACE_INDEX, + }, }, view::transaction::operation::Operation as ViewOperation, }; @@ -102,16 +105,22 @@ impl<'view> Transaction<'view> { refresh_state: RefreshState, ) -> Result { let refresh_state = serde_json::to_string(&refresh_state)?; + let files_with_increments: Vec = files + .into_iter() + .map(|f| DataFileWithIncrement { + data_file: f, + dsn_increment: None, + }) + .collect(); if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] { if let TableOperation::Append { branch: _, data_files: old, delete_files: _, additional_summary: old_lineage, - dsn_increment: _, } = operation { - old.extend_from_slice(&files); + old.extend_from_slice(&files_with_increments); *old_lineage = Some(HashMap::from_iter(vec![( REFRESH_STATE.to_owned(), refresh_state.clone(), @@ -120,13 +129,12 @@ impl<'view> Transaction<'view> { } else { self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append { branch: self.branch.clone(), - data_files: files, + data_files: files_with_increments, delete_files: Vec::new(), additional_summary: Some(HashMap::from_iter(vec![( REFRESH_STATE.to_owned(), refresh_state, )])), - dsn_increment: None, }); } Ok(self) @@ -139,16 +147,22 @@ impl<'view> Transaction<'view> { refresh_state: RefreshState, ) -> Result { let refresh_state = serde_json::to_string(&refresh_state)?; + let files_with_increments: Vec = files + .into_iter() + .map(|f| DataFileWithIncrement { + data_file: f, + dsn_increment: None, + }) + .collect(); if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] { if let TableOperation::Append { branch: _, data_files: _, delete_files: old, additional_summary: old_lineage, - dsn_increment: _, } = operation { - old.extend_from_slice(&files); + old.extend_from_slice(&files_with_increments); *old_lineage = Some(HashMap::from_iter(vec![( REFRESH_STATE.to_owned(), refresh_state.clone(), @@ -158,12 +172,11 @@ impl<'view> Transaction<'view> { self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append { branch: self.branch.clone(), data_files: Vec::new(), - delete_files: files, + delete_files: files_with_increments, additional_summary: Some(HashMap::from_iter(vec![( REFRESH_STATE.to_owned(), refresh_state, )])), - dsn_increment: None, }); } Ok(self) diff --git a/iceberg-rust/src/table/transaction/mod.rs b/iceberg-rust/src/table/transaction/mod.rs index 7926e52b..035b2b99 100644 --- a/iceberg-rust/src/table/transaction/mod.rs +++ b/iceberg-rust/src/table/transaction/mod.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use iceberg_rust_spec::spec::{manifest::DataFile, schema::Schema, snapshot::SnapshotReference}; use crate::table::transaction::append::append_summary; +use crate::table::transaction::operation::DataFileWithIncrement; use crate::{catalog::commit::CommitTable, error::Error, table::Table}; use self::operation::Operation; @@ -122,27 +123,33 @@ impl<'table> TableTransaction<'table> { /// Appends data files to the table, increasing the Data Sequence Number by a given amount /// - pub fn append_data_with_dsn_increment(mut self, files: Vec, dsn_increment: Option) -> Self { + pub fn append_data_with_dsn_increment( + mut self, + files: Vec, + dsn_increment: Option, + ) -> Self { let summary = append_summary(&files); + let files_with_increments: Vec = files + .into_iter() + .map(|f| DataFileWithIncrement { + data_file: f, + dsn_increment, + }) + .collect(); if let Some(ref mut operation) = self.operations[APPEND_INDEX] { if let Operation::Append { - data_files: old, - dsn_increment: old_dsn_increment, - .. + data_files: old, .. } = operation { - if dsn_increment == *old_dsn_increment { - old.extend_from_slice(&files); - } + old.extend_from_slice(&files_with_increments); } } else { self.operations[APPEND_INDEX] = Some(Operation::Append { branch: self.branch.clone(), - data_files: files, + data_files: files_with_increments, delete_files: Vec::new(), additional_summary: summary, - dsn_increment, }); } self @@ -172,27 +179,34 @@ impl<'table> TableTransaction<'table> { /// Appends delete files to the table, increasing the Data Sequence Number by a given amount /// - pub fn append_delete_with_dsn_increment(mut self, files: Vec, dsn_increment: Option) -> Self { + pub fn append_delete_with_dsn_increment( + mut self, + files: Vec, + dsn_increment: Option, + ) -> Self { + let files_with_increments: Vec = files + .into_iter() + .map(|f| DataFileWithIncrement { + data_file: f, + dsn_increment, + }) + .collect(); if let Some(ref mut operation) = self.operations[APPEND_INDEX] { if let Operation::Append { branch: _, data_files: _, delete_files: old, additional_summary: None, - dsn_increment: old_dsn_increment, } = operation { - if dsn_increment == *old_dsn_increment { - old.extend_from_slice(&files); - } + old.extend_from_slice(&files_with_increments); } } else { self.operations[APPEND_INDEX] = Some(Operation::Append { branch: self.branch.clone(), data_files: Vec::new(), - delete_files: files, + delete_files: files_with_increments, additional_summary: None, - dsn_increment, }); } self diff --git a/iceberg-rust/src/table/transaction/operation.rs b/iceberg-rust/src/table/transaction/operation.rs index bb44cf30..062b43bf 100644 --- a/iceberg-rust/src/table/transaction/operation.rs +++ b/iceberg-rust/src/table/transaction/operation.rs @@ -37,6 +37,13 @@ use super::append::split_datafiles; /// The target number of datafiles per manifest is dynamic, but we don't want to go below this number. static MIN_DATAFILES_PER_MANIFEST: usize = 4; +#[derive(Debug, Clone)] +///Table operations +pub struct DataFileWithIncrement { + pub data_file: DataFile, + pub dsn_increment: Option, +} + #[derive(Debug)] ///Table operations pub enum Operation { @@ -55,10 +62,9 @@ pub enum Operation { /// Append new files to the table Append { branch: Option, - data_files: Vec, - delete_files: Vec, + data_files: Vec, + delete_files: Vec, additional_summary: Option>, - dsn_increment: Option, }, // /// Quickly append new files to the table // NewFastAppend { @@ -102,7 +108,6 @@ impl Operation { data_files, delete_files, additional_summary, - dsn_increment, } => { let old_snapshot = table_metadata.current_snapshot(branch.as_deref())?; @@ -118,7 +123,10 @@ impl Operation { return Ok((None, Vec::new())); } - let data_files_iter = delete_files.iter().chain(data_files.iter()); + let data_files_iter = delete_files + .iter() + .chain(data_files.iter()) + .map(|f| &f.data_file); let manifest_list_writer = if let Some(manifest_list_bytes) = prefetch_manifest_list(old_snapshot, &object_store) @@ -146,14 +154,16 @@ impl Operation { delete_files .into_iter() .chain(data_files.into_iter()) - .map(|data_file| { + .map(|dfi| { let mut builder = ManifestEntry::builder(); builder .with_format_version(table_metadata.format_version) .with_status(Status::Added) - .with_data_file(data_file); - if let Some(dsn_increment) = dsn_increment { - builder.with_sequence_number(table_metadata.last_sequence_number + (dsn_increment as i64)); + .with_data_file(dfi.data_file); + if let Some(dsn_increment) = dfi.dsn_increment { + builder.with_sequence_number( + table_metadata.last_sequence_number + (dsn_increment as i64), + ); } builder