kafka簡單的入門案例「kafka應用實例」

kafka快速實戰與基本原理詳解

背景

Kafka是最初由Linkedin公司開發,是一個分布式、支持分區的(partition)、多副本的(replica),基於zookeeper協 調的分布式消息系統,它的最大的特性就是可以實時地處理大量數據以滿足各種需求場景:比如基於hadoop的批處理系統 統、低延遲的實時系統、Storm/Spark流式處理引擎,web/nginx日誌、訪問日誌,消息服務等等,用scala語言編寫, Linkedin於2010年貢獻給了Apache基金會並成為頂級開源 項目。

kafka快速實戰與基本原理詳解

Kafka基本概念

kafka是一個分布式的,分區的消息(官方稱之為commit log)服務。它提供一個消息系統應該具備的功能,但是確有着獨特的設計。可以這樣來說,Kafka借鑒了JMS規範的思想,但是確並沒有完全遵循JMS規範。

首先,讓我們來看一下基礎的消息(Message)相關術語

名稱解釋
Broker消息中間件處理節點,一個Kafka節點就是一個broker,一個或者多個Broker可以組成一個Kafka集群
TopicKafka根據topic對消息進行歸類,發布到Kafka集群的每條消息都需要指定一個topic
Producer消息生產者,向Broker發送消息的客戶端
Consumer消息消費者,從Broker讀取消息的客戶端
ConsumerGroup每個Consumer屬於一個特定的Consumer Group,一條消息可以被多個不同的Consumer Group消費,但是一個Consumer Group中只能有一個Consumer能夠消費該消息
Partition物理上的概念,一個topic可以分為多個partition,每個partition內部消息是有序的

因此,從一個較高的層面上來看,producer通過網絡發送消息到Kafka集群,然後consumer來進行消費,如下圖:

kafka快速實戰與基本原理詳解

服務端(brokers)和客戶端(producer、consumer)之間通信通過TCP協議來完成.

kafka基本使用

安裝前的環境準備

由於Kafka是用Scala語言開發的,運行在JVM上,因此在安裝Kafka之前需要先安裝JDK。

yum install java-1.8.0-openjdk* -y

kafka依賴zookeeper,所以需要先安裝zookeeper

wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz

tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz

cd apache-zookeeper-3.5.8-bin

cp conf/zoo_sample.cfg conf/zoo.cfg # 啟動zookeeper bin/zkServer.sh

start bin/zkCli.sh

ls / #查看zk的根目錄相關節點

第一步:下載安裝包

下載2.4.1 release版本,並解壓:

wget
https://mirror.bit.edu.cn/apache/kafka/2.4.1/kafka_2.11-2.4.1.tgz # 2.11是scala的版本,2.4.1是kafka的版本

tar -xzf kafka_2.11-2.4.1.tgz

cd kafka_2.11-2.4.1

第二步:修改配置

修改配置文件config/server.properties:

#broker.id屬性在kafka集群中必須要是唯一 broker.id=0 #kafka部署的機器ip和提供服務的端口號 listeners=
PLAINTEXT://192.168.65.60:9092 #kafka的消息存儲文件 log.dir=/usr/local/data/kafka-logs #kafka連接zookeeper的地址 zookeeper.connect=192.168.65.60:2181

第三步:啟動服務

現在來啟動kafka服務:

啟動腳本語法:kafka-server-start.sh [-daemon] server.properties

可以看到,server.properties的配置路徑是一個強制的參數,-daemon表示以後台進程運行,否則ssh客戶端退出後,就會停止服務。(注意,在啟動kafka時會使用linux主機名關聯的ip地址,所以需要把主機名和linux的ip映射配置到本地host里,用vim /etc/hosts)

# 啟動kafka,運行日誌在logs目錄的server.log文件里

bin/kafka-server-start.sh -daemon config/server.properties

#後台啟動,不會打印日誌到控制台 或者用

bin/kafka-server-start.sh config/server.properties & # 我們進入zookeeper目錄通過zookeeper客戶端查看下zookeeper的目錄樹 bin/zkCli.sh ls / #查看zk的根目錄kafka相關節點

ls /brokers/ids #查看kafka節點 # 停止kafka bin/kafka-server-stop.sh

server.properties核心配置詳解:

