kafka快速實戰與基本原理詳解
背景
Kafka是最初由Linkedin公司開發,是一個分布式、支持分區的(partition)、多副本的(replica),基於zookeeper協 調的分布式消息系統,它的最大的特性就是可以實時地處理大量數據以滿足各種需求場景:比如基於hadoop的批處理系統 統、低延遲的實時系統、Storm/Spark流式處理引擎,web/nginx日誌、訪問日誌,消息服務等等,用scala語言編寫, Linkedin於2010年貢獻給了Apache基金會並成為頂級開源 項目。

Kafka基本概念
kafka是一個分布式的,分區的消息(官方稱之為commit log)服務。它提供一個消息系統應該具備的功能,但是確有着獨特的設計。可以這樣來說,Kafka借鑒了JMS規範的思想,但是確並沒有完全遵循JMS規範。
首先,讓我們來看一下基礎的消息(Message)相關術語
名稱 | 解釋 |
Broker | 消息中間件處理節點,一個Kafka節點就是一個broker,一個或者多個Broker可以組成一個Kafka集群 |
Topic | Kafka根據topic對消息進行歸類,發布到Kafka集群的每條消息都需要指定一個topic |
Producer | 消息生產者,向Broker發送消息的客戶端 |
Consumer | 消息消費者,從Broker讀取消息的客戶端 |
ConsumerGroup | 每個Consumer屬於一個特定的Consumer Group,一條消息可以被多個不同的Consumer Group消費,但是一個Consumer Group中只能有一個Consumer能夠消費該消息 |
Partition | 物理上的概念,一個topic可以分為多個partition,每個partition內部消息是有序的 |
因此,從一個較高的層面上來看,producer通過網絡發送消息到Kafka集群,然後consumer來進行消費,如下圖:

