javakafka,javakafka消費程序

本文目錄一覽:

Kafka相關內容總結(Kafka集群搭建手記)

Kafka is a distributed,partitioned,replicated commit logservice。它提供了類似於JMS的特性,但是在設計實現上完全不同,此外它並不是JMS規範的實現。kafka對消息保存時根據Topic進行歸類,發送消息者成為Producer,消息接受者成為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。無論是kafka集群,還是producer和consumer都依賴於zookeeper來保證系統可用性集群保存一些meta信息。

入門請參照:

在此不再贅述。

這部分不是本文的重點,但是kafka需要用到kafka集群,所以先搭建kafka集群。

從kafka官方文檔看到,kafka似乎在未來的版本希望拋棄zookeep集群,自己維護集群的一致性,拭目以待吧。

我們搭建集群使用的是三台同機房的機器,因為zookeeper不怎麼佔資源也不怎麼占空間(我們的業務目前比較簡單),所以三台機器上都搭建了zookeeper集群。

搭建zookeeper集群沒什麼難度,參考文檔:

下面列一下我的配置並解析:

一共用三台物理機器,搭建一個Kafka集群。

每台服務器的硬盤劃分都是一樣的,每個獨立的物理磁盤掛在一個單獨的分區裡面,這樣很方便用於Kafka多個partition的數據讀寫與冗餘。

/data1比較小,為了不成為集群的瓶頸,所以/data1用於存放kafka以及Zookeeper

每台機器的磁盤分布如下:

下面是kafka的簡單配置,三台服務器都一樣,如有不一致的在下文有說明。

kafka安裝在目錄/usr/local/kafka/下,下面的說明以10.1.xxx.57為例。

最重要的配置文件server.properties,需要配置的信息如下:

從上面的配置看到,kafka集群不需要像hadoop集群那樣,配置ssh通訊,而且一個kafka服務器(官方文檔稱之為broker,下面統一使用這個稱呼)並不知道其他的kafka服務器的存在,因此你需要逐個broker去啟動kafka。各個broker根據自己的配置,會自動去配置文件上的zk服務器報到,這就是一個有zk服務器粘合起來的kafka集群。

我寫了一個啟動腳本,放在 /usr/local/kafka/bin 下面。啟動腳本每個broker都一樣:

如同kafka集群裡面每一個broker都需要單獨啟動一樣,kafka集群裡面每一個broker都需要單獨關閉。

官方給出的關閉腳本是單獨運行 bin/kafka-server-stop.sh

但是我運行的結果是無法關閉。打開腳本一看,才發現是最簡單的辦法,發一個TERM信號到kafka的java進程,官方腳本給出的grep有點問題。

發信號之後,一直tail着kafka日誌,看到正常關閉。

指定zookeeper服務器,topic名稱是LvsKafka(注意topic名稱不能有英文句號(.)和下劃線(_),否則會通不過,理由是名稱會衝突,下文對此略有解析)

replication-factor指出重複因子是2,也就是每條數據有兩個拷貝,可靠性考慮。

partitions 指出需要多少個partition,數據量大的多一點,無論生產和消費,這是負載均衡和高並發的需要。

可以看到剛才新建的24個partition,比如partition 5, 他的leader是broker 59,也就是10.1.xxx.59這台機器。

建立topic時我們指出需要2個拷貝,從上面的輸出的Replicas字段看到,這兩個拷貝放在59,58兩個機器,也就是10.1.xxx.59和10.1.xxx.58.

Isr表示當前partition的所有拷貝所在的機器中,哪些是還活着(可以提供服務)的。現在是59和58都還存活。

這個命令另外還會看到一些類似於下面的內容:

__consumer_offsets到底是什麼呢?其實就是客戶端的消費進度,客戶端會定時上報到kafka集群,而kafka集群會把每個客戶端的消費進度放入一個自己內部的topic中,這個topic就是__consumer_offsets。我查看過__consumer_offsets的內容,其實就是每個客戶端的消費進度作為一條消息,放入__consumer_offsets這個topic中。

