A quick walkthrough of Calcite with Flink: how SQL goes from text to an execution plan
Table of contents
- What you can learn
- Test code
- Background knowledge
- SQL transformation flow diagram
- Questions
What you can learn
- How SQL is transformed, step by step, into an execution plan
- Which optimizers exist and which optimization rules they apply
- How Calcite and Flink are integrated
Test code
```java
// Batch-mode TableEnvironment
EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);

Schema schema = Schema.newBuilder()
        .column("count", DataTypes.INT())
        .column("word", DataTypes.STRING())
        .build();
Schema schema1 = Schema.newBuilder()
        .column("id", DataTypes.INT())
        .column("name", DataTypes.STRING())
        .build();

// Two CSV-backed tables registered through the filesystem connector
tableEnvironment.createTemporaryTable(
        "aa_user",
        TableDescriptor.forConnector("filesystem")
                .schema(schema)
                .option("path", "/Users/xx/IdeaProjects/flink-demo/data/order.csv")
                .format("csv")
                .build());
tableEnvironment.createTemporaryTable(
        "bb_order",
        TableDescriptor.forConnector("filesystem")
                .schema(schema1)
                .option("path", "/Users/xx/IdeaProjects/flink-demo/data/user.csv")
                .format("csv")
                .build());

// Explain the join, including estimated costs
String cost = tableEnvironment.explainSql(
        "select * from aa_user inner join bb_order on `aa_user`.`count`=`bb_order`.`id`",
        ExplainDetail.ESTIMATED_COST);
System.out.println(cost);
```
Background knowledge
You should know the basics of Calcite: the AST, RelNode, HepPlanner, and so on.
You should also have some knowledge of Flink and Flink SQL.
SQL transformation flow diagram
The SQL passes through every optimizer program registered in Flink; after those optimizations it has become a physical plan. To become executable code, however, it still has to go through code generation.
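Before any of those optimizers run, the SQL text is first parsed into an AST (a tree of SqlNode). A minimal standalone sketch of that first step, using Calcite's stock parser directly (Flink actually uses its own extended parser; the class name and the simplified query below are only for illustration, chosen so the default parser settings work):

```java
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;

public class ParseSketch {
    public static void main(String[] args) throws Exception {
        // Text -> AST: Calcite turns the SQL string into a SqlNode tree.
        SqlParser parser = SqlParser.create("select id, name from bb_order where id > 10");
        SqlNode ast = parser.parseQuery();
        // Printing the SqlNode re-generates SQL text from the AST.
        System.out.println(ast);
    }
}
```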
Questions
- Question 1: FlinkBatchProgram
All of Flink's batch optimizer programs are registered in this class:
```scala
object FlinkBatchProgram {
  val SUBQUERY_REWRITE = "subquery_rewrite"
  val TEMPORAL_JOIN_REWRITE = "temporal_join_rewrite"
  val DECORRELATE = "decorrelate"
  val DEFAULT_REWRITE = "default_rewrite"
  val PREDICATE_PUSHDOWN = "predicate_pushdown"
  val JOIN_REORDER = "join_reorder"
  val JOIN_REWRITE = "join_rewrite"
  val PROJECT_REWRITE = "project_rewrite"
  val WINDOW = "window"
  val LOGICAL = "logical"
  val LOGICAL_REWRITE = "logical_rewrite"
  val TIME_INDICATOR = "time_indicator"
  val PHYSICAL = "physical"
  val PHYSICAL_REWRITE = "physical_rewrite"
  val DYNAMIC_PARTITION_PRUNING = "dynamic_partition_pruning"
  val RUNTIME_FILTER = "runtime_filter"
}
```
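The values above are just the phase names; FlinkBatchProgram chains the corresponding optimization programs together and they are applied in that order. A simplified, hypothetical sketch of that chaining pattern (the class below is illustrative only, not Flink's actual FlinkChainedProgram API):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Illustrative only: named optimization phases are appended in order
// and then applied one after another to the plan.
class ChainedProgramSketch<P> {
    private final Map<String, UnaryOperator<P>> phases = new LinkedHashMap<>();

    void addLast(String name, UnaryOperator<P> phase) {
        phases.put(name, phase);
    }

    P optimize(P plan) {
        P current = plan;
        for (UnaryOperator<P> phase : phases.values()) {
            current = phase.apply(current); // each phase rewrites the whole plan
        }
        return current;
    }
}
```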
- Question 2: how are Calcite's planners and Flink combined?
The logical and physical phases both use the VolcanoPlanner, which combines optimization rules with cost estimates.
All of the remaining phases use the HepPlanner, which is driven purely by rules.
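As a minimal illustration of the rule-only style, this is roughly how a HepPlanner gets wired up with Calcite's stock rules; `logicalPlan` is assumed to be a RelNode obtained elsewhere, and the two rules are arbitrary examples:

```java
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.CoreRules;

public class HepPlannerSketch {
    static RelNode rewrite(RelNode logicalPlan) {
        HepProgram program = new HepProgramBuilder()
                .addRuleInstance(CoreRules.FILTER_INTO_JOIN) // push filters below joins
                .addRuleInstance(CoreRules.PROJECT_MERGE)    // merge adjacent projections
                .build();
        HepPlanner hepPlanner = new HepPlanner(program);
        hepPlanner.setRoot(logicalPlan);
        // Applies the rules until nothing changes any more; no cost model is involved.
        return hepPlanner.findBestExp();
    }
}
```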
- Question 3: after project_rewrite, why is there one LogicalProject RelNode fewer?
Because the top-most LogicalProject of this query simply selects every field of its input (the query is `select *`), that step adds nothing and can be dropped.
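A minimal sketch of the check behind that removal, borrowing the idea of Calcite's ProjectRemoveRule: a projection that forwards every input field unchanged and in its original order is trivial and can be dropped. The helper class below is illustrative, not Flink's or Calcite's actual code:

```java
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import java.util.List;

class TrivialProjectCheck {
    // True when the project outputs exactly the input fields, in order.
    static boolean isTrivial(Project project) {
        List<RexNode> exprs = project.getProjects();
        if (exprs.size() != project.getInput().getRowType().getFieldCount()) {
            return false;
        }
        for (int i = 0; i < exprs.size(); i++) {
            RexNode e = exprs.get(i);
            if (!(e instanceof RexInputRef) || ((RexInputRef) e).getIndex() != i) {
                return false;
            }
        }
        return true;
    }
}
```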
- Question 4: how is execution code generated from the physical plan?
The BatchPhysicalTableSourceScan class:
```scala
class BatchPhysicalTableSourceScan(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    hints: util.List[RelHint],
    tableSourceTable: TableSourceTable)
  extends CommonPhysicalTableSourceScan(cluster, traitSet, hints, tableSourceTable)
  with BatchPhysicalRel {

  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
    val rowCnt = mq.getRowCount(this)
    if (rowCnt == null) {
      return null
    }
    val cpu = 0
    val rowSize = mq.getAverageRowSize(this)
    val size = rowCnt * rowSize
    planner.getCostFactory.makeCost(rowCnt, cpu, size)
  }

  // this is where the hand-off towards the generated execution code happens
  override def translateToExecNode(): ExecNode[_] = {
    val tableSourceSpec = new DynamicTableSourceSpec(
      tableSourceTable.contextResolvedTable,
      util.Arrays.asList(tableSourceTable.abilitySpecs: _*))
    tableSourceSpec.setTableSource(tableSourceTable.tableSource)

    new BatchExecTableSourceScan(
      unwrapTableConfig(this),
      tableSourceSpec,
      FlinkTypeFactory.toLogicalRowType(getRowType),
      getRelDetailedDescription)
  }
}
```
Note that translateToExecNode only builds the exec node (here BatchExecTableSourceScan); the actual code generation happens afterwards, when the exec nodes are translated into the runtime Transformations that make up the job.
- Question 5: why is the aa_user table broadcast, and where is that implemented?
It is implemented by the BatchPhysicalHashJoinRule rule.
The core code:
```scala
val leftSize = JoinUtil.binaryRowRelNodeSize(join.getLeft)
val rightSize = JoinUtil.binaryRowRelNodeSize(join.getRight)
// if it is not with hint, just check size of left and right side by statistic and config
// if leftSize or rightSize is unknown, cannot use broadcast
if (leftSize == null || rightSize == null) {
  return (false, false)
}

val threshold =
  tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD)

val rightSizeSmallerThanThreshold = rightSize <= threshold
val leftSizeSmallerThanThreshold = leftSize <= threshold
val leftSmallerThanRight = leftSize < rightSize

join.getJoinType match {
  case JoinRelType.LEFT => (rightSizeSmallerThanThreshold, false)
  case JoinRelType.RIGHT => (leftSizeSmallerThanThreshold, true)
  case JoinRelType.FULL => (false, false)
  case JoinRelType.INNER =>
    (leftSizeSmallerThanThreshold || rightSizeSmallerThanThreshold, leftSmallerThanRight)
  // left side cannot be used as build side in SEMI/ANTI join.
  case JoinRelType.SEMI | JoinRelType.ANTI =>
    (rightSizeSmallerThanThreshold, false)
}
```
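Read as pseudocode, the pair computed above is roughly (can this join be broadcast, is the left side the one to build/broadcast). A standalone re-expression of that decision in plain Java, assuming the two estimated sizes and the threshold are already known (the enum and class names are illustrative only):

```java
enum JoinType { LEFT, RIGHT, FULL, INNER, SEMI, ANTI }

class BroadcastDecisionSketch {
    // Returns {canBroadcast, leftIsBuildSide}.
    static boolean[] decide(JoinType joinType, Double leftSize, Double rightSize, double threshold) {
        // Unknown statistics on either side: broadcasting is never chosen.
        if (leftSize == null || rightSize == null) {
            return new boolean[] {false, false};
        }
        boolean leftSmall = leftSize <= threshold;
        boolean rightSmall = rightSize <= threshold;
        boolean leftSmallerThanRight = leftSize < rightSize;
        switch (joinType) {
            case LEFT:  return new boolean[] {rightSmall, false};
            case RIGHT: return new boolean[] {leftSmall, true};
            case FULL:  return new boolean[] {false, false};
            case INNER: return new boolean[] {leftSmall || rightSmall, leftSmallerThanRight};
            // The left side cannot be the build side in SEMI/ANTI joins.
            case SEMI:
            case ANTI:  return new boolean[] {rightSmall, false};
            default:    return new boolean[] {false, false};
        }
    }
}
```

With the two tiny CSV files from the test code, both sides fall below the threshold, the INNER branch applies, and the smaller side is picked as the build side, which is why aa_user ends up being broadcast.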
The size estimate itself mainly comes down to this helper:
```scala
def binaryRowRelNodeSize(relNode: RelNode): JDouble = {
  val mq = relNode.getCluster.getMetadataQuery
  val rowCount = mq.getRowCount(relNode)
  if (rowCount == null) {
    null
  } else {
    rowCount * FlinkRelMdUtil.binaryRowAverageSize(relNode)
  }
}
```
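The threshold that this size is compared against can be tuned on the TableConfig, which is an easy way to watch the planner flip between the broadcast and non-broadcast variants. A small usage sketch, continuing from the test code above and assuming a recent Flink version where TableConfig#set(ConfigOption, value) is available; the 4 MB value is arbitrary:

```java
import org.apache.flink.table.api.config.OptimizerConfigOptions;

// Sides whose estimated binary-row size stays below this value become
// broadcast candidates in BatchPhysicalHashJoinRule.
tableEnvironment.getConfig().set(
        OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, 4L * 1024 * 1024);
```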
In the end this bottoms out in the FlinkRelMdColumnNullCount class,
which reads the statistic from its ts: TableScan argument.
So where does that ts object get its statistic assigned? Look at the FlinkRecomputeStatisticsProgram class.
```scala
class FlinkRelMdColumnNullCount private extends MetadataHandler[ColumnNullCount] {

  override def getDef: MetadataDef[ColumnNullCount] = FlinkMetadata.ColumnNullCount.DEF

  /**
   * Gets the null count of the given column in TableScan.
   *
   * @param ts
   *   TableScan RelNode
   * @param mq
   *   RelMetadataQuery instance
   * @param index
   *   the index of the given column
   * @return
   *   the null count of the given column in TableScan
   */
  def getColumnNullCount(ts: TableScan, mq: RelMetadataQuery, index: Int): JDouble = {
    Preconditions.checkArgument(mq.isInstanceOf[FlinkRelMetadataQuery])
    val relOptTable = ts.getTable.asInstanceOf[FlinkPreparingTableBase]
    val fieldNames = relOptTable.getRowType.getFieldNames
    Preconditions.checkArgument(index >= 0 && index < fieldNames.size())
    val fieldName = fieldNames.get(index)
    val statistic = relOptTable.getStatistic
    val colStats = statistic.getColumnStats(fieldName)
    if (colStats != null && colStats.getNullCount != null) {
      colStats.getNullCount.toDouble
    } else {
      null
    }
  }
}
```
The statistic on ts is assigned here; this path ultimately calls into the concrete file system to determine the number of rows in the file.
```java
private LogicalTableScan recomputeStatistics(LogicalTableScan scan) {
    final RelOptTable scanTable = scan.getTable();
    if (!(scanTable instanceof TableSourceTable)) {
        return scan;
    }
    FlinkContext context = ShortcutUtils.unwrapContext(scan);
    TableSourceTable table = (TableSourceTable) scanTable;
    boolean reportStatEnabled =
            context.getTableConfig().get(TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED)
                    && table.tableSource() instanceof SupportsStatisticReport;

    SourceAbilitySpec[] specs = table.abilitySpecs();
    PartitionPushDownSpec partitionPushDownSpec = getSpec(specs, PartitionPushDownSpec.class);
    FilterPushDownSpec filterPushDownSpec = getSpec(specs, FilterPushDownSpec.class);
    TableStats newTableStat =
            recomputeStatistics(table, partitionPushDownSpec, filterPushDownSpec, reportStatEnabled);
    FlinkStatistic newStatistic =
            FlinkStatistic.builder()
                    .statistic(table.getStatistic())
                    .tableStats(newTableStat)
                    .build();
    TableSourceTable newTable = table.copy(newStatistic);
    return new LogicalTableScan(
            scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTable);
}
```
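Note the reportStatEnabled flag above: the program only asks the source for fresh statistics when the corresponding option is enabled and the table source implements SupportsStatisticReport (my assumption is that the filesystem source used in the test code does so in recent Flink versions; verify for yours). A small usage sketch, again continuing from the test code:

```java
import org.apache.flink.table.api.config.OptimizerConfigOptions;

// Turning this off leaves the temporary CSV tables with unknown statistics,
// which in turn disables the size-based broadcast decision from question 5.
tableEnvironment.getConfig().set(
        OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED, false);
```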