From f3663c2cca034ac5504eb5ede06c86d9b34b0699 Mon Sep 17 00:00:00 2001 From: gnehil Date: Thu, 26 Dec 2024 19:09:26 +0800 Subject: [PATCH 1/2] support v2 push down --- .../spark-doris-connector-it/pom.xml | 2 +- .../doris/spark/sql/DorisReaderITCase.scala | 26 ++++++ .../doris/spark/read/AbstractDorisScan.scala | 53 +++++++++++++ .../apache/doris/spark/read/DorisScan.scala | 28 +------ .../spark/read/DorisScanBuilderBase.scala | 17 +--- .../doris/spark/read/DorisScanBuilder.scala | 23 +++++- .../doris/spark/read/DorisScanBuilder.scala | 24 +++++- .../spark-doris-connector-spark-3.3/pom.xml | 5 ++ .../doris/spark/read/DorisScanBuilder.scala | 26 +++++- .../apache/doris/spark/read/DorisScanV2.scala | 32 ++++++++ .../read/expression/V2ExpressionBuilder.scala | 79 +++++++++++++++++++ .../expression/V2ExpressionBuilderTest.scala | 49 ++++++++++++ .../spark-doris-connector-spark-3.4/pom.xml | 5 ++ .../doris/spark/read/DorisScanBuilder.scala | 26 +++++- .../apache/doris/spark/read/DorisScanV2.scala | 32 ++++++++ .../read/expression/V2ExpressionBuilder.scala | 79 +++++++++++++++++++ .../expression/V2ExpressionBuilderTest.scala | 49 ++++++++++++ .../spark-doris-connector-spark-3.5/pom.xml | 5 ++ .../doris/spark/read/DorisScanBuilder.scala | 26 +++++- .../apache/doris/spark/read/DorisScanV2.scala | 32 ++++++++ .../read/expression/V2ExpressionBuilder.scala | 79 +++++++++++++++++++ .../expression/V2ExpressionBuilderTest.scala | 49 ++++++++++++ .../org/apache/doris/common/DorisScanV2.scala | 0 .../expression/V2ExpressionBuilder.scala | 0 24 files changed, 694 insertions(+), 52 deletions(-) create mode 100644 spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/AbstractDorisScan.scala create mode 100644 spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala create mode 100644 spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala create mode 100644 spark-doris-connector/spark-doris-connector-spark-3.3/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala create mode 100644 spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala create mode 100644 spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala create mode 100644 spark-doris-connector/spark-doris-connector-spark-3.4/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala create mode 100644 spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala create mode 100644 spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala create mode 100644 spark-doris-connector/spark-doris-connector-spark-3.5/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala create mode 100644 spark-load/spark-load-core/src/main/java/org/apache/doris/common/DorisScanV2.scala create mode 100644 spark-load/spark-load-core/src/main/java/org/apache/doris/common/expression/V2ExpressionBuilder.scala diff --git a/spark-doris-connector/spark-doris-connector-it/pom.xml b/spark-doris-connector/spark-doris-connector-it/pom.xml index 9493c031..1fcae06d 100644 --- a/spark-doris-connector/spark-doris-connector-it/pom.xml +++ b/spark-doris-connector/spark-doris-connector-it/pom.xml @@ -97,7 +97,7 @@ org.apache.doris - spark-doris-connector-spark-3.1 + spark-doris-connector-spark-3.3 ${project.version} test diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala index a147a7d8..2d7930af 100644 --- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala +++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala @@ -141,5 +141,31 @@ class DorisReaderITCase extends DorisTestBase { } else false } + @Test + @throws[Exception] + def testSQLSourceWithCondition(): Unit = { + initializeTable(TABLE_READ_TBL) + val session = SparkSession.builder().master("local[*]").getOrCreate() + session.sql( + s""" + |CREATE TEMPORARY VIEW test_source + |USING doris + |OPTIONS( + | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL}", + | "fenodes"="${DorisTestBase.getFenodes}", + | "user"="${DorisTestBase.USERNAME}", + | "password"="${DorisTestBase.PASSWORD}" + |) + |""".stripMargin) + + val result = session.sql( + """ + |select name,age from test_source where age = 18 + |""".stripMargin).collect().toList.toString() + session.stop() + + assert("List([doris,18])".equals(result)) + } + } diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/AbstractDorisScan.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/AbstractDorisScan.scala new file mode 100644 index 00000000..f1666ada --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/AbstractDorisScan.scala @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.read + +import org.apache.doris.spark.client.entity.{Backend, DorisReaderPartition} +import org.apache.doris.spark.client.read.ReaderPartitionGenerator +import org.apache.doris.spark.config.{DorisConfig, DorisOptions} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan} +import org.apache.spark.sql.types.StructType + +import scala.language.implicitConversions + +abstract class AbstractDorisScan(config: DorisConfig, schema: StructType) extends Scan with Batch with Logging { + + private val scanMode = ScanMode.valueOf(config.getValue(DorisOptions.READ_MODE).toUpperCase) + + override def readSchema(): StructType = schema + + override def toBatch: Batch = this + + override def planInputPartitions(): Array[InputPartition] = { + ReaderPartitionGenerator.generatePartitions(config, schema.names, compiledFilters()).map(toInputPartition) + } + + + override def createReaderFactory(): PartitionReaderFactory = { + new DorisPartitionReaderFactory(readSchema(), scanMode, config) + } + + private def toInputPartition(rp: DorisReaderPartition): DorisInputPartition = + DorisInputPartition(rp.getDatabase, rp.getTable, rp.getBackend, rp.getTablets.map(_.toLong), rp.getOpaquedQueryPlan, rp.getReadColumns, rp.getFilters) + + protected def compiledFilters(): Array[String] + +} + +case class DorisInputPartition(database: String, table: String, backend: Backend, tablets: Array[Long], opaquedQueryPlan: String, readCols: Array[String], predicates: Array[String]) extends InputPartition diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScan.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScan.scala index b7157661..d52a82ad 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScan.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScan.scala @@ -17,39 +17,17 @@ package org.apache.doris.spark.read -import org.apache.doris.spark.client.entity.{Backend, DorisReaderPartition} -import org.apache.doris.spark.client.read.ReaderPartitionGenerator import org.apache.doris.spark.config.{DorisConfig, DorisOptions} import org.apache.doris.spark.util.DorisDialects import org.apache.spark.internal.Logging -import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan} import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import scala.language.implicitConversions -class DorisScan(config: DorisConfig, schema: StructType, filters: Array[Filter]) extends Scan with Batch with Logging { - - private val scanMode = ScanMode.valueOf(config.getValue(DorisOptions.READ_MODE).toUpperCase) - - override def readSchema(): StructType = schema - - override def toBatch: Batch = this - - override def planInputPartitions(): Array[InputPartition] = { +class DorisScan(config: DorisConfig, schema: StructType, filters: Array[Filter]) extends AbstractDorisScan(config, schema) with Logging { + override protected def compiledFilters(): Array[String] = { val inValueLengthLimit = config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT) - val compiledFilters = filters.map(DorisDialects.compileFilter(_, inValueLengthLimit)).filter(_.isDefined).map(_.get) - ReaderPartitionGenerator.generatePartitions(config, schema.names, compiledFilters).map(toInputPartition) - } - - - override def createReaderFactory(): PartitionReaderFactory = { - new DorisPartitionReaderFactory(readSchema(), scanMode, config) + filters.map(DorisDialects.compileFilter(_, inValueLengthLimit)).filter(_.isDefined).map(_.get) } - - private def toInputPartition(rp: DorisReaderPartition): DorisInputPartition = - DorisInputPartition(rp.getDatabase, rp.getTable, rp.getBackend, rp.getTablets.map(_.toLong), rp.getOpaquedQueryPlan, rp.getReadColumns, rp.getFilters) - } - -case class DorisInputPartition(database: String, table: String, backend: Backend, tablets: Array[Long], opaquedQueryPlan: String, readCols: Array[String], predicates: Array[String]) extends InputPartition diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala index a6a97dc9..cec9890b 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisScanBuilderBase.scala @@ -24,24 +24,9 @@ import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType protected[spark] abstract class DorisScanBuilderBase(config: DorisConfig, schema: StructType) extends ScanBuilder - with SupportsPushDownFilters with SupportsPushDownRequiredColumns { - private var readSchema: StructType = schema - - private var pushDownPredicates: Array[Filter] = Array[Filter]() - - private val inValueLengthLimit = config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT) - - override def build(): Scan = new DorisScan(config, readSchema, pushDownPredicates) - - override def pushFilters(filters: Array[Filter]): Array[Filter] = { - val (pushed, unsupported) = filters.partition(DorisDialects.compileFilter(_, inValueLengthLimit).isDefined) - this.pushDownPredicates = pushed - unsupported - } - - override def pushedFilters(): Array[Filter] = pushDownPredicates + protected var readSchema: StructType = schema override def pruneColumns(requiredSchema: StructType): Unit = { readSchema = StructType(requiredSchema.fields.filter(schema.contains(_))) diff --git a/spark-doris-connector/spark-doris-connector-spark-3.1/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.1/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala index 9e199af5..5c8e7162 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3.1/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3.1/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala @@ -17,7 +17,26 @@ package org.apache.doris.spark.read -import org.apache.doris.spark.config.DorisConfig +import org.apache.doris.spark.config.{DorisConfig, DorisOptions} +import org.apache.doris.spark.util.DorisDialects +import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters} +import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType -class DorisScanBuilder(config: DorisConfig, schema: StructType) extends DorisScanBuilderBase(config, schema) {} +class DorisScanBuilder(config: DorisConfig, schema: StructType) extends DorisScanBuilderBase(config, schema) with SupportsPushDownFilters { + + private var pushDownPredicates: Array[Filter] = Array[Filter]() + + private val inValueLengthLimit = config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT) + + override def build(): Scan = new DorisScan(config, readSchema, pushDownPredicates) + + override def pushFilters(filters: Array[Filter]): Array[Filter] = { + val (pushed, unsupported) = filters.partition(DorisDialects.compileFilter(_, inValueLengthLimit).isDefined) + this.pushDownPredicates = pushed + unsupported + } + + override def pushedFilters(): Array[Filter] = pushDownPredicates + +} diff --git a/spark-doris-connector/spark-doris-connector-spark-3.2/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.2/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala index 9e199af5..68241dfc 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3.2/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3.2/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala @@ -17,7 +17,27 @@ package org.apache.doris.spark.read -import org.apache.doris.spark.config.DorisConfig +import org.apache.doris.spark.config.{DorisConfig, DorisOptions} +import org.apache.doris.spark.util.DorisDialects +import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters} +import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType -class DorisScanBuilder(config: DorisConfig, schema: StructType) extends DorisScanBuilderBase(config, schema) {} +class DorisScanBuilder(config: DorisConfig, schema: StructType) extends DorisScanBuilderBase(config, schema) + with SupportsPushDownFilters { + + private var pushDownPredicates: Array[Filter] = Array[Filter]() + + private val inValueLengthLimit = config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT) + + override def build(): Scan = new DorisScan(config, readSchema, pushDownPredicates) + + override def pushFilters(filters: Array[Filter]): Array[Filter] = { + val (pushed, unsupported) = filters.partition(DorisDialects.compileFilter(_, inValueLengthLimit).isDefined) + this.pushDownPredicates = pushed + unsupported + } + + override def pushedFilters(): Array[Filter] = pushDownPredicates + +} diff --git a/spark-doris-connector/spark-doris-connector-spark-3.3/pom.xml b/spark-doris-connector/spark-doris-connector-spark-3.3/pom.xml index ecc71ed8..7a046a2f 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3.3/pom.xml +++ b/spark-doris-connector/spark-doris-connector-spark-3.3/pom.xml @@ -48,6 +48,11 @@ org.apache.spark spark-sql_${scala.major.version} + + org.junit.jupiter + junit-jupiter-api + test + \ No newline at end of file diff --git a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala index 9e199af5..cc8ddd2a 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala @@ -17,7 +17,29 @@ package org.apache.doris.spark.read -import org.apache.doris.spark.config.DorisConfig +import org.apache.doris.spark.config.{DorisConfig, DorisOptions} +import org.apache.doris.spark.read.expression.V2ExpressionBuilder +import org.apache.spark.sql.connector.expressions.filter.Predicate +import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownV2Filters} import org.apache.spark.sql.types.StructType -class DorisScanBuilder(config: DorisConfig, schema: StructType) extends DorisScanBuilderBase(config, schema) {} +class DorisScanBuilder(config: DorisConfig, schema: StructType) extends DorisScanBuilderBase(config, schema) + with SupportsPushDownV2Filters { + + private var pushDownPredicates: Array[Predicate] = Array[Predicate]() + + private val expressionBuilder = new V2ExpressionBuilder(config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT)) + + override def build(): Scan = new DorisScanV2(config, schema, pushDownPredicates) + + override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = { + val (pushed, unsupported) = predicates.partition(predicate => { + Option(expressionBuilder.build(predicate)).isDefined + }) + this.pushDownPredicates = pushed + unsupported + } + + override def pushedPredicates(): Array[Predicate] = pushDownPredicates + +} diff --git a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala new file mode 100644 index 00000000..634257ae --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.read + +import org.apache.doris.spark.config.{DorisConfig, DorisOptions} +import org.apache.doris.spark.read.expression.V2ExpressionBuilder +import org.apache.spark.internal.Logging +import org.apache.spark.sql.connector.expressions.filter.Predicate +import org.apache.spark.sql.types.StructType + +class DorisScanV2(config: DorisConfig, schema: StructType, filters: Array[Predicate]) extends AbstractDorisScan(config, schema) with Logging { + override protected def compiledFilters(): Array[String] = { + val inValueLengthLimit = config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT) + val v2ExpressionBuilder = new V2ExpressionBuilder(inValueLengthLimit) + filters.map(e => Option[String](v2ExpressionBuilder.build(e))).filter(_.isDefined).map(_.get) + } +} diff --git a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala new file mode 100644 index 00000000..f13830cb --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.read.expression + +import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And, Not, Or} +import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression, Literal, NamedReference} + +class V2ExpressionBuilder(inValueLengthLimit: Int) { + + def build(predicate: Expression): String = { + predicate match { + case and: And => s"(${build(and.left())} AND ${build(and.right())})" + case or: Or => s"(${build(or.left())} OR ${build(or.right())})" + case not: Not => + not.child().name() match { + case "IS_NULL" => build(new GeneralScalarExpression("IS_NOT_NULL", not.children()(0).children())) + case "=" => build(new GeneralScalarExpression("!=", not.children()(0).children())) + case _ => s"NOT (${build(not.child())})" + } + case _: AlwaysTrue => "1=1" + case _: AlwaysFalse => "1=0" + case expr: Expression => + expr match { + case literal: Literal[_] => literal.toString + case namedRef: NamedReference => namedRef.toString + case e: GeneralScalarExpression => e.name() match { + case "IN" => + val expressions = e.children() + if (expressions.nonEmpty && expressions.length <= inValueLengthLimit) { + s"""`${build(expressions(0))}` IN (${expressions.slice(1, expressions.length).map(build).mkString(",")})""" + } else null + case "IS_NULL" => s"`${build(e.children()(0))}` IS NULL" + case "IS_NOT_NULL" => s"`${build(e.children()(0))}` IS NOT NULL" + case "STARTS_WITH" => visitStartWith(build(e.children()(0)), build(e.children()(1))); + case "ENDS_WITH" => visitEndWith(build(e.children()(0)), build(e.children()(1))); + case "CONTAINS" => visitContains(build(e.children()(0)), build(e.children()(1))); + case "=" => s"`${build(e.children()(0))}` = ${build(e.children()(1))}" + case "!=" | "<>" => s"`${build(e.children()(0))}` != ${build(e.children()(1))}" + case "<" => s"`${build(e.children()(0))}` < ${build(e.children()(1))}" + case "<=" => s"`${build(e.children()(0))}` <= ${build(e.children()(1))}" + case ">" => s"`${build(e.children()(0))}` > ${build(e.children()(1))}" + case ">=" => s"`${build(e.children()(0))}` >= ${build(e.children()(1))}" + case _ => null + } + } + } + } + + def visitStartWith(l: String, r: String): String = { + val value = r.substring(1, r.length - 1) + s"`$l` LIKE '$value%'" + } + + def visitEndWith(l: String, r: String): String = { + val value = r.substring(1, r.length - 1) + s"`$l` LIKE '%$value'" + } + + def visitContains(l: String, r: String): String = { + val value = r.substring(1, r.length - 1) + s"`$l` LIKE '%$value%'" + } + +} diff --git a/spark-doris-connector/spark-doris-connector-spark-3.3/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala b/spark-doris-connector/spark-doris-connector-spark-3.3/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala new file mode 100644 index 00000000..fc29495c --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-spark-3.3/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala @@ -0,0 +1,49 @@ +package org.apache.doris.spark.read.expression + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.spark.sql.sources._ +import org.junit.jupiter.api.{Assertions, Test} + +class V2ExpressionBuilderTest { + + @Test + def buildTest(): Unit = { + + val builder = new V2ExpressionBuilder(10) + Assertions.assertEquals(builder.build(EqualTo("c0", 1).toV2), "`c0` = 1") + Assertions.assertEquals(builder.build(Not(EqualTo("c1", 2)).toV2), "`c1` != 2") + Assertions.assertEquals(builder.build(GreaterThan("c2", 3.4).toV2), "`c2` > 3.4") + Assertions.assertEquals(builder.build(GreaterThanOrEqual("c3", 5.6).toV2), "`c3` >= 5.6") + Assertions.assertEquals(builder.build(LessThan("c4", 7.8).toV2), "`c4` < 7.8") + Assertions.assertEquals(builder.build(LessThanOrEqual("c5", BigDecimal(9.1011)).toV2), "`c5` <= 9.1011") + Assertions.assertEquals(builder.build(StringStartsWith("c6","a").toV2), "`c6` LIKE 'a%'") + Assertions.assertEquals(builder.build(StringEndsWith("c7", "b").toV2), "`c7` LIKE '%b'") + Assertions.assertEquals(builder.build(StringContains("c8", "c").toV2), "`c8` LIKE '%c%'") + Assertions.assertEquals(builder.build(In("c9", Array(12,13,14)).toV2), "`c9` IN (12,13,14)") + Assertions.assertEquals(builder.build(IsNull("c10").toV2), "`c10` IS NULL") + Assertions.assertEquals(builder.build(Not(IsNull("c11")).toV2), "`c11` IS NOT NULL") + Assertions.assertEquals(builder.build(And(EqualTo("c12", 15), EqualTo("c13", 16)).toV2), "(`c12` = 15 AND `c13` = 16)") + Assertions.assertEquals(builder.build(Or(EqualTo("c14", 17), EqualTo("c15", 18)).toV2), "(`c14` = 17 OR `c15` = 18)") + Assertions.assertEquals(builder.build(AlwaysTrue.toV2), "1=1") + Assertions.assertEquals(builder.build(AlwaysFalse.toV2), "1=0") + Assertions.assertNull(builder.build(In("c19", Array(19,20,21,22,23,24,25,26,27,28,29)).toV2)) + + } + +} diff --git a/spark-doris-connector/spark-doris-connector-spark-3.4/pom.xml b/spark-doris-connector/spark-doris-connector-spark-3.4/pom.xml index eeee285e..84b84a16 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3.4/pom.xml +++ b/spark-doris-connector/spark-doris-connector-spark-3.4/pom.xml @@ -48,6 +48,11 @@ org.apache.spark spark-sql_${scala.major.version} + + org.junit.jupiter + junit-jupiter-api + test + \ No newline at end of file diff --git a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala index 9e199af5..cc8ddd2a 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala @@ -17,7 +17,29 @@ package org.apache.doris.spark.read -import org.apache.doris.spark.config.DorisConfig +import org.apache.doris.spark.config.{DorisConfig, DorisOptions} +import org.apache.doris.spark.read.expression.V2ExpressionBuilder +import org.apache.spark.sql.connector.expressions.filter.Predicate +import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownV2Filters} import org.apache.spark.sql.types.StructType -class DorisScanBuilder(config: DorisConfig, schema: StructType) extends DorisScanBuilderBase(config, schema) {} +class DorisScanBuilder(config: DorisConfig, schema: StructType) extends DorisScanBuilderBase(config, schema) + with SupportsPushDownV2Filters { + + private var pushDownPredicates: Array[Predicate] = Array[Predicate]() + + private val expressionBuilder = new V2ExpressionBuilder(config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT)) + + override def build(): Scan = new DorisScanV2(config, schema, pushDownPredicates) + + override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = { + val (pushed, unsupported) = predicates.partition(predicate => { + Option(expressionBuilder.build(predicate)).isDefined + }) + this.pushDownPredicates = pushed + unsupported + } + + override def pushedPredicates(): Array[Predicate] = pushDownPredicates + +} diff --git a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala new file mode 100644 index 00000000..634257ae --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.read + +import org.apache.doris.spark.config.{DorisConfig, DorisOptions} +import org.apache.doris.spark.read.expression.V2ExpressionBuilder +import org.apache.spark.internal.Logging +import org.apache.spark.sql.connector.expressions.filter.Predicate +import org.apache.spark.sql.types.StructType + +class DorisScanV2(config: DorisConfig, schema: StructType, filters: Array[Predicate]) extends AbstractDorisScan(config, schema) with Logging { + override protected def compiledFilters(): Array[String] = { + val inValueLengthLimit = config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT) + val v2ExpressionBuilder = new V2ExpressionBuilder(inValueLengthLimit) + filters.map(e => Option[String](v2ExpressionBuilder.build(e))).filter(_.isDefined).map(_.get) + } +} diff --git a/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala new file mode 100644 index 00000000..f13830cb --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-spark-3.4/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.read.expression + +import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And, Not, Or} +import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression, Literal, NamedReference} + +class V2ExpressionBuilder(inValueLengthLimit: Int) { + + def build(predicate: Expression): String = { + predicate match { + case and: And => s"(${build(and.left())} AND ${build(and.right())})" + case or: Or => s"(${build(or.left())} OR ${build(or.right())})" + case not: Not => + not.child().name() match { + case "IS_NULL" => build(new GeneralScalarExpression("IS_NOT_NULL", not.children()(0).children())) + case "=" => build(new GeneralScalarExpression("!=", not.children()(0).children())) + case _ => s"NOT (${build(not.child())})" + } + case _: AlwaysTrue => "1=1" + case _: AlwaysFalse => "1=0" + case expr: Expression => + expr match { + case literal: Literal[_] => literal.toString + case namedRef: NamedReference => namedRef.toString + case e: GeneralScalarExpression => e.name() match { + case "IN" => + val expressions = e.children() + if (expressions.nonEmpty && expressions.length <= inValueLengthLimit) { + s"""`${build(expressions(0))}` IN (${expressions.slice(1, expressions.length).map(build).mkString(",")})""" + } else null + case "IS_NULL" => s"`${build(e.children()(0))}` IS NULL" + case "IS_NOT_NULL" => s"`${build(e.children()(0))}` IS NOT NULL" + case "STARTS_WITH" => visitStartWith(build(e.children()(0)), build(e.children()(1))); + case "ENDS_WITH" => visitEndWith(build(e.children()(0)), build(e.children()(1))); + case "CONTAINS" => visitContains(build(e.children()(0)), build(e.children()(1))); + case "=" => s"`${build(e.children()(0))}` = ${build(e.children()(1))}" + case "!=" | "<>" => s"`${build(e.children()(0))}` != ${build(e.children()(1))}" + case "<" => s"`${build(e.children()(0))}` < ${build(e.children()(1))}" + case "<=" => s"`${build(e.children()(0))}` <= ${build(e.children()(1))}" + case ">" => s"`${build(e.children()(0))}` > ${build(e.children()(1))}" + case ">=" => s"`${build(e.children()(0))}` >= ${build(e.children()(1))}" + case _ => null + } + } + } + } + + def visitStartWith(l: String, r: String): String = { + val value = r.substring(1, r.length - 1) + s"`$l` LIKE '$value%'" + } + + def visitEndWith(l: String, r: String): String = { + val value = r.substring(1, r.length - 1) + s"`$l` LIKE '%$value'" + } + + def visitContains(l: String, r: String): String = { + val value = r.substring(1, r.length - 1) + s"`$l` LIKE '%$value%'" + } + +} diff --git a/spark-doris-connector/spark-doris-connector-spark-3.4/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala b/spark-doris-connector/spark-doris-connector-spark-3.4/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala new file mode 100644 index 00000000..fc29495c --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-spark-3.4/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala @@ -0,0 +1,49 @@ +package org.apache.doris.spark.read.expression + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.spark.sql.sources._ +import org.junit.jupiter.api.{Assertions, Test} + +class V2ExpressionBuilderTest { + + @Test + def buildTest(): Unit = { + + val builder = new V2ExpressionBuilder(10) + Assertions.assertEquals(builder.build(EqualTo("c0", 1).toV2), "`c0` = 1") + Assertions.assertEquals(builder.build(Not(EqualTo("c1", 2)).toV2), "`c1` != 2") + Assertions.assertEquals(builder.build(GreaterThan("c2", 3.4).toV2), "`c2` > 3.4") + Assertions.assertEquals(builder.build(GreaterThanOrEqual("c3", 5.6).toV2), "`c3` >= 5.6") + Assertions.assertEquals(builder.build(LessThan("c4", 7.8).toV2), "`c4` < 7.8") + Assertions.assertEquals(builder.build(LessThanOrEqual("c5", BigDecimal(9.1011)).toV2), "`c5` <= 9.1011") + Assertions.assertEquals(builder.build(StringStartsWith("c6","a").toV2), "`c6` LIKE 'a%'") + Assertions.assertEquals(builder.build(StringEndsWith("c7", "b").toV2), "`c7` LIKE '%b'") + Assertions.assertEquals(builder.build(StringContains("c8", "c").toV2), "`c8` LIKE '%c%'") + Assertions.assertEquals(builder.build(In("c9", Array(12,13,14)).toV2), "`c9` IN (12,13,14)") + Assertions.assertEquals(builder.build(IsNull("c10").toV2), "`c10` IS NULL") + Assertions.assertEquals(builder.build(Not(IsNull("c11")).toV2), "`c11` IS NOT NULL") + Assertions.assertEquals(builder.build(And(EqualTo("c12", 15), EqualTo("c13", 16)).toV2), "(`c12` = 15 AND `c13` = 16)") + Assertions.assertEquals(builder.build(Or(EqualTo("c14", 17), EqualTo("c15", 18)).toV2), "(`c14` = 17 OR `c15` = 18)") + Assertions.assertEquals(builder.build(AlwaysTrue.toV2), "1=1") + Assertions.assertEquals(builder.build(AlwaysFalse.toV2), "1=0") + Assertions.assertNull(builder.build(In("c19", Array(19,20,21,22,23,24,25,26,27,28,29)).toV2)) + + } + +} diff --git a/spark-doris-connector/spark-doris-connector-spark-3.5/pom.xml b/spark-doris-connector/spark-doris-connector-spark-3.5/pom.xml index 2f498b4d..ccccc66f 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3.5/pom.xml +++ b/spark-doris-connector/spark-doris-connector-spark-3.5/pom.xml @@ -48,6 +48,11 @@ org.apache.spark spark-sql_${scala.major.version} + + org.junit.jupiter + junit-jupiter-api + test + \ No newline at end of file diff --git a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala index 9e199af5..cc8ddd2a 100644 --- a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala +++ b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanBuilder.scala @@ -17,7 +17,29 @@ package org.apache.doris.spark.read -import org.apache.doris.spark.config.DorisConfig +import org.apache.doris.spark.config.{DorisConfig, DorisOptions} +import org.apache.doris.spark.read.expression.V2ExpressionBuilder +import org.apache.spark.sql.connector.expressions.filter.Predicate +import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownV2Filters} import org.apache.spark.sql.types.StructType -class DorisScanBuilder(config: DorisConfig, schema: StructType) extends DorisScanBuilderBase(config, schema) {} +class DorisScanBuilder(config: DorisConfig, schema: StructType) extends DorisScanBuilderBase(config, schema) + with SupportsPushDownV2Filters { + + private var pushDownPredicates: Array[Predicate] = Array[Predicate]() + + private val expressionBuilder = new V2ExpressionBuilder(config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT)) + + override def build(): Scan = new DorisScanV2(config, schema, pushDownPredicates) + + override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = { + val (pushed, unsupported) = predicates.partition(predicate => { + Option(expressionBuilder.build(predicate)).isDefined + }) + this.pushDownPredicates = pushed + unsupported + } + + override def pushedPredicates(): Array[Predicate] = pushDownPredicates + +} diff --git a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala new file mode 100644 index 00000000..634257ae --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/DorisScanV2.scala @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.read + +import org.apache.doris.spark.config.{DorisConfig, DorisOptions} +import org.apache.doris.spark.read.expression.V2ExpressionBuilder +import org.apache.spark.internal.Logging +import org.apache.spark.sql.connector.expressions.filter.Predicate +import org.apache.spark.sql.types.StructType + +class DorisScanV2(config: DorisConfig, schema: StructType, filters: Array[Predicate]) extends AbstractDorisScan(config, schema) with Logging { + override protected def compiledFilters(): Array[String] = { + val inValueLengthLimit = config.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT) + val v2ExpressionBuilder = new V2ExpressionBuilder(inValueLengthLimit) + filters.map(e => Option[String](v2ExpressionBuilder.build(e))).filter(_.isDefined).map(_.get) + } +} diff --git a/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala new file mode 100644 index 00000000..f13830cb --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-spark-3.5/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.read.expression + +import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And, Not, Or} +import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression, Literal, NamedReference} + +class V2ExpressionBuilder(inValueLengthLimit: Int) { + + def build(predicate: Expression): String = { + predicate match { + case and: And => s"(${build(and.left())} AND ${build(and.right())})" + case or: Or => s"(${build(or.left())} OR ${build(or.right())})" + case not: Not => + not.child().name() match { + case "IS_NULL" => build(new GeneralScalarExpression("IS_NOT_NULL", not.children()(0).children())) + case "=" => build(new GeneralScalarExpression("!=", not.children()(0).children())) + case _ => s"NOT (${build(not.child())})" + } + case _: AlwaysTrue => "1=1" + case _: AlwaysFalse => "1=0" + case expr: Expression => + expr match { + case literal: Literal[_] => literal.toString + case namedRef: NamedReference => namedRef.toString + case e: GeneralScalarExpression => e.name() match { + case "IN" => + val expressions = e.children() + if (expressions.nonEmpty && expressions.length <= inValueLengthLimit) { + s"""`${build(expressions(0))}` IN (${expressions.slice(1, expressions.length).map(build).mkString(",")})""" + } else null + case "IS_NULL" => s"`${build(e.children()(0))}` IS NULL" + case "IS_NOT_NULL" => s"`${build(e.children()(0))}` IS NOT NULL" + case "STARTS_WITH" => visitStartWith(build(e.children()(0)), build(e.children()(1))); + case "ENDS_WITH" => visitEndWith(build(e.children()(0)), build(e.children()(1))); + case "CONTAINS" => visitContains(build(e.children()(0)), build(e.children()(1))); + case "=" => s"`${build(e.children()(0))}` = ${build(e.children()(1))}" + case "!=" | "<>" => s"`${build(e.children()(0))}` != ${build(e.children()(1))}" + case "<" => s"`${build(e.children()(0))}` < ${build(e.children()(1))}" + case "<=" => s"`${build(e.children()(0))}` <= ${build(e.children()(1))}" + case ">" => s"`${build(e.children()(0))}` > ${build(e.children()(1))}" + case ">=" => s"`${build(e.children()(0))}` >= ${build(e.children()(1))}" + case _ => null + } + } + } + } + + def visitStartWith(l: String, r: String): String = { + val value = r.substring(1, r.length - 1) + s"`$l` LIKE '$value%'" + } + + def visitEndWith(l: String, r: String): String = { + val value = r.substring(1, r.length - 1) + s"`$l` LIKE '%$value'" + } + + def visitContains(l: String, r: String): String = { + val value = r.substring(1, r.length - 1) + s"`$l` LIKE '%$value%'" + } + +} diff --git a/spark-doris-connector/spark-doris-connector-spark-3.5/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala b/spark-doris-connector/spark-doris-connector-spark-3.5/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala new file mode 100644 index 00000000..fc29495c --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-spark-3.5/src/test/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilderTest.scala @@ -0,0 +1,49 @@ +package org.apache.doris.spark.read.expression + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.spark.sql.sources._ +import org.junit.jupiter.api.{Assertions, Test} + +class V2ExpressionBuilderTest { + + @Test + def buildTest(): Unit = { + + val builder = new V2ExpressionBuilder(10) + Assertions.assertEquals(builder.build(EqualTo("c0", 1).toV2), "`c0` = 1") + Assertions.assertEquals(builder.build(Not(EqualTo("c1", 2)).toV2), "`c1` != 2") + Assertions.assertEquals(builder.build(GreaterThan("c2", 3.4).toV2), "`c2` > 3.4") + Assertions.assertEquals(builder.build(GreaterThanOrEqual("c3", 5.6).toV2), "`c3` >= 5.6") + Assertions.assertEquals(builder.build(LessThan("c4", 7.8).toV2), "`c4` < 7.8") + Assertions.assertEquals(builder.build(LessThanOrEqual("c5", BigDecimal(9.1011)).toV2), "`c5` <= 9.1011") + Assertions.assertEquals(builder.build(StringStartsWith("c6","a").toV2), "`c6` LIKE 'a%'") + Assertions.assertEquals(builder.build(StringEndsWith("c7", "b").toV2), "`c7` LIKE '%b'") + Assertions.assertEquals(builder.build(StringContains("c8", "c").toV2), "`c8` LIKE '%c%'") + Assertions.assertEquals(builder.build(In("c9", Array(12,13,14)).toV2), "`c9` IN (12,13,14)") + Assertions.assertEquals(builder.build(IsNull("c10").toV2), "`c10` IS NULL") + Assertions.assertEquals(builder.build(Not(IsNull("c11")).toV2), "`c11` IS NOT NULL") + Assertions.assertEquals(builder.build(And(EqualTo("c12", 15), EqualTo("c13", 16)).toV2), "(`c12` = 15 AND `c13` = 16)") + Assertions.assertEquals(builder.build(Or(EqualTo("c14", 17), EqualTo("c15", 18)).toV2), "(`c14` = 17 OR `c15` = 18)") + Assertions.assertEquals(builder.build(AlwaysTrue.toV2), "1=1") + Assertions.assertEquals(builder.build(AlwaysFalse.toV2), "1=0") + Assertions.assertNull(builder.build(In("c19", Array(19,20,21,22,23,24,25,26,27,28,29)).toV2)) + + } + +} diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/DorisScanV2.scala b/spark-load/spark-load-core/src/main/java/org/apache/doris/common/DorisScanV2.scala new file mode 100644 index 00000000..e69de29b diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/expression/V2ExpressionBuilder.scala b/spark-load/spark-load-core/src/main/java/org/apache/doris/common/expression/V2ExpressionBuilder.scala new file mode 100644 index 00000000..e69de29b From 8c4dd4668b0a441f5bb35468ba7e61ca8f8c8df2 Mon Sep 17 00:00:00 2001 From: gnehil Date: Tue, 31 Dec 2024 18:50:39 +0800 Subject: [PATCH 2/2] change spark 3 it case dependency module with spark version variable and add workflow --- .github/workflows/run-itcase-12.yml | 6 +++++- .github/workflows/run-itcase-20.yml | 6 +++++- spark-doris-connector/spark-doris-connector-it/pom.xml | 2 +- .../src/main/java/org/apache/doris/common/DorisScanV2.scala | 0 .../doris/common/expression/V2ExpressionBuilder.scala | 0 5 files changed, 11 insertions(+), 3 deletions(-) delete mode 100644 spark-load/spark-load-core/src/main/java/org/apache/doris/common/DorisScanV2.scala delete mode 100644 spark-load/spark-load-core/src/main/java/org/apache/doris/common/expression/V2ExpressionBuilder.scala diff --git a/.github/workflows/run-itcase-12.yml b/.github/workflows/run-itcase-12.yml index fddcda73..fd283577 100644 --- a/.github/workflows/run-itcase-12.yml +++ b/.github/workflows/run-itcase-12.yml @@ -42,6 +42,10 @@ jobs: run: | cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:1.2.7.1_x86" - - name: Run ITCases for spark 3 + - name: Run ITCases for spark 3.1 run: | cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:1.2.7.1_x86" + + - name: Run ITCases for spark 3.3 + run: | + cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:1.2.7.1_x86" diff --git a/.github/workflows/run-itcase-20.yml b/.github/workflows/run-itcase-20.yml index d16d810e..b0f31c09 100644 --- a/.github/workflows/run-itcase-20.yml +++ b/.github/workflows/run-itcase-20.yml @@ -42,7 +42,11 @@ jobs: run: | cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:2.0.3" - - name: Run ITCases for spark 3 + - name: Run ITCases for spark 3.1 run: | cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:2.0.3" + + - name: Run ITCases for spark 3.3 + run: | + cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:2.0.3" \ No newline at end of file diff --git a/spark-doris-connector/spark-doris-connector-it/pom.xml b/spark-doris-connector/spark-doris-connector-it/pom.xml index 1fcae06d..9797fb60 100644 --- a/spark-doris-connector/spark-doris-connector-it/pom.xml +++ b/spark-doris-connector/spark-doris-connector-it/pom.xml @@ -97,7 +97,7 @@ org.apache.doris - spark-doris-connector-spark-3.3 + spark-doris-connector-spark-${spark.major.version} ${project.version} test diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/DorisScanV2.scala b/spark-load/spark-load-core/src/main/java/org/apache/doris/common/DorisScanV2.scala deleted file mode 100644 index e69de29b..00000000 diff --git a/spark-load/spark-load-core/src/main/java/org/apache/doris/common/expression/V2ExpressionBuilder.scala b/spark-load/spark-load-core/src/main/java/org/apache/doris/common/expression/V2ExpressionBuilder.scala deleted file mode 100644 index e69de29b..00000000