Skip to content

Commit 08833b8

Browse files
committed
Cleaner interface.
1 parent e7256b6 commit 08833b8

3 files changed

Lines changed: 19 additions & 13 deletions

File tree

src/execution_plans/distributed.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::common::require_one_child;
22
use crate::config_extension_ext::get_config_extension_propagation_headers;
33
use crate::distributed_planner::NetworkBoundaryExt;
4-
use crate::networking::{get_distributed_task_router, get_distributed_worker_resolver};
4+
use crate::networking::{RouterInfo, get_distributed_task_router, get_distributed_worker_resolver};
55
use crate::passthrough_headers::get_passthrough_headers;
66
use crate::protobuf::{DistributedCodec, tonic_status_to_datafusion_error};
77
use crate::stage::{ExecutionTask, Stage};
@@ -131,7 +131,11 @@ impl DistributedExec {
131131
.iter()
132132
.enumerate()
133133
.map(|(i, _)| {
134-
let url = task_router.route_task(start_idx, i, &urls);
134+
let url = task_router.route(RouterInfo {
135+
task_number: i,
136+
urls: urls.clone(),
137+
stage_seed: start_idx,
138+
});
135139
let execution_task = ExecutionTask {
136140
url: Some(url.clone()),
137141
};

src/networking/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ pub use channel_resolver::{
77
get_distributed_channel_resolver,
88
};
99
pub(crate) use channel_resolver::{ChannelResolverExtension, set_distributed_channel_resolver};
10-
pub use task_router::{TaskRouter, get_distributed_task_router};
10+
pub use task_router::{RouterInfo, TaskRouter, get_distributed_task_router};
1111
pub(crate) use task_router::{TaskRouterExtension, set_distributed_task_router};
1212

1313
pub use worker_resolver::{WorkerResolver, get_distributed_worker_resolver};

src/networking/task_router.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,19 @@ use datafusion::prelude::SessionConfig;
33
use std::sync::Arc;
44
use url::Url;
55

6-
/// Allows users to choose which worker should execute a given distributed task.
7-
pub trait TaskRouter {
8-
/// Returns the index of the worker that should execute the task. The default implementation
9-
/// assigns tasks to workers round-robin starting from start_index.
10-
fn route_fn(&self, start_index: usize, task_number: usize, num_urls: usize) -> usize {
11-
(start_index + task_number) % num_urls
12-
}
6+
pub struct RouterInfo {
7+
pub task_number: usize,
8+
pub urls: Vec<Url>,
9+
pub stage_seed: usize,
10+
}
1311

14-
/// Returns the url of the worker that should execute the task.
15-
fn route_task(&self, start_index: usize, task_number: usize, urls: &[Url]) -> Url {
16-
urls[self.route_fn(start_index, task_number, urls.len())].clone()
12+
/// Allows users to route tasks to worker nodes.
13+
pub trait TaskRouter {
14+
/// Returns the url of the worker that should execute the task. The default implementation
15+
/// assigns tasks to workers round-robin starting from stage_seed.
16+
fn route(&self, router_info: RouterInfo) -> Url {
17+
router_info.urls[router_info.stage_seed + router_info.task_number % router_info.urls.len()]
18+
.clone()
1719
}
1820
}
1921

0 commit comments

Comments
 (0)