PropertyDefaultDescription
broker.id0每個broker都可以用一個唯一的非負整數id進行標識;這個id可以作為broker的“名字”,你可以選擇任意你喜歡的數字作為id,只要id是唯一的即可。
log.dirs/tmp/kafka-logskafka存放數據的路徑。這個路徑並不是唯一的,可以是多個,路徑之間只需要使用逗號分隔即可;每當創建新partition時,都會選擇在包含最少partitions的路徑下進行。
listenersPLAINTEXT://192.168.65.60:9092server接受客戶端連接的端口,ip配置kafka本機ip即可
zookeeper.connectlocalhost:2181zooKeeper連接字符串的格式為:hostname:port,此處hostname和port分別是ZooKeeper集群中某個節點的host和port;zookeeper如果是集群,連接方式為 hostname1:port1, hostname2:port2, hostname3:port3
log.retention.hours168每個日誌文件刪除之前保存的時間。默認數據保存時間對所有topic都一樣。
num.partitions1創建topic的默認分區數
default.replication.factor1自動創建topic的默認副本數量,建議設置為大於等於2
min.insync.replicas1當producer設置acks為-1時,min.insync.replicas指定replicas的最小數目(必須確認每一個repica的寫數據都是成功的),如果這個數目沒有達到,producer發送消息會產生異常
delete.topic.enablefalse是否允許刪除主題

第四步:創建主題

現在我們來創建一個名字為“test”的Topic,這個topic只有一個partition,並且備份因子也設置為1:

bin/kafka-topics.sh –create –zookeeper 192.168.65.60:2181 –replication-factor 1 –partitions 1 –topic test

現在我們可以通過以下命令來查看kafka中目前存在的topic

bin/kafka-topics.sh –list –zookeeper 192.168.65.60:2181

除了我們通過手工的方式創建Topic,當producer發布一個消息到某個指定的Topic,這個Topic如果不存在,就自動創建。

刪除主題

bin/kafka-topics.sh –delete –topic test –zookeeper 192.168.65.60:2181

第五步:發送消息

kafka自帶了一個producer命令客戶端,可以從本地文件中讀取內容,或者我們也可以以命令行中直接輸入內容,並將這些內容以消息的形式發送到kafka集群中。在默認情況下,每一個行會被當做成一個獨立的消息。

首先我們要運行發布消息的腳本,然後在命令中輸入要發送的消息的內容:

bin/kafka-console-producer.sh –broker-list 192.168.65.60:9092 –topic test >this is a msg >this is a another msg

第六步:消費消息

對於consumer,kafka同樣也攜帶了一個命令行客戶端,會將獲取到內容在命令中進行輸出,默認是消費最新的消息

bin/kafka-console-consumer.sh –bootstrap-server 192.168.65.60:9092 –topic test

如果想要消費之前的消息可以通過–from-beginning參數指定,如下命令:

bin/kafka-console-consumer.sh –bootstrap-server 192.168.65.60:9092 –from-beginning –topic test

如果你是通過不同的終端窗口來運行以上的命令,你將會看到在producer終端輸入的內容,很快就會在consumer的終端窗口上顯示出來。

以上所有的命令都有一些附加的選項;當我們不攜帶任何參數運行命令的時候,將會顯示出這個命令的詳細用法。

消費多主題

bin/kafka-console-consumer.sh –bootstrap-server 192.168.65.60:9092 –whitelist “test|test-2”

單播消費

一條消息只能被某一個消費者消費的模式,類似queue模式,只需讓所有消費者在同一個消費組裡即可

分別在兩個客戶端執行如下消費命令,然後往主題里發送消息,結果只有一個客戶端能收到消息

bin/kafka-console-consumer.sh –bootstrap-server 192.168.65.60:9092 –consumer-property group.id=testGroup –topic test

多播消費

一條消息能被多個消費者消費的模式,類似publish-subscribe模式費,針對Kafka同一條消息只能被同一個消費組下的某一個消費者消費的特性,要實現多播只要保證這些消費者屬於不同的消費組即可。我們再增加一個消費者,該消費者屬於testGroup-2消費組,結果兩個客戶端都能收到消息

bin/kafka-console-consumer.sh –bootstrap-server 192.168.65.60:9092 –consumer-property group.id=testGroup-2 –topic test

查看消費組名

bin/kafka-consumer-groups.sh –bootstrap-server 192.168.65.60:9092 –list

查看消費組的消費偏移量

bin/kafka-consumer-groups.sh –bootstrap-server 192.168.65.60:9092 –describe –group testGroup

