Skip to content

Commit 6c520b7

Browse files
committed
Add affinity key to interface.
1 parent 669f262 commit 6c520b7

7 files changed

Lines changed: 35 additions & 13 deletions

File tree

src/execution_plans/benchmarks/shuffle_bench.rs

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -131,8 +131,11 @@ impl ShuffleBench {
131131
num: 0,
132132
plan: None,
133133
tasks: (0..self.producer_tasks)
134-
.map(|i| ExecutionTask {
135-
url: Some(Url::parse(&format!("http://localhost:{i}")).unwrap()),
134+
.map(|i| {
135+
ExecutionTask::new(
136+
Some(Url::parse(&format!("http://localhost:{i}")).unwrap()),
137+
None,
138+
)
136139
})
137140
.collect(),
138141
};

src/execution_plans/distributed.rs

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -130,15 +130,15 @@ impl DistributedExec {
130130
.tasks
131131
.iter()
132132
.enumerate()
133-
.map(|(i, _)| {
133+
.map(|(i, task)| {
134134
let url = task_router.route(RouterInfo {
135135
task_number: i,
136136
urls: urls.clone(),
137137
stage_seed: start_idx,
138+
affinity_key: task.affinity_key.clone(),
138139
});
139-
let execution_task = ExecutionTask {
140-
url: Some(url.clone()),
141-
};
140+
let execution_task =
141+
ExecutionTask::new(Some(url.clone()), task.affinity_key.clone());
142142
let request = SetPlanRequest {
143143
plan_proto: bytes.clone(),
144144
task_count: stage.tasks.len() as _,

src/execution_plans/network_coalesce.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -120,7 +120,7 @@ impl NetworkCoalesceExec {
120120
query_id,
121121
num,
122122
plan: Some(input),
123-
tasks: vec![ExecutionTask { url: None }; input_task_count],
123+
tasks: vec![ExecutionTask::default(); input_task_count],
124124
},
125125
worker_connections: WorkerConnectionPool::new(input_task_count),
126126
metrics_collection: Default::default(),

src/execution_plans/network_shuffle.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -162,7 +162,7 @@ impl NetworkShuffleExec {
162162
query_id,
163163
num,
164164
plan: Some(transformed.data),
165-
tasks: vec![ExecutionTask { url: None }; input_task_count],
165+
tasks: vec![ExecutionTask::default(); input_task_count],
166166
},
167167
worker_connections: WorkerConnectionPool::new(input_task_count),
168168
properties: input.properties().clone(),

src/networking/task_router.rs

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -5,11 +5,17 @@ use url::Url;
55

66
pub struct RouterInfo {
77
pub task_number: usize,
8-
pub urls: Vec<Url>,
98
pub stage_seed: usize,
9+
pub urls: Vec<Url>,
10+
pub affinity_key: Option<Vec<u8>>,
1011
}
1112

12-
/// Allows users to route tasks to worker nodes.
13+
/// Allows users to consistently route tasks to URLs. Defining a custom routing function may be
14+
/// useful for users who want to ensure that, e.g., repeated file reads are directed to the same
15+
/// physical machines to allow for proper caching rather than repeatedly reading from object storage.
16+
///
17+
/// To implement this, include routing information as bytes in `affinity_key`. These bytes can
18+
/// then be hashed to consistently point to a specific URL.
1319
pub trait TaskRouter {
1420
/// Returns the url of the worker that should execute the task. The default implementation
1521
/// assigns tasks to workers round-robin starting from stage_seed.

src/protobuf/distributed_codec.rs

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -344,6 +344,9 @@ pub struct ExecutionTaskProto {
344344
/// unassigned.
345345
#[prost(string, optional, tag = "1")]
346346
pub url_str: Option<String>,
347+
/// An optional affinity key. Used in the task router for consistent routing.
348+
#[prost(bytes, optional, tag = "2")]
349+
pub affinity_key: Option<Vec<u8>>,
347350
}
348351

349352
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -499,6 +502,7 @@ fn encode_tasks(tasks: &[ExecutionTask]) -> Vec<ExecutionTaskProto> {
499502
.iter()
500503
.map(|task| ExecutionTaskProto {
501504
url_str: task.url.as_ref().map(|v| v.to_string()),
505+
affinity_key: task.affinity_key.clone(),
502506
})
503507
.collect()
504508
}
@@ -514,6 +518,7 @@ fn decode_tasks(tasks: Vec<ExecutionTaskProto>) -> Result<Vec<ExecutionTask>, Da
514518
Url::parse(&u).map_err(|_| internal_datafusion_err!("Invalid URL: {u}"))
515519
})
516520
.transpose()?,
521+
affinity_key: task.affinity_key,
517522
})
518523
})
519524
.collect()

src/stage.rs

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -81,11 +81,19 @@ pub struct Stage {
8181
pub tasks: Vec<ExecutionTask>,
8282
}
8383

84-
#[derive(Debug, Clone)]
84+
#[derive(Debug, Clone, Default)]
8585
pub struct ExecutionTask {
8686
/// The url of the worker that will execute this task. A None value is interpreted as
8787
/// unassigned.
8888
pub(crate) url: Option<Url>,
89+
/// An optional affinity key that can be used for consistent routing with the task router.
90+
pub(crate) affinity_key: Option<Vec<u8>>,
91+
}
92+
93+
impl ExecutionTask {
94+
pub(crate) fn new(url: Option<Url>, affinity_key: Option<Vec<u8>>) -> Self {
95+
Self { url, affinity_key }
96+
}
8997
}
9098

9199
#[derive(Debug, Clone, PartialEq)]
@@ -118,7 +126,7 @@ impl Stage {
118126
query_id,
119127
num,
120128
plan: Some(plan),
121-
tasks: vec![ExecutionTask { url: None }; n_tasks],
129+
tasks: vec![ExecutionTask::default(); n_tasks],
122130
}
123131
}
124132
}
@@ -439,7 +447,7 @@ pub fn display_plan_graphviz(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
439447
query_id: Default::default(),
440448
num: max_num + 1,
441449
plan: Some(plan.clone()),
442-
tasks: vec![ExecutionTask { url: None }],
450+
tasks: vec![ExecutionTask::default()],
443451
};
444452
all_stages.insert(0, &head_stage);
445453

0 commit comments

Comments
 (0)