Skip to content

Commit fd0825e

Browse files
committed
Implement path_column for files()
1 parent 2509d54 commit fd0825e

File tree

12 files changed

+200
-10
lines changed

12 files changed

+200
-10
lines changed

be/src/exec/file_scanner/avro_cpp_scanner.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ Status AvroCppScanner::open() {
5454
if (rng.num_of_columns_from_file != first_range.num_of_columns_from_file) {
5555
return Status::InvalidArgument("Column count from file of range mismatch");
5656
}
57-
if (rng.num_of_columns_from_file + rng.columns_from_path.size() != _src_slot_descriptors.size()) {
57+
int path_column_count = (rng.__isset.include_file_path_column && rng.include_file_path_column) ? 1 : 0;
58+
if (rng.num_of_columns_from_file + rng.columns_from_path.size() + path_column_count !=
59+
_src_slot_descriptors.size()) {
5860
return Status::InvalidArgument("Slot descriptor and column count mismatch");
5961
}
6062
}
@@ -115,8 +117,12 @@ StatusOr<ChunkPtr> AvroCppScanner::get_next() {
115117
}
116118
}
117119

118-
fill_columns_from_path(src_chunk, _num_of_columns_from_file, _scan_range.ranges[_next_range - 1].columns_from_path,
119-
src_chunk->num_rows());
120+
const auto& range = _scan_range.ranges[_next_range - 1];
121+
fill_columns_from_path(src_chunk, _num_of_columns_from_file, range.columns_from_path, src_chunk->num_rows());
122+
if (range.__isset.include_file_path_column && range.include_file_path_column) {
123+
int path_column_slot = _num_of_columns_from_file + range.columns_from_path.size();
124+
fill_file_path_column(src_chunk, path_column_slot, range.path, src_chunk->num_rows());
125+
}
120126

121127
return materialize(src_chunk, src_chunk);
122128
}

be/src/exec/file_scanner/csv_scanner.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,9 @@ Status CSVScanner::open() {
192192
if (rng.num_of_columns_from_file != first_range.num_of_columns_from_file) {
193193
return Status::InvalidArgument("CSV column count of range mismatch");
194194
}
195-
if (rng.num_of_columns_from_file + rng.columns_from_path.size() != _src_slot_descriptors.size()) {
195+
int path_column_count = (rng.__isset.include_file_path_column && rng.include_file_path_column) ? 1 : 0;
196+
if (rng.num_of_columns_from_file + rng.columns_from_path.size() + path_column_count !=
197+
_src_slot_descriptors.size()) {
196198
return Status::InvalidArgument("slot descriptor and column count mismatch");
197199
}
198200
}
@@ -331,8 +333,12 @@ StatusOr<ChunkPtr> CSVScanner::get_next() {
331333
}
332334
} while ((src_chunk)->num_rows() == 0);
333335

334-
fill_columns_from_path(src_chunk, _num_fields_in_csv, _scan_range.ranges[_curr_file_index].columns_from_path,
335-
src_chunk->num_rows());
336+
const auto& range = _scan_range.ranges[_curr_file_index];
337+
fill_columns_from_path(src_chunk, _num_fields_in_csv, range.columns_from_path, src_chunk->num_rows());
338+
if (range.__isset.include_file_path_column && range.include_file_path_column) {
339+
int path_column_slot = _num_fields_in_csv + range.columns_from_path.size();
340+
fill_file_path_column(src_chunk, path_column_slot, range.path, src_chunk->num_rows());
341+
}
336342
ASSIGN_OR_RETURN(chunk, materialize(nullptr, src_chunk));
337343

338344
return std::move(chunk);

be/src/exec/file_scanner/file_scanner.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,17 @@ void FileScanner::fill_columns_from_path(starrocks::ChunkPtr& chunk, int slot_st
168168
}
169169
}
170170

