phpkafka集群鏈接,php kafaka

本文目錄一覽:

給kafka配置外部連接

在公司的測試環境中,有的應用需要遠程連接kafka,本地有時也是需要連接到kafka進行Debug,這就需要將kafka配置成外部可連接。想要實現這種效果,有兩種實現方法,第一種方法是將所有kafka的連接都配置成公網IP連接。第二種方法是採用kafka的內外分離配置。方法一雖然能夠實現kafka的外部連接,可是服務器上面對kafka的連接也會默認使用公網IP的方式連接,而不是內網,這樣會給實例的公網帶寬帶來很大的壓力,應用一多,就會造成實例無法進行登錄。而方法二就可以有效的避免這種現象了,它是將雲上本地應用採用內網來連接kafka,而同時又採用不同的端口配置外網連接,這樣能夠有效的減少公網帶寬的壓力。

在原有配置的基礎上加上或者更改如下配置,配置接受外網連接的端口為9093,同時打開安全組的9093端口,配置過後通過重啟kafka即可通過9093在本地連接kafka。

listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT主要分別定義內部和外部連接採用的安全協議

listeners=INTERNAL://內網IP:9092,EXTERNAL://0.0.0.0:9093主要是定義內部和外部連接監聽的地址端口

advertised.listeners=INTERNAL://內網IP:9092,EXTERNAL://外網IP:9093主要是提交給zookeeper來實現對kafka內部和外部的連接,最開始改配置只是配置了外部的連接,沒有內部的連接,所以對kafka的連接都是通過外部連接。

inter.broker.listener.name=INTERNAL主要是制定kafka集群內部broker之前通過INTERNAL的配置來進行內部通訊。

參考連接:

kafka集群測試正常,但是Java連接kafka出現異常,急求大神解答!!!!!!!!!!!

首先你在鏈接時候檢查是否代碼里的IP 和端口是不是對的,端口是broker 端口,默認9092 ;

其次查看代碼是生產者,看Kafka 集群里這個主題是否存在(如果不存在,默認是配置可以自動創建,看是非將該配置修改);然後檢測防火牆,相應端口是否開放(防火牆直接關也可以);檢測 server.properties 文件的 listeners 是否配置,若沒有將其配置好

Kafka(四)集群之kafka

在章節二( )中,我們部署了單機的kafka,現在我們部署一套集群模式的kafka。

這裡我準備了三台虛擬機:

192.168.184.134

192.168.184.135

192.168.184.136

每台機器部署一個zk和kafka。

上一章節中zk集群已經部署完畢。

在章節二中,134這台機器已經有kafka存在了,我們在另外兩台機器上安裝kafka:

在上面的文件中有幾個關鍵點,我們一一進行配置,我會對配置中的說明翻譯:

以下這兩個listeners,advertised_listeners 是對外暴露的服務端口,真正建立連接用的是 listeners。

在內網中我們使用listenners就可以了,在docker等容器或雲中使用advertised。

下面這個是日誌路徑的配置

下面這個是個重點的東西,topic在磁盤上會分為多個partitions存儲,相比單一文件存儲,增加了並行性,在後續文章中會詳細去講解:

日誌的保存時間:

以下是zookeeper的配置:

這裡我們直接設置後台啟動,三個節點都是如此:

這裡面有個小坑,還記得之前我們搭建的單機環境嗎?那時候默認的日誌文件夾在/tmp/kafka-logs下面,生成了很多內容,導致我們134這個節點無法啟動成功,報錯如下:

解決這個問題只需要把/tmp/kafka-logs文件刪除就好了。

看到日誌出現這一句表明啟動成功了:

下面我們驗證下是否搭建成功了,首先使用kafkatool工機具連接看下:

我們在134節點創建一個topic:

查看topic列表:

在kafkatool中查看:

創建生產者:

創建消費者:

生成者發送消息:

消費者接收消息:

