1. 环境说明


2. 流程描述


3. 原因分析

Writer.WriteMessages() 方法的注释如下:

WriteMessages 将一批消息写到该 Writer 上配置的 topic。

除非将该 writer 配置为异步地写消息,否则该方法将阻塞,直到所有消息都已写入,或直到达到最大尝试次数。

同步地发送消息,并且 writer 的批处理大小(batch size)被配置为大于 1 的值(默认为 100)时,该方法将阻塞,直到可以组装成完成的批次,或达到批处理超时(batch timeout,默认为 1 秒)。按照分区计算批处理大小和超时,所以均衡器(Balancer)的选择也会影响刷新行为。比如,哈希均衡器需要平均 N * 批处理大小条消息来触发刷新,其中 N 是分区数量。实现良好批处理行为的最佳方法是在多个 go routine 之间共享 Writer。

当该方法返回错误时,错误类型可能是 kafka.WriteError,它使调用者可以确定每条消息的状态。

作为第一个参数传递的 context 用于异步地取消该操作。注意,在这种情况下,无法保证是否已将消息写入 Kafka。程序应该假设整批都已失败,然后重新写入消息(可能导致重复)。

综上可知,出现发送 1 条消息耗时 1 秒的情况的原因是:在同步地发送 1 条消息后,Writer 等待消息凑成一整批(默认 100 条),但是因为后续没有消息写入,因此它等待到超时(默认 1 秒)后,将消息写入到 Kafka,然后才返回。解决方法是:在该场景下,可将 BatchSize 设置为 1,需要注意的是,这样做会降低写入性能。