一、FlinkState簡介
FlinkState 是 Apache Flink 中,用與表示和處理狀態(state)的一個核心組件。在流式計算中,狀態是處理逐步發展的關鍵。在傳統計算模型中,每個計算任務都有自己的狀態,而在 Flink 流式計算框架中,所有的計算任務共享一個狀態。因此,FlinkState 能夠輕鬆的應對大規模、高並發、低延遲、容錯性的計算需求。
二、FlinkState的核心特點
1、分布式:FlinkState允許分布式地存儲和訪問狀態,避免了單個節點故障導致狀態丟失的問題。
2、高可用:在分布式存儲的基礎上,FlinkState 提供了高可用性的保證。當存在節點故障時,FlinkState 能夠使用備份節點快速恢復狀態。
3、容錯性:FlinkState 具有自動的快照機制,能夠在接受到故障恢復請求時,快速恢復計算任務的狀態。
4、高性能:FlinkState 提供了快速的數據讀寫能力,能夠保證高並發、低延遲的計算需求。
三、FlinkState在Flink流式計算中的應用
1、FlinkState的模式
FlinkState 模式有 4 種:
ValueState
ListState
MapState
ReducingState
其中:
ValueState:保存單個Java對象(類型為T)的狀態。
ListState:保存一個Java對象(類型為T)列表的狀態。
MapState:保存鍵值對的狀態,鍵是一個Java對象(類型為K),值是另一個Java對象(類型為V)。
ReducingState:狀態類型為T的集合進行固定操作的結果。
2、簡化計算
使用 FlinkState 可以簡化一些計算任務。例如,我們要在流中篩選出不同的用戶數據,然後計算用戶的平均值。在傳統的計算模型中,我們需要維護兩個狀態:用戶數量及其對應的總值。而在 Flink 流式計算中,我們可以定義一個 Sum 狀態,在 Sum 狀態中,保存當前流的總和即可。這樣可以避免在計算過程中不斷判定用戶數據的狀態,大大簡化計算。以下是實現代碼示例:
public static class AvgFunction extends RichFlatMapFunction<Tuple2, Tuple2> {
private transient ValueState<Tuple2> sum;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Tuple2> descriptor =
new ValueStateDescriptor("sum", TypeInformation.of(new TypeHint<Tuple2>() {}));
sum = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(Tuple2 value, Collector<Tuple2> out) throws Exception {
Tuple2 currentSum = sum.value();
if (currentSum == null) {
currentSum = Tuple2.of(0, 0);
}
currentSum.f0 += 1;
currentSum.f1 += value.f1;
sum.update(currentSum);
if (currentSum.f0 >= 3) {
double avg = (double) currentSum.f1 / currentSum.f0;
out.collect(Tuple2.of(value.f0, avg));
sum.clear();
}
}
}
3、統計任務
在一些統計任務中,需要維護某些 Key 的狀態,記錄它們的經過時間後出現的次數。比如我們可以用 FlinkState 實現一個簡單的登錄任務,記錄某個用戶在幾小時內登錄了幾次,以下是實現代碼示例:
public static class LoginCount extends RichFlatMapFunction<Tuple2, Tuple2> {
private static final long HOUR_MS = 60 * 60 * 1000;
private static final long SECOND_MS = 1000;
private transient MapState countMap;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
MapStateDescriptor countDesc = new MapStateDescriptor("count", Long.class, Integer.class);
countMap = getRuntimeContext().getMapState(countDesc);
}
@Override
public void flatMap(Tuple2 event, Collector<Tuple2> out) throws Exception {
long hour = event.f1 / HOUR_MS;
int count = 1;
Integer oldCount = countMap.get(hour);
if (oldCount != null) {
count += oldCount;
}
countMap.put(hour, count);
int sum = 0;
for (Integer integer : countMap.values()) {
sum += integer;
}
out.collect(Tuple2.of(event.f0, sum));
}
}
4、跨任務狀態共享
在 Flink 流式計算中,多個任務可能需要共享一些狀態,例如,在一個事件流系統中,多個流都需要同時接受數據。在這種情況下,我們可以使用 Flink 的 Broadcast State 來共享狀態。以下是實現代碼示例:
public static final MapStateDescriptor BC_DESC = new MapStateDescriptor("broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
public static class SplitStream extends ProcessFunction {
private transient MapState broadcastState;
@Override
public void open(Configuration parameters) throws Exception {
broadcastState = getRuntimeContext().getMapState(BC_DESC);
}
@Override
public void processElement(String value, Context ctx, Collector out) throws Exception {
//從廣播狀態中獲取特定信息做相應處理
String bcInfo = broadcastState.get("bcInfo");
if (StringUtils.isNotBlank(bcInfo)) {
out.collect(value + " " + bcInfo);
}
}
}
public static class BroadcastStream extends RichMapFunction<Tuple2, Tuple2> {
private transient MapState broadcastState;
@Override
public void open(Configuration parameters) throws Exception {
broadcastState = getRuntimeContext().getMapState(BC_DESC);
//將bcInfo信息放入廣播狀態中
broadcastState.put("bcInfo", "broadcastInfo");
}
@Override
public Tuple2 map(Tuple2 value) throws Exception {
return Tuple2.of(value.f1, value.f0.toString());
}
}
總結
本文詳細介紹了 FlinkState 的特點、優點、常見模式以及應用場景,以及精簡計算、統計任務、跨任務狀態共享案例撰寫的實現代碼。
原創文章,作者:SDAT,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/148330.html