四、Kafka 的 Java 客户端

4.1、生产者

1、引入依赖

kafka-clients 的版本尽量与 Linux 上安装的版本一致

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

2、基本实现

  • 创建一个静态常量,用于指定主题名称
1
2
3
4
/**
* 主题名称
*/
public static final String TOPIC_NAME = "my-replicated-topic";
  • 定义一个 Order 类
1
2
3
4
5
6
7
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Order {
private int orderId;
private String productName;
}
  • 创建一个 Properties 对象,在其中设置配置属性的值
1
2
3
4
5
6
7
8
9
//1 创建一个 Properties 对象,设置属性值
Properties properties = new Properties();
//2 设置 Kafka Server 的 IP 地址和端口号
// 如果是 Kafka 集群,那么多个 IP:port 之间使用 "," 分开
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.78.198.32:9092");
//3 将待发送的 key 从字符串序列化为字节数组
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//4 把发送消息 value 从字符串序列化为字节数组
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  • 创建一个消息生产者,需要传入上面定义的 Properties 对象
1
2
//5 创建一个 Producer 对象,传入上面定义的 Properties 对象
Producer<String, String> producer = new KafkaProducer<>(properties);
  • 定义一条消息对象

在发送消息时,可以不传入 key ,发出的消息是 order 对象的字符串形式,而key 的作用是用来告诉 Kafka 往哪个分区上发送消息,具体发送分区的计算公式是:hash(key) % 分区数目

1
2
3
4
5
6
//6 创建一个 Order 对象
Order order = new Order(1, "商品名...");
//7 构造一个 ProducerRecord 对象,这个对象就是我们要发送的消息
// key 的作用是用来告诉 Kafka 往哪个分区上发送消息
// 具体发送分区的计算公式是:hash(key) % 分区数目
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, order.getOrderId() + "", JSON.toJSONString(order));
  • 使用同步阻塞的方式发送消息
1
2
3
//8 使用 Producer 的 send 方法发送数据
// 等待消息发送成功的同步阻塞方法
RecordMetadata recordMetadata = producer.send(record).get();
  • 打印结果
1
System.out.println("同步方式发送消息结果: topic-" + recordMetadata.topic() + "|partition-" + recordMetadata.partition() + "|offset-" + recordMetadata.offset());

全部代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 主题名称
*/
public static final String TOPIC_NAME = "my-replicated-topic";

public static void main(String[] args) throws ExecutionException, InterruptedException {
//1 创建一个 Properties 对象,设置属性值
Properties properties = new Properties();
//2 设置 Kafka Server 的 IP 地址和端口号
// 如果是 Kafka 集群,那么多个 IP:port 之间使用 "," 分开
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.78.198.32:9092");
//3 将待发送的 key 从字符串序列化为字节数组
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//4 把发送消息 value 从字符串序列化为字节数组
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//5 创建一个 Producer 对象,传入上面定义的 Properties 对象
Producer<String, String> producer = new KafkaProducer<>(properties);
//6 创建一个 Order 对象
Order order = new Order(1, "商品名...");
//7 构造一个 ProducerRecord 对象,这个对象就是我们要发送的消息
// key 的作用是用来告诉 Kafka 往哪个分区上发送消息
// 具体发送分区的计算公式是:hash(key) % 分区数目
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, order.getOrderId() + "", JSON.toJSONString(order));
//8 使用 Producer 的 send 方法发送数据
// 等待消息发送成功的同步阻塞方法
RecordMetadata recordMetadata = producer.send(record).get();
System.out.println("同步方式发送消息结果: topic-" + recordMetadata.topic() + "|partition-" + recordMetadata.partition() + "|offset-" + recordMetadata.offset());
}

3、发送消息到指定分区上

在创建 ProducerRecord 对象时,往构造方法中传入要发送的分区上

1
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, 0,order.getOrderId() + "", JSON.toJSONString(order));

4、同步发送

⽣产者同步发消息,在收到 kafka 的 ack 告知发送成功之前⼀直处于阻塞状态

1
2
3
4
//8 使用 Producer 的 send 方法发送数据
// 等待消息发送成功的同步阻塞方法
RecordMetadata recordMetadata = producer.send(record).get();
System.out.println("同步方式发送消息结果: topic-" + recordMetadata.topic() + "|partition-" + recordMetadata.partition() + "|offset-" + recordMetadata.offset());

image-20211010222633539

5、异步发消息

⽣产者发消息,发送完后不⽤等待 broker 给回复,直接执⾏下⾯的业务逻辑。

可以提供 callback ,让 broker 异步的调⽤ callback,告知⽣产者,消息发送的结果

1
2
3
4
5
6
7
8
9
10
11
12
//9 异步调用
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
System.out.println("消息发送失败:" + e.getMessage());
}
if (recordMetadata != null) {
System.out.println("异步⽅式发送消息结果:"+"topic-"+ recordMetadata.topic()+"|partition-"+recordMetadata.partition()+"|offset-"+recordMetadata.offset());
}
}
});

6、关于生产者的 ACK 参数配置

在同步发消息的场景下:⽣产者发动 broker 上后,ack 会有3种不同的选择:

  • acks = 0

表示 producer 不需要等待任何 broker 确认收到消息的回复,就可以继续发送下⼀条消息。性能最⾼,但是最容易丢消息。

  • acks = 1(默认)

⾄少要等待 leader 已经成功将数据写⼊本地 log,但是不需要等待所有 follower 是否成功写⼊。就可以继续发送下⼀条消息。

这种情况下,如果 follower 没有成功备份数据,⽽此时 leader ⼜挂掉,则消息会丢失

  • acks = -1all

