diff --git a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala index 4af0040fcef5..fdb3a68f2aae 100644 --- a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala +++ b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala @@ -664,4 +664,32 @@ abstract class IcebergSuite extends WholeStageTransformerSuite { assert(result.head.getString(1) == "test_data") } } + + test("assert_not_null with iceberg table") { + withTable("iceberg_not_null") { + spark.sql(""" + |CREATE TABLE iceberg_not_null (id BIGINT NOT NULL, name STRING NOT NULL) + |USING iceberg + |""".stripMargin) + // Insert non-null values should succeed with AssertNotNull offloaded. + spark.sql("INSERT INTO iceberg_not_null VALUES (1, 'a'), (2, 'b')") + runQueryAndCompare("SELECT * FROM iceberg_not_null") { + checkGlutenPlan[IcebergScanTransformer] + } + + // Insert from a query with nullable source columns. + spark.sql( + "INSERT INTO iceberg_not_null SELECT id + 10, CAST(id AS STRING) FROM iceberg_not_null") + val df = runQueryAndCompare("SELECT * FROM iceberg_not_null ORDER BY id") { _ => } + assert(df.count() == 4) + + // Insert null into NOT NULL column should throw. + val e = intercept[Exception] { + spark.sql("INSERT INTO iceberg_not_null VALUES (null, 'c')").collect() + } + assert( + e.getMessage.contains("null") || e.getMessage.contains("NOT_NULL") || + e.getCause != null && e.getCause.getMessage.contains("null")) + } + } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala index b13aced2a62c..77ad2dde0943 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala @@ -23,6 +23,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero import org.apache.spark.sql.execution.ScalarSubquery @@ -294,6 +295,7 @@ object ExpressionMappings { Sig[WidthBucket](WIDTH_BUCKET), Sig[ReplicateRows](REPLICATE_ROWS), Sig[RaiseError](RAISE_ERROR), + Sig[AssertNotNull](ASSERT_NOT_NULL), Sig[SparkVersion](VERSION), // Decimal Sig[UnscaledValue](UNSCALED_VALUE), diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 1207121da708..e134da830678 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -878,6 +878,13 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenBitmapExpressionsQuerySuite] enableSuite[GlutenEmptyInSuite] enableSuite[GlutenRuntimeNullChecksV2Writes] + // Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException + .exclude("NOT NULL checks for atomic top-level fields (byName)") + .exclude("NOT NULL checks for atomic top-level fields (byPosition)") + .exclude("NOT NULL checks for nested struct fields (byName)") + .exclude("NOT NULL checks for nested struct fields (byPosition)") + .exclude("NOT NULL checks for nullable array with required element (byPosition)") + .exclude("not null checks for fields inside nullable array (byPosition)") enableSuite[GlutenTableOptionsConstantFoldingSuite] enableSuite[GlutenDeltaBasedMergeIntoTableSuite] enableSuite[GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite] diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 4f7c67daaad6..867b16d6d66a 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -56,6 +56,8 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenDataSourceV2FunctionSuite] enableSuite[GlutenDataSourceV2SQLSessionCatalogSuite] enableSuite[GlutenDataSourceV2SQLSuiteV1Filter] + // Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException + .exclude("CreateTableAsSelect: nullable schema") enableSuite[GlutenDataSourceV2SQLSuiteV2Filter] enableSuite[GlutenDataSourceV2Suite] // Rewrite the following tests in GlutenDataSourceV2Suite. @@ -785,6 +787,8 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenFilteredScanSuite] enableSuite[GlutenFiltersSuite] enableSuite[GlutenInsertSuite] + // Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException + .exclude("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand") // the native write staing dir is differnt with vanilla Spark for coustom partition paths .exclude("SPARK-35106: Throw exception when rename custom partition paths returns false") .exclude("Stop task set if FileAlreadyExistsException was thrown") @@ -1103,21 +1107,38 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenBitmapExpressionsQuerySuite] enableSuite[GlutenEmptyInSuite] enableSuite[GlutenRuntimeNullChecksV2Writes] + // Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException + .exclude("NOT NULL checks for atomic top-level fields (byName)") + .exclude("NOT NULL checks for atomic top-level fields (byPosition)") + .exclude("NOT NULL checks for nested struct fields (byName)") + .exclude("NOT NULL checks for nested struct fields (byPosition)") + .exclude("NOT NULL checks for nullable array with required element (byPosition)") + .exclude("not null checks for fields inside nullable array (byPosition)") enableSuite[GlutenTableOptionsConstantFoldingSuite] enableSuite[GlutenDeltaBasedMergeIntoTableSuite] // Replaced by Gluten versions that handle wrapped exceptions .excludeByPrefix("merge cardinality check with") + // Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException + .exclude("merge with NOT NULL checks") enableSuite[GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite] // Replaced by Gluten versions that handle wrapped exceptions .excludeByPrefix("merge cardinality check with") + // Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException + .exclude("merge with NOT NULL checks") enableSuite[GlutenDeltaBasedUpdateAsDeleteAndInsertTableSuite] // FIXME: complex type result mismatch .exclude("update nested struct fields") .exclude("update char/varchar columns") + // Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException + .exclude("update with NOT NULL checks") enableSuite[GlutenDeltaBasedUpdateTableSuite] + // Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException + .exclude("update with NOT NULL checks") enableSuite[GlutenGroupBasedMergeIntoTableSuite] // Replaced by Gluten versions that handle wrapped exceptions .excludeByPrefix("merge cardinality check with") + // Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException + .exclude("merge with NOT NULL checks") enableSuite[GlutenFileSourceCustomMetadataStructSuite] enableSuite[GlutenParquetFileMetadataStructRowIndexSuite] enableSuite[GlutenTableLocationSuite] diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 0dadfa1d0bd8..8cfd6835c2b7 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -56,6 +56,8 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenDataSourceV2FunctionSuite] enableSuite[GlutenDataSourceV2SQLSessionCatalogSuite] enableSuite[GlutenDataSourceV2SQLSuiteV1Filter] + // Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException + .exclude("CreateTableAsSelect: nullable schema") enableSuite[GlutenDataSourceV2SQLSuiteV2Filter] enableSuite[GlutenDataSourceV2Suite] // Rewrite the following tests in GlutenDataSourceV2Suite. @@ -751,6 +753,8 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenFilteredScanSuite] enableSuite[GlutenFiltersSuite] enableSuite[GlutenInsertSuite] + // Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException + .exclude("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand") // the native write staing dir is differnt with vanilla Spark for coustom partition paths .exclude("SPARK-35106: Throw exception when rename custom partition paths returns false") .exclude("Stop task set if FileAlreadyExistsException was thrown") @@ -1089,21 +1093,39 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenBitmapExpressionsQuerySuite] enableSuite[GlutenEmptyInSuite] enableSuite[GlutenRuntimeNullChecksV2Writes] + // Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException + .exclude("NOT NULL checks for atomic top-level fields (byName)") + .exclude("NOT NULL checks for atomic top-level fields (byPosition)") + .exclude("NOT NULL checks for nested struct fields (byName)") + .exclude("NOT NULL checks for nested struct fields (byPosition)") + .exclude("NOT NULL checks for nested structs, arrays, maps (byName)") + .exclude("NOT NULL checks for nullable array with required element (byPosition)") + .exclude("not null checks for fields inside nullable array (byPosition)") enableSuite[GlutenTableOptionsConstantFoldingSuite] enableSuite[GlutenDeltaBasedMergeIntoTableSuite] // Replaced by Gluten versions that handle wrapped exceptions .excludeByPrefix("merge cardinality check with") + // Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException + .exclude("merge with NOT NULL checks") enableSuite[GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite] // Replaced by Gluten versions that handle wrapped exceptions .excludeByPrefix("merge cardinality check with") + // Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException + .exclude("merge with NOT NULL checks") enableSuite[GlutenDeltaBasedUpdateAsDeleteAndInsertTableSuite] // FIXME: complex type result mismatch .exclude("update nested struct fields") .exclude("update char/varchar columns") + // Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException + .exclude("update with NOT NULL checks") enableSuite[GlutenDeltaBasedUpdateTableSuite] + // Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException + .exclude("update with NOT NULL checks") enableSuite[GlutenGroupBasedMergeIntoTableSuite] // Replaced by Gluten versions that handle wrapped exceptions .excludeByPrefix("merge cardinality check with") + // Velox assert_not_null throws VeloxUserError instead of SparkRuntimeException + .exclude("merge with NOT NULL checks") enableSuite[GlutenFileSourceCustomMetadataStructSuite] enableSuite[GlutenParquetFileMetadataStructRowIndexSuite] enableSuite[GlutenTableLocationSuite] diff --git a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala index f2ae5647b141..b168cf4fa250 100644 --- a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala +++ b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala @@ -348,6 +348,7 @@ object ExpressionNames { final val VERSION = "version" final val AT_LEAST_N_NON_NULLS = "at_least_n_non_nulls" final val ASSERT_TRUE = "assert_true" + final val ASSERT_NOT_NULL = "assert_not_null" final val NULLIF = "nullif" final val NVL = "nvl" final val NVL2 = "nvl2"