Skip to content

Commit d8a6909

Browse files
committed
Key Kafka record values by the partition they were read from
1 parent d7bb223 commit d8a6909

3 files changed

Lines changed: 164 additions & 0 deletions

File tree

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,10 @@ private List<PTransformOverride> getOverrides(boolean streaming) {
683683
PTransformMatchers.groupWithShardableStates(),
684684
new GroupIntoBatchesOverride.StreamingGroupIntoBatchesWithShardedKeyOverrideFactory(
685685
this)));
686+
overridesBuilder.add(
687+
PTransformOverride.of(
688+
KafkaIO.Read.KEYED_BY_PARTITION_MATCHER,
689+
new KeyedByPartitionOverride.StreamingKeyedByPartitionOverrideFactory(this)));
686690

687691
overridesBuilder
688692
.add(
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.dataflow;
19+
20+
import java.util.Map;
21+
import org.apache.beam.sdk.io.kafka.KafkaIO;
22+
import org.apache.beam.sdk.io.kafka.KafkaRecord;
23+
import org.apache.beam.sdk.runners.AppliedPTransform;
24+
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
25+
import org.apache.beam.sdk.transforms.PTransform;
26+
import org.apache.beam.sdk.util.construction.PTransformReplacements;
27+
import org.apache.beam.sdk.util.construction.ReplacementOutputs;
28+
import org.apache.beam.sdk.values.KV;
29+
import org.apache.beam.sdk.values.PCollection;
30+
import org.apache.beam.sdk.values.TupleTag;
31+
32+
@SuppressWarnings({
33+
"rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
34+
})
35+
public class KeyedByPartitionOverride {
36+
37+
static class StreamingKeyedByPartitionOverrideFactory<K, V>
38+
implements PTransformOverrideFactory<
39+
PCollection<KafkaRecord<K, V>>,
40+
PCollection<KV<Integer, V>>,
41+
KafkaIO.Read.KeyedByPartition<K, V>> {
42+
43+
private final DataflowRunner runner;
44+
45+
StreamingKeyedByPartitionOverrideFactory(DataflowRunner runner) {
46+
this.runner = runner;
47+
}
48+
49+
@Override
50+
public PTransformReplacement<PCollection<KafkaRecord<K, V>>, PCollection<KV<Integer, V>>>
51+
getReplacementTransform(
52+
AppliedPTransform<
53+
PCollection<KafkaRecord<K, V>>,
54+
PCollection<KV<Integer, V>>,
55+
KafkaIO.Read.KeyedByPartition<K, V>>
56+
transform) {
57+
return PTransformReplacement.of(
58+
PTransformReplacements.getSingletonMainInput(transform),
59+
new StreamingKeyedByPartition<>(
60+
runner,
61+
transform.getTransform(),
62+
PTransformReplacements.getSingletonMainOutput(transform)));
63+
}
64+
65+
@Override
66+
public Map<PCollection<?>, ReplacementOutput> mapOutputs(
67+
Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KV<Integer, V>> newOutput) {
68+
return ReplacementOutputs.singleton(outputs, newOutput);
69+
}
70+
}
71+
72+
static class StreamingKeyedByPartition<K, V>
73+
extends PTransform<PCollection<KafkaRecord<K, V>>, PCollection<KV<Integer, V>>> {
74+
75+
private final transient DataflowRunner runner;
76+
private final KafkaIO.Read.KeyedByPartition<K, V> originalTransform;
77+
private final transient PCollection<KV<Integer, V>> originalOutput;
78+
79+
public StreamingKeyedByPartition(
80+
DataflowRunner runner,
81+
KafkaIO.Read.KeyedByPartition<K, V> original,
82+
PCollection<KV<Integer, V>> output) {
83+
this.runner = runner;
84+
this.originalTransform = original;
85+
this.originalOutput = output;
86+
}
87+
88+
@Override
89+
public PCollection<KV<Integer, V>> expand(PCollection<KafkaRecord<K, V>> input) {
90+
// Record the output PCollection of the original transform since the new output will be
91+
// replaced by the original one when the replacement transform is wired to other nodes in the
92+
// graph, although the old and the new outputs are effectively the same.
93+
runner.maybeRecordPCollectionPreservedKeys(originalOutput);
94+
System.out.println("StreamingKeyedByPartition override");
95+
return input.apply(originalTransform);
96+
}
97+
}
98+
}

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.apache.beam.sdk.options.StreamingOptions;
6464
import org.apache.beam.sdk.options.ValueProvider;
6565
import org.apache.beam.sdk.runners.AppliedPTransform;
66+
import org.apache.beam.sdk.runners.PTransformMatcher;
6667
import org.apache.beam.sdk.runners.PTransformOverride;
6768
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
6869
import org.apache.beam.sdk.schemas.JavaFieldSchema;
@@ -1575,6 +1576,10 @@ public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
15751576
return new TypedWithoutMetadata<>(this);
15761577
}
15771578

