From 3e6f391e977d838157815b2fe9d03a391c852764 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Thu, 18 Dec 2025 17:27:20 +0800 Subject: [PATCH 1/2] feat: update pgwire to 0.37 --- Cargo.lock | 15 +++- Cargo.toml | 2 +- arrow-pg/src/row_encoder.rs | 9 ++- datafusion-postgres/src/handlers.rs | 99 ++++++++++------------- datafusion-postgres/src/hooks/set_show.rs | 2 +- 5 files changed, 62 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 89a7d21..42038ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2774,9 +2774,7 @@ dependencies = [ [[package]] name = "pgwire" -version = "0.36.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70a2bcdcc4b20a88e0648778ecf00415bbd5b447742275439c22176835056f99" +version = "0.37.0" dependencies = [ "async-trait", "base64", @@ -2795,6 +2793,7 @@ dependencies = [ "ryu", "serde", "serde_json", + "smol_str", "stringprep", "thiserror", "tokio", @@ -3539,6 +3538,16 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "smol_str" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3498b0a27f93ef1402f20eefacfaa1691272ac4eca1cdc8c596cb0a245d6cbf5" +dependencies = [ + "borsh", + "serde_core", +] + [[package]] name = "snap" version = "1.1.1" diff --git a/Cargo.toml b/Cargo.toml index f7c11cc..732b492 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ bytes = "1.11.0" chrono = { version = "0.4", features = ["std"] } datafusion = { version = "51", default-features = false } futures = "0.3" -pgwire = { version = "0.36.3", default-features = false } +pgwire = { version = "0.37", default-features = false } postgres-types = "0.2" rust_decimal = { version = "1.39", features = ["db-postgres"] } tokio = { version = "1", default-features = false } diff --git a/arrow-pg/src/row_encoder.rs b/arrow-pg/src/row_encoder.rs index 674751b..1cbce40 100644 --- a/arrow-pg/src/row_encoder.rs +++ b/arrow-pg/src/row_encoder.rs @@ -17,6 +17,7 @@ pub struct RowEncoder { rb: RecordBatch, curr_idx: usize, fields: Arc>, + row_encoder: DataRowEncoder, } impl RowEncoder { @@ -24,8 +25,9 @@ impl RowEncoder { assert_eq!(rb.num_columns(), fields.len()); Self { rb, - fields, + fields: fields.clone(), curr_idx: 0, + row_encoder: DataRowEncoder::new(fields), } } @@ -33,14 +35,13 @@ impl RowEncoder { if self.curr_idx == self.rb.num_rows() { return None; } - let mut encoder = DataRowEncoder::new(self.fields.clone()); for col in 0..self.rb.num_columns() { let array = self.rb.column(col); let field = &self.fields[col]; - encode_value(&mut encoder, array, self.curr_idx, field).unwrap(); + encode_value(&mut self.row_encoder, array, self.curr_idx, field).unwrap(); } self.curr_idx += 1; - Some(encoder.finish()) + Some(Ok(self.row_encoder.take_row())) } } diff --git a/datafusion-postgres/src/handlers.rs b/datafusion-postgres/src/handlers.rs index 731dc8e..ea5c953 100644 --- a/datafusion-postgres/src/handlers.rs +++ b/datafusion-postgres/src/handlers.rs @@ -13,11 +13,8 @@ use pgwire::api::auth::noop::NoopStartupHandler; use pgwire::api::auth::StartupHandler; use pgwire::api::portal::{Format, Portal}; use pgwire::api::query::{ExtendedQueryHandler, SimpleQueryHandler}; -use pgwire::api::results::{ - DescribePortalResponse, DescribeResponse, DescribeStatementResponse, Response, Tag, -}; +use pgwire::api::results::{FieldInfo, Response, Tag}; use pgwire::api::stmt::QueryParser; -use pgwire::api::stmt::StoredStatement; use pgwire::api::{ClientInfo, ErrorHandler, PgWireServerHandlers, Type}; use pgwire::error::{PgWireError, PgWireResult}; use pgwire::types::format::FormatOptions; @@ -202,58 +199,6 @@ impl ExtendedQueryHandler for DfSessionService { self.parser.clone() } - async fn do_describe_statement( - &self, - _client: &mut C, - target: &StoredStatement, - ) -> PgWireResult - where - C: ClientInfo + Unpin + Send + Sync, - { - if let (_, Some((_, plan))) = &target.statement { - let schema = plan.schema(); - let fields = - arrow_schema_to_pg_fields(schema.as_arrow(), &Format::UnifiedBinary, None)?; - let params = plan - .get_parameter_types() - .map_err(|e| PgWireError::ApiError(Box::new(e)))?; - - let mut param_types = Vec::with_capacity(params.len()); - for param_type in ordered_param_types(¶ms).iter() { - // Fixed: Use ¶ms - if let Some(datatype) = param_type { - let pgtype = into_pg_type(datatype)?; - param_types.push(pgtype); - } else { - param_types.push(Type::UNKNOWN); - } - } - - Ok(DescribeStatementResponse::new(param_types, fields)) - } else { - Ok(DescribeStatementResponse::no_data()) - } - } - - async fn do_describe_portal( - &self, - _client: &mut C, - target: &Portal, - ) -> PgWireResult - where - C: ClientInfo + Unpin + Send + Sync, - { - if let (_, Some((_, plan))) = &target.statement.statement { - let format = &target.result_column_format; - let schema = plan.schema(); - let fields = arrow_schema_to_pg_fields(schema.as_arrow(), format, None)?; - - Ok(DescribePortalResponse::new(fields)) - } else { - Ok(DescribePortalResponse::no_data()) - } - } - async fn do_query( &self, client: &mut C, @@ -433,6 +378,48 @@ impl QueryParser for Parser { .map_err(|e| PgWireError::ApiError(Box::new(e)))?; Ok((query, Some((statement, logical_plan)))) } + + fn get_parameter_types(&self, stmt: &Self::Statement) -> PgWireResult> { + if let (_, Some((_, plan))) = stmt { + let params = plan + .get_parameter_types() + .map_err(|e| PgWireError::ApiError(Box::new(e)))?; + + let mut param_types = Vec::with_capacity(params.len()); + for param_type in ordered_param_types(¶ms).iter() { + // Fixed: Use ¶ms + if let Some(datatype) = param_type { + let pgtype = into_pg_type(datatype)?; + param_types.push(pgtype); + } else { + param_types.push(Type::UNKNOWN); + } + } + + Ok(param_types) + } else { + Ok(vec![]) + } + } + + fn get_result_schema( + &self, + stmt: &Self::Statement, + column_format: Option<&Format>, + ) -> PgWireResult> { + if let (_, Some((_, plan))) = stmt { + let schema = plan.schema(); + let fields = arrow_schema_to_pg_fields( + schema.as_arrow(), + column_format.unwrap_or(&Format::UnifiedBinary), + None, + )?; + + Ok(fields) + } else { + Ok(vec![]) + } + } } fn ordered_param_types(types: &HashMap>) -> Vec> { diff --git a/datafusion-postgres/src/hooks/set_show.rs b/datafusion-postgres/src/hooks/set_show.rs index cb5332f..93c16c5 100644 --- a/datafusion-postgres/src/hooks/set_show.rs +++ b/datafusion-postgres/src/hooks/set_show.rs @@ -111,7 +111,7 @@ fn mock_show_response(name: &str, value: &str) -> PgWireResult { let row = { let mut encoder = DataRowEncoder::new(Arc::new(fields.clone())); encoder.encode_field(&Some(value))?; - encoder.finish() + Ok(encoder.take_row()) }; let row_stream = futures::stream::once(async move { row }); From 9e4c938f0fd044d6bf87f494312a39771f957804 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Thu, 18 Dec 2025 17:33:27 +0800 Subject: [PATCH 2/2] chore: update msrv --- .github/workflows/ci.yml | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 849768e..4334596 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -91,6 +91,6 @@ jobs: - uses: actions/checkout@v4 - uses: actions-rs/toolchain@v1 with: - toolchain: "1.88.0" + toolchain: "1.89" override: true - run: cargo build --all-features diff --git a/Cargo.toml b/Cargo.toml index 732b492..2e3f09f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ members = ["datafusion-postgres", "datafusion-postgres-cli", "arrow-pg", "datafu [workspace.package] edition = "2021" license = "Apache-2.0" -rust-version = "1.88.0" +rust-version = "1.89" authors = ["Ning Sun "] keywords = ["database", "postgresql", "datafusion"] homepage = "https://github.com/datafusion-contrib/datafusion-postgres/"