使用kafkak8s為中心打造分散式消息系統

一、kafkak8s介紹


Kafka是一種分散式的基於發布/訂閱模式的消息隊列,用Scala語言編寫,由Apache軟體基金會開發。Kafka是一種快速的、可擴展的、設計內部極具容錯性的消息隊列系統,最初由LinkedIn公司開發,後來成為Apache項目的一部分。

Kafka8s是將Kafka與Kubernetes編排系統完美融合的產物。它利用Kubernetes的強大特性,輕鬆管理Kafka的部署、伸縮和升級過程,提高Kafka在容器時代的適用性。

Kafka8s架構如下圖所示:

二、部署Kafka集群

使用Kafka8s部署Kafka集群非常簡單。只需要編寫一個Kafka集群的配置文件,然後使用Kubernetes API將其部署到Kubernetes集群即可。

以下是一個Kafka集群的配置文件示例:


apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-kafka
spec:
  kafka:
    version: 2.7.0
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    config:
      auto.create.topics.enable: "true"
      offsets.topic.replication.factor: 3
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
      - id: 1
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
    jvmOptions:
      "-Xms": "2g"
      "-Xmx": "2g"
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false

有了以上文件,只需要使用以下命令即可將Kafka集群部署到Kubernetes集群中:


kubectl create -f kafka-cluster.yaml

使用kubectl命令查看Kafka集群部署狀態:


kubectl get kafka

三、消息發送和消費

使用Java編寫一個簡單的生產者代碼向Kafka集群發送消息:


