Kafka producer

前言

不管是把Kafka 作为消息队列、消息、总线还是数据存储平台来使用,总是需要有一个可以往Kafka 写入数据的生产者和一个可以从Kafka 读取数据的消费者,或者一个兼具两种角色的应用程序。

生产者概述

一个应用程序在很多情况下需要往Kafka 写入消息: 记录用户的活动(用于审计和分析)、记录度量指标、保存日志、消息、记录智能家电的信息、与其他应用程序进行异步通信、缓冲即将写入到数据库的数据,等等。

在保存网站的点击信息场景里,允许丢失少量的消息或出现少量的消息重复,延迟可以高一些,只要不影响用户体验就行。换句话说,只要用户点击链接后可以马上加载页面,那么我们并不介意消息要在几秒钟之后才能到达Kafka 服务器。吞吐量则取决于网站用户使用网站的频度。
不同的使用场景对生产者API 的使用和配置会有直接的影响。尽管生产者API 使用起来很简单, 但消息的发送过程还是有点复杂。如图所示:

我们从创建一个ProducerRecord 对象开始, Producer Record 对象需要包含目标主题和要发送的内容。我们还可以指定键或分区。在发送ProducerRecord 对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。
接下来,数据被传给分区器。如果之前在ProducerRcord 对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据Producer Record 对象的键来选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条记录了。紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的broker 上。服务器在收到这些消息时会返回一个响应。如果消息成功写入Kafka ,就返回一个RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败, 则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。

Kafka的配置属性

Kafka生产者有3个必选属性:bootstrap.servers,key.serializer,value.serializer。

  • bootstrap.servers:该属性指定broker的地址清单,地址的格式为host:port.清单里不需要包含所有的broker地址,生产者会从给定的broker 里查找到其他broker的信息。不过建议至少要提供两个broker 的信息, 一且其中一个宕机,生产者仍然能够连接到集群上。
  • key.serializer:broker :希望接收到的消息的键和值都是字节数组。生产者接口允许使用参数化类型,因此可以把java 对象作为键和值发送给broker 。这样的代码具有良好的可读性,不过生产者需要知道如何把这些java 对象转换成字节数组。key. serializer必须被设置为一
    个实现了org.apache.kafka.common.seialization.Serialize 接口的类,生产者会使用这个类把键对象序列化成字节数组。Kafka 客户端默认提供了ByteArraySeializer(这个只做很少的事情)、StringSerializer 和IntegerSerializer,因此,如果你只使用常见的几种java 对象类型,那么就没必要实现自己的序列化器。要注意, key.serializer 是必须设置的,就算你打算只发送值内容。
  • value.serializer:与key.serializer一样, value.serializer指定的类会将值序列化。如果键和值都是字符串,可以使用与key.serializer 一样的序列化器。如果键是整数类型而值是字符串,那么需要使用不同的序列化器。
  • acks
    acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响。主参数有如下选项。
    • 如果acks=0 , 生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,
    如果当中出现了问题, 导致服务器没有收到消息,那么生产者就无从得知,消息也就丢
    失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大
    速度发送消息,从而达到很高的吞吐量。
    如果acks=1 ,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用Future对象的get()方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息)。
    如果acks=all ,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。不过,它的延迟比acks=1时更高,因为我们要等待不只一个服务器节点接收消息。
  • buffer.me mory
    该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候,send ()方法调用要么被阻塞,要么抛出异常,取决于如何设置block.on.buffer. full 参数(在0. 9.0.0 版本里被替换成了max.block.ms,表示在抛出异常之前可以阻塞一段时间)。
  • compression.type
    默认情况下,消息发送时不会被压缩。该参数可以设置为snappy 、gzip 或lz 4 ,它指定了消息被发送给broker 之前使用哪一种压缩算也进行压缩。snappy 压缩算法由Google发明,它占用较少的CPU ,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。gzip 压缩算法一般会占用较多的CPU ,但会提供更高的压缩比,所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储销,而这往往是向Kafka 发送消息的瓶颈所在。
  • retries
    生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况
    下, retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待l00 ms ,不过可以通过retry.backoff.ms 参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前,先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间),让总的重试时间比Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。不过有些错误不是临时性错误,没办法通过重试来解决(比如“消息太大”错误)。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。你只需要处理那些不可重试的错误或重试次数超出上限的情况。

  • batch.size
    当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。不过生产者井不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。

  • linger.ms
    该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProduce 会在批次填满或linger.ms达到上限时把批次发送出去。默认情况下,只要有可用的线程, 生产者就会把消息发送出去,就算批次里只有一个消息。把linger.ms 设置成比0 大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了) 。
  • client.id
    该参数可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里。
  • max.in.flight.requests.per.connection
    该参数指定了生产者在收到服务器晌应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
  • timeout.ms 、request.timeout.ms 和metadata. fetch. timeout. ms
    request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间,metada.fetch.timeout.ms 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。timeout.ms指定了broker 等待同步副本返回消息确认的时间,与asks 的配置相匹配一一如果在指定时间内没有收到同步副本的确认,那么broker 就会返回一个错误。
  • max.block.ms
    该参数指定了在调用send () 方法或使用partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到max.block.ms时,生产者会抛出超时异常。
  • max.request.size
    该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为1MB ,那么可以发送的单个最大消息为1MB ,或者生产者可以在单个请求里发送一个批次,该批次包含了1000 个消息,每个消息大小为1 KB。另外, broker 对可接收的消息最大值也有自己的限制(message.max.bytes ),所以两边的配置最好可以匹配,避免生产者发送的消息被broker 拒绝。
  • receive.buffer. bytes 和send . buffer.bytes
    这两个参数分别指定了TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为- 1 ,就使用操作系统的默认值。如果生产者或消费者与broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