這裡給了我們兩個提示:

1、kafka自己管理客戶端的消費進度,而不是依靠zk,這就是kafka官方文檔說的kafka未來會拋棄zk的底氣之一;

2、留意到這個kafka自己的topic是帶下劃線的,也就是,kafka擔心我們自己建的topic如果帶下劃線的話會跟這些內部自用的topic衝突;

3分鐘帶你徹底搞懂 Kafka

Kafka到底是個啥?用來幹嘛的?

官方定義如下:

翻譯過來,大致的意思就是,這是一個實時數據處理系統,可以橫向擴展,並高可靠!

實時數據處理 ,從名字上看,很好理解,就是將數據進行實時處理,在現在流行的微服務開發中,最常用實時數據處理平台有 RabbitMQ、RocketMQ 等消息中間件。

這些中間件,最大的特點主要有兩個:

在早期的 web 應用程序開發中,當請求量突然上來了時候,我們會將要處理的數據推送到一個隊列通道中,然後另起一個線程來不斷輪訓拉取隊列中的數據,從而加快程序的運行效率。

但是隨着請求量不斷的增大,並且隊列通道的數據一致處於高負載,在這種情況下,應用程序的內存佔用率會非常高,稍有不慎,會出現內存不足,造成程序內存溢出,從而導致服務不可用。

隨着業務量的不斷擴張,在一個應用程序內,使用這種模式已然無法滿足需求,因此之後,就誕生了各種消息中間件,例如 ActiveMQ、RabbitMQ、RocketMQ等中間件。

採用這種模型,本質就是將要推送的數據,不在存放在當前應用程序的內存中,而是將數據存放到另一個專門負責數據處理的應用程序中,從而實現服務解耦。

消息中間件 :主要的職責就是保證能接受到消息,並將消息存儲到磁盤,即使其他服務都掛了,數據也不會丟失,同時還可以對數據消費情況做好監控工作。

應用程序 :只需要將消息推送到消息中間件,然後啟用一個線程來不斷從消息中間件中拉取數據,進行消費確認即可!

引入消息中間件之後,整個服務開發會變得更加簡單,各負其責。

Kafka 本質其實也是消息中間件的一種,Kafka 出自於 LinkedIn 公司,與 2010 年開源到 github。

LinkedIn 的開發團隊,為了解決數據管道問題,起初採用了 ActiveMQ 來進行數據交換,大約是在 2010 年前後,那時的 ActiveMQ 還遠遠無法滿足 LinkedIn 對數據傳遞系統的要求,經常由於各種缺陷而導致消息阻塞或者服務無法正常訪問,為了能夠解決這個問題,LinkedIn 決定研發自己的消息傳遞系統, Kafka 由此誕生 。

在 LinkedIn 公司,Kafka 可以有效地處理每天數十億條消息的指標和用戶活動跟蹤,其強大的處理能力,已經被業界所認可,並成為大數據流水線的首選技術。

先來看一張圖, 下面這張圖就是 kafka 生產與消費的核心架構模型 !

如果你看不懂這些概念沒關係,我會帶着大家一起梳理一遍!

簡而言之,kafka 本質就是一個消息系統,與大多數的消息系統一樣,主要的特點如下:

與 ActiveMQ、RabbitMQ、RocketMQ 不同的地方在於,它有一個**分區 Partition **的概念。

這個分區的意思就是說,如果你創建的 topic 有5個分區,當你一次性向 kafka 中推 1000 條數據時,這 1000 條數據默認會分配到 5 個分區中,其中每個分區存儲 200 條數據。

這樣做的目的,就是方便消費者從不同的分區拉取數據,假如你啟動 5 個線程同時拉取數據,每個線程拉取一個分區,消費速度會非常非常快!

這是 kafka 與其他的消息系統最大的不同!

和其他的中間件一樣,kafka 每次發送數據都是向 Leader 分區發送數據,並順序寫入到磁盤,然後 Leader 分區會將數據同步到各個從分區 Follower ,即使主分區掛了,也不會影響服務的正常運行。

