Skip to content

Commit e7256b6

Browse files
committed
Cleaning.
1 parent a78cbbb commit e7256b6

3 files changed

Lines changed: 43 additions & 43 deletions

File tree

src/distributed_ext.rs

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -229,14 +229,6 @@ pub trait DistributedExt: Sized {
229229
resolver: T,
230230
);
231231

232-
/// Allows users to select the worker that should execute each task when a distributed query
233-
/// is about to run.
234-
fn with_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(self, router: T)
235-
-> Self;
236-
237-
/// Same as [DistributedExt::with_distributed_task_router] but with an in-place mutation.
238-
fn set_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(&mut self, router: T);
239-
240232
/// This is what tells Distributed DataFusion how to build a Worker gRPC client out of a worker URL.
241233
///
242234
/// There's a default implementation that caches the Worker client instances so that there's
@@ -333,6 +325,14 @@ pub trait DistributedExt: Sized {
333325
estimator: T,
334326
);
335327

328+
/// Allows users to select the worker that should execute each task when a distributed query
329+
/// is about to run.
330+
fn with_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(self, router: T)
331+
-> Self;
332+
333+
/// Same as [DistributedExt::with_distributed_task_router] but with an in-place mutation.
334+
fn set_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(&mut self, router: T);
335+
336336
/// Sets the maximum number of files each task in a stage with a FileScanConfig node will
337337
/// handle. Reducing this number will increase the number of tasks. By default, this
338338
/// is close to the number of cores in the machine.
@@ -572,10 +572,6 @@ impl DistributedExt for SessionConfig {
572572
set_distributed_worker_resolver(self, resolver);
573573
}
574574

