From 7ee49d22a9f1b88765ab5c7b40d1a6c5578b706e Mon Sep 17 00:00:00 2001 From: ntjohnson1 <24689722+ntjohnson1@users.noreply.github.com> Date: Mon, 29 Dec 2025 07:52:40 -0500 Subject: [PATCH 1/8] Add failing test --- datafusion/core/tests/dataframe/mod.rs | 32 +++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index c09db371912b..ece2cd64bd33 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -534,7 +534,8 @@ async fn drop_columns_with_nonexistent_columns() -> Result<()> { async fn drop_columns_with_empty_array() -> Result<()> { // build plan using Table API let t = test_table().await?; - let t2 = t.drop_columns(&[])?; + let drop_columns = vec![] as Vec<&str>; + let t2 = t.drop_columns(&drop_columns)?; let plan = t2.logical_plan().clone(); // build query using SQL @@ -549,6 +550,35 @@ async fn drop_columns_with_empty_array() -> Result<()> { Ok(()) } +#[tokio::test] +async fn drop_columns_qualified() -> Result<()> { + // build plan using Table API + let mut t = test_table().await?; + t = t.select_columns(&["c1", "c2", "c11"])?; + let mut t2 = test_table_with_name("another_table").await?; + t2 = t2.select_columns(&["c1", "c2", "c11"])?; + let mut t3 = t.join_on( + t2, + JoinType::Inner, + [col("aggregate_test_100.c1").eq(col("another_table.c1"))], + )?; + t3 = t3.drop_columns(&["another_table.c2", "another_table.c11"])?; + //t3 = t3.drop_columns_full(&["another_table.c2", "another_table.c11"])?; + + let plan = t3.logical_plan().clone(); + + let sql = "SELECT aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c11, another_table.c1 FROM (SELECT c1, c2, c11 FROM aggregate_test_100) INNER JOIN (SELECT c1, c2, c11 FROM another_table) ON aggregate_test_100.c1 = another_table.c1"; + let ctx = SessionContext::new(); + register_aggregate_csv(&ctx, "aggregate_test_100").await?; + 
register_aggregate_csv(&ctx, "another_table").await?; + let sql_plan = ctx.sql(sql).await?.into_unoptimized_plan(); + + // the two plans should be identical + assert_same_plan(&plan, &sql_plan); + + Ok(()) +} + #[tokio::test] async fn drop_with_quotes() -> Result<()> { // define data with a column name that has a "." in it: From 52d153ea65ebcb513e1d46df5a33980b4d414362 Mon Sep 17 00:00:00 2001 From: ntjohnson1 <24689722+ntjohnson1@users.noreply.github.com> Date: Mon, 29 Dec 2025 08:04:31 -0500 Subject: [PATCH 2/8] rust/issues/85077 --- datafusion/functions/src/utils.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/functions/src/utils.rs b/datafusion/functions/src/utils.rs index e4980728b18a..ad68e8cdd554 100644 --- a/datafusion/functions/src/utils.rs +++ b/datafusion/functions/src/utils.rs @@ -410,7 +410,7 @@ pub mod test { #[test] fn test_decimal32_to_i32() { - let cases: [(i32, i8, Either); _] = [ + let cases: [(i32, i8, Either); 10] = [ (123, 0, Either::Left(123)), (1230, 1, Either::Left(123)), (123000, 3, Either::Left(123)), @@ -456,7 +456,7 @@ pub mod test { #[test] fn test_decimal64_to_i64() { - let cases: [(i64, i8, Either); _] = [ + let cases: [(i64, i8, Either); 8] = [ (123, 0, Either::Left(123)), (1234567890, 2, Either::Left(12345678)), (-1234567890, 2, Either::Left(-12345678)), From 743ede5edad8c51cae5a77f2f11c95bd654ea359 Mon Sep 17 00:00:00 2001 From: ntjohnson1 <24689722+ntjohnson1@users.noreply.github.com> Date: Mon, 29 Dec 2025 08:07:50 -0500 Subject: [PATCH 3/8] Resolve failing test by coercing into column --- datafusion/core/src/dataframe/mod.rs | 10 +++++++--- datafusion/core/tests/dataframe/mod.rs | 1 - 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 0d060db3bf14..c146457b1d04 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -445,13 +445,17 @@ impl DataFrame { /// # 
Ok(()) /// # } /// ``` - pub fn drop_columns(self, columns: &[&str]) -> Result<DataFrame> { + pub fn drop_columns<T>(self, columns: &[T]) -> Result<DataFrame> + where + T: Into<Column> + Clone, + { let fields_to_drop = columns .iter() - .flat_map(|name| { + .flat_map(|col| { + let column: Column = col.clone().into(); self.plan .schema() - .qualified_fields_with_unqualified_name(name) + .qualified_field_from_column(&column) }) .collect::<Vec<_>>(); let expr: Vec<Expr> = self diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index ece2cd64bd33..032929f3977c 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -563,7 +563,6 @@ async fn drop_columns_qualified() -> Result<()> { [col("aggregate_test_100.c1").eq(col("another_table.c1"))], )?; t3 = t3.drop_columns(&["another_table.c2", "another_table.c11"])?; - //t3 = t3.drop_columns_full(&["another_table.c2", "another_table.c11"])?; let plan = t3.logical_plan().clone(); From 224deb9463d5d03cb7bd629d4cccfc99d8d60d35 Mon Sep 17 00:00:00 2001 From: ntjohnson1 <24689722+ntjohnson1@users.noreply.github.com> Date: Mon, 29 Dec 2025 09:04:27 -0500 Subject: [PATCH 4/8] Add helper for qualified column access --- datafusion/core/src/dataframe/mod.rs | 42 +++++++++++++++++ datafusion/core/tests/dataframe/mod.rs | 64 ++++++++++++++++++++++++++ 2 files changed, 106 insertions(+) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index c146457b1d04..154421cfc100 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -2467,6 +2467,48 @@ impl DataFrame { .collect() } + /// Find qualified columns for this dataframe from names + /// + /// # Arguments + /// * `names` - Unqualified names to find.
+ /// + /// # Example + /// ``` + /// # use datafusion::prelude::*; + /// # use datafusion::error::Result; + /// # use datafusion_common::ScalarValue; + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// let ctx = SessionContext::new(); + /// ctx.register_csv("first_table", "tests/data/example.csv", CsvReadOptions::new()) + /// .await?; + /// let df = ctx.table("first_table").await?; + /// ctx.register_csv("second_table", "tests/data/example.csv", CsvReadOptions::new()) + /// .await?; + /// let df2 = ctx.table("second_table").await?; + /// let join_expr = df.find_qualified_columns(&["a"])?.iter() + /// .zip(df2.find_qualified_columns(&["a"])?.iter()) + /// .map(|(col1, col2)| col(*col1).eq(col(*col2))) + /// .collect::<Vec<_>>(); + /// let df3 = df.join_on(df2, JoinType::Inner, join_expr)?; + /// # Ok(()) + /// # } + /// ``` + pub fn find_qualified_columns( + &self, + names: &[&str], + ) -> Result<Vec<(Option<&TableReference>, &FieldRef)>> { + let schema = self.logical_plan().schema(); + names + .iter() + .map(|name| { + schema + .qualified_field_from_column(&Column::from_name(*name)) + .map_err(|_| plan_datafusion_err!("Column '{}' not found", name)) + }) + .collect() + } + /// Helper for creating DataFrame.
/// # Example /// ``` diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 032929f3977c..28ca14ae3bd6 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -578,6 +578,70 @@ async fn drop_columns_qualified() -> Result<()> { Ok(()) } +#[tokio::test] +async fn drop_columns_qualified_find_qualified() -> Result<()> { + // build plan using Table API + let mut t = test_table().await?; + t = t.select_columns(&["c1", "c2", "c11"])?; + let mut t2 = test_table_with_name("another_table").await?; + t2 = t2.select_columns(&["c1", "c2", "c11"])?; + let mut t3 = t.join_on( + t2.clone(), + JoinType::Inner, + [col("aggregate_test_100.c1").eq(col("another_table.c1"))], + )?; + t3 = t3.drop_columns(&t2.find_qualified_columns(&["c2", "c11"])?)?; + + let plan = t3.logical_plan().clone(); + + let sql = "SELECT aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c11, another_table.c1 FROM (SELECT c1, c2, c11 FROM aggregate_test_100) INNER JOIN (SELECT c1, c2, c11 FROM another_table) ON aggregate_test_100.c1 = another_table.c1"; + let ctx = SessionContext::new(); + register_aggregate_csv(&ctx, "aggregate_test_100").await?; + register_aggregate_csv(&ctx, "another_table").await?; + let sql_plan = ctx.sql(sql).await?.into_unoptimized_plan(); + + // the two plans should be identical + assert_same_plan(&plan, &sql_plan); + + Ok(()) +} + +#[tokio::test] +async fn test_find_qualified_names() -> Result<()> { + let t = test_table().await?; + let column_names = ["c1", "c2", "c3"]; + let columns = t.find_qualified_columns(&column_names)?; + + // Expected results for each column + let binding = TableReference::bare("aggregate_test_100"); + let expected = vec![ + (Some(&binding), "c1"), + (Some(&binding), "c2"), + (Some(&binding), "c3"), + ]; + + // Verify we got the expected number of results + assert_eq!(columns.len(), expected.len(), "Expected {} columns, got {}", expected.len(), columns.len()); 
+ + // Iterate over the results and check each one individually + for (i, (actual, expected)) in columns.iter().zip(expected.iter()).enumerate() { + let (actual_table_ref, actual_field_ref) = actual; + let (expected_table_ref, expected_field_name) = expected; + + // Check table reference + assert_eq!(actual_table_ref, expected_table_ref, + "Column {}: expected table reference {:?}, got {:?}", + i, expected_table_ref, actual_table_ref); + + // Check field name + assert_eq!(actual_field_ref.name(), *expected_field_name, + "Column {}: expected field name '{}', got '{}'", + i, expected_field_name, actual_field_ref.name()); + } + + Ok(()) +} + #[tokio::test] async fn drop_with_quotes() -> Result<()> { // define data with a column name that has a "." in it: From 95940c0cd7768b417a502bca325eb843f3e8cce7 Mon Sep 17 00:00:00 2001 From: ntjohnson1 <24689722+ntjohnson1@users.noreply.github.com> Date: Mon, 29 Dec 2025 09:04:42 -0500 Subject: [PATCH 5/8] Run pre-commit --- datafusion-cli/src/functions.rs | 2 +- datafusion/core/tests/dataframe/mod.rs | 13 ++++--------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs index a45d57e8e952..c2e4e16569cc 100644 --- a/datafusion-cli/src/functions.rs +++ b/datafusion-cli/src/functions.rs @@ -421,7 +421,7 @@ impl TableFunctionImpl for ParquetMetadataFunc { compression_arr.push(format!("{:?}", column.compression())); // need to collect into Vec to format let encodings: Vec<_> = column.encodings().collect(); - encodings_arr.push(format!("{:?}", encodings)); + encodings_arr.push(format!("{encodings:?}")); index_page_offset_arr.push(column.index_page_offset()); dictionary_page_offset_arr.push(column.dictionary_page_offset()); data_page_offset_arr.push(column.data_page_offset()); diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 28ca14ae3bd6..ce9bb8d99411 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ 
b/datafusion/core/tests/dataframe/mod.rs @@ -614,11 +614,9 @@ async fn test_find_qualified_names() -> Result<()> { // Expected results for each column let binding = TableReference::bare("aggregate_test_100"); - let expected = vec![ - (Some(&binding), "c1"), + let expected = [(Some(&binding), "c1"), (Some(&binding), "c2"), - (Some(&binding), "c3"), - ]; + (Some(&binding), "c3")]; // Verify we got the expected number of results assert_eq!(columns.len(), expected.len(), "Expected {} columns, got {}", expected.len(), columns.len()); @@ -629,14 +627,11 @@ async fn test_find_qualified_names() -> Result<()> { let (expected_table_ref, expected_field_name) = expected; // Check table reference - assert_eq!(actual_table_ref, expected_table_ref, - "Column {}: expected table reference {:?}, got {:?}", - i, expected_table_ref, actual_table_ref); + assert_eq!(actual_table_ref, expected_table_ref, "Column {i}: expected table reference {expected_table_ref:?}, got {actual_table_ref:?}"); // Check field name assert_eq!(actual_field_ref.name(), *expected_field_name, - "Column {}: expected field name '{}', got '{}'", - i, expected_field_name, actual_field_ref.name()); + "Column {i}: expected field name '{expected_field_name}', got '{actual_field_ref}'"); } Ok(()) From fe63c803bd29b79ac60d177a9414b7d16a390af4 Mon Sep 17 00:00:00 2001 From: ntjohnson1 <24689722+ntjohnson1@users.noreply.github.com> Date: Mon, 29 Dec 2025 10:15:05 -0500 Subject: [PATCH 6/8] Handle duplicate unqualified name --- datafusion/core/src/dataframe/mod.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 154421cfc100..022fbabc541b 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -453,11 +453,21 @@ impl DataFrame { .iter() .flat_map(|col| { let column: Column = col.clone().into(); - self.plan - .schema() - .qualified_field_from_column(&column) + match 
column.relation.as_ref() { + Some(_) => { + // qualified_field_from_column returns Result<(Option<&TableReference>, &FieldRef)> + vec![self.plan.schema().qualified_field_from_column(&column)] + } + None => { + // qualified_fields_with_unqualified_name returns Vec<(Option<&TableReference>, &FieldRef)> + self.plan.schema().qualified_fields_with_unqualified_name(&column.name) + .into_iter() + .map(|field| Ok(field)) + .collect::<Vec<_>>() + } + } }) - .collect::<Vec<_>>(); + .collect::<Result<Vec<_>, _>>()?; let expr: Vec<Expr> = self .plan .schema() From 978b97360fcf2dd77dd6923e9bf0ca341906e91a Mon Sep 17 00:00:00 2001 From: ntjohnson1 <24689722+ntjohnson1@users.noreply.github.com> Date: Mon, 29 Dec 2025 10:15:25 -0500 Subject: [PATCH 7/8] Run ci fmt --- datafusion/core/tests/dataframe/mod.rs | 40 +++++++++++++++++--------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index ce9bb8d99411..1ae6ef5c4a8b 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -611,29 +611,43 @@ async fn test_find_qualified_names() -> Result<()> { let t = test_table().await?; let column_names = ["c1", "c2", "c3"]; let columns = t.find_qualified_columns(&column_names)?; - + // Expected results for each column let binding = TableReference::bare("aggregate_test_100"); - let expected = [(Some(&binding), "c1"), + let expected = [ + (Some(&binding), "c1"), (Some(&binding), "c2"), - (Some(&binding), "c3")]; - + (Some(&binding), "c3"), + ]; + // Verify we got the expected number of results - assert_eq!(columns.len(), expected.len(), "Expected {} columns, got {}", expected.len(), columns.len()); - + assert_eq!( + columns.len(), + expected.len(), + "Expected {} columns, got {}", + expected.len(), + columns.len() + ); + // Iterate over the results and check each one individually for (i, (actual, expected)) in columns.iter().zip(expected.iter()).enumerate() { let (actual_table_ref,
actual_field_ref) = actual; let (expected_table_ref, expected_field_name) = expected; - + // Check table reference - assert_eq!(actual_table_ref, expected_table_ref, "Column {i}: expected table reference {expected_table_ref:?}, got {actual_table_ref:?}"); - + assert_eq!( + actual_table_ref, expected_table_ref, + "Column {i}: expected table reference {expected_table_ref:?}, got {actual_table_ref:?}" + ); + // Check field name - assert_eq!(actual_field_ref.name(), *expected_field_name, - "Column {i}: expected field name '{expected_field_name}', got '{actual_field_ref}'"); + assert_eq!( + actual_field_ref.name(), + *expected_field_name, + "Column {i}: expected field name '{expected_field_name}', got '{actual_field_ref}'" + ); } - + Ok(()) } @@ -682,7 +696,7 @@ async fn drop_with_periods() -> Result<()> { let ctx = SessionContext::new(); ctx.register_batch("t", batch)?; - let df = ctx.table("t").await?.drop_columns(&["f.c1"])?; + let df = ctx.table("t").await?.drop_columns(&["\"f.c1\""])?; let df_results = df.collect().await?; From 3cd4e67f7e0650e25bd4096ecb49355ddbba68b5 Mon Sep 17 00:00:00 2001 From: ntjohnson1 <24689722+ntjohnson1@users.noreply.github.com> Date: Mon, 29 Dec 2025 10:53:42 -0500 Subject: [PATCH 8/8] One more pass with fmt and clippy --- datafusion/core/src/dataframe/mod.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 022fbabc541b..a93146b07982 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -460,9 +460,11 @@ impl DataFrame { } None => { // qualified_fields_with_unqualified_name returns Vec<(Option<&TableReference>, &FieldRef)> - self.plan.schema().qualified_fields_with_unqualified_name(&column.name) + self.plan + .schema() + .qualified_fields_with_unqualified_name(&column.name) .into_iter() - .map(|field| Ok(field)) + .map(Ok) .collect::<Vec<_>>() } }