```go
package main

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	bootstrapServers := "127.0.0.1:19092,127.0.0.1:19093"
	topic := "test"

	writer := &kafka.Writer{
		Addr:     kafka.TCP(strings.Split(bootstrapServers, ",")...),
		Topic:    topic,
		Balancer: &kafka.LeastBytes{},
	}
	defer writer.Close()

	// The Writer's default BatchSize is 100, its default BatchTimeout is 1 second,
	// and it runs in synchronous mode by default.
	for ind := 0; ind <= 100; ind++ {
		kafkaMessage := kafka.Message{Value: []byte(fmt.Sprintf("message %d", ind))}
		startTime := time.Now()
		// Synchronous send: the next message is sent only after this one has been written.
		if err := writer.WriteMessages(context.Background(), kafkaMessage); err != nil {
			panic(err)
		}
		// !!! Each message takes about 1000ms to send !!!
		fmt.Printf("time elapsed %dms\n", time.Since(startTime).Milliseconds())
	}
}
```
The doc comment on the `Writer.WriteMessages()` method reads as follows:
```go
// WriteMessages writes a batch of messages to the kafka topic configured on this
// writer.
//
// Unless the writer was configured to write messages asynchronously, the method
// blocks until all messages have been written, or until the maximum number of
// attempts was reached.
//
// When sending synchronously and the writer's batch size is configured to be
// greater than 1, this method blocks until either a full batch can be assembled
// or the batch timeout is reached. The batch size and timeouts are evaluated
// per partition, so the choice of Balancer can also influence the flushing
// behavior. For example, the Hash balancer will require on average N * batch
// size messages to trigger a flush where N is the number of partitions. The
// best way to achieve good batching behavior is to share one Writer amongst
// multiple go routines.
//
// When the method returns an error, it may be of type kafka.WriteError to allow
// the caller to determine the status of each message.
//
// The context passed as first argument may also be used to asynchronously
// cancel the operation. Note that in this case there are no guarantees made on
// whether messages were written to kafka. The program should assume that the
// whole batch failed and re-write the messages later (which could then cause
// duplicates).
func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
	...
}
```
In short, `WriteMessages` writes a batch of messages to the topic configured on this Writer.
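Unless the Writer is configured to write messages asynchronously, the method blocks until all messages have been written or until the maximum number of attempts is reached.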
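When sending synchronously with a batch size greater than 1 (the default is 100), the method blocks until a full batch can be assembled or the batch timeout (1 second by default) expires. Batch size and timeout are evaluated per partition, so the choice of Balancer also affects when batches are flushed: with the Hash balancer, for example, it takes on average N * batch size messages to trigger a flush, where N is the number of partitions. The best way to get good batching behavior is to share one Writer among multiple goroutines.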
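To make the per-partition arithmetic concrete, here is a toy model in Go. It is not kafka-go code: it assumes a hypothetical round-robin assignment of messages to partitions (chosen because it is deterministic, whereas a hash balancer depends on the message keys) and counts how many messages must be written before any single partition's batch is full. Under that assumption the first flush happens after (batchSize - 1) * N + 1 messages, i.e. roughly N * batchSize.

```go
package main

import "fmt"

// messagesUntilFirstFlush models a writer that keeps one batch per partition
// and assigns messages to partitions in round-robin order (a hypothetical,
// deterministic stand-in for a real balancer). It returns how many messages
// must be written before any single partition's batch reaches batchSize.
func messagesUntilFirstFlush(partitions, batchSize int) int {
	if partitions < 1 || batchSize < 1 {
		return 0 // invalid input; avoid looping forever
	}
	counts := make([]int, partitions) // messages buffered per partition
	for written := 1; ; written++ {
		p := (written - 1) % partitions // round-robin choice of partition
		counts[p]++
		if counts[p] == batchSize {
			return written
		}
	}
}

func main() {
	for _, n := range []int{1, 3, 6} {
		fmt.Printf("partitions=%d, batchSize=100: first flush after %d messages\n",
			n, messagesUntilFirstFlush(n, 100))
	}
}
```

With a batch size of 100 this prints 100, 298 and 595 messages for 1, 3 and 6 partitions, which is why a single synchronous caller on a multi-partition topic almost always ends up waiting for the timeout instead.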
When the method returns an error, it may be of type `kafka.WriteError`, which lets the caller determine the status of each message.
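The context passed as the first argument can also be used to cancel the operation asynchronously. In that case there is no guarantee about whether the messages were written to Kafka, so the program should assume the whole batch failed and write the messages again later, which may produce duplicates.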
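Putting this together, the reason a single message takes 1 second to send is as follows: after one message is written synchronously, the Writer waits for enough messages to fill a batch (100 by default). Because no further messages arrive, it waits until the batch timeout (1 second by default) expires, then writes the message to Kafka, and only then returns. The fix in this scenario is to set BatchSize to 1. Note, however, that this lowers write throughput, because messages are no longer batched.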