kafka快速實戰與基本原理詳解

current-offset:當前消費組的已消費偏移量

log-end-offset:主題對應分區消息的結束偏移量(HW)

lag:當前消費組未消費的消息數

主題Topic和消息日誌Log

可以理解Topic是一個類別的名稱,同類消息發送到同一個Topic下面。對於每一個Topic,下面可以有多個分區(Partition)日誌文件:

kafka快速實戰與基本原理詳解

Partition是一個有序的message序列,這些message按順序添加到一個叫做commit log的文件中。每個partition中的消息都有一個唯一的編號,稱之為offset,用來唯一標示某個分區中的message。

每個partition,都對應一個commit log文件。一個partition中的message的offset都是唯一的,但是不同的partition中的message的offset可能是相同的。

kafka一般不會刪除消息,不管這些消息有沒有被消費。只會根據配置的日誌保留時間(log.retention.hours)確認消息多久被刪除,默認保留最近一周的日誌消息。kafka的性能與保留的消息數據量大小沒有關係,因此保存大量的數據消息日誌信息不會有什麼影響。

每個consumer是基於自己在commit log中的消費進度(offset)來進行工作的。在kafka中,消費offset由consumer自己來維護;一般情況下我們按照順序逐條消費commit log中的消息,當然我可以通過指定offset來重複消費某些消息,或者跳過某些消息。

這意味kafka中的consumer對集群的影響是非常小的,添加一個或者減少一個consumer,對於集群或者其他consumer來說,都是沒有影響的,因為每個consumer維護各自的消費offset。

創建多個分區的主題:

bin/kafka-topics.sh –create –zookeeper 192.168.65.60:2181 –replication-factor 1 –partitions 2 –topic test1

查看下topic的情況

bin/kafka-topics.sh –describe –zookeeper 192.168.65.60:2181 –topic test1

kafka快速實戰與基本原理詳解
  • leader節點負責給定partition的所有讀寫請求。
  • replicas 表示某個partition在哪幾個broker上存在備份。不管這個幾點是不是”leader“,甚至這個節點掛了,也會列出。
  • isr 是replicas的一個子集,它只列出當前還存活着的,並且已同步備份了該partition的節點。

增加topic的分區數量(目前kafka不支持減少分區)

bin/kafka-topics.sh -alter –partitions 3 –zookeeper 192.168.65.60:2181 –topic test

可以這麼來理解Topic,Partition和Broker

一個topic,代表邏輯上的一個業務數據集,比如按數據庫里不同表的數據操作消息區分放入不同topic,訂單相關操作消息放入訂單topic,用戶相關操作消息放入用戶topic,對於大型網站來說,後端數據都是海量的,訂單消息很可能是非常巨量的,比如有幾百個G甚至達到TB級別,如果把這麼多數據都放在一台機器上肯定會有容量限制問題,那麼就可以在topic內部劃分多個partition來分片存儲數據,不同的partition可以位於不同的機器上,每台機器上都運行一個Kafka的進程Broker。

為什麼要對Topic下數據進行分區存儲?

1、commit log文件會受到所在機器的文件系統大小的限制,分區之後可以將不同的分區放在不同的機器上,相當於對數據做了分布式存儲,理論上一個topic可以處理任意數量的數據。

2、為了提高並行度

kafka集群實戰

對於kafka來說,一個單獨的broker意味着kafka集群中只有一個節點。要想增加kafka集群中的節點數量,只需要多啟動幾個broker實例即可。為了有更好的理解,現在我們在一台機器上同時啟動三個broker實例。

首先,我們需要建立好其他2個broker的配置文件:

cp config/server.properties config/server-1.properties

cp config/server.properties config/server-2.properties

配置文件的需要修改的內容分別如下:

config/server-1.properties:

#broker.id屬性在kafka集群中必須要是唯一 broker.id=1 #kafka部署的機器ip和提供服務的端口號 listeners=
PLAINTEXT://192.168.65.60:9093 log.dir=/usr/local/data/kafka-logs-1 #kafka連接zookeeper的地址,要把多個kafka實例組成集群,對應連接的zookeeper必須相同 zookeeper.connect=192.168.65.60:2181

config/server-2.properties:

broker.id=2 listeners=PLAINTEXT://192.168.65.60:9094 log.dir=/usr/local/data/kafka-logs-2 zookeeper.connect=192.168.65.60:2181

