一、簡介
Flink Kafka Consumer是Flink中針對Kafka數據源編寫的一個控制台消費程序。其主要作用是從Kafka中消費數據,將消費的數據轉換成Flink中的DataStream數據流,然後通過Flink的各種運算元進行數據的處理和分析。
二、使用方式
使用Flink Kafka Consumer非常簡單,只需要在Flink任務中先引入flink-connector-kafka_2.12依賴,然後使用下面的方式創建一個KafkaConsumer:
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer("topic-name", new SimpleStringSchema(), props);
其中,topic-name是要消費的Kafka Topic名稱, SimpleStringSchema是指這個Kafka Topic的數據是以字元串的形式進行編碼,props是一個Properties對象,用於設置KafkaConsumer的一些配置信息,例如消費者組名稱、Kafka Broker地址等等。
一旦創建好FlinkKafkaConsumer,我們就可以使用Flink的DataStream API從Kafka中消費數據了:
DataStream stream = env.addSource(kafkaConsumer);
這樣,我們就可以通過stream對消費到的Kafka Topic數據進行各種分析和處理了。
三、常用參數詳解
1. properties文件的配置
在Flink任務中使用FlinkKafkaConsumer消費Kafka Topic時,需要通過設置Properties對象來配置Kafka Consumer的各種參數。下面是一些常用的參數配置:
- bootstrap.servers:指定Kafka Broker地址,格式是host:port,多個Broker地址用逗號隔開。
- group.id:指定Consumer Group的名稱。
- auto.offset.reset:指定Consumer在沒有offset的情況下,從何處開始消費,可以選擇earliest或latest。
- enable.auto.commit:指定是否啟用Auto Commit功能。
- fetch.max.bytes:指定每次從Kafka Broker拉取數據的最大位元組數。
2. 反序列化器的配置
FlinkKafkaConsumer需要將Kafka Topic中的數據解碼成Flink中的java對象,這個過程可以使用Kafka提供的反序列化器來完成。Flink提供了各種反序列化器,例如SimpleStringSchema、JSONDeserializationSchema等等,常用的反序列化器配置如下:
- SimpleStringSchema:用於將字元串數據解析成String類型。
- JSONDeserializationSchema:用於將JSON數據解析成POJO對象。
- AvroDeserializationSchema:用於將Avro數據解析成POJO對象。
3. 消費位置的配置
FlinkKafkaConsumer支持多種不同的消費位置,例如從最早的Offset開始消費、從最新的Offset開始消費、從指定的Offset開始消費等等。我們可以通過配置KafkaConsumer的一個屬性來指定消費位置。
- auto.offset.reset:當第一次啟動一個Consumer時,如果沒有指定一個初始的消費位置,那麼Consumer會自動根據這個屬性來設置消費位置。
- assign和subscribe:通過手動指定Topic Partition的Offset來設置消費位置。
- seek:在運行時動態修改Consumer的消費位置。
四、完整代碼示例
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Properties; public class FlinkKafkaConsumerDemo { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("group.id", "test"); FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("my-topic", new SimpleStringSchema(), props); DataStream stream = env.addSource(consumer); // do some processing on the stream stream.print(); env.execute("Flink Kafka Consumer Demo"); } }
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/195951.html