import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class MyProducer {
    public static void main(String[] args) {    
        Properties props = new Properties();
        props.put("bootstrap.servers", "my-cluster-kafka-bootstrap:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);        
        ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
        producer.send(record);     
        producer.close();   
    }
}

使用Java編寫一個簡單的消費者代碼從Kafka集群接收消息:


import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class MyConsumer {
    public static void main(String[] args) {    
        Properties props = new Properties();
        props.put("bootstrap.servers", "my-cluster-kafka-bootstrap:9092");
        props.put("group.id", "my-group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton("my-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

四、Kafka監控

使用Prometheus和Grafana可以實現對Kafka的監控。以下是一個Prometheus的配置文件的示例:


global:
  scrape_interval: 10s
  evaluation_interval: 10s
scrape_configs:
  - job_name: kafka
    metrics_path: "/metrics"
    scheme: "http"
    static_configs:
      - targets: ["my-cluster-kafka-bootstrap:8080"]
    relabel_configs:
      - source_labels: [__address__]
        regex: (.*)
        target_label: instance
        replacement: ${1}

使用以下命令啟動Prometheus:


docker run -p 9090:9090 -v /path/to/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus

使用以下命令下載並安裝Grafana:


curl https://grafana.com/api/plugins/grafana-piechart-panel/versions/1.6.0/download -k > /tmp/grafana-piechart-panel.zip
unzip /tmp/grafana-piechart-panel.zip -d /var/lib/grafana/plugins/

在Grafana中導入以下Dashboard模板即可:


{
  "id": null,
  "title": "Kafka Metrics",
  "panels": [
    {
      "title": "Kafka Broker Metrics",
      "type": "graph",
      "span": 12,
      "targets": [
        {
          "expr": "kafka_server_brokertopicmetrics_bytesinpersec[1m]",
          "interval": "10s",
          "legendFormat": "{{kafka_exporter_broker}}",
          "refId": "A"
        }
      ]
    },
    {
      "title": "Kafka Topic Metrics",
      "type": "table",
      "span": 12,
      "pageSize": null,
      "columns": [],
      "scroll": true,
      "showHeader": true,
      "showFullscreenControl": true,
      "dataSource": {
        "name": "Prometheus",
        "type": "prometheus",
        "url": "http://localhost:9090",
        "access": "proxy",
        "basicAuth": false
      },
      "targets": [
        {
          "expr": "kafka_topic_partition_current_offset{topic=\"my-topic\"}",
          "interval": "10s",
          "legendFormat": "",
          "refId": "A"
        },
        {
          "expr": "kafka_topic_partition_end_offset{topic=\"my-topic\"}",
          "interval": "10s",
          "legendFormat": "",
          "refId": "B"
        },
        {
          "expr": "rate(kafka_server_brokertopicmetrics_bytesinpersec{topic=\"my-topic\"}[1m])",
          "interval": "10s",
          "legendFormat": "",
          "refId": "C"
        }
      ]
    }
  ],
  "schemaVersion": 22,
  "version": 0
}

五、Kafka集群伸縮

使用Kafka8s可以輕鬆地進行Kafka集群的伸縮。只需要修改Kafka集群配置文件中的replicas值,然後使用以下命令即可更新Kafka集群:


kubectl apply -f kafka-cluster.yaml

六、總結

Kafka8s是一個非常好用的分散式消息系統的部署方案,在容器時代具有很高的適用性。使用Kafka8s,我們可以輕鬆地部署和管理Kafka集群,同時也能夠方便地進行監控和伸縮操作。

原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/182086.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
小藍的頭像小藍
上一篇 2024-11-24 06:17
下一篇 2024-11-24 06:18

相關推薦

  • ArcGIS更改標註位置為中心的方法

    本篇文章將從多個方面詳細闡述如何在ArcGIS中更改標註位置為中心。讓我們一步步來看。 一、禁止標註智能調整 在ArcMap中設置標註智能調整可以自動將標註位置調整到最佳顯示位置。…

    編程 2025-04-29
  • Deepin系統分區設置教程

    本教程將會詳細介紹Deepin系統如何進行分區設置,分享多種方式讓您了解如何規劃您的硬碟。 一、分區的基本知識 在進行Deepin系統分區設置之前,我們需要了解一些基本分區概念。 …

    編程 2025-04-29
  • KeyDB Java:完美的分散式高速緩存方案

    本文將從以下幾個方面對KeyDB Java進行詳細闡述:KeyDB Java的特點、安裝和配置、使用示例、性能測試。 一、KeyDB Java的特點 KeyDB Java是KeyD…

    編程 2025-04-29
  • Java任務下發回滾系統的設計與實現

    本文將介紹一個Java任務下發回滾系統的設計與實現。該系統可以用於執行複雜的任務,包括可回滾的任務,及時恢復任務失敗前的狀態。系統使用Java語言進行開發,可以支持多種類型的任務。…

    編程 2025-04-29
  • 如何在樹莓派上安裝Windows 7系統?

    隨著樹莓派的普及,許多用戶想在樹莓派上安裝Windows 7操作系統。 一、準備工作 在開始之前,需要準備以下材料: 1.樹莓派4B一台; 2.一張8GB以上的SD卡; 3.下載並…

    編程 2025-04-29
  • CPU爆滿怎麼解決 Java為中心

    在Java編程中,難免會遇到CPU佔用過高的情況,接下來從多個方面介紹如何解決CPU爆滿問題。 一、優化代碼 1、減少循環次數。循環體內不要放太多邏輯判斷和計算,可以把計算提取出來…

    編程 2025-04-29
  • CMD如何升級為中心?

    本文將詳細介紹在Windows操作系統下如何將CMD升級為中心,以及如何在升級後使用CMD中心進行操作。 一、下載Windows Terminal Windows Terminal…

    編程 2025-04-29
  • RabbitMQ和Yii2的消息隊列應用

    本文將探討RabbitMQ和Yii2之間的消息隊列應用。從概念、安裝和配置、使用實例等多個方面詳細講解,幫助讀者了解和掌握RabbitMQ和Yii2的消息隊列應用。 一、Rabbi…

    編程 2025-04-29
  • 如何使用GPU加速運行Python程序——以CSDN為中心

    GPU的強大性能是眾所周知的。而隨著深度學習和機器學習的發展,越來越多的Python開發者將GPU應用於深度學習模型的訓練過程中,提高了模型訓練效率。在本文中,我們將介紹如何使用G…

    編程 2025-04-29
  • 如何修改ant組件的動效為中心

    當我們使用Ant Design時,其默認的組件動效可能不一定符合我們的需求,這時我們需要修改Ant Design組件動效,使其更加符合我們的UI設計。本文將從多個方面詳細闡述如何修…

    編程 2025-04-29

發表回復

登錄後才能評論