目前我們已經有一個zookeeper實例和一個broker實例在運行了,現在我們只需要在啟動2個broker實例即可:

bin/kafka-server-start.sh -daemon config/server-1.properties bin/kafka-server-start.sh -daemon config/server-2.properties

查看zookeeper確認集群節點是否都註冊成功:

kafka快速實戰與基本原理詳解

創建一個新的topic,副本數設置為3,分區數設置為2:

bin/kafka-topics.sh –create –zookeeper 192.168.65.60:2181 –replication-factor 3 –partitions 2 –topic my-replicated-topic

查看下topic的情況

bin/kafka-topics.sh –describe –zookeeper 192.168.65.60:2181 –topic my-replicated-topic

kafka快速實戰與基本原理詳解
  • leader節點負責給定partition的所有讀寫請求,同一個主題不同分區leader副本一般不一樣(為了容災)
  • replicas 表示某個partition在哪幾個broker上存在備份。不管這個幾點是不是”leader“,甚至這個節點掛了,也會列出。
  • isr 是replicas的一個子集,它只列出當前還存活着的,並且已同步備份了該partition的節點。

kafka將很多集群關鍵信息記錄在zookeeper里,保證自己的無狀態,從而在水平擴容時非常方便。

集群消費

log的partitions分布在kafka集群中不同的broker上,每個broker可以請求備份其他broker上partition上的數據。kafka集群支持配置一個partition備份的數量。

針對每個partition,都有一個broker起到“leader”的作用,0個或多個其他的broker作為“follwers”的作用。leader處理所有的針對這個partition的讀寫請求,而followers被動複製leader的結果,不提供讀寫(主要是為了保證多副本數據與消費的一致性)。如果這個leader失效了,其中的一個follower將會自動地變成新的leader。

Producers

生產者將消息發送到topic中去,同時負責選擇將message發送到topic的哪一個partition中。通過round-robin做簡單的負載均衡。也可以根據消息中的某一個關鍵字來進行區分。通常第二種方式使用得更多。

Consumers

傳統的消息傳遞模式有2種:隊列( queue) 和(publish-subscribe)

  • queue模式:多個consumer從服務器中讀取數據,消息只會到達一個consumer。
  • publish-subscribe模式:消息會被廣播給所有的consumer。

Kafka基於這2種模式提供了一種consumer的抽象概念:consumer group。

  • queue模式:所有的consumer都位於同一個consumer group 下。
  • publish-subscribe模式:所有的consumer都有着自己唯一的consumer group。

由2個broker組成的kafka集群,某個主題總共有4個partition(P0-P3),分別位於不同的broker上。這個集群由2個Consumer Group消費, A有2個consumer instances ,B有4個。

通常一個topic會有幾個consumer group,每個consumer group都是一個邏輯上的訂閱者( logical subscriber )。每個consumer group由多個consumer instance組成,從而達到可擴展和容災的功能。

消費順序

一個partition同一個時刻在一個consumer group中只能有一個consumer instance在消費,從而保證消費順序。

consumer group中的consumer instance的數量不能比一個Topic中的partition的數量多,否則,多出來的consumer消費不到消息。

Kafka只在partition的範圍內保證消息消費的局部順序性,不能在同一個topic中的多個partition中保證總的消費順序性。

如果有在總體上保證消費順序的需求,那麼我們可以通過將topic的partition數量設置為1,將consumer group中的consumer instance數量也設置為1,但是這樣會影響性能,所以kafka的順序消費很少用。

Java客戶端訪問Kafka

引入maven依賴

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency>

消息發送端代碼(生產者)

package com.yundasys.usercenter.collect.api.vo.req;

import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.databind.ser.std.StringSerializer;
import org.apache.kafka.clients.producer.*;


import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * @program: usercenter-portrait-collect
 * @description: MsgProducer
 * @author: yxh-word
 * @create: 2021-07-14
 * @version: v1.0.0 創建文件, yxh-word, 2021-07-14
 **/
