How to Add Random Sampling to the Flink Print SQL Connector?

Flink provides the Print SQL Connector, which makes it very convenient to print data to standard output. This helps us test SQL jobs and verify that the data is correct.
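
For reference, the built-in connector only needs a one-line WITH clause; a minimal example (the table name and columns are just placeholders) looks like this:

CREATE TABLE print_sink (
    f0 INT,
    f1 STRING
) WITH (
    'connector' = 'print'
);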

In a production environment, however, the upstream data volume is usually very large. Printing every record can fill up the standard-output file and freeze the web UI page, which actually makes it harder to observe the data. So we can make a small modification to the Print SQL Connector and add a random-sampling parameter that controls how much data gets printed.

Start by copying the relevant Print SQL Connector code into our own project.

PrintRateTableSinkFactory

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package flink.stream.connector.print;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;

import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

import static flink.stream.connector.print.PrintConnectorOptions.PRINT_IDENTIFIER;
import static flink.stream.connector.print.PrintConnectorOptions.PRINT_RATE;
import static flink.stream.connector.print.PrintConnectorOptions.STANDARD_ERROR;

/**
 * Print table sink factory writing every row to the standard output or standard error stream.
 * It is designed for:
 *
 * <ul>
 *   <li>easy test for streaming job.
 *   <li>very useful in production debugging.
 * </ul>
 *
 * <p>Four possible format options:
 *
 * <pre>
 *   {@code PRINT_IDENTIFIER}:taskId> output  <- {@code PRINT_IDENTIFIER} provided, parallelism > 1
 *   {@code PRINT_IDENTIFIER}> output         <- {@code PRINT_IDENTIFIER} provided, parallelism == 1
 *   taskId> output                           <- no {@code PRINT_IDENTIFIER} provided, parallelism > 1
 *   output                                   <- no {@code PRINT_IDENTIFIER} provided, parallelism == 1
 * </pre>
 *
 * <p>output string format is "$RowKind[f0, f1, f2, ...]", example is: "+I[1, 1]".
 */
@Internal
public class PrintRateTableSinkFactory implements DynamicTableSinkFactory {
    // simple change: use a new identifier to distinguish it from the built-in print connector
    public static final String IDENTIFIER = "print-rate";

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return new HashSet<>();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(PRINT_IDENTIFIER);
        options.add(STANDARD_ERROR);
        options.add(FactoryUtil.SINK_PARALLELISM);
        // add the new PRINT_RATE option
        options.add(PRINT_RATE);
        return options;
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ReadableConfig options = helper.getOptions();
        return new PrintSink(
                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType(),
                context.getCatalogTable().getPartitionKeys(),
                options.get(PRINT_IDENTIFIER),
                options.get(STANDARD_ERROR),
                options.getOptional(FactoryUtil.SINK_PARALLELISM).orElse(null),
                options.get(PRINT_RATE));
    }

    private static class PrintSink implements DynamicTableSink, SupportsPartitioning {

        private final DataType type;
        private String printIdentifier;
        private final boolean stdErr;
        private final @Nullable Integer parallelism;
        private final List<String> partitionKeys;
        private Map<String, String> staticPartitions = new LinkedHashMap<>();
        private @Nullable Float printRate;

        private PrintSink(
                DataType type,
                List<String> partitionKeys,
                String printIdentifier,
                boolean stdErr,
                Integer parallelism,
                Float printRate) {
            this.type = type;
            this.partitionKeys = partitionKeys;
            this.printIdentifier = printIdentifier;
            this.stdErr = stdErr;
            this.parallelism = parallelism;
            this.printRate = printRate;
        }

        @Override
        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return requestedMode;
        }

        @Override
        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
            DataStructureConverter converter = context.createDataStructureConverter(type);
            staticPartitions.forEach(
                    (key, value) -> {
                        printIdentifier = null != printIdentifier ? printIdentifier + ":" : "";
                        printIdentifier += key + "=" + value;
                    });
            return SinkFunctionProvider.of(
                    new RowDataPrintFunction(converter, printIdentifier, stdErr, printRate), parallelism);
        }

        @Override
        public DynamicTableSink copy() {
            return new PrintSink(type, partitionKeys, printIdentifier, stdErr, parallelism, printRate);
        }

        @Override
        public String asSummaryString() {
            return "Print to " + (stdErr ? "System.err" : "System.out");
        }

        @Override
        public void applyStaticPartition(Map<String, String> partition) {
            // make it a LinkedHashMap to maintain partition column order
            staticPartitions = new LinkedHashMap<>();
            for (String partitionCol : partitionKeys) {
                if (partition.containsKey(partitionCol)) {
                    staticPartitions.put(partitionCol, partition.get(partitionCol));
                }
            }
        }
    }

    /**
     * Implementation of the SinkFunction converting {@link RowData} to string and passing to {@link
     * PrintSinkFunction}.
     */
    private static class RowDataPrintFunction extends RichSinkFunction<RowData> {

        private static final long serialVersionUID = 1L;

        private final DataStructureConverter converter;
        private final PrintSinkOutputWriter<String> writer;
        private final Float printRate;

        private RowDataPrintFunction(
                DataStructureConverter converter, String printIdentifier, boolean stdErr, Float printRate) {
            this.converter = converter;
            this.writer = new PrintSinkOutputWriter<>(printIdentifier, stdErr);
            this.printRate = printRate;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
            writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
        }

        @Override
        public void invoke(RowData value, Context context) {
            if (ThreadLocalRandom.current().nextFloat() < this.printRate) {
                Object data = converter.toExternal(value);
                assert data != null;
                writer.write(data.toString());
            }
        }
    }
}

PrintConnectorOptions

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package flink.stream.connector.print;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;

import static org.apache.flink.configuration.ConfigOptions.key;

/** Options for the Print sink connector. */
@PublicEvolving
public class PrintConnectorOptions {

    public static final ConfigOption<String> PRINT_IDENTIFIER =
            key("print-identifier")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Message that identify print and is prefixed to the output of the value.");

    public static final ConfigOption<Boolean> STANDARD_ERROR =
            key("standard-error")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "True, if the format should print to standard error instead of standard out.");

    public static final ConfigOption<Float> PRINT_RATE =
            key("print-rate")
                    .floatType()
                    .defaultValue(0.0001F)
                    .withDescription(
                            "Controls the printing rate of data");

    private PrintConnectorOptions() {}
}

First, add a PRINT_RATE option to PrintConnectorOptions to control the random sampling. Its default value is 0.0001.
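
As an optional hardening that is not part of the original change, the factory could also validate the configured rate before building the sink. A minimal sketch inside createDynamicTableSink might look like this:

// hypothetical validation, not in the original code
Float printRate = options.get(PRINT_RATE);
if (printRate <= 0 || printRate > 1) {
    throw new org.apache.flink.table.api.ValidationException(
            "'print-rate' must be in the range (0, 1], but was: " + printRate);
}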

Then, in PrintRateTableSinkFactory, change the connector's unique identifier IDENTIFIER to print-rate. Strictly speaking this is optional; it only serves to distinguish the new connector from the default Print connector.

Next, add the new PRINT_RATE option in the PrintRateTableSinkFactory#optionalOptions method.

@Override
public Set<ConfigOption<?>> optionalOptions() {
    Set<ConfigOption<?>> options = new HashSet<>();
    options.add(PRINT_IDENTIFIER);
    options.add(STANDARD_ERROR);
    options.add(FactoryUtil.SINK_PARALLELISM);
    options.add(PRINT_RATE);
    return options;
}

Then pass the parameter into PrintSink below, and from there into RowDataPrintFunction, where the random-sampling logic is finally added in the invoke method.

@Override
public void invoke(RowData value, Context context) {
    if (ThreadLocalRandom.current().nextFloat() < this.printRate) {
        Object data = converter.toExternal(value);
        assert data != null;
        writer.write(data.toString());
    }
}
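
As a quick sanity check of the sampling behaviour (a standalone sketch, not part of the connector), drawing a large number of floats shows that roughly a printRate fraction of them fall below the threshold:

import java.util.concurrent.ThreadLocalRandom;

public class PrintRateDemo {
    public static void main(String[] args) {
        float printRate = 0.01F;   // same semantics as the 'print-rate' option
        int total = 1_000_000;
        int printed = 0;
        for (int i = 0; i < total; i++) {
            // the same check as in invoke()
            if (ThreadLocalRandom.current().nextFloat() < printRate) {
                printed++;
            }
        }
        // prints roughly "10000 of 1000000 records would be printed"
        System.out.println(printed + " of " + total + " records would be printed");
    }
}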

That completes the code changes; it is very simple, fewer than 10 lines in total.

Finally, PrintRateTableSinkFactory must be registered in the configuration file under META-INF/services, because Flink loads these connectors through the Java SPI mechanism.

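Since PrintRateTableSinkFactory implements DynamicTableSinkFactory, which extends org.apache.flink.table.factories.Factory, the service file is named after that interface and contains just the fully qualified class name (the resources path below assumes a standard Maven layout):

src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory:

flink.stream.connector.print.PrintRateTableSinkFactory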

Now let's test the modified connector. First upload the built jar to the flink/lib directory on the server, then create an initialization script and a SQL file.

init.sql

SET 'parallelism.default' = '8';
SET 'taskmanager.memory.network.fraction' = '0.01';
SET 'pipeline.name' = 'test-print-rate';
SET 'sql-client.verbose' = 'true';

test_print_rate.sql

CREATE TABLE kafka_table (
    name STRING,
    age INT,
    city STRING,
    ts BIGINT,
    proctime AS PROCTIME(),
    rt AS TO_TIMESTAMP_LTZ(ts, 3),
    WATERMARK FOR rt AS rt - INTERVAL '5' SECOND
)
WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',
    'properties.group.id' = 'jason_flink_test',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'false'
);


CREATE TABLE print_table (
    f1 TIMESTAMP(3),
    f2 TIMESTAMP(3),
    f3 BIGINT,
    f4 STRING
)
WITH (
    'connector' = 'print-rate',
    'standard-error' = 'false',
    'print-rate' = '0.01',
    'sink.parallelism' = '4'
);

INSERT INTO print_table
SELECT
    window_start,
    window_end,
    COUNT(name),
    name
FROM TABLE(HOP(TABLE kafka_table, DESCRIPTOR(proctime), INTERVAL '30' SECOND, INTERVAL '1' HOUR))
GROUP BY window_start, window_end, name;

This uses the print-rate connector built above; the random sampling can be controlled with 'print-rate' = 'xxx'.

Submit the job:

sql-client.sh -i init.sql -f test_print_rate.sql


After the job is submitted successfully, write some data into Kafka and then check the printed records under the TaskManager's Stdout tab.
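
For example, a few test records can be pushed with the console producer that ships with Kafka (the broker address and topic are the ones from the DDL; the field values are made-up samples matching the kafka_table schema; older Kafka versions use --broker-list instead of --bootstrap-server):

kafka-console-producer.sh --bootstrap-server master:9092 --topic test
>{"name":"jason","age":20,"city":"beijing","ts":1652781600000}
>{"name":"tom","age":25,"city":"shanghai","ts":1652781601000}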


You can see that the data was indeed sampled. With the default Print connector every record would have been printed, because every key is different. With sampling, far fewer records are printed, so even a very large upstream data volume will not cause any problems.

This article is a reader contribution and does not represent the views of the site From Big Data to AI. If you reprint it, please cite the source: https://lrting.top/backend/bigdata/6687/
