【Flink】Flink源码阅读笔记(19)-FlinkSQL中流表Join的实现
1.概述
转载:
在使⽤ SQL 进⾏数据分析的过程中,关联查询是经常要使⽤到的操作。在传统的 OLTP 和 OLAP 领域中,关联查询的数据集都是有界的,因此可以依赖于缓存有界的数据集进⾏查询。但是在 Streaming SQL 中,针对 Stream Join Stream 的情况,由于关联查询的两侧都是连续⽆界的数据流,传统数据表中 Join 操作的实现和优化⽅式可能并不完全适⽤。在这篇⽂章中,我们将介绍双流 Join ⾯临的挑战,并对 Flink SQL 中双流 Join 的具体实现机制进⾏分析。
2.双流 Join 的挑战
在传统的数据库或批处理场景中 ,关联查询的数据集都是有限的,因此可以依赖于缓存有界的数据集,使⽤诸如 Nested-Loop Join,Sort-Merged Join 或者 Hash Join 等⽅法进⾏匹配查询。但是在 Streaming SQL 中,两个数据流的关联查询主要⾯临如下两个问题:⼀⽅⾯,数据流是⽆限的,缓存数据对 long-running 的任务⽽⾔会带来较⾼的存储和查询压⼒;另⼀⽅⾯,两侧数据流中消息到达的时间存在不⼀
致的情况,可能造成关联结果的缺失。
对于上述的第⼀个问题,为了保证关联结果的正确性,需要将数据流中所有历史数据缓存下来。随着
两个数据流中数据源源不断到来,缓存历史数据带来的开销越来越⼤,且每⼀条新到达的消息都会激发对另⼀侧历史数据的查询。为了解决该问题,⼀种⽅法是通过时间窗⼝将关联的数据范围限制在特定的时间范围内,即 Window Join(关于时间窗⼝可以参考之前的⽂章);另⼀种⽅法是,在存储开销和关联准确度⽅⾯做⼀下权衡,在缓存的历史数据上增加⽣存时间的限制,这样可以避免缓存的数据⽆限增长,但相应地可能会造成准确度的降低。
上述第⼆个问题,主要针对的是外连接的情况。由于两侧数据到达时间的不确定性,对于特定消息,可能出现 t1 时刻不存在匹配记录,⽽t2 (t2 > t1) 时刻存在匹配记录的情况。对于外连接,要求在不存在关联结果的情况下返回 NULL 值。因此为了保证关联结果的正确性,⼀种⽅式是通过时间窗⼝限制关联的数据范围,但这样就要求在窗⼝结束时才输出结果,会导致输出延迟;另⼀种⽅式是采取“撤销-更正”的⽅式,先输出 NULL 值,在后续关联记录到达时再撤销已输出的记录,修正为关联的正确结果,其缺点是会造成输出记录数的放⼤。
从上述的分析可以看出,时间窗⼝在关联查询中通过限制关联数据的范围,可以部分程度上解决 Streaming Join ⾯临的问题,其基本思路是将⽆限的数据流切分为有限的时间窗⼝。但时间窗⼝关联并不适合所有的情况,很多时候两个数据流的关联查询并不能限定在特定的时间窗⼝内;此外,时间
窗⼝关联还存在输出延迟的情况。
本⽂的后续部分将对 Flink SQL 中普通双流 Join 的实现机制加以介绍,Window Join 的实现机制将在后续的⽂章中进⾏分析。
3.双流 Join 的实现机制
⼀条 Join 语句的转换
⾸先,我们以⼀条简单的 Join 语句为例,跟踪⼀条 Join 语句的变换过程。
-- table A('a1, 'a2, 'a3)
-- table B('b1, 'b2, 'b3)
SELECT a1, b1 FROM A JOIN B ON a1 = b1 and a2 > b2
上述的 SQL 语句在经过解析后,被转换为如下的逻辑计划:
LogicalProject(a1=[$0], b1=[$3])
+- LogicalJoin(condition=[AND(=($0, $3),>($1, $4))], joinType=[inner])
:- LogicalTableScan(table=[[A, source:[TestTableSource(a1, a2, a3)]]])
+- LogicalTableScan(table=[[B, source:[TestTableSource(b1, b2, b3)]]])
这份逻辑计划⾸先被转换为 Flink SQL 内部的 RelNode,即:
FlinkLogicalCalc(select=[a1, b1])
+- FlinkLogicalJoin(condition=[AND(=($0, $2),>($1, $3))], joinType=[inner])
:- FlinkLogicalCalc(select=[a1, a2])
:+- FlinkLogicalTableSourceScan(table=[[A, source:[TestTableSource(a1, a2, a3)]]], fields=[a1, a2, a3])
+- FlinkLogicalCalc(select=[b1, b2])
+- FlinkLogicalTableSourceScan(table=[[B, source:[TestTableSource(b1, b2, b3)]]], fields=[b1, b2, b3])
此后,经过⼀系列优化规则被优化为最终的执⾏计划,如下:
Calc(select=[a1, b1])
+- Join(joinType=[InnerJoin], where=[AND(=(a1, b1),>(a2, b2))], select=[a1, a2, b1, b2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a1]])
:+- Calc(select=[a1, a2])
:+- TableSourceScan(table=[[A, source:[TestTableSource(a1, a2, a3)]]], fields=[a1, a2, a3])
+- Exchange(distribution=[hash[b1]])多表left join
+- Calc(select=[b1, b2])
+- TableSourceScan(table=[[B, source:[TestTableSource(b1, b2, b3)]]], fields=[b1, b2, b3])
⾄此,逻辑计划的优化阶段结束,进⼊物理计划⽣成的阶段。
Flink SQL 会为 StreamExecJoin 操作⽣成⼀个 TwoInputTransformation 变换,内部算⼦为 StreamingJoinOperator,⽤于在两个数据流中匹配关联记录;为 StreamExecExchange 操作⽣成⼀
个 PartitionTransformation 变换,⽤来确定上游算⼦输出的记录转发到下游算⼦的分区。
4.两个重要的变换规则
在逻辑计划优化的过程中,有两个重要的规则需要关注,分别是 StreamExecJoinRule 和 FlinkExpandConversionRule。
顾名思义,StreamExecJoinRule 主要⽤于将 FlinkLogicalJoin 转换为 StreamExecJoin。但是这个变换是有条件限制的,即 FlinkLogicalJoin 的关联条件中不包含时间窗⼝。⾸先来看⼀下这个规则的匹配条件:
class StreamExecJoinRule
extends RelOptRule(
operand(classOf[FlinkLogicalJoin],
operand(classOf[FlinkLogicalRel],any()),
operand(classOf[FlinkLogicalRel],any())),
"StreamExecJoinRule"){
override def matches(call: RelOptRuleCall): Boolean ={
val join: FlinkLogicalJoin = l(0)
//关联结果是否需要从右表投射数据,SEMI JOIN 和 ANTI JOIN 不需要选择右表的数据
if(!JoinType.projectsRight){
// SEMI/ANTI JOIN 总是被转换为 StreamExecJoin
// SEMI/ANTI JOIN 总是被转换为 StreamExecJoin
return true
}
val left: FlinkLogicalRel = l(1).asInstanceOf[FlinkLogicalRel]
val right: FlinkLogicalRel = l(2).asInstanceOf[FlinkLogicalRel]
val tableConfig = Context.unwrap(classOf[FlinkContext]).getTableConfig
val joinRowType = RowType
//左表不⽀持 Temporal Table
if(left.isInstanceOf[FlinkLogicalSnapshot]){
throw new TableException(
"Temporal table join only support apply FOR SYSTEM_TIME AS OF on the right table.")
}
//不⽀持 Temporal Table JOIN
if(right.isInstanceOf[FlinkLogicalSnapshot]||
return false
}
//从关联条件中提取 1)时间窗⼝边界 2)其它条件
val (windowBounds, remainingPreds)= actWindowBoundsFromPredicate(
joinRowType,
tableConfig)
//存在窗⼝,则不适⽤于该规则
if(windowBounds.isDefined){
return false
}
//普通关联条件不能访问时间属性
// remaining predicate must not access time attributes
val remainingPredsAccessTime = remainingPreds.isDefined &&
WindowJoinUtil., joinRowType)
//RowTime 属性不能出现在普通 join 的关联条件中
//@see stackoverflow/questions/57181771/flink-rowtime-attributes-must-not-be-in-the-input-rows-of-a-regular-join
val rowTimeAttrInOutput = FieldList
.exists(f => FlinkTypeFactory.Type))
if(rowTimeAttrInOutput){
throw new TableException(
"Rowtime attributes must not be in the input rows of a regular join. "+
"As a workaround you can cast the time attributes of input tables to TIMESTAMP before.")
}
// joins require an equality condition
// or a conjunctive predicate with at least one equality condition
// and disable outer joins with non-equality predicates(see FLINK-5520)
// And do not accept a FlinkLogicalTemporalTableSourceScan as right input
!remainingPredsAccessTime
}
}
其基本逻辑就是,在普通的双流 Join 中不⽀持 Temporal Table,不⽀持时间窗⼝,不⽀持访问时间属
性。这⾥需要注意的⼀点是,在普通的双流Join 中,Flink 没法保证关联结果按照时间先后顺序提交,会破坏时间属性的顺序,因此在普通的双流 Join 中关联条件不⽀持时间属性。
StreamExecJoinRule 会将 FlinkLogicalJoin 转换为 StreamexecJoin,但相应地,需要先对 FlinkLogicalJoin 的两个输⼊进⾏变换。在这⾥,会将 FlinkRelDistribution 这个 trait 下推到输⼊算⼦中。
FlinkRelDistribution ⽤于确定上游算⼦结果转发到下游算⼦的分区信息。例如,如果关联条件中存在等值关联条件,那么就会按照对应的关联键进⾏哈希分区,确保相同键的记录被转发到相同的 Task 中,即 FlinkRelDistribution.hash;⽽如果关联条件中不存在等值条件,那么所有的记录只能被转发到同⼀个 Task 中,即 FlinkRelDistribution.SINGLETON。
class StreamExecJoinRule{
override def onMatch(call: RelOptRuleCall): Unit ={
val join: FlinkLogicalJoin = l(0)
val left = Left
val right = Right
//根据是否存在等值关联条件确定 FlinkRelDistribution
def toHashTraitByColumns(
columns:Collection[_ <: Number],
inputTraitSets: RelTraitSet): RelTraitSet ={
val distribution =if(columns.isEmpty){
FlinkRelDistribution.SINGLETON
}else{
FlinkRelDistribution.hash(columns)
}
inputTraitSets
.replace(FlinkConventions.STREAM_PHYSICAL)
.
replace(distribution)
}
val joinInfo = join.analyzeCondition()
val (leftRequiredTrait, rightRequiredTrait)=(
toHashTraitByColumns(joinInfo.leftKeys, TraitSet),
toHashTraitByColumns(joinInfo.rightKeys, TraitSet))
val providedTraitSet = place(FlinkConventions.STREAM_PHYSICAL)
//变换输⼊
val newLeft: RelNode = vert(left, leftRequiredTrait)
val newRight: RelNode = vert(right, rightRequiredTrait)
//⽣成 StreamExecJoin
val newJoin =new StreamExecJoin(
providedTraitSet,
newLeft,
newRight,
}
}
对 FlinkRelDistribution 的匹配变换规则在 FlinkExpandConversionRule 中。FlinkExpandConversionR
ule 的作⽤是处理 RelDistribution 和RelCollation 这两种 trait,其中 RelDistribution 描述数据的物理分布情况,RelCollation 描述排序情况(通常在 Batch 模式下应⽤在ORDER BY 语句中)。
在 FlinkExpandConversionRule 中会为⽬标 trait 包含 FlinkRelDistribution 的变换⽣成⼀个 StreamExecExchange:
class FlinkExpandConversionRule(flinkConvention: Convention)
extends RelOptRule(
operand(classOf[AbstractConverter],
operand(classOf[RelNode], any)),
"FlinkExpandConversionRule"){
override def matches(call: RelOptRuleCall): Boolean ={
// from trait 和 to trait 不⼀致
val toTraitSet = l(0).asInstanceOf[AbstractConverter].getTraitSet
val fromTraitSet = l(1).asInstanceOf[RelNode].getTraitSet
!fromTraitSet.satisfies(toTraitSet)