那 kafka 是如何將數據寫入到對應的分區呢?kafka中有以下幾個原則:

與生產者一樣,消費者主動的去kafka集群拉取消息時,也是從 Leader 分區去拉取數據。

這裡我們需要重點了解一個名詞: 消費組 !

考慮到多個消費者的場景,kafka 在設計的時候,可以由多個消費者組成一個消費組,同一個消費組者的消費者可以消費同一個 topic 下不同分區的數據,同一個分區只會被一個消費組內的某個消費者所消費,防止出現重複消費的問題!

但是不同的組,可以消費同一個分區的數據!

你可以這樣理解,一個消費組就是一個客戶端,一個客戶端可以由很多個消費者組成,以便加快消息的消費能力。

但是,如果一個組下的消費者數量大於分區數量,就會出現很多的消費者閑置。

如果分區數量大於一個組下的消費者數量,會出現一個消費者負責多個分區的消費,會出現消費性能不均衡的情況。

因此,在實際的應用中,建議消費者組的 consumer 的數量與 partition 的數量保持一致!

光說理論可沒用,下面我們就以 centos7 為例,介紹一下 kafka 的安裝和使用。

kafka 需要 zookeeper 來保存服務實例的元信息,因此在安裝 kafka 之前,我們需要先安裝 zookeeper。

zookeeper 安裝環境依賴於 jdk,因此我們需要事先安裝 jdk

下載zookeeper,並解壓文件包

創建數據、日誌目錄

配置zookeeper

重新配置 dataDir 和 dataLogDir 的存儲路徑

最後,啟動 Zookeeper 服務

到官網 下載想要的版本,我這裡下載是最新穩定版 2.8.0 。

按需修改配置文件 server.properties (可選)

server.properties 文件內容如下:

其中有四個重要的參數:

可根據自己需求修改對應的配置!

啟動 kafka 服務

創建一個名為 testTopic 的主題,它只包含一個分區,只有一個副本:

運行 list topic 命令,可以看到該主題。

輸出內容:

Kafka 附帶一個命令行客戶端,它將從文件或標準輸入中獲取輸入,並將其作為消息發送到 Kafka 集群。默認情況下,每行將作為單獨的消息發送。

運行生產者,然後在控制台中鍵入一些消息以發送到服務器。

輸入兩條內容並回車:

Kafka 還有一個命令行使用者,它會將消息轉儲到標準輸出。

輸出結果如下:

本文主要圍繞 kafka 的架構模型和安裝環境做了一些初步的介紹,難免會有理解不對的地方,歡迎網友批評、吐槽。

由於篇幅原因,會在下期文章中詳細介紹 java 環境下 kafka 應用場景!

kafka是幹嘛的

Kafka是由Apache軟件基金會開發的一個開源流處理平台,Scala和Java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者在網站中的所有動作流數據。這種動作是在現代網絡上的許多社會功能的一個關鍵因素。這些數據通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。對於像Hadoop一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。

kafka簡介

一、kafka定義

二、kafka的優勢

三、kafka的原理

四、kafka起源

一、Kafka是最初由Linkedin公司開發,是一個分布式、支持分區的(partition)、多副本的(replica),基於zookeeper協調的分布式消息系統,它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日誌、訪問日誌,消息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會並成為頂級開源項目。

二、kafka的優勢

高吞吐量、低延遲:kafka美妙之處是可以處理幾十萬條信息,它的延遲最低只有幾毫秒,每個topic可以分多個partition,consumer

group對partition進行consume操作。

可擴展性:kafka集群支持熱擴展

持久化、可靠性:消息被持久化到本地磁盤,並且支持數據備份防止數據丟失

容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗)

高並發:支持數千個客戶端同時讀寫

三、kafka的原理

kafka是如何實現以上所述這幾點,我們逐一說明:

1.高吞吐量、低延遲

kafka在設計之初就是為了針對大數據量的傳輸處理,高吞吐量、低延遲最主要看的就是單位時間內所能讀寫的數據總量,我們先來看生產端。

