diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index a5e0e1919e4f..6838f8f0f747 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -578,6 +578,32 @@ public InlineElement getDescription() { "Open file cost of a source file. It is used to avoid reading" + " too many files with a source split, which can be very slow."); + public static final ConfigOption SOURCE_SPLIT_FILE_ENABLED = + key("source.split.file-enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Enable finer-grained file splitting. When enabled, large files can be split " + + "by row groups (Parquet) or stripes (ORC) to improve concurrency. " + + "This feature is disabled by default for backward compatibility."); + + public static final ConfigOption SOURCE_SPLIT_FILE_THRESHOLD = + key("source.split.file-threshold") + .memoryType() + .defaultValue(MemorySize.ofMebiBytes(128)) + .withDescription( + "Minimum file size to consider for finer-grained splitting. Files smaller " + + "than this threshold will not be split further."); + + public static final ConfigOption SOURCE_SPLIT_FILE_MAX_SPLITS = + key("source.split.file-max-splits") + .intType() + .defaultValue(100) + .withDescription( + "Maximum number of splits to generate per file when using finer-grained " + + "splitting. This prevents excessive splitting for files with many " + + "row groups or stripes."); + public static final ConfigOption WRITE_BUFFER_SIZE = key("write-buffer-size") .memoryType() @@ -2534,6 +2560,18 @@ public long splitOpenFileCost() { return options.get(SOURCE_SPLIT_OPEN_FILE_COST).getBytes(); } + public boolean splitFileEnabled() { + return options.get(SOURCE_SPLIT_FILE_ENABLED); + } + + public long splitFileThreshold() { + return options.get(SOURCE_SPLIT_FILE_THRESHOLD).getBytes(); + } + + public int splitFileMaxSplits() { + return options.get(SOURCE_SPLIT_FILE_MAX_SPLITS); + } + public long writeBufferSize() { return options.get(WRITE_BUFFER_SIZE).getBytes(); } diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileSplitBoundary.java b/paimon-common/src/main/java/org/apache/paimon/format/FileSplitBoundary.java new file mode 100644 index 000000000000..0f79f3ca75f8 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileSplitBoundary.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format; + +import java.util.Objects; + +/** + * Represents a split boundary within a file (e.g., a row group in Parquet or a stripe in ORC). + * + *
<p>
This class contains the byte offset, length, and row count for a portion of a file that can be + * read independently. This enables finer-grained splitting than file-level splitting. + * + * @since 0.9.0 + */ +public class FileSplitBoundary { + + private final long offset; + private final long length; + private final long rowCount; + + public FileSplitBoundary(long offset, long length, long rowCount) { + this.offset = offset; + this.length = length; + this.rowCount = rowCount; + } + + /** Byte offset where this split starts in the file. */ + public long offset() { + return offset; + } + + /** Byte length of this split. */ + public long length() { + return length; + } + + /** Number of rows in this split. */ + public long rowCount() { + return rowCount; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FileSplitBoundary that = (FileSplitBoundary) o; + return offset == that.offset && length == that.length && rowCount == that.rowCount; + } + + @Override + public int hashCode() { + return Objects.hash(offset, length, rowCount); + } + + @Override + public String toString() { + return String.format( + "FileSplitBoundary{offset=%d, length=%d, rowCount=%d}", offset, length, rowCount); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatMetadataReader.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatMetadataReader.java new file mode 100644 index 000000000000..47204065b6c0 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatMetadataReader.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; + +import java.io.IOException; +import java.util.List; + +/** + * Interface for reading format-specific metadata to enable finer-grained splitting. + * + *
<p>
This interface allows extracting split boundaries (e.g., row groups for Parquet, stripes for + * ORC) from files, enabling splits to be generated at finer granularity than file level. + * + * @since 0.9.0 + */ +public interface FormatMetadataReader { + + /** + * Get split boundaries for a file (row groups for Parquet, stripes for ORC). + * + *
<p>
Each boundary represents a portion of the file that can be read independently. The + * boundaries are returned in order, starting from the beginning of the file. + * + * @param fileIO FileIO instance to read the file + * @param filePath Path to the file + * @param fileSize Size of the file in bytes + * @return List of split boundaries, one per row group/stripe. Returns empty list if file cannot + * be split further or if an error occurs. + * @throws IOException if an error occurs while reading file metadata + */ + List getSplitBoundaries(FileIO fileIO, Path filePath, long fileSize) + throws IOException; + + /** + * Check if this format supports finer-grained splitting. + * + * @return true if the format supports splitting files into smaller units (e.g., row groups, + * stripes), false otherwise + */ + boolean supportsFinerGranularity(); +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionFileLister.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionFileLister.java new file mode 100644 index 000000000000..1b750c2e2f11 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionFileLister.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.PartitionPathUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Utility class to list all files (data files, manifest files, etc.) in a partition. + * + *
<p>
This utility helps identify all files that need to be archived when archiving a partition. It
+ * includes:
+ *
+ * <ul>
+ *   <li>Data files referenced in manifests
+ *   <li>Manifest files that reference the partition
+ *   <li>Extra files (like data file indexes) associated with data files
+ * </ul>
+ * + * @since 0.9.0 + */ +public class PartitionFileLister { + + private static final Logger LOG = LoggerFactory.getLogger(PartitionFileLister.class); + + private final FileStoreTable table; + private final FileIO fileIO; + private final FileStorePathFactory pathFactory; + + public PartitionFileLister(FileStoreTable table) { + this.table = table; + this.fileIO = table.fileIO(); + this.pathFactory = table.store().pathFactory(); + } + + /** + * List all file paths in the specified partition. + * + * @param partitionSpec the partition specification (e.g., {"dt": "20250101"}) + * @return list of all file paths in the partition + * @throws IOException if an error occurs while listing files + */ + public List listPartitionFiles(Map partitionSpec) throws IOException { + return listPartitionFiles(Collections.singletonList(partitionSpec)); + } + + /** + * List all file paths in the specified partitions. + * + * @param partitionSpecs list of partition specifications + * @return list of all file paths in the partitions + * @throws IOException if an error occurs while listing files + */ + public List listPartitionFiles(List> partitionSpecs) + throws IOException { + Set allFiles = new HashSet<>(); + + // Use FileStoreScan to get all manifest entries for the partitions + FileStoreScan scan = table.store().newScan(); + scan.withPartitionsFilter(partitionSpecs); + + FileStoreScan.Plan plan = scan.plan(); + List entries = plan.files(); + + // Collect all data file paths + for (ManifestEntry entry : entries) { + DataFileMeta fileMeta = entry.file(); + if (fileMeta != null) { + // Construct path using DataFilePathFactory + BinaryRow partition = entry.partition(); + int bucket = entry.bucket(); + DataFilePathFactory dataFilePathFactory = + pathFactory.createDataFilePathFactory(partition, bucket); + Path dataFilePath = dataFilePathFactory.toPath(fileMeta); + allFiles.add(dataFilePath); + + // Add extra files (like data file indexes) + if (fileMeta.extraFiles() != null) { + for (String extraFile : fileMeta.extraFiles()) { + // Extra files are relative to the bucket path + Path bucketPath = pathFactory.bucketPath(partition, bucket); + Path extraFilePath = new Path(bucketPath, extraFile); + allFiles.add(extraFilePath); + } + } + } + } + + // Also collect manifest files that reference these partitions + // We need to scan through manifest lists to find relevant manifests + collectManifestFiles(partitionSpecs, allFiles); + + return new ArrayList<>(allFiles); + } + + /** + * Collect manifest files that reference the specified partitions. + * + * @param partitionSpecs the partition specifications + * @param allFiles set to add manifest file paths to + */ + private void collectManifestFiles(List> partitionSpecs, Set allFiles) + throws IOException { + // Get the partition paths + List partitionPaths = new ArrayList<>(); + for (Map spec : partitionSpecs) { + LinkedHashMap linkedSpec = new LinkedHashMap<>(spec); + String partitionPath = + PartitionPathUtils.generatePartitionPath( + linkedSpec, table.store().partitionType(), false); + Path fullPartitionPath = new Path(table.location(), partitionPath); + partitionPaths.add(fullPartitionPath); + } + + // Scan through manifests to find those referencing these partitions + FileStoreScan scan = table.store().newScan(); + FileStoreScan.Plan plan = scan.plan(); + + // The manifest entries already contain references to manifests, but we need to + // get the actual manifest file paths. 
For now, we'll list manifest files from + // the manifest directory and let the archive action handle filtering. + // A more precise implementation would track which manifests reference which partitions. + + Path manifestDir = new Path(table.location(), "manifest"); + if (fileIO.exists(manifestDir)) { + try { + org.apache.paimon.fs.FileStatus[] manifestFiles = fileIO.listStatus(manifestDir); + for (org.apache.paimon.fs.FileStatus status : manifestFiles) { + if (!status.isDir() && status.getPath().getName().startsWith("manifest-")) { + allFiles.add(status.getPath()); + } + } + } catch (IOException e) { + LOG.warn("Failed to list manifest files, continuing without them", e); + } + } + } + + /** + * List all data file paths (excluding manifests) in the specified partition. + * + * @param partitionSpec the partition specification + * @return list of data file paths + * @throws IOException if an error occurs while listing files + */ + public List listDataFiles(Map partitionSpec) throws IOException { + FileStoreScan scan = table.store().newScan(); + scan.withPartitionsFilter(Collections.singletonList(partitionSpec)); + + FileStoreScan.Plan plan = scan.plan(); + List entries = plan.files(); + + List dataFiles = new ArrayList<>(); + for (ManifestEntry entry : entries) { + DataFileMeta fileMeta = entry.file(); + if (fileMeta != null) { + // Construct path using DataFilePathFactory + BinaryRow partition = entry.partition(); + int bucket = entry.bucket(); + DataFilePathFactory dataFilePathFactory = + pathFactory.createDataFilePathFactory(partition, bucket); + Path dataFilePath = dataFilePathFactory.toPath(fileMeta); + dataFiles.add(dataFilePath); + + // Add extra files + if (fileMeta.extraFiles() != null) { + for (String extraFile : fileMeta.extraFiles()) { + // Extra files are relative to the bucket path + Path bucketPath = pathFactory.bucketPath(partition, bucket); + Path extraFilePath = new Path(bucketPath, extraFile); + dataFiles.add(extraFilePath); + } + } + } + } + + return dataFiles; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 35949062e307..a842a07b79bf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -22,6 +22,9 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.consumer.ConsumerManager; +import org.apache.paimon.format.FormatMetadataReader; +import org.apache.paimon.format.orc.OrcMetadataReader; +import org.apache.paimon.format.parquet.ParquetMetadataReader; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.globalindex.DataEvolutionBatchScan; @@ -48,6 +51,7 @@ import org.apache.paimon.table.source.DataTableBatchScan; import org.apache.paimon.table.source.DataTableScan; import org.apache.paimon.table.source.DataTableStreamScan; +import org.apache.paimon.table.source.FineGrainedSplitGenerator; import org.apache.paimon.table.source.SplitGenerator; import org.apache.paimon.table.source.StreamDataTableScan; import org.apache.paimon.table.source.snapshot.SnapshotReader; @@ -59,6 +63,7 @@ import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.DVMetaCache; import org.apache.paimon.utils.FileSystemBranchManager; +import org.apache.paimon.utils.HadoopUtils; import org.apache.paimon.utils.Preconditions; import 
org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SimpleFileReader; @@ -69,6 +74,8 @@ import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; +import org.apache.hadoop.conf.Configuration; + import javax.annotation.Nullable; import java.io.FileNotFoundException; @@ -270,18 +277,55 @@ public RowKeyExtractor createRowKeyExtractor() { @Override public SnapshotReader newSnapshotReader() { - return new SnapshotReaderImpl( - store().newScan(), - tableSchema, - coreOptions(), - snapshotManager(), - changelogManager(), - splitGenerator(), - nonPartitionFilterConsumer(), - store().pathFactory(), - name(), - store().newIndexFileHandler(), - dvmetaCache); + SplitGenerator baseGenerator = splitGenerator(); + CoreOptions options = coreOptions(); + + // Wrap with FineGrainedSplitGenerator if enabled + if (options.splitFileEnabled()) { + Map metadataReaders = createMetadataReaders(options); + SplitGenerator wrappedGenerator = + new FineGrainedSplitGenerator( + baseGenerator, + true, + options.splitFileThreshold(), + options.splitFileMaxSplits(), + fileIO(), + store().pathFactory(), + metadataReaders); + return new SnapshotReaderImpl( + store().newScan(), + tableSchema, + options, + snapshotManager(), + changelogManager(), + wrappedGenerator, + nonPartitionFilterConsumer(), + store().pathFactory(), + name(), + store().newIndexFileHandler(), + dvmetaCache); + } else { + return new SnapshotReaderImpl( + store().newScan(), + tableSchema, + options, + snapshotManager(), + changelogManager(), + baseGenerator, + nonPartitionFilterConsumer(), + store().pathFactory(), + name(), + store().newIndexFileHandler(), + dvmetaCache); + } + } + + private Map createMetadataReaders(CoreOptions options) { + Map readers = new HashMap<>(); + Configuration hadoopConfig = HadoopUtils.getHadoopConfiguration(options.toConfiguration()); + readers.put("parquet", new ParquetMetadataReader()); + readers.put("orc", new OrcMetadataReader(hadoopConfig)); + return readers; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index 3a4c112a95ee..f74a3104798b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FileSplitBoundary; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFileMeta08Serializer; import org.apache.paimon.io.DataFileMeta09Serializer; @@ -47,7 +48,9 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; @@ -79,6 +82,12 @@ public class DataSplit implements Split { private boolean isStreaming = false; private boolean rawConvertible; + /** + * Optional file split boundaries for finer-grained splitting. Maps file index in dataFiles to + * list of boundaries. This is transient (not serialized) to maintain backward compatibility. 
+ */ + @Nullable private transient Map> fileSplitBoundaries; + public DataSplit() {} public long snapshotId() { @@ -243,24 +252,50 @@ public long partialMergedRowCount() { @Override public Optional> convertToRawFiles() { if (rawConvertible) { - return Optional.of( - dataFiles.stream() - .map(f -> makeRawTableFile(bucketPath, f)) - .collect(Collectors.toList())); + List rawFiles = new ArrayList<>(); + for (int i = 0; i < dataFiles.size(); i++) { + DataFileMeta file = dataFiles.get(i); + List boundaries = + fileSplitBoundaries != null ? fileSplitBoundaries.get(i) : null; + + if (boundaries != null && !boundaries.isEmpty()) { + // Create one RawFile per boundary + // For fine-grained splits, there should be exactly one boundary per file + // (each DataSplit represents one row group/stripe) + for (FileSplitBoundary boundary : boundaries) { + rawFiles.add( + makeRawTableFile( + bucketPath, + file, + boundary.offset(), + boundary.length(), + boundary.rowCount())); + } + } else { + // No boundaries, create single RawFile for whole file + rawFiles.add(makeRawTableFile(bucketPath, file)); + } + } + return Optional.of(rawFiles); } else { return Optional.empty(); } } private RawFile makeRawTableFile(String bucketPath, DataFileMeta file) { + return makeRawTableFile(bucketPath, file, 0, file.fileSize(), file.rowCount()); + } + + private RawFile makeRawTableFile( + String bucketPath, DataFileMeta file, long offset, long length, long rowCount) { return new RawFile( file.externalPath().orElse(bucketPath + "/" + file.fileName()), file.fileSize(), - 0, - file.fileSize(), + offset, + length, file.fileFormat(), file.schemaId(), - file.rowCount()); + rowCount); } @Override @@ -364,6 +399,7 @@ protected void assign(DataSplit other) { this.dataDeletionFiles = other.dataDeletionFiles; this.isStreaming = other.isStreaming; this.rawConvertible = other.rawConvertible; + this.fileSplitBoundaries = other.fileSplitBoundaries; } public void serialize(DataOutputView out) throws IOException { @@ -556,6 +592,14 @@ public Builder rawConvertible(boolean rawConvertible) { return this; } + public Builder withFileSplitBoundaries( + Map> fileSplitBoundaries) { + if (fileSplitBoundaries != null && !fileSplitBoundaries.isEmpty()) { + this.split.fileSplitBoundaries = new HashMap<>(fileSplitBoundaries); + } + return this; + } + public DataSplit build() { checkArgument(split.partition != null); checkArgument(split.bucket != -1); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/FineGrainedSplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/FineGrainedSplitGenerator.java new file mode 100644 index 000000000000..737a4c563145 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/FineGrainedSplitGenerator.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source; + +import org.apache.paimon.format.FileSplitBoundary; +import org.apache.paimon.format.FormatMetadataReader; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.utils.FileStorePathFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A decorator for {@link SplitGenerator} that enables finer-grained splitting by splitting large + * files at row group (Parquet) or stripe (ORC) boundaries. + * + *
<p>
This generator wraps a base {@link SplitGenerator} and, when enabled, queries format metadata + * readers to split large files into multiple splits. The boundaries are stored and can be used + * later when converting to {@link org.apache.paimon.table.source.RawFile} objects. + * + * @since 0.9.0 + */ +public class FineGrainedSplitGenerator implements SplitGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(FineGrainedSplitGenerator.class); + + private final SplitGenerator baseGenerator; + private final boolean enabled; + private final long splitFileThreshold; + private final int maxSplitsPerFile; + private final FileIO fileIO; + private final FileStorePathFactory pathFactory; + private final Map metadataReaders; + + /** + * Map from file name to list of split boundaries. This is populated when fine-grained splitting + * is performed and can be retrieved later. + */ + private final Map> fileBoundaries = new HashMap<>(); + + public FineGrainedSplitGenerator( + SplitGenerator baseGenerator, + boolean enabled, + long splitFileThreshold, + int maxSplitsPerFile, + FileIO fileIO, + FileStorePathFactory pathFactory, + Map metadataReaders) { + this.baseGenerator = baseGenerator; + this.enabled = enabled; + this.splitFileThreshold = splitFileThreshold; + this.maxSplitsPerFile = maxSplitsPerFile; + this.fileIO = fileIO; + this.pathFactory = pathFactory; + this.metadataReaders = metadataReaders; + } + + /** + * Get the split boundaries for a file. Returns empty list if the file was not split or if + * boundaries are not available. + */ + public List getFileBoundaries(String fileName) { + return fileBoundaries.getOrDefault(fileName, new ArrayList<>()); + } + + @Override + public boolean alwaysRawConvertible() { + return baseGenerator.alwaysRawConvertible(); + } + + @Override + public List splitForBatch(List files) { + List baseGroups = baseGenerator.splitForBatch(files); + + if (!enabled || metadataReaders.isEmpty()) { + return baseGroups; + } + + List result = new ArrayList<>(); + + for (SplitGroup group : baseGroups) { + List groupFiles = group.files; + List filesToSplit = new ArrayList<>(); + List filesToKeep = new ArrayList<>(); + + // Separate files that should be split from those that should be kept as-is + for (DataFileMeta file : groupFiles) { + if (shouldSplitFile(file)) { + filesToSplit.add(file); + } else { + filesToKeep.add(file); + } + } + + // Add files that don't need splitting as-is + if (!filesToKeep.isEmpty()) { + result.add( + group.rawConvertible + ? SplitGroup.rawConvertibleGroup(filesToKeep) + : SplitGroup.nonRawConvertibleGroup(filesToKeep)); + } + + // Split large files into multiple groups + for (DataFileMeta file : filesToSplit) { + List boundaries = getSplitBoundaries(file); + if (boundaries.isEmpty() || boundaries.size() == 1) { + // Could not split or only one boundary, keep as single file + result.add( + group.rawConvertible + ? 
SplitGroup.rawConvertibleGroup( + Collections.singletonList(file)) + : SplitGroup.nonRawConvertibleGroup( + Collections.singletonList(file))); + } else { + // Create one SplitGroup per boundary + // Limit the number of splits per file + int numSplits = Math.min(boundaries.size(), maxSplitsPerFile); + List boundariesToUse = boundaries.subList(0, numSplits); + // Store boundaries for later use in DataSplit.convertToRawFiles() + fileBoundaries.put(file.fileName(), boundariesToUse); + // Create multiple SplitGroups - each will become a separate DataSplit + // Note: All SplitGroups contain the same file, but boundaries are stored + // and will be used when converting to RawFile + for (int i = 0; i < numSplits; i++) { + result.add( + group.rawConvertible + ? SplitGroup.rawConvertibleGroup( + Collections.singletonList(file)) + : SplitGroup.nonRawConvertibleGroup( + Collections.singletonList(file))); + } + } + } + } + + return result; + } + + @Override + public List splitForStreaming(List files) { + // For streaming, don't split files - use base generator as-is + return baseGenerator.splitForStreaming(files); + } + + private boolean shouldSplitFile(DataFileMeta file) { + return file.fileSize() >= splitFileThreshold + && metadataReaders.containsKey(file.fileFormat().toLowerCase()); + } + + private List getSplitBoundaries(DataFileMeta file) { + String format = file.fileFormat().toLowerCase(); + FormatMetadataReader reader = metadataReaders.get(format); + + if (reader == null || !reader.supportsFinerGranularity()) { + return new ArrayList<>(); + } + + try { + // Construct path from external path if available + // If external path is not available, we cannot construct the full path here + // as we don't have partition/bucket info. In this case, return empty list + // and the file will be treated as a single split (fallback behavior). 
+ Path filePath = file.externalPath().map(Path::new).orElse(null); + + if (filePath == null) { + // Cannot construct full path without partition/bucket info + // Fall back to file-level splitting + LOG.debug( + "Cannot construct full path for file {} (no external path), " + + "skipping fine-grained splitting", + file.fileName()); + return new ArrayList<>(); + } + + List boundaries = + reader.getSplitBoundaries(fileIO, filePath, file.fileSize()); + + if (LOG.isDebugEnabled() && !boundaries.isEmpty()) { + LOG.debug( + "Extracted {} boundaries for file {} (format: {})", + boundaries.size(), + file.fileName(), + format); + } + + return boundaries; + } catch (IOException e) { + LOG.warn( + "Failed to extract split boundaries for file {} (format: {}), " + + "will treat as single split", + file.fileName(), + format, + e); + return new ArrayList<>(); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index e036710e8ef9..e2fe48e4d46d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -25,6 +25,7 @@ import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile; +import org.apache.paimon.format.FileSplitBoundary; import org.apache.paimon.fs.Path; import org.apache.paimon.index.DeletionVectorMeta; import org.apache.paimon.index.IndexFileHandler; @@ -46,6 +47,7 @@ import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.DeletionFile; +import org.apache.paimon.table.source.FineGrainedSplitGenerator; import org.apache.paimon.table.source.PlanImpl; import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.table.source.SplitGenerator; @@ -409,9 +411,64 @@ private List generateSplits( isStreaming ? 
splitGenerator.splitForStreaming(bucketFiles) : splitGenerator.splitForBatch(bucketFiles); + + // Track boundaries from FineGrainedSplitGenerator if available + FineGrainedSplitGenerator fineGrainedGenerator = null; + if (splitGenerator instanceof FineGrainedSplitGenerator) { + fineGrainedGenerator = (FineGrainedSplitGenerator) splitGenerator; + } + + // Track which SplitGroup index we're on for each file (for fine-grained splits) + Map fileSplitGroupIndex = new HashMap<>(); + for (SplitGenerator.SplitGroup splitGroup : splitGroups) { List dataFiles = splitGroup.files; String bucketPath = pathFactory.bucketPath(partition, bucket).toString(); + + // Check if this is a fine-grained split (single file with boundaries) + if (fineGrainedGenerator != null + && dataFiles.size() == 1 + && splitGroup.rawConvertible) { + DataFileMeta file = dataFiles.get(0); + List boundaries = + fineGrainedGenerator.getFileBoundaries(file.fileName()); + + if (!boundaries.isEmpty()) { + // This SplitGroup corresponds to one boundary + // Get the current index for this file + int boundaryIndex = + fileSplitGroupIndex.getOrDefault(file.fileName(), 0); + + if (boundaryIndex < boundaries.size()) { + // Create DataSplit with single boundary for this file + Map> splitBoundaries = + new HashMap<>(); + splitBoundaries.put( + 0, + Collections.singletonList(boundaries.get(boundaryIndex))); + + builder.withDataFiles(dataFiles) + .rawConvertible(splitGroup.rawConvertible) + .withBucketPath(bucketPath) + .withFileSplitBoundaries(splitBoundaries); + + if (deletionVectors && deletionFilesMap != null) { + builder.withDataDeletionFiles( + getDeletionFiles( + dataFiles, + deletionFilesMap.getOrDefault( + Pair.of(partition, bucket), + Collections.emptyMap()))); + } + splits.add(builder.build()); + fileSplitGroupIndex.put(file.fileName(), boundaryIndex + 1); + } + // Continue to next SplitGroup + continue; + } + } + + // Normal split creation (no fine-grained splitting or boundaries not available) builder.withDataFiles(dataFiles) .rawConvertible(splitGroup.rawConvertible) .withBucketPath(bucketPath); diff --git a/paimon-docs/src/test/java/org/apache/paimon/docs/configuration/ConfigOptionsDocsCompletenessITCase.java b/paimon-docs/src/test/java/org/apache/paimon/docs/configuration/ConfigOptionsDocsCompletenessITCase.java index f71c83c63656..a67997beda69 100644 --- a/paimon-docs/src/test/java/org/apache/paimon/docs/configuration/ConfigOptionsDocsCompletenessITCase.java +++ b/paimon-docs/src/test/java/org/apache/paimon/docs/configuration/ConfigOptionsDocsCompletenessITCase.java @@ -214,6 +214,9 @@ private static Map> parseDocumentedOptions() thro final String rootDir = ConfigOptionsDocGeneratorTest.getProjectRootDir(); Path includeFolder = Paths.get(rootDir, "docs", "layouts", "shortcodes", "generated").toAbsolutePath(); + if (!Files.exists(includeFolder)) { + return Collections.emptyMap(); + } return Files.list(includeFolder) .filter( (path) -> { @@ -224,7 +227,7 @@ private static Map> parseDocumentedOptions() thro file -> { try { return parseDocumentedOptionsFromFile(file).stream(); - } catch (IOException ignored) { + } catch (Exception ignored) { return Stream.empty(); } }) @@ -237,8 +240,21 @@ private static Collection parseDocumentedOptionsFromFile(Path document.outputSettings().syntax(Document.OutputSettings.Syntax.xml); document.outputSettings().prettyPrint(false); return document.getElementsByTag("table").stream() - .map(element -> element.getElementsByTag("tbody").get(0)) - .flatMap(element -> 
element.getElementsByTag("tr").stream()) + .flatMap( + tableElement -> { + org.jsoup.select.Elements tbodyElements = + tableElement.getElementsByTag("tbody"); + if (tbodyElements.isEmpty()) { + return Stream.empty(); + } + return tbodyElements.get(0).getElementsByTag("tr").stream(); + }) + .filter( + tableRow -> { + // Ensure the row has at least 4 children (key, default, type, + // description) + return tableRow.children().size() >= 4; + }) .map( tableRow -> { // Use split to exclude document key tag. diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcMetadataReader.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcMetadataReader.java new file mode 100644 index 000000000000..a9ebfb016ae8 --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcMetadataReader.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.orc; + +import org.apache.paimon.format.FileSplitBoundary; +import org.apache.paimon.format.FormatMetadataReader; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.utils.IOUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.orc.Reader; +import org.apache.orc.StripeInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Metadata reader for ORC files that extracts stripe boundaries for finer-grained splitting. + * + *
<p>
This reader extracts the byte offset and length for each stripe in an ORC file, enabling the + * file to be split at stripe boundaries rather than file boundaries. + * + * @since 0.9.0 + */ +public class OrcMetadataReader implements FormatMetadataReader { + + private static final Logger LOG = LoggerFactory.getLogger(OrcMetadataReader.class); + + private final Configuration hadoopConfig; + + public OrcMetadataReader(Configuration hadoopConfig) { + this.hadoopConfig = hadoopConfig; + } + + @Override + public List getSplitBoundaries(FileIO fileIO, Path filePath, long fileSize) + throws IOException { + List boundaries = new ArrayList<>(); + + Reader orcReader = null; + try { + orcReader = OrcReaderFactory.createReader(hadoopConfig, fileIO, filePath, null); + List stripes = orcReader.getStripes(); + + if (stripes.isEmpty()) { + LOG.warn("ORC file {} has no stripes", filePath); + return boundaries; + } + + for (StripeInformation stripe : stripes) { + long offset = stripe.getOffset(); + long length = stripe.getLength(); + long rowCount = stripe.getNumberOfRows(); + + boundaries.add(new FileSplitBoundary(offset, length, rowCount)); + } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Extracted {} stripe boundaries from ORC file {}", + boundaries.size(), + filePath); + } + } catch (Exception e) { + LOG.warn("Failed to extract stripe boundaries from ORC file {}", filePath, e); + // Return empty list on error - file will be treated as single split + return new ArrayList<>(); + } finally { + IOUtils.closeQuietly(orcReader); + } + + return boundaries; + } + + @Override + public boolean supportsFinerGranularity() { + return true; + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index a94eaf2356d8..2da9fabb18f1 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -97,6 +97,12 @@ public OrcReaderFactory( @Override public OrcVectorizedReader createReader(FormatReaderFactory.Context context) throws IOException { + return createReader(context, 0, context.fileSize()); + } + + @Override + public OrcVectorizedReader createReader( + FormatReaderFactory.Context context, long offset, long length) throws IOException { int poolSize = context instanceof OrcFormatReaderContext ? ((OrcFormatReaderContext) context).poolSize() @@ -110,8 +116,8 @@ public OrcVectorizedReader createReader(FormatReaderFactory.Context context) conjunctPredicates, context.fileIO(), context.filePath(), - 0, - context.fileSize(), + offset, + length, context.selection(), deletionVectorsEnabled); return new OrcVectorizedReader(orcReader, poolOfBatches); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetMetadataReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetMetadataReader.java new file mode 100644 index 000000000000..784afcda7988 --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetMetadataReader.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.parquet; + +import org.apache.paimon.format.FileSplitBoundary; +import org.apache.paimon.format.FormatMetadataReader; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; + +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Metadata reader for Parquet files that extracts row group boundaries for finer-grained splitting. + * + *
<p>
This reader extracts the byte offset and length for each row group in a Parquet file, enabling + * the file to be split at row group boundaries rather than file boundaries. + * + * @since 0.9.0 + */ +public class ParquetMetadataReader implements FormatMetadataReader { + + private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataReader.class); + + @Override + public List getSplitBoundaries(FileIO fileIO, Path filePath, long fileSize) + throws IOException { + List boundaries = new ArrayList<>(); + + try (ParquetFileReader reader = ParquetUtil.getParquetReader(fileIO, filePath, fileSize)) { + ParquetMetadata metadata = reader.getFooter(); + List blocks = metadata.getBlocks(); + + if (blocks.isEmpty()) { + LOG.warn("Parquet file {} has no row groups", filePath); + return boundaries; + } + + for (BlockMetaData block : blocks) { + long offset = Long.MAX_VALUE; + long endOffset = Long.MIN_VALUE; + + // Calculate the byte range for this row group by finding the min start and max end + // of all column chunks + List columns = block.getColumns(); + if (columns.isEmpty()) { + LOG.warn("Row group in file {} has no columns, skipping", filePath); + continue; + } + + for (ColumnChunkMetaData column : columns) { + long columnStart = column.getStartingPos(); + long columnEnd = columnStart + column.getTotalSize(); + offset = Math.min(offset, columnStart); + endOffset = Math.max(endOffset, columnEnd); + } + + long length = endOffset - offset; + long rowCount = block.getRowCount(); + + boundaries.add(new FileSplitBoundary(offset, length, rowCount)); + } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Extracted {} row group boundaries from Parquet file {}", + boundaries.size(), + filePath); + } + } catch (Exception e) { + LOG.warn("Failed to extract row group boundaries from Parquet file {}", filePath, e); + // Return empty list on error - file will be treated as single split + return new ArrayList<>(); + } + + return boundaries; + } + + @Override + public boolean supportsFinerGranularity() { + return true; + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index 5611bda959cf..7abc51688b92 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -138,6 +138,51 @@ public FileRecordReader createReader(FormatReaderFactory.Context co context.filePath(), reader, fileSchema, fields, writableVectors, batchSize); } + @Override + public FileRecordReader createReader( + FormatReaderFactory.Context context, long offset, long length) throws IOException { + // Use withRange to limit reading to the specified byte range + // This enables reading only specific row groups within the file + ParquetReadOptions.Builder builder = + ParquetReadOptions.builder(new PlainParquetConfiguration()) + .withRange(offset, offset + length); + setReadOptions(builder); + + ParquetFileReader reader = + new ParquetFileReader( + ParquetInputFile.fromPath( + context.fileIO(), context.filePath(), context.fileSize()), + builder.build(), + context.selection()); + MessageType fileSchema = reader.getFileMetaData().getSchema(); + MessageType requestedSchema = clipParquetSchema(fileSchema); + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Create reader of the parquet file {} with offset {} and length {}, " + + "the fileSchema is {}, the requestedSchema is {}.", 
+ context.filePath(), + offset, + length, + fileSchema, + requestedSchema); + } + + reader.setRequestedSchema(requestedSchema); + RowType[] shreddingSchemas = + VariantUtils.extractShreddingSchemasFromParquetSchema(readFields, fileSchema); + List> variantFields = + VariantUtils.buildVariantFields(readFields, variantAccess); + WritableColumnVector[] writableVectors = createWritableVectors(variantFields); + + MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(requestedSchema); + List fields = + buildFieldsList(readFields, columnIO, shreddingSchemas, variantFields); + + return new VectorizedParquetRecordReader( + context.filePath(), reader, fileSchema, fields, writableVectors, batchSize); + } + private void setReadOptions(ParquetReadOptions.Builder builder) { builder.useSignedStringMinMax( conf.getBoolean("parquet.strings.signed-min-max.enabled", false));
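
To make the new knobs concrete, here is a minimal usage sketch. The option keys and defaults come from the CoreOptions changes above; the dynamic-options map, the "256 mb" threshold value, and the table.copy(...) call are illustrative assumptions, and the exact table-API calls may differ by Paimon version.

import java.util.HashMap;
import java.util.Map;

public class FineGrainedSplitOptionsExample {
    public static void main(String[] args) {
        // Option keys are taken from this patch; values here are illustrative.
        Map<String, String> dynamicOptions = new HashMap<>();
        dynamicOptions.put("source.split.file-enabled", "true");     // default: false
        dynamicOptions.put("source.split.file-threshold", "256 mb"); // default: 128 mb
        dynamicOptions.put("source.split.file-max-splits", "50");    // default: 100
        // Applied via table.copy(dynamicOptions) (or as table properties), the scan
        // planner would then wrap the base SplitGenerator in FineGrainedSplitGenerator
        // and emit one DataSplit per row group/stripe for qualifying Parquet/ORC files.
        dynamicOptions.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}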
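
The FormatMetadataReader contract is easiest to see with a toy implementation. The sketch below is hypothetical — FixedBlockMetadataReader is a made-up fixed-block format, and its block size and row counts are fabricated; only the Parquet and ORC readers exist in this patch — but it compiles against the interface introduced above, assuming the generic eaten by extraction in the diff is List<FileSplitBoundary> (as the interface javadoc states).

import org.apache.paimon.format.FileSplitBoundary;
import org.apache.paimon.format.FormatMetadataReader;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Toy reader: pretends every file is made of independently readable 64 MB blocks. */
public class FixedBlockMetadataReader implements FormatMetadataReader {

    private static final long BLOCK_SIZE = 64L << 20; // assumed 64 MB block size
    private static final long ROWS_PER_BLOCK = 1_000_000L; // fabricated row count

    @Override
    public List<FileSplitBoundary> getSplitBoundaries(FileIO fileIO, Path filePath, long fileSize)
            throws IOException {
        // One boundary per block, in file order, as the interface javadoc requires.
        List<FileSplitBoundary> boundaries = new ArrayList<>();
        for (long offset = 0; offset < fileSize; offset += BLOCK_SIZE) {
            long length = Math.min(BLOCK_SIZE, fileSize - offset);
            boundaries.add(new FileSplitBoundary(offset, length, ROWS_PER_BLOCK));
        }
        return boundaries;
    }

    @Override
    public boolean supportsFinerGranularity() {
        return true;
    }
}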
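
One subtlety in ParquetMetadataReader is the boundary math: a row group's byte range is the envelope of its column chunks (minimum starting position to maximum end position), which is also the range later handed to ParquetReadOptions' withRange in ParquetReaderFactory. A tiny worked example, with chunk positions invented for illustration:

public class RowGroupBoundaryMath {
    public static void main(String[] args) {
        // {startingPos, totalSize} per column chunk; positions are made up.
        long[][] columnChunks = {{100, 400}, {500, 400}};
        long offset = Long.MAX_VALUE;
        long endOffset = Long.MIN_VALUE;
        for (long[] chunk : columnChunks) {
            offset = Math.min(offset, chunk[0]);
            endOffset = Math.max(endOffset, chunk[0] + chunk[1]);
        }
        // Prints offset=100, length=800 -> withRange(100, 900) covers exactly this row group.
        System.out.println("offset=" + offset + ", length=" + (endOffset - offset));
    }
}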