本文目錄一覽:
php 使用kafka
終端開啟一個消費者:
生產者端發送:
消費者端接收:
消費者端接收:
起航吧, kafka 之旅
大型的PHP應用,通常使用什麼應用做消息隊列?
一、消息隊列概述
消息隊列中間件是分散式系統中重要的組件,主要解決應用耦合,非同步消息,流量削鋒等問題。實現高性能,高可用,可伸縮和最終一致性架構。是大型分散式系統不可缺少的中間件。
目前在生產環境,使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。
二、消息隊列應用場景
以下介紹消息隊列在實際應用中常用的使用場景。非同步處理,應用解耦,流量削鋒和消息通訊四個場景。
2.1非同步處理
場景說明:用戶註冊後,需要發註冊郵件和註冊簡訊。傳統的做法有兩種1.串列的方式;2.並行方式。
(1)串列方式:將註冊信息寫入資料庫成功後,發送註冊郵件,再發送註冊簡訊。以上三個任務全部完成後,返回給客戶端。(架構KKQ:466097527,歡迎加入)
(2)並行方式:將註冊信息寫入資料庫成功後,發送註冊郵件的同時,發送註冊簡訊。以上三個任務完成後,返回給客戶端。與串列的差別是,並行的方式可以提高處理的時間。
假設三個業務節點每個使用50毫秒鐘,不考慮網路等其他開銷,則串列方式的時間是150毫秒,並行的時間可能是100毫秒。
因為CPU在單位時間內處理的請求數是一定的,假設CPU1秒內吞吐量是100次。則串列方式1秒內CPU可處理的請求量是7次(1000/150)。並行方式處理的請求量是10次(1000/100)。
小結:如以上案例描述,傳統的方式系統的性能(並發量,吞吐量,響應時間)會有瓶頸。如何解決這個問題呢?
引入消息隊列,將不是必須的業務邏輯,非同步處理。改造後的架構如下:
按照以上約定,用戶的響應時間相當於是註冊信息寫入資料庫的時間,也就是50毫秒。註冊郵件,發送簡訊寫入消息隊列後,直接返回,因此寫入消息隊列的速度很快,基本可以忽略,因此用戶的響應時間可能是50毫秒。因此架構改變後,系統的吞吐量提高到每秒20 QPS。比串列提高了3倍,比並行提高了兩倍。
2.2應用解耦
場景說明:用戶下單後,訂單系統需要通知庫存系統。傳統的做法是,訂單系統調用庫存系統的介面。如下圖:
傳統模式的缺點:
1) 假如庫存系統無法訪問,則訂單減庫存將失敗,從而導致訂單失敗;
2) 訂單系統與庫存系統耦合;
如何解決以上問題呢?引入應用消息隊列後的方案,如下圖:
訂單系統:用戶下單後,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。
庫存系統:訂閱下單的消息,採用拉/推的方式,獲取下單信息,庫存系統根據下單信息,進行庫存操作。
假如:在下單時庫存系統不能正常使用。也不影響正常下單,因為下單後,訂單系統寫入消息隊列就不再關心其他的後續操作了。實現訂單系統與庫存系統的應用解耦。
2.3流量削鋒
流量削鋒也是消息隊列中的常用場景,一般在秒殺或團搶活動中使用廣泛。
應用場景:秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉。為解決這個問題,一般需要在應用前端加入消息隊列。
可以控制活動的人數;
可以緩解短時間內高流量壓垮應用;
用戶的請求,伺服器接收後,首先寫入消息隊列。假如消息隊列長度超過最大數量,則直接拋棄用戶請求或跳轉到錯誤頁面;
秒殺業務根據消息隊列中的請求信息,再做後續處理。
2.4日誌處理
日誌處理是指將消息隊列用在日誌處理中,比如Kafka的應用,解決大量日誌傳輸的問題。架構簡化如下:
日誌採集客戶端,負責日誌數據採集,定時寫受寫入Kafka隊列;
Kafka消息隊列,負責日誌數據的接收,存儲和轉發;
日誌處理應用:訂閱並消費kafka隊列中的日誌數據;
以下是新浪kafka日誌處理應用案例:
(1)Kafka:接收用戶日誌的消息隊列。
(2)Logstash:做日誌解析,統一成JSON輸出給Elasticsearch。
(3)Elasticsearch:實時日誌分析服務的核心技術,一個schemaless,實時的數據存儲服務,通過index組織數據,兼具強大的搜索和統計功能。
(4)Kibana:基於Elasticsearch的數據可視化組件,超強的數據可視化能力是眾多公司選擇ELK stack的重要原因。
2.5消息通訊
消息通訊是指,消息隊列一般都內置了高效的通信機制,因此也可以用在純的消息通訊。比如實現點對點消息隊列,或者聊天室等。
點對點通訊:
客戶端A和客戶端B使用同一隊列,進行消息通訊。
聊天室通訊:
客戶端A,客戶端B,客戶端N訂閱同一主題,進行消息發布和接收。實現類似聊天室效果。
以上實際是消息隊列的兩種消息模式,點對點或發布訂閱模式。模型為示意圖,供參考。
三、消息中間件示例
3.1電商系統
消息隊列採用高可用,可持久化的消息中間件。比如Active MQ,Rabbit MQ,Rocket Mq。(1)應用將主幹邏輯處理完成後,寫入消息隊列。消息發送是否成功可以開啟消息的確認模式。(消息隊列返回消息接收成功狀態後,應用再返回,這樣保障消息的完整性)
(2)擴展流程(發簡訊,配送處理)訂閱隊列消息。採用推或拉的方式獲取消息並處理。
(3)消息將應用解耦的同時,帶來了數據一致性問題,可以採用最終一致性方式解決。比如主數據寫入資料庫,擴展應用根據消息隊列,並結合資料庫方式實現基於消息隊列的後續處理。
3.2日誌收集系統
分為Zookeeper註冊中心,日誌收集客戶端,Kafka集群和Storm集群(OtherApp)四部分組成。
Zookeeper註冊中心,提出負載均衡和地址查找服務;
日誌收集客戶端,用於採集應用系統的日誌,並將數據推送到kafka隊列;
四、JMS消息服務
講消息隊列就不得不提JMS 。JMS(Java Message Service,Java消息服務)API是一個消息服務的標準/規範,允許應用程序組件基於JavaEE平台創建、發送、接收和讀取消息。它使分散式通信耦合度更低,消息服務更加可靠以及非同步性。
在EJB架構中,有消息bean可以無縫的與JM消息服務集成。在J2EE架構模式中,有消息服務者模式,用於實現消息與應用直接的解耦。
4.1消息模型
在JMS標準中,有兩種消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。
4.1.1 P2P模式
P2P模式包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。每個消息都被發送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留著消息,直到他們被消費或超時。
P2P的特點
每個消息只有一個消費者(Consumer)(即一旦被消費,消息就不再在消息隊列中)
發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息之後,不管接收者有沒有正在運行,它不會影響到消息被發送到隊列
接收者在成功接收消息之後需向隊列應答成功
如果希望發送的每個消息都會被成功處理的話,那麼需要P2P模式。(架構KKQ:466097527,歡迎加入)
4.1.2 Pub/sub模式
包含三個角色主題(Topic),發布者(Publisher),訂閱者(Subscriber) 。多個發布者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。
Pub/Sub的特點
每個消息可以有多個消費者
發布者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創建一個訂閱者之後,才能消費發布者的消息。
為了消費消息,訂閱者必須保持運行的狀態。
為了緩和這樣嚴格的時間相關性,JMS允許訂閱者創建一個可持久化的訂閱。這樣,即使訂閱者沒有被激活(運行),它也能接收到發布者的消息。
如果希望發送的消息可以不被做任何處理、或者只被一個消息者處理、或者可以被多個消費者處理的話,那麼可以採用Pub/Sub模型。
4.2消息消費
在JMS中,消息的產生和消費都是非同步的。對於消費來說,JMS的消息者可以通過兩種方式來消費消息。
(1)同步
訂閱者或接收者通過receive方法來接收消息,receive方法在接收到消息之前(或超時之前)將一直阻塞;
(2)非同步
訂閱者或接收者可以註冊為一個消息監聽器。當消息到達之後,系統自動調用監聽器的onMessage方法。
JNDI:Java命名和目錄介面,是一種標準的Java命名系統介面。可以在網路上查找和訪問服務。通過指定一個資源名稱,該名稱對應於資料庫或命名服務中的一個記錄,同時返回資源連接建立所必須的信息。
JNDI在JMS中起到查找和訪問發送目標或消息來源的作用。(架構KKQ:466097527,歡迎加入)
4.3JMS編程模型
(1) ConnectionFactory
創建Connection對象的工廠,針對兩種不同的jms消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。可以通過JNDI來查找ConnectionFactory對象。
(2) Destination
Destination的意思是消息生產者的消息發送目標或者說消息消費者的消息來源。對於消息生產者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對於消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。
所以,Destination實際上就是兩種類型的對象:Queue、Topic可以通過JNDI來查找Destination。
(3) Connection
Connection表示在客戶端和JMS系統之間建立的鏈接(對TCP/IP socket的包裝)。Connection可以產生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnection和TopicConnection。
(4) Session
Session是操作消息的介面。可以通過session創建生產者、消費者、消息等。Session提供了事務的功能。當需要使用session發送/接收多個消息時,可以將這些發送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。
(5) 消息的生產者
消息生產者由Session創建,並用於將消息發送到Destination。同樣,消息生產者分兩種類型:QueueSender和TopicPublisher。可以調用消息生產者的方法(send或publish方法)發送消息。
(6) 消息消費者
消息消費者由Session創建,用於接收被發送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來創建。當然,也可以session的creatDurableSubscriber方法來創建持久化的訂閱者。
(7) MessageListener
消息監聽器。如果註冊了消息監聽器,一旦消息到達,將自動調用監聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。
深入學習JMS對掌握JAVA架構,EJB架構有很好的幫助,消息中間件也是大型分散式系統必須的組件。本次分享主要做全局性介紹,具體的深入需要大家學習,實踐,總結,領會。
五、常用消息隊列
一般商用的容器,比如WebLogic,JBoss,都支持JMS標準,開發上很方便。但免費的比如Tomcat,Jetty等則需要使用第三方的消息中間件。本部分內容介紹常用的消息中間件(Active MQ,Rabbit MQ,Zero MQ,Kafka)以及他們的特點。
5.1 ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息匯流排。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出台已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。
ActiveMQ特性如下:
⒈ 多種語言和協議編寫客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
⒉ 完全支持JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)
⒊ 對spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去,而且也支持Spring2.0的特性
⒋ 通過了常見J2EE伺服器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業伺服器上
⒌ 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
⒍ 支持通過JDBC和journal提供高速的消息持久化
⒎ 從設計上保證了高性能的集群,客戶端-伺服器,點對點
⒏ 支持Ajax
⒐ 支持與Axis的整合
⒑ 可以很容易得調用內嵌JMS provider,進行測試
5.2 RabbitMQ
RabbitMQ是流行的開源消息隊列系統,用erlang語言開發。RabbitMQ是AMQP(高級消息隊列協議)的標準實現。支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化。用於在分散式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
幾個重要概念:
Broker:簡單來說就是消息隊列伺服器實體。
Exchange:消息交換機,它指定消息按什麼規則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的許可權分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。
消息隊列的使用過程,如下:
(1)客戶端連接到消息隊列伺服器,打開一個channel。
(2)客戶端聲明一個exchange,並設置相關屬性。
(3)客戶端聲明一個queue,並設置相關屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關係。
(5)客戶端投遞消息到exchange。
exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里。
5.3 ZeroMQ
號稱史上最快的消息隊列,它實際類似於Socket的一系列介面,他跟Socket的區別是:普通的socket是端到端的(1:1的關係),而ZMQ卻是可以N:M 的關係,人們對BSD套接字的了解較多的是點對點的連接,點對點連接需要顯式地建立連接、銷毀連接、選擇協議(TCP/UDP)和處理錯誤等,而ZMQ屏蔽了這些細節,讓你的網路編程更為簡單。ZMQ用於node與node間的通信,node可以是主機或者是進程。
引用官方的說法: 「ZMQ(以下ZeroMQ簡稱ZMQ)是一個簡單好用的傳輸層,像框架一樣的一個socket library,他使得Socket編程更加簡單、簡潔和性能更高。是一個消息處理隊列庫,可在多個線程、內核和主機盒之間彈性伸縮。ZMQ的明確目標是「成為標準網路協議棧的一部分,之後進入Linux內核」。現在還未看到它們的成功。但是,它無疑是極具前景的、並且是人們更加需要的「傳統」BSD套接字之上的一 層封裝。ZMQ讓編寫高性能網路應用程序極為簡單和有趣。」
特點是:
高性能,非持久化;
跨平台:支持Linux、Windows、OS X等。
多語言支持; C、C++、Java、.NET、Python等30多種開發語言。
可單獨部署或集成到應用中使用;
可作為Socket通信庫使用。
與RabbitMQ相比,ZMQ並不像是一個傳統意義上的消息隊列伺服器,事實上,它也根本不是一個伺服器,更像一個底層的網路通訊庫,在Socket API之上做了一層封裝,將網路通訊、進程通訊和線程通訊抽象為統一的API介面。支持「Request-Reply 「,」Publisher-Subscriber「,」Parallel Pipeline」三種基本模型和擴展模型。
ZeroMQ高性能設計要點:
1、無鎖的隊列模型
對於跨線程間的交互(用戶端和session)之間的數據交換通道pipe,採用無鎖的隊列演算法CAS;在pipe兩端註冊有非同步事件,在讀或者寫消息到pipe的時,會自動觸發讀寫事件。
2、批量處理的演算法
對於傳統的消息處理,每個消息在發送和接收的時候,都需要系統的調用,這樣對於大量的消息,系統的開銷比較大,zeroMQ對於批量的消息,進行了適應性的優化,可以批量的接收和發送消息。
3、多核下的線程綁定,無須CPU切換
區別於傳統的多線程併發模式,信號量或者臨界區, zeroMQ充分利用多核的優勢,每個核綁定運行一個工作者線程,避免多線程之間的CPU切換開銷。
5.4 Kafka
Kafka是一種高吞吐量的分散式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。 這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網路上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。 對於像Hadoop的一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行載入機制來統一線上和離線的消息處理,也是為了通過集群機來提供實時的消費。
Kafka是一種高吞吐量的分散式發布訂閱消息系統,有如下特性:
通過O(1)的磁碟數據結構提供消息的持久化,這種結構對於即使數以TB的消息存儲也能夠保持長時間的穩定性能。(文件追加的方式寫入數據,過期的數據定期刪除)
高吞吐量:即使是非常普通的硬體Kafka也可以支持每秒數百萬的消息。
支持通過Kafka伺服器和消費機集群來分區消息。
支持Hadoop並行數據載入。
Kafka相關概念
Broker
Kafka集群包含一個或多個伺服器,這種伺服器被稱為broker[5]
Topic
每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存於何處)
Partition
Parition是物理上的概念,每個Topic包含一個或多個Partition.
Producer
負責發布消息到Kafka broker
Consumer
消息消費者,向Kafka broker讀取消息的客戶端。
Consumer Group
每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於默認的group)。
一般應用在大數據日誌處理或對實時性(少量延遲),可靠性(少量丟數據)要求稍低的場景使用。
Kafka丟失數據問題優化總結
數據丟失是一件非常嚴重的事情事,針對數據丟失的問題我們需要有明確的思路來確定問題所在,針對這段時間的總結,我個人面對kafka 數據丟失問題的解決思路如下:
1、是否真正的存在數據丟失問題,比如有很多時候可能是其他同事操作了測試環境,所以首先確保數據沒有第三方干擾。
2、理清你的業務流程,數據流向,數據到底是在什麼地方丟失的數據,在kafka 之前的環節或者kafka之後的流程丟失?比如kafka的數據是由flume提供的,也許是flume丟失了數據,kafka 自然就沒有這一部分數據。
3、如何發現有數據丟失,又是如何驗證的。從業務角度考慮,例如:教育行業,每年高考後數據量巨大,但是卻反常的比高考前還少,或者源端數據量和目的端數據量不符
4、 定位數據是否在kafka之前就已經丟失還事消費端丟失數據的
kafka支持數據的重新回放功能(換個消費group),清空目的端所有數據,重新消費。如果是在消費端丟失數據,那麼多次消費結果完全一模一樣的幾率很低。如果是在寫入端丟失數據,那麼每次結果應該完全一樣(在寫入端沒有問題的前提下)。
5、kafka環節丟失數據,常見的kafka環節丟失數據的原因有:
如果auto.commit.enable=true,當consumer fetch了一些數據但還沒有完全處理掉的時候,剛好到commit interval出發了提交offset操作,接著consumer crash掉了。這時已經fetch的數據還沒有處理完成但已經被commit掉,因此沒有機會再次被處理,數據丟失。網路負載很高或者磁碟很忙寫入失敗的情況下,沒有自動重試重發消息。沒有做限速處理,超出了網路帶寬限速。kafka一定要配置上消息重試的機制,並且重試的時間間隔一定要長一些,默認1秒鐘並不符合生產環境(網路中斷時間有可能超過1秒)。如果磁碟壞了,會丟失已經落盤的數據
單批數據的長度超過限制會丟失數據,報kafka.common.MessageSizeTooLargeException異常解決:
6、partition leader在未完成副本數follows的備份時就宕機的情況,即使選舉出了新的leader但是已經push的數據因為未備份就丟失了!kafka是多副本的,當你配置了同步複製之後。多個副本的數據都在PageCache裡面,出現多個副本同時掛掉的概率比1個副本掛掉的概率就很小了。(官方推薦是通過副本來保證數據的完整性的)
7、kafka的數據一開始就是存儲在PageCache上的,定期flush到磁碟上的,也就是說,不是每個消息都被存儲在磁碟了,如果出現斷電或者機器故障等,PageCache上的數據就丟失了。可以通過log.flush.interval.messages和log.flush.interval.ms來配置flush間隔,interval大丟的數據多些,小會影響性能但在0.8版本,可以通過replica機制保證數據不丟,代價就是需要更多資源,尤其是磁碟資源,kafka當前支持GZip和Snappy壓縮,來緩解這個問題 是否使用replica取決於在可靠性和資源代價之間的balance。
同時kafka也提供了相關的配置參數,來讓你在性能與可靠性之間權衡(一般默認):
當達到下面的消息數量時,會將數據flush到日誌文件中。默認10000
當達到下面的時間(ms)時,執行一次強制的flush操作。interval.ms和interval.messages無論哪個達到,都會flush。默認3000ms
檢查是否需要將日誌flush的時間間隔
high-level版本已經封裝了對partition和offset的管理,默認是會定期自動commit offset,這樣可能會丟數據的low-level版本自己管理spout線程和partition之間的對應關係和每個partition上的已消費的offset(定期寫到zk)並且只有當這個offset被ack後,即成功處理後,才會被更新到zk,所以基本是可以保證數據不丟的即使spout線程crash(崩潰),重啟後還是可以從zk中讀到對應的offset
不能讓內存的緩衝池太滿,如果滿了內存溢出,也就是說數據寫入過快,kafka的緩衝池數據落盤速度太慢,這時肯定會造成數據丟失。盡量保證生產者端數據一直處於線程阻塞狀態,這樣一邊寫內存一邊落盤。非同步寫入的話還可以設置類似flume回滾類型的batch數,即按照累計的消息數量,累計的時間間隔,累計的數據大小設置batch大小。
不過非同步寫入丟失數據的情況還是難以控制還是得穩定整體集群架構的運行,特別是zookeeper,當然正對非同步數據丟失的情況盡量保證broker端的穩定運作吧
kafka不像hadoop更致力於處理大量級數據,kafka的消息隊列更擅長於處理小數據。針對具體業務而言,若是源源不斷的push大量的數據(eg:網路爬蟲),可以考慮消息壓縮。但是這也一定程度上對CPU造成了壓力,還是得結合業務數據進行測試選擇
topic設置多分區,分區自適應所在機器,為了讓各分區均勻分布在所在的broker中,分區數要大於broker數。分區是kafka進行並行讀寫的單位,是提升kafka速度的關鍵。
關閉自動更新offset,等到數據被處理後再手動跟新offset。
在消費前做驗證前拿取的數據是否是接著上回消費的數據,不正確則return先行處理排錯。
一般來說zookeeper只要穩定的情況下記錄的offset是沒有問題,除非是多個consumer group 同時消費一個分區的數據,其中一個先提交了,另一個就丟失了。
kafka的數據一開始就是存儲在PageCache上的,定期flush到磁碟上的,也就是說,不是每個消息都被存儲在磁碟了,如果出現斷電或者機器故障等,PageCache上的數據就丟失了。這個是總結出的到目前為止沒有發生丟失數據的情況
強行kill線程,導致消費後的數據,offset沒有提交,partition就斷開連接。比如,通常會遇到消費的數據,處理很耗時,導致超過了Kafka的session timeout時間(0.10.x版本默認是30秒),那麼就會re-blance重平衡,此時有一定幾率offset沒提交,會導致重平衡後重複消費。
如果在close之前調用了consumer.unsubscribe()則有可能部分offset沒提交,下次重啟會重複消費。
kafka數據重複 kafka設計的時候是設計了(at-least once)至少一次的邏輯,這樣就決定了數據可能是重複的,kafka採用基於時間的SLA(服務水平保證),消息保存一定時間(通常為7天)後會被刪除。
kafka的數據重複一般情況下應該在消費者端,這時log.cleanup.policy = delete使用定期刪除機制。
kafka如何做到磁碟讀寫比內存讀寫還快?
Kafka作為一個支持大數據量寫入寫出的消息隊列,由於是基於Scala和Java實現的,而Scala和Java均需要在JVM上運行,所以如果是基於內存的方式,即JVM的堆來進行數據存儲則需要開闢很大的堆來支持數據讀寫,從而會導致GC頻繁影響性能。考慮到這些因素,kafka是使用磁碟存儲數據的。
Kafka 中消息是以 topic 進行分類的,生產者生產消息,消費者消費消息,都是面向topic的。topic存儲結構見下圖:
由於生產者生產的消息會不斷追加到 log 文件末尾,為防止 log 文件過大導致數據定位效率低下,Kafka 採取了 分片 和 索引 機制,將每個partition分為多個segment。每個 segment對應兩個文件——「.index」文件和「.log」文件。
partition文件夾命名規則:
topic 名稱+分區序號,舉例有一個topic名稱文「kafka」,這個topic有三個分區,則每個文件夾命名如下:
index和log文件的命名規則:
1)partition文件夾中的第一個segment從0開始,以後每個segement文件以上一個segment文件的最後一條消息的offset+1命名(當前日誌中的第一條消息的offset值命名)。
2)數值最大為64位long大小。19位數字字元長度,沒有數字用0填充。
舉例,有以下三對文件:
以第二個文件為例看下對應的數據結構:
稀疏索引 需要注意下。
消息查找過程 :
找message-2589,即offset為2589:
1)先定位segment文件,在0000000000000002584中。
2)計算查找的offset在日誌文件的相對偏移量
offset – 文件名的數量 = 2589 – 2584 = 5;
在index文件查找第一個參數的值,若找到,則獲取到偏移量,通過偏移量到log文件去找對應偏移量的數據即可;
本例中沒有找到,則找到當前索引中偏移量的上線最接近的值,即3,偏移量文246;然後到log文件中從偏移量為246數據開始向下尋找。
簡單了解了kafka在數據存儲方面的知識,線面我們具體分析下為什麼kafka基於磁碟卻快於內存。
在前面了解存儲結構過程中,我們發現kafka記錄log日誌使用的結尾追加的方式,即 順序寫 。這樣要比隨機寫塊很多,這與磁碟的機械機構有關,順序寫之所以快,是因為其省去了大量磁頭定址的時間。
mmap,簡單描述其就是將磁碟文件映射到內存, 用戶通過修改內存就能修改磁碟文件。
即便是順序寫磁碟,磁碟的讀寫速度任然比內存慢慢的多得多,好在操作系統已經幫我們解決這個問題。在Linux操作系統中,Linux會將磁碟中的一些數據讀取到內存當中,我們稱之為內存頁。當需要讀寫硬碟的時候,都優先在內存頁中進行處理。當內存頁的數據比硬碟數據多的時候,就形成了 臟頁 ,當臟頁達到一定數量,操作系統會進行 刷臟 ,即將內存也數據寫到磁碟。
問題:不可靠,寫到 mmap 中的數據並沒有被真正的寫到硬碟,操作系統會在程序主動調用 Flush 的時候才把數據真正的寫到硬碟。
零拷貝並不是不需要拷貝,而是減少不必要的拷貝次數,通常使用在IO讀寫過程中。
傳統io過程
如上圖所示,上圖共經歷了四次拷貝的過程:
1)數據到到內核態的read buffer;
2)內核態的read buffer到用戶態應用層的buffer;
3)用戶態到內核態的socket buffer;
4)socket buffer到網卡的buffer(NIC)。
DMA
引入DMA技術,是指外部設備不通過CPU而直接與系統內存交換數據的介面技術,網卡等硬體設備支持DMA技術。
如上圖所示,上圖共經歷了兩次拷貝的過程。
sendfile
在內核版本 2.1 中,引入了 Sendfile 系統調用,以簡化網路上和兩個本地文件之間的數據傳輸。同時使用了DMA技術。
如上圖所示,上圖共經歷了一次拷貝的過程。
sendfile( DMA 收集拷貝)
之前我們是把頁緩存的數據拷貝到socket緩存中,實際上,我們僅僅需要把緩衝區描述符傳到 socket 緩衝區,再把數據長度傳過去,這樣 DMA 控制器直接將頁緩存中的數據打包發送到網路中就可以了。
如上圖所示,最後一次的拷貝也被消除了,數據-read buffer-NIC。
kafka通過java和scala實現,而Java對sendfile是通過NIO 的 FileChannel (java.nio.channels.FileChannel )的 transferTo 和 transferFrom 方法實現零拷貝
註: transferTo 和 transferFrom 並不保證一定能使用零拷貝。實際上是否能使用零拷貝與操作系統相關,如果操作系統提供 sendfile 這樣的零拷貝系統調用,則這兩個方法會通過這樣的系統調用充分利用零拷貝的優勢,否則並不能通過這兩個方法本身實現零拷貝。
3分鐘帶你徹底搞懂 Kafka
Kafka到底是個啥?用來幹嘛的?
官方定義如下:
翻譯過來,大致的意思就是,這是一個實時數據處理系統,可以橫向擴展,並高可靠!
實時數據處理 ,從名字上看,很好理解,就是將數據進行實時處理,在現在流行的微服務開發中,最常用實時數據處理平台有 RabbitMQ、RocketMQ 等消息中間件。
這些中間件,最大的特點主要有兩個:
在早期的 web 應用程序開發中,當請求量突然上來了時候,我們會將要處理的數據推送到一個隊列通道中,然後另起一個線程來不斷輪訓拉取隊列中的數據,從而加快程序的運行效率。
但是隨著請求量不斷的增大,並且隊列通道的數據一致處於高負載,在這種情況下,應用程序的內存佔用率會非常高,稍有不慎,會出現內存不足,造成程序內存溢出,從而導致服務不可用。
隨著業務量的不斷擴張,在一個應用程序內,使用這種模式已然無法滿足需求,因此之後,就誕生了各種消息中間件,例如 ActiveMQ、RabbitMQ、RocketMQ等中間件。
採用這種模型,本質就是將要推送的數據,不在存放在當前應用程序的內存中,而是將數據存放到另一個專門負責數據處理的應用程序中,從而實現服務解耦。
消息中間件 :主要的職責就是保證能接受到消息,並將消息存儲到磁碟,即使其他服務都掛了,數據也不會丟失,同時還可以對數據消費情況做好監控工作。
應用程序 :只需要將消息推送到消息中間件,然後啟用一個線程來不斷從消息中間件中拉取數據,進行消費確認即可!
引入消息中間件之後,整個服務開發會變得更加簡單,各負其責。
Kafka 本質其實也是消息中間件的一種,Kafka 出自於 LinkedIn 公司,與 2010 年開源到 github。
LinkedIn 的開發團隊,為了解決數據管道問題,起初採用了 ActiveMQ 來進行數據交換,大約是在 2010 年前後,那時的 ActiveMQ 還遠遠無法滿足 LinkedIn 對數據傳遞系統的要求,經常由於各種缺陷而導致消息阻塞或者服務無法正常訪問,為了能夠解決這個問題,LinkedIn 決定研發自己的消息傳遞系統, Kafka 由此誕生 。
在 LinkedIn 公司,Kafka 可以有效地處理每天數十億條消息的指標和用戶活動跟蹤,其強大的處理能力,已經被業界所認可,並成為大數據流水線的首選技術。
先來看一張圖, 下面這張圖就是 kafka 生產與消費的核心架構模型 !
如果你看不懂這些概念沒關係,我會帶著大家一起梳理一遍!
簡而言之,kafka 本質就是一個消息系統,與大多數的消息系統一樣,主要的特點如下:
與 ActiveMQ、RabbitMQ、RocketMQ 不同的地方在於,它有一個**分區 Partition **的概念。
這個分區的意思就是說,如果你創建的 topic 有5個分區,當你一次性向 kafka 中推 1000 條數據時,這 1000 條數據默認會分配到 5 個分區中,其中每個分區存儲 200 條數據。
這樣做的目的,就是方便消費者從不同的分區拉取數據,假如你啟動 5 個線程同時拉取數據,每個線程拉取一個分區,消費速度會非常非常快!
這是 kafka 與其他的消息系統最大的不同!
和其他的中間件一樣,kafka 每次發送數據都是向 Leader 分區發送數據,並順序寫入到磁碟,然後 Leader 分區會將數據同步到各個從分區 Follower ,即使主分區掛了,也不會影響服務的正常運行。
那 kafka 是如何將數據寫入到對應的分區呢?kafka中有以下幾個原則:
與生產者一樣,消費者主動的去kafka集群拉取消息時,也是從 Leader 分區去拉取數據。
這裡我們需要重點了解一個名詞: 消費組 !
考慮到多個消費者的場景,kafka 在設計的時候,可以由多個消費者組成一個消費組,同一個消費組者的消費者可以消費同一個 topic 下不同分區的數據,同一個分區只會被一個消費組內的某個消費者所消費,防止出現重複消費的問題!
但是不同的組,可以消費同一個分區的數據!
你可以這樣理解,一個消費組就是一個客戶端,一個客戶端可以由很多個消費者組成,以便加快消息的消費能力。
但是,如果一個組下的消費者數量大於分區數量,就會出現很多的消費者閑置。
如果分區數量大於一個組下的消費者數量,會出現一個消費者負責多個分區的消費,會出現消費性能不均衡的情況。
因此,在實際的應用中,建議消費者組的 consumer 的數量與 partition 的數量保持一致!
光說理論可沒用,下面我們就以 centos7 為例,介紹一下 kafka 的安裝和使用。
kafka 需要 zookeeper 來保存服務實例的元信息,因此在安裝 kafka 之前,我們需要先安裝 zookeeper。
zookeeper 安裝環境依賴於 jdk,因此我們需要事先安裝 jdk
下載zookeeper,並解壓文件包
創建數據、日誌目錄
配置zookeeper
重新配置 dataDir 和 dataLogDir 的存儲路徑
最後,啟動 Zookeeper 服務
到官網 下載想要的版本,我這裡下載是最新穩定版 2.8.0 。
按需修改配置文件 server.properties (可選)
server.properties 文件內容如下:
其中有四個重要的參數:
可根據自己需求修改對應的配置!
啟動 kafka 服務
創建一個名為 testTopic 的主題,它只包含一個分區,只有一個副本:
運行 list topic 命令,可以看到該主題。
輸出內容:
Kafka 附帶一個命令行客戶端,它將從文件或標準輸入中獲取輸入,並將其作為消息發送到 Kafka 集群。默認情況下,每行將作為單獨的消息發送。
運行生產者,然後在控制台中鍵入一些消息以發送到伺服器。
輸入兩條內容並回車:
Kafka 還有一個命令行使用者,它會將消息轉儲到標準輸出。
輸出結果如下:
本文主要圍繞 kafka 的架構模型和安裝環境做了一些初步的介紹,難免會有理解不對的地方,歡迎網友批評、吐槽。
由於篇幅原因,會在下期文章中詳細介紹 java 環境下 kafka 應用場景!
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/198207.html