java開kafka消費端(java實現kafka消費者)

本文目錄一覽:

用Kafka和Java搭建的項目,Kafka管理中心在什麼情況下會重複發送消息?消費端的程序接收到消息,進入方法

非手動提交offset

消費者只要讀取到數據,就會修改offset,不需要方法體執行完

手動提交

需要手動提交代碼執行完畢

針對你的問題,情況有很多種可能。

你是否開啟手動提交offset

你的消費者,有幾個?是否是同一個組?

java工程kafka傳遞自定義對象,消費端獲取到的是null

3. 啟服務

3.1 啟zookeeper

啟zk兩種式第種使用kafka自帶zk

bin/zookeeper-server-start.sh config/zookeeper.properties

另種使用其zookeeper位於本機位於其址種情況需要修改config面sercer.properties面zookeeper址

例zookeeper.connect=10.202.4.179:2181

3.2 啟 kafka

bin/kafka-server-start.sh config/server.properties

4.創建topic

bin/kafka-topics.sh –create –zookeeper 10.202.4.179:2181 –replication-factor 1 –partitions 1 –topic test

創建名testtopic副本區

通list命令查看剛剛創建topic

bin/kafka-topics.sh -list -zookeeper 10.202.4.179:2181

5.啟producer並發送消息啟producer

bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test

啟發送消息

test

hello boy

按Ctrl+C退發送消息

6.啟consumer

bin/kafka-console-consumer.sh –zookeeper 10.202.4.179:2181 –topic test –from-beginning

啟consumerconsole看producer發送消息

啟兩終端發送消息接受消息

都行查看zookeeper進程kafkatopic步步排查原吧

kafka消費者java版本讀取不到消息怎麼辦

Kafka的生產者和消費者都可以多線程地並行操作,而每個線程處理的是一個分區的數據。因此分區實際上是調優Kafka並行度的最小單元。對於producer而言,它實際上是用多個線程並發地向不同分區所在的broker發起Socket連接同時給這些分區發送消息;而consumer呢,同一個消費組內的所有consumer線程都被指定topic的某一個分區進行消費(具體如何確定consumer線程數目我們後面會詳細說明)。所以說,如果一個topic分區越多,理論上整個集群所能達到的吞吐量就越大。

java客戶端使用kafka時什麼情況下使用kafka client和spring kafka?

spring-kafka 是基於 java版的 kafka client與spring的集成,提供了 KafkaTemplate,封裝了各種方法,方便操作

所以你使用spring的情況下,可以用spring-kafka,當然直接用kafka client也行

使用java實現kafka consumer時報錯

public static void consumer(){

        Properties props = new Properties();  

        props.put(“zk.connect”, “hadoop-2:2181”);  

        props.put(“zk.connectiontimeout.ms”, “1000000”);  

        props.put(“groupid”, “fans_group”);  

          

        // Create the connection to the cluster  

        ConsumerConfig consumerConfig = new ConsumerConfig(props);  

        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  

          

        MapString, Integer map = new HashMapString, Integer();

        map.put(“fans”, 1);

        

        // create 4 partitions of the stream for topic 「test」, to allow 4 threads to consume  

        MapString, ListKafkaStreamMessage topicMessageStreams = consumerConnector.createMessageStreams(map);  

        ListKafkaStreamMessage streams = topicMessageStreams.get(“fans”);  

          

        // create list of 4 threads to consume from each of the partitions   

        ExecutorService executor = Executors.newFixedThreadPool(1);  

        long startTime = System.currentTimeMillis();

        // consume the messages in the threads  

        for(final KafkaStreamMessage stream: streams) {  

          executor.submit(new Runnable() {  

            public void run() {  

                 ConsumerIteratorMessage it = stream.iterator();

                  while (it.hasNext()){

                      log.debug(byteBufferToString(it.next().message().payload()));

                  }

              } 

            

          }); 

          log.debug(“use time=”+(System.currentTimeMillis()-startTime));

        }  

    }

原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/300921.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
小藍的頭像小藍
上一篇 2024-12-29 14:18
下一篇 2024-12-29 14:18

相關推薦

  • java client.getacsresponse 編譯報錯解決方法

    java client.getacsresponse 編譯報錯是Java編程過程中常見的錯誤,常見的原因是代碼的語法錯誤、類庫依賴問題和編譯環境的配置問題。下面將從多個方面進行分析…

    編程 2025-04-29
  • Java JsonPath 效率優化指南

    本篇文章將深入探討Java JsonPath的效率問題,並提供一些優化方案。 一、JsonPath 簡介 JsonPath是一個可用於從JSON數據中獲取信息的庫。它提供了一種DS…

    編程 2025-04-29
  • Java騰訊雲音視頻對接

    本文旨在從多個方面詳細闡述Java騰訊雲音視頻對接,提供完整的代碼示例。 一、騰訊雲音視頻介紹 騰訊雲音視頻服務(Cloud Tencent Real-Time Communica…

    編程 2025-04-29
  • Java Bean載入過程

    Java Bean載入過程涉及到類載入器、反射機制和Java虛擬機的執行過程。在本文中,將從這三個方面詳細闡述Java Bean載入的過程。 一、類載入器 類載入器是Java虛擬機…

    編程 2025-04-29
  • Java Milvus SearchParam withoutFields用法介紹

    本文將詳細介紹Java Milvus SearchParam withoutFields的相關知識和用法。 一、什麼是Java Milvus SearchParam without…

    編程 2025-04-29
  • Java 8中某一周的周一

    Java 8是Java語言中的一個版本,於2014年3月18日發布。本文將從多個方面對Java 8中某一周的周一進行詳細的闡述。 一、數組處理 Java 8新特性之一是Stream…

    編程 2025-04-29
  • Java判斷字元串是否存在多個

    本文將從以下幾個方面詳細闡述如何使用Java判斷一個字元串中是否存在多個指定字元: 一、字元串遍歷 字元串是Java編程中非常重要的一種數據類型。要判斷字元串中是否存在多個指定字元…

    編程 2025-04-29
  • VSCode為什麼無法運行Java

    解答:VSCode無法運行Java是因為默認情況下,VSCode並沒有集成Java運行環境,需要手動添加Java運行環境或安裝相關插件才能實現Java代碼的編寫、調試和運行。 一、…

    編程 2025-04-29
  • Java任務下發回滾系統的設計與實現

    本文將介紹一個Java任務下發回滾系統的設計與實現。該系統可以用於執行複雜的任務,包括可回滾的任務,及時恢復任務失敗前的狀態。系統使用Java語言進行開發,可以支持多種類型的任務。…

    編程 2025-04-29
  • Java 8 Group By 會影響排序嗎?

    是的,Java 8中的Group By會對排序產生影響。本文將從多個方面探討Group By對排序的影響。 一、Group By的概述 Group By是SQL中的一種常見操作,它…

    編程 2025-04-29

發表回復

登錄後才能評論