575-
fn set_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(&mut self, router: T) {
576-
set_distributed_task_router(self, router);
577-
}
578-
579575
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(
580576
&mut self,
581577
resolver: T,
@@ -590,6 +586,10 @@ impl DistributedExt for SessionConfig {
590586
set_distributed_task_estimator(self, estimator)
591587
}
592588

589+
fn set_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(&mut self, router: T) {
590+
set_distributed_task_router(self, router);
591+
}
592+
593593
fn set_distributed_files_per_task(
594594
&mut self,
595595
files_per_task: usize,
@@ -689,10 +689,6 @@ impl DistributedExt for SessionConfig {
689689
#[expr($;self)]
690690
fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
691691

692-
#[call(set_distributed_task_router)]
693-
#[expr($;self)]
694-
fn with_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(mut self, router: T) -> Self;
695-
696692
#[call(set_distributed_channel_resolver)]
697693
#[expr($;self)]
698694
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
@@ -701,6 +697,10 @@ impl DistributedExt for SessionConfig {
701697
#[expr($;self)]
702698
fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(mut self, estimator: T) -> Self;
703699

700+
#[call(set_distributed_task_router)]
701+
#[expr($;self)]
702+
fn with_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(mut self, router: T) -> Self;
703+
704704
#[call(set_distributed_files_per_task)]
705705
#[expr($?;Ok(self))]
706706
fn with_distributed_files_per_task(mut self, files_per_task: usize) -> Result<Self, DataFusionError>;
@@ -768,11 +768,6 @@ impl DistributedExt for SessionStateBuilder {
768768
#[expr($;self)]
769769
fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
770770

771-
fn set_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(&mut self, router: T);
772-
#[call(set_distributed_task_router)]
773-
#[expr($;self)]
774-
fn with_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(mut self, router: T) -> Self;
775-
776771
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
777772
#[call(set_distributed_channel_resolver)]
778773
#[expr($;self)]
@@ -783,6 +778,11 @@ impl DistributedExt for SessionStateBuilder {
783778
#[expr($;self)]
784779
fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(mut self, estimator: T) -> Self;
785780

781+
fn set_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(&mut self, router: T);
782+
#[call(set_distributed_task_router)]
783+
#[expr($;self)]
784+
fn with_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(mut self, router: T) -> Self;
785+
786786
fn set_distributed_files_per_task(&mut self, files_per_task: usize) -> Result<(), DataFusionError>;
787787
#[call(set_distributed_files_per_task)]
788788
#[expr($?;Ok(self))]
@@ -859,11 +859,6 @@ impl DistributedExt for SessionState {
859859
#[expr($;self)]
860860
fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
861861

862-
fn set_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(&mut self, router: T);
863-
#[call(set_distributed_task_router)]
864-
#[expr($;self)]
865-
fn with_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(mut self, router: T) -> Self;
866-
867862
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
868863
#[call(set_distributed_channel_resolver)]
869864
#[expr($;self)]
@@ -874,6 +869,11 @@ impl DistributedExt for SessionState {
874869
#[expr($;self)]
875870
fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(mut self, estimator: T) -> Self;
876871

872+
fn set_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(&mut self, router: T);
873+
#[call(set_distributed_task_router)]
874+
#[expr($;self)]
875+
fn with_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(mut self, router: T) -> Self;
876+
877877
fn set_distributed_files_per_task(&mut self, files_per_task: usize) -> Result<(), DataFusionError>;
878878
#[call(set_distributed_files_per_task)]
879879
#[expr($?;Ok(self))]
@@ -950,11 +950,6 @@ impl DistributedExt for SessionContext {
950950
#[expr($;self)]
951951
fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(self, resolver: T) -> Self;
952952

953-
fn set_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(&mut self, router: T);
954-
#[call(set_distributed_task_router)]
955-
#[expr($;self)]
956-
fn with_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(self, router: T) -> Self;
957-
958953
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
959954
#[call(set_distributed_channel_resolver)]
960955
#[expr($;self)]
@@ -965,6 +960,11 @@ impl DistributedExt for SessionContext {
965960
#[expr($;self)]
966961
fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(self, estimator: T) -> Self;
967962

963+
fn set_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(&mut self, router: T);
964+
#[call(set_distributed_task_router)]
965+
#[expr($;self)]
966+
fn with_distributed_task_router<T: TaskRouter + Send + Sync + 'static>(self, router: T) -> Self;
967+
968968
fn set_distributed_files_per_task(&mut self, files_per_task: usize) -> Result<(), DataFusionError>;
969969
#[call(set_distributed_files_per_task)]
970970
#[expr($?;Ok(self))]

src/execution_plans/distributed.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,12 @@ impl DistributedExec {
7676
})
7777
}
7878

79-
/// Prepares the distributed plan for execution, which implies:
80-
/// 1. Perform some worker assignation, choosing randomly from the given URLs and assigning one
81-
/// URL per task.
82-
/// 2. Sending the sliced subplans to the assigned URLs. For each URL assigned to a task, a
83-
/// network call feeding the subplan is necessary.
84-
/// 3. In each network boundary, set the input plan to `None`. That way, network boundaries
85-
/// become nodes without children and traversing them will not go further down in.
79+
/// Prepares the distributed plan by applying these steps to each network boundary:
80+
/// 1. Perform worker assignment according to the task router.
81+
/// 2. Send the sliced subplans to the assigned URLs. For each URL assigned to a task,
82+
/// perform a network call feeding the subplan to the specified URL.
83+
/// 3. Set the node's input plan to `None`. That way, network boundaries appear as leaf
84+
/// nodes, halting further traversal during execution.
8685
fn prepare_plan(&self, ctx: &Arc<TaskContext>) -> Result<PreparedPlan> {
8786
let worker_resolver = get_distributed_worker_resolver(ctx.session_config())?;
8887
let task_router = get_distributed_task_router(ctx.session_config());
@@ -118,7 +117,7 @@ impl DistributedExec {
118117
return internal_err!("Plan is not set for stage {}", stage.num);
119118
};
120119

121-
// Right now, we assign random workers to tasks. This might change in the future.
120+
// The default task router assigns workers to tasks round-robin, starting at a random index.
122121
let start_idx = rand::rng().random_range(0..urls.len());
123122

124123
// This assumes the plan is the same for all the tasks within a stage. This is fine for
@@ -149,15 +148,15 @@ impl DistributedExec {
149148
let plan_send_latency = Arc::clone(&plan_send_latency);
150149
let ctx = Arc::clone(ctx);
151150
// Spawns the task that feeds this subplan to this worker. There will be as
152-
// many as this spawned tasks as workers.
151+
// many spawned tasks as workers.
153152
join_set.spawn(async move {
154153
send_plan_task(ctx, url, request).await?;
155154
plan_send_latency.record(&start);
156155
Ok(())
157156
});
158-
Ok(execution_task)
157+
execution_task
159158
})
160-
.collect::<Result<Vec<_>>>()?;
159+
.collect::<Vec<_>>();
161160

162161
Ok(Transformed::yes(plan.with_input_stage(Stage {
163162
query_id: stage.query_id,

src/networking/task_router.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use url::Url;
55

66
/// Allows users to choose which worker should execute a given distributed task.
77
pub trait TaskRouter {
8-
/// Returns the url of the worker that should execute the task.
8+
/// Returns the index of the worker that should execute the task. The default implementation
9+
/// assigns tasks to workers round-robin starting from `start_index`, wrapping at `num_urls`.
910
fn route_fn(&self, start_index: usize, task_number: usize, num_urls: usize) -> usize {
1011
(start_index + task_number) % num_urls
1112
}

0 commit comments

Comments
 (0)