public class MsgProducer {
    private final static String TOPIC_NAME = "my-replicated-topic";

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
         /*
         發出消息持久化機制參數
        (1)acks=0: 表示producer不需要等待任何broker確認收到消息的回復,就可以繼續發送下一條消息。性能最高,但是最容易丟消息。
        (2)acks=1: 至少要等待leader已經成功將數據寫入本地log,但是不需要等待所有follower是否成功寫入。就可以繼續發送下一
             條消息。這種情況下,如果follower沒有成功備份數據,而此時leader又掛掉,則消息會丟失。
        (3)acks=-1或all: 需要等待 min.insync.replicas(默認為1,推薦配置大於等於2) 這個參數配置的副本個數都成功寫入日誌,這種策略會保證
            只要有一個備份存活就不會丟失數據。這是最強的數據保證。一般除非是金融級別,或跟錢打交道的場景才會使用這種配置。
         */
        /*props.put(ProducerConfig.ACKS_CONFIG, "1");
         *//*
        發送失敗會重試,默認重試間隔100ms,重試能保證消息發送的可靠性,但是也可能造成消息重複發送,比如網絡抖動,所以需要在
        接收者那邊做好消息接收的冪等性處理
        *//*
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        //重試間隔設置
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
        //設置發送消息的本地緩衝區,如果設置了該緩衝區,消息會先發送到本地緩衝區,可以提高消息發送性能,默認值是33554432,即32MB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        *//*
        kafka本地線程會從緩衝區取數據,批量發送到broker,
        設置批量發送消息的大小,默認值是16384,即16kb,就是說一個batch滿了16kb就發送出去
        *//*
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        *//*
        默認值是0,意思就是消息必須立即被發送,但這樣會影響性能
        一般設置10毫秒左右,就是說這個消息發送完後會進入本地的一個batch,如果10毫秒內,這個batch滿了16kb就會隨batch一起被發送出去
        如果10毫秒內,batch沒滿,那麼也必須把消息發送出去,不能讓消息的發送延遲時間太長
        *//*
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);*/
        //把發送的key從字符串序列化為字節數組
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //把發送消息value從字符串序列化為字節數組
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        int msgNum = 5;
        final CountDownLatch countDownLatch = new CountDownLatch(msgNum);
        for (int i = 1; i <= msgNum; i++) {
            Order order = new Order(i, 100 + i, 1, 1000.00);
            //指定發送分區
            /*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
                    , 0, order.getOrderId().toString(), JSON.toJSONString(order));*/
            //未指定發送分區,具體發送的分區計算公式:hash(key)%partitionNum
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
                    , order.getOrderId().toString(), JSON.toJSONString(order));

            //等待消息發送成功的同步阻塞方法
            /*RecordMetadata metadata = producer.send(producerRecord).get();
            System.out.println("同步方式發送消息結果:" + "topic-" + metadata.topic() + "|partition-"
                    + metadata.partition() + "|offset-" + metadata.offset());*/

            //異步回調方式發送消息
            producer.send(producerRecord, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("發送消息失敗:" + exception.getStackTrace());

                    }
                    if (metadata != null) {
                        System.out.println("異步方式發送消息結果:" + "topic-" + metadata.topic() + "|partition-"
                                + metadata.partition() + "|offset-" + metadata.offset());
                    }
                    countDownLatch.countDown();
                }
            });

            //送積分 TODO

        }

        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.close();
    }
}

消費者代碼

package com.yundasys.usercenter.collect.api.vo.req;

import com.fasterxml.jackson.databind.deser.std.StringDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @program: usercenter-portrait-collect
 * @description: MsgConsumer
 * @author: yxh-word
 * @create: 2021-07-14
 * @version: v1.0.0 創建文件, yxh-word, 2021-07-14
 **/
public class MsgConsumer {
    private final static String TOPIC_NAME = "my-replicated-topic";
    private final static String CONSUMER_GROUP_NAME = "testGroup";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
        // 消費分組名
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
        // 是否自動提交offset,默認就是true
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // 自動提交offset的間隔時間
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        /*
        當消費主題的是一個新的消費組,或者指定offset的消費方式,offset不存在,那麼應該如何消費
        latest(默認) :只消費自己啟動之後發送到主題的消息
        earliest:第一次從頭開始消費,以後按照消費offset記錄繼續消費,這個需要區別於consumer.seekToBeginning(每次都從頭開始消費)
        */
        //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      /*
      consumer給broker發送心跳的間隔時間,broker接收到心跳如果此時有rebalance發生會通過心跳響應將
      rebalance方案下發給consumer,這個時間可以稍微短一點
      */
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
        /*
        服務端broker多久感知不到一個consumer心跳就認為他故障了,會將其踢出消費組,
        對應的Partition也會被重新分配給其他consumer,默認是10秒
        */
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
        //一次poll最大拉取消息的條數,如果消費者處理速度很快,可以設置大點,如果處理速度一般,可以設置小點
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        /*
        如果兩次poll操作間隔超過了這個時間,broker就會認為這個consumer處理能力太弱,
        會將其踢出消費組,將分區分配給別的consumer消費
        */
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        // 消費指定分區
        //consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