下面的代码片段演示如何创建一个生产者,这里只指定了必要的属性,其他使用默认设置

1
2
3
4
5
Properties pro = new Properties();
pro.put("bootstrap.servers","broker1:9092,broker2:9092")
pro.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
pro.put("value.serialization.StringSerializer","org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String,String>(pro);

实例完生产者对象后,接下来就可以开始发送消息了。发送消息有以下三种方式。

  • 发送并忘记( fire- and-forget )
    我们把消息发送给服务器,但井不关心它是否正常到达。大多数情况下,消息会正常到达,因为Kafka 是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候也会丢失一些消息。
  • 同步发送
    我们使用send () 方法发送消息, 它会返回一个Future 对象,调用get () 方法进行等待,就可以知道悄息是否发送成功。
  • 异步发送
    我们调用send () 方怯,并指定一个回调函数, 服务器在返回响应时调用该函数。在下面的几个例子中, 我们会介绍如何使用上述几种方式来发送消息,以及如何处理可能发生的异常情况。

发送消息的三种方式实例

  • 并发并忘记
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.kafka.client;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaProducerDemo {
/**
* @Author:落墨
* @CreateTime:2020/7/1
* @Description:KafkaProducer
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties kafkaPropertie = new Properties();
//配置broker地址,配置多个容错
kafkaPropertie.put("bootstrap.servers", "node1:9092,node1:9093,node1:9094");
//配置key-value允许使用参数化类型
kafkaPropertie.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaPropertie.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer kafkaProducer = new KafkaProducer(kafkaPropertie);

ProducerRecord<String, String> record = new ProducerRecord<String, String>("testTopic","key1","hello world");

kafkaProducer.send(record);

}
}
  • 同步发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.kafka.client;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* @Author:落墨
* @CreateTime:2020/7/1
* @Description:KafkaProducer
*/
public class KafkaProducerDemo {

public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties kafkaPropertie = new Properties();
//配置broker地址,配置多个容错
kafkaPropertie.put("bootstrap.servers", "node1:9092,node1:9093,node1:9094");
//配置key-value允许使用参数化类型
kafkaPropertie.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaPropertie.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer kafkaProducer = new KafkaProducer(kafkaPropertie);
//创建消息对象,第一个为参数topic,第二个参数为key,第三个参数为value
ProducerRecord<String, String> record = new ProducerRecord<String, String>("testTopic","key1","hello world");

//同步发送方式,get方法返回结果
RecordMetadata metadata = (RecordMetadata) kafkaProducer.send(record).get();
System.out.println("broker返回消息发送信息" + metadata);

}
}
  • 异步发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.kafka.client;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* @Author:落墨
* @CreateTime:2020/7/1
* @Description:KafkaProducer
*/
public class KafkaProducerDemo {

public static void main(String[] args) {
Properties kafkaPropertie = new Properties();
//配置broker地址,配置多个容错
kafkaPropertie.put("bootstrap.servers", "node1:9092,node1:9093,node1:9094");
//配置key-value允许使用参数化类型
kafkaPropertie.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaPropertie.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer kafkaProducer = new KafkaProducer(kafkaPropertie);
//创建消息对象,第一个为参数topic,第二个参数为key,第三个参数为value
final ProducerRecord<String, String> record = new ProducerRecord<String, String>("testTopic","key1","hello world");

//异步发送消息。异常时打印异常信息或发送结果
kafkaProducer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
System.out.println(e.getMessage());
} else {
System.out.println("接收到返回结果:" + recordMetadata);
}
}
});
//异步发送消息时必须要flush,否则发送不成功,不会执行回调函数
kafkaProducer.flush();
}
}