1579+
public PTransform<PBegin, PCollection<KV<Integer, V>>> keyedByPartition() {
1580+
return new ValuesKeyedByPartition<>(this);
1581+
}
1582+
15781583
public PTransform<PBegin, PCollection<Row>> externalWithMetadata() {
15791584
return new RowsWithMetadata<>(this);
15801585
}
@@ -1818,6 +1823,28 @@ public Map<PCollection<?>, ReplacementOutput> mapOutputs(
18181823
}
18191824
}
18201825

1826+
@Internal
1827+
public static final PTransformMatcher KEYED_BY_PARTITION_MATCHER =
1828+
PTransformMatchers.classEqualTo(KeyedByPartition.class);
1829+
1830+
public static class KeyedByPartition<K, V>
1831+
extends PTransform<PCollection<KafkaRecord<K, V>>, PCollection<KV<Integer, V>>> {
1832+
1833+
@Override
1834+
public PCollection<KV<Integer, V>> expand(PCollection<KafkaRecord<K, V>> input) {
1835+
return input.apply(
1836+
"Repartition",
1837+
ParDo.of(
1838+
new DoFn<KafkaRecord<K, V>, KV<Integer, V>>() {
1839+
@ProcessElement
1840+
public void processElement(ProcessContext ctx) {
1841+
ctx.output(
1842+
KV.of(ctx.element().getPartition(), ctx.element().getKV().getValue()));
1843+
}
1844+
}));
1845+
}
1846+
}
1847+
18211848
private abstract static class AbstractReadFromKafka<K, V>
18221849
extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
18231850
Read<K, V> kafkaRead;
@@ -2170,6 +2197,41 @@ public void populateDisplayData(DisplayData.Builder builder) {
21702197
}
21712198
}
21722199

2200+
public static class ValuesKeyedByPartition<K, V>
2201+
extends PTransform<PBegin, PCollection<KV<Integer, V>>> {
2202+
private final Read<K, V> read;
2203+
2204+
ValuesKeyedByPartition(Read<K, V> read) {
2205+
super("KafkaIO.Read");
2206+
this.read = read;
2207+
}
2208+
2209+
static class Builder<K, V>
2210+
implements ExternalTransformBuilder<
2211+
Read.External.Configuration, PBegin, PCollection<KV<Integer, V>>> {
2212+
2213+
@Override
2214+
public PTransform<PBegin, PCollection<KV<Integer, V>>> buildExternal(
2215+
Read.External.Configuration config) {
2216+
Read.Builder<K, V> readBuilder = new AutoValue_KafkaIO_Read.Builder<>();
2217+
Read.Builder.setupExternalBuilder(readBuilder, config);
2218+
2219+
return readBuilder.build().keyedByPartition();
2220+
}
2221+
}
2222+
2223+
@Override
2224+
public PCollection<KV<Integer, V>> expand(PBegin begin) {
2225+
return begin.apply(read).apply("KeyedByPartition", new Read.KeyedByPartition<>());
2226+
}
2227+
2228+
@Override
2229+
public void populateDisplayData(DisplayData.Builder builder) {
2230+
super.populateDisplayData(builder);
2231+
read.populateDisplayData(builder);
2232+
}
2233+
}
2234+
21732235
/**
21742236
* A {@link PTransform} to read from Kafka topics. Similar to {@link KafkaIO.Read}, but removes
21752237
* Kafka metadata and returns a {@link PCollection} of {@link KV}. See {@link KafkaIO} for more

0 commit comments

Comments
 (0)