官方 | 使用自定义 RateLimitingStrategy 优化异步接收器的吞吐量!

介绍

在设计 Flink 数据处理作业时,关键关注点之一是最大化作业吞吐量。Sink吞吐量是一个至关重要的因素,因为它可以决定整个作业的吞吐量。我们通常希望在不使目的地超载的情况下,在接收器中获得尽可能高的写入速率。但是,由于影响目标性能的因素在作业的生命周期内是可变的,因此接收器需要动态调整其写入速率。根据接收器的目的地,使用不同的 RateLimitingStrategy 有助于调整写入速率。

这篇文章解释了如何通过在基于 AsyncSinkBase (FLIP-171)构建的连接器上配置自定义 RateLimitingStrategy 来优化接收器吞吐量。在下面的部分中,我们介绍了 AsyncSinkBase 和 RateLimitingStrategy 背后的设计逻辑,然后我们将带您完成速率限制策略的两个示例实现,特别是 CongestionControlRateLimitingStrategy 和 TokenBucketRateLimitingStrategy。

AsyncSinkBase 的背景

在实现 AsyncSinkBase 时,我们的目标是通过提供用于至少一次处理的通用异步接收器功能来简化构建到自定义目标的新异步接收器。这使用户可以更轻松地将接收器写入自定义目的地,例如 Amazon Kinesis Data Streams 和 Amazon Kinesis Firehose。在撰写本文时,还正在开发Amazon DynamoDB ( FLIP-252 ) 的附加异步接收器。

AsyncSinkBase 提供处理异步请求和响应机制的核心实现。这包括重试失败的消息,决定何时将记录刷新到目的地,以及在检查点期间将未刷新的记录保存到状态。为了增加吞吐量,异步接收器还根据目的地的响应动态调整请求速率。在我们之前的 1.15 版本博客文章中阅读更多相关信息,或观看我们解释 Async Sink 设计的 FlinkForward 谈话录音

配置 AsyncSinkBase

在设计 AsyncSinkBase 时,我们希望用户能够根据他们的用例和需求调整他们的自定义连接器实现,而不必了解基本接收器本身的低级工作原理。

因此,作为 Flink 1.15 初始实现的一部分,我们公开了maxBatchSizemaxInFlightRequestsmaxBufferedRequests等配置maxBatchSizeInBytes以便用户可以调整接收器的刷新和写入行为。maxTimeInBufferMSmaxRecordSizeInBytes

在 Flink 1.16 中,我们进一步将这种可配置性扩展到 AsyncSinkBase ( FLIP-242 ) 使用的 RateLimitingStrategy。通过此更改,用户现在可以自定义 AsyncSinkBase 如何实时动态调整请求率以优化吞吐量,同时减轻背压。自定义示例包括更改用于缩放请求速率的数学函数、在速率调整之间实施冷静期或实施令牌桶 RateLimitingStrategy。

RateLimitingStrategy 接口背后的基本原理

public interface RateLimitingStrategy {

// Information provided to the RateLimitingStrategy
void registerInFlightRequest(RequestInfo requestInfo);
void registerCompletedRequest(ResultInfo resultInfo);

// Controls offered to the RateLimitingStrategy
boolean shouldBlock(RequestInfo requestInfo);
int getMaxBatchSize();

}

RateLimitingStrategy 接口背后有两个核心思想:

  • 信息方法:我们需要方法来为 RateLimitingStrategy 提供足够的信息来跟踪请求速率或发送消息的速率(每个请求可以包含多条消息)

  • 控制方法:我们还需要允许 RateLimitingStrategy 控制接收器请求速率的方法。

这些是我们在 RateLimitingStrategy 接口中看到的方法类型。使用registerInFlightRequest()registerCompletedRequest(),RateLimitingStrategy 有足够的信息来跟踪进行中的请求和消息的数量,以及这些请求的速率。

使用shouldBlock(),RateLimitingStrategy 可以决定推迟新请求,直到满足指定条件(例如,当前进行中的请求不得超过给定数量)。这允许 RateLimitingStrategy 控制到目的地的请求速率。它可以决定增加吞吐量或增加 Flink 作业图中的背压。

使用getMaxBatchSize(),RateLimitingStrategy 可以动态调整打包到单个请求中的消息数。如果请求大小影响目标的性能,这对于优化接收器吞吐量很有用。

实现自定义 RateLimitingStrategy

【示例1】CongestionControlRateLimitingStrategy

AsyncSinkBase 预打包了 CongestionControlRateLimitingStrategy。在本节中,我们将探讨其实现。

该策略仿照TCP 拥塞控制,旨在发现目的地的最高可能请求率。它通过增加请求率直到目的地限制接收器来实现这一点,此时它将降低请求率。

在这个 RateLimitingStrategy 中,我们想通过以下方式动态调整请求速率:

  • 随时设置飞行中请求的最大数量

  • 随时设置最大数量的飞行消息(每个请求可以包含多条消息)

  • 在每次成功请求后增加最大飞行消息数,以最大化请求率

  • 在请求不成功后减少最大传输消息数,以防止目的地过载

  • 如果有多个接收器子任务,则独立跟踪最大数量的飞行消息

该策略意味着我们将从低请求率(慢启动)开始,但会积极增加请求率直到目的地限制我们,这使我们能够发现可能的最高请求率。如果目的地的条件发生变化(例如,另一个客户端开始写入同一目的地),它还将调整请求速率。如果目标在达到带宽限制后实施流量整形和节流(例如 Amazon Kinesis Data Streams、Amazon Kinesis Data Firehose),则此策略效果很好。

首先,我们实施信息方法来跟踪飞行中请求和飞行中消息的数量。