Producer 性能调优

1.一段Kafka生产端的实例代码

1
2
3
4
5
6
7
8
9
10
11
12
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("buffer.memory", 67108864);
props.put("batch.size", 131072);
props.put("linger.ms", 100);
props.put("max.request.size", 10485760);
props.put("acks", "1");
props.put("retries", 10);
props.put("retry.backoff.ms", 500);
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

2.内存缓冲的大小

首先我们看看“buffer.memory”这个参数是什么意思?

Kafka的客户端发送数据到服务器,一般都是要经过缓冲的,也就是说,你通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker上去的。

所以这个“buffer.memory”的本质就是用来约束KafkaProducer能够使用的内存缓冲的大小的,他的默认值是32MB。

那么既然了解了这个含义,大家想一下,在生产项目里,这个参数应该怎么来设置呢?

你可以先想一下,如果这个内存缓冲设置的过小的话,可能会导致一个什么问题?

首先要明确一点,那就是在内存缓冲里大量的消息会缓冲在里面,形成一个一个的Batch,每个Batch里包含多条消息。

然后KafkaProducer有一个Sender线程会把多个Batch打包成一个Request发送到Kafka服务器上去。

那么如果要是内存设置的太小,可能导致一个问题:消息快速的写入内存缓冲里面,但是Sender线程来不及把Request发送到Kafka服务器。

这样是不是会造成内存缓冲很快就被写满?一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了。

所以对于“buffer.memory”这个参数应该结合自己的实际情况来进行压测,你需要测算一下在生产环境,你的用户线程会以每秒多少消息的频率来写入内存缓冲。

比如说每秒300条消息,那么你就需要压测一下,假设内存缓冲就32MB,每秒写300条消息到内存缓冲,是否会经常把内存缓冲写满?经过这样的压测,你可以调试出来一个合理的内存大小。

3.多少数据打包为一个Batch合适?

接着你需要思考第二个问题,就是你的“batch.size”应该如何设置?这个东西是决定了你的每个Batch要存放多少数据就可以发送出去了。

比如说你要是给一个Batch设置成是16KB的大小,那么里面凑够16KB的数据就可以发送了。

这个参数的默认值是16KB,一般可以尝试把这个参数调节大一些,然后利用自己的生产环境发消息的负载来测试一下。

比如说发送消息的频率就是每秒300条,那么如果比如“batch.size”调节到了32KB,或者64KB,是否可以提升发送消息的整体吞吐量。

因为理论上来说,提升batch的大小,可以允许更多的数据缓冲在里面,那么一次Request发送出去的数据量就更多了,这样吞吐量可能会有所提升。

但是这个东西也不能无限的大,过于大了之后,要是数据老是缓冲在Batch里迟迟不发送出去,那么岂不是你发送消息的延迟就会很高。

