From e350cfd232c8c0703855a58e3095caa2da4a1d53 Mon Sep 17 00:00:00 2001 From: Fangchen Li Date: Thu, 18 Dec 2025 10:04:28 -0800 Subject: [PATCH 1/8] format --- asv_bench/benchmarks/groupby.py | 84 +++++++++++ pandas/core/arrays/arrow/array.py | 159 +++++++++++++++++++- pandas/tests/extension/test_arrow.py | 207 +++++++++++++++++++++++++++ 3 files changed, 448 insertions(+), 2 deletions(-) diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index 7c1d6457eea15..08be81eaa50dd 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -1152,4 +1152,88 @@ def time_resample_multiindex(self): ).mean() +class GroupByAggregateArrowDtypes: + param_names = ["dtype", "method"] + params = [ + [ + "bool[pyarrow]", + "int32[pyarrow]", + "float64[pyarrow]", + "decimal128(25, 3)[pyarrow]", + "string[pyarrow]", + "timestamp[s, tz=UTC][pyarrow]", + "duration[ms][pyarrow]", + ], + [ + "any", + "all", + "sum", + "prod", + "min", + "max", + "mean", + "std", + "var", + "sem", + "count", + "median", + ], + ] + + def setup(self, dtype, method): + import pyarrow as pa + + # Parse dtype string + if dtype.startswith("decimal128"): + pa_type = pa.decimal128(25, 3) + elif dtype.startswith("timestamp"): + pa_type = pa.timestamp("s", "UTC") + elif dtype.startswith("duration"): + pa_type = pa.duration("ms") + elif dtype == "bool[pyarrow]": + pa_type = pa.bool_() + elif dtype == "int32[pyarrow]": + pa_type = pa.int32() + elif dtype == "float64[pyarrow]": + pa_type = pa.float64() + elif dtype == "string[pyarrow]": + pa_type = pa.string() + else: + raise ValueError(f"Unsupported dtype: {dtype}") + + size = 100_000 + ncols = 5 + columns = list("abcde") + + # Generate data based on type + if pa.types.is_floating(pa_type): + data = np.random.randn(size, ncols) + elif pa.types.is_integer(pa_type): + data = np.random.randint(0, 10_000, (size, ncols)) + elif pa.types.is_decimal(pa_type): + from decimal import Decimal + + data = np.random.randn(size, ncols).round(3) + data = [[Decimal(str(x)) for x in row] for row in data] + elif pa.types.is_boolean(pa_type): + data = np.random.choice([True, False], (size, ncols)) + elif pa.types.is_timestamp(pa_type): + data = np.random.randint(0, 1_000_000, (size, ncols)) + elif pa.types.is_duration(pa_type): + data = np.random.randint(0, 1_000_000, (size, ncols)) + elif pa.types.is_string(pa_type): + data = np.random.choice(list(ascii_letters), (size, ncols)) + else: + raise ValueError(f"Unsupported pyarrow type: {pa_type}") + + df = DataFrame(data, columns=columns, dtype=dtype) + # Add some NAs + df.iloc[::10, ::2] = NA + df["key"] = np.random.randint(0, 100, size) + self.df = df + + def time_frame_agg(self, dtype, method): + self.df.groupby("key").agg(method) + + from .pandas_vb_common import setup # noqa: F401 isort:skip diff --git a/pandas/core/arrays/arrow/array.py b/pandas/core/arrays/arrow/array.py index edf1d7ddcaa76..8a10d3900ff89 100644 --- a/pandas/core/arrays/arrow/array.py +++ b/pandas/core/arrays/arrow/array.py @@ -2601,6 +2601,143 @@ def _to_masked(self): arr = self.to_numpy(dtype=dtype.numpy_dtype, na_value=na_value) return dtype.construct_array_type()(arr, mask) + # Mapping from pandas groupby 'how' to PyArrow aggregation function names + _PYARROW_AGG_FUNCS: dict[str, str] = { + "sum": "sum", + "prod": "product", + "min": "min", + "max": "max", + "mean": "mean", + "std": "stddev", + "var": "variance", + "sem": "stddev", # sem = stddev / sqrt(count), computed below + "count": "count", + "any": "any", + "all": "all", + } + + # 
Default values for missing groups (identity elements for each operation) + _PYARROW_AGG_DEFAULTS: dict[str, int | bool] = { + "sum": 0, + "prod": 1, + "count": 0, + "any": False, + "all": True, + } + + def _groupby_op_pyarrow( + self, + *, + how: str, + min_count: int, + ngroups: int, + ids: npt.NDArray[np.intp], + **kwargs, + ) -> Self | None: + """ + Perform groupby aggregation using PyArrow's native Table.group_by. + + Returns None if the operation is not supported by PyArrow, + in which case the caller should fall back to the Cython path. + """ + pa_agg_func = self._PYARROW_AGG_FUNCS.get(how) + if pa_agg_func is None: + return None + + pa_type = self._pa_array.type + # PyArrow doesn't support these aggregations for temporal types directly + if pa.types.is_temporal(pa_type) and how in ["std", "var", "sem"]: + return None + + # PyArrow's any/all only work on boolean types + if how in ["any", "all"] and not pa.types.is_boolean(pa_type): + return None + + # Filter out NA group (ids == -1) to avoid unnecessary computation + mask = ids >= 0 + if not mask.all(): + ids = ids[mask] + values = pc.filter(self._pa_array, mask) + else: + values = self._pa_array + + # Create a PyArrow table with the values and group IDs + # Explicitly cast ids to int64 since np.intp is platform-dependent + group_id_arr = pa.array(ids, type=pa.int64()) + table = pa.table({"value": values, "group_id": group_id_arr}) + + # Build aggregation list - always include count for null handling + # For std/var/sem, pass VarianceOptions with ddof to match pandas behavior + if how in ["std", "var", "sem"]: + ddof = kwargs.get("ddof", 1) + agg_with_opts = ("value", pa_agg_func, pc.VarianceOptions(ddof=ddof)) + aggs = [agg_with_opts, ("value", "count")] + else: + aggs = [("value", pa_agg_func), ("value", "count")] + + # Perform the groupby aggregation + result_table = table.group_by("group_id").aggregate(aggs) + + # Extract results + result_group_ids = result_table.column("group_id") + result_values = result_table.column(f"value_{pa_agg_func}") + result_counts = result_table.column("value_count") + + # For sem, compute stddev / sqrt(count) using PyArrow compute + if how == "sem": + sqrt_counts = pc.sqrt(result_counts) + result_values = pc.divide(result_values, sqrt_counts) + + output_type = result_values.type + default_value = pa.scalar(self._PYARROW_AGG_DEFAULTS.get(how), type=output_type) + + # Handle nulls from all-null groups for sum/prod with min_count=0 + if result_values.null_count > 0 and how in ["sum", "prod"] and min_count == 0: + result_values = pc.if_else( + pc.is_null(result_values), default_value, result_values + ) + + # Handle min_count: groups with count < min_count should be null + if min_count > 0: + below_min_count = pc.less(result_counts, pa.scalar(min_count)) + result_values = pc.if_else(below_min_count, None, result_values) + + # PyArrow returns results in encounter order. We need to reorder to + # match expected output (group 0, 1, 2, ..., ngroups-1) and fill + # missing groups with default values. 
+ # + # We use NumPy scatter (O(n)) instead of: + # - pc.scatter: doesn't handle missing groups, workaround is slower + # - join+sort: O(n log n), slower for high-cardinality groupby + # + # Explicitly cast to int64 to ensure usable as NumPy indices + result_group_ids_np = result_group_ids.to_numpy(zero_copy_only=False).astype( + np.int64, copy=False + ) + result_values_np = result_values.to_numpy(zero_copy_only=False) + + default_py = default_value.as_py() + if default_py is not None and min_count == 0: + # Operations with identity elements (sum=0, prod=1, count=0, any=False, + # all=True): fill missing groups with default value + output_np = np.full(ngroups, default_py, dtype=result_values_np.dtype) + output_np[result_group_ids_np] = result_values_np + pa_result = pa.array(output_np, type=output_type) + else: + # Operations without identity elements (mean, std, var, min, max, sem): + # fill missing groups with null using a boolean mask + output_np = np.empty(ngroups, dtype=result_values_np.dtype) + null_mask = np.ones(ngroups, dtype=bool) # True = null/missing + output_np[result_group_ids_np] = result_values_np + null_mask[result_group_ids_np] = False + # Restore nulls for groups that had null results (min_count or all-null) + if result_values.null_count > 0: + result_nulls = pc.is_null(result_values).to_numpy() + null_mask[result_group_ids_np[result_nulls]] = True + pa_result = pa.array(output_np, type=output_type, mask=null_mask) + + return self._from_pyarrow_array(pa_result) + def _groupby_op( self, *, @@ -2635,9 +2772,27 @@ def _groupby_op( **kwargs, ) - # maybe convert to a compatible dtype optimized for groupby - values: ExtensionArray pa_type = self._pa_array.type + + # Try PyArrow-native path for decimal and string types where it's faster. + # For integer/float/boolean, the fallback path via _to_masked() is faster. 
+ if ( + pa.types.is_decimal(pa_type) + or pa.types.is_string(pa_type) + or pa.types.is_large_string(pa_type) + ): + result = self._groupby_op_pyarrow( + how=how, + min_count=min_count, + ngroups=ngroups, + ids=ids, + **kwargs, + ) + if result is not None: + return result + + # Fall back to converting to masked/datetime array and using Cython + values: ExtensionArray if pa.types.is_timestamp(pa_type): values = self._to_datetimearray() elif pa.types.is_duration(pa_type): diff --git a/pandas/tests/extension/test_arrow.py b/pandas/tests/extension/test_arrow.py index ba5d257bd59e4..9f2549bbe121c 100644 --- a/pandas/tests/extension/test_arrow.py +++ b/pandas/tests/extension/test_arrow.py @@ -3270,6 +3270,213 @@ def test_groupby_count_return_arrow_dtype(data_missing): tm.assert_frame_equal(result, expected) +class TestGroupbyAggPyArrowNative: + """Tests for PyArrow-native groupby aggregations on decimal and string types.""" + + @pytest.mark.parametrize( + "dtype,agg_func", + [ + # Decimal aggregations + (pa.decimal128(10, 2), "sum"), + (pa.decimal128(10, 2), "prod"), + (pa.decimal128(10, 2), "min"), + (pa.decimal128(10, 2), "max"), + (pa.decimal128(10, 2), "mean"), + (pa.decimal128(10, 2), "std"), + (pa.decimal128(10, 2), "var"), + (pa.decimal128(10, 2), "sem"), + (pa.decimal128(10, 2), "count"), + # String aggregations + (pa.string(), "min"), + (pa.string(), "max"), + (pa.string(), "count"), + (pa.large_string(), "min"), + (pa.large_string(), "max"), + (pa.large_string(), "count"), + ], + ) + def test_groupby_aggregations(self, dtype, agg_func): + # Test that decimal/string types use PyArrow-native groupby path + if pa.types.is_decimal(dtype): + values = [ + Decimal("1.5"), + Decimal("2.5"), + Decimal("3.0"), + Decimal("4.0"), + Decimal("5.0"), + ] + else: + values = ["apple", "banana", "cherry", "date", "elderberry"] + + df = pd.DataFrame( + { + "key": [1, 1, 2, 2, 3], + "value": pd.array(values, dtype=ArrowDtype(dtype)), + } + ) + result = getattr(df.groupby("key")["value"], agg_func)() + assert len(result) == 3 + assert result.index.tolist() == [1, 2, 3] + assert isinstance(result.dtype, ArrowDtype) + + @pytest.mark.parametrize( + "dtype,agg_func", + [ + (pa.decimal128(10, 2), "sum"), + (pa.decimal128(10, 2), "min"), + (pa.string(), "min"), + (pa.string(), "max"), + ], + ) + def test_groupby_with_nulls(self, dtype, agg_func): + # Test groupby with null values + if pa.types.is_decimal(dtype): + values = [Decimal("1.0"), None, Decimal("3.0"), None] + expected = [Decimal("1.0"), Decimal("3.0")] + else: + values = ["a", None, "c", None] + expected = ["a", "c"] + + df = pd.DataFrame( + { + "key": [1, 1, 2, 2], + "value": pd.array(values, dtype=ArrowDtype(dtype)), + } + ) + result = getattr(df.groupby("key")["value"], agg_func)() + assert len(result) == 2 + assert result.iloc[0] == expected[0] + assert result.iloc[1] == expected[1] + + def test_groupby_sem_returns_float(self): + # Test that sem returns float dtype and handles edge cases + # 1. Normal case: sem should return double[pyarrow] + df = pd.DataFrame( + { + "key": [1, 1, 2, 2], + "value": pd.array( + [Decimal("1.0"), Decimal("2.0"), Decimal("3.0"), Decimal("4.0")], + dtype=ArrowDtype(pa.decimal128(10, 2)), + ), + } + ) + result = df.groupby("key")["value"].sem() + assert result.dtype == ArrowDtype(pa.float64()) + + # 2. 
Single value per group (count=1): should be NA (stddev undefined) + df_single = pd.DataFrame( + { + "key": [1, 2], + "value": pd.array( + [Decimal("1.0"), Decimal("2.0")], + dtype=ArrowDtype(pa.decimal128(10, 2)), + ), + } + ) + result = df_single.groupby("key")["value"].sem() + assert pd.isna(result.iloc[0]) + assert pd.isna(result.iloc[1]) + + # 3. All nulls in a group (count=0): should be NA (no division-by-zero) + df_nulls = pd.DataFrame( + { + "key": [1, 1, 2, 2], + "value": pd.array( + [Decimal("1.0"), Decimal("2.0"), None, None], + dtype=ArrowDtype(pa.decimal128(10, 2)), + ), + } + ) + result = df_nulls.groupby("key")["value"].sem() + assert not pd.isna(result.iloc[0]) # Group 1 has values + assert pd.isna(result.iloc[1]) # Group 2 all nulls + + @pytest.mark.parametrize("agg_func", ["sum", "prod"]) + def test_groupby_min_count(self, agg_func): + df = pd.DataFrame( + { + "key": [1, 1, 2], + "value": pd.array( + [Decimal("1.0"), Decimal("2.0"), Decimal("3.0")], + dtype=ArrowDtype(pa.decimal128(10, 2)), + ), + } + ) + # min_count=2: group 2 has only 1 value, should be null + result = getattr(df.groupby("key")["value"], agg_func)(min_count=2) + assert not pd.isna(result.iloc[0]) # Group 1 has 2 values + assert pd.isna(result.iloc[1]) # Group 2 has 1 value < min_count + + def test_groupby_min_count_with_nulls(self): + # Test that min_count uses non-null count, not group size + df = pd.DataFrame( + { + "key": [1, 1, 2, 2, 2], + "value": pd.array( + [Decimal("1.0"), None, Decimal("2.0"), Decimal("3.0"), None], + dtype=ArrowDtype(pa.decimal128(10, 2)), + ), + } + ) + # Group 1: 2 rows but only 1 non-null -> should be null with min_count=2 + # Group 2: 3 rows but only 2 non-null -> should be 5.0 with min_count=2 + result = df.groupby("key")["value"].sum(min_count=2) + assert pd.isna(result.iloc[0]) # Only 1 non-null < min_count=2 + assert result.iloc[1] == Decimal("5.0") # 2 non-null >= min_count=2 + + @pytest.mark.parametrize( + "agg_func,default_value", + [ + ("sum", 0), + ("prod", 1), + ], + ) + def test_groupby_missing_groups(self, agg_func, default_value): + df = pd.DataFrame( + { + "key": pd.Categorical([0, 0, 2, 2], categories=[0, 1, 2]), + "value": pd.array( + [Decimal("1.0"), Decimal("2.0"), Decimal("3.0"), Decimal("4.0")], + dtype=ArrowDtype(pa.decimal128(10, 2)), + ), + } + ) + result = getattr(df.groupby("key", observed=False)["value"], agg_func)() + assert len(result) == 3 + # Group 1 is missing, should get default value + assert result.iloc[1] == Decimal(str(default_value)) + + @pytest.mark.parametrize("dropna", [True, False]) + def test_groupby_dropna(self, dropna): + # Test that NA group (ids == -1) is handled correctly + df = pd.DataFrame( + { + "key": [1, 1, None, 2, 2, None], + "value": pd.array( + [ + Decimal("1.0"), + Decimal("2.0"), + Decimal("3.0"), + Decimal("4.0"), + Decimal("5.0"), + Decimal("6.0"), + ], + dtype=ArrowDtype(pa.decimal128(10, 2)), + ), + } + ) + result = df.groupby("key", dropna=dropna)["value"].sum() + if dropna: + assert len(result) == 2 + assert result.iloc[0] == Decimal("3.0") # 1 + 2 + assert result.iloc[1] == Decimal("9.0") # 4 + 5 + else: + assert len(result) == 3 + assert result.iloc[0] == Decimal("3.0") # 1 + 2 + assert result.iloc[1] == Decimal("9.0") # 4 + 5 + assert result.iloc[2] == Decimal("9.0") # 3 + 6 (NA group) + + def test_fixed_size_list(): # GH#55000 ser = pd.Series( From 79892b3f7c076889b1b12ec4683e3b34f863c690 Mon Sep 17 00:00:00 2001 From: Fangchen Li Date: Thu, 18 Dec 2025 10:41:35 -0800 Subject: [PATCH 2/8] improve comments 
and benchmark, turn on arrow path for int and float --- asv_bench/benchmarks/groupby.py | 87 +++++++++---------------------- pandas/core/arrays/arrow/array.py | 58 +++++++-------------- 2 files changed, 43 insertions(+), 102 deletions(-) diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index 08be81eaa50dd..fb3bf9d3d6c44 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -6,6 +6,7 @@ from pandas import ( NA, + ArrowDtype, Categorical, DataFrame, Index, @@ -1156,84 +1157,44 @@ class GroupByAggregateArrowDtypes: param_names = ["dtype", "method"] params = [ [ - "bool[pyarrow]", "int32[pyarrow]", + "int64[pyarrow]", + "float32[pyarrow]", "float64[pyarrow]", - "decimal128(25, 3)[pyarrow]", + "decimal128", "string[pyarrow]", - "timestamp[s, tz=UTC][pyarrow]", - "duration[ms][pyarrow]", - ], - [ - "any", - "all", - "sum", - "prod", - "min", - "max", - "mean", - "std", - "var", - "sem", - "count", - "median", ], + ["sum", "prod", "min", "max", "mean", "std", "var", "count"], ] def setup(self, dtype, method): import pyarrow as pa - # Parse dtype string - if dtype.startswith("decimal128"): - pa_type = pa.decimal128(25, 3) - elif dtype.startswith("timestamp"): - pa_type = pa.timestamp("s", "UTC") - elif dtype.startswith("duration"): - pa_type = pa.duration("ms") - elif dtype == "bool[pyarrow]": - pa_type = pa.bool_() - elif dtype == "int32[pyarrow]": - pa_type = pa.int32() - elif dtype == "float64[pyarrow]": - pa_type = pa.float64() - elif dtype == "string[pyarrow]": - pa_type = pa.string() - else: - raise ValueError(f"Unsupported dtype: {dtype}") + from pandas.api.types import is_string_dtype size = 100_000 - ncols = 5 - columns = list("abcde") - - # Generate data based on type - if pa.types.is_floating(pa_type): - data = np.random.randn(size, ncols) - elif pa.types.is_integer(pa_type): - data = np.random.randint(0, 10_000, (size, ncols)) - elif pa.types.is_decimal(pa_type): + ngroups = 1000 + + if dtype in ("int32[pyarrow]", "int64[pyarrow]"): + data = np.random.randint(0, 10_000, size) + elif dtype in ("float32[pyarrow]", "float64[pyarrow]"): + data = np.random.randn(size) + elif dtype == "decimal128": from decimal import Decimal - data = np.random.randn(size, ncols).round(3) - data = [[Decimal(str(x)) for x in row] for row in data] - elif pa.types.is_boolean(pa_type): - data = np.random.choice([True, False], (size, ncols)) - elif pa.types.is_timestamp(pa_type): - data = np.random.randint(0, 1_000_000, (size, ncols)) - elif pa.types.is_duration(pa_type): - data = np.random.randint(0, 1_000_000, (size, ncols)) - elif pa.types.is_string(pa_type): - data = np.random.choice(list(ascii_letters), (size, ncols)) - else: - raise ValueError(f"Unsupported pyarrow type: {pa_type}") + data = [Decimal(str(round(x, 3))) for x in np.random.randn(size)] + dtype = ArrowDtype(pa.decimal128(10, 3)) + elif dtype == "string[pyarrow]": + data = np.random.choice(list(ascii_letters), size) - df = DataFrame(data, columns=columns, dtype=dtype) - # Add some NAs - df.iloc[::10, ::2] = NA - df["key"] = np.random.randint(0, 100, size) - self.df = df + ser = Series(data, dtype=dtype) + if not is_string_dtype(ser.dtype): + ser.iloc[::10] = NA + self.ser = ser + self.key = np.random.randint(0, ngroups, size) - def time_frame_agg(self, dtype, method): - self.df.groupby("key").agg(method) + def time_series_agg(self, dtype, method): + self.ser.groupby(self.key).agg(method) from .pandas_vb_common import setup # noqa: F401 isort:skip diff --git 
a/pandas/core/arrays/arrow/array.py b/pandas/core/arrays/arrow/array.py index 8a10d3900ff89..065ddeac1897b 100644 --- a/pandas/core/arrays/arrow/array.py +++ b/pandas/core/arrays/arrow/array.py @@ -2601,7 +2601,7 @@ def _to_masked(self): arr = self.to_numpy(dtype=dtype.numpy_dtype, na_value=na_value) return dtype.construct_array_type()(arr, mask) - # Mapping from pandas groupby 'how' to PyArrow aggregation function names + # pandas groupby 'how' -> PyArrow aggregation function name _PYARROW_AGG_FUNCS: dict[str, str] = { "sum": "sum", "prod": "product", @@ -2610,13 +2610,13 @@ def _to_masked(self): "mean": "mean", "std": "stddev", "var": "variance", - "sem": "stddev", # sem = stddev / sqrt(count), computed below + "sem": "stddev", # sem = stddev / sqrt(count) "count": "count", "any": "any", "all": "all", } - # Default values for missing groups (identity elements for each operation) + # Identity elements for operations (used to fill missing groups) _PYARROW_AGG_DEFAULTS: dict[str, int | bool] = { "sum": 0, "prod": 1, @@ -2637,23 +2637,19 @@ def _groupby_op_pyarrow( """ Perform groupby aggregation using PyArrow's native Table.group_by. - Returns None if the operation is not supported by PyArrow, - in which case the caller should fall back to the Cython path. + Returns None if not supported, caller should fall back to Cython path. """ pa_agg_func = self._PYARROW_AGG_FUNCS.get(how) if pa_agg_func is None: return None pa_type = self._pa_array.type - # PyArrow doesn't support these aggregations for temporal types directly if pa.types.is_temporal(pa_type) and how in ["std", "var", "sem"]: return None - - # PyArrow's any/all only work on boolean types if how in ["any", "all"] and not pa.types.is_boolean(pa_type): return None - # Filter out NA group (ids == -1) to avoid unnecessary computation + # Filter out NA group (ids == -1) mask = ids >= 0 if not mask.all(): ids = ids[mask] @@ -2661,56 +2657,41 @@ def _groupby_op_pyarrow( else: values = self._pa_array - # Create a PyArrow table with the values and group IDs - # Explicitly cast ids to int64 since np.intp is platform-dependent + # Build table and run aggregation (cast ids to int64 for portability) group_id_arr = pa.array(ids, type=pa.int64()) table = pa.table({"value": values, "group_id": group_id_arr}) - # Build aggregation list - always include count for null handling - # For std/var/sem, pass VarianceOptions with ddof to match pandas behavior if how in ["std", "var", "sem"]: ddof = kwargs.get("ddof", 1) - agg_with_opts = ("value", pa_agg_func, pc.VarianceOptions(ddof=ddof)) - aggs = [agg_with_opts, ("value", "count")] + aggs = [("value", pa_agg_func, pc.VarianceOptions(ddof=ddof))] else: - aggs = [("value", pa_agg_func), ("value", "count")] + aggs = [("value", pa_agg_func)] + aggs.append(("value", "count")) - # Perform the groupby aggregation result_table = table.group_by("group_id").aggregate(aggs) - - # Extract results result_group_ids = result_table.column("group_id") result_values = result_table.column(f"value_{pa_agg_func}") result_counts = result_table.column("value_count") - # For sem, compute stddev / sqrt(count) using PyArrow compute if how == "sem": - sqrt_counts = pc.sqrt(result_counts) - result_values = pc.divide(result_values, sqrt_counts) + result_values = pc.divide(result_values, pc.sqrt(result_counts)) output_type = result_values.type default_value = pa.scalar(self._PYARROW_AGG_DEFAULTS.get(how), type=output_type) - # Handle nulls from all-null groups for sum/prod with min_count=0 + # Replace nulls from all-null groups with 
identity element if result_values.null_count > 0 and how in ["sum", "prod"] and min_count == 0: result_values = pc.if_else( pc.is_null(result_values), default_value, result_values ) - # Handle min_count: groups with count < min_count should be null + # Null out groups below min_count if min_count > 0: below_min_count = pc.less(result_counts, pa.scalar(min_count)) result_values = pc.if_else(below_min_count, None, result_values) - # PyArrow returns results in encounter order. We need to reorder to - # match expected output (group 0, 1, 2, ..., ngroups-1) and fill - # missing groups with default values. - # - # We use NumPy scatter (O(n)) instead of: - # - pc.scatter: doesn't handle missing groups, workaround is slower - # - join+sort: O(n log n), slower for high-cardinality groupby - # - # Explicitly cast to int64 to ensure usable as NumPy indices + # Scatter results into output array ordered by group id. + # NumPy scatter is O(n) vs O(n log n) for join+sort or pc.scatter workaround. result_group_ids_np = result_group_ids.to_numpy(zero_copy_only=False).astype( np.int64, copy=False ) @@ -2718,19 +2699,16 @@ def _groupby_op_pyarrow( default_py = default_value.as_py() if default_py is not None and min_count == 0: - # Operations with identity elements (sum=0, prod=1, count=0, any=False, - # all=True): fill missing groups with default value + # Fill missing groups with identity element output_np = np.full(ngroups, default_py, dtype=result_values_np.dtype) output_np[result_group_ids_np] = result_values_np pa_result = pa.array(output_np, type=output_type) else: - # Operations without identity elements (mean, std, var, min, max, sem): - # fill missing groups with null using a boolean mask + # Fill missing groups with null output_np = np.empty(ngroups, dtype=result_values_np.dtype) - null_mask = np.ones(ngroups, dtype=bool) # True = null/missing + null_mask = np.ones(ngroups, dtype=bool) output_np[result_group_ids_np] = result_values_np null_mask[result_group_ids_np] = False - # Restore nulls for groups that had null results (min_count or all-null) if result_values.null_count > 0: result_nulls = pc.is_null(result_values).to_numpy() null_mask[result_group_ids_np[result_nulls]] = True @@ -2780,6 +2758,8 @@ def _groupby_op( pa.types.is_decimal(pa_type) or pa.types.is_string(pa_type) or pa.types.is_large_string(pa_type) + or pa.types.is_integer(pa_type) # TEMPORARY: for testing + or pa.types.is_floating(pa_type) # TEMPORARY: for testing ): result = self._groupby_op_pyarrow( how=how, From 6a9be6cfcb875db6cc721d305804a59f614848ed Mon Sep 17 00:00:00 2001 From: Fangchen Li Date: Thu, 18 Dec 2025 14:30:33 -0800 Subject: [PATCH 3/8] CLN: refactor TestGroupbyAggPyArrowNative tests to eliminate if-else branches MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Split test_groupby_aggregations into test_groupby_decimal_aggregations and test_groupby_string_aggregations - Split test_groupby_dropna into test_groupby_dropna_true and test_groupby_dropna_false - Use explicit Decimal values instead of range() casts for decimal tests - Parametrize values directly to avoid runtime branching 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- pandas/tests/extension/test_arrow.py | 225 +++++++++------------------ 1 file changed, 77 insertions(+), 148 deletions(-) diff --git a/pandas/tests/extension/test_arrow.py b/pandas/tests/extension/test_arrow.py index 9f2549bbe121c..9519afd21ff61 100644 --- a/pandas/tests/extension/test_arrow.py 
+++ b/pandas/tests/extension/test_arrow.py @@ -3273,20 +3273,22 @@ def test_groupby_count_return_arrow_dtype(data_missing): class TestGroupbyAggPyArrowNative: """Tests for PyArrow-native groupby aggregations on decimal and string types.""" + @pytest.mark.parametrize( + "agg_func", + ["sum", "prod", "min", "max", "mean", "std", "var", "sem", "count"], + ) + def test_groupby_decimal_aggregations(self, agg_func): + """Test decimal types use PyArrow-native groupby path.""" + values = [Decimal(str(i)) for i in range(5)] + ser = pd.Series(values, dtype=ArrowDtype(pa.decimal128(10, 2))) + result = ser.groupby([1, 1, 2, 2, 3]).agg(agg_func) + assert len(result) == 3 + assert result.index.tolist() == [1, 2, 3] + assert isinstance(result.dtype, ArrowDtype) + @pytest.mark.parametrize( "dtype,agg_func", [ - # Decimal aggregations - (pa.decimal128(10, 2), "sum"), - (pa.decimal128(10, 2), "prod"), - (pa.decimal128(10, 2), "min"), - (pa.decimal128(10, 2), "max"), - (pa.decimal128(10, 2), "mean"), - (pa.decimal128(10, 2), "std"), - (pa.decimal128(10, 2), "var"), - (pa.decimal128(10, 2), "sem"), - (pa.decimal128(10, 2), "count"), - # String aggregations (pa.string(), "min"), (pa.string(), "max"), (pa.string(), "count"), @@ -3295,132 +3297,76 @@ class TestGroupbyAggPyArrowNative: (pa.large_string(), "count"), ], ) - def test_groupby_aggregations(self, dtype, agg_func): - # Test that decimal/string types use PyArrow-native groupby path - if pa.types.is_decimal(dtype): - values = [ - Decimal("1.5"), - Decimal("2.5"), - Decimal("3.0"), - Decimal("4.0"), - Decimal("5.0"), - ] - else: - values = ["apple", "banana", "cherry", "date", "elderberry"] - - df = pd.DataFrame( - { - "key": [1, 1, 2, 2, 3], - "value": pd.array(values, dtype=ArrowDtype(dtype)), - } - ) - result = getattr(df.groupby("key")["value"], agg_func)() + def test_groupby_string_aggregations(self, dtype, agg_func): + """Test string types use PyArrow-native groupby path.""" + ser = pd.Series(list("abcde"), dtype=ArrowDtype(dtype)) + result = ser.groupby([1, 1, 2, 2, 3]).agg(agg_func) assert len(result) == 3 assert result.index.tolist() == [1, 2, 3] assert isinstance(result.dtype, ArrowDtype) @pytest.mark.parametrize( - "dtype,agg_func", + "dtype,values,expected,agg_func", [ - (pa.decimal128(10, 2), "sum"), - (pa.decimal128(10, 2), "min"), - (pa.string(), "min"), - (pa.string(), "max"), + ( + pa.decimal128(10, 2), + [Decimal("1.0"), None, Decimal("3.0"), None], + [Decimal("1.0"), Decimal("3.0")], + "min", + ), + (pa.string(), ["a", None, "c", None], ["a", "c"], "min"), + (pa.string(), ["a", None, "c", None], ["a", "c"], "max"), ], ) - def test_groupby_with_nulls(self, dtype, agg_func): - # Test groupby with null values - if pa.types.is_decimal(dtype): - values = [Decimal("1.0"), None, Decimal("3.0"), None] - expected = [Decimal("1.0"), Decimal("3.0")] - else: - values = ["a", None, "c", None] - expected = ["a", "c"] - - df = pd.DataFrame( - { - "key": [1, 1, 2, 2], - "value": pd.array(values, dtype=ArrowDtype(dtype)), - } - ) - result = getattr(df.groupby("key")["value"], agg_func)() + def test_groupby_with_nulls(self, dtype, values, expected, agg_func): + """Test groupby with null values.""" + ser = pd.Series(values, dtype=ArrowDtype(dtype)) + result = ser.groupby([1, 1, 2, 2]).agg(agg_func) assert len(result) == 2 assert result.iloc[0] == expected[0] assert result.iloc[1] == expected[1] def test_groupby_sem_returns_float(self): - # Test that sem returns float dtype and handles edge cases - # 1. 
Normal case: sem should return double[pyarrow] - df = pd.DataFrame( - { - "key": [1, 1, 2, 2], - "value": pd.array( - [Decimal("1.0"), Decimal("2.0"), Decimal("3.0"), Decimal("4.0")], - dtype=ArrowDtype(pa.decimal128(10, 2)), - ), - } - ) - result = df.groupby("key")["value"].sem() + """Test that sem returns float dtype.""" + values = [Decimal(str(i)) for i in range(4)] + ser = pd.Series(values, dtype=ArrowDtype(pa.decimal128(10, 2))) + result = ser.groupby([1, 1, 2, 2]).sem() assert result.dtype == ArrowDtype(pa.float64()) - # 2. Single value per group (count=1): should be NA (stddev undefined) - df_single = pd.DataFrame( - { - "key": [1, 2], - "value": pd.array( - [Decimal("1.0"), Decimal("2.0")], - dtype=ArrowDtype(pa.decimal128(10, 2)), - ), - } - ) - result = df_single.groupby("key")["value"].sem() + def test_groupby_sem_single_value(self): + """Test that sem returns NA for single-value groups (stddev undefined).""" + values = [Decimal("1.0"), Decimal("2.0")] + ser = pd.Series(values, dtype=ArrowDtype(pa.decimal128(10, 2))) + result = ser.groupby([1, 2]).sem() assert pd.isna(result.iloc[0]) assert pd.isna(result.iloc[1]) - # 3. All nulls in a group (count=0): should be NA (no division-by-zero) - df_nulls = pd.DataFrame( - { - "key": [1, 1, 2, 2], - "value": pd.array( - [Decimal("1.0"), Decimal("2.0"), None, None], - dtype=ArrowDtype(pa.decimal128(10, 2)), - ), - } + def test_groupby_sem_all_nulls(self): + """Test that sem returns NA for all-null groups.""" + ser = pd.Series( + [Decimal("1.0"), Decimal("2.0"), None, None], + dtype=ArrowDtype(pa.decimal128(10, 2)), ) - result = df_nulls.groupby("key")["value"].sem() + result = ser.groupby([1, 1, 2, 2]).sem() assert not pd.isna(result.iloc[0]) # Group 1 has values assert pd.isna(result.iloc[1]) # Group 2 all nulls @pytest.mark.parametrize("agg_func", ["sum", "prod"]) def test_groupby_min_count(self, agg_func): - df = pd.DataFrame( - { - "key": [1, 1, 2], - "value": pd.array( - [Decimal("1.0"), Decimal("2.0"), Decimal("3.0")], - dtype=ArrowDtype(pa.decimal128(10, 2)), - ), - } - ) - # min_count=2: group 2 has only 1 value, should be null - result = getattr(df.groupby("key")["value"], agg_func)(min_count=2) + """Test min_count parameter.""" + values = [Decimal(str(i)) for i in range(3)] + ser = pd.Series(values, dtype=ArrowDtype(pa.decimal128(10, 2))) + result = ser.groupby([1, 1, 2]).agg(agg_func, min_count=2) assert not pd.isna(result.iloc[0]) # Group 1 has 2 values assert pd.isna(result.iloc[1]) # Group 2 has 1 value < min_count def test_groupby_min_count_with_nulls(self): - # Test that min_count uses non-null count, not group size - df = pd.DataFrame( - { - "key": [1, 1, 2, 2, 2], - "value": pd.array( - [Decimal("1.0"), None, Decimal("2.0"), Decimal("3.0"), None], - dtype=ArrowDtype(pa.decimal128(10, 2)), - ), - } + """Test that min_count uses non-null count, not group size.""" + ser = pd.Series( + [Decimal("1.0"), None, Decimal("2.0"), Decimal("3.0"), None], + dtype=ArrowDtype(pa.decimal128(10, 2)), ) - # Group 1: 2 rows but only 1 non-null -> should be null with min_count=2 - # Group 2: 3 rows but only 2 non-null -> should be 5.0 with min_count=2 - result = df.groupby("key")["value"].sum(min_count=2) + result = ser.groupby([1, 1, 2, 2, 2]).sum(min_count=2) assert pd.isna(result.iloc[0]) # Only 1 non-null < min_count=2 assert result.iloc[1] == Decimal("5.0") # 2 non-null >= min_count=2 @@ -3432,49 +3378,32 @@ def test_groupby_min_count_with_nulls(self): ], ) def test_groupby_missing_groups(self, agg_func, default_value): - df = 
pd.DataFrame( - { - "key": pd.Categorical([0, 0, 2, 2], categories=[0, 1, 2]), - "value": pd.array( - [Decimal("1.0"), Decimal("2.0"), Decimal("3.0"), Decimal("4.0")], - dtype=ArrowDtype(pa.decimal128(10, 2)), - ), - } - ) - result = getattr(df.groupby("key", observed=False)["value"], agg_func)() + """Test that missing groups get identity values.""" + values = [Decimal(str(i)) for i in range(4)] + ser = pd.Series(values, dtype=ArrowDtype(pa.decimal128(10, 2))) + keys = pd.Categorical([0, 0, 2, 2], categories=[0, 1, 2]) + result = ser.groupby(keys, observed=False).agg(agg_func) assert len(result) == 3 - # Group 1 is missing, should get default value assert result.iloc[1] == Decimal(str(default_value)) - @pytest.mark.parametrize("dropna", [True, False]) - def test_groupby_dropna(self, dropna): - # Test that NA group (ids == -1) is handled correctly - df = pd.DataFrame( - { - "key": [1, 1, None, 2, 2, None], - "value": pd.array( - [ - Decimal("1.0"), - Decimal("2.0"), - Decimal("3.0"), - Decimal("4.0"), - Decimal("5.0"), - Decimal("6.0"), - ], - dtype=ArrowDtype(pa.decimal128(10, 2)), - ), - } - ) - result = df.groupby("key", dropna=dropna)["value"].sum() - if dropna: - assert len(result) == 2 - assert result.iloc[0] == Decimal("3.0") # 1 + 2 - assert result.iloc[1] == Decimal("9.0") # 4 + 5 - else: - assert len(result) == 3 - assert result.iloc[0] == Decimal("3.0") # 1 + 2 - assert result.iloc[1] == Decimal("9.0") # 4 + 5 - assert result.iloc[2] == Decimal("9.0") # 3 + 6 (NA group) + def test_groupby_dropna_true(self): + """Test that NA keys are excluded when dropna=True.""" + values = [Decimal(str(i)) for i in range(6)] + ser = pd.Series(values, dtype=ArrowDtype(pa.decimal128(10, 2))) + result = ser.groupby([1, 1, None, 2, 2, None], dropna=True).sum() + assert len(result) == 2 + assert result.iloc[0] == Decimal("1.0") # 0 + 1 + assert result.iloc[1] == Decimal("7.0") # 3 + 4 + + def test_groupby_dropna_false(self): + """Test that NA keys form a group when dropna=False.""" + values = [Decimal(str(i)) for i in range(6)] + ser = pd.Series(values, dtype=ArrowDtype(pa.decimal128(10, 2))) + result = ser.groupby([1, 1, None, 2, 2, None], dropna=False).sum() + assert len(result) == 3 + assert result.iloc[0] == Decimal("1.0") # 0 + 1 + assert result.iloc[1] == Decimal("7.0") # 3 + 4 + assert result.iloc[2] == Decimal("7.0") # 2 + 5 (NA group) def test_fixed_size_list(): From 1322e7202d945b33d0d2ce42a22e9ece32aa233d Mon Sep 17 00:00:00 2001 From: Fangchen Li Date: Thu, 18 Dec 2025 15:40:57 -0800 Subject: [PATCH 4/8] TYP: fix mypy errors in _groupby_op_pyarrow MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add type annotation for aggs list to handle mixed tuple types - Rename result to fallback_result to avoid type conflict 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- pandas/core/arrays/arrow/array.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/pandas/core/arrays/arrow/array.py b/pandas/core/arrays/arrow/array.py index 065ddeac1897b..fbef8ff6b503b 100644 --- a/pandas/core/arrays/arrow/array.py +++ b/pandas/core/arrays/arrow/array.py @@ -2663,7 +2663,9 @@ def _groupby_op_pyarrow( if how in ["std", "var", "sem"]: ddof = kwargs.get("ddof", 1) - aggs = [("value", pa_agg_func, pc.VarianceOptions(ddof=ddof))] + aggs: list[tuple[str, str] | tuple[str, str, pc.VarianceOptions]] = [ + ("value", pa_agg_func, pc.VarianceOptions(ddof=ddof)) + ] else: aggs = [("value", 
pa_agg_func)] aggs.append(("value", "count")) @@ -2772,15 +2774,15 @@ def _groupby_op( return result # Fall back to converting to masked/datetime array and using Cython - values: ExtensionArray + fallback_values: ExtensionArray if pa.types.is_timestamp(pa_type): - values = self._to_datetimearray() + fallback_values = self._to_datetimearray() elif pa.types.is_duration(pa_type): - values = self._to_timedeltaarray() + fallback_values = self._to_timedeltaarray() else: - values = self._to_masked() + fallback_values = self._to_masked() - result = values._groupby_op( + fallback_result = fallback_values._groupby_op( how=how, has_dropped_na=has_dropped_na, min_count=min_count, @@ -2788,14 +2790,14 @@ def _groupby_op( ids=ids, **kwargs, ) - if isinstance(result, np.ndarray): - return result - elif isinstance(result, BaseMaskedArray): - pa_result = result.__arrow_array__() + if isinstance(fallback_result, np.ndarray): + return fallback_result + elif isinstance(fallback_result, BaseMaskedArray): + pa_result = fallback_result.__arrow_array__() return self._from_pyarrow_array(pa_result) else: # DatetimeArray, TimedeltaArray - pa_result = pa.array(result) + pa_result = pa.array(fallback_result) return self._from_pyarrow_array(pa_result) def _apply_elementwise(self, func: Callable) -> list[list[Any]]: From 7da132cb1d24466081a80ff2cd04cea9b9a5b07d Mon Sep 17 00:00:00 2001 From: Fangchen Li Date: Thu, 18 Dec 2025 18:39:13 -0800 Subject: [PATCH 5/8] BUG: skip unsupported string aggregations in ASV benchmark MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit String types only support min, max, count - skip sum, prod, mean, std, var. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- asv_bench/benchmarks/groupby.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index fb3bf9d3d6c44..ac36c0a4877f1 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -1167,11 +1167,17 @@ class GroupByAggregateArrowDtypes: ["sum", "prod", "min", "max", "mean", "std", "var", "count"], ] + # String types only support min, max, count + _string_unsupported = {"sum", "prod", "mean", "std", "var"} + def setup(self, dtype, method): import pyarrow as pa from pandas.api.types import is_string_dtype + if dtype == "string[pyarrow]" and method in self._string_unsupported: + raise NotImplementedError("skipped") + size = 100_000 ngroups = 1000 From 81d328e477f4e600e25bdffb4381b53c9df14f34 Mon Sep 17 00:00:00 2001 From: Fangchen Li Date: Thu, 18 Dec 2025 21:25:59 -0800 Subject: [PATCH 6/8] remove int and float --- pandas/core/arrays/arrow/array.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pandas/core/arrays/arrow/array.py b/pandas/core/arrays/arrow/array.py index fbef8ff6b503b..dd4ec618d0da1 100644 --- a/pandas/core/arrays/arrow/array.py +++ b/pandas/core/arrays/arrow/array.py @@ -2760,8 +2760,6 @@ def _groupby_op( pa.types.is_decimal(pa_type) or pa.types.is_string(pa_type) or pa.types.is_large_string(pa_type) - or pa.types.is_integer(pa_type) # TEMPORARY: for testing - or pa.types.is_floating(pa_type) # TEMPORARY: for testing ): result = self._groupby_op_pyarrow( how=how, From 949061cb4af8866dd7f07463c16b79f077984598 Mon Sep 17 00:00:00 2001 From: Fangchen Li Date: Fri, 19 Dec 2025 00:16:09 -0800 Subject: [PATCH 7/8] let string type fall through --- pandas/core/arrays/arrow/array.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) 
diff --git a/pandas/core/arrays/arrow/array.py b/pandas/core/arrays/arrow/array.py index dd4ec618d0da1..7c8243df1a0bc 100644 --- a/pandas/core/arrays/arrow/array.py +++ b/pandas/core/arrays/arrow/array.py @@ -2743,14 +2743,7 @@ def _groupby_op( raise TypeError( f"dtype '{self.dtype}' does not support operation '{how}'" ) - return super()._groupby_op( - how=how, - has_dropped_na=has_dropped_na, - min_count=min_count, - ngroups=ngroups, - ids=ids, - **kwargs, - ) + # Fall through to Arrow-native path below pa_type = self._pa_array.type

From b3608110fb5698e4f9eed00ad5c3b406771c1be7 Mon Sep 17 00:00:00 2001 From: Fangchen Li Date: Fri, 19 Dec 2025 12:13:21 -0800 Subject: [PATCH 8/8] fix bug in string dtype fall through --- pandas/core/arrays/arrow/array.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+)

diff --git a/pandas/core/arrays/arrow/array.py b/pandas/core/arrays/arrow/array.py index a125dcc3dbcca..b03130e8939f1 100644 --- a/pandas/core/arrays/arrow/array.py +++ b/pandas/core/arrays/arrow/array.py @@ -2650,6 +2650,10 @@ def _groupby_op_pyarrow( return None if how in ["any", "all"] and not pa.types.is_boolean(pa_type): return None + # PyArrow doesn't support sum/prod/mean/std/var/sem on strings + is_str = pa.types.is_string(pa_type) or pa.types.is_large_string(pa_type) + if is_str and how in ["sum", "prod", "mean", "std", "var", "sem"]: + return None # Filter out NA group (ids == -1) mask = ids >= 0 @@ -2765,6 +2769,17 @@ def _groupby_op( ) if result is not None: return result + # For string types, fall back to parent implementation (Python path) + # since _to_masked() doesn't support strings + if pa.types.is_string(pa_type) or pa.types.is_large_string(pa_type): + return super()._groupby_op( + how=how, + has_dropped_na=has_dropped_na, + min_count=min_count, + ngroups=ngroups, + ids=ids, + **kwargs, + ) # Fall back to converting to masked/datetime array and using Cython fallback_values: ExtensionArray
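
Usage sketch (illustrative, not part of the patch series): the snippet below exercises the code path these commits optimize, a groupby aggregation on an Arrow-backed decimal column that is now routed through the PyArrow-native _groupby_op_pyarrow instead of the masked-array/Cython fallback. The column size, group count, and aggregation list are arbitrary assumptions chosen to roughly mirror the ASV benchmark, not values taken from this PR.

    from decimal import Decimal

    import numpy as np
    import pandas as pd
    import pyarrow as pa

    rng = np.random.default_rng(0)
    n = 100_000
    # Build a decimal128 column backed by ArrowDtype
    values = pd.array(
        [Decimal(str(round(x, 3))) for x in rng.normal(size=n)],
        dtype=pd.ArrowDtype(pa.decimal128(10, 3)),
    )
    df = pd.DataFrame({"key": rng.integers(0, 1_000, size=n), "value": values})

    # sum/mean/std on the decimal column stay in Arrow end to end;
    # min_count behaves as covered by the tests above (groups with fewer
    # non-null values than min_count yield NA).
    result = df.groupby("key")["value"].agg(["sum", "mean", "std"])
    capped = df.groupby("key")["value"].sum(min_count=2)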