171+
void FileScanner::fill_file_path_column(starrocks::ChunkPtr& chunk, int slot_index, const std::string& file_path,
172+
int size) {
173+
auto varchar_type = TypeDescriptor::create_varchar_type(TypeDescriptor::MAX_VARCHAR_LENGTH);
174+
auto slot_desc = _src_slot_descriptors.at(slot_index);
175+
if (slot_desc == nullptr) return;
176+
auto col = ColumnHelper::create_column(varchar_type, slot_desc->is_nullable());
177+
Slice s(file_path.c_str(), file_path.size());
178+
col->append_value_multiple_times(&s, size);
179+
chunk->append_column(std::move(col), slot_desc->id());
180+
}
181+
171182
StatusOr<ChunkPtr> FileScanner::materialize(const starrocks::ChunkPtr& src, starrocks::ChunkPtr& cast) {
172183
SCOPED_RAW_TIMER(&_counter->materialize_ns);
173184

be/src/exec/file_scanner/file_scanner.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ class FileScanner {
8383
protected:
8484
void fill_columns_from_path(ChunkPtr& chunk, int slot_start, const std::vector<std::string>& columns_from_path,
8585
int size);
86+
// fill the file path column with the full file path for all rows in the chunk
87+
void fill_file_path_column(ChunkPtr& chunk, int slot_index, const std::string& file_path, int size);
8688
// materialize is used to transform source chunk depicted by src_slot_descriptors into destination
8789
// chunk depicted by dest_slot_descriptors
8890
StatusOr<ChunkPtr> materialize(const starrocks::ChunkPtr& src, starrocks::ChunkPtr& cast);

be/src/exec/file_scanner/orc_scanner.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,10 @@ StatusOr<ChunkPtr> ORCScanner::_next_orc_chunk() {
147147
if (range.__isset.num_of_columns_from_file) {
148148
fill_columns_from_path(chunk, range.num_of_columns_from_file, range.columns_from_path, chunk->num_rows());
149149
}
150+
if (range.__isset.include_file_path_column && range.include_file_path_column) {
151+
int path_column_slot = range.num_of_columns_from_file + range.columns_from_path.size();
152+
fill_file_path_column(chunk, path_column_slot, range.path, chunk->num_rows());
153+
}
150154
return std::move(chunk);
151155
} catch (orc::ParseError& e) {
152156
std::string s = strings::Substitute("ParseError: $0", e.what());

be/src/exec/file_scanner/parquet_scanner.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,11 +143,15 @@ Status ParquetScanner::finalize_src_chunk(ChunkPtr* chunk) {
143143
column = ColumnHelper::unfold_const_column(slot_desc->type(), (*chunk)->num_rows(), std::move(column));
144144
cast_chunk->append_column(column, slot_desc->id());
145145
}
146-
auto range = _scan_range.ranges.at(_next_file - 1);
146+
const auto& range = _scan_range.ranges.at(_next_file - 1);
147147
if (range.__isset.num_of_columns_from_file) {
148148
fill_columns_from_path(cast_chunk, range.num_of_columns_from_file, range.columns_from_path,
149149
cast_chunk->num_rows());
150150
}
151+
if (range.__isset.include_file_path_column && range.include_file_path_column) {
152+
int path_column_slot = range.num_of_columns_from_file + range.columns_from_path.size();
153+
fill_file_path_column(cast_chunk, path_column_slot, range.path, cast_chunk->num_rows());
154+
}
151155
if (VLOG_ROW_IS_ON) {
152156
VLOG_ROW << "after finalize chunk: " << cast_chunk->debug_columns();
153157
}

fe/fe-core/src/main/java/com/starrocks/catalog/TableFunctionTable.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ public class TableFunctionTable extends Table {
123123
public static final String PROPERTY_PARTITION_BY = "partition_by";
124124

125125
public static final String PROPERTY_COLUMNS_FROM_PATH = "columns_from_path";
126+
public static final String PROPERTY_PATH_COLUMN = "path_column";
126127
private static final String PROPERTY_STRICT_MODE = LoadStmt.STRICT_MODE;
127128

128129
public static final String PROPERTY_AUTO_DETECT_SAMPLE_FILES = "auto_detect_sample_files";
@@ -179,6 +180,7 @@ public enum FilesTableType {
179180
private int autoDetectSampleRows = DEFAULT_AUTO_DETECT_SAMPLE_ROWS;
180181

181182
private List<String> columnsFromPath = new ArrayList<>();
183+
private String pathColumnName = null;
182184
private boolean strictMode = false;
183185
private final Map<String, String> properties;
184186

@@ -263,6 +265,11 @@ private void setSchemaForLoadAndQuery() throws DdlException {
263265
// get columns from path
264266
columns.addAll(getSchemaFromPath());
265267

268+
// add path column if specified
269+
if (pathColumnName != null) {
270+
columns.add(new Column(pathColumnName, StringType.DEFAULT_STRING, true));
271+
}
272+
266273
// check duplicate columns
267274
checkDuplicateColumns(columns);
268275

@@ -469,6 +476,11 @@ private void parsePropertiesForLoad(Map<String, String> properties) throws DdlEx
469476
}
470477
}
471478

479+
String pathColumnProp = properties.get(PROPERTY_PATH_COLUMN);
480+
if (!Strings.isNullOrEmpty(pathColumnProp)) {
481+
pathColumnName = pathColumnProp.trim();
482+
}
483+
472484
if (properties.containsKey(PROPERTY_STRICT_MODE)) {
473485
strictMode = Boolean.parseBoolean(properties.get(PROPERTY_STRICT_MODE));
474486
}
@@ -724,6 +736,14 @@ public List<String> getColumnsFromPath() {
724736
return columnsFromPath;
725737
}
726738

739+
public String getPathColumnName() {
740+
return pathColumnName;
741+
}
742+
743+
public boolean hasPathColumn() {
744+
return pathColumnName != null;
745+
}
746+
727747
public boolean isStrictMode() {
728748
return strictMode;
729749
}

fe/fe-core/src/main/java/com/starrocks/load/BrokerFileGroup.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ public class BrokerFileGroup implements Writable {
9191
// fileFieldNames should be filled automatically according to schema and mapping when loading from hive table.
9292
private List<String> fileFieldNames;
9393
private List<String> columnsFromPath;
94+
// if true, the file path will be exposed as an additional column after columnsFromPath
95+
private boolean hasPathColumn = false;
9496
// columnExprList includes all fileFieldNames, columnsFromPath and column mappings
9597
// this param will be recreated by data desc when the log replay
9698
private List<ImportColumnDesc> columnExprList;
@@ -143,6 +145,7 @@ public BrokerFileGroup(TableFunctionTable table, Set<String> scanColumns) throws
143145

144146
this.columnExprList = table.getColumnExprList(scanColumns);
145147
this.columnsFromPath = table.getColumnsFromPath();
148+
this.hasPathColumn = table.hasPathColumn();
146149
}
147150

148151
public BrokerFileGroup(DataDescription dataDescription) {
@@ -326,6 +329,10 @@ public List<String> getColumnsFromPath() {
326329
return columnsFromPath;
327330
}
328331

332+
public boolean hasPathColumn() {
333+
return hasPathColumn;
334+
}
335+
329336
public List<ImportColumnDesc> getColumnExprList() {
330337
return columnExprList;
331338
}

fe/fe-core/src/main/java/com/starrocks/planner/FileScanNode.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,9 @@ private void processFileGroup(
577577
TFileFormatType formatType = Load.getFormatType(context.fileGroup.getFileFormat(), fileStatus.path);
578578
List<String> columnsFromPath = HdfsUtil.parseColumnsFromPath(fileStatus.path,
579579
context.fileGroup.getColumnsFromPath());
580-
int numberOfColumnsFromFile = context.slotDescByName.size() - columnsFromPath.size();
580+
boolean hasPathColumn = context.fileGroup.hasPathColumn();
581+
int numberOfColumnsFromFile = context.slotDescByName.size() - columnsFromPath.size()
582+
- (hasPathColumn ? 1 : 0);
581583

582584
smallestLocations = locationsHeap.poll();
583585
long leftBytes = fileStatus.size - curFileOffset;
@@ -595,7 +597,7 @@ private void processFileGroup(
595597

596598
TBrokerRangeDesc rangeDesc =
597599
createBrokerRangeDesc(curFileOffset, fileStatus, formatType, rangeBytes, columnsFromPath,
598-
numberOfColumnsFromFile);
600+
numberOfColumnsFromFile, hasPathColumn);
599601

600602
rangeDesc.setStrip_outer_array(jsonOptions.stripOuterArray);
601603
rangeDesc.setJsonpaths(jsonOptions.jsonPaths);
@@ -630,7 +632,8 @@ private boolean isFileFormatSupportSplit(TFileFormatType format) {
630632

631633
private TBrokerRangeDesc createBrokerRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus,
632634
TFileFormatType formatType, long rangeBytes,
633-
List<String> columnsFromPath, int numberOfColumnsFromFile) {
635+
List<String> columnsFromPath, int numberOfColumnsFromFile,
636+
boolean hasPathColumn) {
634637
TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc();
635638
rangeDesc.setFile_type(TFileType.FILE_BROKER);
636639
rangeDesc.setFormat_type(formatType);
@@ -641,6 +644,7 @@ private TBrokerRangeDesc createBrokerRangeDesc(long curFileOffset, TBrokerFileSt
641644
rangeDesc.setFile_size(fileStatus.size);
642645
rangeDesc.setNum_of_columns_from_file(numberOfColumnsFromFile);
643646
rangeDesc.setColumns_from_path(columnsFromPath);
647+
rangeDesc.setInclude_file_path_column(hasPathColumn);
644648
return rangeDesc;
645649
}
646650

fe/fe-core/src/test/java/com/starrocks/catalog/TableFunctionTableTest.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,4 +362,90 @@ public void testParquetVersion() {
362362
"Invalid parquet.version: '2.0'. Expected values should be 2.4, 2.6, 1.0",
363363
() -> new TableFunctionTable(new ArrayList<>(), properties, new SessionVariable()));
364364
}
365+
366+
@Test
367+
public void testPathColumn() {
368+
// Test path_column property parsing
369+
Assertions.assertDoesNotThrow(() -> {
370+
Map<String, String> properties = newProperties();
371+
properties.put("path_column", "_filepath");
372+
TableFunctionTable table = new TableFunctionTable(properties);
373+
374+
// Check path_column is parsed correctly
375+
Assertions.assertTrue(table.hasPathColumn());
376+
Assertions.assertEquals("_filepath", table.getPathColumnName());
377+
378+
// Check schema includes path column after columns_from_path
379+
List<Column> schema = table.getFullSchema();
380+
// Schema: col_int, col_string (from file) + col_path1, col_path2, col_path3 (from path) + _filepath (path column)
381+
Assertions.assertEquals(6, schema.size());
382+
Assertions.assertEquals(new Column("col_int", IntegerType.INT), schema.get(0));
383+
Assertions.assertEquals(new Column("col_string", VarcharType.VARCHAR), schema.get(1));
384+
Assertions.assertEquals(new Column("col_path1", StringType.DEFAULT_STRING, true), schema.get(2));
385+
Assertions.assertEquals(new Column("col_path2", StringType.DEFAULT_STRING, true), schema.get(3));
386+
Assertions.assertEquals(new Column("col_path3", StringType.DEFAULT_STRING, true), schema.get(4));
387+
Assertions.assertEquals(new Column("_filepath", StringType.DEFAULT_STRING, true), schema.get(5));
388+
});
389+
}
390+
391+
@Test
392+
public void testPathColumnWithoutColumnsFromPath() {
393+
// Test path_column without columns_from_path
394+
Assertions.assertDoesNotThrow(() -> {
395+
Map<String, String> properties = new HashMap<>();
396+
properties.put("path", "fake://some_bucket/some_path/*");
397+
properties.put("format", "ORC");
398+
properties.put("path_column", "source_file");
399+
TableFunctionTable table = new TableFunctionTable(properties);
400+
401+
Assertions.assertTrue(table.hasPathColumn());
402+
Assertions.assertEquals("source_file", table.getPathColumnName());
403+
404+
// Schema: col_int, col_string (from file) + source_file (path column)
405+
List<Column> schema = table.getFullSchema();
406+
Assertions.assertEquals(3, schema.size());
407+
Assertions.assertEquals(new Column("source_file", StringType.DEFAULT_STRING, true), schema.get(2));
408+
});
409+
}
410+
411+
@Test
412+
public void testNoPathColumn() {
413+
// Test without path_column
414+
Assertions.assertDoesNotThrow(() -> {
415+
Map<String, String> properties = newProperties();
416+
TableFunctionTable table = new TableFunctionTable(properties);
417+
418+
Assertions.assertFalse(table.hasPathColumn());
419+
Assertions.assertNull(table.getPathColumnName());
420+
});
421+
}
422+
423+
@Test
424+
public void testDuplicatePathColumnName() {
425+
// Test duplicate path_column name with file column
426+
Map<String, String> properties = newProperties();
427+
properties.put("path_column", "col_int");
428+
ExceptionChecker.expectThrowsWithMsg(DdlException.class,
429+
"Duplicate column name 'col_int'",
430+
() -> new TableFunctionTable(properties));
431+
432+
// Test duplicate path_column name with columns_from_path
433+
properties.put("path_column", "col_path1");
434+
ExceptionChecker.expectThrowsWithMsg(DdlException.class,
435+
"Duplicate column name 'col_path1'",
436+
() -> new TableFunctionTable(properties));
437+
}
438+
439+
@Test
440+
public void testPathColumnWithWhitespace() {
441+
// Test path_column with leading/trailing whitespace
442+
Assertions.assertDoesNotThrow(() -> {
443+
Map<String, String> properties = newProperties();
444+
properties.put("path_column", " _filepath ");
445+
TableFunctionTable table = new TableFunctionTable(properties);
446+
447+
Assertions.assertTrue(table.hasPathColumn());
448+
Assertions.assertEquals("_filepath", table.getPathColumnName());
449+
});
450+
}
365451
}

0 commit comments

Comments (0)