到此為止,kafka的集群搭建已經完成了。在後面的文章我們會去學習如何在springboot中集成kafka。

kafka集群配置和使用

進入安裝目錄,修改server.properties文件

修改如下屬性,除id外,其他每台主機一致:

語義配置:(可選)

先啟動zookeeper集群,已經在三台主機上配置好了zookeeper集群,啟動:

在各台主機上進入zookeeper目錄,分別啟動zk:

在各台主機上進入kafka目錄,分別啟動kafka:

啟動結果為:

kafka佔據了前台,要使用主機,需要打開新終端

在新打開的終端上,進入zk目錄,

進入kafka目錄,創建主體

服務端技術實戰系列——Kafka篇

一.概念原理

[if !supportLists]1. [endif]主題(topic):主題是對消息的分類。

[if !supportLists]2. [endif]消息(message):消息是kafka通信的基本單位。

[if !supportLists]3. [endif]分區(partition): 一組 消息對應 一個 主題, 一個 主題對應 一個或多個 分區。每個分區為一系列有序消息組成的 有序隊列 ;每個分區在物理上對應一個文件夾。

[if !supportLists]4. [endif]副本(replica):每個分區有 一個或多個 副本,分區的副本分布在集群的 不同 代理(機器)上,以提高可用性;分區的副本與日誌對象是一一對應的。

[if !supportLists]5. [endif]Kafka只保證一個 分區內 的消息 有序性 ,不保證跨分區消息的有序性。消息被追加到相應分區中, 順序寫入磁盤 ,效率非常高。

[if !supportLists]6. [endif]Kafka選取某個某個分區的 一個 副本作為leader副本,該分區的 其他 副本為follower副本。 只有leader副本負責處理客戶端讀/寫請求 ,follower副本從leader副本同步數據。

[if !supportLists]7. [endif]任何發布到分區的消息都會追加到日誌文件的尾部, 每條消息 在日誌文件中的 位置 都對應一個 按序遞增的偏移量 ;偏移量在一個分區下嚴格有序。

[if !supportLists]8. [endif]Kafka不允許對消息進行隨機讀寫。

[if !supportLists]9. [endif]新版消費者將 消費偏移量 保存到kafka內部的一個主題中。

[if !supportLists]10. [endif]Kafka集群由 一個或多個代理 (Broker,也稱為kafka實例)構成。可以在 一台 服務器上配置 一個或多個代理 ,每個代理具有唯一標識broker.id。

[if !supportLists]11. [endif]生產者將消息 發送給代理 (Broker)。

[if !supportLists]12. [endif]消費者以 拉取 (pull)方式拉取數據,每個消費者都屬於一個消費組。

[if !supportLists]13. [endif]同一個主題的一條消息只能被 同一個消費組 下的某一個消費者消費,但 不同消費組 的消費者可以 同時 消費該消息。

[if !supportLists]14. [endif]消息 廣播 :指定各消費者屬於不同消費組;消息 單播 :指定各消費者屬於同一個消費組。

[if !supportLists]15. [endif]Kafka啟動時在Zookeeper上創建相應節點來保存 元數據 ,元數據包括:代理節點信息、集群信息、主題信息、分區狀態信息、分區副本分配方案、動態配置等;

[if !supportLists]16. [endif]Kafka通過 監聽 機制在節點註冊監聽器來監聽節點元數據變化;

[if !supportLists]17. [endif]Kafka將數據寫入 磁盤 ,以文件系統來存數據;

[if !supportLists]18. [endif]生產環境一般將zookeeper集群和kafka集群 分機架 部署;

[if !supportLists]二.[endif] Kafka Producer

配置:

/**

 * xTestProxy——KafkaConfigConstant

 *

 * @author  ZhangChi

 * @date  2018年6月20日—下午5:50:44

 * @version  1.0

 */

