本文目錄一覽:
- 1、怎麼使用java連接kafka
- 2、Kafka簡介+Kafka Tool使用簡介+使用實例
- 3、kafka集群測試正常,但是Java連接kafka出現異常,急求大神解答!!!!!!!!!!!
- 4、Kafka系列之(4)——Kafka Producer流程解析
- 5、3分鐘帶你徹底搞懂 Kafka
- 6、使用java實現kafka consumer時報錯
怎麼使用java連接kafka
把你要傳遞的數據轉換成json字元串返回介面,然後手機端調用介面就可以獲取到你要傳遞是值了
Kafka簡介+Kafka Tool使用簡介+使用實例
詳細安裝訪問:
macOS 可以用homebrew快速安裝,訪問地址:
原文鏈接:
查看topic列表:
創建topic:
–create :創建命令;
–topic :後面指定topic名稱;
–replication-factor :後面指定副本數;
–partitions :指定分區數,根據broker的數量決定;
–zookeeper :後面指定 zookeeper.connect 的zk鏈接
查看某個topic:
Kafka 作為消息系統的一種, 當然可 以像其他消 息中 間件一樣作為消息數據中轉的平台。 下面以 Java 語言為例,看一下如何使用 Kafka 來發送和接收消息。
1、引入依賴
2、消息生產者
示例 中用 KafkaProducer 類來創建一個消息生產者,該類的構造函數入參是一系列屬性值。下面看一下這些屬性具體都是什麼含義。
bootstrap.servers 表示 Kafka 集群 。 如果集群中有多台物理伺服器,則伺服器地址之間用逗號分隔, 比如」 192.168.1.1 :9092,192.168.1.2:9092」 。 localhost 是筆者電腦的地址,9092 是 Kafka 伺服器默認監聽的埠號。
key.serializer 和 value.serializer 表示消息的序列化類型 。 Kafka 的消息是以鍵值對的形式發送到 Kafka 伺服器的,在消息被發送到伺服器之前,消息生產者需要把不同類型的 消息序列化為 二 進位類型,示例中是發送文本消息到伺服器 , 所以使用的是StringSerializer。
key.deserializer 和 value.deserializer 表示消息的反序列化類型。把來自 Kafka 集群的二進位消 息反序列 化 為指定 的 類型,因為序列化用的是String類型,所以用StringDeserializer 來反序列化。
zk.connect 用於指定 Kafka 連接 ZooKeeper 的 URL ,提供了基於 ZooKeeper 的集群伺服器自動感知功能, 可以動態從 ZooKeeper 中讀取 Kafka 集群配置信息。
有 了 消息生產者之後 , 就可以調用 send 方法發送消息了。該方法的入參是 ProducerRecord類型對象 , ProducerRecord 類提供了多種構造函數形參,常見的有如下三種 :
ProducerRecord(topic,partition,key,value);
ProducerRecord(topic,key,value);
ProducerRecord(topic, value) ;
其中 topic 和 value 是必填的, partition 和 key 是可選的 。如果指定了 pa時tion,那麼消息會被發送至指定的 partition ;如果沒指定 partition 但指定了 Key,那麼消息會按照 hash(key)發送至對應的 partition: 如果既沒指定 partition 也沒指定 key,那麼 消息會按照 round-robin 模式發送(即以輪詢的方式依次發送〉到每一個 partition。示例中將向 test-topic 主題發送三條消息。
3、消息消費者
和消息生產者類似,這裡用 KafkaConsumer 類來創建一個消息消費者,該類的構造函數入參也是一系列屬性值。
bootstrap. servers 和生產者一樣,表示 Kafka 集群。
group.id 表示消費者的分組 ID。
enable.auto.commit 表示 Consumer 的 offset 是否自 動提交 。
auto.commit.interval .ms 用於設置自動提交 offset 到 ZooKeeper 的時間間隔,時間單位是毫秒。
key. deserializer 和 value.deserializer 表示用字元串來反序列化消息數據。
消息消費者使用 subscribe 方法 訂閱了 Topic 為 test-topic 的消息。 Consumer 調用poll 方法來輪詢 Kafka 集群的消息, 一直等到 Kafka 集群中沒有消息或達到超時時間(示例中設置超時時間為 100 毫秒)為止 。 如果讀取到消息,則列印出消息記錄的 pa此ition, offset、key 等。
kafka集群測試正常,但是Java連接kafka出現異常,急求大神解答!!!!!!!!!!!
首先你在鏈接時候檢查是否代碼里的IP 和埠是不是對的,埠是broker 埠,默認9092 ;
其次查看代碼是生產者,看Kafka 集群里這個主題是否存在(如果不存在,默認是配置可以自動創建,看是非將該配置修改);然後檢測防火牆,相應埠是否開放(防火牆直接關也可以);檢測 server.properties 文件的 listeners 是否配置,若沒有將其配置好
Kafka系列之(4)——Kafka Producer流程解析
Kafka 0.9版本正式使用Java版本的producer替換了原Scala版本的producer。
註:ProducerRecord允許用戶在創建消息對象的時候就直接指定要發送的分區,這樣producer後續發送該消息時可以直接發送到指定分區,而不用先通過Partitioner計算目標分區了。另外,我們還可以直接指定消息的時間戳——但一定要慎重使用這個功能,因為它有可能會令時間戳索引機制失效。
流程描述:
用戶首先構建待發送的消息對象ProducerRecord,然後調用KafkaProducer#send方法進行發送。KafkaProducer接收到消息後首先對其進行序列化,然後結合本地緩存的元數據信息一起發送給partitioner去確定目標分區,最後追加寫入到內存中的消息緩衝池(accumulator)。此時KafkaProducer#send方法成功返回。同時,KafkaProducer中還有一個專門的Sender IO線程負責將緩衝池中的消息分批次發送給對應的broker,完成真正的消息發送邏輯。
新版本的producer從設計上來說具有以下幾個特點:
總共創建兩個線程:執行KafkaPrducer#send邏輯的線程——我們稱之為「用戶主線程」;執行發送邏輯的IO線程——我們稱之為「Sender線程」。
不同於Scala老版本的producer,新版本producer完全非同步發送消息,並提供了回調機制(callback)供用戶判斷消息是否成功發送。
batching機制——「分批發送「機制。每個批次(batch)中包含了若干個PRODUCE請求,因此具有更高的吞吐量。
更加合理的默認分區策略:對於無key消息而言,Scala版本分區策略是一段時間內(默認是10分鐘)將消息發往固定的目標分區,這容易造成消息分布的不均勻,而新版本的producer採用輪詢的方式均勻地將消息分發到不同的分區。
底層統一使用基於Selector的網路客戶端實現,結合Java提供的Future實現完整地提供了更加健壯和優雅的生命周期管理。
關鍵參數
batch.size 我把它列在了首位,因為該參數對於調優producer至關重要。之前提到過新版producer採用分批發送機制,該參數即控制一個batch的大小。默認是16KB
acks 關乎到消息持久性(durability)的一個參數。高吞吐量和高持久性很多時候是相矛盾的,需要先明確我們的目標是什麼? 高吞吐量?高持久性?亦或是中等?因此該參數也有對應的三個取值:0, -1和1
linger.ms 減少網路IO,節省帶寬之用。原理就是把原本需要多次發送的小batch,通過引入延時的方式合併成大batch發送,減少了網路傳輸的壓力,從而提升吞吐量。當然,也會引入延時
compression.type producer 所使用的壓縮器,目前支持gzip, snappy和lz4。壓縮是在用戶主線程完成的,通常都需要花費大量的CPU時間,但對於減少網路IO來說確實利器。生產環境中可以結合壓力測試進行適當配置
max.in.flight.requests.per.connection 關乎消息亂序的一個配置參數。它指定了Sender線程在單個Socket連接上能夠發送未應答PRODUCE請求的最大請求數。適當增加此值通常會增大吞吐量,從而整體上提升producer的性能。不過筆者始終覺得其效果不如調節batch.size來得明顯,所以請謹慎使用。另外如果開啟了重試機制,配置該參數大於1可能造成消息發送的亂序(先發送A,然後發送B,但B卻先行被broker接收)
retries 重試機制,對於瞬時失敗的消息發送,開啟重試後KafkaProducer會嘗試再次發送消息。對於有強烈無消息丟失需求的用戶來說,開啟重試機制是必選項。
當用戶調用KafkaProducer.send(ProducerRecord, Callback)時Kafka內部流程分析:
這是KafkaProducer#send邏輯的第一步,即為待發送消息進行序列化並計算目標分區,如下圖所示:
如上圖所示,一條所屬topic是”test”,消息體是”message”的消息被序列化之後結合KafkaProducer緩存的元數據(比如該topic分區數信息等)共同傳給後面的Partitioner實現類進行目標分區的計算。
producer創建時會創建一個默認32MB(由buffer.memory參數指定)的accumulator緩衝區,專門保存待發送的消息。除了之前在「關鍵參數」段落中提到的linger.ms和batch.size等參數之外,該數據結構中還包含了一個特別重要的集合信息:消息批次信息(batches)。該集合本質上是一個HashMap,裡面分別保存了每個topic分區下的batch隊列,即前面說的批次是按照topic分區進行分組的。這樣發往不同分區的消息保存在對應分區下的batch隊列中。舉個簡單的例子,假設消息M1, M2被發送到test的0分區但屬於不同的batch,M3分送到test的1分區,那麼batches中包含的信息就是:{“test-0” – [batch1, batch2], “test-1” – [batch3]}。
單個topic分區下的batch隊列中保存的是若干個消息批次。每個batch中最重要的3個組件包括:
compressor: 負責執行追加寫入操作
batch緩衝區:由batch.size參數控制,消息被真正追加寫入到的地方
thunks:保存消息回調邏輯的集合
這一步的目的就是將待發送的消息寫入消息緩衝池中,具體流程如下圖所示:
這一步執行完畢之後理論上講KafkaProducer.send方法就執行完畢了,用戶主線程所做的事情就是等待Sender線程發送消息並執行返回結果了。
此時,該Sender線程登場了。嚴格來說,Sender線程自KafkaProducer創建後就一直都在運行著 。它的工作流程基本上是這樣的:
不斷輪詢緩衝區尋找 已做好發送準備的分區 ;
將輪詢獲得的各個batch按照目標分區所在的leader broker進行分組;
將分組後的batch通過底層創建的 Socket連接 發送給各個broker;
等待伺服器端發送response回來。
為了說明上的方便,我還是基於圖的方式來解釋Sender線程的工作原理:
上圖中Sender線程會發送PRODUCE請求給對應的broker,broker處理完畢之後發送對應的PRODUCE response。一旦Sender線程接收到response將依次(按照消息發送順序)調用batch中的回調方法,如下圖所示:
refer:
3分鐘帶你徹底搞懂 Kafka
Kafka到底是個啥?用來幹嘛的?
官方定義如下:
翻譯過來,大致的意思就是,這是一個實時數據處理系統,可以橫向擴展,並高可靠!
實時數據處理 ,從名字上看,很好理解,就是將數據進行實時處理,在現在流行的微服務開發中,最常用實時數據處理平台有 RabbitMQ、RocketMQ 等消息中間件。
這些中間件,最大的特點主要有兩個:
在早期的 web 應用程序開發中,當請求量突然上來了時候,我們會將要處理的數據推送到一個隊列通道中,然後另起一個線程來不斷輪訓拉取隊列中的數據,從而加快程序的運行效率。
但是隨著請求量不斷的增大,並且隊列通道的數據一致處於高負載,在這種情況下,應用程序的內存佔用率會非常高,稍有不慎,會出現內存不足,造成程序內存溢出,從而導致服務不可用。
隨著業務量的不斷擴張,在一個應用程序內,使用這種模式已然無法滿足需求,因此之後,就誕生了各種消息中間件,例如 ActiveMQ、RabbitMQ、RocketMQ等中間件。
採用這種模型,本質就是將要推送的數據,不在存放在當前應用程序的內存中,而是將數據存放到另一個專門負責數據處理的應用程序中,從而實現服務解耦。
消息中間件 :主要的職責就是保證能接受到消息,並將消息存儲到磁碟,即使其他服務都掛了,數據也不會丟失,同時還可以對數據消費情況做好監控工作。
應用程序 :只需要將消息推送到消息中間件,然後啟用一個線程來不斷從消息中間件中拉取數據,進行消費確認即可!
引入消息中間件之後,整個服務開發會變得更加簡單,各負其責。
Kafka 本質其實也是消息中間件的一種,Kafka 出自於 LinkedIn 公司,與 2010 年開源到 github。
LinkedIn 的開發團隊,為了解決數據管道問題,起初採用了 ActiveMQ 來進行數據交換,大約是在 2010 年前後,那時的 ActiveMQ 還遠遠無法滿足 LinkedIn 對數據傳遞系統的要求,經常由於各種缺陷而導致消息阻塞或者服務無法正常訪問,為了能夠解決這個問題,LinkedIn 決定研發自己的消息傳遞系統, Kafka 由此誕生 。
在 LinkedIn 公司,Kafka 可以有效地處理每天數十億條消息的指標和用戶活動跟蹤,其強大的處理能力,已經被業界所認可,並成為大數據流水線的首選技術。
先來看一張圖, 下面這張圖就是 kafka 生產與消費的核心架構模型 !
如果你看不懂這些概念沒關係,我會帶著大家一起梳理一遍!
簡而言之,kafka 本質就是一個消息系統,與大多數的消息系統一樣,主要的特點如下:
與 ActiveMQ、RabbitMQ、RocketMQ 不同的地方在於,它有一個**分區 Partition **的概念。
這個分區的意思就是說,如果你創建的 topic 有5個分區,當你一次性向 kafka 中推 1000 條數據時,這 1000 條數據默認會分配到 5 個分區中,其中每個分區存儲 200 條數據。
這樣做的目的,就是方便消費者從不同的分區拉取數據,假如你啟動 5 個線程同時拉取數據,每個線程拉取一個分區,消費速度會非常非常快!
這是 kafka 與其他的消息系統最大的不同!
和其他的中間件一樣,kafka 每次發送數據都是向 Leader 分區發送數據,並順序寫入到磁碟,然後 Leader 分區會將數據同步到各個從分區 Follower ,即使主分區掛了,也不會影響服務的正常運行。
那 kafka 是如何將數據寫入到對應的分區呢?kafka中有以下幾個原則:
與生產者一樣,消費者主動的去kafka集群拉取消息時,也是從 Leader 分區去拉取數據。
這裡我們需要重點了解一個名詞: 消費組 !
考慮到多個消費者的場景,kafka 在設計的時候,可以由多個消費者組成一個消費組,同一個消費組者的消費者可以消費同一個 topic 下不同分區的數據,同一個分區只會被一個消費組內的某個消費者所消費,防止出現重複消費的問題!
但是不同的組,可以消費同一個分區的數據!
你可以這樣理解,一個消費組就是一個客戶端,一個客戶端可以由很多個消費者組成,以便加快消息的消費能力。
但是,如果一個組下的消費者數量大於分區數量,就會出現很多的消費者閑置。
如果分區數量大於一個組下的消費者數量,會出現一個消費者負責多個分區的消費,會出現消費性能不均衡的情況。
因此,在實際的應用中,建議消費者組的 consumer 的數量與 partition 的數量保持一致!
光說理論可沒用,下面我們就以 centos7 為例,介紹一下 kafka 的安裝和使用。
kafka 需要 zookeeper 來保存服務實例的元信息,因此在安裝 kafka 之前,我們需要先安裝 zookeeper。
zookeeper 安裝環境依賴於 jdk,因此我們需要事先安裝 jdk
下載zookeeper,並解壓文件包
創建數據、日誌目錄
配置zookeeper
重新配置 dataDir 和 dataLogDir 的存儲路徑
最後,啟動 Zookeeper 服務
到官網 下載想要的版本,我這裡下載是最新穩定版 2.8.0 。
按需修改配置文件 server.properties (可選)
server.properties 文件內容如下:
其中有四個重要的參數:
可根據自己需求修改對應的配置!
啟動 kafka 服務
創建一個名為 testTopic 的主題,它只包含一個分區,只有一個副本:
運行 list topic 命令,可以看到該主題。
輸出內容:
Kafka 附帶一個命令行客戶端,它將從文件或標準輸入中獲取輸入,並將其作為消息發送到 Kafka 集群。默認情況下,每行將作為單獨的消息發送。
運行生產者,然後在控制台中鍵入一些消息以發送到伺服器。
輸入兩條內容並回車:
Kafka 還有一個命令行使用者,它會將消息轉儲到標準輸出。
輸出結果如下:
本文主要圍繞 kafka 的架構模型和安裝環境做了一些初步的介紹,難免會有理解不對的地方,歡迎網友批評、吐槽。
由於篇幅原因,會在下期文章中詳細介紹 java 環境下 kafka 應用場景!
使用java實現kafka consumer時報錯
public static void consumer(){
Properties props = new Properties();
props.put(“zk.connect”, “hadoop-2:2181”);
props.put(“zk.connectiontimeout.ms”, “1000000”);
props.put(“groupid”, “fans_group”);
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
MapString, Integer map = new HashMapString, Integer();
map.put(“fans”, 1);
// create 4 partitions of the stream for topic 「test」, to allow 4 threads to consume
MapString, ListKafkaStreamMessage topicMessageStreams = consumerConnector.createMessageStreams(map);
ListKafkaStreamMessage streams = topicMessageStreams.get(“fans”);
// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(1);
long startTime = System.currentTimeMillis();
// consume the messages in the threads
for(final KafkaStreamMessage stream: streams) {
executor.submit(new Runnable() {
public void run() {
ConsumerIteratorMessage it = stream.iterator();
while (it.hasNext()){
log.debug(byteBufferToString(it.next().message().payload()));
}
}
});
log.debug(“use time=”+(System.currentTimeMillis()-startTime));
}
}
原創文章,作者:KEPL,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/141670.html