需要等待 min.insync.replicas (默认为1,推荐配置⼤于等于2)这个参数配置的副本个数都成功写⼊⽇志,这种策略会保证只要有⼀个备份存活就不会丢失数据

  1. 这是最强的数据保证
  2. ⼀般除⾮是⾦融级别,或跟钱打交道的场景才会使⽤这种配置。
  • 如何设置?

在 Properties 对象中指定配置的值

1
properties.put(ProducerConfig.ACKS_CONFIG,"1");

9、其他细节

  • 发送会默认会重试 3 次,每次间隔 100ms
  • 发送的消息会先进⼊到本地缓冲区(32mb),kakfa 会跑⼀个线程,该线程去缓冲区中
    取 16k 的数据,发送到kafka,如果到10毫秒数据没取满16k,也会发送⼀次

发送消息时不是一条消息建立一次连接、发送一次,而是先存在一个缓冲区中,然后另起一个线程去发送消息(一次发送 16k )

image-20211010234216403

  • 这两个配置可以在生产者中配置
1
2
3
4
//配置生产者本地缓冲区大小,33554432 即 32 M
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
//配置生产者批量发送消息的大小,默认是 16 k,即 16384
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

4.2、消费者

1、基本实现

  • 创建两个常量,用于指定主题名和消费组名
1
2
3
4
5
6
7
8
/**
* 主题名称
*/
public static final String TOPIC_NAME = "my-replicated-topic";
/**
* 消费者组名
*/
public static final String CONSUMER_GROUP_NAME = "testGroup";
  • 创建一个 Properties 对象,设置配置值
1
2
3
4
5
6
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
// 配置键和值的序列号器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  • 创建一个 KafkaConsumer 消费者对象,传入上面定义的 Properties 对象
1
2
// 创建一个消费者对象,传入 Properties 对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  • 令消费者对象订阅主题列表

一个消费者其实可以消费多个主题

1
2
// 消费者对象订阅主题列表
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
  • 进行消息拉取

使用消费者的 poll() 方法对消息进行长轮询