public   class  KafkaConfigConstant {

public   static   final  String KAFKA_CLUSTER  = “fa-common1.hangzhou-1.kafka.internal.lede.com:9200,fa-common2.hangzhou-1.kafka.internal.lede.com:9200,fa-common3.hangzhou-1.kafka.internal.lede.com:9200”;

}

生產者配置:

/**

 * xTestProxy——HttpKafkaProducerFactory

 *

 * @author  ZhangChi

 * @date  2018年6月11日—下午2:37:51

 * @version  1.0

 */

public   class  HttpKafkaProducerFactory {

// 真正的KafkaProducer僅有一份

private   static  KafkaProducer kafkaProducer  = null ;

private   static  Properties property ;

public   static  KafkaProducer getKafkaProducer() {

if  ( kafkaProducer  == null ) {

synchronized  (HttpKafkaProducerFactory. class ) {

if  ( kafkaProducer  == null ) {

property  = buildKafkaProperty ();

kafkaProducer  = new  KafkaProducer( property );

}

}

}

return   kafkaProducer ;

}

public   static  Properties buildKafkaProperty() {

Properties props = new  Properties();

props.put(ProducerConfig. BOOTSTRAP_SERVERS_CONFIG , KafkaConfigConstant. KAFKA_CLUSTER );

props.put(ProducerConfig. ACKS_CONFIG , “all”);

props.put(ProducerConfig. RETRIES_CONFIG , 0);

props.put(ProducerConfig. BATCH_SIZE_CONFIG , 16384);

props.put(ProducerConfig. BUFFER_MEMORY_CONFIG , 33554432);

props.put(ProducerConfig. LINGER_MS_CONFIG , 1);

props.put(ProducerConfig. KEY_SERIALIZER_CLASS_CONFIG , “org.apache.kafka.common.serialization.StringSerializer”);

props.put(ProducerConfig. VALUE_SERIALIZER_CLASS_CONFIG ,

“org.apache.kafka.common.serialization.StringSerializer”);

return  props;

}

}

生產者線程組:

/**

 * xTestProxy——HttpKafkaProducerThread

 * 多線程每次new一個實例

 *

 * @author  ZhangChi

 * @date  2018年6月25日—下午2:09:39

 * @version  1.0

 */

public   class  HttpKafkaProducerThread implements  Runnable {

private   static  Logger logger  = LoggerFactory. getLogger (“HttpKafkaProducerThread”);

private   final  String KAFKA_TOPIC = KafkaConstant. HTTP_REQ_RESP_TOPIC ;

private  String kafkaMessageJson;

private  KafkaProducer producer;

public  String messageType;

public  String originalMessage;

private   static  KafkaMessage kafkaMessage  = new  KafkaMessage();

public  HttpKafkaProducerThread(KafkaProducer producer, String messageType, String originalMessage) {

this .producer = producer;

this .messageType = messageType;

this .originalMessage = originalMessage;

}

@Override

public   void  run() {

// TODO  Auto-generated method stub

/* 1.構建kafka消息*/

kafkaMessageJson = generateKafkaMessage( this .messageType, this .originalMessage);

/* 2.發送kafka消息*/

if  (kafkaMessageJson != null   !StringUtils. isEmpty (kafkaMessageJson)) {

logger .info(“create message start:” + kafkaMessageJson);

producer.send( new  ProducerRecord( this .KAFKA_TOPIC, kafkaMessageJson));

} else  {

logger .info(“kafkaMessageJson is null!”);

}

}

private  String generateKafkaMessage(String messageType, String originalMessage) {

if  (StringUtils. isBlank (messageType) || StringUtils. isBlank (originalMessage)) {

return   null ;

}

kafkaMessage .setMessageId(KafkaMessageUtils. generateId ());

kafkaMessage .setMessageTime(KafkaMessageUtils. generateTime ());

kafkaMessage .setMessageType(messageType);

kafkaMessage .setMessage(originalMessage);

String kafkaMessageToJson = null ;

try  {

kafkaMessageToJson = KafkaMessageUtils. objectToJson ( kafkaMessage );

} catch  (JsonProcessingException e) {

// TODO  Auto-generated catch block

e.printStackTrace();

}

kafkaMessageJson = kafkaMessageToJson;

return  kafkaMessageToJson;

}

}

