Skip to content

Commit 0901331

Browse files
authored
Add a FailoverChannel wrapper on top of IsolationChannel to maintain a set of primary and failover channels. (#37840)
1 parent ceed48c commit 0901331

9 files changed

Lines changed: 1149 additions & 76 deletions

File tree

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,10 @@
8787
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcDispatcherClient;
8888
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer;
8989
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
90+
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.auth.VendoredCredentialsAdapter;
9091
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache;
9192
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory;
93+
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.FailoverChannel;
9294
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
9395
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory;
9496
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl;
@@ -113,6 +115,7 @@
113115
import org.apache.beam.sdk.metrics.MetricsEnvironment;
114116
import org.apache.beam.sdk.util.construction.CoderTranslation;
115117
import org.apache.beam.sdk.values.WindowedValues;
118+
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.auth.MoreCallCredentials;
116119
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
117120
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
118121
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats;
@@ -382,7 +385,8 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
382385
MemoryMonitor memoryMonitor,
383386
GrpcDispatcherClient dispatcherClient) {
384387
WeightedSemaphore<Commit> maxCommitByteSemaphore = Commits.maxCommitByteSemaphore();
385-
ChannelCache channelCache = createChannelCache(options, checkNotNull(configFetcher));
388+
ChannelCache channelCache =
389+
createChannelCache(options, checkNotNull(configFetcher), dispatcherClient);
386390
@SuppressWarnings("methodref.receiver.bound")
387391
FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
388392
FanOutStreamingEngineWorkerHarness.create(
@@ -785,20 +789,32 @@ private static void validateWorkerOptions(DataflowWorkerHarnessOptions options)
785789
}
786790

787791
private static ChannelCache createChannelCache(
788-
DataflowWorkerHarnessOptions workerOptions, ComputationConfig.Fetcher configFetcher) {
792+
DataflowWorkerHarnessOptions workerOptions,
793+
ComputationConfig.Fetcher configFetcher,
794+
GrpcDispatcherClient dispatcherClient) {
789795
ChannelCache channelCache =
790796
ChannelCache.create(
791797
(currentFlowControlSettings, serviceAddress) -> {
792-
// IsolationChannel will create and manage separate RPC channels to the same
793-
// serviceAddress.
798+
// IsolationChannel wrapping FailoverChannel so that each active RPC gets its own
799+
// FailoverChannel instance. The fallback channel is created lazily, at most once,
800+
// only if failover is actually needed.
794801
return IsolationChannel.create(
795802
() ->
796-
remoteChannel(
797-
serviceAddress,
798-
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
799-
currentFlowControlSettings),
803+
FailoverChannel.create(
804+
remoteChannel(
805+
serviceAddress,
806+
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
807+
currentFlowControlSettings),
808+
() ->
809+
remoteChannel(
810+
dispatcherClient.getDispatcherEndpoints().iterator().next(),
811+
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
812+
currentFlowControlSettings),
813+
MoreCallCredentials.from(
814+
new VendoredCredentialsAdapter(workerOptions.getGcpCredential()))),
800815
currentFlowControlSettings.getOnReadyThresholdBytes());
801816
});
817+
802818
configFetcher
803819
.getGlobalConfigHandle()
804820
.registerConfigObserver(

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -410,15 +410,18 @@ private GlobalDataStreamSender getOrCreateGlobalDataSteam(
410410
}
411411

412412
private WindmillStreamSender createAndStartWindmillStreamSender(Endpoint endpoint) {
413+
GetWorkRequest.Builder getWorkRequestBuilder =
414+
GetWorkRequest.newBuilder()
415+
.setClientId(jobHeader.getClientId())
416+
.setJobId(jobHeader.getJobId())
417+
.setProjectId(jobHeader.getProjectId())
418+
.setWorkerId(jobHeader.getWorkerId());
419+
endpoint.workerToken().ifPresent(getWorkRequestBuilder::setBackendWorkerToken);
420+
413421
WindmillStreamSender windmillStreamSender =
414422
WindmillStreamSender.create(
415423
WindmillConnection.from(endpoint, this::createWindmillStub),
416-
GetWorkRequest.newBuilder()
417-
.setClientId(jobHeader.getClientId())
418-
.setJobId(jobHeader.getJobId())
419-
.setProjectId(jobHeader.getProjectId())
420-
.setWorkerId(jobHeader.getWorkerId())
421-
.build(),
424+
getWorkRequestBuilder.build(),
422425
GetWorkBudget.noBudget(),
423426
streamFactory,
424427
workItemScheduler,

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
108108
: randomlySelectNextStub(windmillServiceStubs));
109109
}
110110

111-
ImmutableSet<HostAndPort> getDispatcherEndpoints() {
111+
public ImmutableSet<HostAndPort> getDispatcherEndpoints() {
112112
return dispatcherStubs.get().dispatcherEndpoints();
113113
}
114114

0 commit comments

Comments
 (0)