kafka採取了一定量的批處理機制,即當生產數據達到一定數量或者達到時間窗口後,將所收集到的數據一批次的提交到服務器,我們假設處理一次數據的時間為1ms,那每秒鐘能處理1000條,延時為1ms,如果此時將處理間隔變成9ms,即每10ms處理一批數據,假設這段時間接收到100條處理,那每秒則能處理10000條,但是延時變成了10ms。為了獲得最大的吞吐量,需要犧牲一定的延遲,但是這樣的犧牲是值得的。當確定了這種小批量方式之後,高速的寫則取決於kafka自身寫磁盤的速度了。而由於kafka本身對數據不做任何的處理,只管寫入數據,保管數據,分發數據,因此會是一種批量順序寫入數據的情況,而磁盤的讀寫速度大量消耗在尋址上,也就是隨機讀寫,但是對於順序寫入的速度是非常快的,甚至能媲美內存的隨機寫入速度。有人做過一個對比,普通磁盤順序寫入每秒能達到53.2M/s,SSD的順序寫入速度為42.2M/s,內存的順序寫入速度為358.2M/s。kafka正是利用了這個特性,順序寫入,速度相對較快。而kafka本身雖然也是寫入磁盤持久化數據,但實際上kafka是將數據順序寫入頁緩存中(page cache),然後由操作系統自行決定何時寫到磁盤上,因此kafka的寫操作能在每秒輕輕鬆鬆達到寫入數十萬條記錄。並且基於kafka的動態擴展,這個數字還能不斷增大。

kafka在消費端也有着高吞吐量,由於kafka是將數據寫入到頁緩存中,同時由於讀寫相間的間隔並不大,很大可能性會在緩存中命中,從而保證高吞吐量。另外kafka由於本身不對數據做任何的修改,完全使用零拷貝技術,大大提升數據的讀取能力。

2.kafka每個節點叫做broker,而每一個broker都是獨立運行的,可以隨時加入kafka集群,集群的心跳管理是由zookeeper負責,新加入的broker只要broker id不與原有的衝突就能順利的加入集群中,實現動態擴展。

3.kafka的持久化在上面已經提到,kafka繞過了java的堆處理數據,直接將數據寫入頁緩存,然後由操作系統來管理頁緩存寫入磁盤,實現持久化。kafka每一個主題topic是一個業務數據,他可由多個partition組成,而每個partition可以有多個replica副本,用於保證數據的可靠性。replica分為兩個角色,一個是leader,一個是追隨者,同一時間,每一個partition只能有一個leader,其他都是追問隨者,laeder負責接收數據並寫入log,而追隨者不能被用戶寫入數據,只是從leader角色的replica副本中同步log寫入自己的log,保持數據同步。kafka中有一個概念,ISR,全稱是in-sync

replica,即所有可用的replica副本,這裡的ISR數量只要大於1,這個partition就能正常運作,因此容錯性非常好,假設n個replica,那最多可以壞n-1個replica的情況下,還能保持系統正常運行。當replica遲滯到一定時間後,會被kafka從ISR中剔除,當再次同步後,可以再次加入ISR,如果這時候leader出現問題,會從ISR中重新選舉一個leader,原先的leader再次同步成功後會重新加入ISR,成為一個flower。

4.上面提到了kafka的ISR機制,kafka的容錯性就是由ISR的機制來保證的。

5.kafka集群可以動態擴展broker,多個partition同時寫入消費數據,實現真正的高並發。

四、kafka的起源

kafka起源於LinkedIn公司,當時領英公司需要收集兩大類數據,一是業務系統和應用程序的性能監控指標數據,而是用戶的操作行為數據。當時為了收集這兩類數據,領英自研了兩套相應的數據收集系統,但是這兩套系統都存在一些弊端,無法實現實時交互、實時性差、維護成本高。因此領英的工程師希望找到一個統一的組件來收集分發消費這些大批量的數據,ActiveMQ由於擴展性不足,不能支撐大數據量而被拋棄,從而決定自研一套滿足需求的系統組件,也就是kafka。