[if !supportLists]三.[endif] Kafka Consumer

消費者配置:

private   static  Properties buildKafkaProperty() {

Properties properties = new  Properties();

// 測試環境kafka的端口號是9200

properties.put(ConsumerConfig. BOOTSTRAP_SERVERS_CONFIG , KafkaConfigConstant. KAFKA_CLUSTER );

// 消費組名稱

properties.put(ConsumerConfig. GROUP_ID_CONFIG , KafkaConfigConstant. GROUP_ID );

properties.put(ConsumerConfig. CLIENT_ID_CONFIG , “test”);

// 從頭消費

properties.put(ConsumerConfig. AUTO_OFFSET_RESET_CONFIG , “earliest”);

// 自動提交偏移量

properties.put(ConsumerConfig. ENABLE_AUTO_COMMIT_CONFIG , “true”);

// 時間間隔1s

properties.put(ConsumerConfig. AUTO_COMMIT_INTERVAL_MS_CONFIG , “1000”);

properties.put(ConsumerConfig. KEY_DESERIALIZER_CLASS_CONFIG ,

“org.apache.kafka.common.serialization.StringDeserializer”);

properties.put(ConsumerConfig. VALUE_DESERIALIZER_CLASS_CONFIG ,

“org.apache.kafka.common.serialization.StringDeserializer”);

return  properties;

}

消費者線程組:

/**

 * AnalysisEngine——HttpKafkaConsumerGroup

 *

 * @author  ZhangChi

 * @date  2018年6月11日—下午6:20:47

 * @version  1.0

 */

@Service(“httpKafkaConsumerGroup”)

public   class  HttpKafkaConsumerGroup {

@Autowired

private  RequestAnalyzer requestAnalyzer;

@Autowired

private  EsDocumentServiceImpl esDocumentServiceImpl;

@Autowired

private  AnalysisEngineClient analysisEngineClient;

@Autowired

private  MongoTemplate mongoTemplate;

private  List httpKafkaConsumerList = new  ArrayList();

public   void  initHttpKafkaConsumerGroup( int  consumerNumber, RunModeEnum mode) {

for  ( int  i = 0; i  consumerNumber; i++) {

/**

 * 將注入的服務當做構造參數,這樣保證每個子線程都能拿到服務實例而不是空指針!

 */

HttpKafkaConsumer consumerThread = new  HttpKafkaConsumer(requestAnalyzer, esDocumentServiceImpl, mode, analysisEngineClient, mongoTemplate);

httpKafkaConsumerList.add(consumerThread);

}

}

public   void  consumeGroupStart() {

for  (HttpKafkaConsumer item : httpKafkaConsumerList) {

LogConstant. runLog .info(“httpKafkaConsumerList size : ” + httpKafkaConsumerList.size());

Thread consumerThread = new  Thread(item);

consumerThread.start();

}

}

}

先逐個初始化消費者實例,然後將這些消費者加入到消費組列表中。消費組啟動後,會循環產生消費者線程。

 

Docker搭建Kafka測試集群

zookeeper利用這個hostname在集群中的broker之間同步消息, 這裡配置為kafka在docker虛擬網絡中的IP。 使用你的宿主機的IP也可以,但是端口號要改成宿主機的端口號

Kafka在Docker container中的端口號

如果你需要在Docker虛擬網絡之外使用Kafka集群,你需要把這兩個參數配置成你宿主機的IP,端口號要改成Docker映射到宿主機的端口號(9092, 9093)。當往Kafka其中的一個broker發送消息時,Kafka集群從ZooKeeper取得Broker IP和端口號,然後同步數據,使用虛擬網絡IP會導致發送和接收消息失敗,因為宿主機無法訪問Docker虛擬網絡內的節點

 例如我的宿主機IP是,192.168,0.2,