        //消息回溯消費
        /*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
        consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*/

        //指定offset消費
        /*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
        consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);*/

        //從指定時間點開始消費
        /*List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
        //從1小時前開始消費
        long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
        Map<TopicPartition, Long> map = new HashMap<>();
        for (PartitionInfo par : topicPartitions) {
            map.put(new TopicPartition(topicName, par.partition()), fetchDataTime);
        }
        Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
            TopicPartition key = entry.getKey();
            OffsetAndTimestamp value = entry.getValue();
            if (key == null || value == null) continue;
            Long offset = value.offset();
            System.out.println("partition-" + key.partition() + "|offset-" + offset);
            System.out.println();
            //根據消費里的timestamp確定offset
            if (value != null) {
                consumer.assign(Arrays.asList(key));
                consumer.seek(key, offset);
            }
        }*/

        while (true) {
            /*
             * poll() API 是拉取消息的長輪詢
             */
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
                        record.offset(), record.key(), record.value());
            }

            /*if (records.count() > 0) {
                // 手動同步提交offset,當前線程會阻塞直到offset提交成功
                // 一般使用同步提交,因為提交之後一般也沒有什麼邏輯代碼了
                consumer.commitSync();

                // 手動異步提交offset,當前線程提交offset不會阻塞,可以繼續處理後面的程序邏輯
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (exception != null) {
                            System.err.println("Commit failed for " + offsets);
                            System.err.println("Commit failed exception: " + exception.getStackTrace());
                        }
                    }
                });

            }*/
        }
    }
}

Spring Boot整合Kafka

引入spring boot kafka依賴:

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>

application.yml配置如下:

server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094
    producer: # 生產者
      retries: 3 # 設置大於0的值,則客戶端會將發送失敗的記錄重新發送
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # 指定消息key和消息體的編解碼方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 當每一條記錄被消費者監聽器(ListenerConsumer)處理之後提交
      # RECORD
      # 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後提交
      # BATCH
      # 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後,距離上次提交時間大於TIME時提交
      # TIME
      # 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後,被處理record數量大於等於COUNT時提交
      # COUNT
      # TIME | COUNT 有一個條件滿足時提交
      # COUNT_TIME
      # 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後, 手動調用Acknowledgment.acknowledge()後提交
      # MANUAL
      # 手動調用Acknowledgment.acknowledge()後立即提交,一般使用這種
      # MANUAL_IMMEDIATE
      ack-mode: manual_immediate

生產者代碼:

package com.yundasys.usercenter.collect.api.vo.req;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;

/**
 * @program: usercenter-portrait-collect
 * @description: KafkaController
 * @author: yxh-word
 * @create: 2021-07-14
 * @version: v1.0.0 創建文件, yxh-word, 2021-07-14
 **/
public class KafkaController {
    private final static String TOPIC_NAME = "my-replicated-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public void send() {
        kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");
    }
}

消費者代碼:

package com.yundasys.usercenter.collect.api.vo.req;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

/**
 * @program: usercenter-portrait-collect
 * @description: MyConsumer
 * @author: yxh-word
 * @create: 2021-07-14
 * @version: v1.0.0 創建文件, yxh-word, 2021-07-14
 **/
public class MyConsumer {
    /**
     * @KafkaListener(groupId = "testGroup", topicPartitions = {
     *             @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
     *             @TopicPartition(topic = "topic2", partitions = "0",
     *                     partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
     *     },concurrency = "6")
     *  //concurrency就是同組下的消費者個數,就是並發消費數,必須小於等於分區總數
     * @param record
     */
    @KafkaListener(topics = "my-replicated-topic",groupId = "yundaGroup")
    public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        //手動提交offset
        ack.acknowledge();
    }

    /*//配置多個消費組
    @KafkaListener(topics = "my-replicated-topic",groupId = "likeGroup")
    public void listenTulingGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        ack.acknowledge();
    }*/
}

原創文章,作者:投稿專員,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/251173.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
投稿專員的頭像投稿專員
上一篇 2024-12-13 17:22
下一篇 2024-12-13 17:22

相關推薦

發表回復

登錄後才能評論