kafka的設計之初主要有三個目標:

1.為生產者和消費者提供一套簡單的API

2.降低網絡傳輸和磁盤存儲開銷

3.具有高伸縮性架構

目前kafka可以算是超額完成了目標。

kafka的名稱由來也很有意思,因為kafka系統的寫操作性能特彆強,因此想使用一個作家的名字來命名kafka,而Jay Kreps,kafka的三位作者之一,在上大學的時候很喜歡Franz Kafka,因此起來這樣一個名字。

kafka在2010年開源,2011年7月正式進入Apache進行孵化,2012年10月順利畢業,後成為Apache的頂級項目。

原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/198791.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
小藍的頭像小藍
上一篇 2024-12-04 10:27
下一篇 2024-12-04 10:27

相關推薦

  • Python程序需要編譯才能執行

    Python 被廣泛應用於數據分析、人工智能、科學計算等領域,它的靈活性和簡單易學的性質使得越來越多的人喜歡使用 Python 進行編程。然而,在 Python 中程序執行的方式不…

    編程 2025-04-29
  • python強行終止程序快捷鍵

    本文將從多個方面對python強行終止程序快捷鍵進行詳細闡述,並提供相應代碼示例。 一、Ctrl+C快捷鍵 Ctrl+C快捷鍵是在終端中經常用來強行終止運行的程序。當你在終端中運行…

    編程 2025-04-29
  • Python程序文件的拓展

    Python是一門功能豐富、易於學習、可讀性高的編程語言。Python程序文件通常以.py為文件拓展名,被廣泛應用於各種領域,包括Web開發、機器學習、科學計算等。為了更好地發揮P…

    編程 2025-04-29
  • Python購物車程序

    Python購物車程序是一款基於Python編程語言開發的程序,可以實現購物車的相關功能,包括商品的添加、購買、刪除、統計等。 一、添加商品 添加商品是購物車程序的基礎功能之一,用…

    編程 2025-04-29
  • 爬蟲是一種程序

    爬蟲是一種程序,用於自動獲取互聯網上的信息。本文將從如下多個方面對爬蟲的意義、運行方式、應用場景和技術要點等進行詳細的闡述。 一、爬蟲的意義 1、獲取信息:爬蟲可以自動獲取互聯網上…

    編程 2025-04-29
  • Vb運行程序的三種方法

    VB是一種非常實用的編程工具,它可以被用於開發各種不同的應用程序,從簡單的計算器到更複雜的商業軟件。在VB中,有許多不同的方法可以運行程序,包括編譯器、發布程序以及命令行。在本文中…

    編程 2025-04-29
  • Python一元二次方程求解程序

    本文將詳細闡述Python一元二次方程求解程序的相關知識,為讀者提供全面的程序設計思路和操作方法。 一、方程求解 首先,我們需要了解一元二次方程的求解方法。一元二次方程可以寫作: …

    編程 2025-04-29
  • 如何使用GPU加速運行Python程序——以CSDN為中心

    GPU的強大性能是眾所周知的。而隨着深度學習和機器學習的發展,越來越多的Python開發者將GPU應用於深度學習模型的訓練過程中,提高了模型訓練效率。在本文中,我們將介紹如何使用G…

    編程 2025-04-29
  • Web程序和桌面程序的區別

    Web程序和桌面程序都是進行軟件開發的方式,但是它們之間存在很大的區別。本文將從多角度進行闡述。 一、運行方式 Web程序運行於互聯網上,用戶可以通過使用瀏覽器來訪問它。而桌面程序…

    編程 2025-04-29
  • 微信小程序和Python數據交互完整指南

    本篇文章將從多個方面介紹如何在微信小程序中實現與Python的數據交互。通過本文的學習,您將掌握如何將微信小程序與後台Python代碼結合起來,實現更豐富的功能。 一、概述 微信小…

    編程 2025-04-29

發表回復

登錄後才能評論