From 0982b5d50b327405f62151f95f926d5ce1058e06 Mon Sep 17 00:00:00 2001 From: Yann Date: Sun, 21 Dec 2025 20:27:09 +0800 Subject: [PATCH 1/2] [spark] Initiate fluss-spark and introduce spark catalog and table --- .github/workflows/ci-template.yaml | 2 +- .github/workflows/stage.sh | 22 +- .scalafmt.conf | 70 ++++ copyright.txt | 17 + .../apache/fluss/config/Configuration.java | 4 +- fluss-spark/fluss-spark-3.4/pom.xml | 129 +++++++ fluss-spark/fluss-spark-3.5/pom.xml | 129 +++++++ fluss-spark/fluss-spark-common/pom.xml | 71 ++++ .../org/apache/fluss/spark/SparkCatalog.scala | 109 ++++++ .../fluss/spark/SparkConnectorOptions.scala | 61 ++++ .../apache/fluss/spark/SparkConversions.scala | 100 ++++++ .../org/apache/fluss/spark/SparkTable.scala | 25 ++ .../spark/catalog/AbstractSparkTable.scala | 43 +++ .../catalog/SupportsFlussNamespaces.scala | 95 +++++ .../SupportsFlussPartitionManagement.scala | 53 +++ .../fluss/spark/catalog/WithFlussAdmin.scala | 60 ++++ .../spark/types/FlussToSparkTypeVisitor.scala | 116 ++++++ .../spark/types/SparkToFlussTypeVisitor.scala | 99 +++++ .../spark/sql/FlussIdentityTransform.scala | 31 ++ fluss-spark/fluss-spark-ut/pom.xml | 125 +++++++ .../apache/fluss/spark/FlussCatalogTest.scala | 142 ++++++++ .../fluss/spark/FlussSparkTestBase.scala | 81 +++++ fluss-spark/pom.xml | 337 ++++++++++++++++++ fluss-test-coverage/pom.xml | 41 +++ pom.xml | 19 + 25 files changed, 1978 insertions(+), 3 deletions(-) create mode 100644 .scalafmt.conf create mode 100644 copyright.txt create mode 100644 fluss-spark/fluss-spark-3.4/pom.xml create mode 100644 fluss-spark/fluss-spark-3.5/pom.xml create mode 100644 fluss-spark/fluss-spark-common/pom.xml create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConnectorOptions.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussNamespaces.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/FlussToSparkTypeVisitor.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/SparkToFlussTypeVisitor.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/FlussIdentityTransform.scala create mode 100644 fluss-spark/fluss-spark-ut/pom.xml create mode 100644 fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala create mode 100644 fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala create mode 100644 fluss-spark/pom.xml diff --git a/.github/workflows/ci-template.yaml b/.github/workflows/ci-template.yaml index c5f77af3ad..7179b0c689 100644 --- a/.github/workflows/ci-template.yaml +++ b/.github/workflows/ci-template.yaml @@ -36,7 +36,7 @@ jobs: strategy: fail-fast: false 
matrix: - module: [ core, flink, lake ] + module: [ core, flink, spark3, lake ] name: "${{ matrix.module }}" steps: - name: Checkout code diff --git a/.github/workflows/stage.sh b/.github/workflows/stage.sh index 18587dc292..b35b74e47f 100755 --- a/.github/workflows/stage.sh +++ b/.github/workflows/stage.sh @@ -19,6 +19,7 @@ STAGE_CORE="core" STAGE_FLINK="flink" +STAGE_SPARK="spark3" STAGE_LAKE="lake" MODULES_FLINK="\ @@ -28,6 +29,20 @@ fluss-flink/fluss-flink-2.2,\ fluss-flink/fluss-flink-1.20,\ " +MODULES_COMMON_SPARK="\ +fluss-spark,\ +fluss-spark/fluss-spark-common,\ +fluss-spark/fluss-spark-ut,\ +" + +MODULES_SPARK3="\ +fluss-spark,\ +fluss-spark/fluss-spark-common,\ +fluss-spark/fluss-spark-ut,\ +fluss-spark/fluss-spark-3.5,\ +fluss-spark/fluss-spark-3.4,\ +" + # we move Flink legacy version tests to "lake" module for balancing testing time MODULES_LAKE="\ fluss-flink/fluss-flink-1.19,\ @@ -42,10 +57,12 @@ function get_test_modules_for_stage() { local stage=$1 local modules_flink=$MODULES_FLINK + local modules_spark3=$MODULES_SPARK3 local modules_lake=$MODULES_LAKE local negated_flink=\!${MODULES_FLINK//,/,\!} + local negated_spark=\!${MODULES_COMMON_SPARK//,/,\!} local negated_lake=\!${MODULES_LAKE//,/,\!} - local modules_core="$negated_flink,$negated_lake" + local modules_core="$negated_flink,$negated_spark,$negated_lake" case ${stage} in (${STAGE_CORE}) @@ -54,6 +71,9 @@ function get_test_modules_for_stage() { (${STAGE_FLINK}) echo "-pl fluss-test-coverage,$modules_flink" ;; + (${STAGE_SPARK}) + echo "-Pspark3 -pl fluss-test-coverage,$modules_spark3" + ;; (${STAGE_LAKE}) echo "-pl fluss-test-coverage,$modules_lake" ;; diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000000..7a7c55f0f4 --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1,70 @@ +runner.dialect = scala212 + +# Version is required to make sure IntelliJ picks the right version +version = 3.10.2 +preset = default + +# Max column +maxColumn = 100 + +# This parameter simply says the .stripMargin method was not redefined by the user to assign +# special meaning to indentation preceding the | character. Hence, that indentation can be modified. 
+assumeStandardLibraryStripMargin = true +align.stripMargin = true + +# Align settings +align.preset = none +align.closeParenSite = false +align.openParenCallSite = false +danglingParentheses.defnSite = false +danglingParentheses.callSite = false +danglingParentheses.ctrlSite = true +danglingParentheses.tupleSite = false +align.openParenCallSite = false +align.openParenDefnSite = false +align.openParenTupleSite = false + +# Newlines +newlines.alwaysBeforeElseAfterCurlyIf = false +newlines.beforeCurlyLambdaParams = multiline # Newline before lambda params +newlines.afterCurlyLambdaParams = squash # No newline after lambda params +newlines.inInterpolation = "avoid" +newlines.avoidInResultType = true +optIn.annotationNewlines = true + +# Scaladoc +docstrings.style = Asterisk # Javadoc style +docstrings.removeEmpty = true +docstrings.oneline = fold +docstrings.forceBlankLineBefore = true + +# Indentation +indent.extendSite = 2 # This makes sure extend is not indented as the ctor parameters + +# Rewrites +rewrite.rules = [AvoidInfix, Imports, RedundantBraces, SortModifiers] + +# Imports +rewrite.imports.sort = scalastyle +rewrite.imports.groups = [ + ["org.apache.fluss\\..*"], + ["org.apache.fluss.shade\\..*"], + [".*"], + ["javax\\..*"], + ["java\\..*"], + ["scala\\..*"] +] +rewrite.imports.contiguousGroups = no +importSelectors = singleline # Imports in a single line, like IntelliJ + +# Remove redundant braces in string interpolation. +rewrite.redundantBraces.stringInterpolation = true +rewrite.redundantBraces.defnBodies = false +rewrite.redundantBraces.generalExpressions = false +rewrite.redundantBraces.ifElseExpressions = false +rewrite.redundantBraces.methodBodies = false +rewrite.redundantBraces.includeUnitMethods = false +rewrite.redundantBraces.maxBreaks = 1 + +# Remove trailing commas +rewrite.trailingCommas.style = "never" diff --git a/copyright.txt b/copyright.txt new file mode 100644 index 0000000000..29400e587d --- /dev/null +++ b/copyright.txt @@ -0,0 +1,17 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + diff --git a/fluss-common/src/main/java/org/apache/fluss/config/Configuration.java b/fluss-common/src/main/java/org/apache/fluss/config/Configuration.java index a7ee42f75b..2c985f5a12 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/Configuration.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/Configuration.java @@ -18,6 +18,7 @@ package org.apache.fluss.config; import org.apache.fluss.annotation.PublicStable; +import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.utils.CollectionUtils; import org.slf4j.Logger; @@ -643,7 +644,8 @@ void setValueInternal(String key, T value) { } } - private Optional getRawValue(String key) { + @VisibleForTesting + public Optional getRawValue(String key) { if (key == null) { throw new NullPointerException("Key must not be null."); } diff --git a/fluss-spark/fluss-spark-3.4/pom.xml b/fluss-spark/fluss-spark-3.4/pom.xml new file mode 100644 index 0000000000..a5d10f39a4 --- /dev/null +++ b/fluss-spark/fluss-spark-3.4/pom.xml @@ -0,0 +1,129 @@ + + + + + 4.0.0 + + org.apache.fluss + fluss-spark + 0.9-SNAPSHOT + + + fluss-spark-3.4_${scala.binary.version} + Fluss : Engine Spark : 3.4 + + + 3.4.3 + + + + + org.apache.fluss + fluss-spark-common_${scala.binary.version} + ${project.version} + + + + org.apache.fluss + fluss-spark-ut_${scala.binary.version} + ${project.version} + tests + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + ${skip.on.java8} + + ${skip.on.java8} + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.0.0-M5 + + + + integration-tests + integration-test + false + + test + + + ${skip.on.java8} + + **/*ITCase.* + + + 1 + + + + + default-test + test + false + + test + + + ${skip.on.java8} + + **/*ITCase.* + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-fluss + package + + shade + + + + + org.apache.fluss:fluss-spark-common + org.apache.fluss:fluss-client + + + + + + + + + + diff --git a/fluss-spark/fluss-spark-3.5/pom.xml b/fluss-spark/fluss-spark-3.5/pom.xml new file mode 100644 index 0000000000..1f55f0e129 --- /dev/null +++ b/fluss-spark/fluss-spark-3.5/pom.xml @@ -0,0 +1,129 @@ + + + + + 4.0.0 + + org.apache.fluss + fluss-spark + 0.9-SNAPSHOT + + + fluss-spark-3.5_${scala.binary.version} + Fluss : Engine Spark : 3.5 + + + 3.5.7 + + + + + org.apache.fluss + fluss-spark-common_${scala.binary.version} + ${project.version} + + + + org.apache.fluss + fluss-spark-ut_${scala.binary.version} + ${project.version} + tests + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + ${skip.on.java8} + + ${skip.on.java8} + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.0.0-M5 + + + + integration-tests + integration-test + false + + test + + + ${skip.on.java8} + + **/*ITCase.* + + + 1 + + + + + default-test + test + false + + test + + + ${skip.on.java8} + + **/*ITCase.* + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-fluss + package + + shade + + + + + org.apache.fluss:fluss-spark-common + org.apache.fluss:fluss-client + + + + + + + + + + diff --git a/fluss-spark/fluss-spark-common/pom.xml b/fluss-spark/fluss-spark-common/pom.xml new file mode 100644 index 0000000000..e285b8bbc2 --- /dev/null +++ b/fluss-spark/fluss-spark-common/pom.xml @@ -0,0 +1,71 @@ + + + + + 4.0.0 + + org.apache.fluss + fluss-spark + 0.9-SNAPSHOT + + + fluss-spark-common_${scala.binary.version} + Fluss : Engine Spark : Common + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + + + + 
org.apache.spark + spark-catalyst_${scala.binary.version} + ${spark.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-fluss + package + + shade + + + + + org.apache.fluss:fluss-spark-common + org.apache.fluss:fluss-client + + + + + + + + + diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala new file mode 100644 index 0000000000..b620d27001 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark + +import org.apache.fluss.exception.{DatabaseNotExistException, TableAlreadyExistException, TableNotExistException} +import org.apache.fluss.metadata.TablePath +import org.apache.fluss.spark.catalog.{SupportsFlussNamespaces, WithFlussAdmin} + +import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import java.util +import java.util.concurrent.ExecutionException + +import scala.collection.JavaConverters._ + +class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with WithFlussAdmin { + + private var catalogName: String = "fluss" + + override def listTables(namespace: Array[String]): Array[Identifier] = { + doNamespaceOperator(namespace) { + admin + .listTables(namespace.head) + .get() + .asScala + .map(table => Identifier.of(namespace, table)) + .toArray + } + } + + override def loadTable(ident: Identifier): Table = { + try { + SparkTable(admin.getTableInfo(toTablePath(ident)).get()) + } catch { + case e: ExecutionException if e.getCause.isInstanceOf[TableNotExistException] => + throw new NoSuchTableException(ident) + } + } + + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { + try { + val tableDescriptor = SparkConversions.toFlussTable(schema, partitions, properties) + admin.createTable(toTablePath(ident), tableDescriptor, false).get() + loadTable(ident) + } catch { + case e: ExecutionException => + if (e.getCause.isInstanceOf[DatabaseNotExistException]) { + throw new NoSuchNamespaceException(ident.namespace()) + } else if (e.getCause.isInstanceOf[TableAlreadyExistException]) { + throw new TableAlreadyExistsException(ident) + } else { + throw new RuntimeException(e) + } + } + } + + override 
def alterTable(ident: Identifier, changes: TableChange*): Table = { + throw new UnsupportedOperationException("Altering table is not supported") + } + + override def dropTable(ident: Identifier): Boolean = { + try { + admin.dropTable(toTablePath(ident), false).get() + true + } catch { + case e: ExecutionException if e.getCause.isInstanceOf[TableNotExistException] => + throw new NoSuchTableException(ident) + } + } + + override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = { + throw new UnsupportedOperationException("Renaming table is not supported") + } + + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { + catalogName = name; + initFlussClient(options) + } + + override def name(): String = catalogName + + private def toTablePath(ident: Identifier): TablePath = { + assert(ident.namespace().length == 1, "Only single namespace is supported") + TablePath.of(ident.namespace().head, ident.name) + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConnectorOptions.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConnectorOptions.scala new file mode 100644 index 0000000000..71b1689850 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConnectorOptions.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark + +import org.apache.fluss.config.{ConfigBuilder, ConfigOption} + +object SparkConnectorOptions { + + val PRIMARY_KEY: ConfigOption[String] = + ConfigBuilder + .key("primary.key") + .stringType() + .noDefaultValue() + .withDescription("The primary keys of a Fluss table.") + + val BUCKET_KEY: ConfigOption[String] = + ConfigBuilder + .key("bucket.key") + .stringType() + .noDefaultValue() + .withDescription( + """ + |Specify the distribution policy of the Fluss table. + |Data will be distributed to each bucket according to the hash value of the bucket key (it must be a subset of the primary key, excluding partition keys, for a primary-key table). + |If you specify multiple fields, the delimiter is ','. + |If the table has a primary key and no bucket key is specified, the primary key (excluding the partition keys) will be used as the bucket key. + |If the table has no primary key and no bucket key is specified, the data will be distributed to each bucket randomly.
+ |""".stripMargin) + + val BUCKET_NUMBER: ConfigOption[Integer] = + ConfigBuilder + .key("bucket.num") + .intType() + .noDefaultValue() + .withDescription("The number of buckets of a Fluss table.") + + val COMMENT: ConfigOption[String] = + ConfigBuilder + .key("comment") + .stringType() + .noDefaultValue() + .withDescription("The comment of a Fluss table.") + + val SPARK_TABLE_OPTIONS: Seq[String] = + Seq(PRIMARY_KEY, BUCKET_KEY, BUCKET_NUMBER, COMMENT).map(_.key) +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala new file mode 100644 index 0000000000..f85e33f81e --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkConversions.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark + +import org.apache.fluss.config.FlussConfigUtils +import org.apache.fluss.metadata.{Schema, TableDescriptor} +import org.apache.fluss.spark.SparkConnectorOptions._ +import org.apache.fluss.spark.types.{FlussToSparkTypeVisitor, SparkToFlussTypeVisitor} +import org.apache.fluss.types.RowType + +import org.apache.spark.sql.FlussIdentityTransform +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.types.StructType + +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +object SparkConversions { + + def toFlussDataType(schema: StructType): RowType = + SparkToFlussTypeVisitor.visit(schema).asInstanceOf[RowType] + + def toSparkDataType(rowType: RowType): StructType = + FlussToSparkTypeVisitor.visit(rowType).asInstanceOf[StructType] + + def toFlussTable( + sparkSchema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): TableDescriptor = { + val caseInsensitiveProps = CaseInsensitiveMap(properties.asScala.toMap) + + val tableDescriptorBuilder = TableDescriptor.builder() + val schemaBuilder = Schema.newBuilder().fromRowType(toFlussDataType(sparkSchema)) + + val partitionKey = toPartitionKeys(partitions) + tableDescriptorBuilder.partitionedBy(partitionKey: _*) + + val primaryKeys = if (caseInsensitiveProps.contains(PRIMARY_KEY.key)) { + val pks = caseInsensitiveProps.get(PRIMARY_KEY.key).get.split(",") + schemaBuilder.primaryKey(pks: _*) + pks + } else { + Array.empty[String] + } + + if (caseInsensitiveProps.contains(BUCKET_NUMBER.key)) { + val bucketNum = caseInsensitiveProps.get(BUCKET_NUMBER.key).get.toInt + val bucketKeys = if (caseInsensitiveProps.contains(BUCKET_KEY.key)) { + caseInsensitiveProps.get(BUCKET_KEY.key).get.split(",") + } else { + 
primaryKeys.filterNot(partitionKey.contains) + } + tableDescriptorBuilder.distributedBy(bucketNum, bucketKeys: _*) + } + + if (caseInsensitiveProps.contains(COMMENT.key)) { + tableDescriptorBuilder.comment(caseInsensitiveProps.get(COMMENT.key).get) + } + + val (tableProps, customProps) = + caseInsensitiveProps.filterNot { case (key, _) => SPARK_TABLE_OPTIONS.contains(key) }.partition { + case (key, _) => key.startsWith(FlussConfigUtils.TABLE_PREFIX) + } + + tableDescriptorBuilder + .schema(schemaBuilder.build()) + .properties(tableProps.asJava) + .customProperties(customProps.asJava) + .build() + } + + def toPartitionKeys(partitions: Array[Transform]): Array[String] = { + val partitionKeys = mutable.ArrayBuffer.empty[String] + partitions.foreach { + case FlussIdentityTransform(parts) if parts.length == 1 => + partitionKeys += parts.head + case p => + throw new UnsupportedOperationException("Unsupported partition transform: " + p) + } + partitionKeys.toArray + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala new file mode 100644 index 0000000000..1416ab415a --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark + +import org.apache.fluss.metadata.TableInfo +import org.apache.fluss.spark.catalog.{AbstractSparkTable, SupportsFlussPartitionManagement} + +case class SparkTable(table: TableInfo) + extends AbstractSparkTable(table) + with SupportsFlussPartitionManagement {} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala new file mode 100644 index 0000000000..8694c7f59b --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.catalog + +import org.apache.fluss.metadata.TableInfo +import org.apache.fluss.spark.SparkConversions + +import org.apache.spark.sql.connector.catalog.{Table, TableCapability} +import org.apache.spark.sql.types.StructType + +import java.util + +import scala.collection.JavaConverters._ + +abstract class AbstractSparkTable(tableInfo: TableInfo) extends Table { + + protected lazy val _schema: StructType = + SparkConversions.toSparkDataType(tableInfo.getSchema.getRowType) + + protected lazy val _partitionSchema = new StructType( + _schema.fields.filter(field => tableInfo.getPartitionKeys.contains(field.name))) + + override def name(): String = tableInfo.toString + + override def schema(): StructType = _schema + + override def capabilities(): util.Set[TableCapability] = Set.empty[TableCapability].asJava +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussNamespaces.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussNamespaces.scala new file mode 100644 index 0000000000..ddc83a54b4 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussNamespaces.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.fluss.spark.catalog + +import org.apache.fluss.exception.DatabaseNotExistException +import org.apache.fluss.metadata.DatabaseDescriptor +import org.apache.fluss.utils.Preconditions + +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException +import org.apache.spark.sql.connector.catalog.{NamespaceChange, SupportsNamespaces} + +import java.util +import java.util.concurrent.ExecutionException + +import scala.collection.JavaConverters._ + +trait SupportsFlussNamespaces extends SupportsNamespaces with WithFlussAdmin { + + override def listNamespaces(): Array[Array[String]] = { + admin.listDatabases.get.asScala.map(Array(_)).toArray + } + + override def listNamespaces(namespace: Array[String]): Array[Array[String]] = { + if (namespace.isEmpty) { + return listNamespaces() + } + + doNamespaceOperator(namespace) { + val dbname = admin.getDatabaseInfo(namespace.head).get().getDatabaseName + Array(Array(dbname)) + } + } + + override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] = { + doNamespaceOperator(namespace) { + admin.getDatabaseInfo(namespace.head).get().getDatabaseDescriptor.getCustomProperties + } + } + + override def createNamespace( + namespace: Array[String], + metadata: util.Map[String, String]): Unit = { + doNamespaceOperator(namespace) { + val databaseDescriptor = DatabaseDescriptor + .builder() + .customProperties(metadata) + .build(); + admin.createDatabase(namespace.head, databaseDescriptor, false).get() + } + } + + override def alterNamespace( + namespace: Array[String], + namespaceChanges: NamespaceChange*): Unit = { + throw new UnsupportedOperationException("Altering namespace is not supported now.") + } + + override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean = { + doNamespaceOperator(namespace) { + admin.dropDatabase(namespace.head, false, cascade).get() + true + } + } + + protected def doNamespaceOperator[T](namespace: Array[String])(f: => T): T = { + checkNamespace(namespace) + try { + f + } catch { + case e: ExecutionException if e.getCause.isInstanceOf[DatabaseNotExistException] => + throw new NoSuchNamespaceException(namespace) + } + } + + private def checkNamespace(namespace: Array[String]): Unit = { + Preconditions.checkArgument( + namespace.length == 1, + "Only single namespace is supported in Fluss.") + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala new file mode 100644 index 0000000000..ed888e5fe9 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.catalog + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement +import org.apache.spark.sql.types.StructType + +import java.util + +trait SupportsFlussPartitionManagement extends AbstractSparkTable with SupportsPartitionManagement { + + override def partitionSchema(): StructType = _partitionSchema + + override def createPartition(ident: InternalRow, properties: util.Map[String, String]): Unit = { + throw new UnsupportedOperationException("Creating partition is not supported") + } + + override def dropPartition(ident: InternalRow): Boolean = { + throw new UnsupportedOperationException("Dropping partition is not supported") + } + + override def replacePartitionMetadata( + ident: InternalRow, + properties: util.Map[String, String]): Unit = { + throw new UnsupportedOperationException("Replacing partition metadata is not supported") + } + + override def loadPartitionMetadata(ident: InternalRow): util.Map[String, String] = { + throw new UnsupportedOperationException("Loading partition is not supported") + } + + override def listPartitionIdentifiers( + names: Array[String], + ident: InternalRow): Array[InternalRow] = { + throw new UnsupportedOperationException("Listing partition is not supported") + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala new file mode 100644 index 0000000000..fa62b91a45 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.spark.catalog + +import org.apache.fluss.client.{Connection, ConnectionFactory} +import org.apache.fluss.client.admin.Admin +import org.apache.fluss.config.{Configuration => FlussConfiguration} +import org.apache.fluss.utils.{IOUtils, Preconditions} + +import org.apache.spark.sql.connector.catalog.CatalogPlugin +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import java.util + +import scala.collection.JavaConverters._ + +trait WithFlussAdmin extends AutoCloseable { + + private var _connection: Connection = _ + private var _admin: Admin = _ + + // TODO: init lake spark catalog + protected var lakeCatalog: CatalogPlugin = _ + + protected def initFlussClient(options: CaseInsensitiveStringMap): Unit = { + val flussConfigs = new util.HashMap[String, String]() + options.entrySet().asScala.foreach { + entry: util.Map.Entry[String, String] => flussConfigs.put(entry.getKey, entry.getValue) + } + + _connection = ConnectionFactory.createConnection(FlussConfiguration.fromMap(flussConfigs)) + _admin = _connection.getAdmin + } + + protected def admin: Admin = { + Preconditions.checkNotNull(_admin, "Fluss Admin is not initialized.") + _admin + } + + override def close(): Unit = { + IOUtils.closeQuietly(_admin, "fluss-admin") + IOUtils.closeQuietly(_connection, "fluss-connection"); + } + +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/FlussToSparkTypeVisitor.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/FlussToSparkTypeVisitor.scala new file mode 100644 index 0000000000..f9df1e9c06 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/FlussToSparkTypeVisitor.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.spark.types + +import org.apache.fluss.types._ + +import org.apache.spark.sql.types.{DataType => SparkDataType, DataTypes => SparkDataTypes} + +import scala.collection.JavaConverters._ + +object FlussToSparkTypeVisitor extends DataTypeVisitor[SparkDataType] { + + override def visit(charType: CharType): SparkDataType = { + org.apache.spark.sql.types.CharType(charType.getLength) + } + + override def visit(stringType: StringType): SparkDataType = { + SparkDataTypes.StringType + } + + override def visit(booleanType: BooleanType): SparkDataType = { + SparkDataTypes.BooleanType + } + + override def visit(binaryType: BinaryType): SparkDataType = { + SparkDataTypes.BinaryType + } + + override def visit(bytesType: BytesType): SparkDataType = { + SparkDataTypes.BinaryType + } + + override def visit(decimalType: DecimalType): SparkDataType = { + org.apache.spark.sql.types.DecimalType(decimalType.getPrecision, decimalType.getScale) + } + + override def visit(tinyIntType: TinyIntType): SparkDataType = { + SparkDataTypes.ByteType + } + + override def visit(smallIntType: SmallIntType): SparkDataType = { + SparkDataTypes.ShortType + } + + override def visit(intType: IntType): SparkDataType = { + SparkDataTypes.IntegerType + } + + override def visit(bigIntType: BigIntType): SparkDataType = { + SparkDataTypes.LongType + } + + override def visit(floatType: FloatType): SparkDataType = { + SparkDataTypes.FloatType + } + + override def visit(doubleType: DoubleType): SparkDataType = { + SparkDataTypes.DoubleType + } + + override def visit(dateType: DateType): SparkDataType = { + SparkDataTypes.DateType + } + + override def visit(timeType: TimeType): SparkDataType = { + SparkDataTypes.IntegerType + } + + override def visit(timestampType: TimestampType): SparkDataType = { + SparkDataTypes.TimestampNTZType + } + + override def visit(localZonedTimestampType: LocalZonedTimestampType): SparkDataType = { + SparkDataTypes.TimestampType + } + + override def visit(arrayType: ArrayType): SparkDataType = { + SparkDataTypes.createArrayType( + arrayType.getElementType.accept(this), + arrayType.getElementType.isNullable) + } + + override def visit(mapType: MapType): SparkDataType = { + SparkDataTypes.createMapType( + mapType.getKeyType.accept(this), + mapType.getValueType.accept(this), + mapType.getValueType.isNullable + ) + } + + override def visit(rowType: RowType): SparkDataType = { + val sparkFields = rowType.getFields.asScala.map { + flussField => + SparkDataTypes.createStructField( + flussField.getName, + flussField.getType.accept(this), + flussField.getType.isNullable) + } + SparkDataTypes.createStructType(sparkFields.toArray) + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/SparkToFlussTypeVisitor.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/SparkToFlussTypeVisitor.scala new file mode 100644 index 0000000000..255e3ff6be --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/SparkToFlussTypeVisitor.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.types + +import org.apache.fluss.types.{ArrayType => FlussArrayType, DataField => FlussDataField, DataType => FlussDataType, MapType => FlussMapType, _} + +import org.apache.spark.sql.types.{ArrayType => SparkArrayType, DataType => SparkDataType, MapType => SparkMapType, StructType, UserDefinedType} + +import scala.collection.JavaConverters._ + +object SparkToFlussTypeVisitor { + + def visit(dataType: SparkDataType): FlussDataType = { + dataType match { + case st: StructType => + visitStructType(st) + case mt: SparkMapType => + visitMapType(mt) + case at: SparkArrayType => + visitArrayType(at) + case _: UserDefinedType[_] => + throw new UnsupportedOperationException("User-defined type is not supported"); + case t => + visitPrimitiveType(t) + } + } + + private def visitStructType(st: StructType): RowType = { + val flussDataFields = st.fields.map { + field => + val flussDataType = visit(field.dataType) + new FlussDataField(field.name, flussDataType, field.getComment().orNull) + } + new RowType(flussDataFields.toList.asJava) + } + + private def visitMapType(mt: SparkMapType): FlussMapType = { + new FlussMapType(visit(mt.keyType), visit(mt.valueType).copy(mt.valueContainsNull)) + } + + private def visitArrayType(at: SparkArrayType): FlussArrayType = { + new FlussArrayType(visit(at.elementType).copy(at.containsNull)) + } + + private def visitPrimitiveType(t: SparkDataType): FlussDataType = { + t match { + case _: org.apache.spark.sql.types.BooleanType => + new BooleanType() + case _: org.apache.spark.sql.types.ByteType => + new TinyIntType() + case _: org.apache.spark.sql.types.ShortType => + new SmallIntType() + case _: org.apache.spark.sql.types.IntegerType => + new IntType() + case _: org.apache.spark.sql.types.LongType => + new BigIntType() + case _: org.apache.spark.sql.types.FloatType => + new FloatType() + case _: org.apache.spark.sql.types.DoubleType => + new DoubleType() + case dt: org.apache.spark.sql.types.DecimalType => + new DecimalType(dt.precision, dt.scale) + case _: org.apache.spark.sql.types.BinaryType => + new BinaryType(BinaryType.MAX_LENGTH) + case _: org.apache.spark.sql.types.VarcharType => + new StringType() + case ct: org.apache.spark.sql.types.CharType => + new CharType(ct.length) + case _: org.apache.spark.sql.types.StringType => + new StringType() + case _: org.apache.spark.sql.types.DateType => + new DateType() + case _: org.apache.spark.sql.types.TimestampType => + // spark only support 6 digits of precision + new LocalZonedTimestampType(6) + case _: org.apache.spark.sql.types.TimestampNTZType => + // spark only support 6 digits of precision + new TimestampType(6) + case _ => + throw new UnsupportedOperationException(s"Data type(${t.catalogString}) is not supported"); + } + } + +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/FlussIdentityTransform.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/FlussIdentityTransform.scala new file mode 100644 index 0000000000..9d9b4a5c31 --- /dev/null +++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/FlussIdentityTransform.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform} + +object FlussIdentityTransform { + def unapply(transform: Transform): Option[Seq[String]] = { + transform match { + case IdentityTransform(FieldReference(parts)) => + Some(parts) + case _ => + None + } + } +} diff --git a/fluss-spark/fluss-spark-ut/pom.xml b/fluss-spark/fluss-spark-ut/pom.xml new file mode 100644 index 0000000000..7592c6ffd2 --- /dev/null +++ b/fluss-spark/fluss-spark-ut/pom.xml @@ -0,0 +1,125 @@ + + + + + 4.0.0 + + org.apache.fluss + fluss-spark + 0.9-SNAPSHOT + + + fluss-spark-ut_${scala.binary.version} + Fluss : Engine Spark : UT + + + + org.apache.fluss + fluss-spark-common_${scala.binary.version} + ${project.version} + test + + + + org.apache.fluss + fluss-server + ${project.version} + test + + + + org.apache.fluss + fluss-server + ${project.version} + tests + test + + + + org.apache.fluss + fluss-client + ${project.version} + test + + + + org.apache.fluss + fluss-test-utils + + + + + org.apache.curator + curator-test + ${curator.version} + test + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + tests + test + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + tests + test + + + + org.apache.spark + spark-catalyst_${scala.binary.version} + ${spark.version} + tests + test + + + + junit + junit + 3.8.1 + test + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + prepare-test-jar + test-compile + + test-jar + + + + + + + diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala new file mode 100644 index 0000000000..a9022fff4a --- /dev/null +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussCatalogTest.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark + +import org.apache.fluss.metadata.{DatabaseDescriptor, Schema, TableDescriptor, TablePath} +import org.apache.fluss.types.{DataTypes, RowType} + +import org.apache.spark.sql.Row +import org.assertj.core.api.Assertions.{assertThat, assertThatList} + +import scala.collection.JavaConverters._ + +class FlussCatalogTest extends FlussSparkTestBase { + + test("Catalog: namespaces") { + // Always a default database 'fluss'. + checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil) + + sql("CREATE DATABASE testdb COMMENT 'created by spark'") + checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Row("testdb") :: Nil) + + checkAnswer( + sql("DESC DATABASE testdb").filter("info_name != 'Owner'"), + Row("Catalog Name", "fluss_catalog") :: Row("Namespace Name", "testdb") :: Row( + "Comment", + "created by spark") :: Nil + ) + + sql("DROP DATABASE testdb") + checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil) + } + + test("Catalog: basic table") { + sql(s"CREATE TABLE $DEFAULT_DATABASE.test_tbl (id int, name string) COMMENT 'my test table'") + checkAnswer(sql("SHOW TABLES"), Row(DEFAULT_DATABASE, "test_tbl", false) :: Nil) + checkAnswer(sql("DESC test_tbl"), Row("id", "int", null) :: Row("name", "string", null) :: Nil) + + val testTable = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "test_tbl")).get() + assertThat(testTable.getTablePath.getTableName).isEqualTo("test_tbl") + assertThat(testTable.getComment.orElse(null)).isEqualTo("my test table") + assertThat(testTable.getRowType).isEqualTo( + RowType.builder().field("id", DataTypes.INT()).field("name", DataTypes.STRING()).build()) + + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.test_pt_tbl (id int, name string, pt string) + |PARTITIONED BY (pt) + |TBLPROPERTIES("key" = "value") + |""".stripMargin) + + val testPartitionedTable = + admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "test_pt_tbl")).get() + assertThat(testPartitionedTable.getRowType).isEqualTo( + RowType + .builder() + .field("id", DataTypes.INT()) + .field("name", DataTypes.STRING()) + .field("pt", DataTypes.STRING()) + .build()) + assertThat(testPartitionedTable.getPartitionKeys.get(0)).isEqualTo("pt") + assertThat(testPartitionedTable.getCustomProperties.containsKey("key")).isEqualTo(true) + assertThat( + testPartitionedTable.getCustomProperties.getRawValue("key").get().asInstanceOf[String]) + .isEqualTo("value") + + sql("DROP TABLE test_tbl") + sql("DROP TABLE test_pt_tbl") + checkAnswer(sql("SHOW TABLES"), Nil) + } + + test("Catalog: primary-key table") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.test_tbl (id int, name string, pt string) + |PARTITIONED BY (pt) + |TBLPROPERTIES("primary.key" = "id,pt") + |""".stripMargin) + + val tbl1 = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "test_tbl")).get() + assertThatList(tbl1.getPrimaryKeys).hasSameElementsAs(Seq("id", "pt").toList.asJava) + assertThat(tbl1.getNumBuckets).isEqualTo(1) + assertThat(tbl1.getBucketKeys.contains("id")).isEqualTo(true) + assertThat(tbl1.getPartitionKeys.contains("pt")).isEqualTo(true) + + sql( + s""" + |CREATE TABLE 
$DEFAULT_DATABASE.test_tbl2 (pk1 int, pk2 long, name string, pt1 string, pt2 string) + |PARTITIONED BY (pt1, pt2) + |TBLPROPERTIES("primary.key" = "pk1,pk2,pt1,pt2", "bucket.num" = 3, "bucket.key" = "pk1") + |""".stripMargin) + + val tbl2 = admin.getTableInfo(TablePath.of(DEFAULT_DATABASE, "test_tbl2")).get() + assertThatList(tbl2.getPrimaryKeys).hasSameElementsAs( + Seq("pk1", "pk2", "pt1", "pt2").toList.asJava) + assertThat(tbl2.getNumBuckets).isEqualTo(3) + assertThatList(tbl2.getBucketKeys).hasSameElementsAs(Seq("pk1").toList.asJava) + } + + test("Catalog: check namespace and table created by admin") { + val dbDesc = DatabaseDescriptor.builder().comment("created by admin").build() + admin.createDatabase("db_by_admin", dbDesc, true).get() + checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Row("db_by_admin") :: Nil) + + sql("USE db_by_admin") + val tablePath = TablePath.of("db_by_admin", "tbl_by_admin") + val rt = RowType + .builder() + .field("id", DataTypes.INT()) + .field("name", DataTypes.STRING()) + .field("pt", DataTypes.STRING()) + .build() + val tableDesc = TableDescriptor + .builder() + .schema(Schema.newBuilder().fromRowType(rt).build()) + .partitionedBy("pt") + .build() + admin.createTable(tablePath, tableDesc, false).get() + checkAnswer(sql("SHOW TABLES"), Row("db_by_admin", "tbl_by_admin", false) :: Nil) + checkAnswer( + sql("DESC tbl_by_admin"), + Row("id", "int", null) :: Row("name", "string", null) :: Row("pt", "string", null) :: Nil) + + admin.dropTable(tablePath, true).get() + checkAnswer(sql("SHOW TABLES"), Nil) + + admin.dropDatabase("db_by_admin", true, true).get() + checkAnswer(sql("SHOW DATABASES"), Row(DEFAULT_DATABASE) :: Nil) + } +} diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala new file mode 100644 index 0000000000..08b6d4600c --- /dev/null +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.spark + +import org.apache.fluss.client.{Connection, ConnectionFactory} +import org.apache.fluss.client.admin.Admin +import org.apache.fluss.config.{ConfigOptions, Configuration} +import org.apache.fluss.server.testutils.FlussClusterExtension + +import org.apache.spark.SparkConf +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.test.SharedSparkSession +import org.junit.jupiter.api.extension.RegisterExtension +import org.scalactic.source.Position +import org.scalatest.Tag + +import java.time.Duration + +class FlussSparkTestBase extends QueryTest with SharedSparkSession { + + import FlussSparkTestBase._ + + protected val DEFAULT_DATABASE = "fluss"; + + protected var conn: Connection = _ + protected var admin: Admin = _ + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.sql.catalog.fluss_catalog", classOf[SparkCatalog].getName) + .set("spark.sql.catalog.fluss_catalog.bootstrap.servers", bootstrapServers) + .set("spark.sql.defaultCatalog", "fluss_catalog") + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + conn = ConnectionFactory.createConnection(clientConf) + admin = conn.getAdmin + + sql(s"USE $DEFAULT_DATABASE") + } + + override def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit + pos: Position): Unit = { + println(testName) + super.test(testName, testTags: _*)(testFun)(pos) + } + +} + +@RegisterExtension +object FlussSparkTestBase { + val FLUSS_CLUSTER_EXTENSION: FlussClusterExtension = + FlussClusterExtension.builder + .setClusterConf( + new Configuration() + .set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1)) + ) + .setNumOfTabletServers(3) + .build + + FLUSS_CLUSTER_EXTENSION.start() + + val clientConf: Configuration = FLUSS_CLUSTER_EXTENSION.getClientConfig + val bootstrapServers: String = FLUSS_CLUSTER_EXTENSION.getBootstrapServers +} diff --git a/fluss-spark/pom.xml b/fluss-spark/pom.xml new file mode 100644 index 0000000000..7773528096 --- /dev/null +++ b/fluss-spark/pom.xml @@ -0,0 +1,337 @@ + + + + + 4.0.0 + + org.apache.fluss + fluss + 0.9-SNAPSHOT + + + fluss-spark + Fluss : Engine Spark : + pom + + + fluss-spark-common + fluss-spark-ut + fluss-spark-3.5 + fluss-spark-3.4 + + + + + org.apache.fluss + fluss-client + ${project.version} + + + + org.apache.fluss + fluss-common + ${project.version} + + + + org.scala-lang + scala-library + ${scala.version} + + + + org.scala-lang + scala-reflect + ${scala.version} + + + + org.scala-lang + scala-compiler + ${scala.version} + + + + org.scalatest + scalatest_${scala.binary.version} + 3.1.0 + test + + + + + + + org.apache.spark + spark-sql_${scala.binary.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + org.apache.logging.log4j + log4j-slf4j2-impl + + + + + + org.apache.spark + spark-core_${scala.binary.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + org.apache.logging.log4j + log4j-slf4j2-impl + + + com.google.protobuf + protobuf-java + + + + + + org.apache.spark + spark-catalyst_${scala.binary.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + org.apache.spark + spark-hive_${scala.binary.version} + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + + org.apache.spark + spark-sql_${scala.binary.version} + tests + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + org.apache.logging.log4j + log4j-slf4j2-impl + + + + + + org.apache.spark + spark-catalyst_${scala.binary.version} + tests + test + + + log4j + log4j + + + 
org.slf4j + slf4j-log4j12 + + + + + + org.apache.spark + spark-core_${scala.binary.version} + tests + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + org.apache.logging.log4j + log4j-slf4j2-impl + + + + + + + + + + + org.scalastyle + scalastyle-maven-plugin + 1.0.0 + + + + check + + + + + false + true + true + false + ${basedir}/src/main/scala + ${basedir}/src/test/scala + ${project.basedir}/target/scalastyle-output.xml + UTF-8 + UTF-8 + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.0 + + ${target.java.version} + ${target.java.version} + false + + -Xpkginfo:always + -Xlint:deprecation + + + + + + net.alchim31.maven + scala-maven-plugin + 3.2.2 + + + + scala-compile-first + process-resources + + add-source + compile + + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + ${scala.version} + false + + -nobootcp + -target:jvm-${target.java.version} + + + + + + org.scalatest + scalatest-maven-plugin + 2.1.0 + + ${project.build.directory}/surefire-reports + . + -ea -Xmx4g -Xss4m -XX:MaxMetaspaceSize=2g -XX:ReservedCodeCacheSize=128m ${extraJavaTestArgs} -Dio.netty.tryReflectionSetAccessible=true + FlussTestSuite.txt + once + true + + + + test + + test + + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + net.alchim31.maven + scala-maven-plugin + + + + org.scalatest + scalatest-maven-plugin + + + + + \ No newline at end of file diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 7f0a4cc9b1..16e942860e 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -126,6 +126,7 @@ fluss-test-coverage/** fluss-test-utils/** fluss-flink/** + fluss-spark/** fluss-lake/** @@ -179,6 +180,44 @@ + + test-spark3 + + + + + org.apache.maven.plugins + maven-resources-plugin + + + copy-class-files + generate-resources + + copy-resources + + + false + + + ${project.basedir}/../ + + fluss-spark/**/target/classes/** + + + fluss-test-coverage/** + fluss-test-utils/** + + + + ${project.build.directory}/classes + + + + + + + + test-lake @@ -288,6 +327,8 @@ + org.apache.fluss.spark.* + org.apache.spark.sql.* org.apache.fluss.protogen.* org.apache.fluss.memory.* org.apache.fluss.utils.* diff --git a/pom.xml b/pom.xml index 247040141f..99558b790a 100644 --- a/pom.xml +++ b/pom.xml @@ -61,6 +61,7 @@ fluss-test-coverage fluss-server fluss-flink + fluss-spark fluss-protogen fluss-jmh fluss-lake @@ -91,6 +92,13 @@ 1.3.1 1.10.0 + + 2.12.18 + 2.13.16 + 2.12 + ${scala212.version} + 3.5.7 + 3.4.0 6.20.3-ververica-2.0 1.7.36 @@ -1007,6 +1015,17 @@ + + + 3.10.2 + ${maven.multiModuleProjectDirectory}/.scalafmt.conf + + + + ${maven.multiModuleProjectDirectory}/copyright.txt + ${spotless.delimiter} + + From 30373de2f58d3dacc93424ad40de0d6968851533 Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Fri, 26 Dec 2025 19:37:08 +0800 Subject: [PATCH 2/2] mapping Spark BinaryType to Fluss BytesType --- .../apache/fluss/spark/types/SparkToFlussTypeVisitor.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/SparkToFlussTypeVisitor.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/SparkToFlussTypeVisitor.scala index 255e3ff6be..ee7e906336 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/SparkToFlussTypeVisitor.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/SparkToFlussTypeVisitor.scala @@ -75,8 +75,8 @@ object 
SparkToFlussTypeVisitor { new DoubleType() case dt: org.apache.spark.sql.types.DecimalType => new DecimalType(dt.precision, dt.scale) - case _: org.apache.spark.sql.types.BinaryType => - new BinaryType(BinaryType.MAX_LENGTH) + case _: org.apache.spark.sql.types.BinaryType => + new BytesType() case _: org.apache.spark.sql.types.VarcharType => new StringType() case ct: org.apache.spark.sql.types.CharType =>