Skip to content

Commit 01070e6

Browse files
Vincent-Wooklion26reswqaRocMarshal
authored
[FLINK-39275][docs] Fix some issues in the "Performance Tuning" document (#27893)
- Add anchor links to performance tuning sections - Update configuration and add MultiJoin optimization details Co-authored-by: klion26 <qcx978132955@gmail.com> Co-authored-by: Weijie Guo <reswqa@163.com> Co-authored-by: Yuepeng Pan <panyuepeng@apache.org>
1 parent 9b3037b commit 01070e6

1 file changed

Lines changed: 125 additions & 13 deletions

File tree

docs/content.zh/docs/dev/table/tuning.md

Lines changed: 125 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
---
2-
title: "Performance Tuning"
32
title: "性能调优"
43
weight: 11
54
type: docs
@@ -25,16 +24,19 @@ specific language governing permissions and limitations
2524
under the License.
2625
-->
2726

28-
# Performance Tuning
27+
<a name="performance-tuning"></a>
28+
29+
# 性能调优
2930

3031
SQL 是数据分析中使用最广泛的语言。Flink Table API 和 SQL 使用户能够以更少的时间和精力定义高效的流分析应用程序。此外,Flink Table API 和 SQL 是高效优化过的,它集成了许多查询优化和算子优化。但并不是所有的优化都是默认开启的,因此对于某些工作负载,可以通过打开某些选项来提高性能。
3132

3233
在这一页,我们将介绍一些实用的优化选项以及流式聚合和普通连接的内部原理,它们在某些情况下能带来很大的提升。
3334

3435
{{< hint info >}}
35-
目前 [分组聚合] ({{< ref "docs/sql/reference/queries/group-agg" >}}) 和 [窗口表值函数聚合]({{< ref "docs/sql/reference/queries/window-agg" >}}) (会话窗口表值函数聚合除外)都支持本页提到的流式聚合优化。
36+
目前 [分组聚合]({{< ref "docs/sql/reference/queries/group-agg" >}}) 和 [窗口表值函数聚合]({{< ref "docs/sql/reference/queries/window-agg" >}}) (会话窗口表值函数聚合除外)都支持本页提到的流式聚合优化。
3637
{{< /hint >}}
3738

39+
<a name="minibatch-aggregation"></a>
3840

3941
## MiniBatch 聚合
4042

@@ -49,8 +51,8 @@ MiniBatch 聚合的核心思想是将一组输入的数据缓存在聚合算子
4951
默认情况下,对于无界聚合算子来说,mini-batch 优化是被禁用的。开启这项优化,需要设置选项 `table.exec.mini-batch.enabled``table.exec.mini-batch.allow-latency``table.exec.mini-batch.size`。更多详细信息请参见[配置]({{< ref "docs/dev/table/config" >}}#execution-options)页面。
5052

5153
{{< hint info >}}
52-
MiniBatch optimization is always enabled for [Window TVF Aggregation]({{< ref "docs/sql/reference/queries/window-agg" >}}), regardless of the above configuration.
53-
Window TVF aggregation buffer records in [managed memory]({{< ref "docs/deployment/memory/mem_setup_tm">}}#managed-memory) instead of JVM Heap, so there is no risk of overloading GC or OOM issues.
54+
MiniBatch 优化对于 [Window TVF 聚合]({{< ref "docs/sql/reference/queries/window-agg" >}})始终启用,不受上述配置影响。
55+
Window TVF 聚合将记录缓冲在[托管内存]({{< ref "docs/deployment/memory/mem_setup_tm">}}#managed-memory)中,而非 JVM 堆内存,因此不存在 GC 过载或 OOM 的风险。
5456
{{< /hint >}}
5557

5658
下面的例子显示如何启用这些选项。
@@ -97,6 +99,8 @@ configuration.set("table.exec.mini-batch.size", "5000") # the maximum number of
9799
{{< /tab >}}
98100
{{< /tabs >}}
99101

102+
<a name="local-global-aggregation"></a>
103+
100104
## Local-Global 聚合
101105

102106
Local-Global 聚合是为解决数据倾斜问题提出的,通过将一组聚合分为两个阶段,首先在上游进行本地聚合,然后在下游进行全局聚合,类似于 MapReduce 中的 Combine + Reduce 模式。例如,就以下 SQL 而言:
@@ -124,12 +128,12 @@ GROUP BY color
124128
TableEnvironment tEnv = ...;
125129

126130
// access flink configuration
127-
Configuration configuration = tEnv.getConfig().getConfiguration();
131+
TableConfig configuration = tEnv.getConfig();
128132
// set low-level key-value options
129-
configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
130-
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
131-
configuration.setString("table.exec.mini-batch.size", "5000");
132-
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation
133+
configuration.set("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
134+
configuration.set("table.exec.mini-batch.allow-latency", "5 s");
135+
configuration.set("table.exec.mini-batch.size", "5000");
136+
configuration.set("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation
133137
```
134138
{{< /tab >}}
135139
{{< tab "Scala" >}}
@@ -162,6 +166,8 @@ configuration.set("table.optimizer.agg-phase-strategy", "TWO_PHASE") # enable tw
162166
{{< /tab >}}
163167
{{< /tabs >}}
164168

169+
<a name="split-distinct-aggregation"></a>
170+
165171
## 拆分 distinct 聚合
166172

167173
Local-Global 优化可有效消除常规聚合的数据倾斜,例如 SUM、COUNT、MAX、MIN、AVG。但是在处理 distinct 聚合时,其性能并不令人满意。
@@ -198,7 +204,9 @@ GROUP BY day
198204

199205
注意:上面是可以从这个优化中受益的最简单的示例。除此之外,Flink 还支持拆分更复杂的聚合查询,例如,多个具有不同 distinct key (例如 `COUNT(DISTINCT a), SUM(DISTINCT b)` )的 distinct 聚合,可以与其他非 distinct 聚合(例如 `SUM``MAX``MIN``COUNT` )一起使用。
200206

201-
<span class="label label-danger">注意</span> 当前,拆分优化不支持包含用户定义的 AggregateFunction 聚合。
207+
{{< hint info >}}
208+
当前,拆分优化不支持包含用户定义的 AggregateFunction 聚合。
209+
{{< /hint >}}
202210

203211
下面的例子显示了如何启用拆分 distinct 聚合优化。
204212

@@ -231,6 +239,8 @@ t_env.get_config().set("table.optimizer.distinct-agg.split.enabled", "true") # e
231239
{{< /tab >}}
232240
{{< /tabs >}}
233241

242+
<a name="use-filter-modifier-on-distinct-aggregates"></a>
243+
234244
## 在 distinct 聚合上使用 FILTER 修饰符
235245

236246
在某些情况下,用户可能需要从不同维度计算 UV(独立访客)的数量,例如来自 Android 的 UV、iPhone 的 UV、Web 的 UV 和总 UV。很多人会选择 `CASE WHEN`,例如:
@@ -259,6 +269,8 @@ GROUP BY day
259269

260270
Flink SQL 优化器可以识别相同的 distinct key 上的不同过滤器参数。例如,在上面的示例中,三个 COUNT DISTINCT 都在 `user_id` 一列上。Flink 可以只使用一个共享状态实例,而不是三个状态实例,以减少状态访问和状态大小。在某些工作负载下,可以获得显著的性能提升。
261271

272+
<a name="minibatch-regular-joins"></a>
273+
262274
## MiniBatch Regular Joins
263275

264276
默认情况下,regular join 算子是逐条处理输入的记录,即:(1)根据当前输入记录的 join key 关联对方状态中的记录,(2)根据当前记录写入或者撤回状态中的记录,(3)根据当前的输入记录和关联到的记录输出结果。
@@ -288,7 +300,101 @@ ON a.id = b.id
288300

289301
默认情况下,对于 regular join 算子来说,mini-batch 优化是被禁用的。开启这项优化,需要设置选项 `table.exec.mini-batch.enabled``table.exec.mini-batch.allow-latency``table.exec.mini-batch.size`。更多详细信息请参见[配置]({{< ref "docs/dev/table/config" >}}#execution-options)页面。
290302

291-
{{< top >}}
303+
<a name="multiple-regular-joins"></a>
304+
305+
## Multiple Regular Joins
306+
307+
{{< label Streaming >}}
308+
309+
在流处理场景中,包含多个非时态 Regular Join 的 Flink 作业,往往因状态过大而频繁出现运行不稳定和性能下降的问题。究其根本,是由于多个 Join 串联后产生的中间状态,往往远超原始输入数据本身的规模。为此,Flink 2.1 引入了全新的 Multi Join 算子,这一优化专为存在记录膨胀和大量中间状态的 Join 流水线而设计,旨在大幅压缩状态规模、提升整体性能。该算子通过同时处理多个输入流来完成跨表 Join,从根本上消除了在多表 Join 链路中存储中间状态的必要性。这种“零中间状态”的处理方式以降低状态规模为核心目标,在某些场景下可显著减少资源消耗、提升运行稳定性。当然,这一方案本质上是一种以算力换存储的权衡取舍,中间状态不再持久化保存,而是在需要时按需重新计算。
310+
311+
在大多数 Join 操作中,相当一部分处理时间都花费在从状态中读取记录上。MultiJoin 算子的效率在很大程度上取决于中间状态的大小以及公共 Join Key 的选择性。在一种常见场景中——即流水线出现记录膨胀(每次 Join 产生的数据量和记录数均多于上一次)时,MultiJoin 算子的优势更为突出。这是因为它使算子所需操作的状态始终保持在较小的规模,从而让算子运行更加稳定。即便某条 Join 链路实际产生的状态比原始记录还要小,MultiJoin 算子整体上仍然占用更少的状态。不过在这种特殊情况下,二元 Join 反而可能表现更好,因为最终几个 Join 所需操作的状态本身已经很小。
312+
313+
<a name="the-multijoin-operator"></a>
314+
315+
### MultiJoin 算子
316+
317+
MultiJoin 算子的主要优势:
318+
319+
1) 状态规模大幅缩减:得益于零中间状态机制,状态占用显著降低。
320+
2) 链式 Join 性能提升:在存在记录膨胀的场景下,整体处理性能得到明显改善。
321+
3) 稳定性增强:状态随处理记录数呈线性增长,而非像二元 Join 那样呈多项式级增长。
322+
323+
此外,使用 MultiJoin 替代二元 Join 的流水线通常具有更快地初始化和故障恢复速度,这得益于更小的状态规模以及更少的算子节点。
324+
325+
<a name="when-to-enable-the-multijoin"></a>
326+
327+
### 何时启用 MultiJoin?
328+
329+
如果作业中存在多个 Join 共享至少一个公共 Join Key 的 Join 操作,并且中间 Join 产生的中间状态比原始输入数据源还要大,建议考虑开启 MultiJoin 算子。
330+
331+
推荐使用场景:
332+
- 公共 Join Key 具有较高选择性(即每个键对应的记录数较少)
333+
- 包含多个链式 Join 且中间状态很大的 SQL 语句
334+
- 公共 Join Key 没有明显的数据倾斜
335+
- Join 操作产生了大量状态(状态规模达 50 GB 及以上)
336+
337+
如果公共 Join Key 的选择性较低(即大量记录共享相同键值),MultiJoin 算子所需的中间状态重新计算将对性能产生严重影响。在此类场景下,建议使用二元 Join,因为二元 Join 会利用所有 Join Key 对数据进行分区,从而避免重算影响性能。
338+
339+
<a name="how-to-enable-the-multijoin"></a>
340+
341+
### 如何启用 MultiJoin?
342+
343+
要对所有符合条件的 Join 全局启用此优化,请设置以下配置:
344+
345+
```sql
346+
SET 'table.optimizer.multi-join.enabled' = 'true';
347+
```
348+
349+
或者,你可以使用 `MULTI_JOIN` hint 为特定表启用 MultiJoin 算子:
350+
351+
```sql
352+
SELECT /*+ MULTI_JOIN(t1, t2, t3) */ * FROM t1
353+
JOIN t2 ON t1.id = t2.id
354+
JOIN t3 ON t1.id = t3.id;
355+
```
356+
357+
Hint 方式允许你有针对性地将 MultiJoin 优化应用于特定查询块,而无需全局启用。有关 `MULTI_JOIN` hint 的更多详情,请参阅 [Join Hints]({{< ref "docs/sql/reference/queries/hints" >}}#multi_join)。需要注意的是,配置项的优先级高于 hint。
358+
359+
重要提示:该功能目前处于实验性阶段,后续可能会进行优化或引入破坏性变更。当前仅支持流式 INNER/LEFT Join。由于记录分区的机制,Join 条件之间至少需要一个公共 Key,详见下方示例:
360+
361+
- 支持:A JOIN B ON A.key = B.key JOIN C ON A.key = C.key(按 key 分区)
362+
- 支持:A JOIN B ON A.key = B.key JOIN C ON B.key = C.key(通过传递性 key 分区)
363+
- 不支持:A JOIN B ON A.key1 = B.key1 JOIN C ON B.key2 = C.key2(没有单个 key 可以将 A、B、C 同时分区到单个算子中。这将被拆分为多个 MultiJoin 算子)
364+
365+
<a name="multijoin-operator-example---benchmark"></a>
366+
367+
### MultiJoin 算子示例 - 基准测试
368+
369+
以下是默认二元 Join 与 MultiJoin 算子之间的 10-way 基准测试对比。可以在第一部分观察到中间状态的数量,第二部分观察算子达到 100% 忙碌时处理的记录数,第三部分是 checkpoint。
370+
371+
{{< img src="/fig/table-streaming/multijoin_operator.png" height="100%" >}}
372+
373+
对于上面这个涉及记录放大的 10-way join,可以看到有显著提升。这里有一些大概数据:
374+
375+
- 性能:当两者都处于 100% 忙碌时,处理记录量提升 2 倍到超过 100 倍。
376+
- 状态大小:中间状态大小缩小 3 倍到超过 1000 倍。
377+
378+
MultiJoin 算子的总状态始终更小。在这种情况下,初始性能相同,但随着中间状态增长,二元 Join 的性能逐渐下降,而 Multi Join 保持稳定并表现更出色。
379+
380+
这个 10-way Join 通用基准测试使用以下配置运行:每个 tenant_id 1 条记录(高选择性),10 个 upsert Kafka topic,并行度 10,每个 topic 每秒 1 条记录。我们使用了基于 RocksDB 的未对齐 checkpoint 和增量 checkpoint。每个作业运行在 8GB 进程内存的 TaskManager 中,1GB 堆外内存和 20% 网络内存。JobManager 有 4GB 进程内存。主机包含 M1 处理器芯片,32GB RAM 和 1TB SSD。sink 使用 blackhole connector,因此我们只对 Join 进行基准测试。用于生成基准测试数据的 SQL 结构如下:
381+
382+
```sql
383+
INSERT INTO JoinResultsMJ
384+
SELECT *all fields*
385+
FROM TenantKafka t
386+
LEFT JOIN SuppliersKafka s ON t.tenant_id = s.tenant_id AND ...
387+
LEFT JOIN ProductsKafka p ON t.tenant_id = p.tenant_id AND ...
388+
LEFT JOIN CategoriesKafka c ON t.tenant_id = c.tenant_id AND ...
389+
LEFT JOIN OrdersKafka o ON t.tenant_id = o.tenant_id AND ...
390+
LEFT JOIN CustomersKafka cust ON t.tenant_id = cust.tenant_id AND ...
391+
LEFT JOIN WarehousesKafka w ON t.tenant_id = w.tenant_id AND ...
392+
LEFT JOIN ShippingKafka sh ON t.tenant_id = sh.tenant_id AND ...
393+
LEFT JOIN PaymentKafka pay ON t.tenant_id = pay.tenant_id AND ...
394+
LEFT JOIN InventoryKafka i ON t.tenant_id = i.tenant_id AND ...;
395+
```
396+
397+
<a name="delta-joins"></a>
292398

293399
## Delta Joins
294400

@@ -301,6 +407,8 @@ ON a.id = b.id
301407
1. 作业拓扑结构满足优化条件。具体可以查看[支持的功能和限制]({{< ref "docs/dev/table/tuning" >}}#supported-features-and-limitations)。
302408
2. 源表所在的外部存储系统提供了可供 delta join 快速查询的索引信息。目前 [Apache Fluss(Incubating)](https://fluss.apache.org/blog/fluss-open-source/) 已支持在 Flink 中提供表级别的索引信息,其上的表可作为 delta join 的源表。具体可参考 [Fluss 文档](https://fluss.apache.org/docs/engine-flink/delta-joins/#flink-version-support)
303409

410+
<a name="working-principle"></a>
411+
304412
### 工作原理
305413

306414
在 Flink 中,regular join 将来自两个输入端的所有输入数据存储在状态中,以确保当对侧的数据到达时,能够正确地匹配对应的记录。
@@ -309,6 +417,8 @@ ON a.id = b.id
309417

310418
{{< img src="/fig/table-streaming/delta_join.png" width="70%" height="70%" >}}
311419

420+
<a name="important-configurations"></a>
421+
312422
### 关键参数
313423

314424
Delta join 优化默认启用。您可以通过设置以下配置手动禁用此功能:
@@ -327,7 +437,7 @@ SET 'table.optimizer.delta-join.strategy' = 'NONE';
327437

328438
详细信息请参见[配置]({{< ref "docs/dev/table/config" >}}#execution-options)页面。
329439

330-
<a name="supported-features-and-limitations" />
440+
<a name="supported-features-and-limitations"></a>
331441

332442
### 支持的功能和限制
333443

@@ -346,3 +456,5 @@ SET 'table.optimizer.delta-join.strategy' = 'NONE';
346456
4. 当消费 **CDC 流**时,**join key** 必须是**主键**的一部分。
347457
5. 当消费 **CDC 流**时,所有 **filter** 必须应用于 **upsert key** 上。
348458
6. 所有 project 和 filter 都不能包含**非确定性函数**
459+
460+
{{< top >}}

0 commit comments

Comments
 (0)