public class CongestionControlRateLimitingStrategy implements RateLimitingStrategy {
// ...
@Override
public void registerInFlightRequest(RequestInfo requestInfo) {
currentInFlightRequests++;
currentInFlightMessages += requestInfo.getBatchSize();
}

@Override
public void registerCompletedRequest(ResultInfo resultInfo) {
currentInFlightRequests = Math.max(0, currentInFlightRequests - 1);
currentInFlightMessages = Math.max(0, currentInFlightMessages - resultInfo.getBatchSize());

if (resultInfo.getFailedMessages() > 0) {
maxInFlightMessages = scalingStrategy.scaleDown(maxInFlightMessages);
} else {
maxInFlightMessages = scalingStrategy.scaleUp(maxInFlightMessages);
}
}
// ...
}

然后我们实现控制方法来动态调整请求率。

我们保留 maxInFlightMessages 和 maxInFlightRequests 的当前值,并在达到 maxInFlightRequests 或 maxInFlightMessages 时推迟所有新请求。

每次请求完成时,CongestionControlRateLimitingStrategy 都会检查响应中是否有任何失败消息。如果有,它将减少 maxInFlightMessages。如果没有失败消息,它将增加 maxInFlightMessages。这使我们可以间接控制消息写入目的地的速率。

附注:默认的 CongestionControlRateLimitingStrategy 使用 Additive Increase / Multiplicative Decrease (AIMD) 缩放策略。这也用于 TCP 拥塞控制,通过缓慢增加写入速率来避免目标过载,但如果受到限制则快速后退。

public class CongestionControlRateLimitingStrategy implements RateLimitingStrategy {
// ...
@Override
public void registerCompletedRequest(ResultInfo resultInfo) {
// ...
if (resultInfo.getFailedMessages() > 0) {
maxInFlightMessages = scalingStrategy.scaleDown(maxInFlightMessages);
} else {
maxInFlightMessages = scalingStrategy.scaleUp(maxInFlightMessages);
}
}

public boolean shouldBlock(RequestInfo requestInfo) {
return currentInFlightRequests >= maxInFlightRequests
|| (currentInFlightMessages + requestInfo.getBatchSize() > maxInFlightMessages);
}
// ...
}

【示例2】TokenBucketRateLimitingStrategy

CongestionControlRateLimitingStrategy 相当激进,并且依赖于强大的服务器端速率限制策略。如果我们没有强大的服务器端速率限制策略,我们可以实施客户端速率限制策略。

作为一个例子,我们可以看看令牌桶限速策略该策略允许我们设置接收器的确切速率(例如每秒请求数、每秒消息数)。如果限制设置正确,我们将完全避免目的地超载。

在此策略中,我们要执行以下操作:

  • 实施具有给定初始令牌数(例如 10)的 TokenBucket。这些令牌以给定的速率重新填充(例如每秒 1 个令牌)。

  • 在准备异步请求时,我们检查令牌桶是否有足够的令牌。如果没有,我们将推迟请求。

让我们看一个示例实现:

public class TokenBucketRateLimitingStrategy implements RateLimitingStrategy {

private final Bucket bucket;

public TokenBucketRateLimitingStrategy() {
Refill refill = Refill.intervally(1, Duration.ofSeconds(1));
Bandwidth limit = Bandwidth.classic(10, refill);
this.bucket = Bucket4j.builder()
.addLimit(limit)
.build();
}

// ... (information methods not needed)

@Override
public boolean shouldBlock(RequestInfo requestInfo) {
return bucket.tryConsume(requestInfo.getBatchSize());
}

}

在上面的示例中,我们使用Bucket4j库的令牌桶实现。我们还将 1 条消息映射到 1 个令牌。由于我们的令牌桶大小为 10 个令牌,填充速率为每秒 1 个令牌,因此我们可以确定不会超过 10 条消息的突发,也不会超过每秒 1 条消息的恒定速率。

如果我们知道如果超过每秒 1 条消息的速率我们的目标将不正常地进行故障转移,或者如果我们有意限制我们的接收器的吞吐量以为写入同一目标的其他客户端提供更高的带宽,这将很有用。

指定自定义 RateLimitingStrategy

要指定自定义 RateLimitingStrategy,我们必须在传递给 AsyncSinkWriter 的构造函数的 AsyncSinkWriterConfiguration 中指定它。例如:

class MyCustomSinkWriterInputT> extends AsyncSinkWriterInputT, MyCustomRequestEntry> {

MyCustomSinkWriter(
ElementConverterInputT, MyCustomRequestEntry> elementConverter,
Sink.InitContext context,
CollectionBufferedRequestStateMyCustomRequestEntry>> states) {
super(
elementConverter,
context,
AsyncSinkWriterConfiguration.builder()
// ...
.setRateLimitingStrategy(new TokenBucketRateLimitingStrategy())
.build(),
states);
}

}

概括

从 Apache Flink 1.16 开始,我们可以自定义 RateLimitingStrategy,用于在运行时动态调整 Async Sink 的行为。这允许用户根据特定用例和需求调整他们的连接器实现,而无需了解基本接收器的低级工作原理。

我们希望此扩展程序对您有用。如果您有任何反馈,请随时与我们联系!

0 0 投票数
文章评分

本文转载自flink,原文链接:https://mp.weixin.qq.com/s/sNBYAqQqZHb6AMTImBx2HA。

(0)
上一篇 2022-12-06 16:36
下一篇 2022-12-09 15:25

相关推荐

订阅评论
提醒
guest

0 评论
内联反馈
查看所有评论
0
希望看到您的想法,请您发表评论x