一、Kafka Group ID概念
Kafka Group ID是Kafka中管理Consumer群組的重要概念。
在Kafka中,Consumer通過訂閱Topic來消費消息。一個Consumer Group中有多個Consumer實例,每個實例只能消費一個消息分區,多個實例同時消費同一個分區的情況下會導致消息重複,因此需要保證同一個Group中各個Consumer實例分配到不同的分區進行消費。
因此,在Kafka中,Group ID用來標識一個Consumer Group,是實現分區分配的重要依據。
二、Group ID命名規範
為了保證Kafka Consumer Group的穩定和可維護性,Group ID需要遵守一些命名規範:
1、長度不超過255個字符;
2、只能包含ASCII字符集中的字母、數字和”.”、”-“、”_”;
3、不能以”.”、”-“、”_”開頭;
4、同一個Kafka集群中的Group ID不能重複。
示例代碼:
String groupID = "test-group-1";
三、Group ID與分區分配的關係
在Kafka中,消息分區是Kafka提供的一個高並發、高吞吐量的特性。Kafka通過Partition將Topic中的消息分散到不同的Broker上,每個Broker上可以存放一個或多個Partition的數據。
當一個Consumer Group中的Consumer實例訂閱Topic時,Kafka會將這個Group中的所有Consumer實例平均分配到所有Partition上,以實現負載均衡的效果。因此,在Consumer Group中必須保證Group ID的唯一性,以確保Kafka能夠正確分配Partition。
示例代碼:
// 訂閱test-topic
consumer.subscribe(Arrays.asList("test-topic"));
// 讀取消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Partition: " + record.partition() + ", Offset: " + record.offset() +
", Value: " + record.value() + ", Group ID: " + groupID);
}
}
四、Group ID與消費方式的關係
Kafka支持兩種消費方式:拉取式消費和推送式消費。對於拉取式消費,不同Consumer Group之間的消息是獨立的,即不同Group ID之間的消費互不影響;對於推送式消費,Kafka不支持同一個Topic同時推送到多個Group中。
因此,在選擇消費方式時,需要考慮Group ID的選擇與管理。
示例代碼:
// 拉取式消費示例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "test-topic";
String groupID = "test-group-1";
consumer.subscribe(Collections.singleton(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println("GroupID: " + groupID + ", Partition: " + record.partition() +
", Offset: " + record.offset() + ", Value: " + record.value());
}
}
// 推送式消費示例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
String topic = "test-topic";
String message = "Hello, Kafka!";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
producer.send(record);
producer.close();
KafkaAdminClient adminClient = KafkaAdminClient.create(props);
Short replicationFactor = 1;
NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor);
adminClient.createTopics(Collections.singleton(newTopic));
五、Group ID的動態變化
在Kafka中,Group ID的動態變化是常見的場景之一,如Group ID重新分配、Group ID尺寸的擴展等。
Group ID的重新分配可以通過在Consumer實例中重新設置Group ID的方式實現,而Group ID的擴展則需要考慮Kafka消費的並發性和數據一致性。
示例代碼:
// 動態變化Group ID示例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "test-topic";
String groupID = "test-group-1";
consumer.subscribe(Collections.singleton(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println("GroupID: " + groupID + ", Partition: " + record.partition() +
", Offset: " + record.offset() + ", Value: " + record.value());
}
}
// 動態變化Group ID示例
consumer.unsubscribe();
String newGroupID = "test-group-2";
consumer.subscribe(Collections.singleton(topic), new ConsumerRebalanceListener() {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Assigned partitions: " + partitions);
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Revoked partitions: " + partitions);
}
});
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/186185.html
微信掃一掃
支付寶掃一掃