Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,94 @@ trait MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests
(3, 75, "newdep")).toDF("pk", "salary", "dep")
)

// The source's `salary` column is wider (long) than the target's (int), but both clauses
// assign from `s.bonus` and never from `s.salary`. The type mismatch on the same-named column
// must therefore be irrelevant: the target schema is expected to stay unchanged whether or not
// schema evolution is enabled.
testEvolution("source has extra column with type mismatch on existing column -" +
  "should not evolve when assigning from differently named source column")(
  targetData = {
    // Explicit schema so that `pk` is non-nullable and `salary` is an int.
    val targetSchema = new StructType()
      .add("pk", IntegerType, nullable = false)
      .add("salary", IntegerType)
      .add("dep", StringType)
    val targetRows = Seq(
      Row(1, 100, "hr"),
      Row(2, 200, "software"))
    spark.createDataFrame(spark.sparkContext.parallelize(targetRows), targetSchema)
  },
  sourceData = {
    // Same columns as the target, except `salary` is a long, plus an extra long `bonus` column.
    val sourceSchema = new StructType()
      .add("pk", IntegerType, nullable = false)
      .add("salary", LongType)
      .add("dep", StringType)
      .add("bonus", LongType)
    val sourceRows = Seq(
      Row(2, 150L, "dummy", 50L),
      Row(3, 250L, "dummy", 75L))
    spark.createDataFrame(spark.sparkContext.parallelize(sourceRows), sourceSchema)
  },
  clauses = Seq(
    update(set = "salary = s.bonus"),
    insert(values = "(pk, salary, dep) VALUES (s.pk, s.bonus, 'newdep')")
  ),
  // The expected rows are the same with and without evolution: pk 2 is updated from the
  // source bonus, pk 3 is inserted, pk 1 is untouched.
  expected = Seq(
    (1, 100, "hr"),
    (2, 50, "software"),
    (3, 75, "newdep")
  ).toDF("pk", "salary", "dep"),
  expectedWithoutEvolution = Seq(
    (1, 100, "hr"),
    (2, 50, "software"),
    (3, 75, "newdep")
  ).toDF("pk", "salary", "dep"),
  // The target schema must be identical to the original in both modes.
  expectedSchema = new StructType()
    .add("pk", IntegerType, nullable = false)
    .add("salary", IntegerType)
    .add("dep", StringType),
  expectedSchemaWithoutEvolution = new StructType()
    .add("pk", IntegerType, nullable = false)
    .add("salary", IntegerType)
    .add("dep", StringType)
)

// Here `s.bonus` is a string while the target `salary` is an int, so the explicit assignment
// itself has incompatible types. The merge is expected to fail with the same error message
// whether or not schema evolution is enabled.
testEvolution("source has extra column with type mismatch on existing column -" +
  "should fail when assigning from incompatible source column")(
  targetData = {
    // Explicit schema so that `pk` is non-nullable and `salary` is an int.
    val targetSchema = new StructType()
      .add("pk", IntegerType, nullable = false)
      .add("salary", IntegerType)
      .add("dep", StringType)
    val targetRows = Seq(
      Row(1, 100, "hr"),
      Row(2, 200, "software"))
    spark.createDataFrame(spark.sparkContext.parallelize(targetRows), targetSchema)
  },
  sourceData = {
    // `salary` is a long and the extra `bonus` column is a string holding non-numeric text.
    val sourceSchema = new StructType()
      .add("pk", IntegerType, nullable = false)
      .add("salary", LongType)
      .add("dep", StringType)
      .add("bonus", StringType)
    val sourceRows = Seq(
      Row(2, 150L, "dummy", "fifty"),
      Row(3, 250L, "dummy", "seventy-five"))
    spark.createDataFrame(spark.sparkContext.parallelize(sourceRows), sourceSchema)
  },
  clauses = Seq(
    update(set = "salary = s.bonus"),
    insert(values = "(pk, salary, dep) VALUES (s.pk, s.bonus, 'newdep')")
  ),
  // The same error fragment is expected in both modes.
  expectErrorContains = "Cannot safely cast",
  expectErrorWithoutEvolutionContains = "Cannot safely cast"
)

// No evolution when using named_struct to construct value without referencing new field
testNestedStructsEvolution("source has extra struct field -" +
"no evolution when not directly referencing new field - INSERT")(
Expand Down