服務端(brokers)和客戶端(producer、consumer)之間通信通過TCP協議來完成.
kafka基本使用
安裝前的環境準備
由於Kafka是用Scala語言開發的,運行在JVM上,因此在安裝Kafka之前需要先安裝JDK。
yum install java-1.8.0-openjdk* -y
kafka依賴zookeeper,所以需要先安裝zookeeper
wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz
cd apache-zookeeper-3.5.8-bin
cp conf/zoo_sample.cfg conf/zoo.cfg # 啟動zookeeper bin/zkServer.sh
start bin/zkCli.sh
ls / #查看zk的根目錄相關節點
第一步:下載安裝包
下載2.4.1 release版本,並解壓:
wget
https://mirror.bit.edu.cn/apache/kafka/2.4.1/kafka_2.11-2.4.1.tgz # 2.11是scala的版本,2.4.1是kafka的版本
tar -xzf kafka_2.11-2.4.1.tgz
cd kafka_2.11-2.4.1
第二步:修改配置
修改配置文件config/server.properties:
#broker.id屬性在kafka集群中必須要是唯一 broker.id=0 #kafka部署的機器ip和提供服務的端口號 listeners=
PLAINTEXT://192.168.65.60:9092 #kafka的消息存儲文件 log.dir=/usr/local/data/kafka-logs #kafka連接zookeeper的地址 zookeeper.connect=192.168.65.60:2181
第三步:啟動服務
現在來啟動kafka服務:
啟動腳本語法:kafka-server-start.sh [-daemon] server.properties
可以看到,server.properties的配置路徑是一個強制的參數,-daemon表示以後台進程運行,否則ssh客戶端退出後,就會停止服務。(注意,在啟動kafka時會使用linux主機名關聯的ip地址,所以需要把主機名和linux的ip映射配置到本地host里,用vim /etc/hosts)
# 啟動kafka,運行日誌在logs目錄的server.log文件里
bin/kafka-server-start.sh -daemon config/server.properties
#後台啟動,不會打印日誌到控制台 或者用
bin/kafka-server-start.sh config/server.properties & # 我們進入zookeeper目錄通過zookeeper客戶端查看下zookeeper的目錄樹 bin/zkCli.sh ls / #查看zk的根目錄kafka相關節點
ls /brokers/ids #查看kafka節點 # 停止kafka bin/kafka-server-stop.sh
server.properties核心配置詳解:
Property | Default | Description |
broker.id | 0 | 每個broker都可以用一個唯一的非負整數id進行標識;這個id可以作為broker的“名字”,你可以選擇任意你喜歡的數字作為id,只要id是唯一的即可。 |
log.dirs | /tmp/kafka-logs | kafka存放數據的路徑。這個路徑並不是唯一的,可以是多個,路徑之間只需要使用逗號分隔即可;每當創建新partition時,都會選擇在包含最少partitions的路徑下進行。 |
listeners | PLAINTEXT://192.168.65.60:9092 | server接受客戶端連接的端口,ip配置kafka本機ip即可 |
zookeeper.connect | localhost:2181 | zooKeeper連接字符串的格式為:hostname:port,此處hostname和port分別是ZooKeeper集群中某個節點的host和port;zookeeper如果是集群,連接方式為 hostname1:port1, hostname2:port2, hostname3:port3 |
log.retention.hours | 168 | 每個日誌文件刪除之前保存的時間。默認數據保存時間對所有topic都一樣。 |
num.partitions | 1 | 創建topic的默認分區數 |
default.replication.factor | 1 | 自動創建topic的默認副本數量,建議設置為大於等於2 |
min.insync.replicas | 1 | 當producer設置acks為-1時,min.insync.replicas指定replicas的最小數目(必須確認每一個repica的寫數據都是成功的),如果這個數目沒有達到,producer發送消息會產生異常 |
delete.topic.enable | false | 是否允許刪除主題 |
第四步:創建主題
現在我們來創建一個名字為“test”的Topic,這個topic只有一個partition,並且備份因子也設置為1:
bin/kafka-topics.sh –create –zookeeper 192.168.65.60:2181 –replication-factor 1 –partitions 1 –topic test
現在我們可以通過以下命令來查看kafka中目前存在的topic
bin/kafka-topics.sh –list –zookeeper 192.168.65.60:2181
除了我們通過手工的方式創建Topic,當producer發布一個消息到某個指定的Topic,這個Topic如果不存在,就自動創建。
刪除主題
bin/kafka-topics.sh –delete –topic test –zookeeper 192.168.65.60:2181
第五步:發送消息
kafka自帶了一個producer命令客戶端,可以從本地文件中讀取內容,或者我們也可以以命令行中直接輸入內容,並將這些內容以消息的形式發送到kafka集群中。在默認情況下,每一個行會被當做成一個獨立的消息。
首先我們要運行發布消息的腳本,然後在命令中輸入要發送的消息的內容:
bin/kafka-console-producer.sh –broker-list 192.168.65.60:9092 –topic test >this is a msg >this is a another msg
第六步:消費消息
對於consumer,kafka同樣也攜帶了一個命令行客戶端,會將獲取到內容在命令中進行輸出,默認是消費最新的消息:
bin/kafka-console-consumer.sh –bootstrap-server 192.168.65.60:9092 –topic test
如果想要消費之前的消息可以通過–from-beginning參數指定,如下命令:
bin/kafka-console-consumer.sh –bootstrap-server 192.168.65.60:9092 –from-beginning –topic test
如果你是通過不同的終端窗口來運行以上的命令,你將會看到在producer終端輸入的內容,很快就會在consumer的終端窗口上顯示出來。
以上所有的命令都有一些附加的選項;當我們不攜帶任何參數運行命令的時候,將會顯示出這個命令的詳細用法。
消費多主題
bin/kafka-console-consumer.sh –bootstrap-server 192.168.65.60:9092 –whitelist “test|test-2”
單播消費
一條消息只能被某一個消費者消費的模式,類似queue模式,只需讓所有消費者在同一個消費組裡即可
分別在兩個客戶端執行如下消費命令,然後往主題里發送消息,結果只有一個客戶端能收到消息
bin/kafka-console-consumer.sh –bootstrap-server 192.168.65.60:9092 –consumer-property group.id=testGroup –topic test
多播消費
一條消息能被多個消費者消費的模式,類似publish-subscribe模式費,針對Kafka同一條消息只能被同一個消費組下的某一個消費者消費的特性,要實現多播只要保證這些消費者屬於不同的消費組即可。我們再增加一個消費者,該消費者屬於testGroup-2消費組,結果兩個客戶端都能收到消息
bin/kafka-console-consumer.sh –bootstrap-server 192.168.65.60:9092 –consumer-property group.id=testGroup-2 –topic test
查看消費組名
bin/kafka-consumer-groups.sh –bootstrap-server 192.168.65.60:9092 –list
查看消費組的消費偏移量
bin/kafka-consumer-groups.sh –bootstrap-server 192.168.65.60:9092 –describe –group testGroup

