本文目錄一覽:
- 1、用Kafka和Java搭建的項目,Kafka管理中心在什麼情況下會重複發送消息?消費端的程序接收到消息,進入方法
- 2、java工程kafka傳遞自定義對象,消費端獲取到的是null
- 3、kafka消費者java版本讀取不到消息怎麼辦
- 4、java客戶端使用kafka時什麼情況下使用kafka client和spring kafka?
- 5、使用java實現kafka consumer時報錯
用Kafka和Java搭建的項目,Kafka管理中心在什麼情況下會重複發送消息?消費端的程序接收到消息,進入方法
非手動提交offset
消費者只要讀取到數據,就會修改offset,不需要方法體執行完
手動提交
需要手動提交代碼執行完畢
針對你的問題,情況有很多種可能。
你是否開啟手動提交offset
你的消費者,有幾個?是否是同一個組?
java工程kafka傳遞自定義對象,消費端獲取到的是null
3. 啟服務
3.1 啟zookeeper
啟zk兩種式第種使用kafka自帶zk
bin/zookeeper-server-start.sh config/zookeeper.properties
另種使用其zookeeper位於本機位於其址種情況需要修改config面sercer.properties面zookeeper址
例zookeeper.connect=10.202.4.179:2181
3.2 啟 kafka
bin/kafka-server-start.sh config/server.properties
4.創建topic
bin/kafka-topics.sh –create –zookeeper 10.202.4.179:2181 –replication-factor 1 –partitions 1 –topic test
創建名testtopic副本區
通list命令查看剛剛創建topic
bin/kafka-topics.sh -list -zookeeper 10.202.4.179:2181
5.啟producer並發送消息啟producer
bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
啟發送消息
比
test
hello boy
按Ctrl+C退發送消息
6.啟consumer
bin/kafka-console-consumer.sh –zookeeper 10.202.4.179:2181 –topic test –from-beginning
啟consumerconsole看producer發送消息
啟兩終端發送消息接受消息
都行查看zookeeper進程kafkatopic步步排查原吧
kafka消費者java版本讀取不到消息怎麼辦
Kafka的生產者和消費者都可以多線程地並行操作,而每個線程處理的是一個分區的數據。因此分區實際上是調優Kafka並行度的最小單元。對於producer而言,它實際上是用多個線程並發地向不同分區所在的broker發起Socket連接同時給這些分區發送消息;而consumer呢,同一個消費組內的所有consumer線程都被指定topic的某一個分區進行消費(具體如何確定consumer線程數目我們後面會詳細說明)。所以說,如果一個topic分區越多,理論上整個集群所能達到的吞吐量就越大。
java客戶端使用kafka時什麼情況下使用kafka client和spring kafka?
spring-kafka 是基於 java版的 kafka client與spring的集成,提供了 KafkaTemplate,封裝了各種方法,方便操作
所以你使用spring的情況下,可以用spring-kafka,當然直接用kafka client也行
使用java實現kafka consumer時報錯
public static void consumer(){
Properties props = new Properties();
props.put(“zk.connect”, “hadoop-2:2181”);
props.put(“zk.connectiontimeout.ms”, “1000000”);
props.put(“groupid”, “fans_group”);
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
MapString, Integer map = new HashMapString, Integer();
map.put(“fans”, 1);
// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
MapString, ListKafkaStreamMessage topicMessageStreams = consumerConnector.createMessageStreams(map);
ListKafkaStreamMessage streams = topicMessageStreams.get(“fans”);
// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(1);
long startTime = System.currentTimeMillis();
// consume the messages in the threads
for(final KafkaStreamMessage stream: streams) {
executor.submit(new Runnable() {
public void run() {
ConsumerIteratorMessage it = stream.iterator();
while (it.hasNext()){
log.debug(byteBufferToString(it.next().message().payload()));
}
}
});
log.debug(“use time=”+(System.currentTimeMillis()-startTime));
}
}
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/300921.html