上述2個Kafka容器,9092端口號分別映射到宿主機的9092和9093端口。

環境變量配置如下

Kafka1配置為

      KAFKA_ADVERTISED_HOST_NAME: 192.168.0.2

      KAFKA_ADVERTISED_PORT: 9092

Kafka2配置為

      KAFKA_ADVERTISED_HOST_NAME: 192.168.0.2

      KAFKA_ADVERTISED_PORT: 9093

原創文章,作者:ARDO,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/140843.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
ARDO的頭像ARDO
上一篇 2024-10-04 00:24
下一篇 2024-10-04 00:24

相關推薦

  • PHP和Python哪個好找工作?

    PHP和Python都是非常流行的編程語言,它們被廣泛應用於不同領域的開發中。但是,在考慮擇業方向的時候,很多人都會有一個問題:PHP和Python哪個好找工作?這篇文章將從多個方…

    編程 2025-04-29
  • PHP怎麼接幣

    想要在自己的網站或應用中接受比特幣等加密貨幣的支付,就需要對該加密貨幣擁有一定的了解,並使用對應的API進行開發。本文將從多個方面詳細闡述如何使用PHP接受加密貨幣的支付。 一、環…

    編程 2025-04-29
  • Avue中如何按照後端返回的鏈接顯示圖片

    Avue是一款基於Vue.js、Element-ui等技術棧的可視化開發框架,能夠輕鬆搭建前端頁面。在開發中,我們使用到的圖片通常都是存儲在後端服務器上的,那麼如何使用Avue來展…

    編程 2025-04-28
  • 使用Go-Redis獲取Redis集群內存使用率

    本文旨在介紹如何使用Go-Redis獲取Redis集群的內存使用率。 一、Go-Redis簡介 Go-Redis是一個用於連接Redis服務器的Golang客戶端。它支持Redis…

    編程 2025-04-28
  • LwIP短鏈接client例程用法介紹

    本文將詳細闡述LwIP短鏈接client例程,該例程是基於LwIP協議棧實現的一個短鏈接客戶端程序,適用於嵌入式設備上進行互聯網通信。 一、LwIP介紹 LwIP(Lightwei…

    編程 2025-04-28
  • 使用PHP foreach遍歷有相同屬性的值

    本篇文章將介紹如何使用PHP foreach遍歷具有相同屬性的值,並給出相應的代碼示例。 一、基礎概念 在講解如何使用PHP foreach遍歷有相同屬性的值之前,我們需要先了解幾…

    編程 2025-04-28
  • 如何創建短鏈接和實現熱切換

    在本文中,我們將會介紹如何使用Python創建短鏈接和實現熱切換功能。 一、創建短鏈接 1、什麼是短鏈接?通俗易懂來說,短鏈接就是將長鏈接變成一個短小精悍的地址,通常是為了方便用戶…

    編程 2025-04-28
  • PHP獲取301跳轉後的地址

    本文將為大家介紹如何使用PHP獲取301跳轉後的地址。301重定向是什麼呢?當我們訪問一個網頁A,但是它已經被遷移到了另一個地址B,此時若服務器端做了301重定向,那麼你的瀏覽器在…

    編程 2025-04-27
  • Redis5.0集群擴容用法介紹

    Redis是一個內存數據庫,越來越受到開發者的歡迎。在開發中,我們經常需要考慮Redis集群的擴容問題。而Redis5.0針對集群擴容方面進行了多項優化和改進,本文將從多個方面詳細…

    編程 2025-04-27
  • PHP登錄頁面代碼實現

    本文將從多個方面詳細闡述如何使用PHP編寫一個簡單的登錄頁面。 1. PHP登錄頁面基本架構 在PHP登錄頁面中,需要包含HTML表單,用戶在表單中輸入賬號密碼等信息,提交表單後服…

    編程 2025-04-27

發表回復

登錄後才能評論