Skip to content

Commit 9fe425f

Browse files
authored
Merge pull request #38083 from Abacn/fix-flink-race
Ensure LazyFlinkSourceSplitEnumerator handles split requests only after splits are initialized
2 parents 28a0cb1 + 5fc49f9 commit 9fe425f

File tree

2 files changed

+17
-2
lines changed

2 files changed

+17
-2
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
{
2-
"revision": 1
2+
"revision": 2
33
}

runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/LazyFlinkSourceSplitEnumerator.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.HashMap;
2323
import java.util.List;
2424
import java.util.Map;
25+
import java.util.concurrent.CountDownLatch;
2526
import javax.annotation.Nullable;
2627
import org.apache.beam.runners.flink.FlinkPipelineOptions;
2728
import org.apache.beam.sdk.io.BoundedSource;
@@ -53,7 +54,8 @@ public class LazyFlinkSourceSplitEnumerator<T>
5354
private final PipelineOptions pipelineOptions;
5455
private final int numSplits;
5556
private final List<FlinkSourceSplit<T>> pendingSplits;
56-
private boolean splitsInitialized;
57+
private volatile boolean splitsInitialized;
58+
private final CountDownLatch initializationLatch = new CountDownLatch(1);
5759

5860
public LazyFlinkSourceSplitEnumerator(
5961
SplitEnumeratorContext<FlinkSourceSplit<T>> context,
@@ -90,13 +92,16 @@ public void initializeSplits() {
9092
return pendingSplits;
9193
} catch (Exception e) {
9294
throw new RuntimeException(e);
95+
} finally {
96+
initializationLatch.countDown();
9397
}
9498
},
9599
(sourceSplits, error) -> {
96100
if (error != null) {
97101
pendingSplits.addAll(sourceSplits);
98102
throw new RuntimeException("Failed to start source enumerator.", error);
99103
}
104+
splitsInitialized = true;
100105
});
101106
}
102107

@@ -113,6 +118,16 @@ public void handleSplitRequest(int subtask, @Nullable String hostname) {
113118
LOG.info("Subtask {} {} is requesting a file source split", subtask, hostInfo);
114119
}
115120

121+
if (!splitsInitialized) {
122+
try {
123+
initializationLatch.await();
124+
} catch (InterruptedException e) {
125+
Thread.currentThread().interrupt();
126+
LOG.warn("Interrupted while waiting for splits initialization", e);
127+
return;
128+
}
129+
}
130+
116131
if (!pendingSplits.isEmpty()) {
117132
final FlinkSourceSplit<T> split = pendingSplits.remove(pendingSplits.size() - 1);
118133
context.assignSplit(split, subtask);

0 commit comments

Comments
 (0)