一、Flink介绍
Flink是一个开源的分布式流处理引擎,支持高吞吐量、低延迟的数据流处理。Flink不仅支持无界流处理,还支持批处理。Flink提供了一套丰富的操作符,如map、filter、join等,允许用户以高效的方式对数据流进行转换和聚合。Flink本质上是一个基于维护状态的引擎,其核心思想是将计算抽象为维护状态,基于状态的变化来实现计算。
下面是一个简单的Flink应用程序:
DataStream text = env.socketTextStream("localhost", 9999);
DataStream mapResult = text
.map(new MapFunction() {
public Integer map(String value) {
return Integer.parseInt(value);
}
});
DataStream sum = mapResult
.keyBy(0)
.sum(1);
sum.print();
env.execute("Flink Streaming Java API Skeleton");
在上面的例子中,我们从socket接收输入数据流,并将输入流中的每条记录解析为Integer类型。然后,我们按照记录的第0个位置键入记录流,并对值进行求和,并将其打印到控制台上。
二、Flink窗口
在实际情况中,我们往往需要对数据流进行分组,并运行窗口操作。Flink提供了多种类型的窗口,例如滚动窗口、滑动窗口等。下面是一个使用滑动窗口的例子:
DataStream<Tuple2> input = env.fromElements(
Tuple2.of("a", 1),
Tuple2.of("a", 2),
Tuple2.of("a", 3),
Tuple2.of("b", 4),
Tuple2.of("b", 5),
Tuple2.of("b", 6)
);
input
.keyBy(0)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.sum(1)
.print();
env.execute();
在上面的例子中,我们创建了一个输入流,然后按照记录的第0个元素进行键入。然后,我们使用滑动窗口来对数据进行处理。在此例子中,我们使用了SlidingProcessingTimeWindows,该窗口在每隔5秒钟的时间间隔内处理最近10秒钟的记录。最后我们对所有记录值进行求和,并将其打印到控制台上。
三、Flink状态
在Flink中,状态是一等公民。状态允许用户在运行时跟踪和维护上下文信息,例如累计器值、排序状态等。Flink提供了多种API来操作状态,例如ValueState、ListState等。以下是一个使用ValueState的例子:
public class Average implements MapFunction<Tuple2, Tuple2>> {
@Override
public Tuple2 map(Tuple2 value) throws Exception {
Double avg = (double) (value.f1 / value.f0);
return Tuple2.of(value.f0, avg);
}
}
DataStream<Tuple2> input = env.fromElements(
Tuple2.of(1L, 3L),
Tuple2.of(1L, 5L),
Tuple2.of(1L, 7L),
Tuple2.of(2L, 4L),
Tuple2.of(2L, 2L),
Tuple2.of(2L, 6L)
);
input.keyBy(0)
.map(new Average())
.keyBy(0)
.map(new RichMapFunction<Tuple2, Tuple2<Long, Tuple2>>() {
private transient ValueState<Tuple2> valueState;
@Override
public Tuple2<Long, Tuple2> map(Tuple2 input) throws Exception {
Tuple2 currentSum = valueState.value();
if (currentSum == null) {
currentSum = Tuple2.of(0.0, 0);
}
currentSum.f0 += input.f1;
currentSum.f1 += 1;
valueState.update(currentSum);
return Tuple2.of(input.f0, currentSum);
}
@Override
public void open(Configuration config) throws Exception {
ValueStateDescriptor<Tuple2> descriptor = new ValueStateDescriptor("average", Types.TUPLE(Types.DOUBLE, Types.INT));
valueState = getRuntimeContext().getState(descriptor);
}
})
.print();
env.execute();
在上面的例子中,我们创建了一个输入流并按第0个元素键入输入流。然后,我们计算一个每个键的平均值,并将其与输入流中的下一个元素结合计算。在这个例子中,我们使用了ValueState对平均值进行跟踪和维护。我们在Map函数中使用ValueStateDescriptor来创建状态描述符,并在RichMapFunction中使用ValueState对状态进行读取和写入。
四、Flink水印
在流处理中,由于输入数据的乱序到达,我们需要推迟一段时间进行计算以获取正确的结果。Flink的水印机制可以帮助我们解决这个问题。水印被Flink用于标记事件时间流的进度,允许Flink在计算上做出更好的决策。以下是一个使用Watermark的例子:
public class TimestampWithFailures {
private long timestamp;
private final boolean isNormal;
public TimestampWithFailures(long timestamp, boolean isNormal) {
this.timestamp = timestamp;
this.isNormal = isNormal;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public boolean isNormal() {
return isNormal;
}
}
public class WatermarkExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
WatermarkStrategy strategy = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofMillis(500))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
DataStream stream = env.addSource(new SourceFunction() {
private Random rand = new Random();
@Override
public void run(SourceContext ctx) throws Exception {
int maxTimestamp = 102000;
int numFailures = 0;
while (!Thread.interrupted()) {
long timestamp = System.currentTimeMillis() - rand.nextInt(maxTimestamp);
if (rand.nextBoolean() || rand.nextBoolean()) {
ctx.collectWithTimestamp(new TimestampWithFailures(timestamp, true), timestamp);
} else {
numFailures++;
// only emit watermark if there were normal events emitted since last watermark
if (numFailures % 3 == 0) {
ctx.emitWatermark(new Watermark(timestamp - 500));
}
// emit event with a bit delay, but still within the bounds of the assigned watermark
ctx.collectWithTimestamp(new TimestampWithFailures(timestamp + 250, false), timestamp + 250);
}
}
}
@Override
public void cancel() {
}
});
stream
.assignTimestampsAndWatermarks(strategy)
.keyBy(event -> event.isNormal())
.countWindow(10)
.sum(1)
.print();
env.execute("Stress test with Watermarks");
}
}
在上面的例子中,我们模拟了一个随机生成数据的源,其中一些数据处理成功,另一些会失败。在处理失败事件时,我们会发出水印,并将其与事件时间轴上稍后的普通事件一起传输。在这个例子中,我们使用了forBoundedOutOfOrderness WatermarkStrategy,该策略使用最近500毫秒内收到的最大事件时间戳。这个例子中的累加器在一个10个元素的窗口上进行计算。
五、Flink SQL
Flink SQL是Flink的一种高级抽象层,允许用户使用SQL查询语言对数据流和表进行操作。Flink SQL支持标准的SQL语法,并提供了一些Flink特定的扩展。下面是一个使用Flink SQL查询语言的例子:
public class FlinkSqlExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStreamSource orderSource = env.addSource(new ExampleData.OrderSource());
Table orderTable = tableEnv.fromDataStream(orderSource, "user, product, amount, rowtime.rowtime");
tableEnv.registerTable("Orders", orderTable);
Table result = tableEnv.sqlQuery("SELECT TUMBLE_START(rowtime, INTERVAL '5' SECOND), product, sum(amount) as total_amount FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), product");
DataStream resultSet = tableEnv.toAppendStream(result, Row.class);
resultSet.print();
env.execute();
}
}
在上面的例子中,我们从一个数据源中读取订单流,并使用SQL查询对订单进行分组和聚合。其中,TUMBLE函数和TUMBLE_START函数用于定义滚动窗口,并将订单按产品分组。最后,我们使用toAppendStream将结果集转换为DataStream,并将其打印到控制台上。
原创文章,作者:OBDWO,如若转载,请注明出处:https://www.506064.com/n/331589.html
微信扫一扫
支付宝扫一扫