diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala
index fb8b1ee1fe3f4..ad74661438ebb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala
@@ -144,6 +144,94 @@ trait MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests
       (3, 75, "newdep")).toDF("pk", "salary", "dep")
   )
 
+  // When assigning s.bonus to existing t.salary and source.salary has a wider type (long) than
+  // target.salary (int), no evolution should occur because the assignment uses s.bonus, not
+  // s.salary. The type mismatch on the same-named column should be irrelevant.
+  testEvolution("source has extra column with type mismatch on existing column -" +
+    "should not evolve when assigning from differently named source column")(
+    targetData = {
+      val schema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("salary", IntegerType),
+        StructField("dep", StringType)
+      ))
+      spark.createDataFrame(spark.sparkContext.parallelize(Seq(
+        Row(1, 100, "hr"),
+        Row(2, 200, "software")
+      )), schema)
+    },
+    sourceData = {
+      val schema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("salary", LongType),
+        StructField("dep", StringType),
+        StructField("bonus", LongType)
+      ))
+      spark.createDataFrame(spark.sparkContext.parallelize(Seq(
+        Row(2, 150L, "dummy", 50L),
+        Row(3, 250L, "dummy", 75L)
+      )), schema)
+    },
+    clauses = Seq(
+      update(set = "salary = s.bonus"),
+      insert(values = "(pk, salary, dep) VALUES (s.pk, s.bonus, 'newdep')")
+    ),
+    expected = Seq(
+      (1, 100, "hr"),
+      (2, 50, "software"),
+      (3, 75, "newdep")).toDF("pk", "salary", "dep"),
+    expectedWithoutEvolution = Seq(
+      (1, 100, "hr"),
+      (2, 50, "software"),
+      (3, 75, "newdep")).toDF("pk", "salary", "dep"),
+    expectedSchema = StructType(Seq(
+      StructField("pk", IntegerType, nullable = false),
+      StructField("salary", IntegerType),
+      StructField("dep", StringType)
+    )),
+    expectedSchemaWithoutEvolution = StructType(Seq(
+      StructField("pk", IntegerType, nullable = false),
+      StructField("salary", IntegerType),
+      StructField("dep", StringType)
+    ))
+  )
+
+  // When assigning s.bonus (StringType) to target salary (IntegerType), the types are
+  // incompatible. This should fail both with and without schema evolution because the explicit
+  // assignment has mismatched types regardless of evolution.
+  testEvolution("source has extra column with type mismatch on existing column -" +
+    "should fail when assigning from incompatible source column")(
+    targetData = {
+      val schema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("salary", IntegerType),
+        StructField("dep", StringType)
+      ))
+      spark.createDataFrame(spark.sparkContext.parallelize(Seq(
+        Row(1, 100, "hr"),
+        Row(2, 200, "software")
+      )), schema)
+    },
+    sourceData = {
+      val schema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("salary", LongType),
+        StructField("dep", StringType),
+        StructField("bonus", StringType)
+      ))
+      spark.createDataFrame(spark.sparkContext.parallelize(Seq(
+        Row(2, 150L, "dummy", "fifty"),
+        Row(3, 250L, "dummy", "seventy-five")
+      )), schema)
+    },
+    clauses = Seq(
+      update(set = "salary = s.bonus"),
+      insert(values = "(pk, salary, dep) VALUES (s.pk, s.bonus, 'newdep')")
+    ),
+    expectErrorContains = "Cannot safely cast",
+    expectErrorWithoutEvolutionContains = "Cannot safely cast"
+  )
+
   // No evolution when using named_struct to construct value without referencing new field
   testNestedStructsEvolution("source has extra struct field -" +
     "no evolution when not directly referencing new field - INSERT")(