current-offset:當前消費組的已消費偏移量
log-end-offset:主題對應分區消息的結束偏移量(HW)
lag:當前消費組未消費的消息數
主題Topic和消息日誌Log
可以理解Topic是一個類別的名稱,同類消息發送到同一個Topic下面。對於每一個Topic,下面可以有多個分區(Partition)日誌文件:

Partition是一個有序的message序列,這些message按順序添加到一個叫做commit log的文件中。每個partition中的消息都有一個唯一的編號,稱之為offset,用來唯一標示某個分區中的message。
每個partition,都對應一個commit log文件。一個partition中的message的offset都是唯一的,但是不同的partition中的message的offset可能是相同的。
kafka一般不會刪除消息,不管這些消息有沒有被消費。只會根據配置的日誌保留時間(log.retention.hours)確認消息多久被刪除,默認保留最近一周的日誌消息。kafka的性能與保留的消息數據量大小沒有關係,因此保存大量的數據消息日誌信息不會有什麼影響。
每個consumer是基於自己在commit log中的消費進度(offset)來進行工作的。在kafka中,消費offset由consumer自己來維護;一般情況下我們按照順序逐條消費commit log中的消息,當然我可以通過指定offset來重複消費某些消息,或者跳過某些消息。
這意味kafka中的consumer對集群的影響是非常小的,添加一個或者減少一個consumer,對於集群或者其他consumer來說,都是沒有影響的,因為每個consumer維護各自的消費offset。
創建多個分區的主題:
bin/kafka-topics.sh –create –zookeeper 192.168.65.60:2181 –replication-factor 1 –partitions 2 –topic test1
查看下topic的情況
bin/kafka-topics.sh –describe –zookeeper 192.168.65.60:2181 –topic test1

- leader節點負責給定partition的所有讀寫請求。
- replicas 表示某個partition在哪幾個broker上存在備份。不管這個幾點是不是”leader“,甚至這個節點掛了,也會列出。
- isr 是replicas的一個子集,它只列出當前還存活着的,並且已同步備份了該partition的節點。
增加topic的分區數量(目前kafka不支持減少分區):
bin/kafka-topics.sh -alter –partitions 3 –zookeeper 192.168.65.60:2181 –topic test
可以這麼來理解Topic,Partition和Broker
一個topic,代表邏輯上的一個業務數據集,比如按數據庫里不同表的數據操作消息區分放入不同topic,訂單相關操作消息放入訂單topic,用戶相關操作消息放入用戶topic,對於大型網站來說,後端數據都是海量的,訂單消息很可能是非常巨量的,比如有幾百個G甚至達到TB級別,如果把這麼多數據都放在一台機器上肯定會有容量限制問題,那麼就可以在topic內部劃分多個partition來分片存儲數據,不同的partition可以位於不同的機器上,每台機器上都運行一個Kafka的進程Broker。
為什麼要對Topic下數據進行分區存儲?
1、commit log文件會受到所在機器的文件系統大小的限制,分區之後可以將不同的分區放在不同的機器上,相當於對數據做了分布式存儲,理論上一個topic可以處理任意數量的數據。
2、為了提高並行度。
kafka集群實戰
對於kafka來說,一個單獨的broker意味着kafka集群中只有一個節點。要想增加kafka集群中的節點數量,只需要多啟動幾個broker實例即可。為了有更好的理解,現在我們在一台機器上同時啟動三個broker實例。
首先,我們需要建立好其他2個broker的配置文件:
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
配置文件的需要修改的內容分別如下:
config/server-1.properties:
#broker.id屬性在kafka集群中必須要是唯一 broker.id=1 #kafka部署的機器ip和提供服務的端口號 listeners=
PLAINTEXT://192.168.65.60:9093 log.dir=/usr/local/data/kafka-logs-1 #kafka連接zookeeper的地址,要把多個kafka實例組成集群,對應連接的zookeeper必須相同 zookeeper.connect=192.168.65.60:2181
config/server-2.properties:
broker.id=2 listeners=PLAINTEXT://192.168.65.60:9094 log.dir=/usr/local/data/kafka-logs-2 zookeeper.connect=192.168.65.60:2181
目前我們已經有一個zookeeper實例和一個broker實例在運行了,現在我們只需要在啟動2個broker實例即可:
bin/kafka-server-start.sh -daemon config/server-1.properties bin/kafka-server-start.sh -daemon config/server-2.properties
查看zookeeper確認集群節點是否都註冊成功:

