Skip to content

Commit 289d515

Browse files
authored
Merge pull request #38081 from ahmedabu98/cp_38077
CP portable date java and model changes
2 parents 0a2244c + 7933c10 commit 289d515

File tree

7 files changed

+33
-12
lines changed

7 files changed

+33
-12
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run ",
3-
"modification": 0
3+
"modification": 1
44
}

model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,13 @@ message LogicalTypes {
174174
// A variable-length string with its maximum length as the argument.
175175
VAR_CHAR = 7 [(org.apache.beam.model.pipeline.v1.beam_urn) =
176176
"beam:logical_type:var_char:v1"];
177+
178+
// A URN for Date type
179+
// - Representation type: INT64
180+
// - A date without a timezone, represented by the number of days
181+
// since the epoch.
182+
DATE = 8 [(org.apache.beam.model.pipeline.v1.beam_urn) =
183+
"beam:logical_type:date:v1"];
177184
}
178185
}
179186

sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -974,7 +974,8 @@ public boolean typesEqual(FieldType other) {
974974
getLogicalType().getIdentifier(), other.getLogicalType().getIdentifier())) {
975975
return false;
976976
}
977-
if (!getLogicalType().getArgumentType().equals(other.getLogicalType().getArgumentType())) {
977+
if (!Objects.equals(
978+
getLogicalType().getArgumentType(), other.getLogicalType().getArgumentType())) {
978979
return false;
979980
}
980981
if (!Row.Equals.deepEquals(

sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.beam.sdk.schemas.Schema.FieldType;
4444
import org.apache.beam.sdk.schemas.Schema.LogicalType;
4545
import org.apache.beam.sdk.schemas.Schema.TypeName;
46+
import org.apache.beam.sdk.schemas.logicaltypes.Date;
4647
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
4748
import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric;
4849
import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
@@ -113,6 +114,7 @@ private static String getLogicalTypeUrn(String identifier) {
113114
.put(VariableBytes.IDENTIFIER, VariableBytes.class)
114115
.put(FixedString.IDENTIFIER, FixedString.class)
115116
.put(VariableString.IDENTIFIER, VariableString.class)
117+
.put(Date.IDENTIFIER, Date.class)
116118
.build();
117119

118120
public static SchemaApi.Schema schemaToProto(Schema schema, boolean serializeLogicalType) {

sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Date.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
package org.apache.beam.sdk.schemas.logicaltypes;
1919

2020
import java.time.LocalDate;
21+
import org.apache.beam.model.pipeline.v1.RunnerApi;
22+
import org.apache.beam.model.pipeline.v1.SchemaApi;
2123
import org.apache.beam.sdk.schemas.Schema;
24+
import org.checkerframework.checker.nullness.qual.Nullable;
2225

2326
/**
2427
* A date without a time-zone.
@@ -30,23 +33,20 @@
3033
* incrementing count of days where day 0 is 1970-01-01 (ISO).
3134
*/
3235
public class Date implements Schema.LogicalType<LocalDate, Long> {
33-
public static final String IDENTIFIER = "beam:logical_type:date:v1";
36+
public static final String IDENTIFIER =
37+
SchemaApi.LogicalTypes.Enum.DATE
38+
.getValueDescriptor()
39+
.getOptions()
40+
.getExtension(RunnerApi.beamUrn);
3441

3542
@Override
3643
public String getIdentifier() {
3744
return IDENTIFIER;
3845
}
3946

40-
// unused
4147
@Override
42-
public Schema.FieldType getArgumentType() {
43-
return Schema.FieldType.STRING;
44-
}
45-
46-
// unused
47-
@Override
48-
public String getArgument() {
49-
return "";
48+
public Schema.@Nullable FieldType getArgumentType() {
49+
return null;
5050
}
5151

5252
@Override

sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import java.math.BigDecimal;
2727
import java.nio.charset.StandardCharsets;
28+
import java.time.LocalDate;
2829
import java.time.LocalDateTime;
2930
import java.util.ArrayList;
3031
import java.util.HashMap;
@@ -142,6 +143,7 @@ public static Iterable<Schema> data() {
142143
Field.of("decimal", FieldType.DECIMAL), Field.of("datetime", FieldType.DATETIME)))
143144
.add(Schema.of(Field.of("fixed_bytes", FieldType.logicalType(FixedBytes.of(24)))))
144145
.add(Schema.of(Field.of("micros_instant", FieldType.logicalType(new MicrosInstant()))))
146+
.add(Schema.of(Field.of("date", FieldType.logicalType(SqlTypes.DATE))))
145147
.add(Schema.of(Field.of("python_callable", FieldType.logicalType(new PythonCallable()))))
146148
.add(
147149
Schema.of(
@@ -388,6 +390,7 @@ public static Iterable<Row> data() {
388390
.add(simpleRow(FieldType.DECIMAL, BigDecimal.valueOf(100000)))
389391
.add(simpleRow(FieldType.logicalType(new PortableNullArgLogicalType()), "str"))
390392
.add(simpleRow(FieldType.logicalType(new DateTime()), LocalDateTime.of(2000, 1, 3, 3, 1)))
393+
.add(simpleRow(FieldType.logicalType(SqlTypes.DATE), LocalDate.of(2000, 1, 3)))
391394
.add(simpleNullRow(FieldType.STRING))
392395
.add(simpleNullRow(FieldType.INT32))
393396
.add(simpleNullRow(FieldType.map(FieldType.STRING, FieldType.INT32)))

sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.sql.SQLException;
3939
import java.sql.Time;
4040
import java.sql.Timestamp;
41+
import java.time.LocalDate;
4142
import java.util.ArrayList;
4243
import java.util.Arrays;
4344
import java.util.Calendar;
@@ -294,6 +295,13 @@ static JdbcIO.PreparedStatementSetCaller getPreparedStatementSetCaller(
294295
return (element, ps, i, fieldWithIndex) -> {
295296
ps.setBigDecimal(i + 1, element.getDecimal(fieldWithIndex.getIndex()));
296297
};
298+
} else if (logicalTypeName.equals(
299+
org.apache.beam.sdk.schemas.logicaltypes.Date.IDENTIFIER)) {
300+
return (element, ps, i, fieldWithIndex) -> {
301+
LocalDate value =
302+
element.getLogicalTypeValue(fieldWithIndex.getIndex(), LocalDate.class);
303+
ps.setDate(i + 1, value == null ? null : Date.valueOf(value));
304+
};
297305
} else if (logicalTypeName.equals("DATE")) {
298306
return (element, ps, i, fieldWithIndex) -> {
299307
ReadableDateTime value = element.getDateTime(fieldWithIndex.getIndex());

0 commit comments

Comments
 (0)