一、FlinkState简介
FlinkState 是 Apache Flink 中,用与表示和处理状态(state)的一个核心组件。在流式计算中,状态是处理逐步发展的关键。在传统计算模型中,每个计算任务都有自己的状态,而在 Flink 流式计算框架中,所有的计算任务共享一个状态。因此,FlinkState 能够轻松的应对大规模、高并发、低延迟、容错性的计算需求。
二、FlinkState的核心特点
1、分布式:FlinkState允许分布式地存储和访问状态,避免了单个节点故障导致状态丢失的问题。
2、高可用:在分布式存储的基础上,FlinkState 提供了高可用性的保证。当存在节点故障时,FlinkState 能够使用备份节点快速恢复状态。
3、容错性:FlinkState 具有自动的快照机制,能够在接受到故障恢复请求时,快速恢复计算任务的状态。
4、高性能:FlinkState 提供了快速的数据读写能力,能够保证高并发、低延迟的计算需求。
三、FlinkState在Flink流式计算中的应用
1、FlinkState的模式
FlinkState 模式有 4 种:
ValueState
ListState
MapState
ReducingState
其中:
ValueState:保存单个Java对象(类型为T)的状态。
ListState:保存一个Java对象(类型为T)列表的状态。
MapState:保存键值对的状态,键是一个Java对象(类型为K),值是另一个Java对象(类型为V)。
ReducingState:状态类型为T的集合进行固定操作的结果。
2、简化计算
使用 FlinkState 可以简化一些计算任务。例如,我们要在流中筛选出不同的用户数据,然后计算用户的平均值。在传统的计算模型中,我们需要维护两个状态:用户数量及其对应的总值。而在 Flink 流式计算中,我们可以定义一个 Sum 状态,在 Sum 状态中,保存当前流的总和即可。这样可以避免在计算过程中不断判定用户数据的状态,大大简化计算。以下是实现代码示例:
public static class AvgFunction extends RichFlatMapFunction<Tuple2, Tuple2> {
private transient ValueState<Tuple2> sum;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Tuple2> descriptor =
new ValueStateDescriptor("sum", TypeInformation.of(new TypeHint<Tuple2>() {}));
sum = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(Tuple2 value, Collector<Tuple2> out) throws Exception {
Tuple2 currentSum = sum.value();
if (currentSum == null) {
currentSum = Tuple2.of(0, 0);
}
currentSum.f0 += 1;
currentSum.f1 += value.f1;
sum.update(currentSum);
if (currentSum.f0 >= 3) {
double avg = (double) currentSum.f1 / currentSum.f0;
out.collect(Tuple2.of(value.f0, avg));
sum.clear();
}
}
}
3、统计任务
在一些统计任务中,需要维护某些 Key 的状态,记录它们的经过时间后出现的次数。比如我们可以用 FlinkState 实现一个简单的登录任务,记录某个用户在几小时内登录了几次,以下是实现代码示例:
public static class LoginCount extends RichFlatMapFunction<Tuple2, Tuple2> {
private static final long HOUR_MS = 60 * 60 * 1000;
private static final long SECOND_MS = 1000;
private transient MapState countMap;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
MapStateDescriptor countDesc = new MapStateDescriptor("count", Long.class, Integer.class);
countMap = getRuntimeContext().getMapState(countDesc);
}
@Override
public void flatMap(Tuple2 event, Collector<Tuple2> out) throws Exception {
long hour = event.f1 / HOUR_MS;
int count = 1;
Integer oldCount = countMap.get(hour);
if (oldCount != null) {
count += oldCount;
}
countMap.put(hour, count);
int sum = 0;
for (Integer integer : countMap.values()) {
sum += integer;
}
out.collect(Tuple2.of(event.f0, sum));
}
}
4、跨任务状态共享
在 Flink 流式计算中,多个任务可能需要共享一些状态,例如,在一个事件流系统中,多个流都需要同时接受数据。在这种情况下,我们可以使用 Flink 的 Broadcast State 来共享状态。以下是实现代码示例:
public static final MapStateDescriptor BC_DESC = new MapStateDescriptor("broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
public static class SplitStream extends ProcessFunction {
private transient MapState broadcastState;
@Override
public void open(Configuration parameters) throws Exception {
broadcastState = getRuntimeContext().getMapState(BC_DESC);
}
@Override
public void processElement(String value, Context ctx, Collector out) throws Exception {
//从广播状态中获取特定信息做相应处理
String bcInfo = broadcastState.get("bcInfo");
if (StringUtils.isNotBlank(bcInfo)) {
out.collect(value + " " + bcInfo);
}
}
}
public static class BroadcastStream extends RichMapFunction<Tuple2, Tuple2> {
private transient MapState broadcastState;
@Override
public void open(Configuration parameters) throws Exception {
broadcastState = getRuntimeContext().getMapState(BC_DESC);
//将bcInfo信息放入广播状态中
broadcastState.put("bcInfo", "broadcastInfo");
}
@Override
public Tuple2 map(Tuple2 value) throws Exception {
return Tuple2.of(value.f1, value.f0.toString());
}
}
总结
本文详细介绍了 FlinkState 的特点、优点、常见模式以及应用场景,以及精简计算、统计任务、跨任务状态共享案例撰写的实现代码。
原创文章,作者:SDAT,如若转载,请注明出处:https://www.506064.com/n/148330.html