@Alpacadh
2022-09-18T17:18:02.000000Z
字数 1027
阅读 232
kafka
无论消息是否被消费,Kafka 都会保留所有消息。有两种策略可以删除旧数据:
基于时间:log.retention.hours=168
基于大小:log.retention.bytes=1073741824
需要注意的是,因为 Kafka 读取特定消息的时间复杂度为 O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。
1)Controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 Topic 被创建时,则 Controller 会通过 watch 得到该 Topic 的 Partition 和 Replica 的分配信息。
2)Controller 从 /brokers/ids 读取当前所有可用的 Broker 列表,对于 set_p 中的每一个 Partition:
2.1)从分配给该 Partition 的所有 Replica(称为 AR)中任选一个可用的 Broker 作为新的 Leader,并将 AR 设置为新的 ISR;
2.2)将新的 Leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state。
3)Controller 通过 RPC 向相关的 Broker 发送 LeaderAndISRRequest。
相比如创建,删除就较为简单很多:
1)Controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 Topic 被删除,则 Controller 会通过 watch 得到该 Topic 的 Partition 和 Replica 的分配信息。
2)若 delete.topic.enable=false 直接结束;否则 Controller 注册在 /admin/delete_topics 上的 watch 被 fire,Controller 通过回调向对应的 Broker 发送 StopReplicaRequest。