本文目錄一覽:
Kafka相關內容總結(Kafka集群搭建手記)
Kafka is a distributed,partitioned,replicated commit logservice。它提供了類似於JMS的特性,但是在設計實現上完全不同,此外它並不是JMS規範的實現。kafka對消息保存時根據Topic進行歸類,發送消息者成為Producer,消息接受者成為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。無論是kafka集群,還是producer和consumer都依賴於zookeeper來保證系統可用性集群保存一些meta信息。
入門請參照:
在此不再贅述。
這部分不是本文的重點,但是kafka需要用到kafka集群,所以先搭建kafka集群。
從kafka官方文檔看到,kafka似乎在未來的版本希望拋棄zookeep集群,自己維護集群的一致性,拭目以待吧。
我們搭建集群使用的是三台同機房的機器,因為zookeeper不怎麼佔資源也不怎麼占空間(我們的業務目前比較簡單),所以三台機器上都搭建了zookeeper集群。
搭建zookeeper集群沒什麼難度,參考文檔:
下面列一下我的配置並解析:
一共用三台物理機器,搭建一個Kafka集群。
每台伺服器的硬碟劃分都是一樣的,每個獨立的物理磁碟掛在一個單獨的分區裡面,這樣很方便用於Kafka多個partition的數據讀寫與冗餘。
/data1比較小,為了不成為集群的瓶頸,所以/data1用於存放kafka以及Zookeeper
每台機器的磁碟分布如下:
下面是kafka的簡單配置,三台伺服器都一樣,如有不一致的在下文有說明。
kafka安裝在目錄/usr/local/kafka/下,下面的說明以10.1.xxx.57為例。
最重要的配置文件server.properties,需要配置的信息如下:
從上面的配置看到,kafka集群不需要像hadoop集群那樣,配置ssh通訊,而且一個kafka伺服器(官方文檔稱之為broker,下面統一使用這個稱呼)並不知道其他的kafka伺服器的存在,因此你需要逐個broker去啟動kafka。各個broker根據自己的配置,會自動去配置文件上的zk伺服器報到,這就是一個有zk伺服器粘合起來的kafka集群。
我寫了一個啟動腳本,放在 /usr/local/kafka/bin 下面。啟動腳本每個broker都一樣:
如同kafka集群裡面每一個broker都需要單獨啟動一樣,kafka集群裡面每一個broker都需要單獨關閉。
官方給出的關閉腳本是單獨運行 bin/kafka-server-stop.sh
但是我運行的結果是無法關閉。打開腳本一看,才發現是最簡單的辦法,發一個TERM信號到kafka的java進程,官方腳本給出的grep有點問題。
發信號之後,一直tail著kafka日誌,看到正常關閉。
指定zookeeper伺服器,topic名稱是LvsKafka(注意topic名稱不能有英文句號(.)和下劃線(_),否則會通不過,理由是名稱會衝突,下文對此略有解析)
replication-factor指出重複因子是2,也就是每條數據有兩個拷貝,可靠性考慮。
partitions 指出需要多少個partition,數據量大的多一點,無論生產和消費,這是負載均衡和高並發的需要。
可以看到剛才新建的24個partition,比如partition 5, 他的leader是broker 59,也就是10.1.xxx.59這台機器。
建立topic時我們指出需要2個拷貝,從上面的輸出的Replicas欄位看到,這兩個拷貝放在59,58兩個機器,也就是10.1.xxx.59和10.1.xxx.58.
Isr表示當前partition的所有拷貝所在的機器中,哪些是還活著(可以提供服務)的。現在是59和58都還存活。
這個命令另外還會看到一些類似於下面的內容:
__consumer_offsets到底是什麼呢?其實就是客戶端的消費進度,客戶端會定時上報到kafka集群,而kafka集群會把每個客戶端的消費進度放入一個自己內部的topic中,這個topic就是__consumer_offsets。我查看過__consumer_offsets的內容,其實就是每個客戶端的消費進度作為一條消息,放入__consumer_offsets這個topic中。
這裡給了我們兩個提示:
1、kafka自己管理客戶端的消費進度,而不是依靠zk,這就是kafka官方文檔說的kafka未來會拋棄zk的底氣之一;
2、留意到這個kafka自己的topic是帶下劃線的,也就是,kafka擔心我們自己建的topic如果帶下劃線的話會跟這些內部自用的topic衝突;
3分鐘帶你徹底搞懂 Kafka
Kafka到底是個啥?用來幹嘛的?
官方定義如下:
翻譯過來,大致的意思就是,這是一個實時數據處理系統,可以橫向擴展,並高可靠!
實時數據處理 ,從名字上看,很好理解,就是將數據進行實時處理,在現在流行的微服務開發中,最常用實時數據處理平台有 RabbitMQ、RocketMQ 等消息中間件。
這些中間件,最大的特點主要有兩個:
在早期的 web 應用程序開發中,當請求量突然上來了時候,我們會將要處理的數據推送到一個隊列通道中,然後另起一個線程來不斷輪訓拉取隊列中的數據,從而加快程序的運行效率。
但是隨著請求量不斷的增大,並且隊列通道的數據一致處於高負載,在這種情況下,應用程序的內存佔用率會非常高,稍有不慎,會出現內存不足,造成程序內存溢出,從而導致服務不可用。
隨著業務量的不斷擴張,在一個應用程序內,使用這種模式已然無法滿足需求,因此之後,就誕生了各種消息中間件,例如 ActiveMQ、RabbitMQ、RocketMQ等中間件。
採用這種模型,本質就是將要推送的數據,不在存放在當前應用程序的內存中,而是將數據存放到另一個專門負責數據處理的應用程序中,從而實現服務解耦。
消息中間件 :主要的職責就是保證能接受到消息,並將消息存儲到磁碟,即使其他服務都掛了,數據也不會丟失,同時還可以對數據消費情況做好監控工作。
應用程序 :只需要將消息推送到消息中間件,然後啟用一個線程來不斷從消息中間件中拉取數據,進行消費確認即可!
引入消息中間件之後,整個服務開發會變得更加簡單,各負其責。
Kafka 本質其實也是消息中間件的一種,Kafka 出自於 LinkedIn 公司,與 2010 年開源到 github。
LinkedIn 的開發團隊,為了解決數據管道問題,起初採用了 ActiveMQ 來進行數據交換,大約是在 2010 年前後,那時的 ActiveMQ 還遠遠無法滿足 LinkedIn 對數據傳遞系統的要求,經常由於各種缺陷而導致消息阻塞或者服務無法正常訪問,為了能夠解決這個問題,LinkedIn 決定研發自己的消息傳遞系統, Kafka 由此誕生 。
在 LinkedIn 公司,Kafka 可以有效地處理每天數十億條消息的指標和用戶活動跟蹤,其強大的處理能力,已經被業界所認可,並成為大數據流水線的首選技術。
先來看一張圖, 下面這張圖就是 kafka 生產與消費的核心架構模型 !
如果你看不懂這些概念沒關係,我會帶著大家一起梳理一遍!
簡而言之,kafka 本質就是一個消息系統,與大多數的消息系統一樣,主要的特點如下:
與 ActiveMQ、RabbitMQ、RocketMQ 不同的地方在於,它有一個**分區 Partition **的概念。
這個分區的意思就是說,如果你創建的 topic 有5個分區,當你一次性向 kafka 中推 1000 條數據時,這 1000 條數據默認會分配到 5 個分區中,其中每個分區存儲 200 條數據。
這樣做的目的,就是方便消費者從不同的分區拉取數據,假如你啟動 5 個線程同時拉取數據,每個線程拉取一個分區,消費速度會非常非常快!
這是 kafka 與其他的消息系統最大的不同!
和其他的中間件一樣,kafka 每次發送數據都是向 Leader 分區發送數據,並順序寫入到磁碟,然後 Leader 分區會將數據同步到各個從分區 Follower ,即使主分區掛了,也不會影響服務的正常運行。
那 kafka 是如何將數據寫入到對應的分區呢?kafka中有以下幾個原則:
與生產者一樣,消費者主動的去kafka集群拉取消息時,也是從 Leader 分區去拉取數據。
這裡我們需要重點了解一個名詞: 消費組 !
考慮到多個消費者的場景,kafka 在設計的時候,可以由多個消費者組成一個消費組,同一個消費組者的消費者可以消費同一個 topic 下不同分區的數據,同一個分區只會被一個消費組內的某個消費者所消費,防止出現重複消費的問題!
但是不同的組,可以消費同一個分區的數據!
你可以這樣理解,一個消費組就是一個客戶端,一個客戶端可以由很多個消費者組成,以便加快消息的消費能力。
但是,如果一個組下的消費者數量大於分區數量,就會出現很多的消費者閑置。
如果分區數量大於一個組下的消費者數量,會出現一個消費者負責多個分區的消費,會出現消費性能不均衡的情況。
因此,在實際的應用中,建議消費者組的 consumer 的數量與 partition 的數量保持一致!
光說理論可沒用,下面我們就以 centos7 為例,介紹一下 kafka 的安裝和使用。
kafka 需要 zookeeper 來保存服務實例的元信息,因此在安裝 kafka 之前,我們需要先安裝 zookeeper。
zookeeper 安裝環境依賴於 jdk,因此我們需要事先安裝 jdk
下載zookeeper,並解壓文件包
創建數據、日誌目錄
配置zookeeper
重新配置 dataDir 和 dataLogDir 的存儲路徑
最後,啟動 Zookeeper 服務
到官網 下載想要的版本,我這裡下載是最新穩定版 2.8.0 。
按需修改配置文件 server.properties (可選)
server.properties 文件內容如下:
其中有四個重要的參數:
可根據自己需求修改對應的配置!
啟動 kafka 服務
創建一個名為 testTopic 的主題,它只包含一個分區,只有一個副本:
運行 list topic 命令,可以看到該主題。
輸出內容:
Kafka 附帶一個命令行客戶端,它將從文件或標準輸入中獲取輸入,並將其作為消息發送到 Kafka 集群。默認情況下,每行將作為單獨的消息發送。
運行生產者,然後在控制台中鍵入一些消息以發送到伺服器。
輸入兩條內容並回車:
Kafka 還有一個命令行使用者,它會將消息轉儲到標準輸出。
輸出結果如下:
本文主要圍繞 kafka 的架構模型和安裝環境做了一些初步的介紹,難免會有理解不對的地方,歡迎網友批評、吐槽。
由於篇幅原因,會在下期文章中詳細介紹 java 環境下 kafka 應用場景!
kafka是幹嘛的
Kafka是由Apache軟體基金會開發的一個開源流處理平台,Scala和Java編寫。Kafka是一種高吞吐量的分散式發布訂閱消息系統,它可以處理消費者在網站中的所有動作流數據。這種動作是在現代網路上的許多社會功能的一個關鍵因素。這些數據通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。對於像Hadoop一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行載入機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。
kafka簡介
一、kafka定義
二、kafka的優勢
三、kafka的原理
四、kafka起源
一、Kafka是最初由Linkedin公司開發,是一個分散式、支持分區的(partition)、多副本的(replica),基於zookeeper協調的分散式消息系統,它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日誌、訪問日誌,消息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會並成為頂級開源項目。
二、kafka的優勢
高吞吐量、低延遲:kafka美妙之處是可以處理幾十萬條信息,它的延遲最低只有幾毫秒,每個topic可以分多個partition,consumer
group對partition進行consume操作。
可擴展性:kafka集群支持熱擴展
持久化、可靠性:消息被持久化到本地磁碟,並且支持數據備份防止數據丟失
容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗)
高並發:支持數千個客戶端同時讀寫
三、kafka的原理
kafka是如何實現以上所述這幾點,我們逐一說明:
1.高吞吐量、低延遲
kafka在設計之初就是為了針對大數據量的傳輸處理,高吞吐量、低延遲最主要看的就是單位時間內所能讀寫的數據總量,我們先來看生產端。
kafka採取了一定量的批處理機制,即當生產數據達到一定數量或者達到時間窗口後,將所收集到的數據一批次的提交到伺服器,我們假設處理一次數據的時間為1ms,那每秒鐘能處理1000條,延時為1ms,如果此時將處理間隔變成9ms,即每10ms處理一批數據,假設這段時間接收到100條處理,那每秒則能處理10000條,但是延時變成了10ms。為了獲得最大的吞吐量,需要犧牲一定的延遲,但是這樣的犧牲是值得的。當確定了這種小批量方式之後,高速的寫則取決於kafka自身寫磁碟的速度了。而由於kafka本身對數據不做任何的處理,只管寫入數據,保管數據,分發數據,因此會是一種批量順序寫入數據的情況,而磁碟的讀寫速度大量消耗在定址上,也就是隨機讀寫,但是對於順序寫入的速度是非常快的,甚至能媲美內存的隨機寫入速度。有人做過一個對比,普通磁碟順序寫入每秒能達到53.2M/s,SSD的順序寫入速度為42.2M/s,內存的順序寫入速度為358.2M/s。kafka正是利用了這個特性,順序寫入,速度相對較快。而kafka本身雖然也是寫入磁碟持久化數據,但實際上kafka是將數據順序寫入頁緩存中(page cache),然後由操作系統自行決定何時寫到磁碟上,因此kafka的寫操作能在每秒輕輕鬆鬆達到寫入數十萬條記錄。並且基於kafka的動態擴展,這個數字還能不斷增大。
kafka在消費端也有著高吞吐量,由於kafka是將數據寫入到頁緩存中,同時由於讀寫相間的間隔並不大,很大可能性會在緩存中命中,從而保證高吞吐量。另外kafka由於本身不對數據做任何的修改,完全使用零拷貝技術,大大提升數據的讀取能力。
2.kafka每個節點叫做broker,而每一個broker都是獨立運行的,可以隨時加入kafka集群,集群的心跳管理是由zookeeper負責,新加入的broker只要broker id不與原有的衝突就能順利的加入集群中,實現動態擴展。
3.kafka的持久化在上面已經提到,kafka繞過了java的堆處理數據,直接將數據寫入頁緩存,然後由操作系統來管理頁緩存寫入磁碟,實現持久化。kafka每一個主題topic是一個業務數據,他可由多個partition組成,而每個partition可以有多個replica副本,用於保證數據的可靠性。replica分為兩個角色,一個是leader,一個是追隨者,同一時間,每一個partition只能有一個leader,其他都是追問隨者,laeder負責接收數據並寫入log,而追隨者不能被用戶寫入數據,只是從leader角色的replica副本中同步log寫入自己的log,保持數據同步。kafka中有一個概念,ISR,全稱是in-sync
replica,即所有可用的replica副本,這裡的ISR數量只要大於1,這個partition就能正常運作,因此容錯性非常好,假設n個replica,那最多可以壞n-1個replica的情況下,還能保持系統正常運行。當replica遲滯到一定時間後,會被kafka從ISR中剔除,當再次同步後,可以再次加入ISR,如果這時候leader出現問題,會從ISR中重新選舉一個leader,原先的leader再次同步成功後會重新加入ISR,成為一個flower。
4.上面提到了kafka的ISR機制,kafka的容錯性就是由ISR的機制來保證的。
5.kafka集群可以動態擴展broker,多個partition同時寫入消費數據,實現真正的高並發。
四、kafka的起源
kafka起源於LinkedIn公司,當時領英公司需要收集兩大類數據,一是業務系統和應用程序的性能監控指標數據,而是用戶的操作行為數據。當時為了收集這兩類數據,領英自研了兩套相應的數據收集系統,但是這兩套系統都存在一些弊端,無法實現實時交互、實時性差、維護成本高。因此領英的工程師希望找到一個統一的組件來收集分發消費這些大批量的數據,ActiveMQ由於擴展性不足,不能支撐大數據量而被拋棄,從而決定自研一套滿足需求的系統組件,也就是kafka。
kafka的設計之初主要有三個目標:
1.為生產者和消費者提供一套簡單的API
2.降低網路傳輸和磁碟存儲開銷
3.具有高伸縮性架構
目前kafka可以算是超額完成了目標。
kafka的名稱由來也很有意思,因為kafka系統的寫操作性能特彆強,因此想使用一個作家的名字來命名kafka,而Jay Kreps,kafka的三位作者之一,在上大學的時候很喜歡Franz Kafka,因此起來這樣一個名字。
kafka在2010年開源,2011年7月正式進入Apache進行孵化,2012年10月順利畢業,後成為Apache的頂級項目。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/198791.html