From cda0721b15f1bc082cec5d838e4f3667d2873452 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 1 Jan 2026 09:37:13 +0800 Subject: [PATCH 01/20] [python] light refactor for stats collect --- .../pypaimon/write/writer/data_blob_writer.py | 9 +-------- .../pypaimon/write/writer/data_writer.py | 17 ++++++++++++++--- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py index eaf2b9483cd7..e7f63e1aadd6 100644 --- a/paimon-python/pypaimon/write/writer/data_blob_writer.py +++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py @@ -276,14 +276,7 @@ def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table, # Column stats (only for normal columns) metadata_stats_enabled = self.options.metadata_stats_enabled() stats_columns = self.normal_columns if metadata_stats_enabled else [] - column_stats = { - field.name: self._get_column_stats(data, field.name) - for field in stats_columns - } - - min_value_stats = [column_stats[field.name]['min_values'] for field in stats_columns] - max_value_stats = [column_stats[field.name]['max_values'] for field in stats_columns] - value_null_counts = [column_stats[field.name]['null_counts'] for field in stats_columns] + min_value_stats, max_value_stats, value_null_counts = self._collect_value_stats(data, stats_columns) self.sequence_generator.start = self.sequence_generator.current diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index 73609ed91282..7d06c371e499 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -199,9 +199,7 @@ def _write_data_to_file(self, data: pa.Table): for field in stats_fields } data_fields = stats_fields if value_stats_enabled else [] - min_value_stats = [column_stats[field.name]['min_values'] for field in data_fields] - max_value_stats = [column_stats[field.name]['max_values'] for field in data_fields] - value_null_counts = [column_stats[field.name]['null_counts'] for field in data_fields] + min_value_stats, max_value_stats, value_null_counts = self._collect_value_stats(data, data_fields) key_fields = self.trimmed_primary_keys_fields min_key_stats = [column_stats[field.name]['min_values'] for field in key_fields] max_key_stats = [column_stats[field.name]['max_values'] for field in key_fields] @@ -253,6 +251,19 @@ def _generate_file_path(self, file_name: str) -> str: bucket_path = self.path_factory.bucket_path(self.partition, self.bucket) return f"{bucket_path.rstrip('/')}/{file_name}" + def _collect_value_stats(self, data: pa.Table, value_fields: List) -> Tuple[List, List, List]: + if not value_fields: + return [], [], [] + + column_stats = { + field.name: self._get_column_stats(data, field.name) + for field in value_fields + } + min_value_stats = [column_stats[field.name]['min_values'] for field in value_fields] + max_value_stats = [column_stats[field.name]['max_values'] for field in value_fields] + value_null_counts = [column_stats[field.name]['null_counts'] for field in value_fields] + return min_value_stats, max_value_stats, value_null_counts + @staticmethod def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int: total_rows = data.num_rows From e842711228c2846d750b7f287fcdf5e5d107887a Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 1 Jan 2026 10:01:38 +0800 Subject: [PATCH 02/20] [python] remove duplicate compute min and 
max --- .../pypaimon/write/writer/data_blob_writer.py | 12 ++--- .../pypaimon/write/writer/data_writer.py | 46 +++++++++---------- 2 files changed, 26 insertions(+), 32 deletions(-) diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py index e7f63e1aadd6..8cdd7428dcbc 100644 --- a/paimon-python/pypaimon/write/writer/data_blob_writer.py +++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py @@ -276,7 +276,7 @@ def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table, # Column stats (only for normal columns) metadata_stats_enabled = self.options.metadata_stats_enabled() stats_columns = self.normal_columns if metadata_stats_enabled else [] - min_value_stats, max_value_stats, value_null_counts = self._collect_value_stats(data, stats_columns) + value_stats = self._collect_value_stats(data, stats_columns) self.sequence_generator.start = self.sequence_generator.current @@ -286,14 +286,8 @@ def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table, row_count=data.num_rows, min_key=GenericRow([], []), max_key=GenericRow([], []), - key_stats=SimpleStats( - GenericRow([], []), - GenericRow([], []), - []), - value_stats=SimpleStats( - GenericRow(min_value_stats, stats_columns), - GenericRow(max_value_stats, stats_columns), - value_null_counts), + key_stats=SimpleStats.empty_stats(), + value_stats=value_stats, min_sequence_number=-1, max_sequence_number=-1, schema_id=self.table.table_schema.id, diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index 7d06c371e499..15a7ac9a1706 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -198,14 +198,14 @@ def _write_data_to_file(self, data: pa.Table): field.name: self._get_column_stats(data, field.name) for field in stats_fields } - data_fields = stats_fields if value_stats_enabled else [] - min_value_stats, max_value_stats, value_null_counts = self._collect_value_stats(data, data_fields) key_fields = self.trimmed_primary_keys_fields - min_key_stats = [column_stats[field.name]['min_values'] for field in key_fields] - max_key_stats = [column_stats[field.name]['max_values'] for field in key_fields] - key_null_counts = [column_stats[field.name]['null_counts'] for field in key_fields] - if not all(count == 0 for count in key_null_counts): + key_stats = self._collect_value_stats(data, key_fields, column_stats) + if not all(count == 0 for count in key_stats.null_counts): raise RuntimeError("Primary key should not be null") + + data_fields = stats_fields if value_stats_enabled else [] + value_stats = self._collect_value_stats( + data, data_fields, column_stats if value_stats_enabled else None) min_seq = self.sequence_generator.start max_seq = self.sequence_generator.current @@ -216,16 +216,8 @@ def _write_data_to_file(self, data: pa.Table): row_count=data.num_rows, min_key=GenericRow(min_key, self.trimmed_primary_keys_fields), max_key=GenericRow(max_key, self.trimmed_primary_keys_fields), - key_stats=SimpleStats( - GenericRow(min_key_stats, self.trimmed_primary_keys_fields), - GenericRow(max_key_stats, self.trimmed_primary_keys_fields), - key_null_counts, - ), - value_stats=SimpleStats( - GenericRow(min_value_stats, data_fields), - GenericRow(max_value_stats, data_fields), - value_null_counts, - ), + key_stats=key_stats, + value_stats=value_stats, min_sequence_number=min_seq, max_sequence_number=max_seq, 
schema_id=self.table.table_schema.id, @@ -251,18 +243,26 @@ def _generate_file_path(self, file_name: str) -> str: bucket_path = self.path_factory.bucket_path(self.partition, self.bucket) return f"{bucket_path.rstrip('/')}/{file_name}" - def _collect_value_stats(self, data: pa.Table, value_fields: List) -> Tuple[List, List, List]: + def _collect_value_stats(self, data: pa.Table, value_fields: List, + column_stats: Optional[Dict[str, Dict]] = None) -> SimpleStats: if not value_fields: - return [], [], [] + return SimpleStats.empty_stats() + + if column_stats is None: + column_stats = { + field.name: self._get_column_stats(data, field.name) + for field in value_fields + } - column_stats = { - field.name: self._get_column_stats(data, field.name) - for field in value_fields - } min_value_stats = [column_stats[field.name]['min_values'] for field in value_fields] max_value_stats = [column_stats[field.name]['max_values'] for field in value_fields] value_null_counts = [column_stats[field.name]['null_counts'] for field in value_fields] - return min_value_stats, max_value_stats, value_null_counts + + return SimpleStats( + GenericRow(min_value_stats, value_fields), + GenericRow(max_value_stats, value_fields), + value_null_counts + ) @staticmethod def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int: From 342c0014b05ac0921c929fe9c4eca116e9904f9a Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 1 Jan 2026 11:54:27 +0800 Subject: [PATCH 03/20] fix code format --- paimon-python/pypaimon/write/writer/data_writer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index 15a7ac9a1706..3225ce70b92f 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -243,8 +243,8 @@ def _generate_file_path(self, file_name: str) -> str: bucket_path = self.path_factory.bucket_path(self.partition, self.bucket) return f"{bucket_path.rstrip('/')}/{file_name}" - def _collect_value_stats(self, data: pa.Table, value_fields: List, - column_stats: Optional[Dict[str, Dict]] = None) -> SimpleStats: + def _collect_value_stats(self, data: pa.Table, value_fields: List, + column_stats: Optional[Dict[str, Dict]] = None) -> SimpleStats: if not value_fields: return SimpleStats.empty_stats() From 99c21f21967336ec3bd5766af820f01af89194f5 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 1 Jan 2026 19:20:36 +0800 Subject: [PATCH 04/20] fix key fields stats collect twice issue --- .../pypaimon/tests/reader_base_test.py | 88 +++++++++++++++++++ .../pypaimon/write/writer/data_writer.py | 19 +++- 2 files changed, 105 insertions(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index 92a275585ccc..cc4f82571610 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -700,6 +700,94 @@ def _test_value_stats_cols_case(self, manifest_manager, table, value_stats_cols, self.assertEqual(read_entry.file.value_stats.null_counts, null_counts) + def test_primary_key_value_stats(self): + pa_schema = pa.schema([ + ('id', pa.int64()), + ('name', pa.string()), + ('price', pa.float64()), + ('category', pa.string()) + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + primary_keys=['id'], + options={'metadata.stats-mode': 'full', 'bucket': '2'} + ) + 
self.catalog.create_table('default.test_pk_value_stats', schema, False) + table = self.catalog.get_table('default.test_pk_value_stats') + + test_data = pa.Table.from_pydict({ + 'id': [1, 2, 3, 4, 5], + 'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'], + 'price': [10.5, 20.3, 30.7, 40.1, 50.9], + 'category': ['A', 'B', 'C', 'D', 'E'] + }, schema=pa_schema) + + write_builder = table.new_batch_write_builder() + writer = write_builder.new_write() + writer.write_arrow(test_data) + commit_messages = writer.prepare_commit() + commit = write_builder.new_commit() + commit.commit(commit_messages) + writer.close() + + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + latest_snapshot = SnapshotManager(table).get_latest_snapshot() + manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot) + manifest_entries = table_scan.starting_scanner.manifest_file_manager.read( + manifest_files[0].file_name, + lambda row: table_scan.starting_scanner._filter_manifest_entry(row), + False + ) + + self.assertGreater(len(manifest_entries), 0, "Should have at least one manifest entry") + file_meta = manifest_entries[0].file + + key_stats = file_meta.key_stats + self.assertIsNotNone(key_stats, "key_stats should not be None") + self.assertGreater(key_stats.min_values.arity, 0, "key_stats should contain key fields") + self.assertEqual(key_stats.min_values.arity, 1, "key_stats should contain exactly 1 key field (id)") + + value_stats = file_meta.value_stats + self.assertIsNotNone(value_stats, "value_stats should not be None") + + self.assertIsNotNone(file_meta.value_stats_cols, "value_stats_cols should not be None when stats enabled") + self.assertNotIn('id', file_meta.value_stats_cols, + "Key field 'id' should NOT be in value_stats_cols") + + expected_value_fields = ['name', 'price', 'category'] + self.assertEqual(set(file_meta.value_stats_cols), set(expected_value_fields), + f"value_stats_cols should only contain value fields: {expected_value_fields}, " + f"but got: {file_meta.value_stats_cols}") + + self.assertEqual(value_stats.min_values.arity, 3, + f"value_stats should contain exactly 3 value fields, but got {value_stats.min_values.arity}") + self.assertEqual(value_stats.max_values.arity, 3, + f"value_stats should contain exactly 3 value fields, but got {value_stats.max_values.arity}") + self.assertEqual(len(value_stats.null_counts), 3, + f"value_stats null_counts should have 3 elements, but got {len(value_stats.null_counts)}") + + value_stats_fields = table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields( + {'_VALUE_STATS_COLS': file_meta.value_stats_cols}, + table.fields + ) + min_value_stats = GenericRowDeserializer.from_bytes( + value_stats.min_values.data, + value_stats_fields + ).values + max_value_stats = GenericRowDeserializer.from_bytes( + value_stats.max_values.data, + value_stats_fields + ).values + + self.assertEqual(len(min_value_stats), 3, "min_value_stats should have 3 values") + self.assertEqual(len(max_value_stats), 3, "max_value_stats should have 3 values") + + actual_data = read_builder.new_read().to_arrow(table_scan.plan().splits()) + self.assertEqual(actual_data.num_rows, 5, "Should have 5 rows") + actual_ids = sorted(actual_data.column('id').to_pylist()) + self.assertEqual(actual_ids, [1, 2, 3, 4, 5], "All IDs should be present") + def test_split_target_size(self): """Test source.split.target-size configuration effect on split generation.""" from pypaimon.common.options.core_options import CoreOptions diff 
--git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index 3225ce70b92f..734df9e47d11 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -203,7 +203,22 @@ def _write_data_to_file(self, data: pa.Table): if not all(count == 0 for count in key_stats.null_counts): raise RuntimeError("Primary key should not be null") - data_fields = stats_fields if value_stats_enabled else [] + if value_stats_enabled: + key_field_names = {field.name for field in key_fields} + # Exclude key fields and system fields (all system fields start with '_') + data_fields = [ + field for field in stats_fields + if field.name not in key_field_names + and not field.name.startswith('_') + ] + table_value_fields = [ + field for field in self.table.fields + if field.name not in key_field_names + ] + value_stats_cols = [field.name for field in table_value_fields] + else: + data_fields = [] + value_stats_cols = [] value_stats = self._collect_value_stats( data, data_fields, column_stats if value_stats_enabled else None) @@ -226,7 +241,7 @@ def _write_data_to_file(self, data: pa.Table): creation_time=Timestamp.now(), delete_row_count=0, file_source=0, - value_stats_cols=None if value_stats_enabled else [], + value_stats_cols=value_stats_cols, external_path=external_path_str, # Set external path if using external paths first_row_id=None, write_cols=self.write_cols, From 6524ed35a5da7aea3bbad7cb0647d1c04530bac5 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 1 Jan 2026 19:21:01 +0800 Subject: [PATCH 05/20] clean code --- paimon-python/pypaimon/write/writer/data_writer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index 734df9e47d11..d673c7494976 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -205,7 +205,6 @@ def _write_data_to_file(self, data: pa.Table): if value_stats_enabled: key_field_names = {field.name for field in key_fields} - # Exclude key fields and system fields (all system fields start with '_') data_fields = [ field for field in stats_fields if field.name not in key_field_names From 73b1d9e7767e76a4c02ed33bd2746936e43f629f Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 1 Jan 2026 19:51:30 +0800 Subject: [PATCH 06/20] fix --- .../pypaimon/write/writer/data_writer.py | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index d673c7494976..b78646ea993d 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -205,16 +205,23 @@ def _write_data_to_file(self, data: pa.Table): if value_stats_enabled: key_field_names = {field.name for field in key_fields} - data_fields = [ - field for field in stats_fields - if field.name not in key_field_names - and not field.name.startswith('_') - ] - table_value_fields = [ - field for field in self.table.fields - if field.name not in key_field_names - ] - value_stats_cols = [field.name for field in table_value_fields] + + if self.table.is_primary_key_table: + from pypaimon.table.special_fields import SpecialFields + data_fields = [ + field for field in stats_fields + if field.name not in key_field_names + and field.name in self.table.field_dict + and not 
SpecialFields.is_system_field(field.name) + and not field.name.startswith('_KEY_') + ] + value_stats_cols = [field.name for field in data_fields] + else: + data_fields = [ + field for field in stats_fields + if field.name not in key_field_names + ] + value_stats_cols = [field.name for field in data_fields] else: data_fields = [] value_stats_cols = [] From 225210dd7c94da3f227ccf90f26794e9f7ff9bfc Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 1 Jan 2026 20:07:39 +0800 Subject: [PATCH 07/20] fix --- .../manifest/manifest_file_manager.py | 4 ++-- .../pypaimon/tests/reader_base_test.py | 24 ++++++++++++------- .../pypaimon/write/writer/data_writer.py | 22 ++++------------- 3 files changed, 23 insertions(+), 27 deletions(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index f6ae41e3d386..ff3528d3ba45 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -166,13 +166,13 @@ def _get_value_stats_fields(self, file_dict: dict, schema_fields: list) -> List: fields = schema_fields else: read_fields = file_dict['_WRITE_COLS'] - fields = [self.table.field_dict[col] for col in read_fields] + fields = [self.table.field_dict[col] for col in read_fields if col in self.table.field_dict] else: fields = schema_fields elif not file_dict['_VALUE_STATS_COLS']: fields = [] else: - fields = [self.table.field_dict[col] for col in file_dict['_VALUE_STATS_COLS']] + fields = [self.table.field_dict[col] for col in file_dict['_VALUE_STATS_COLS'] if col in self.table.field_dict] return fields def write(self, file_name, entries: List[ManifestEntry]): diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index cc4f82571610..aa0b1184e127 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -755,17 +755,25 @@ def test_primary_key_value_stats(self): self.assertNotIn('id', file_meta.value_stats_cols, "Key field 'id' should NOT be in value_stats_cols") + # For PrimaryKey tables, value_stats_cols may include system fields (_KEY_*, _SEQUENCE_NUMBER, _VALUE_KIND) + # but should always include the actual value fields expected_value_fields = ['name', 'price', 'category'] - self.assertEqual(set(file_meta.value_stats_cols), set(expected_value_fields), - f"value_stats_cols should only contain value fields: {expected_value_fields}, " + self.assertTrue(set(expected_value_fields).issubset(set(file_meta.value_stats_cols)), + f"value_stats_cols should contain value fields: {expected_value_fields}, " f"but got: {file_meta.value_stats_cols}") - self.assertEqual(value_stats.min_values.arity, 3, - f"value_stats should contain exactly 3 value fields, but got {value_stats.min_values.arity}") - self.assertEqual(value_stats.max_values.arity, 3, - f"value_stats should contain exactly 3 value fields, but got {value_stats.max_values.arity}") - self.assertEqual(len(value_stats.null_counts), 3, - f"value_stats null_counts should have 3 elements, but got {len(value_stats.null_counts)}") + # value_stats should match value_stats_cols (including system fields for PrimaryKey tables) + # So the arity should match the number of fields in value_stats_cols + expected_arity = len(file_meta.value_stats_cols) + self.assertEqual(value_stats.min_values.arity, expected_arity, + f"value_stats should contain {expected_arity} fields (matching value_stats_cols), " + f"but got 
{value_stats.min_values.arity}") + self.assertEqual(value_stats.max_values.arity, expected_arity, + f"value_stats should contain {expected_arity} fields (matching value_stats_cols), " + f"but got {value_stats.max_values.arity}") + self.assertEqual(len(value_stats.null_counts), expected_arity, + f"value_stats null_counts should have {expected_arity} elements, " + f"but got {len(value_stats.null_counts)}") value_stats_fields = table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields( {'_VALUE_STATS_COLS': file_meta.value_stats_cols}, diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index b78646ea993d..83bf7d302f04 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -205,23 +205,11 @@ def _write_data_to_file(self, data: pa.Table): if value_stats_enabled: key_field_names = {field.name for field in key_fields} - - if self.table.is_primary_key_table: - from pypaimon.table.special_fields import SpecialFields - data_fields = [ - field for field in stats_fields - if field.name not in key_field_names - and field.name in self.table.field_dict - and not SpecialFields.is_system_field(field.name) - and not field.name.startswith('_KEY_') - ] - value_stats_cols = [field.name for field in data_fields] - else: - data_fields = [ - field for field in stats_fields - if field.name not in key_field_names - ] - value_stats_cols = [field.name for field in data_fields] + data_fields = [ + field for field in stats_fields + if field.name not in key_field_names + ] + value_stats_cols = [field.name for field in data_fields] else: data_fields = [] value_stats_cols = [] From b11cb72424557d80666621637c0453126ac817f3 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 1 Jan 2026 20:11:20 +0800 Subject: [PATCH 08/20] fix --- paimon-python/pypaimon/tests/reader_base_test.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index aa0b1184e127..d8b17101d7ce 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -755,15 +755,11 @@ def test_primary_key_value_stats(self): self.assertNotIn('id', file_meta.value_stats_cols, "Key field 'id' should NOT be in value_stats_cols") - # For PrimaryKey tables, value_stats_cols may include system fields (_KEY_*, _SEQUENCE_NUMBER, _VALUE_KIND) - # but should always include the actual value fields expected_value_fields = ['name', 'price', 'category'] self.assertTrue(set(expected_value_fields).issubset(set(file_meta.value_stats_cols)), f"value_stats_cols should contain value fields: {expected_value_fields}, " f"but got: {file_meta.value_stats_cols}") - # value_stats should match value_stats_cols (including system fields for PrimaryKey tables) - # So the arity should match the number of fields in value_stats_cols expected_arity = len(file_meta.value_stats_cols) self.assertEqual(value_stats.min_values.arity, expected_arity, f"value_stats should contain {expected_arity} fields (matching value_stats_cols), " From 46e1de10546bc39554241a7063aaee7f6418595e Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 1 Jan 2026 20:15:23 +0800 Subject: [PATCH 09/20] fix --- paimon-python/pypaimon/manifest/manifest_file_manager.py | 4 ++-- paimon-python/pypaimon/write/writer/data_writer.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git 
a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index ff3528d3ba45..f6ae41e3d386 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -166,13 +166,13 @@ def _get_value_stats_fields(self, file_dict: dict, schema_fields: list) -> List: fields = schema_fields else: read_fields = file_dict['_WRITE_COLS'] - fields = [self.table.field_dict[col] for col in read_fields if col in self.table.field_dict] + fields = [self.table.field_dict[col] for col in read_fields] else: fields = schema_fields elif not file_dict['_VALUE_STATS_COLS']: fields = [] else: - fields = [self.table.field_dict[col] for col in file_dict['_VALUE_STATS_COLS'] if col in self.table.field_dict] + fields = [self.table.field_dict[col] for col in file_dict['_VALUE_STATS_COLS']] return fields def write(self, file_name, entries: List[ManifestEntry]): diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index 83bf7d302f04..2806380ccf93 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -208,6 +208,7 @@ def _write_data_to_file(self, data: pa.Table): data_fields = [ field for field in stats_fields if field.name not in key_field_names + and field.name in self.table.field_dict ] value_stats_cols = [field.name for field in data_fields] else: From 42ef5e625eadc4181f2f89488f04190aee12600d Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 1 Jan 2026 20:44:35 +0800 Subject: [PATCH 10/20] fix --- .../pypaimon/tests/reader_base_test.py | 74 ++++++++++--------- .../pypaimon/write/writer/data_writer.py | 14 +--- 2 files changed, 41 insertions(+), 47 deletions(-) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index d8b17101d7ce..e1fc8fa71ced 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -751,41 +751,45 @@ def test_primary_key_value_stats(self): value_stats = file_meta.value_stats self.assertIsNotNone(value_stats, "value_stats should not be None") - self.assertIsNotNone(file_meta.value_stats_cols, "value_stats_cols should not be None when stats enabled") - self.assertNotIn('id', file_meta.value_stats_cols, - "Key field 'id' should NOT be in value_stats_cols") - - expected_value_fields = ['name', 'price', 'category'] - self.assertTrue(set(expected_value_fields).issubset(set(file_meta.value_stats_cols)), - f"value_stats_cols should contain value fields: {expected_value_fields}, " - f"but got: {file_meta.value_stats_cols}") - - expected_arity = len(file_meta.value_stats_cols) - self.assertEqual(value_stats.min_values.arity, expected_arity, - f"value_stats should contain {expected_arity} fields (matching value_stats_cols), " - f"but got {value_stats.min_values.arity}") - self.assertEqual(value_stats.max_values.arity, expected_arity, - f"value_stats should contain {expected_arity} fields (matching value_stats_cols), " - f"but got {value_stats.max_values.arity}") - self.assertEqual(len(value_stats.null_counts), expected_arity, - f"value_stats null_counts should have {expected_arity} elements, " - f"but got {len(value_stats.null_counts)}") - - value_stats_fields = table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields( - {'_VALUE_STATS_COLS': file_meta.value_stats_cols}, - table.fields - ) - min_value_stats = 
GenericRowDeserializer.from_bytes( - value_stats.min_values.data, - value_stats_fields - ).values - max_value_stats = GenericRowDeserializer.from_bytes( - value_stats.max_values.data, - value_stats_fields - ).values - - self.assertEqual(len(min_value_stats), 3, "min_value_stats should have 3 values") - self.assertEqual(len(max_value_stats), 3, "max_value_stats should have 3 values") + if file_meta.value_stats_cols is None: + expected_value_fields = ['name', 'price', 'category'] + self.assertGreaterEqual(value_stats.min_values.arity, len(expected_value_fields), + f"value_stats should contain at least {len(expected_value_fields)} value fields") + else: + self.assertNotIn('id', file_meta.value_stats_cols, + "Key field 'id' should NOT be in value_stats_cols") + + expected_value_fields = ['name', 'price', 'category'] + self.assertTrue(set(expected_value_fields).issubset(set(file_meta.value_stats_cols)), + f"value_stats_cols should contain value fields: {expected_value_fields}, " + f"but got: {file_meta.value_stats_cols}") + + expected_arity = len(file_meta.value_stats_cols) + self.assertEqual(value_stats.min_values.arity, expected_arity, + f"value_stats should contain {expected_arity} fields (matching value_stats_cols), " + f"but got {value_stats.min_values.arity}") + self.assertEqual(value_stats.max_values.arity, expected_arity, + f"value_stats should contain {expected_arity} fields (matching value_stats_cols), " + f"but got {value_stats.max_values.arity}") + self.assertEqual(len(value_stats.null_counts), expected_arity, + f"value_stats null_counts should have {expected_arity} elements, " + f"but got {len(value_stats.null_counts)}") + + value_stats_fields = table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields( + {'_VALUE_STATS_COLS': file_meta.value_stats_cols}, + table.fields + ) + min_value_stats = GenericRowDeserializer.from_bytes( + value_stats.min_values.data, + value_stats_fields + ).values + max_value_stats = GenericRowDeserializer.from_bytes( + value_stats.max_values.data, + value_stats_fields + ).values + + self.assertEqual(len(min_value_stats), 3, "min_value_stats should have 3 values") + self.assertEqual(len(max_value_stats), 3, "max_value_stats should have 3 values") actual_data = read_builder.new_read().to_arrow(table_scan.plan().splits()) self.assertEqual(actual_data.num_rows, 5, "Should have 5 rows") diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index 2806380ccf93..3225ce70b92f 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -203,17 +203,7 @@ def _write_data_to_file(self, data: pa.Table): if not all(count == 0 for count in key_stats.null_counts): raise RuntimeError("Primary key should not be null") - if value_stats_enabled: - key_field_names = {field.name for field in key_fields} - data_fields = [ - field for field in stats_fields - if field.name not in key_field_names - and field.name in self.table.field_dict - ] - value_stats_cols = [field.name for field in data_fields] - else: - data_fields = [] - value_stats_cols = [] + data_fields = stats_fields if value_stats_enabled else [] value_stats = self._collect_value_stats( data, data_fields, column_stats if value_stats_enabled else None) @@ -236,7 +226,7 @@ def _write_data_to_file(self, data: pa.Table): creation_time=Timestamp.now(), delete_row_count=0, file_source=0, - value_stats_cols=value_stats_cols, + value_stats_cols=None if value_stats_enabled else [], 
external_path=external_path_str, # Set external path if using external paths first_row_id=None, write_cols=self.write_cols, From 72e9e74b59c138d991d01bc8a8650f754820d9fd Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 1 Jan 2026 20:53:20 +0800 Subject: [PATCH 11/20] fix --- .../pypaimon/write/writer/data_writer.py | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index 3225ce70b92f..cc32fc886e5e 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -192,18 +192,28 @@ def _write_data_to_file(self, data: pa.Table): # key stats & value stats value_stats_enabled = self.options.metadata_stats_enabled() - stats_fields = PyarrowFieldParser.to_paimon_schema(data.schema) if value_stats_enabled\ - else self.table.trimmed_primary_keys_fields - column_stats = { - field.name: self._get_column_stats(data, field.name) - for field in stats_fields - } key_fields = self.trimmed_primary_keys_fields + key_field_names = {field.name for field in key_fields} + + if value_stats_enabled: + stats_fields = PyarrowFieldParser.to_paimon_schema(data.schema) + column_stats = { + field.name: self._get_column_stats(data, field.name) + for field in stats_fields + } + data_fields = [field for field in stats_fields if field.name not in key_field_names] + else: + stats_fields = key_fields + column_stats = { + field.name: self._get_column_stats(data, field.name) + for field in stats_fields + } + data_fields = [] + key_stats = self._collect_value_stats(data, key_fields, column_stats) if not all(count == 0 for count in key_stats.null_counts): raise RuntimeError("Primary key should not be null") - data_fields = stats_fields if value_stats_enabled else [] value_stats = self._collect_value_stats( data, data_fields, column_stats if value_stats_enabled else None) From 6726e2c774eb719336c6b858a9985740d8185a5b Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 1 Jan 2026 21:05:12 +0800 Subject: [PATCH 12/20] fix --- .../pypaimon/write/writer/data_writer.py | 28 ++++++++----------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index cc32fc886e5e..53f4310971b0 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -192,30 +192,24 @@ def _write_data_to_file(self, data: pa.Table): # key stats & value stats value_stats_enabled = self.options.metadata_stats_enabled() + stats_fields = PyarrowFieldParser.to_paimon_schema(data.schema) if value_stats_enabled \ + else self.table.trimmed_primary_keys_fields + column_stats = { + field.name: self._get_column_stats(data, field.name) + for field in stats_fields + } + key_fields = self.trimmed_primary_keys_fields key_field_names = {field.name for field in key_fields} - - if value_stats_enabled: - stats_fields = PyarrowFieldParser.to_paimon_schema(data.schema) - column_stats = { - field.name: self._get_column_stats(data, field.name) - for field in stats_fields - } - data_fields = [field for field in stats_fields if field.name not in key_field_names] - else: - stats_fields = key_fields - column_stats = { - field.name: self._get_column_stats(data, field.name) - for field in stats_fields - } - data_fields = [] - + value_fields = [field for field in stats_fields if + field.name not in key_field_names] if 
value_stats_enabled else [] + key_stats = self._collect_value_stats(data, key_fields, column_stats) if not all(count == 0 for count in key_stats.null_counts): raise RuntimeError("Primary key should not be null") value_stats = self._collect_value_stats( - data, data_fields, column_stats if value_stats_enabled else None) + data, value_fields, column_stats if value_stats_enabled else None) min_seq = self.sequence_generator.start max_seq = self.sequence_generator.current From 925a0e4198ae8e5af5dd4bb0472b0a7669c6ddc1 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 1 Jan 2026 21:12:14 +0800 Subject: [PATCH 13/20] clean code --- paimon-python/pypaimon/write/writer/data_writer.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index 53f4310971b0..fac8f5de005d 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -192,13 +192,12 @@ def _write_data_to_file(self, data: pa.Table): # key stats & value stats value_stats_enabled = self.options.metadata_stats_enabled() - stats_fields = PyarrowFieldParser.to_paimon_schema(data.schema) if value_stats_enabled \ + stats_fields = PyarrowFieldParser.to_paimon_schema(data.schema) if value_stats_enabled\ else self.table.trimmed_primary_keys_fields column_stats = { field.name: self._get_column_stats(data, field.name) for field in stats_fields } - key_fields = self.trimmed_primary_keys_fields key_field_names = {field.name for field in key_fields} value_fields = [field for field in stats_fields if From ab419e1d919f9f624f28d168e6fc6fd5b137b924 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 1 Jan 2026 21:24:02 +0800 Subject: [PATCH 14/20] fix code format --- paimon-python/pypaimon/tests/reader_base_test.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index e1fc8fa71ced..e265719d2391 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -754,7 +754,7 @@ def test_primary_key_value_stats(self): if file_meta.value_stats_cols is None: expected_value_fields = ['name', 'price', 'category'] self.assertGreaterEqual(value_stats.min_values.arity, len(expected_value_fields), - f"value_stats should contain at least {len(expected_value_fields)} value fields") + f"value_stats should contain at least {len(expected_value_fields)} value fields") else: self.assertNotIn('id', file_meta.value_stats_cols, "Key field 'id' should NOT be in value_stats_cols") @@ -766,14 +766,14 @@ def test_primary_key_value_stats(self): expected_arity = len(file_meta.value_stats_cols) self.assertEqual(value_stats.min_values.arity, expected_arity, - f"value_stats should contain {expected_arity} fields (matching value_stats_cols), " - f"but got {value_stats.min_values.arity}") + f"value_stats should contain {expected_arity} fields (matching value_stats_cols), " + f"but got {value_stats.min_values.arity}") self.assertEqual(value_stats.max_values.arity, expected_arity, - f"value_stats should contain {expected_arity} fields (matching value_stats_cols), " - f"but got {value_stats.max_values.arity}") + f"value_stats should contain {expected_arity} fields (matching value_stats_cols), " + f"but got {value_stats.max_values.arity}") self.assertEqual(len(value_stats.null_counts), expected_arity, - f"value_stats 
null_counts should have {expected_arity} elements, " - f"but got {len(value_stats.null_counts)}") + f"value_stats null_counts should have {expected_arity} elements, " + f"but got {len(value_stats.null_counts)}") value_stats_fields = table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields( {'_VALUE_STATS_COLS': file_meta.value_stats_cols}, From 28b7f6275775e56da913c399741371b77d6e2a37 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 1 Jan 2026 21:46:44 +0800 Subject: [PATCH 15/20] fix code format --- paimon-python/pypaimon/tests/reader_base_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index e265719d2391..584deb71b54d 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -754,7 +754,7 @@ def test_primary_key_value_stats(self): if file_meta.value_stats_cols is None: expected_value_fields = ['name', 'price', 'category'] self.assertGreaterEqual(value_stats.min_values.arity, len(expected_value_fields), - f"value_stats should contain at least {len(expected_value_fields)} value fields") + f"value_stats should contain at least {len(expected_value_fields)} value fields") else: self.assertNotIn('id', file_meta.value_stats_cols, "Key field 'id' should NOT be in value_stats_cols") From 31c28506e980a4dd8993157dca2db4782b19baf7 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 1 Jan 2026 22:07:39 +0800 Subject: [PATCH 16/20] fix --- paimon-python/pypaimon/tests/reader_base_test.py | 14 ++++++++++++-- paimon-python/pypaimon/write/writer/data_writer.py | 8 ++++++-- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index 584deb71b54d..6973fc060802 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -772,8 +772,18 @@ def test_primary_key_value_stats(self): f"value_stats should contain {expected_arity} fields (matching value_stats_cols), " f"but got {value_stats.max_values.arity}") self.assertEqual(len(value_stats.null_counts), expected_arity, - f"value_stats null_counts should have {expected_arity} elements, " - f"but got {len(value_stats.null_counts)}") + f"value_stats null_counts should have {expected_arity} elements, " + f"but got {len(value_stats.null_counts)}") + + self.assertEqual(value_stats.min_values.arity, len(file_meta.value_stats_cols), + f"value_stats.min_values.arity ({value_stats.min_values.arity}) must match " + f"value_stats_cols length ({len(file_meta.value_stats_cols)})") + + for field_name in file_meta.value_stats_cols: + is_system_field = (field_name.startswith('_KEY_') or + field_name in ['_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID']) + self.assertFalse(is_system_field, + f"value_stats_cols should not contain system field: {field_name}") value_stats_fields = table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields( {'_VALUE_STATS_COLS': file_meta.value_stats_cols}, diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index fac8f5de005d..0528dc4f91ee 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -29,6 +29,7 @@ from pypaimon.schema.data_types import PyarrowFieldParser from pypaimon.table.bucket_mode import BucketMode from 
pypaimon.table.row.generic_row import GenericRow +from pypaimon.table.special_fields import SpecialFields class DataWriter(ABC): @@ -201,7 +202,9 @@ def _write_data_to_file(self, data: pa.Table): key_fields = self.trimmed_primary_keys_fields key_field_names = {field.name for field in key_fields} value_fields = [field for field in stats_fields if - field.name not in key_field_names] if value_stats_enabled else [] + field.name not in key_field_names and + not (field.name.startswith('_KEY_') or SpecialFields.is_system_field(field.name))] \ + if value_stats_enabled else [] key_stats = self._collect_value_stats(data, key_fields, column_stats) if not all(count == 0 for count in key_stats.null_counts): @@ -213,6 +216,7 @@ def _write_data_to_file(self, data: pa.Table): min_seq = self.sequence_generator.start max_seq = self.sequence_generator.current self.sequence_generator.start = self.sequence_generator.current + value_stats_cols = [field.name for field in value_fields] if value_stats_enabled else [] self.committed_files.append(DataFileMeta.create( file_name=file_name, file_size=self.file_io.get_file_size(file_path), @@ -229,7 +233,7 @@ def _write_data_to_file(self, data: pa.Table): creation_time=Timestamp.now(), delete_row_count=0, file_source=0, - value_stats_cols=None if value_stats_enabled else [], + value_stats_cols=value_stats_cols, external_path=external_path_str, # Set external path if using external paths first_row_id=None, write_cols=self.write_cols, From 4437bdf1935dbc5bed16d2fbc6c0ee05ac1a4df0 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 1 Jan 2026 22:28:11 +0800 Subject: [PATCH 17/20] fix code format --- paimon-python/pypaimon/tests/reader_base_test.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index 6973fc060802..3050ce0b44f7 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -772,18 +772,18 @@ def test_primary_key_value_stats(self): f"value_stats should contain {expected_arity} fields (matching value_stats_cols), " f"but got {value_stats.max_values.arity}") self.assertEqual(len(value_stats.null_counts), expected_arity, - f"value_stats null_counts should have {expected_arity} elements, " - f"but got {len(value_stats.null_counts)}") + f"value_stats null_counts should have {expected_arity} elements, " + f"but got {len(value_stats.null_counts)}") self.assertEqual(value_stats.min_values.arity, len(file_meta.value_stats_cols), - f"value_stats.min_values.arity ({value_stats.min_values.arity}) must match " - f"value_stats_cols length ({len(file_meta.value_stats_cols)})") + f"value_stats.min_values.arity ({value_stats.min_values.arity}) must match " + f"value_stats_cols length ({len(file_meta.value_stats_cols)})") for field_name in file_meta.value_stats_cols: - is_system_field = (field_name.startswith('_KEY_') or - field_name in ['_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID']) + is_system_field = (field_name.startswith('_KEY_') or + field_name in ['_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID']) self.assertFalse(is_system_field, - f"value_stats_cols should not contain system field: {field_name}") + f"value_stats_cols should not contain system field: {field_name}") value_stats_fields = table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields( {'_VALUE_STATS_COLS': file_meta.value_stats_cols}, From d59d6da20310a9ff787f2fc9bdd924a78fdb7c85 Mon Sep 17 00:00:00 2001 
From: xiaohongbo Date: Thu, 1 Jan 2026 22:39:23 +0800 Subject: [PATCH 18/20] fix code format --- paimon-python/pypaimon/tests/reader_base_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index 3050ce0b44f7..83715ffc6873 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -781,7 +781,7 @@ def test_primary_key_value_stats(self): for field_name in file_meta.value_stats_cols: is_system_field = (field_name.startswith('_KEY_') or - field_name in ['_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID']) + field_name in ['_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID']) self.assertFalse(is_system_field, f"value_stats_cols should not contain system field: {field_name}") From 94d95a49c9334a56ac2fee1fffe71aee6f83945e Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 1 Jan 2026 23:06:31 +0800 Subject: [PATCH 19/20] fix --- paimon-python/pypaimon/tests/reader_base_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index 83715ffc6873..e81fd182b432 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -781,7 +781,7 @@ def test_primary_key_value_stats(self): for field_name in file_meta.value_stats_cols: is_system_field = (field_name.startswith('_KEY_') or - field_name in ['_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID']) + field_name in ['_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID']) self.assertFalse(is_system_field, f"value_stats_cols should not contain system field: {field_name}") From 7275135277c60cbec5494c0054455b07da050e14 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Thu, 1 Jan 2026 23:40:04 +0800 Subject: [PATCH 20/20] fix --- .../pypaimon/write/writer/data_writer.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index 0528dc4f91ee..eba7802d7ada 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -201,10 +201,7 @@ def _write_data_to_file(self, data: pa.Table): } key_fields = self.trimmed_primary_keys_fields key_field_names = {field.name for field in key_fields} - value_fields = [field for field in stats_fields if - field.name not in key_field_names and - not (field.name.startswith('_KEY_') or SpecialFields.is_system_field(field.name))] \ - if value_stats_enabled else [] + value_fields = self._filter_value_fields(stats_fields, key_field_names) if value_stats_enabled else [] key_stats = self._collect_value_stats(data, key_fields, column_stats) if not all(count == 0 for count in key_stats.null_counts): @@ -216,7 +213,14 @@ def _write_data_to_file(self, data: pa.Table): min_seq = self.sequence_generator.start max_seq = self.sequence_generator.current self.sequence_generator.start = self.sequence_generator.current - value_stats_cols = [field.name for field in value_fields] if value_stats_enabled else [] + if value_stats_enabled: + all_table_value_fields = self._filter_value_fields(self.table.fields, key_field_names) + if len(value_fields) == len(all_table_value_fields): + value_stats_cols = None + else: + value_stats_cols = [field.name for field in value_fields] + else: + value_stats_cols = [] 
self.committed_files.append(DataFileMeta.create( file_name=file_name, file_size=self.file_io.get_file_size(file_path), @@ -241,6 +245,11 @@ def _write_data_to_file(self, data: pa.Table): file_path=file_path, )) + def _filter_value_fields(self, fields: List, key_field_names: set) -> List: + return [field for field in fields if + field.name not in key_field_names and + not (field.name.startswith('_KEY_') or SpecialFields.is_system_field(field.name))] + def _generate_file_path(self, file_name: str) -> str: if self.external_path_provider: external_path = self.external_path_provider.get_next_external_data_path(file_name)
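Illustrative sketch (not part of the patch series above, and not the real pypaimon API): after PATCH 20/20 the value_stats_cols field written into DataFileMeta encodes three cases. An empty list means metadata stats are disabled, None means the collected stats cover every non-key, non-system value field of the table, and an explicit list names the covered columns when only a subset carries stats. The short standalone Python below restates that decision with plain strings; is_system_field and decide_value_stats_cols are stand-in names introduced only for this example, mirroring SpecialFields.is_system_field and the logic added to DataWriter._write_data_to_file.

def is_system_field(name: str) -> bool:
    # Stand-in for SpecialFields.is_system_field: system columns such as
    # _SEQUENCE_NUMBER, _VALUE_KIND, _ROW_ID, or _KEY_* prefixed key copies.
    return name.startswith('_KEY_') or name in ('_SEQUENCE_NUMBER', '_VALUE_KIND', '_ROW_ID')

def decide_value_stats_cols(stats_field_names, table_field_names, key_field_names, stats_enabled):
    # Mirrors the decision introduced in PATCH 20/20.
    if not stats_enabled:
        return []  # stats disabled: record an empty column list

    def value_only(names):
        # Keep only value columns: drop primary-key fields and system fields.
        return [n for n in names
                if n not in key_field_names and not is_system_field(n)]

    collected = value_only(stats_field_names)        # columns we actually collected stats for
    all_value_fields = value_only(table_field_names)  # every value column the table defines
    if len(collected) == len(all_value_fields):
        return None        # full coverage: None is the "all value fields" marker
    return collected       # partial coverage: list the covered columns explicitly

# Example with the table from test_primary_key_value_stats:
# primary key 'id', value fields 'name', 'price', 'category'.
print(decide_value_stats_cols(
    ['id', 'name', 'price', 'category'],   # fields present in the written batch
    ['id', 'name', 'price', 'category'],   # fields defined by the table schema
    {'id'},
    True))  # -> None, because every value field is covered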