Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions paimon-python/pypaimon/data/timestamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# limitations under the License.
################################################################################

from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone


class Timestamp:
Expand Down Expand Up @@ -117,8 +117,9 @@ def __str__(self):

@staticmethod
def now() -> 'Timestamp':
"""Creates an instance of Timestamp for now."""
return Timestamp.from_local_date_time(datetime.now())
"""Creates an instance of Timestamp for now (utc)."""
dt = datetime.now(timezone.utc)
return Timestamp.from_date_time(dt)

@staticmethod
def from_epoch_millis(milliseconds: int, nanos_of_millisecond: int = 0) -> 'Timestamp':
Expand All @@ -131,15 +132,13 @@ def from_epoch_millis(milliseconds: int, nanos_of_millisecond: int = 0) -> 'Time
return Timestamp(milliseconds, nanos_of_millisecond)

@staticmethod
def from_local_date_time(date_time: datetime) -> 'Timestamp':
def from_date_time(date_time: datetime) -> 'Timestamp':
"""
Creates an instance of Timestamp from a datetime (timezone-free).
Creates an instance of Timestamp from a datetime

Args:
date_time: a datetime object (should be naive, without timezone)
date_time: a datetime object
"""
if date_time.tzinfo is not None:
raise ValueError("datetime must be naive (no timezone)")

epoch_date = datetime(1970, 1, 1).date()
date_time_date = date_time.date()
Expand Down
2 changes: 1 addition & 1 deletion paimon-python/pypaimon/manifest/manifest_file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def read(self, manifest_file_name: str, manifest_entry_filter=None, drop_stats=T
epoch_millis = int(creation_time_value.timestamp() * 1000)
creation_time_ts = Timestamp.from_epoch_millis(epoch_millis)
else:
creation_time_ts = Timestamp.from_local_date_time(creation_time_value)
creation_time_ts = Timestamp.from_date_time(creation_time_value)
elif isinstance(creation_time_value, (int, float)):
creation_time_ts = Timestamp.from_epoch_millis(int(creation_time_value))
else:
Expand Down
37 changes: 31 additions & 6 deletions paimon-python/pypaimon/tests/file_store_commit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import unittest
from datetime import datetime
from zoneinfo import ZoneInfo
from unittest.mock import Mock, patch

from pypaimon.manifest.schema.data_file_meta import DataFileMeta
Expand Down Expand Up @@ -65,7 +66,7 @@ def test_generate_partition_statistics_single_partition_single_file(
from pypaimon.data.timestamp import Timestamp
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.manifest.schema.simple_stats import SimpleStats
creation_time = Timestamp.from_local_date_time(creation_time_dt)
creation_time = Timestamp.from_date_time(creation_time_dt)
file_meta = DataFileMeta.create(
file_name="test_file_1.parquet",
file_size=1024 * 1024, # 1MB
Expand Down Expand Up @@ -115,8 +116,8 @@ def test_generate_partition_statistics_multiple_files_same_partition(
from pypaimon.data.timestamp import Timestamp
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.manifest.schema.simple_stats import SimpleStats
creation_time_1 = Timestamp.from_local_date_time(datetime(2024, 1, 15, 10, 30, 0))
creation_time_2 = Timestamp.from_local_date_time(datetime(2024, 1, 15, 11, 30, 0)) # Later time
creation_time_1 = Timestamp.from_date_time(datetime(2024, 1, 15, 10, 30, 0))
creation_time_2 = Timestamp.from_date_time(datetime(2024, 1, 15, 11, 30, 0)) # Later time

file_meta_1 = DataFileMeta.create(
file_name="test_file_1.parquet",
Expand Down Expand Up @@ -180,7 +181,7 @@ def test_generate_partition_statistics_multiple_partitions(
from pypaimon.data.timestamp import Timestamp
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.manifest.schema.simple_stats import SimpleStats
creation_time = Timestamp.from_local_date_time(creation_time_dt)
creation_time = Timestamp.from_date_time(creation_time_dt)

# File for partition 1
file_meta_1 = DataFileMeta.create(
Expand Down Expand Up @@ -271,7 +272,7 @@ def test_generate_partition_statistics_unpartitioned_table(
from pypaimon.data.timestamp import Timestamp
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.manifest.schema.simple_stats import SimpleStats
creation_time = Timestamp.from_local_date_time(creation_time_dt)
creation_time = Timestamp.from_date_time(creation_time_dt)
file_meta = DataFileMeta.create(
file_name="test_file_1.parquet",
file_size=1024 * 1024,
Expand Down Expand Up @@ -346,6 +347,30 @@ def test_generate_partition_statistics_no_creation_time(
# Should have a valid timestamp (current time)
self.assertGreater(stat.last_file_creation_time, 0)

def test_creation_time(
self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
from pypaimon.data.timestamp import Timestamp
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.manifest.schema.simple_stats import SimpleStats
file_meta = DataFileMeta.create(
file_name="test_file_1.parquet",
file_size=1024 * 1024,
row_count=10000,
min_key=GenericRow([], []),
max_key=GenericRow([], []),
key_stats=SimpleStats.empty_stats(),
value_stats=SimpleStats.empty_stats(),
min_sequence_number=1,
max_sequence_number=100,
schema_id=0,
level=0,
extra_files=[],
creation_time=None
)
creation_time = file_meta.creation_time
now = Timestamp.from_date_time(datetime.now(ZoneInfo("Asia/Shanghai")))
self.assertEqual(round((now.get_millisecond() - creation_time.get_millisecond()) / 60 / 60 / 1000), 8)

def test_generate_partition_statistics_mismatched_partition_keys(
self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
"""Test partition statistics generation when partition tuple doesn't match partition keys."""
Expand All @@ -369,7 +394,7 @@ def test_generate_partition_statistics_mismatched_partition_keys(
schema_id=0,
level=0,
extra_files=[],
creation_time=Timestamp.from_local_date_time(datetime(2024, 1, 15, 10, 30, 0))
creation_time=Timestamp.from_date_time(datetime(2024, 1, 15, 10, 30, 0))
)

commit_message = CommitMessage(
Expand Down