一、Kafka簡介
Apache Kafka是一個基於發布/訂閱模式的分布式流處理平台,由LinkedIn開發,以高吞吐量、可擴展性、持久性等特點而聞名。它主要由Kafka Broker、Producer、Consumer三個部分組成。
二、Kafka安裝
1、安裝Java環境(Kafka需要Java 8或更高版本);
2、下載Kafka二進制文件,並解壓;
3、配置Kafka環境變量;
4、啟動Zookeeper:Kafka使用Zookeeper來維護集群狀態;
5、啟動Kafka Server。
三、Kafka Producer
Kafka Producer是將數據發布到Kafka Broker的組件。下面是Java版本的Kafka Producer示例代碼:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
以上代碼會將0到99的整數按順序發送到名為“my-topic”的topic中。
四、Kafka Consumer
Kafka Consumer是從Kafka Broker上訂閱數據的組件。下面是Java版本的Kafka Consumer示例代碼:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
}
consumer.close();
以上代碼會消費“my-topic”這個topic的所有消息,並打印出消息的offset、key和value。
五、Kafka Stream
Kafka Stream是一個輕量級的庫,它將源數據流轉換為另一個數據流,並可以進行聚合、過濾、分組等操作。下面是一個簡單的Kafka Stream示例代碼:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream("my-input-topic");
source.filter((key, value) -> value.contains("kafka"))
.mapValues(value -> value.toUpperCase())
.to("my-output-topic");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
以上代碼將消費“my-input-topic”這個topic上的消息,如果消息內容中包含“kafka”,則將消息轉換為大寫字母,並發布到“my-output-topic”這個topic上。
六、Kafka Connect
Kafka Connect是一個用於將數據從外部系統導入到Kafka或導出到外部系統的工具。它支持多種數據源和數據目標,例如MySQL、Elasticsearch、HDFS等。以下是一個簡單的Kafka Connect配置文件示例:
name=my-source-task
connector.class=FileStreamSource
file=/path/to/file.txt
topic=my-topic
以上配置會將“/path/to/file.txt”這個文件的內容導入到Kafka的“my-topic”這個topic中。
七、總結
本文對Kafka的各個組件進行了介紹,並給出了Java代碼示例。希望本文可以幫助讀者快速上手Kafka,並在實踐中體驗到Kafka的優越性能和可擴展性。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/303418.html