java調用kafka生產者,java調用kafka介面發送數據

本文目錄一覽:

怎麼使用java連接kafka

把你要傳遞的數據轉換成json字元串返回介面,然後手機端調用介面就可以獲取到你要傳遞是值了

Kafka簡介+Kafka Tool使用簡介+使用實例

詳細安裝訪問:

macOS 可以用homebrew快速安裝,訪問地址:

原文鏈接:

查看topic列表:

創建topic:

–create :創建命令;

–topic :後面指定topic名稱;

–replication-factor :後面指定副本數;

–partitions :指定分區數,根據broker的數量決定;

–zookeeper :後面指定 zookeeper.connect 的zk鏈接

查看某個topic:

Kafka 作為消息系統的一種, 當然可 以像其他消 息中 間件一樣作為消息數據中轉的平台。 下面以 Java 語言為例,看一下如何使用 Kafka 來發送和接收消息。

1、引入依賴

2、消息生產者

示例 中用 KafkaProducer 類來創建一個消息生產者,該類的構造函數入參是一系列屬性值。下面看一下這些屬性具體都是什麼含義。

bootstrap.servers 表示 Kafka 集群 。 如果集群中有多台物理伺服器,則伺服器地址之間用逗號分隔, 比如」 192.168.1.1 :9092,192.168.1.2:9092」 。 localhost 是筆者電腦的地址,9092 是 Kafka 伺服器默認監聽的埠號。

key.serializer 和 value.serializer 表示消息的序列化類型 。 Kafka 的消息是以鍵值對的形式發送到 Kafka 伺服器的,在消息被發送到伺服器之前,消息生產者需要把不同類型的 消息序列化為 二 進位類型,示例中是發送文本消息到伺服器 , 所以使用的是StringSerializer。

key.deserializer 和 value.deserializer 表示消息的反序列化類型。把來自 Kafka 集群的二進位消 息反序列 化 為指定 的 類型,因為序列化用的是String類型,所以用StringDeserializer 來反序列化。

zk.connect 用於指定 Kafka 連接 ZooKeeper 的 URL ,提供了基於 ZooKeeper 的集群伺服器自動感知功能, 可以動態從 ZooKeeper 中讀取 Kafka 集群配置信息。

有 了 消息生產者之後 , 就可以調用 send 方法發送消息了。該方法的入參是 ProducerRecord類型對象 , ProducerRecord 類提供了多種構造函數形參,常見的有如下三種 :

ProducerRecord(topic,partition,key,value);

ProducerRecord(topic,key,value);

ProducerRecord(topic, value) ;

