Hudi is a leading choice for building streaming data lakes, and pairing it with Flink, a true stream-processing engine, is a natural fit. However, apart from online (inline) compaction, Hudi MOR table compaction currently cannot be triggered manually through SQL; the current way to run a manual compaction is:
./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.15-bundle_2.12-x.x.x.jar --path hdfs://xxx:9000/table
This article shows how to extend Flink's SQL parser so that a Hudi MOR table can be compacted directly with SQL. After the changes, the following Flink SQL triggers a compaction (the procedure call syntax is modeled on Spark's CALL syntax):
procedure call compact a with ('path'='hdfs://bigdata:9000/tmp/t1_20220810_6', 'schedule'='false');
The changes are based on the Flink 1.15 branch: https://github.com/apache/flink/tree/release-1.15
The modified code is available at: https://git.lrting.top/xiaozhch5/flink/-/tree/release-1.15-compile
Modifying the SQL Parser
Flink parses SQL with Calcite, so we can extend the parser by following the patterns Flink already uses. Specifically, the changes are as follows.
Add a new SqlCallCompactTable class with the following content:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.sql.parser.ddl;
import org.apache.calcite.sql.*;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;
import javax.annotation.Nonnull;
import java.util.List;
import static java.util.Objects.requireNonNull;
/**
 * SqlCall node for compacting a table, i.e. PROCEDURE CALL COMPACT [[catalogName.]
 * databaseName.]tableName WITH (...).
 */
public class SqlCallCompactTable extends SqlCall {

    public static final SqlSpecialOperator OPERATOR =
            new SqlSpecialOperator("PROCEDURE CALL COMPACT", SqlKind.OTHER_DDL);

    protected final SqlIdentifier tableIdentifier;
    protected final SqlNodeList propertyList;

    public SqlCallCompactTable(
            SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList) {
        super(pos);
        this.tableIdentifier = requireNonNull(tableName, "tableName should not be null");
        this.propertyList = requireNonNull(propertyList, "with property should not be null");
    }

    @Nonnull
    @Override
    public SqlOperator getOperator() {
        return OPERATOR;
    }

    public SqlIdentifier getTableName() {
        return tableIdentifier;
    }

    @Override
    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("PROCEDURE CALL");
        writer.keyword("COMPACT");
        tableIdentifier.unparse(writer, leftPrec, rightPrec);
        if (this.propertyList.size() > 0) {
            writer.keyword("WITH");
            SqlWriter.Frame withFrame = writer.startList("(", ")");
            for (SqlNode property : propertyList) {
                printIndent(writer);
                property.unparse(writer, leftPrec, rightPrec);
            }
            writer.newlineAndIndent();
            writer.endList(withFrame);
        }
    }

    protected void printIndent(SqlWriter writer) {
        writer.sep(",", false);
        writer.newlineAndIndent();
        writer.print(" ");
    }

    public SqlNodeList getPropertyList() {
        return propertyList;
    }

    @Nonnull
    @Override
    public List<SqlNode> getOperandList() {
        return ImmutableNullableList.of(tableIdentifier);
    }
}
Modifying Parser.tdd
Add to imports:
"org.apache.flink.sql.parser.ddl.SqlCallCompactTable"
Add to statementParserMethods:
"SqlCallCompactTable()"
Then add the following production to parserImpls.ftl:
SqlCallCompactTable SqlCallCompactTable() :
{
    SqlParserPos pos;
    SqlIdentifier tableIdentifier;
    SqlNodeList propertyList = SqlNodeList.EMPTY;
}
{
    <PROCEDURE> <CALL> <COMPACT> { pos = getPos(); }
    tableIdentifier = CompoundIdentifier()
    [
        <WITH>
        propertyList = TableProperties()
    ]
    {
        return new SqlCallCompactTable(
                pos,
                tableIdentifier,
                propertyList);
    }
}
Adding CallCompactOperation
In the flink-table-api-java module, add a CallCompactOperation class under org/apache/flink/table/operations/ with the following content:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.operations;
import org.apache.flink.table.catalog.ObjectIdentifier;

/** A ModifyOperation that triggers compaction of a Hudi MOR table. */
public class CallCompactOperation implements ModifyOperation {

    private final ObjectIdentifier tableIdentifier;
    private final QueryOperation child;
    private final String basePath;
    private final String schedule;

    public CallCompactOperation(
            ObjectIdentifier tableIdentifier,
            QueryOperation child,
            String basePath,
            String schedule) {
        this.tableIdentifier = tableIdentifier;
        this.child = child;
        this.basePath = basePath;
        this.schedule = schedule;
    }

    public ObjectIdentifier getTableIdentifier() {
        return tableIdentifier;
    }

    @Override
    public QueryOperation getChild() {
        return this.child;
    }

    public String getBasePath() {
        return this.basePath;
    }

    public String getSchedule() {
        return schedule;
    }

    @Override
    public <T> T accept(ModifyOperationVisitor<T> modifyOperationVisitor) {
        // This operation is executed directly in TableEnvironmentImpl#executeInternal,
        // so it never goes through the planner's visitor.
        return null;
    }

    @Override
    public String asSummaryString() {
        return "PROCEDURE CALL COMPACT TABLE: "
                + this.tableIdentifier.getCatalogName()
                + "."
                + this.tableIdentifier.getDatabaseName()
                + "."
                + this.tableIdentifier.getObjectName();
    }
}
Modifying the Table Planner
Add a convertCallCompact method to the SqlToOperationConverter class:
private Operation convertCallCompact(SqlCallCompactTable sqlCallCompactTable) {
    String catalogName = this.catalogManager.getCurrentCatalog();
    String databaseName = this.catalogManager.getCurrentDatabase();
    SqlNodeList sqlNodeList = sqlCallCompactTable.getPropertyList();
    List<SqlNode> sqlNodes = sqlNodeList.getList();
    String basePath = "";
    String schedule = "false";
    for (SqlNode sqlNode : sqlNodes) {
        if ("path".equals(((SqlTableOption) sqlNode).getKeyString())) {
            basePath = ((SqlTableOption) sqlNode).getValueString();
        } else if ("schedule".equals(((SqlTableOption) sqlNode).getKeyString())) {
            schedule = ((SqlTableOption) sqlNode).getValueString();
        }
    }
    return new CallCompactOperation(
            ObjectIdentifier.of(
                    catalogName, databaseName, sqlCallCompactTable.getTableName().toString()),
            null,
            basePath,
            schedule);
}
Then modify the convertValidatedSqlNode method to add the following check:
if (validated instanceof SqlCallCompactTable) {
    return Optional.of(converter.convertCallCompact((SqlCallCompactTable) validated));
}
Modifying TableEnvironmentImpl
In the executeInternal method (around line 864), add the following branch:
if (operation instanceof CallCompactOperation) {
    try {
        String catalog = catalogManager.getCurrentCatalog();
        String database = catalogManager.getCurrentDatabase();
        String table =
                ((CallCompactOperation) operation).getTableIdentifier().getObjectName();
        String basePath = ((CallCompactOperation) operation).getBasePath();
        String schedule = ((CallCompactOperation) operation).getSchedule();
        System.out.println("Call Compact on " + catalog + "." + database + "." + table);

        // Map the SQL options onto Hudi's compaction config and run the
        // standalone compactor, just like the CLI entry point does.
        FlinkCompactionConfig cfg = new FlinkCompactionConfig();
        cfg.path = basePath;
        cfg.schedule = "true".equals(schedule);
        Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        HoodieFlinkCompactor.AsyncCompactionService service =
                new HoodieFlinkCompactor.AsyncCompactionService(cfg, conf, env);
        new HoodieFlinkCompactor(service).start(cfg.serviceMode);
    } catch (Exception e) {
        throw new ValidationException("Call Compact failed", e);
    }
    return TableResultImpl.TABLE_RESULT_OK;
}
Note that this runs the compaction from the client through HoodieFlinkCompactor, the same entry point the standalone CLI uses; the schedule option controls whether a new compaction plan is generated first.
Adding the hudi-flink Dependency
Add the hudi-flink dependency to the pom.xml of the flink-table-api-java module (scoped provided, so that at runtime the Hudi classes come from the Hudi bundle deployed on the cluster):
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-flink</artifactId>
    <version>${hudi.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.flink</groupId>
            <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
    <scope>provided</scope>
</dependency>
Rebuilding Flink
After rebuilding Flink with these changes, the PROCEDURE CALL COMPACT syntax can be parsed and executed.
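For reference, Flink is usually rebuilt with Maven from the repository root; a minimal sketch (the fast profile skips QA plugins; afterwards, copy the rebuilt flink-table jars into your distribution's lib directory):
mvn clean install -DskipTests -Dfast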
Example
Deploy the rebuilt Flink to a test environment and run the following SQL, where path is the table's base path and schedule controls whether a compaction plan is generated; if schedule is not specified, it defaults to false.
procedure call compact a with ('path'='hdfs://bigdata:9000/tmp/t1_20220810_6', 'schedule'='false');
Running this against a Hudi MOR table that already has a pending compaction plan (leaving schedule unset), the job runs successfully and performs the compaction.
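The statement can also be submitted programmatically through the Table API; here is a minimal sketch (assuming the rebuilt flink-table jars and the Hudi bundle are on the classpath; the table name and path are the ones from the example above):
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CompactDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Trigger compaction of an existing MOR table. Passing 'schedule'='true'
        // would generate a new compaction plan before compacting.
        tEnv.executeSql(
                "procedure call compact a with ("
                        + "'path'='hdfs://bigdata:9000/tmp/t1_20220810_6', "
                        + "'schedule'='false')");
    }
}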
SQL Submission Script
https://git.lrting.top/xiaozhch5/flink-sql-submit
This is an original article by xiaozhch5, blogger at 从大数据到人工智能 (lrting.top), licensed under CC 4.0 BY-SA; please include a link to the original article and this notice when reposting.
Original article: https://lrting.top/backend/9188/