比如说,一条消息进入了Batch,但是要等待5秒钟Batch才凑满了64KB,才能发送出去。那这条消息的延迟就是5秒钟。

所以需要在这里按照生产环境的发消息的速率,调节不同的Batch大小自己测试一下最终出去的吞吐量以及消息的 延迟,设置一个最合理的参数。

4.要是一个Batch迟迟无法凑满怎么办?

要是一个Batch迟迟无法凑满,此时就需要引入另外一个参数了,“linger.ms”

他的含义就是说一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去了。

给大家举个例子,比如说batch.size是16kb,但是现在某个低峰时间段,发送消息很慢。

这就导致可能Batch被创建之后,陆陆续续有消息进来,但是迟迟无法凑够16KB,难道此时就一直等着吗?

当然不是,假设你现在设置“linger.ms”是50ms,那么只要这个Batch从创建开始到现在已经过了50ms了,哪怕他还没满16KB,也要发送他出去了。

所以“linger.ms”决定了你的消息一旦写入一个Batch,最多等待这么多时间,他一定会跟着Batch一起发送出去。

避免一个Batch迟迟凑不满,导致消息一直积压在内存里发送不出去的情况。这是一个很关键的参数。

这个参数一般要非常慎重的来设置,要配合batch.size一起来设置。

举个例子,首先假设你的Batch是32KB,那么你得估算一下,正常情况下,一般多久会凑够一个Batch,比如正常来说可能20ms就会凑够一个Batch。

那么你的linger.ms就可以设置为25ms,也就是说,正常来说,大部分的Batch在20ms内都会凑满,但是你的linger.ms可以保证,哪怕遇到低峰时期,20ms凑不满一个Batch,还是会在25ms之后强制Batch发送出去。

如果要是你把linger.ms设置的太小了,比如说默认就是0ms,或者你设置个5ms,那可能导致你的Batch虽然设置了32KB,但是经常是还没凑够32KB的数据,5ms之后就直接强制Batch发送出去,这样也不太好其实,会导致你的Batch形同虚设,一直凑不满数据。

5.最大请求大小

“max.request.size”这个参数决定了每次发送给Kafka服务器请求的最大大小,同时也会限制你一条消息的最大大小也不能超过这个参数设置的值,这个其实可以根据你自己的消息的大小来灵活的调整。

给大家举个例子,你们公司发送的消息都是那种大的报文消息,每条消息都是很多的数据,一条消息可能都要20KB。

此时你的batch.size是不是就需要调节大一些?比如设置个512KB?然后你的buffer.memory是不是要给的大一些?比如设置个128MB?

只有这样,才能让你在大消息的场景下,还能使用Batch打包多条消息的机制。但是此时“max.request.size”是不是也得同步增加?

因为可能你的一个请求是很大的,默认他是1MB,你是不是可以适当调大一些,比如调节到5MB?

6.重试机制

“retries”和“retries.backoff.ms”决定了重试机制,也就是如果一个请求失败了可以重试几次,每次重试的间隔是多少毫秒。

这个大家适当设置几次重试的机会,给一定的重试间隔即可,比如给100ms的重试间隔。

7.持久化机制

“acks”参数决定了发送出去的消息要采用什么样的持久化策略,这个涉及到了很多其他的概念,大家可以参考之前专门为“acks”写过的一篇文章:

《简历写了会Kafka,面试官90%会让你讲讲acks参数对消息持久化的影响》

参考文章

Kafka参数调优实战

文章目录
  1. 1. 前言
  2. 2. 生产者概述
  3. 3. Kafka的配置属性
    1. 3.1. 发送消息的三种方式实例
  4. 4. Producer 性能调优
    1. 4.1. 1.一段Kafka生产端的实例代码
    2. 4.2. 2.内存缓冲的大小
    3. 4.3. 3.多少数据打包为一个Batch合适?
    4. 4.4. 4.要是一个Batch迟迟无法凑满怎么办?
    5. 4.5. 5.最大请求大小
    6. 4.6. 6.重试机制
    7. 4.7. 7.持久化机制
,