其中 topic 和 value 是必填的, partition 和 key 是可選的 。如果指定了 pa時tion,那麼消息會被發送至指定的 partition ;如果沒指定 partition 但指定了 Key,那麼消息會按照 hash(key)發送至對應的 partition: 如果既沒指定 partition 也沒指定 key,那麼 消息會按照 round-robin 模式發送(即以輪詢的方式依次發送〉到每一個 partition。示例中將向 test-topic 主題發送三條消息。

3、消息消費者

和消息生產者類似,這裡用 KafkaConsumer 類來創建一個消息消費者,該類的構造函數入參也是一系列屬性值。

bootstrap. servers 和生產者一樣,表示 Kafka 集群。

group.id 表示消費者的分組 ID。

enable.auto.commit 表示 Consumer 的 offset 是否自 動提交 。

auto.commit.interval .ms 用於設置自動提交 offset 到 ZooKeeper 的時間間隔,時間單位是毫秒。

key. deserializer 和 value.deserializer 表示用字元串來反序列化消息數據。

消息消費者使用 subscribe 方法 訂閱了 Topic 為 test-topic 的消息。 Consumer 調用poll 方法來輪詢 Kafka 集群的消息, 一直等到 Kafka 集群中沒有消息或達到超時時間(示例中設置超時時間為 100 毫秒)為止 。 如果讀取到消息,則列印出消息記錄的 pa此ition, offset、key 等。

kafka集群測試正常,但是Java連接kafka出現異常,急求大神解答!!!!!!!!!!!

首先你在鏈接時候檢查是否代碼里的IP 和埠是不是對的,埠是broker 埠,默認9092 ;

其次查看代碼是生產者,看Kafka 集群里這個主題是否存在(如果不存在,默認是配置可以自動創建,看是非將該配置修改);然後檢測防火牆,相應埠是否開放(防火牆直接關也可以);檢測 server.properties 文件的 listeners 是否配置,若沒有將其配置好

Kafka系列之(4)——Kafka Producer流程解析

Kafka 0.9版本正式使用Java版本的producer替換了原Scala版本的producer。

註:ProducerRecord允許用戶在創建消息對象的時候就直接指定要發送的分區,這樣producer後續發送該消息時可以直接發送到指定分區,而不用先通過Partitioner計算目標分區了。另外,我們還可以直接指定消息的時間戳——但一定要慎重使用這個功能,因為它有可能會令時間戳索引機制失效。

流程描述:

用戶首先構建待發送的消息對象ProducerRecord,然後調用KafkaProducer#send方法進行發送。KafkaProducer接收到消息後首先對其進行序列化,然後結合本地緩存的元數據信息一起發送給partitioner去確定目標分區,最後追加寫入到內存中的消息緩衝池(accumulator)。此時KafkaProducer#send方法成功返回。同時,KafkaProducer中還有一個專門的Sender IO線程負責將緩衝池中的消息分批次發送給對應的broker,完成真正的消息發送邏輯。

新版本的producer從設計上來說具有以下幾個特點:

總共創建兩個線程:執行KafkaPrducer#send邏輯的線程——我們稱之為「用戶主線程」;執行發送邏輯的IO線程——我們稱之為「Sender線程」。

不同於Scala老版本的producer,新版本producer完全非同步發送消息,並提供了回調機制(callback)供用戶判斷消息是否成功發送。

batching機制——「分批發送「機制。每個批次(batch)中包含了若干個PRODUCE請求,因此具有更高的吞吐量。

更加合理的默認分區策略:對於無key消息而言,Scala版本分區策略是一段時間內(默認是10分鐘)將消息發往固定的目標分區,這容易造成消息分布的不均勻,而新版本的producer採用輪詢的方式均勻地將消息分發到不同的分區。

底層統一使用基於Selector的網路客戶端實現,結合Java提供的Future實現完整地提供了更加健壯和優雅的生命周期管理。

關鍵參數

batch.size 我把它列在了首位,因為該參數對於調優producer至關重要。之前提到過新版producer採用分批發送機制,該參數即控制一個batch的大小。默認是16KB

acks 關乎到消息持久性(durability)的一個參數。高吞吐量和高持久性很多時候是相矛盾的,需要先明確我們的目標是什麼? 高吞吐量?高持久性?亦或是中等?因此該參數也有對應的三個取值:0, -1和1

linger.ms 減少網路IO,節省帶寬之用。原理就是把原本需要多次發送的小batch,通過引入延時的方式合併成大batch發送,減少了網路傳輸的壓力,從而提升吞吐量。當然,也會引入延時

compression.type producer 所使用的壓縮器,目前支持gzip, snappy和lz4。壓縮是在用戶主線程完成的,通常都需要花費大量的CPU時間,但對於減少網路IO來說確實利器。生產環境中可以結合壓力測試進行適當配置

max.in.flight.requests.per.connection 關乎消息亂序的一個配置參數。它指定了Sender線程在單個Socket連接上能夠發送未應答PRODUCE請求的最大請求數。適當增加此值通常會增大吞吐量,從而整體上提升producer的性能。不過筆者始終覺得其效果不如調節batch.size來得明顯,所以請謹慎使用。另外如果開啟了重試機制,配置該參數大於1可能造成消息發送的亂序(先發送A,然後發送B,但B卻先行被broker接收)

retries 重試機制,對於瞬時失敗的消息發送,開啟重試後KafkaProducer會嘗試再次發送消息。對於有強烈無消息丟失需求的用戶來說,開啟重試機制是必選項。

當用戶調用KafkaProducer.send(ProducerRecord, Callback)時Kafka內部流程分析:

這是KafkaProducer#send邏輯的第一步,即為待發送消息進行序列化並計算目標分區,如下圖所示:

如上圖所示,一條所屬topic是”test”,消息體是”message”的消息被序列化之後結合KafkaProducer緩存的元數據(比如該topic分區數信息等)共同傳給後面的Partitioner實現類進行目標分區的計算。

producer創建時會創建一個默認32MB(由buffer.memory參數指定)的accumulator緩衝區,專門保存待發送的消息。除了之前在「關鍵參數」段落中提到的linger.ms和batch.size等參數之外,該數據結構中還包含了一個特別重要的集合信息:消息批次信息(batches)。該集合本質上是一個HashMap,裡面分別保存了每個topic分區下的batch隊列,即前面說的批次是按照topic分區進行分組的。這樣發往不同分區的消息保存在對應分區下的batch隊列中。舉個簡單的例子,假設消息M1, M2被發送到test的0分區但屬於不同的batch,M3分送到test的1分區,那麼batches中包含的信息就是:{“test-0” – [batch1, batch2], “test-1” – [batch3]}。

單個topic分區下的batch隊列中保存的是若干個消息批次。每個batch中最重要的3個組件包括:

compressor: 負責執行追加寫入操作

batch緩衝區:由batch.size參數控制,消息被真正追加寫入到的地方

thunks:保存消息回調邏輯的集合

這一步的目的就是將待發送的消息寫入消息緩衝池中,具體流程如下圖所示:

這一步執行完畢之後理論上講KafkaProducer.send方法就執行完畢了,用戶主線程所做的事情就是等待Sender線程發送消息並執行返回結果了。

此時,該Sender線程登場了。嚴格來說,Sender線程自KafkaProducer創建後就一直都在運行著 。它的工作流程基本上是這樣的:

不斷輪詢緩衝區尋找 已做好發送準備的分區 ;

將輪詢獲得的各個batch按照目標分區所在的leader broker進行分組;

將分組後的batch通過底層創建的 Socket連接 發送給各個broker;

等待伺服器端發送response回來。

為了說明上的方便,我還是基於圖的方式來解釋Sender線程的工作原理:

上圖中Sender線程會發送PRODUCE請求給對應的broker,broker處理完畢之後發送對應的PRODUCE response。一旦Sender線程接收到response將依次(按照消息發送順序)調用batch中的回調方法,如下圖所示:

refer:

3分鐘帶你徹底搞懂 Kafka

Kafka到底是個啥?用來幹嘛的?

官方定義如下:

翻譯過來,大致的意思就是,這是一個實時數據處理系統,可以橫向擴展,並高可靠!

實時數據處理 ,從名字上看,很好理解,就是將數據進行實時處理,在現在流行的微服務開發中,最常用實時數據處理平台有 RabbitMQ、RocketMQ 等消息中間件。

這些中間件,最大的特點主要有兩個:

在早期的 web 應用程序開發中,當請求量突然上來了時候,我們會將要處理的數據推送到一個隊列通道中,然後另起一個線程來不斷輪訓拉取隊列中的數據,從而加快程序的運行效率。

但是隨著請求量不斷的增大,並且隊列通道的數據一致處於高負載,在這種情況下,應用程序的內存佔用率會非常高,稍有不慎,會出現內存不足,造成程序內存溢出,從而導致服務不可用。

隨著業務量的不斷擴張,在一個應用程序內,使用這種模式已然無法滿足需求,因此之後,就誕生了各種消息中間件,例如 ActiveMQ、RabbitMQ、RocketMQ等中間件。

採用這種模型,本質就是將要推送的數據,不在存放在當前應用程序的內存中,而是將數據存放到另一個專門負責數據處理的應用程序中,從而實現服務解耦。

消息中間件 :主要的職責就是保證能接受到消息,並將消息存儲到磁碟,即使其他服務都掛了,數據也不會丟失,同時還可以對數據消費情況做好監控工作。

應用程序 :只需要將消息推送到消息中間件,然後啟用一個線程來不斷從消息中間件中拉取數據,進行消費確認即可!

引入消息中間件之後,整個服務開發會變得更加簡單,各負其責。

Kafka 本質其實也是消息中間件的一種,Kafka 出自於 LinkedIn 公司,與 2010 年開源到 github。

LinkedIn 的開發團隊,為了解決數據管道問題,起初採用了 ActiveMQ 來進行數據交換,大約是在 2010 年前後,那時的 ActiveMQ 還遠遠無法滿足 LinkedIn 對數據傳遞系統的要求,經常由於各種缺陷而導致消息阻塞或者服務無法正常訪問,為了能夠解決這個問題,LinkedIn 決定研發自己的消息傳遞系統, Kafka 由此誕生 。

在 LinkedIn 公司,Kafka 可以有效地處理每天數十億條消息的指標和用戶活動跟蹤,其強大的處理能力,已經被業界所認可,並成為大數據流水線的首選技術。

先來看一張圖, 下面這張圖就是 kafka 生產與消費的核心架構模型 !

如果你看不懂這些概念沒關係,我會帶著大家一起梳理一遍!

簡而言之,kafka 本質就是一個消息系統,與大多數的消息系統一樣,主要的特點如下:

與 ActiveMQ、RabbitMQ、RocketMQ 不同的地方在於,它有一個**分區 Partition **的概念。

這個分區的意思就是說,如果你創建的 topic 有5個分區,當你一次性向 kafka 中推 1000 條數據時,這 1000 條數據默認會分配到 5 個分區中,其中每個分區存儲 200 條數據。

這樣做的目的,就是方便消費者從不同的分區拉取數據,假如你啟動 5 個線程同時拉取數據,每個線程拉取一個分區,消費速度會非常非常快!

這是 kafka 與其他的消息系統最大的不同!

和其他的中間件一樣,kafka 每次發送數據都是向 Leader 分區發送數據,並順序寫入到磁碟,然後 Leader 分區會將數據同步到各個從分區 Follower ,即使主分區掛了,也不會影響服務的正常運行。

那 kafka 是如何將數據寫入到對應的分區呢?kafka中有以下幾個原則:

與生產者一樣,消費者主動的去kafka集群拉取消息時,也是從 Leader 分區去拉取數據。

這裡我們需要重點了解一個名詞: 消費組 !

考慮到多個消費者的場景,kafka 在設計的時候,可以由多個消費者組成一個消費組,同一個消費組者的消費者可以消費同一個 topic 下不同分區的數據,同一個分區只會被一個消費組內的某個消費者所消費,防止出現重複消費的問題!

但是不同的組,可以消費同一個分區的數據!

你可以這樣理解,一個消費組就是一個客戶端,一個客戶端可以由很多個消費者組成,以便加快消息的消費能力。

但是,如果一個組下的消費者數量大於分區數量,就會出現很多的消費者閑置。

如果分區數量大於一個組下的消費者數量,會出現一個消費者負責多個分區的消費,會出現消費性能不均衡的情況。

因此,在實際的應用中,建議消費者組的 consumer 的數量與 partition 的數量保持一致!

光說理論可沒用,下面我們就以 centos7 為例,介紹一下 kafka 的安裝和使用。

kafka 需要 zookeeper 來保存服務實例的元信息,因此在安裝 kafka 之前,我們需要先安裝 zookeeper。

zookeeper 安裝環境依賴於 jdk,因此我們需要事先安裝 jdk

下載zookeeper,並解壓文件包

創建數據、日誌目錄

配置zookeeper

重新配置 dataDir 和 dataLogDir 的存儲路徑

最後,啟動 Zookeeper 服務

到官網 下載想要的版本,我這裡下載是最新穩定版 2.8.0 。

按需修改配置文件 server.properties (可選)

server.properties 文件內容如下:

其中有四個重要的參數:

可根據自己需求修改對應的配置!

啟動 kafka 服務

創建一個名為 testTopic 的主題,它只包含一個分區,只有一個副本:

運行 list topic 命令,可以看到該主題。

輸出內容:

Kafka 附帶一個命令行客戶端,它將從文件或標準輸入中獲取輸入,並將其作為消息發送到 Kafka 集群。默認情況下,每行將作為單獨的消息發送。

運行生產者,然後在控制台中鍵入一些消息以發送到伺服器。

輸入兩條內容並回車:

Kafka 還有一個命令行使用者,它會將消息轉儲到標準輸出。

輸出結果如下:

本文主要圍繞 kafka 的架構模型和安裝環境做了一些初步的介紹,難免會有理解不對的地方,歡迎網友批評、吐槽。

由於篇幅原因,會在下期文章中詳細介紹 java 環境下 kafka 應用場景!

使用java實現kafka consumer時報錯

public static void consumer(){

        Properties props = new Properties();  

        props.put(“zk.connect”, “hadoop-2:2181”);  

        props.put(“zk.connectiontimeout.ms”, “1000000”);  

        props.put(“groupid”, “fans_group”);  

          

        // Create the connection to the cluster  

        ConsumerConfig consumerConfig = new ConsumerConfig(props);  

        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  

          

        MapString, Integer map = new HashMapString, Integer();

        map.put(“fans”, 1);

        

        // create 4 partitions of the stream for topic 「test」, to allow 4 threads to consume  

        MapString, ListKafkaStreamMessage topicMessageStreams = consumerConnector.createMessageStreams(map);  

        ListKafkaStreamMessage streams = topicMessageStreams.get(“fans”);  

          

        // create list of 4 threads to consume from each of the partitions   

        ExecutorService executor = Executors.newFixedThreadPool(1);  

        long startTime = System.currentTimeMillis();

        // consume the messages in the threads  

        for(final KafkaStreamMessage stream: streams) {  

          executor.submit(new Runnable() {  

            public void run() {  

                 ConsumerIteratorMessage it = stream.iterator();

                  while (it.hasNext()){

                      log.debug(byteBufferToString(it.next().message().payload()));

                  }

              } 

            

          }); 

          log.debug(“use time=”+(System.currentTimeMillis()-startTime));

        }  

    }

原創文章,作者:KEPL,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/141670.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
KEPL的頭像KEPL
上一篇 2024-10-08 17:53
下一篇 2024-10-08 17:56

相關推薦

  • Java JsonPath 效率優化指南

    本篇文章將深入探討Java JsonPath的效率問題,並提供一些優化方案。 一、JsonPath 簡介 JsonPath是一個可用於從JSON數據中獲取信息的庫。它提供了一種DS…

    編程 2025-04-29
  • java client.getacsresponse 編譯報錯解決方法

    java client.getacsresponse 編譯報錯是Java編程過程中常見的錯誤,常見的原因是代碼的語法錯誤、類庫依賴問題和編譯環境的配置問題。下面將從多個方面進行分析…

    編程 2025-04-29
  • Java Bean載入過程

    Java Bean載入過程涉及到類載入器、反射機制和Java虛擬機的執行過程。在本文中,將從這三個方面詳細闡述Java Bean載入的過程。 一、類載入器 類載入器是Java虛擬機…

    編程 2025-04-29
  • Python讀取CSV數據畫散點圖

    本文將從以下方面詳細闡述Python讀取CSV文件並畫出散點圖的方法: 一、CSV文件介紹 CSV(Comma-Separated Values)即逗號分隔值,是一種存儲表格數據的…

    編程 2025-04-29
  • Java騰訊雲音視頻對接

    本文旨在從多個方面詳細闡述Java騰訊雲音視頻對接,提供完整的代碼示例。 一、騰訊雲音視頻介紹 騰訊雲音視頻服務(Cloud Tencent Real-Time Communica…

    編程 2025-04-29
  • Java Milvus SearchParam withoutFields用法介紹

    本文將詳細介紹Java Milvus SearchParam withoutFields的相關知識和用法。 一、什麼是Java Milvus SearchParam without…

    編程 2025-04-29
  • Java 8中某一周的周一

    Java 8是Java語言中的一個版本,於2014年3月18日發布。本文將從多個方面對Java 8中某一周的周一進行詳細的闡述。 一、數組處理 Java 8新特性之一是Stream…

    編程 2025-04-29
  • Python中讀入csv文件數據的方法用法介紹

    csv是一種常見的數據格式,通常用於存儲小型數據集。Python作為一種廣泛流行的編程語言,內置了許多操作csv文件的庫。本文將從多個方面詳細介紹Python讀入csv文件的方法。…

    編程 2025-04-29
  • Java判斷字元串是否存在多個

    本文將從以下幾個方面詳細闡述如何使用Java判斷一個字元串中是否存在多個指定字元: 一、字元串遍歷 字元串是Java編程中非常重要的一種數據類型。要判斷字元串中是否存在多個指定字元…

    編程 2025-04-29
  • 如何用Python統計列表中各數據的方差和標準差

    本文將從多個方面闡述如何使用Python統計列表中各數據的方差和標準差, 並給出詳細的代碼示例。 一、什麼是方差和標準差 方差是衡量數據變異程度的統計指標,它是每個數據值和該數據值…

    編程 2025-04-29

發表回復

登錄後才能評論