一、背景介紹
在現今互聯網時代,獲取和處理實時數據已經成為數據處理領域的重要研究方向之一。對於實時數據的處理,流式計算框架成為了關鍵技術之一。而 Apache Flink 作為開源的流式計算框架,被廣泛使用於實時數據的處理,這裡加入Kafka,將Flink作為實時處理引擎,與Kafka集成,在Kafka中實時獲取數據。
二、FlinkKafka整合介紹
整合Flink與Kafka的方式可分為兩種,一種是使用 Flink 官方提供的Kafka Consumer API 直連Kafka,一種是使用 FlinkKafkaConnector 接入Kafka,下面將逐一介紹兩種方式的具體實現。
三、使用 Flink 官方提供的 Kafka Consumer API 直連 Kafka
在此之前,需要確保讀者對 Kafka 的基本概念以及Flink的 DataSet 和 DataStream 有一定的了解。如果您還不熟悉這兩個東西,建議先去了解一下。
使用 Flink 的 Java 調用 Kafka 的 Consumer API 直連 Kafka,它需要我們手動實現一個 FlinkKafkaConsumer010 的一個匿名內部類,來指定我們要讀取的 Kafka topic、消息反序列化的類以及 Kafka broker 的信息。具體代碼實現如下:
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; import java.util.Properties; public class FlinkKafkaConsumerDemo { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); DataStreamSource kafkaSource = env.addSource(new FlinkKafkaConsumer010( "mytopic", //topic new SimpleStringSchema(), // String 序列化 properties)); // Kafka 連接參數 kafkaSource.print(); env.execute("Flink Streaming Java API Skeleton"); } }
四、使用 FlinkKafkaConnector 接入 Kafka
如果不想手動實現 Kafka Consumer API,也可以使用 Flink 提供的 FlinkKafkaConnector 來實現接入 Kafka。
下面是一個使用 FlinkKafkaConnector讀取 Kafka 數據的例子,需要引用FlinkKafkaConnector依賴包和Kafka的依賴包:
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; import java.util.Properties; public class KafkaSource { private static final String brokerList = "localhost:9092"; private static final String topic = "test"; public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); props.setProperty("bootstrap.servers", brokerList); props.setProperty("group.id", "test"); DataStreamSource kafkaSource = env.addSource(new FlinkKafkaConsumer011(topic, new SimpleStringSchema(), props)); kafkaSource.print(); env.execute("KafkaSource"); } }
五、Kafka 數據寫入到 Flink
在 Streaming 處理過程中,除了從 Kafka 中讀取數據之外,如果我們想把 Flink 中處理過的結果返回到 Kafka 中,那麼就需要了解 Flink 內置的 Kafka Producer API 的使用。
這裡我們使用 FlinkKafkaProducer010 來將結果寫入到 Kafka 中。以下實例代碼是將Flink 數據寫入到 Kafka 中。
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; import java.util.Properties; public class KafkaSink { private static final String brokerList = "localhost:9092"; private static final String topic = "test"; public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", brokerList); FlinkKafkaProducer010 kafkaProducer = new FlinkKafkaProducer010(topic,new SimpleStringSchema(),properties); DataStream input = env.socketTextStream("localhost", 9000, "\n"); input.addSink(kafkaProducer); env.execute("KafkaSink"); } }
六、總結
本文主要對 FlinkKafka 整合的兩種方式進行了詳細的介紹。作為一種高效實時流處理解決方案,FlinkKafka已被廣泛應用於大數據領域。本文介紹的兩種整合方式均使用簡單、效率高、易於控制,可以有效滿足多種實時計算使用場景。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/283593.html