1. Introduction to Kafka8s
Kafka is a distributed, publish/subscribe-based message queue written in Scala and developed under the Apache Software Foundation. It is a fast, scalable message queue system designed with fault tolerance at its core; it was originally built at LinkedIn and later became an Apache project.
Kafka8s combines Kafka with the Kubernetes orchestration system. It leverages Kubernetes features to manage the deployment, scaling, and upgrading of Kafka, making Kafka a better fit for the container era.
The Kafka8s architecture is shown in the figure below:
2. Deploying a Kafka Cluster
Deploying a Kafka cluster with Kafka8s is straightforward: write a Kafka cluster configuration file, then deploy it to the Kubernetes cluster through the Kubernetes API.
Below is an example Kafka cluster configuration file (a Strimzi Kafka custom resource):
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.7.0
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    config:
      auto.create.topics.enable: "true"
      offsets.topic.replication.factor: 3
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
        - id: 1
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
    jvmOptions:
      "-Xms": "2g"
      "-Xmx": "2g"
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
With this file in place, a single command deploys the Kafka cluster to Kubernetes:
kubectl create -f kafka-cluster.yaml
Check the deployment status of the Kafka cluster with kubectl:
kubectl get kafka
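The examples later in this article produce to and consume from a topic named my-topic. Because the configuration above sets auto.create.topics.enable to "true", the topic is created on first use, but it can also be declared explicitly as a Strimzi KafkaTopic resource. The sketch below assumes the cluster is named my-cluster (matching the bootstrap address used in the client code) and that the Topic Operator is running; the partition and replica counts are illustrative assumptions:

```yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    # Tells the Topic Operator which Kafka cluster this topic belongs to
    strimzi.io/cluster: my-cluster
spec:
  partitions: 3   # illustrative values, not from the article
  replicas: 3
```

Note that managing topics this way requires Strimzi's entity operator (Topic Operator), which is not enabled in the minimal Kafka resource shown above.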
3. Producing and Consuming Messages
A simple Java producer that sends a message to the Kafka cluster:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Bootstrap service exposed by the Kafka cluster inside Kubernetes
        props.put("bootstrap.servers", "my-cluster-kafka-bootstrap:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
        producer.send(record);
        producer.close();
    }
}
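Before or after running the producer, the topic can be inspected from inside the Kubernetes cluster with the console tools shipped in the Strimzi Kafka image. This is a sketch, not part of the original article; the image tag is an assumption and should be matched to your Strimzi and Kafka versions:

```shell
# Run a throwaway pod and read my-topic from the beginning (Ctrl-C to stop)
kubectl run kafka-test-consumer -ti --rm --restart=Never \
  --image=quay.io/strimzi/kafka:0.22.1-kafka-2.7.0 \
  -- bin/kafka-console-consumer.sh \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 \
  --topic my-topic --from-beginning
```

If the producer ran successfully, the message value should appear in the pod's output.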
A simple Java consumer that reads messages from the Kafka cluster:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "my-cluster-kafka-bootstrap:9092");
        props.put("group.id", "my-group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton("my-topic"));
        while (true) {
            // poll(long) is deprecated; use the Duration overload
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
4. Monitoring Kafka
Kafka can be monitored with Prometheus and Grafana. Below is an example Prometheus configuration file:
global:
  scrape_interval: 10s
  evaluation_interval: 10s
scrape_configs:
  - job_name: kafka
    metrics_path: "/metrics"
    scheme: "http"
    static_configs:
      - targets: ["my-cluster-kafka-bootstrap:8080"]
    relabel_configs:
      - source_labels: [__address__]
        regex: (.*)
        target_label: instance
        replacement: ${1}
Start Prometheus with the following command:
docker run -p 9090:9090 -v /path/to/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus
Use the following commands to download and install the Grafana pie chart panel plugin (this assumes Grafana itself is already installed):
curl https://grafana.com/api/plugins/grafana-piechart-panel/versions/1.6.0/download -k > /tmp/grafana-piechart-panel.zip
unzip /tmp/grafana-piechart-panel.zip -d /var/lib/grafana/plugins/
Then import the following dashboard template into Grafana:
{
  "id": null,
  "title": "Kafka Metrics",
  "panels": [
    {
      "title": "Kafka Broker Metrics",
      "type": "graph",
      "span": 12,
      "targets": [
        {
          "expr": "rate(kafka_server_brokertopicmetrics_bytesinpersec[1m])",
          "interval": "10s",
          "legendFormat": "{{kafka_exporter_broker}}",
          "refId": "A"
        }
      ]
    },
    {
      "title": "Kafka Topic Metrics",
      "type": "table",
      "span": 12,
      "pageSize": null,
      "columns": [],
      "scroll": true,
      "showHeader": true,
      "showFullscreenControl": true,
      "dataSource": {
        "name": "Prometheus",
        "type": "prometheus",
        "url": "http://localhost:9090",
        "access": "proxy",
        "basicAuth": false
      },
      "targets": [
        {
          "expr": "kafka_topic_partition_current_offset{topic=\"my-topic\"}",
          "interval": "10s",
          "legendFormat": "",
          "refId": "A"
        },
        {
          "expr": "kafka_topic_partition_end_offset{topic=\"my-topic\"}",
          "interval": "10s",
          "legendFormat": "",
          "refId": "B"
        },
        {
          "expr": "rate(kafka_server_brokertopicmetrics_bytesinpersec{topic=\"my-topic\"}[1m])",
          "interval": "10s",
          "legendFormat": "",
          "refId": "C"
        }
      ]
    }
  ],
  "schemaVersion": 22,
  "version": 0
}
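The two offset metrics in the table panel are commonly combined into consumer lag: the end offset of a partition minus the consumer group's current (committed) offset. As a minimal illustration of that arithmetic in plain Java (the ConsumerLag class and the sample offset values are hypothetical, not part of any exporter):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConsumerLag {
    // Lag for a single partition: end offset minus committed offset,
    // clamped at zero in case a stale end offset is reported.
    static long lag(long endOffset, long currentOffset) {
        return Math.max(0, endOffset - currentOffset);
    }

    public static void main(String[] args) {
        // Sample values, as kafka_topic_partition_end_offset and
        // kafka_topic_partition_current_offset might report them.
        Map<Integer, long[]> offsets = new LinkedHashMap<>();
        offsets.put(0, new long[] {150, 100}); // partition 0: end=150, current=100
        offsets.put(1, new long[] {80, 80});   // partition 1: fully caught up
        offsets.forEach((partition, o) ->
            System.out.printf("partition %d lag = %d%n", partition, lag(o[0], o[1])));
    }
}
```

A sustained, growing lag across partitions is usually the first signal worth alerting on.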
5. Scaling the Kafka Cluster
Scaling a Kafka cluster with Kafka8s is easy: change the replicas value in the cluster configuration file, then apply the update with the following command:
kubectl apply -f kafka-cluster.yaml
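For example, to grow the broker set from three to five, only the replicas line under spec.kafka changes (fragment shown below; the rest of the file stays as before, and the value 5 is an illustrative choice):

```yaml
spec:
  kafka:
    replicas: 5   # was 3
```

Be aware that adding brokers does not automatically move existing partitions onto them; a partition reassignment is needed for the new brokers to take on existing load.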
6. Summary
Kafka8s is a very convenient way to deploy a distributed messaging system, and it is well suited to the container era. With Kafka8s, we can easily deploy and manage Kafka clusters, and just as easily monitor and scale them.
Original article by 小藍. If reprinting, please credit the source: https://www.506064.com/zh-tw/n/182086.html