1
2
3
4
5
6
7
8
9
10
11
while (true) {
/**
* poll() API 是拉取消息的长轮询
* 在 1000 ms 内拉取过来的消息都会放在 ConsumerRecords 列表中
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
}
  • 所有代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 主题名称
*/
public static final String TOPIC_NAME = "my-replicated-topic";
/**
* 消费者组名
*/
public static final String CONSUMER_GROUP_NAME = "testGroup";


public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
// 配置键和值的序列号器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建一个消费者对象,传入 Properties 对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 消费者对象订阅主题列表
consumer.subscribe(Collections.singletonList(TOPIC_NAME));

while (true) {
/**
* poll() API 是拉取消息的长轮询
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
}


}

2、Kafka 消费者 offset 的自动提交和手动提交

在消费者对消息进行消费时,需要向 Kafka 提交 offset ,有自动提交和手动提交两种模式

无论是自动提交还是手动提交,都需要把所属的消费组 + 消费的某个主题 + 消费的某个分区及消费的偏移量,这样的信息提交到集群的 _comsumer_offsets 主题中

image-20211011103251825

  • offset 的自动提交(默认)
1
2
3
4
// 是否⾃动提交offset,默认就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// ⾃动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

消费者 poll 到消息后,会自动向 Broker 的 _comsumer_offsets 主题提交当前主题 - 分区消费的偏移量。

自动提交可能导致消息丢失,因为消费者可能一次性 poll 下来多条消息,消费者还没消费 poll 下来的消息就自动提交了偏移量,如果消费者在没有完全消费 poll 下来的消息前就挂了,那么那一部分已经 poll 下来但还没有消费完的消息就会丢失,下一个消费者会从已提交的 offset 的下一个位置开始消费消息。

  • offset 的手动提交

在消费消息时 / 后再提交 offset

  1. 设置手动提交参数
1
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
  1. 在消费完消息后进行手动提交(手动提交 offset 又分为手动同步提交和手动异步提交)

手动同步提交

1
2
3
4
5
if (records.count > 0) {
// 手动同步提交 offset ,当前线程会阻塞直到 offset 提交成功
// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
consumer.commitSync();
}

手动异步提交,定义一个回调方法, Kafka 集群会调用这个方法通知手动提交是否成功

1
2
3
4
5
6
7
8
9
if (records.count() > 0) {
// ⼿动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后⾯的程序逻辑
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " + exception.getStackTrace());
}
});
}

3、消费者 poll 消息的过程

  • 消费者建立与 Broker 之间的联系,开始 poll 消息
  • 默认一次性 poll 500条消息

可以根据消费速度的快慢来设置,因为如果两次 poll 的时间如果超出了 30s 的时间间隔,kafka 会认为其消费能⼒过弱,将其踢出消费组。将分区分配给其他消费者

1
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
  • 设置消费者两次 poll 之间的时间间隔

消费者在被剔除消费组后,会触发 rebalance 机制, rebalance 机制会造成性能开销

1
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
  • 代码中配置了长轮询 poll 的时间,如果每隔 1s 内没有 poll 到任何消息,则继续去 poll 消息,循环往复,直到 poll 到消息。如果超出了 1s,则此次⻓轮询结束
1
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  1. 如果一次 poll 到的消息的条数为配置中 ConsumerConfig.MAX_POLL_RECORDS_CONFIG 配置的条数,那么直接执行下面的逻辑
  2. 如果一次 poll 到的消息没有达到配置中 ConsumerConfig.MAX_POLL_RECORDS_CONFIG 配置的条数,且时间在 poll 方法配置的时间 Duration.ofMillis(1000) 内,那么长轮询会继续 poll ,直到 poll 到的消息条数达到配置的条数或者时间达到配置时间。
  3. 如果多次 poll 都没有达到配置中 ConsumerConfig.MAX_POLL_RECORDS_CONFIG 配置的条数,且时间已经超过 poll 方法配置的时间 Duration.ofMillis(1000) ,那么直接执行下面的逻辑
  • 消费者给 Kafka 发送心跳的时间间隔

Kafka 使用 Zookeeper 来维持心跳

1
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
  • Kafka 如果超过 10 秒没有收到消费者的⼼跳,则会把消费者踢出消费组,进⾏ rebalance,把分区分配给其他消费者。
1
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

4、消费者指定主题的分区进行消费

  • 可以通过消费者对象的 assign 方法来指定分区

下面的代码中,为当前消费者指定消费 my-replicated-topic 主题中的第 0 个分区。

1
consumer.assign(Collections.singletonList(new TopicPartition(TOPIC_NAME, 0)));

5、消息的回溯消费

Kafka 支持两种回溯方式回溯消息,一种是基于消息偏移量 offset 回溯,一种是基于时间点回溯,还有一种方式是从头消费

  • 从头消费
1
2
3
4
// 指定消费该主题的第一个分区
consumer.assign(Collections.singletonList(new TopicPartition(TOPIC_NAME, 0)));
// 从头开始消费
consumer.seekToBeginning(Collections.singletonList(new TopicPartition(TOPIC_NAME, 0)));
  • 基于消息偏移量(offset)回溯

在 Kafka 的每个分区中,每条消息都有唯一的一个 offset 值,这个值用于表示这条消息在该分区中的位置,在消费者消费完这条消息后,会将这条消息的 offset 提交到 Broker ,下条消息从这个位置后开始消费。

基于消息偏移量回溯很简单,只需要重置 offset ,然后消费者会从该 offset 之后开始消费

1
2
3
4
// 指定消费该主题的第一个分区
consumer.assign(Collections.singletonList(new TopicPartition(TOPIC_NAME, 0)));
// 指定 offset 消费
consumer.seek(Collections.singletonList(new TopicPartition(TOPIC_NAME, 0)),10);
  • 基于时间点回溯

我们先了解一下 Kafka 存储消息的文件格式,Kafka 存储消息是以日志的形式存储的,每个分区对应一个日志,但日志不是一个文件,而是由多个文件组成的。

日志文件都存储在一个文件夹中,文件夹格式为 topic-0。

  1. topic 为 kafka 对应的主题名称,0 是分区所在的分区号。
  2. 文件夹中存储着 4 个日志,分别为日志分段文件偏移量索引文件时间戳索引文件其他文件

那么是怎么通过时间戳回溯的呢?

  1. 将要回溯的时间换算为时间戳 timestamp (yyyy-MM-dd HH:mm:ss –> long)

  2. 根据时间戳 timestamp 在时间戳索引文件中找到不大于这个时间戳 timestamp 的最大偏移量

  3. 找到偏移量后,找到对应消息的位置 position (查找过程后面补充)

  4. 通过 position 定位消息,获取该消息的生成时间,将这个时间与 timestamp 进行对比,然后按顺序逐渐和后面的消息进行比对,如果前一个消息的生成时间小于 timestamp 且后一个消息的生成时间大于 timestamp ,那么当前这个位置就是要回溯的点,获取消息的 offset ,对消费者消费记录的 offset 进行重置,回溯完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
   List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
//从1⼩时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
TopicPartition key = entry.getKey();
OffsetAndTimestamp value = entry.getValue();
if (key == null || value == null) {
continue;
}
long offset = value.offset();
System.out.println("partition-" + key.partition() + "|offset-" + offset);
System.out.println();
//根据消费⾥的timestamp确定offset
if (value != null) {
consumer.assign(Collections.singletonList(key));
consumer.seek(key, offset);
}
}

6、指定新消费组的消费偏移量

当消费主题的是⼀个新的消费组,或者指定 offset 的消费⽅式,offset 不存在,那么应该如何消费?

  • latest(默认)

只消费自己启动后发送到主题的消息

  • earliest

第一次从头开始消费,以后按照消费 offset 的记录继续消费,这个需要区别于 consumer.seekToBeginning (每次都从头开始消费)

1
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

五、Spring Boot 使用 Kafka

5.1、前期准备

  • 创建一个 Spring Boot 项目,引入相关依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.3.3.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
  • 创建一个实体类
1
2
3
4
5
6
7
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Product {
private String productId;
private String productName;
}

5.2、创建配置文件

一般来说,一个应用最多只扮演生产者 / 消费者之中的一个角色,这个时候只需要添加对应角色的配置即可。

1、生产者相关配置

  • 重试次数(retries)

设置一个大于 0 的值,默认为 3 ,即生产者在发送失败时会默认重发三次

  • 批量发给 Kafka 的数据大小(batch-size)
  • 生产者本地缓冲区(buffer-memory)

这两个配置联系十分紧密,生产者不是每接收到一条消息就直接发给 Kafka ,而是先将消息存储在本地的一个缓冲区中,这个缓冲区的大小为 buffer-memory ,同时在后台另起一个线程,让这个线程每次从这个缓冲区中拿出 batch-size 大小的数据发送给 Kafka 。

  • 生产者发送消息的确认模式(acks)

默认为 1 ,⾄少要等待 leader 已经成功将数据写⼊本地 log,但是不需要等待所有 follower 是否成功写⼊,就可以继续发送下⼀条消息。

效率与可靠性折中的一种选择

  • 键和消息记录的编解码方式(key-serializer / value-serializer)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spring:
kafka:
# 生产者
producer:
# 设置一个大于 0 的值,则客户端会将发送失败的记录重新发送
retries: 3
# 生产者一次从本地缓冲区中取出 batch-size 大小的数据发送给 Kafka
batch-size: 16384
# 设置 acks 为 1, ⾄少要等待 leader 已经成功将数据写⼊本地 log,但是不需要等待所有 follower 是否成功写⼊。就可以继续发送下⼀条消息。
acks: 1
# 指定消息 key 与消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 生产者本地缓冲区,默认为 32m
buffer-memory: 33554432

2、消费者相关配置

  • 消费者的消费组 id (group-id)

  • 是否启用自动提交(enable-auto-commit)

  • 新消费组的消费偏移量选取策略(auto-offset-reset)

  • 一次长轮询中从 Broker 中拉取的消息条数的最大值(max-poll-records)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
server:
port: 8080
spring:
kafka:
consumer:
# 指定消费组 id
group-id: default-group
# 关闭自动提交
enable-auto-commit: false
# 当出现一个新的消费组或者需要传入 offset 但 offset 为空时,指定其从头开始消费
# 但不是每次都是从头开始消费
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 消费者一次从 Broker 最多拉取 500 条消息
max-poll-records: 500

3、额外配置

  • Kafka-Server 的地址(bootstrap-servers)

如果是 Kafka 集群,那么在多个 IP:port 之间使用 , 隔开

  • Kafka 消费者的消费模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
server:
port: 8080
spring:
kafka:
# kafka-server 的地址,如果是集群,那么 ip:port 之间使用 "," 隔开
bootstrap-servers: 120.78.198.32:9092
listener:
# 当每一条消息被消费者监听器(ListenerConsumer)处理后才提交(record)
# 当每⼀批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后提交(batch)
# 当每⼀批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间⼤于 TIME 时提交(time)
# 当每⼀批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后,被处理 record 数量⼤于等于COUNT时提交(count)
# TIME | COUNT 有⼀个条件满⾜时提交(count_time)
# 当每⼀批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调⽤ Acknowledgment.acknowledge() 后提交(manual)
# ⼿动调⽤ Acknowledgment.acknowledge() 后⽴即提交,⼀般使⽤这种(manual_immediate)
ack-mode: manual_immediate

4、完整配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
server:
port: 8080
spring:
kafka:
# kafka-server 的地址,如果是集群,那么 ip:port 之间使用 "," 隔开
bootstrap-servers: 120.78.198.32:9092
# 生产者
producer:
# 设置一个大于 0 的值,则客户端会将发送失败的记录重新发送
retries: 3
# 生产者一次从本地缓冲区中取出 batch-size 大小的数据发送给 Kafka
batch-size: 16384
# 设置 acks 为 1, ⾄少要等待 leader 已经成功将数据写⼊本地 log,但是不需要等待所有 follower 是否成功写⼊。就可以继续发送下⼀条消息。
acks: 1
# 指定消息 key 与消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 生产者本地缓冲区,默认为 32m
buffer-memory: 33554432
consumer:
# 指定消费组 id
group-id: default-group
# 关闭自动提交
enable-auto-commit: false
# 当出现一个新的消费组或者需要传入 offset 但 offset 为空时,指定其从头开始消费
# 但不是每次都是从头开始消费
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 消费者一次从 Broker 最多拉取 500 条消息
max-poll-records: 500
listener:
# 当每一条消息被消费者监听器(ListenerConsumer)处理后才提交(record)
# 当每⼀批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后提交(batch)
# 当每⼀批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间⼤于 TIME 时提交(time)
# 当每⼀批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后,被处理 record 数量⼤于等于COUNT时提交(count)
# TIME | COUNT 有⼀个条件满⾜时提交(count_time)
# 当每⼀批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调⽤ Acknowledgment.acknowledge() 后提交(manual)
# ⼿动调⽤ Acknowledgment.acknowledge() 后⽴即提交,⼀般使⽤这种(manual_immediate)
ack-mode: manual_immediate

5、Kafka 消费模式介绍

Kafka 支持多种消费模式,消费模式在 org.springframework.kafka.listener.ContainerProperties.AckMode 枚举中,这是一个静态内部枚举,我们可以查看这个类中的详细信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* The offset commit behavior enumeration.
*/
public enum AckMode {

/**
* Commit after each record is processed by the listener.
* 当每一条消息被消费者监听器(ListenerConsumer)处理后才提交(record)
*/
RECORD,

/**
* Commit whatever has already been processed before the next poll.
* 当每⼀批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后提交(batch)
*/
BATCH,

/**
* Commit pending updates after
* {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.
* 当每⼀批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间⼤于 TIME 时提交(time)
*/
TIME,

/**
* Commit pending updates after
* {@link ContainerProperties#setAckCount(int) ackCount} has been
* exceeded.
* 当每⼀批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后,被处理 record 数量⼤于等于COUNT时提交(count)
*/
COUNT,

/**
* Commit pending updates after
* {@link ContainerProperties#setAckCount(int) ackCount} has been
* exceeded or after {@link ContainerProperties#setAckTime(long)
* ackTime} has elapsed.
* TIME | COUNT 有⼀个条件满⾜时提交(count_time)
*/
COUNT_TIME,

/**
* User takes responsibility for acks using an
* {@link AcknowledgingMessageListener}.
* 当每⼀批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调⽤ Acknowledgment.acknowledge() 后提交(manual)
*/
MANUAL,

/**
* User takes responsibility for acks using an
* {@link AcknowledgingMessageListener}. The consumer
* immediately processes the commit.
* ⼿动调⽤ Acknowledgment.acknowledge() 后⽴即提交,⼀般使⽤这种(manual_immediate)
*/
MANUAL_IMMEDIATE,

}

MANUALMANUAL_IMMEDIATE 的区别

  • MANUAL:只有当 poll 下来的消息全部处理完后,再调用 Acknowledgment.acknowledge() 进行批量提交
  • MANUAL_IMMEDIATE:当调用 Acknowledgment.acknowledge() 后立即提交,处理一条提交一条

5.3、消息生产者

  • 指定要发送的分区
1
public static final String TOPIC_NAME = "my-replicated-topic";
  • 在生产者中注入 KafkaTemplate 对象
1
2
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
  • 创建消息,并调用 KafkaTemplate 对象的 send 方法发送消息

  • 全部代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RestController
@RequestMapping("message")
public class KafkaProducerController {
public static final String TOPIC_NAME = "my-replicated-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@GetMapping("send")
public String send() {
Product product = new Product("100", "测试商品-100");
// 在这里进行消息发送,指定主题、分区、key和消息记录
kafkaTemplate.send(TOPIC_NAME, 0, product.getProductId(), JSON.toJSONString(product));
return "send success!";
}
}

5.4、消息消费者

  • 创建一个 Consumer 类,需要将这个类交给 Spring 容器管理,在类上添加 @Component 注解
1
2
3
4
@Component
public class KafkaConsumer {
...
}
  • 创建一个方法,在方法上添加 @KafkaListener 注解,在注解中指定监听的主题和消费组

如果在处理消息后没有进行提交,那么这条消息会被重复消费,当自动提交关闭时,需要我们使用 ack.acknowledge(); 进行手动提交

1
2
3
4
5
6
7
8
@KafkaListener(topics = KafkaProducerController.TOPIC_NAME, groupId = GROUP_ID)
public void listenGroup (ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
// 手动提交 offset
ack.acknowledge();
}
  • 全部代码

实际上拿到的是多条记录,但每次只对一条记录进行处理

1
2
3
4
5
6
7
8
9
10
11
12
@Component
public class KafkaConsumer {
public static final String GROUP_ID = "MyGroup";
@KafkaListener(topics = KafkaProducerController.TOPIC_NAME, groupId = GROUP_ID)
public void listenGroup (ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
// 手动提交 offset
ack.acknowledge();
}
}

也可以对多条记录进行处理

1
2
3
4
5
6
@KafkaListener(topics = KafkaProducerController.TOPIC_NAME, groupId = GROUP_ID)
public void listenGroups (ConsumerRecords<String, String> records, Acknowledgment ack) {
// 处理逻辑
// 手动提交 offset
ack.acknowledge();
}

5.5、测试

启动项目,访问 http://localhost:8080/message/send 接口,查看结果

image-20211011162449160

控制台中输出结果如下,可以看到消费者收到了生产者发送的消息

1
2
{"productId":"100","productName":"测试商品-100"}
ConsumerRecord(topic = my-replicated-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1633940677717, serialized key size = 3, serialized value size = 52, headers = RecordHeaders(headers = [], isReadOnly = false), key = 100, value = {"productId":"100","productName":"测试商品-100"})

5.6、消费者的详细配置

设置消费组、多 Topic 、指定分区、指定偏移量和设置消费者个数

  • concurrency 就是同组下的消费者的个数,就是并发消费数,建议小于等于分组总数

启动时, Kafka 会帮你创建出 concurrency 个这样的消费者。

  • topicPartitions 由多个 topicPartition 组成

topicPartition 中指定了这个消费者要消费的主题和主题中哪几个分区

  1. 对于 @TopicPartition(topic = "topic1", partitions = {"0", "1"}) ,我们希望这个消费者消费主题 topic1 的第一个和第二个分区
  2. 对于 @TopicPartition(topic = "topic2", partitions = "0",partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")),我们希望这个消费者消费第一个分区和第二个分区,同时在消费第二个分区时,初始偏移量为 100
1
2
3
4
5
6
7
8
9
10
11
@KafkaListener(groupId = "testGroup", topicPartitions = {
@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
@TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
}, concurrency = "3")
public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
// 手动提交 offset
ack.acknowledge();
}

六、Kafka 及集群 Controller 、 Rebalance 和 HW

6.1、Controller

回到这张图,在下面的 Kafka 集群中,我们创建了两个分区、三个副本

image-20211011171119093

在众多副本中,存在一个副本会作为 leader ,这个副本负责数据的读写与同步,而其他的副本作为其的 follower 存在,在 leader 挂掉后,新 leader 会从处于 Isr 列表中的剩下 follower 之中选举产生。

controller 就负责这个新 follower 的选举和产生。

1、概念

Kafka 集群中的 Broker 会在 ZK 中创建临时序号节点,序号最小的节点(最先创建的节点)将作为 Kafka 集群中的 controller ,负责管理整个集群中的所有分区和所有副本的状态。

2、职责

  • 当某个分区的 Leader 副本发生故障时,由 controller 负责为该分区选举产生新的 Leader 副本
  • 当检测到某个分区的 ISR 集合发生变化时,由 controller 负责通知所有的 broker 更新其元数据信息
  • 当使⽤ kafka-topics.sh 脚本为某个 topic 增加分区数量时,同样还是由 controller 负责让新分区被其他节点感知到

6.2、Rebalance 机制

1、触发前提

消费者没有指明分区消费

2、是什么?

重平衡是其实是一个协议,它规定了如何让消费者组下的所有消费者来分配 Topic 中的每一个分区

比如说一个 Topic 中有 100 个分区,一个消费组中有 20 个消费者,在协调者(Group Coordinator)下的控制下让组内的每一个消费者分配到 5 个分区,这个分配的过程就是重平衡。

当消费组中消费者和分区的关系发生变化时,那么就会触发 rebalance 机制。

3、触发条件

重平衡的触发条件主要有三个:

  • 消费者组内成员发生变更,这个变更包括了增加和减少消费者

注意:这里的减少很大可能是被动的,即消费者崩溃退出,这种情况是最常见的情况

  • 主题的分区数发生变化,Kafka 目前只支持增加分区,当增加分区时会触发重平衡
  • 订阅的主题数发生变化

当消费者组使用正则表达式订阅主题时,此时可能新建的主题符合正则规则,那么又会触发重平衡。

4、 Rebalance 机制带来的问题

在 Rebalance 过程中,消费者无法从 Kafka 中消费消息,这对 Kafka 的 TPS 影响极大,且 Rebalance 的时间会随着 Kafka 集群节点的增多而变长,在 Rebalance 时间内 Kafka 几乎处于不可用状态。

所以在实际环境中应该尽量避免 Rebalance 的发生。

5、消费者消费分区的三种策略

在触发 Rebalance 机制之前,消费者消费哪个分区有三种策略

  • range : 通过公式来计算某个消费者消费哪个分区

  • 轮询 :消费组中的消费者轮流消费

  • sticky : 在触发 rebalance 后,在消费者消费的原分区不变的基础上进行调整

如果这个策略没有开启,那么就要进行所有分区的重新分配,建议开启。

6.3、HW 与 LEO

1、基本概念

  • Base Offset

起始偏移量,即副本中第一条消息的偏移量,如下图,这里的起始位移是 0,如果一个日志文件写满 1G 后(默认 1G 后会 log rolling),这个起始位移就不是 0 开始了。

  • HW(high watermark

副本的高水位值,replica 中 Leader 副本和 Follower 副本都会有这个值,通过它可以得知副本中已提交或者已备份消息的范围, Leader 副本中的 HW ,决定了消费者能消费的最新消息能到哪个 offset 。

如下图所示,HW 值为 8 ,代表 offset 为 [0,8] 的 9 条消息都能被消费到,它们是对消费者可见的,而 [9,12] 这四条消息由于未提交,所以对消费者不可见。

  • LEO (log end offset

日志末端位移,代表日志文件中下一条待写入消息的 offset ,这个 offset 实际上是没有消息的

不管是 Leader 副本还是 Follower 副本都有 LEO ,当 Leader 副本收到生产者的一条消息时,LEO 通常会自增 1 ,而 Follower 副本需要从 Leader 副本同步到数据后,才能增加它的 LEO 。

最后 Leader 副本会比较自己的 LEO 和满足条件的 Follower 副本上的 LEO ,然后选取两者中较小值作为新的 HW ,来更新自己的 HW 值。

image-20211011194242472

2、为什么对于新写入的消息不能立即消费?

对于新写入到 Leader 副本的消息,不能直接交给 Consumer 进行消费,Leader 会等待该消息被所有 Isr 列表中的副本同步后,再更新 HW ,此时消息才能被 Consumer 进行消费。

这样就保证了如果 Leader 所在的 Broker 挂掉,该消息仍然可以从新选举的 Leader 中获取,可以有效地防止消息丢失。

七、Kafka 相关问题

7.1、如何防止消息丢失

1、生产者

  • acks 设置 1 或者 -1 / all 可以有效防止消息丢失
  • 如果要做到 99.9999% ,那么可以将 acks 设置为 all ,然后将 min.insync.replicas 设置为分区备份数

这样做,只有等待上一条消息被所有副本同步完后,才能发送下一条消息。

2、消费者

将自动提交改为手动提交

7.2、如何防止消息的重复消费

在生产环境中,可能出现以下情况

  1. 生产者向 Kafka 集群中投递消息,Kafka 已经接收到了这条消息,并返回给发送者一个 ack
  2. 此时因为网络动荡,导致生产者没有收到这个 ack ,所以生产者启动了重发机制,再次发送一条消息到 Kafka 中,这样就导致了 Kafka 收到了两条一模一样的消息

如果为了消息的不重复消费,⽽把⽣产端的重试机制关闭、消费端的⼿动提交改成⾃动提交,这样反⽽会出现消息丢失,那么可以直接在防治消息丢失的⼿段上再加上消费消息时的幂等性保证,就能解决消息的重复消费问题

  • 使用 MySQL 插入业务 id 作为主键,主键是唯一的,所以一次只能插入一条
  • 使用 Redis 或者 Zookeeper 的分布式锁,以业务 id 为锁,保证只有一条记录能够创建成功
  • 构建幂等表保证幂等性

7.3、如何做到顺序消费

1、生产者

在发送时不能将 ack 设置为 0 ,同时需要关闭重试,并使用同步发送,等到发送成功后再发送下一条消息,确保消息是顺序发送的。

2、消费者

消息发送到一个分区中,同时只能有一个消费组里的消费者来接收消息,因此,Kafka 的顺序消费会牺牲掉一部分性能。

7.4、如何解决消息积压问题

1、问题说明

消息的消费者的消费速度远远赶不上生产者的生产消息的速度,导致 Kafka 中有大量的数据没有被消费,随着没有被消费的数据堆积越来越多,消费者寻址的性能会越来越差,最后导致整个 Kafka 对外提供服务的性能越来越差,从而拖垮其他服务,造成服务雪崩

2、解决方案

消息积压会导致很多问题,⽐如磁盘被打满、⽣产端发消息导致 Kafka 性能过慢,就容易出现服务雪崩,就需要有相应的⼿段:

  • ⽅案⼀:在⼀个消费者中启动多个线程,让多个线程同时消费(提升⼀个消费者的消费能⼒),充分利用机器的性能。
  • ⽅案⼆:如果⽅案⼀还不够的话,这个时候可以启动多个消费者,多个消费者部署在不同的服务器上。其实多个消费者部署在同⼀服务器上也可以提⾼消费能⼒(充分利⽤服务器的 cpu 资源)。
  • ⽅案三:让⼀个消费者去把收到的消息往另外⼀个 topic 上发,另⼀个 topic 设置多个分区和多个消费者 ,进⾏具体的业务消费。

7.5、Kafka 为什么快?

Kafka 主要做了 4 个优化

  • 磁盘顺序读写
  • 页缓存
  • 零拷贝
  • 批量操作

1、磁盘顺序读写

Kafka 进行的第一个优化就是磁盘顺序读写,在传统认知中,磁盘的读取效率应该远低于内存,但 2019 年 ACM 机构的调研中表明,磁盘顺序读写的效率虽然远不及内存顺序读写的效率,但前者的效率是要略高于内存随机读写的效率的。

image-20211011211150388

上面的文章在一定程度上指导了 Kafka 后面的设计,Kafka 基于顺序读写实现高性能

image-20211011211432085

每一个分区对应一个日志,新消息到来后在日志后面进行顺序追加,而对于已经消费过的消息,Kafka 并不对其进行实时地数据删除,而是基于某些策略(存放时间、日志大小等)来对这些已经消费过的消息进行统一批量删除。

2、页缓存

虽然 Kafka 是基于 Scala 编写,必须依赖 JVM 运行的应用,但 Kafka 中的消息在处理投递过程中并不直接通过 JVM ,而是直接使用使用操作系统的页缓存特性来提高处理速度

这样做的好处有什么?

  • 避免了 JVM GC 带来的性能损耗
  • Kafka 采用字节紧密存储,避免产生对象,进一步提高了空间利用率

3、零拷贝

在进行数据传递过程中,不再发生内核态 - 用户态的转变,而是直接将页缓存中的数据拷贝到 Socket 缓冲区。

这里的零拷贝不是说不发生复制,而是说不再发生与用户态发生的数据拷贝

image-20211011212557504

4、批处理

  • Kafka 在生产者端设置有本地缓存,同时会采用线程异步将数据批量提交到 Broker 的方式提高性能
  • Kafka 提供了许多批处理的 API ,可以让我们对数据进行统一的压缩和合并,让数据通过更小的数据包来进行网络传输,提高了性能

7.6、Kafka 的幂等性

1、Kafka 生产者幂等性

在生产者向 Kafka 发送消息过程中,可能会出现这种情况,Kafka 集群已经接收到了这条消息,同时已经将其写入日志了,但在向生产者返回 ack 的过程中由于某些原因 ack 丢失导致生产者没有接收到 ack,此时生产者会启动重试机制,如果 Kafka 无法保证幂等性,那么可能在 partition 中会保存多条一模一样的消息。

image-20211012160455192

2、生产者配置幂等性

  • 只需要在生产者端添加一个配置即可
1
properties.put(ENABLE_IDEMPOTENCE_CONFIG, true)

3、Kafka 保证幂等性的原理

为实现生产者的幂等性, Kafka 引入了 Producer ID (PID)和 Sequence Number 两个概念。

  • PID:每个 Producer 在初始化时,都会分配一个唯一的 PID,这个 PID 对用户来说,是透明的
  • Sequence Number:针对每个生产者(对应 PID )发送到指定主题分区的消息都对应一个从 0 开始递增的 Sequence Number。

image-20211012161500813

  • 在生产者向 Broker 发送消息时,**会将自己的 PID 和此条消息的序列号一起发送给 Broker **
  • Broker 接收到消息后,将消息、 PID 和 序列号一起保存
  • Broker 向生产者返回一个发送成功的 ACK ,但这个 ACK 在返回过程中因为某个原因丢失了,没有送到生产者手上
  • 生产者收不到 ACK ,以为消息发送失败,所以启动重试机制,在发送的消息中同样带上自己的 PID 和消息的序列号(PID: 250, seq: 0)
  • Broker 接收到重发消息后,检查这条消息的 PID 和 seq ,发现这条消息的 seq <= 分区中该 PID 对应的 seq ,所以不进行保存
  • Broker 向生产者发送一个 ACK

7.7、Kafka 的数据存储形式

image-20211012164238789

  • 一个 Topic 由多个分区组成
  • 一个分区(partition)由多个 segment (段)组成
  • 一个 segment(段)由多个文件组成(log、index、timeindex

1、存储日志

我们先来看看 Kafka 中的数据是如何在磁盘中进行存储的,进入 Kafka 安装目录下的 data/kafka-logs 文件夹中,可以看到以下命名的文件夹,这些就是 Kafka 中的消息在磁盘中的存储位置,文件夹以 主题名 - 分区 ID 的格式命名,下面就是 Kafka 中 my-replicated-topic 主题下两个分区的物理存储目录。

image-20211012164657651

进入其中一个目录,查看目录下的文件,可以看到以下几个文件,我们对这些文件进行一一介绍

image-20211012164920515

文件名描述
00000000000000000000.index索引文件,根据 offset 查找数据时就是根据该索引文件来操作的
00000000000000000000.log日志数据文件,真正存放消息数据的文件
00000000000000000000.timeindex时间索引文件
00000000000000000001.snapshot快照文件
leader-epoch-checkpoint持久化每个 partition leader 的 LEO
  • segment 是几个文件名一致的文件的集合。在一个分区文件夹下 (my-replicated-topic-0 )可能可以看到前缀一样,但文件类型的几个文件,**这种前缀相同,但文件类型不同的几个文件组合起来,就称之为一个 segment **

假设说一个分区文件夹下,有 6 个这样的文件(简化它们的文件名),分别为(a.index、a.log、a.timeindex、b.index、b.log、b.timeindex),那么我们可以称这个分区中有两个 segment ,即 a 段(a.index、a.log、a.timeindex)和 b 段(b.index、b.log、b.timeindex)。

  • 每个 segment 文件的文件名固定是 20 位数字,通常为 0…….xx.abc ,文件名代表这个 segment 的起始偏移量,因为每个分区的起始偏移量都是 0 ,所以分区的日志文件固定从 0000000000000000000.log 开始

00000000000000000097.log 代表这个 segment 的第一条消息的偏移量为 97

  • 默认的每个日志文件最大为「log.segment.bytes =1024 * 1024 * 1024」1G

2、写入消息

  • 新的消息总是写入到最后一个 segment 的 log 文件中
  • 如果该文件到达了指定的大小(默认为 1 GB),那么将滚动到一个新的文件中。

3、读取消息

日志段 segment 的引入方便了 Kafka 数据的查询和定位,Kafka 使用二分查找来查询消息。

Kafka 的日志段又分为活跃日志段和非活跃日志段,一个分区只能存在一个活跃日志段,而只有活跃日志段才可以被写入和读取,非活跃日志段只能被读取

  • 使用 Offset 查找消息

偏移量索引文件由 4 字节的相对位移(offset)和 4 字节的物理地址(position)组成。

在我们使用 kafka-run-class.sh 来查看 .index 文件后,我们可以看到以下内容

1
./kafka-run-class.sh kafka.tools.DumpLogSegments --files /usr/local/kafka-2.5.1/data/kafka-logs/my-replicated-topic-0/00000000000000000000.index

可以看到每一行都是 offset: xxx position: yyyy ,其中 offset 为相对偏移量,而 position 为物理地址

第一行的 offset: 12 position: 4423 代表相对偏移量从 0 - 12 的消息的物理地址在 0 - 4423

image-20211012171900217

按上面的例子,如何找到 offset 为 60 的消息?

  1. 先根据 offset 找到对应的日志段,即 segment ,这里我们找到 00000000000000000000.index
  2. 通过二分法找到不大于 offset 的最大索引项,这里我们找到第二行(offset:24 position:8773)
  3. 到 00000000000000000000.log 文件中,从 position 为 8773 的位置开始顺序扫描,直到找到 offset 为 60 的消息
  • 使用 TimeStamp 进行消息查找

时间戳索引文件是由 8 字节的时间戳和 4 字节的相对偏移量组成,dump 下来的文件格式如下

image-20211012173318334

  1. 查找该时间戳应该在哪个 segment 中,将待查找的时间戳(timestamp)与各个日志分段中的最大时间戳(largestTimeStamp)逐一对比,直到找到不小于 timestamp 所对应的日志分段。
  2. 查找该日志分段的偏移量索引文件,查找该偏移量对应的 position
  3. 到 .log 文件中,从 position 开始扫描,直到找到待查找数据

4、删除消息

  • 在消息被消费后,Kafka 不会实时删除已经被消费过的消息,而是定期清理,一次性删除一个 segment 的日志文件
  • Kafka 的日志管理器会根据 Kafka 的配置,来决定哪些文件可以被删除。

7.8、Kafka 中数据清理(Log Deletion)

Kafka消息存储在磁盘中,为了控制磁盘占用空间Kafka 需要不断地对过去的一些消息进行清理工作

Kafka 的每个分区都有很多的日志文件,这样也是为了方便进行日志的清理。在Kafka 中,提供两种日志清理方式:

  • 日志删除

按照指定的策略直接删除不符合条件的日志

  • 日志压缩

按照消息的 Key 进行整合,如果遇到有相同 Key 但有不同 value 的消息,那么只保留最后一个版本

1、Kafka 数据清理配置

在 Kafka 的 Broker 和 Topic 配置中

配置项配置值说明
log.cleaner.enabletrue(默认)是否开启日志清理功能
log.cleanup.policydelete(默认)删除日志
log.cleanup.policycompaction压缩日志
log.cleanup.policydelete,compact同时支持压缩和删除

2、日志删除

日志删除是以段为单位进行定期清理

  • 定时日志删除任务

Kafka 日志管理器中会有一个专门的日志删除任务来定期检测和删除不符合保留条件的日志分段文件,这个周期可以通过 broker 端参数 log.retention.check.interval.ms 来配置,默认值为 300,000 ,即5分钟。当前日志分段的保留策略有3种:

  1. 基于时间的保留策略
  2. 基于日志大小的保留策略
  3. 基于日志起始偏移量的保留策略

3、基于时间的保留策略

以下三种配置可以指定如果 Kafka 中的消息超过指定的阈值,就会将日志进行自动清理:

  • log.retention.hours

  • log.retention.minutes

  • log.retention.ms

其中,优先级 log.retention.ms > log.retention.minutes > log.retention.hours,默认情况下,在 broker 中配置为 log.retention.hours = 168,即日志默认保留 168 小时,相当于保留七天

  • 删除日志分段时
  1. 从日志文件对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作
  2. 将日志分段文件添加上“.deleted”的后缀(也包括日志分段对应的索引文件)
  3. Kafka 的后台定时任务会定期删除这些“.deleted”为后缀的文件,这个任务的延迟执行时间可以通过 file.delete.delay.ms 参数来设置,默认值为60000,即1分钟。

4、基于日志大小的保留策略

  • 日志删除任务会检查当前日志的大小是否超过设定的阈值来寻找可删除的日志分段的文件集合。
  • 可以通过 broker 端参数 log.retention.bytes 来配置,默认值为 -1,表示无穷大。如果超过该大小,会自动将超出部分删除。

注意:log.retention.bytes 配置的是日志文件的总大小,而不是单个的日志分段的大小,一个日志文件包含多个日志分段。

5、基于日志起始偏移量保留策略

每个 segment 日志都有它的起始偏移量,如果起始偏移量小于 logStartOffset,那么这些日志文件将会标记为删除。

6、日志压缩

Log Compaction 是默认的日志删除之外的清理过时数据的方式。它会将相同的 key 对应的数据只保留一个版本。

image-20211012191155887

  • Log Compaction执行后,offset将不再连续,但依然可以查询Segment
  • Log Compaction执行前后,日志分段中的每条消息偏移量保持不变。Log Compaction会生成一个新的Segment文件
  • Log Compaction是针对key的,在使用的时候注意每个消息的key不为空
  • 基于 Log Compaction 可以保留key的最新更新,可以基于Log Compaction来恢复消费者的最新状态