一、Flink介紹
Flink是一個開源的分散式流處理引擎,支持高吞吐量、低延遲的數據流處理。Flink不僅支持無界流處理,還支持批處理。Flink提供了一套豐富的操作符,如map、filter、join等,允許用戶以高效的方式對數據流進行轉換和聚合。Flink本質上是一個基於維護狀態的引擎,其核心思想是將計算抽象為維護狀態,基於狀態的變化來實現計算。
下面是一個簡單的Flink應用程序:
DataStream text = env.socketTextStream("localhost", 9999); DataStream mapResult = text .map(new MapFunction() { public Integer map(String value) { return Integer.parseInt(value); } }); DataStream sum = mapResult .keyBy(0) .sum(1); sum.print(); env.execute("Flink Streaming Java API Skeleton");
在上面的例子中,我們從socket接收輸入數據流,並將輸入流中的每條記錄解析為Integer類型。然後,我們按照記錄的第0個位置鍵入記錄流,並對值進行求和,並將其列印到控制台上。
二、Flink窗口
在實際情況中,我們往往需要對數據流進行分組,並運行窗口操作。Flink提供了多種類型的窗口,例如滾動窗口、滑動窗口等。下面是一個使用滑動窗口的例子:
DataStream<Tuple2> input = env.fromElements( Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("a", 3), Tuple2.of("b", 4), Tuple2.of("b", 5), Tuple2.of("b", 6) ); input .keyBy(0) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .sum(1) .print(); env.execute();
在上面的例子中,我們創建了一個輸入流,然後按照記錄的第0個元素進行鍵入。然後,我們使用滑動窗口來對數據進行處理。在此例子中,我們使用了SlidingProcessingTimeWindows,該窗口在每隔5秒鐘的時間間隔內處理最近10秒鐘的記錄。最後我們對所有記錄值進行求和,並將其列印到控制台上。
三、Flink狀態
在Flink中,狀態是一等公民。狀態允許用戶在運行時跟蹤和維護上下文信息,例如累計器值、排序狀態等。Flink提供了多種API來操作狀態,例如ValueState、ListState等。以下是一個使用ValueState的例子:
public class Average implements MapFunction<Tuple2, Tuple2>> { @Override public Tuple2 map(Tuple2 value) throws Exception { Double avg = (double) (value.f1 / value.f0); return Tuple2.of(value.f0, avg); } } DataStream<Tuple2> input = env.fromElements( Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(2L, 2L), Tuple2.of(2L, 6L) ); input.keyBy(0) .map(new Average()) .keyBy(0) .map(new RichMapFunction<Tuple2, Tuple2<Long, Tuple2>>() { private transient ValueState<Tuple2> valueState; @Override public Tuple2<Long, Tuple2> map(Tuple2 input) throws Exception { Tuple2 currentSum = valueState.value(); if (currentSum == null) { currentSum = Tuple2.of(0.0, 0); } currentSum.f0 += input.f1; currentSum.f1 += 1; valueState.update(currentSum); return Tuple2.of(input.f0, currentSum); } @Override public void open(Configuration config) throws Exception { ValueStateDescriptor<Tuple2> descriptor = new ValueStateDescriptor("average", Types.TUPLE(Types.DOUBLE, Types.INT)); valueState = getRuntimeContext().getState(descriptor); } }) .print(); env.execute();
在上面的例子中,我們創建了一個輸入流並按第0個元素鍵入輸入流。然後,我們計算一個每個鍵的平均值,並將其與輸入流中的下一個元素結合計算。在這個例子中,我們使用了ValueState對平均值進行跟蹤和維護。我們在Map函數中使用ValueStateDescriptor來創建狀態描述符,並在RichMapFunction中使用ValueState對狀態進行讀取和寫入。
四、Flink水印
在流處理中,由於輸入數據的亂序到達,我們需要推遲一段時間進行計算以獲取正確的結果。Flink的水印機制可以幫助我們解決這個問題。水印被Flink用於標記事件時間流的進度,允許Flink在計算上做出更好的決策。以下是一個使用Watermark的例子:
public class TimestampWithFailures { private long timestamp; private final boolean isNormal; public TimestampWithFailures(long timestamp, boolean isNormal) { this.timestamp = timestamp; this.isNormal = isNormal; } public long getTimestamp() { return timestamp; } public void setTimestamp(long timestamp) { this.timestamp = timestamp; } public boolean isNormal() { return isNormal; } } public class WatermarkExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); WatermarkStrategy strategy = WatermarkStrategy .forBoundedOutOfOrderness(Duration.ofMillis(500)) .withTimestampAssigner((event, timestamp) -> event.getTimestamp()); DataStream stream = env.addSource(new SourceFunction() { private Random rand = new Random(); @Override public void run(SourceContext ctx) throws Exception { int maxTimestamp = 102000; int numFailures = 0; while (!Thread.interrupted()) { long timestamp = System.currentTimeMillis() - rand.nextInt(maxTimestamp); if (rand.nextBoolean() || rand.nextBoolean()) { ctx.collectWithTimestamp(new TimestampWithFailures(timestamp, true), timestamp); } else { numFailures++; // only emit watermark if there were normal events emitted since last watermark if (numFailures % 3 == 0) { ctx.emitWatermark(new Watermark(timestamp - 500)); } // emit event with a bit delay, but still within the bounds of the assigned watermark ctx.collectWithTimestamp(new TimestampWithFailures(timestamp + 250, false), timestamp + 250); } } } @Override public void cancel() { } }); stream .assignTimestampsAndWatermarks(strategy) .keyBy(event -> event.isNormal()) .countWindow(10) .sum(1) .print(); env.execute("Stress test with Watermarks"); } }
在上面的例子中,我們模擬了一個隨機生成數據的源,其中一些數據處理成功,另一些會失敗。在處理失敗事件時,我們會發出水印,並將其與事件時間軸上稍後的普通事件一起傳輸。在這個例子中,我們使用了forBoundedOutOfOrderness WatermarkStrategy,該策略使用最近500毫秒內收到的最大事件時間戳。這個例子中的累加器在一個10個元素的窗口上進行計算。
五、Flink SQL
Flink SQL是Flink的一種高級抽象層,允許用戶使用SQL查詢語言對數據流和表進行操作。Flink SQL支持標準的SQL語法,並提供了一些Flink特定的擴展。下面是一個使用Flink SQL查詢語言的例子:
public class FlinkSqlExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = StreamTableEnvironment.create(env); DataStreamSource orderSource = env.addSource(new ExampleData.OrderSource()); Table orderTable = tableEnv.fromDataStream(orderSource, "user, product, amount, rowtime.rowtime"); tableEnv.registerTable("Orders", orderTable); Table result = tableEnv.sqlQuery("SELECT TUMBLE_START(rowtime, INTERVAL '5' SECOND), product, sum(amount) as total_amount FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), product"); DataStream resultSet = tableEnv.toAppendStream(result, Row.class); resultSet.print(); env.execute(); } }
在上面的例子中,我們從一個數據源中讀取訂單流,並使用SQL查詢對訂單進行分組和聚合。其中,TUMBLE函數和TUMBLE_START函數用於定義滾動窗口,並將訂單按產品分組。最後,我們使用toAppendStream將結果集轉換為DataStream,並將其列印到控制台上。
原創文章,作者:OBDWO,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/331589.html