創建一個新的topic,副本數設置為3,分區數設置為2:
bin/kafka-topics.sh –create –zookeeper 192.168.65.60:2181 –replication-factor 3 –partitions 2 –topic my-replicated-topic
查看下topic的情況
bin/kafka-topics.sh –describe –zookeeper 192.168.65.60:2181 –topic my-replicated-topic

- leader節點負責給定partition的所有讀寫請求,同一個主題不同分區leader副本一般不一樣(為了容災)
- replicas 表示某個partition在哪幾個broker上存在備份。不管這個幾點是不是”leader“,甚至這個節點掛了,也會列出。
- isr 是replicas的一個子集,它只列出當前還存活着的,並且已同步備份了該partition的節點。
kafka將很多集群關鍵信息記錄在zookeeper里,保證自己的無狀態,從而在水平擴容時非常方便。
集群消費
log的partitions分布在kafka集群中不同的broker上,每個broker可以請求備份其他broker上partition上的數據。kafka集群支持配置一個partition備份的數量。
針對每個partition,都有一個broker起到“leader”的作用,0個或多個其他的broker作為“follwers”的作用。leader處理所有的針對這個partition的讀寫請求,而followers被動複製leader的結果,不提供讀寫(主要是為了保證多副本數據與消費的一致性)。如果這個leader失效了,其中的一個follower將會自動地變成新的leader。
Producers
生產者將消息發送到topic中去,同時負責選擇將message發送到topic的哪一個partition中。通過round-robin做簡單的負載均衡。也可以根據消息中的某一個關鍵字來進行區分。通常第二種方式使用得更多。
Consumers
傳統的消息傳遞模式有2種:隊列( queue) 和(publish-subscribe)
- queue模式:多個consumer從服務器中讀取數據,消息只會到達一個consumer。
- publish-subscribe模式:消息會被廣播給所有的consumer。
Kafka基於這2種模式提供了一種consumer的抽象概念:consumer group。
- queue模式:所有的consumer都位於同一個consumer group 下。
- publish-subscribe模式:所有的consumer都有着自己唯一的consumer group。
由2個broker組成的kafka集群,某個主題總共有4個partition(P0-P3),分別位於不同的broker上。這個集群由2個Consumer Group消費, A有2個consumer instances ,B有4個。
通常一個topic會有幾個consumer group,每個consumer group都是一個邏輯上的訂閱者( logical subscriber )。每個consumer group由多個consumer instance組成,從而達到可擴展和容災的功能。
消費順序
一個partition同一個時刻在一個consumer group中只能有一個consumer instance在消費,從而保證消費順序。
consumer group中的consumer instance的數量不能比一個Topic中的partition的數量多,否則,多出來的consumer消費不到消息。
Kafka只在partition的範圍內保證消息消費的局部順序性,不能在同一個topic中的多個partition中保證總的消費順序性。
如果有在總體上保證消費順序的需求,那麼我們可以通過將topic的partition數量設置為1,將consumer group中的consumer instance數量也設置為1,但是這樣會影響性能,所以kafka的順序消費很少用。
Java客戶端訪問Kafka
引入maven依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency>
消息發送端代碼(生產者)
package com.yundasys.usercenter.collect.api.vo.req;
import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.databind.ser.std.StringSerializer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @program: usercenter-portrait-collect
* @description: MsgProducer
* @author: yxh-word
* @create: 2021-07-14
* @version: v1.0.0 創建文件, yxh-word, 2021-07-14
**/
public class MsgProducer {
private final static String TOPIC_NAME = "my-replicated-topic";
public static void main(String[] args) throws InterruptedException, ExecutionException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
/*
發出消息持久化機制參數
(1)acks=0: 表示producer不需要等待任何broker確認收到消息的回復,就可以繼續發送下一條消息。性能最高,但是最容易丟消息。
(2)acks=1: 至少要等待leader已經成功將數據寫入本地log,但是不需要等待所有follower是否成功寫入。就可以繼續發送下一
條消息。這種情況下,如果follower沒有成功備份數據,而此時leader又掛掉,則消息會丟失。
(3)acks=-1或all: 需要等待 min.insync.replicas(默認為1,推薦配置大於等於2) 這個參數配置的副本個數都成功寫入日誌,這種策略會保證
只要有一個備份存活就不會丟失數據。這是最強的數據保證。一般除非是金融級別,或跟錢打交道的場景才會使用這種配置。
*/
/*props.put(ProducerConfig.ACKS_CONFIG, "1");
*//*
發送失敗會重試,默認重試間隔100ms,重試能保證消息發送的可靠性,但是也可能造成消息重複發送,比如網絡抖動,所以需要在
接收者那邊做好消息接收的冪等性處理
*//*
props.put(ProducerConfig.RETRIES_CONFIG, 3);
//重試間隔設置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
//設置發送消息的本地緩衝區,如果設置了該緩衝區,消息會先發送到本地緩衝區,可以提高消息發送性能,默認值是33554432,即32MB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
*//*
kafka本地線程會從緩衝區取數據,批量發送到broker,
設置批量發送消息的大小,默認值是16384,即16kb,就是說一個batch滿了16kb就發送出去
*//*
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
*//*
默認值是0,意思就是消息必須立即被發送,但這樣會影響性能
一般設置10毫秒左右,就是說這個消息發送完後會進入本地的一個batch,如果10毫秒內,這個batch滿了16kb就會隨batch一起被發送出去
如果10毫秒內,batch沒滿,那麼也必須把消息發送出去,不能讓消息的發送延遲時間太長
*//*
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);*/
//把發送的key從字符串序列化為字節數組
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//把發送消息value從字符串序列化為字節數組
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<String, String>(props);
int msgNum = 5;
final CountDownLatch countDownLatch = new CountDownLatch(msgNum);
for (int i = 1; i <= msgNum; i++) {
Order order = new Order(i, 100 + i, 1, 1000.00);
//指定發送分區
/*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
, 0, order.getOrderId().toString(), JSON.toJSONString(order));*/
//未指定發送分區,具體發送的分區計算公式:hash(key)%partitionNum
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
, order.getOrderId().toString(), JSON.toJSONString(order));
//等待消息發送成功的同步阻塞方法
/*RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式發送消息結果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());*/
//異步回調方式發送消息
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("發送消息失敗:" + exception.getStackTrace());
}
if (metadata != null) {
System.out.println("異步方式發送消息結果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
countDownLatch.countDown();
}
});
//送積分 TODO
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.close();
}
}
消費者代碼
package com.yundasys.usercenter.collect.api.vo.req;
import com.fasterxml.jackson.databind.deser.std.StringDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
* @program: usercenter-portrait-collect
* @description: MsgConsumer
* @author: yxh-word
* @create: 2021-07-14
* @version: v1.0.0 創建文件, yxh-word, 2021-07-14
**/
public class MsgConsumer {
private final static String TOPIC_NAME = "my-replicated-topic";
private final static String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
// 消費分組名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
// 是否自動提交offset,默認就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自動提交offset的間隔時間
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
/*
當消費主題的是一個新的消費組,或者指定offset的消費方式,offset不存在,那麼應該如何消費
latest(默認) :只消費自己啟動之後發送到主題的消息
earliest:第一次從頭開始消費,以後按照消費offset記錄繼續消費,這個需要區別於consumer.seekToBeginning(每次都從頭開始消費)
*/
//props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
/*
consumer給broker發送心跳的間隔時間,broker接收到心跳如果此時有rebalance發生會通過心跳響應將
rebalance方案下發給consumer,這個時間可以稍微短一點
*/
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
/*
服務端broker多久感知不到一個consumer心跳就認為他故障了,會將其踢出消費組,
對應的Partition也會被重新分配給其他consumer,默認是10秒
*/
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
//一次poll最大拉取消息的條數,如果消費者處理速度很快,可以設置大點,如果處理速度一般,可以設置小點
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
/*
如果兩次poll操作間隔超過了這個時間,broker就會認為這個consumer處理能力太弱,
會將其踢出消費組,將分區分配給別的consumer消費
*/
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
// 消費指定分區
//consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
//消息回溯消費
/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*/
//指定offset消費
/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);*/
//從指定時間點開始消費
/*List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
//從1小時前開始消費
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
map.put(new TopicPartition(topicName, par.partition()), fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
TopicPartition key = entry.getKey();
OffsetAndTimestamp value = entry.getValue();
if (key == null || value == null) continue;
Long offset = value.offset();
System.out.println("partition-" + key.partition() + "|offset-" + offset);
System.out.println();
//根據消費里的timestamp確定offset
if (value != null) {
consumer.assign(Arrays.asList(key));
consumer.seek(key, offset);
}
}*/
while (true) {
/*
* poll() API 是拉取消息的長輪詢
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
/*if (records.count() > 0) {
// 手動同步提交offset,當前線程會阻塞直到offset提交成功
// 一般使用同步提交,因為提交之後一般也沒有什麼邏輯代碼了
consumer.commitSync();
// 手動異步提交offset,當前線程提交offset不會阻塞,可以繼續處理後面的程序邏輯
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " + exception.getStackTrace());
}
}
});
}*/
}
}
}
Spring Boot整合Kafka
引入spring boot kafka依賴:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
application.yml配置如下:
server:
port: 8080
spring:
kafka:
bootstrap-servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094
producer: # 生產者
retries: 3 # 設置大於0的值,則客戶端會將發送失敗的記錄重新發送
batch-size: 16384
buffer-memory: 33554432
acks: 1
# 指定消息key和消息體的編解碼方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 當每一條記錄被消費者監聽器(ListenerConsumer)處理之後提交
# RECORD
# 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後提交
# BATCH
# 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後,距離上次提交時間大於TIME時提交
# TIME
# 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後,被處理record數量大於等於COUNT時提交
# COUNT
# TIME | COUNT 有一個條件滿足時提交
# COUNT_TIME
# 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後, 手動調用Acknowledgment.acknowledge()後提交
# MANUAL
# 手動調用Acknowledgment.acknowledge()後立即提交,一般使用這種
# MANUAL_IMMEDIATE
ack-mode: manual_immediate
生產者代碼:
package com.yundasys.usercenter.collect.api.vo.req;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
/**
* @program: usercenter-portrait-collect
* @description: KafkaController
* @author: yxh-word
* @create: 2021-07-14
* @version: v1.0.0 創建文件, yxh-word, 2021-07-14
**/
public class KafkaController {
private final static String TOPIC_NAME = "my-replicated-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send")
public void send() {
kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");
}
}
消費者代碼:
package com.yundasys.usercenter.collect.api.vo.req;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
/**
* @program: usercenter-portrait-collect
* @description: MyConsumer
* @author: yxh-word
* @create: 2021-07-14
* @version: v1.0.0 創建文件, yxh-word, 2021-07-14
**/
public class MyConsumer {
/**
* @KafkaListener(groupId = "testGroup", topicPartitions = {
* @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
* @TopicPartition(topic = "topic2", partitions = "0",
* partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
* },concurrency = "6")
* //concurrency就是同組下的消費者個數,就是並發消費數,必須小於等於分區總數
* @param record
*/
@KafkaListener(topics = "my-replicated-topic",groupId = "yundaGroup")
public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
//手動提交offset
ack.acknowledge();
}
/*//配置多個消費組
@KafkaListener(topics = "my-replicated-topic",groupId = "likeGroup")
public void listenTulingGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
ack.acknowledge();
}*/
}
原創文章,作者:投稿專員,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/251173.html