Skip to content
Open
38 changes: 38 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,32 @@ public InlineElement getDescription() {
"Open file cost of a source file. It is used to avoid reading"
+ " too many files with a source split, which can be very slow.");

public static final ConfigOption<Boolean> SOURCE_SPLIT_FILE_ENABLED =
key("source.split.file-enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Enable finer-grained file splitting. When enabled, large files can be split "
+ "by row groups (Parquet) or stripes (ORC) to improve concurrency. "
+ "This feature is disabled by default for backward compatibility.");

public static final ConfigOption<MemorySize> SOURCE_SPLIT_FILE_THRESHOLD =
key("source.split.file-threshold")
.memoryType()
.defaultValue(MemorySize.ofMebiBytes(128))
.withDescription(
"Minimum file size to consider for finer-grained splitting. Files smaller "
+ "than this threshold will not be split further.");

public static final ConfigOption<Integer> SOURCE_SPLIT_FILE_MAX_SPLITS =
key("source.split.file-max-splits")
.intType()
.defaultValue(100)
.withDescription(
"Maximum number of splits to generate per file when using finer-grained "
+ "splitting. This prevents excessive splitting for files with many "
+ "row groups or stripes.");

public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE =
key("write-buffer-size")
.memoryType()
Expand Down Expand Up @@ -2534,6 +2560,18 @@ public long splitOpenFileCost() {
return options.get(SOURCE_SPLIT_OPEN_FILE_COST).getBytes();
}

public boolean splitFileEnabled() {
return options.get(SOURCE_SPLIT_FILE_ENABLED);
}

public long splitFileThreshold() {
return options.get(SOURCE_SPLIT_FILE_THRESHOLD).getBytes();
}

public int splitFileMaxSplits() {
return options.get(SOURCE_SPLIT_FILE_MAX_SPLITS);
}

public long writeBufferSize() {
return options.get(WRITE_BUFFER_SIZE).getBytes();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format;

import java.util.Objects;

/**
* Represents a split boundary within a file (e.g., a row group in Parquet or a stripe in ORC).
*
* <p>This class contains the byte offset, length, and row count for a portion of a file that can be
* read independently. This enables finer-grained splitting than file-level splitting.
*
* @since 0.9.0
*/
public class FileSplitBoundary {

private final long offset;
private final long length;
private final long rowCount;

public FileSplitBoundary(long offset, long length, long rowCount) {
this.offset = offset;
this.length = length;
this.rowCount = rowCount;
}

/** Byte offset where this split starts in the file. */
public long offset() {
return offset;
}

/** Byte length of this split. */
public long length() {
return length;
}

/** Number of rows in this split. */
public long rowCount() {
return rowCount;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FileSplitBoundary that = (FileSplitBoundary) o;
return offset == that.offset && length == that.length && rowCount == that.rowCount;
}

@Override
public int hashCode() {
return Objects.hash(offset, length, rowCount);
}

@Override
public String toString() {
return String.format(
"FileSplitBoundary{offset=%d, length=%d, rowCount=%d}", offset, length, rowCount);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;

import java.io.IOException;
import java.util.List;

/**
* Interface for reading format-specific metadata to enable finer-grained splitting.
*
* <p>This interface allows extracting split boundaries (e.g., row groups for Parquet, stripes for
* ORC) from files, enabling splits to be generated at finer granularity than file level.
*
* @since 0.9.0
*/
public interface FormatMetadataReader {

/**
* Get split boundaries for a file (row groups for Parquet, stripes for ORC).
*
* <p>Each boundary represents a portion of the file that can be read independently. The
* boundaries are returned in order, starting from the beginning of the file.
*
* @param fileIO FileIO instance to read the file
* @param filePath Path to the file
* @param fileSize Size of the file in bytes
* @return List of split boundaries, one per row group/stripe. Returns empty list if file cannot
* be split further or if an error occurs.
* @throws IOException if an error occurs while reading file metadata
*/
List<FileSplitBoundary> getSplitBoundaries(FileIO fileIO, Path filePath, long fileSize)
throws IOException;

/**
* Check if this format supports finer-grained splitting.
*
* @return true if the format supports splitting files into smaller units (e.g., row groups,
* stripes), false otherwise
*/
boolean supportsFinerGranularity();
}
Loading
Loading