一、Flink介紹
Flink是一個開源的分佈式流處理引擎,支持高吞吐量、低延遲的數據流處理。Flink不僅支持無界流處理,還支持批處理。Flink提供了一套豐富的操作符,如map、filter、join等,允許用戶以高效的方式對數據流進行轉換和聚合。Flink本質上是一個基於維護狀態的引擎,其核心思想是將計算抽象為維護狀態,基於狀態的變化來實現計算。
下面是一個簡單的Flink應用程序:
DataStream text = env.socketTextStream("localhost", 9999);
DataStream mapResult = text
.map(new MapFunction() {
public Integer map(String value) {
return Integer.parseInt(value);
}
});
DataStream sum = mapResult
.keyBy(0)
.sum(1);
sum.print();
env.execute("Flink Streaming Java API Skeleton");
在上面的例子中,我們從socket接收輸入數據流,並將輸入流中的每條記錄解析為Integer類型。然後,我們按照記錄的第0個位置鍵入記錄流,並對值進行求和,並將其打印到控制台上。
二、Flink窗口
在實際情況中,我們往往需要對數據流進行分組,並運行窗口操作。Flink提供了多種類型的窗口,例如滾動窗口、滑動窗口等。下面是一個使用滑動窗口的例子:
DataStream<Tuple2> input = env.fromElements(
Tuple2.of("a", 1),
Tuple2.of("a", 2),
Tuple2.of("a", 3),
Tuple2.of("b", 4),
Tuple2.of("b", 5),
Tuple2.of("b", 6)
);
input
.keyBy(0)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.sum(1)
.print();
env.execute();
在上面的例子中,我們創建了一個輸入流,然後按照記錄的第0個元素進行鍵入。然後,我們使用滑動窗口來對數據進行處理。在此例子中,我們使用了SlidingProcessingTimeWindows,該窗口在每隔5秒鐘的時間間隔內處理最近10秒鐘的記錄。最後我們對所有記錄值進行求和,並將其打印到控制台上。
三、Flink狀態
在Flink中,狀態是一等公民。狀態允許用戶在運行時跟蹤和維護上下文信息,例如累計器值、排序狀態等。Flink提供了多種API來操作狀態,例如ValueState、ListState等。以下是一個使用ValueState的例子:
public class Average implements MapFunction<Tuple2, Tuple2>> {
@Override
public Tuple2 map(Tuple2 value) throws Exception {
Double avg = (double) (value.f1 / value.f0);
return Tuple2.of(value.f0, avg);
}
}
DataStream<Tuple2> input = env.fromElements(
Tuple2.of(1L, 3L),
Tuple2.of(1L, 5L),
Tuple2.of(1L, 7L),
Tuple2.of(2L, 4L),
Tuple2.of(2L, 2L),
Tuple2.of(2L, 6L)
);
input.keyBy(0)
.map(new Average())
.keyBy(0)
.map(new RichMapFunction<Tuple2, Tuple2<Long, Tuple2>>() {
private transient ValueState<Tuple2> valueState;
@Override
public Tuple2<Long, Tuple2> map(Tuple2 input) throws Exception {
Tuple2 currentSum = valueState.value();
if (currentSum == null) {
currentSum = Tuple2.of(0.0, 0);
}
currentSum.f0 += input.f1;
currentSum.f1 += 1;
valueState.update(currentSum);
return Tuple2.of(input.f0, currentSum);
}
@Override
public void open(Configuration config) throws Exception {
ValueStateDescriptor<Tuple2> descriptor = new ValueStateDescriptor("average", Types.TUPLE(Types.DOUBLE, Types.INT));
valueState = getRuntimeContext().getState(descriptor);
}
})
.print();
env.execute();
在上面的例子中,我們創建了一個輸入流並按第0個元素鍵入輸入流。然後,我們計算一個每個鍵的平均值,並將其與輸入流中的下一個元素結合計算。在這個例子中,我們使用了ValueState對平均值進行跟蹤和維護。我們在Map函數中使用ValueStateDescriptor來創建狀態描述符,並在RichMapFunction中使用ValueState對狀態進行讀取和寫入。
四、Flink水印
在流處理中,由於輸入數據的亂序到達,我們需要推遲一段時間進行計算以獲取正確的結果。Flink的水印機制可以幫助我們解決這個問題。水印被Flink用於標記事件時間流的進度,允許Flink在計算上做出更好的決策。以下是一個使用Watermark的例子:
public class TimestampWithFailures {
private long timestamp;
private final boolean isNormal;
public TimestampWithFailures(long timestamp, boolean isNormal) {
this.timestamp = timestamp;
this.isNormal = isNormal;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public boolean isNormal() {
return isNormal;
}
}
public class WatermarkExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
WatermarkStrategy strategy = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofMillis(500))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
DataStream stream = env.addSource(new SourceFunction() {
private Random rand = new Random();
@Override
public void run(SourceContext ctx) throws Exception {
int maxTimestamp = 102000;
int numFailures = 0;
while (!Thread.interrupted()) {
long timestamp = System.currentTimeMillis() - rand.nextInt(maxTimestamp);
if (rand.nextBoolean() || rand.nextBoolean()) {
ctx.collectWithTimestamp(new TimestampWithFailures(timestamp, true), timestamp);
} else {
numFailures++;
// only emit watermark if there were normal events emitted since last watermark
if (numFailures % 3 == 0) {
ctx.emitWatermark(new Watermark(timestamp - 500));
}
// emit event with a bit delay, but still within the bounds of the assigned watermark
ctx.collectWithTimestamp(new TimestampWithFailures(timestamp + 250, false), timestamp + 250);
}
}
}
@Override
public void cancel() {
}
});
stream
.assignTimestampsAndWatermarks(strategy)
.keyBy(event -> event.isNormal())
.countWindow(10)
.sum(1)
.print();
env.execute("Stress test with Watermarks");
}
}
在上面的例子中,我們模擬了一個隨機生成數據的源,其中一些數據處理成功,另一些會失敗。在處理失敗事件時,我們會發出水印,並將其與事件時間軸上稍後的普通事件一起傳輸。在這個例子中,我們使用了forBoundedOutOfOrderness WatermarkStrategy,該策略使用最近500毫秒內收到的最大事件時間戳。這個例子中的累加器在一個10個元素的窗口上進行計算。
五、Flink SQL
Flink SQL是Flink的一種高級抽象層,允許用戶使用SQL查詢語言對數據流和表進行操作。Flink SQL支持標準的SQL語法,並提供了一些Flink特定的擴展。下面是一個使用Flink SQL查詢語言的例子:
public class FlinkSqlExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStreamSource orderSource = env.addSource(new ExampleData.OrderSource());
Table orderTable = tableEnv.fromDataStream(orderSource, "user, product, amount, rowtime.rowtime");
tableEnv.registerTable("Orders", orderTable);
Table result = tableEnv.sqlQuery("SELECT TUMBLE_START(rowtime, INTERVAL '5' SECOND), product, sum(amount) as total_amount FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), product");
DataStream resultSet = tableEnv.toAppendStream(result, Row.class);
resultSet.print();
env.execute();
}
}
在上面的例子中,我們從一個數據源中讀取訂單流,並使用SQL查詢對訂單進行分組和聚合。其中,TUMBLE函數和TUMBLE_START函數用於定義滾動窗口,並將訂單按產品分組。最後,我們使用toAppendStream將結果集轉換為DataStream,並將其打印到控制台上。
原創文章,作者:OBDWO,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/331589.html
微信掃一掃
支付寶掃一掃