一、背景介绍
在现今互联网时代,获取和处理实时数据已经成为数据处理领域的重要研究方向之一。对于实时数据的处理,流式计算框架成为了关键技术之一。而 Apache Flink 作为开源的流式计算框架,被广泛使用于实时数据的处理,这里加入Kafka,将Flink作为实时处理引擎,与Kafka集成,在Kafka中实时获取数据。
二、FlinkKafka整合介绍
整合Flink与Kafka的方式可分为两种,一种是使用 Flink 官方提供的Kafka Consumer API 直连Kafka,一种是使用 FlinkKafkaConnector 接入Kafka,下面将逐一介绍两种方式的具体实现。
三、使用 Flink 官方提供的 Kafka Consumer API 直连 Kafka
在此之前,需要确保读者对 Kafka 的基本概念以及Flink的 DataSet 和 DataStream 有一定的了解。如果您还不熟悉这两个东西,建议先去了解一下。
使用 Flink 的 Java 调用 Kafka 的 Consumer API 直连 Kafka,它需要我们手动实现一个 FlinkKafkaConsumer010 的一个匿名内部类,来指定我们要读取的 Kafka topic、消息反序列化的类以及 Kafka broker 的信息。具体代码实现如下:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import java.util.Properties;
public class FlinkKafkaConsumerDemo {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
DataStreamSource kafkaSource = env.addSource(new FlinkKafkaConsumer010(
"mytopic", //topic
new SimpleStringSchema(), // String 序列化
properties)); // Kafka 连接参数
kafkaSource.print();
env.execute("Flink Streaming Java API Skeleton");
}
}
四、使用 FlinkKafkaConnector 接入 Kafka
如果不想手动实现 Kafka Consumer API,也可以使用 Flink 提供的 FlinkKafkaConnector 来实现接入 Kafka。
下面是一个使用 FlinkKafkaConnector读取 Kafka 数据的例子,需要引用FlinkKafkaConnector依赖包和Kafka的依赖包:
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;
public class KafkaSource {
private static final String brokerList = "localhost:9092";
private static final String topic = "test";
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("bootstrap.servers", brokerList);
props.setProperty("group.id", "test");
DataStreamSource kafkaSource = env.addSource(new FlinkKafkaConsumer011(topic, new SimpleStringSchema(), props));
kafkaSource.print();
env.execute("KafkaSource");
}
}
五、Kafka 数据写入到 Flink
在 Streaming 处理过程中,除了从 Kafka 中读取数据之外,如果我们想把 Flink 中处理过的结果返回到 Kafka 中,那么就需要了解 Flink 内置的 Kafka Producer API 的使用。
这里我们使用 FlinkKafkaProducer010 来将结果写入到 Kafka 中。以下实例代码是将Flink 数据写入到 Kafka 中。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import java.util.Properties;
public class KafkaSink {
private static final String brokerList = "localhost:9092";
private static final String topic = "test";
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", brokerList);
FlinkKafkaProducer010 kafkaProducer = new FlinkKafkaProducer010(topic,new SimpleStringSchema(),properties);
DataStream input = env.socketTextStream("localhost", 9000, "\n");
input.addSink(kafkaProducer);
env.execute("KafkaSink");
}
}
六、总结
本文主要对 FlinkKafka 整合的两种方式进行了详细的介绍。作为一种高效实时流处理解决方案,FlinkKafka已被广泛应用于大数据领域。本文介绍的两种整合方式均使用简单、效率高、易于控制,可以有效满足多种实时计算使用场景。
原创文章,作者:小蓝,如若转载,请注明出处:https://www.506064.com/n/283593.html
微信扫一扫
支付宝扫一扫