diff --git a/ice/src/main/java/com/altinity/ice/cli/Main.java b/ice/src/main/java/com/altinity/ice/cli/Main.java
index 2798c60..43d3900 100644
--- a/ice/src/main/java/com/altinity/ice/cli/Main.java
+++ b/ice/src/main/java/com/altinity/ice/cli/Main.java
@@ -18,6 +18,7 @@
 import com.altinity.ice.cli.internal.cmd.DeleteNamespace;
 import com.altinity.ice.cli.internal.cmd.DeleteTable;
 import com.altinity.ice.cli.internal.cmd.Describe;
+import com.altinity.ice.cli.internal.cmd.DescribeParquet;
 import com.altinity.ice.cli.internal.cmd.Insert;
 import com.altinity.ice.cli.internal.cmd.InsertWatch;
 import com.altinity.ice.cli.internal.cmd.Scan;
@@ -134,6 +135,68 @@ void describe(
     }
   }
 
+  @CommandLine.Command(name = "describe-parquet", description = "Describe Parquet file metadata.")
+  void describeParquet(
+      @CommandLine.Parameters(
+              arity = "1",
+              paramLabel = "<file>",
+              description = "Path to Parquet file")
+          String target,
+      @CommandLine.Option(
+              names = {"-a", "--all"},
+              description = "Show everything")
+          boolean showAll,
+      @CommandLine.Option(
+              names = {"-s", "--summary"},
+              description = "Show rows, number of row groups, size, compressed size, etc.")
+          boolean showSummary,
+      @CommandLine.Option(
+              names = {"--columns"},
+              description = "Show columns")
+          boolean showColumns,
+      @CommandLine.Option(
+              names = {"-r", "--row-groups"},
+              description = "Show row groups")
+          boolean showRowGroups,
+      @CommandLine.Option(
+              names = {"-d", "--row-group-details"},
+              description = "Show column stats within each row group")
+          boolean showRowGroupDetails,
+      @CommandLine.Option(
+              names = {"--json"},
+              description = "Output JSON instead of YAML")
+          boolean json,
+      @CommandLine.Option(names = {"--s3-region"}) String s3Region,
+      @CommandLine.Option(
+              names = {"--s3-no-sign-request"},
+              description = "Access S3 files without authentication")
+          boolean s3NoSignRequest)
+      throws IOException {
+    setAWSRegion(s3Region);
+    try (RESTCatalog catalog = loadCatalog()) {
+      var options = new java.util.ArrayList<DescribeParquet.Option>();
+      if (showAll || showSummary) {
+        options.add(DescribeParquet.Option.SUMMARY);
+      }
+      if (showAll || showColumns) {
+        options.add(DescribeParquet.Option.COLUMNS);
+      }
+      if (showAll || showRowGroups) {
+        options.add(DescribeParquet.Option.ROW_GROUPS);
+      }
+      if (showAll || showRowGroupDetails) {
+        options.add(DescribeParquet.Option.ROW_GROUP_DETAILS);
+      }
+
+      if (options.isEmpty()) {
+        options.add(DescribeParquet.Option.SUMMARY);
+      }
+
+      DescribeParquet.run(
+          catalog, target, json, s3NoSignRequest, options.toArray(new DescribeParquet.Option[0]));
+    }
+  }
+
   public record IceSortOrder(
       @JsonProperty("column") String column,
       @JsonProperty("desc") boolean desc,
diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/DescribeParquet.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/DescribeParquet.java
new file mode 100644
index 0000000..83abf7a
--- /dev/null
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/DescribeParquet.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.cli.internal.cmd;
+
+import com.altinity.ice.cli.internal.iceberg.io.Input;
+import com.altinity.ice.cli.internal.iceberg.parquet.Metadata;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import software.amazon.awssdk.services.s3.internal.crossregion.S3CrossRegionSyncClient;
+import software.amazon.awssdk.utils.Lazy;
+
+public final class DescribeParquet {
+
+  private DescribeParquet() {}
+
+  public enum Option {
+    ALL,
+    SUMMARY,
+    COLUMNS,
+    ROW_GROUPS,
+    ROW_GROUP_DETAILS
+  }
+
+  public static void run(
+      RESTCatalog catalog,
+      String filePath,
+      boolean json,
+      boolean s3NoSignRequest,
+      Option... options)
+      throws IOException {
+
+    Lazy<software.amazon.awssdk.services.s3.S3Client> s3ClientLazy =
+        new Lazy<>(
+            () ->
+                new S3CrossRegionSyncClient(
+                    com.altinity.ice.cli.internal.s3.S3.newClient(s3NoSignRequest)));
+    FileIO io = Input.newIO(filePath, null, s3ClientLazy);
+    InputFile inputFile = Input.newFile(filePath, catalog, io);
+    run(inputFile, json, options);
+  }
+
+  public static void run(InputFile inputFile, boolean json, Option... options) throws IOException {
+
+    ParquetMetadata metadata = Metadata.read(inputFile);
+
+    ParquetInfo info = extractParquetInfo(metadata, options);
+
+    ObjectMapper mapper = json ? new ObjectMapper() : new ObjectMapper(new YAMLFactory());
+    mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+    String output = mapper.writeValueAsString(info);
+    System.out.println(output);
+  }
+
+  private static ParquetInfo extractParquetInfo(ParquetMetadata metadata, Option... options) {
+    var optionsSet = java.util.Set.of(options);
+    boolean includeAll = optionsSet.contains(Option.ALL);
+
+    FileMetaData fileMetadata = metadata.getFileMetaData();
+
+    // Summary info
+    Summary summary = null;
+    if (includeAll || optionsSet.contains(Option.SUMMARY)) {
+      long totalRows = metadata.getBlocks().stream().mapToLong(BlockMetaData::getRowCount).sum();
+
+      long compressedSize =
+          metadata.getBlocks().stream().mapToLong(BlockMetaData::getCompressedSize).sum();
+
+      long uncompressedSize =
+          metadata.getBlocks().stream().mapToLong(BlockMetaData::getTotalByteSize).sum();
+
+      summary =
+          new Summary(
+              totalRows,
+              metadata.getBlocks().size(),
+              compressedSize,
+              uncompressedSize,
+              fileMetadata.getCreatedBy(),
+              fileMetadata.getSchema().getFieldCount());
+    }
+
+    // Column info
+    List<Column> columns = null;
+    if (includeAll || optionsSet.contains(Option.COLUMNS)) {
+      columns = extractColumns(fileMetadata.getSchema());
+    }
+
+    // Row group info
+    List<RowGroup> rowGroups = null;
+    if (includeAll
+        || optionsSet.contains(Option.ROW_GROUPS)
+        || optionsSet.contains(Option.ROW_GROUP_DETAILS)) {
+      boolean includeDetails = includeAll || optionsSet.contains(Option.ROW_GROUP_DETAILS);
+      rowGroups = extractRowGroups(metadata.getBlocks(), includeDetails);
+    }
+
+    return new ParquetInfo(summary, columns, rowGroups);
+  }
+
+  private static List<Column> extractColumns(MessageType schema) {
+    List<Column> columns = new ArrayList<>();
+    for (Type field : schema.getFields()) {
+      String logicalType = null;
+      if (field.isPrimitive()) {
+        var annotation = field.asPrimitiveType().getLogicalTypeAnnotation();
+        logicalType = annotation != null ? annotation.toString() : null;
+      }
+      columns.add(
+          new Column(
+              field.getName(),
+              field.isPrimitive() ? field.asPrimitiveType().getPrimitiveTypeName().name() : "GROUP",
+              field.getRepetition().name(),
+              logicalType));
+    }
+    return columns;
+  }
+
+  private static List<RowGroup> extractRowGroups(
+      List<BlockMetaData> blocks, boolean includeDetails) {
+    List<RowGroup> rowGroups = new ArrayList<>();
+
+    for (int i = 0; i < blocks.size(); i++) {
+      BlockMetaData block = blocks.get(i);
+
+      List<ColumnChunk> columnChunks = null;
+      if (includeDetails) {
+        columnChunks = new ArrayList<>();
+        for (ColumnChunkMetaData column : block.getColumns()) {
+          Statistics stats = column.getStatistics();
+
+          ColumnStats columnStats = null;
+          if (stats != null && !stats.isEmpty()) {
+            long nulls = stats.isNumNullsSet() ? stats.getNumNulls() : 0;
+            String min = null;
+            String max = null;
+            if (stats.hasNonNullValue()) {
+              Object minVal = stats.genericGetMin();
+              Object maxVal = stats.genericGetMax();
+              min = minVal != null ? minVal.toString() : null;
+              max = maxVal != null ? maxVal.toString() : null;
+            }
+            columnStats = new ColumnStats(nulls, min, max);
+          }
+
+          columnChunks.add(
+              new ColumnChunk(
+                  column.getPath().toDotString(),
+                  column.getPrimitiveType().getName(),
+                  column.getEncodings().toString(),
+                  column.getCodec().name(),
+                  column.getTotalSize(),
+                  column.getTotalUncompressedSize(),
+                  column.getValueCount(),
+                  columnStats));
+        }
+      }
+
+      rowGroups.add(
+          new RowGroup(
+              i,
+              block.getRowCount(),
+              block.getTotalByteSize(),
+              block.getCompressedSize(),
+              block.getStartingPos(),
+              columnChunks));
+    }
+
+    return rowGroups;
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public record ParquetInfo(Summary summary, List<Column> columns, List<RowGroup> rowGroups) {}
+
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public record Summary(
+      long rows,
+      int rowGroups,
+      long compressedSize,
+      long uncompressedSize,
+      String createdBy,
+      int columnCount) {}
+
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public record Column(String name, String type, String repetition, String logicalType) {}
+
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public record RowGroup(
+      int index,
+      long rowCount,
+      long totalSize,
+      long compressedSize,
+      long startingPos,
+      List<ColumnChunk> columns) {}
+
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public record ColumnChunk(
+      String path,
+      String type,
+      String encodings,
+      String codec,
+      long totalSize,
+      long uncompressedSize,
+      long valueCount,
+      ColumnStats stats) {}
+
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public record ColumnStats(long nulls, String min, String max) {}
+}
diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/iceberg/io/Input.java b/ice/src/main/java/com/altinity/ice/cli/internal/iceberg/io/Input.java
index 927a639..cd99444 100644
--- a/ice/src/main/java/com/altinity/ice/cli/internal/iceberg/io/Input.java
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/iceberg/io/Input.java
@@ -89,8 +89,11 @@ case String s when (s.startsWith("http:") || s.startsWith("https:")) -> {
         createParentDirs(dst.toFile());
         String tempName = name + "~";
         Path tmp = Paths.get(httpCachePath, tempName);
+        // Clean up any leftover temp file from a previous interrupted run
+        if (Files.exists(tmp)) {
+          Files.delete(tmp);
+        }
         try (InputStream in = URI.create(s).toURL().openStream()) {
-          // FIXME: race with another copy
           Files.copy(in, tmp);
         }
         Files.move(tmp, dst);
diff --git a/ice/src/test/java/com/altinity/ice/cli/internal/cmd/DescribeParquetTest.java b/ice/src/test/java/com/altinity/ice/cli/internal/cmd/DescribeParquetTest.java
new file mode 100644
index 0000000..a58d8aa
--- /dev/null
+++ b/ice/src/test/java/com/altinity/ice/cli/internal/cmd/DescribeParquetTest.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.cli.internal.cmd;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.altinity.ice.test.Resource;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.testng.annotations.Test;
+
+public class DescribeParquetTest {
+
+  @Test
+  public void testDescribeParquetSummary() throws IOException {
+    ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+    PrintStream originalOut = System.out;
+    System.setOut(new PrintStream(outContent));
+
+    try {
+      InMemoryCatalog catalog = new InMemoryCatalog();
+      catalog.initialize("test", java.util.Map.of());
+
+      var sampleFile =
+          Resource.asInputFile("com/altinity/ice/cli/internal/iceberg/parquet/sample-001.parquet");
+
+      DescribeParquet.run(sampleFile, false, DescribeParquet.Option.SUMMARY);
+
+      String output = outContent.toString();
+
+      assertThat(output).contains("rows:");
+      assertThat(output).contains("rowGroups:");
+      assertThat(output).contains("compressedSize:");
+      assertThat(output).contains("uncompressedSize:");
+    } finally {
+      System.setOut(originalOut);
+    }
+  }
+
+  @Test
+  public void testDescribeParquetColumns() throws IOException {
+    ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+    PrintStream originalOut = System.out;
+    System.setOut(new PrintStream(outContent));
+
+    try {
+      var sampleFile =
+          Resource.asInputFile("com/altinity/ice/cli/internal/iceberg/parquet/sample-001.parquet");
+
+      DescribeParquet.run(sampleFile, false, DescribeParquet.Option.COLUMNS);
+
+      String output = outContent.toString();
+
+      assertThat(output).contains("columns:");
+      assertThat(output).contains("name:");
+      assertThat(output).contains("type:");
+    } finally {
+      System.setOut(originalOut);
+    }
+  }
+
+  @Test
+  public void testDescribeParquetJson() throws IOException {
+    ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+    PrintStream originalOut = System.out;
+    System.setOut(new PrintStream(outContent));
+
+    try {
+      var sampleFile =
+          Resource.asInputFile("com/altinity/ice/cli/internal/iceberg/parquet/sample-001.parquet");
+
+      DescribeParquet.run(sampleFile, true, DescribeParquet.Option.SUMMARY);
+
+      String output = outContent.toString();
+
+      assertThat(output).contains("{");
+      assertThat(output).contains("}");
+      assertThat(output).contains("\"summary\"");
+    } finally {
+      System.setOut(originalOut);
+    }
+  }
+}
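
--
Reviewer note: with the patch applied, the subcommand is invoked as, e.g., "ice describe-parquet data.parquet" (defaults to summary) or "ice describe-parquet s3://bucket/key.parquet -a --json --s3-no-sign-request"; the "ice" binary name is inferred from the module layout. Metadata.read above is an ice-internal helper; to poke at the same footer fields outside the CLI, a minimal standalone sketch against parquet-mr's public API might look like the following (the HadoopInputFile/ParquetFileReader wiring is an assumption for illustration, not necessarily how ice reads files):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public final class FooterPeek {
  public static void main(String[] args) throws Exception {
    // Open the file and read only the footer; no row data is materialized.
    var input = HadoopInputFile.fromPath(new Path(args[0]), new Configuration());
    try (ParquetFileReader reader = ParquetFileReader.open(input)) {
      ParquetMetadata md = reader.getFooter();
      // Same aggregates the SUMMARY section above is computed from.
      long rows = md.getBlocks().stream().mapToLong(BlockMetaData::getRowCount).sum();
      long compressed =
          md.getBlocks().stream().mapToLong(BlockMetaData::getCompressedSize).sum();
      System.out.printf(
          "rows=%d rowGroups=%d compressedSize=%d%n", rows, md.getBlocks().size(), compressed);
    }
  }
}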