前言
不管是把Kafka 作为消息队列、消息、总线还是数据存储平台来使用,总是需要有一个可以往Kafka 写入数据的生产者和一个可以从Kafka 读取数据的消费者,或者一个兼具两种角色的应用程序。
生产者概述
一个应用程序在很多情况下需要往Kafka 写入消息: 记录用户的活动(用于审计和分析)、记录度量指标、保存日志、消息、记录智能家电的信息、与其他应用程序进行异步通信、缓冲即将写入到数据库的数据,等等。
在保存网站的点击信息场景里,允许丢失少量的消息或出现少量的消息重复,延迟可以高一些,只要不影响用户体验就行。换句话说,只要用户点击链接后可以马上加载页面,那么我们并不介意消息要在几秒钟之后才能到达Kafka 服务器。吞吐量则取决于网站用户使用网站的频度。
不同的使用场景对生产者API 的使用和配置会有直接的影响。尽管生产者API 使用起来很简单, 但消息的发送过程还是有点复杂。如图所示:
我们从创建一个ProducerRecord 对象开始, Producer Record 对象需要包含目标主题和要发送的内容。我们还可以指定键或分区。在发送ProducerRecord 对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。
接下来,数据被传给分区器。如果之前在ProducerRcord 对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据Producer Record 对象的键来选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条记录了。紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的broker 上。服务器在收到这些消息时会返回一个响应。如果消息成功写入Kafka ,就返回一个RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败, 则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。
Kafka的配置属性
Kafka生产者有3个必选属性:bootstrap.servers,key.serializer,value.serializer。
- bootstrap.servers:该属性指定broker的地址清单,地址的格式为host:port.清单里不需要包含所有的broker地址,生产者会从给定的broker 里查找到其他broker的信息。不过建议至少要提供两个broker 的信息, 一且其中一个宕机,生产者仍然能够连接到集群上。
- key.serializer:broker :希望接收到的消息的键和值都是字节数组。生产者接口允许使用参数化类型,因此可以把java 对象作为键和值发送给broker 。这样的代码具有良好的可读性,不过生产者需要知道如何把这些java 对象转换成字节数组。key. serializer必须被设置为一
个实现了org.apache.kafka.common.seialization.Serialize 接口的类,生产者会使用这个类把键对象序列化成字节数组。Kafka 客户端默认提供了ByteArraySeializer(这个只做很少的事情)、StringSerializer 和IntegerSerializer,因此,如果你只使用常见的几种java 对象类型,那么就没必要实现自己的序列化器。要注意, key.serializer 是必须设置的,就算你打算只发送值内容。 - value.serializer:与key.serializer一样, value.serializer指定的类会将值序列化。如果键和值都是字符串,可以使用与key.serializer 一样的序列化器。如果键是整数类型而值是字符串,那么需要使用不同的序列化器。
- acks
acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响。主参数有如下选项。
• 如果acks=0 , 生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,
如果当中出现了问题, 导致服务器没有收到消息,那么生产者就无从得知,消息也就丢
失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大
速度发送消息,从而达到很高的吞吐量。
如果acks=1 ,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用Future对象的get()方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息)。
如果acks=all ,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。不过,它的延迟比acks=1时更高,因为我们要等待不只一个服务器节点接收消息。 - buffer.me mory
该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候,send ()方法调用要么被阻塞,要么抛出异常,取决于如何设置block.on.buffer. full 参数(在0. 9.0.0 版本里被替换成了max.block.ms,表示在抛出异常之前可以阻塞一段时间)。 - compression.type
默认情况下,消息发送时不会被压缩。该参数可以设置为snappy 、gzip 或lz 4 ,它指定了消息被发送给broker 之前使用哪一种压缩算也进行压缩。snappy 压缩算法由Google发明,它占用较少的CPU ,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。gzip 压缩算法一般会占用较多的CPU ,但会提供更高的压缩比,所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储销,而这往往是向Kafka 发送消息的瓶颈所在。 retries
生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况
下, retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待l00 ms ,不过可以通过retry.backoff.ms 参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前,先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间),让总的重试时间比Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。不过有些错误不是临时性错误,没办法通过重试来解决(比如“消息太大”错误)。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。你只需要处理那些不可重试的错误或重试次数超出上限的情况。batch.size
当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。不过生产者井不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。- linger.ms
该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProduce 会在批次填满或linger.ms达到上限时把批次发送出去。默认情况下,只要有可用的线程, 生产者就会把消息发送出去,就算批次里只有一个消息。把linger.ms 设置成比0 大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了) 。 - client.id
该参数可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里。 - max.in.flight.requests.per.connection
该参数指定了生产者在收到服务器晌应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。 - timeout.ms 、request.timeout.ms 和metadata. fetch. timeout. ms
request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间,metada.fetch.timeout.ms 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。timeout.ms指定了broker 等待同步副本返回消息确认的时间,与asks 的配置相匹配一一如果在指定时间内没有收到同步副本的确认,那么broker 就会返回一个错误。 - max.block.ms
该参数指定了在调用send () 方法或使用partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到max.block.ms时,生产者会抛出超时异常。 - max.request.size
该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为1MB ,那么可以发送的单个最大消息为1MB ,或者生产者可以在单个请求里发送一个批次,该批次包含了1000 个消息,每个消息大小为1 KB。另外, broker 对可接收的消息最大值也有自己的限制(message.max.bytes ),所以两边的配置最好可以匹配,避免生产者发送的消息被broker 拒绝。 - receive.buffer. bytes 和send . buffer.bytes
这两个参数分别指定了TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为- 1 ,就使用操作系统的默认值。如果生产者或消费者与broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
下面的代码片段演示如何创建一个生产者,这里只指定了必要的属性,其他使用默认设置
1 | Properties pro = new Properties(); |
实例完生产者对象后,接下来就可以开始发送消息了。发送消息有以下三种方式。
- 发送并忘记( fire- and-forget )
我们把消息发送给服务器,但井不关心它是否正常到达。大多数情况下,消息会正常到达,因为Kafka 是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候也会丢失一些消息。 - 同步发送
我们使用send () 方法发送消息, 它会返回一个Future 对象,调用get () 方法进行等待,就可以知道悄息是否发送成功。 - 异步发送
我们调用send () 方怯,并指定一个回调函数, 服务器在返回响应时调用该函数。在下面的几个例子中, 我们会介绍如何使用上述几种方式来发送消息,以及如何处理可能发生的异常情况。
发送消息的三种方式实例
- 并发并忘记
1 | package com.kafka.client; |
- 同步发送消息
1 | package com.kafka.client; |
- 异步发送消息
1 | package com.kafka.client; |
Producer 性能调优
1.一段Kafka生产端的实例代码
1 | Properties props = new Properties(); |
2.内存缓冲的大小
首先我们看看“buffer.memory”这个参数是什么意思?
Kafka的客户端发送数据到服务器,一般都是要经过缓冲的,也就是说,你通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker上去的。
所以这个“buffer.memory”的本质就是用来约束KafkaProducer能够使用的内存缓冲的大小的,他的默认值是32MB。
那么既然了解了这个含义,大家想一下,在生产项目里,这个参数应该怎么来设置呢?
你可以先想一下,如果这个内存缓冲设置的过小的话,可能会导致一个什么问题?
首先要明确一点,那就是在内存缓冲里大量的消息会缓冲在里面,形成一个一个的Batch,每个Batch里包含多条消息。
然后KafkaProducer有一个Sender线程会把多个Batch打包成一个Request发送到Kafka服务器上去。
那么如果要是内存设置的太小,可能导致一个问题:消息快速的写入内存缓冲里面,但是Sender线程来不及把Request发送到Kafka服务器。
这样是不是会造成内存缓冲很快就被写满?一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了。
所以对于“buffer.memory”这个参数应该结合自己的实际情况来进行压测,你需要测算一下在生产环境,你的用户线程会以每秒多少消息的频率来写入内存缓冲。
比如说每秒300条消息,那么你就需要压测一下,假设内存缓冲就32MB,每秒写300条消息到内存缓冲,是否会经常把内存缓冲写满?经过这样的压测,你可以调试出来一个合理的内存大小。
3.多少数据打包为一个Batch合适?
接着你需要思考第二个问题,就是你的“batch.size”应该如何设置?这个东西是决定了你的每个Batch要存放多少数据就可以发送出去了。
比如说你要是给一个Batch设置成是16KB的大小,那么里面凑够16KB的数据就可以发送了。
这个参数的默认值是16KB,一般可以尝试把这个参数调节大一些,然后利用自己的生产环境发消息的负载来测试一下。
比如说发送消息的频率就是每秒300条,那么如果比如“batch.size”调节到了32KB,或者64KB,是否可以提升发送消息的整体吞吐量。
因为理论上来说,提升batch的大小,可以允许更多的数据缓冲在里面,那么一次Request发送出去的数据量就更多了,这样吞吐量可能会有所提升。
但是这个东西也不能无限的大,过于大了之后,要是数据老是缓冲在Batch里迟迟不发送出去,那么岂不是你发送消息的延迟就会很高。
比如说,一条消息进入了Batch,但是要等待5秒钟Batch才凑满了64KB,才能发送出去。那这条消息的延迟就是5秒钟。
所以需要在这里按照生产环境的发消息的速率,调节不同的Batch大小自己测试一下最终出去的吞吐量以及消息的 延迟,设置一个最合理的参数。
4.要是一个Batch迟迟无法凑满怎么办?
要是一个Batch迟迟无法凑满,此时就需要引入另外一个参数了,“linger.ms”
他的含义就是说一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去了。
给大家举个例子,比如说batch.size是16kb,但是现在某个低峰时间段,发送消息很慢。
这就导致可能Batch被创建之后,陆陆续续有消息进来,但是迟迟无法凑够16KB,难道此时就一直等着吗?
当然不是,假设你现在设置“linger.ms”是50ms,那么只要这个Batch从创建开始到现在已经过了50ms了,哪怕他还没满16KB,也要发送他出去了。
所以“linger.ms”决定了你的消息一旦写入一个Batch,最多等待这么多时间,他一定会跟着Batch一起发送出去。
避免一个Batch迟迟凑不满,导致消息一直积压在内存里发送不出去的情况。这是一个很关键的参数。
这个参数一般要非常慎重的来设置,要配合batch.size一起来设置。
举个例子,首先假设你的Batch是32KB,那么你得估算一下,正常情况下,一般多久会凑够一个Batch,比如正常来说可能20ms就会凑够一个Batch。
那么你的linger.ms就可以设置为25ms,也就是说,正常来说,大部分的Batch在20ms内都会凑满,但是你的linger.ms可以保证,哪怕遇到低峰时期,20ms凑不满一个Batch,还是会在25ms之后强制Batch发送出去。
如果要是你把linger.ms设置的太小了,比如说默认就是0ms,或者你设置个5ms,那可能导致你的Batch虽然设置了32KB,但是经常是还没凑够32KB的数据,5ms之后就直接强制Batch发送出去,这样也不太好其实,会导致你的Batch形同虚设,一直凑不满数据。
5.最大请求大小
“max.request.size”这个参数决定了每次发送给Kafka服务器请求的最大大小,同时也会限制你一条消息的最大大小也不能超过这个参数设置的值,这个其实可以根据你自己的消息的大小来灵活的调整。
给大家举个例子,你们公司发送的消息都是那种大的报文消息,每条消息都是很多的数据,一条消息可能都要20KB。
此时你的batch.size是不是就需要调节大一些?比如设置个512KB?然后你的buffer.memory是不是要给的大一些?比如设置个128MB?
只有这样,才能让你在大消息的场景下,还能使用Batch打包多条消息的机制。但是此时“max.request.size”是不是也得同步增加?
因为可能你的一个请求是很大的,默认他是1MB,你是不是可以适当调大一些,比如调节到5MB?
6.重试机制
“retries”和“retries.backoff.ms”决定了重试机制,也就是如果一个请求失败了可以重试几次,每次重试的间隔是多少毫秒。
这个大家适当设置几次重试的机会,给一定的重试间隔即可,比如给100ms的重试间隔。
7.持久化机制
“acks”参数决定了发送出去的消息要采用什么样的持久化策略,这个涉及到了很多其他的概念,大家可以参考之前专门为“acks”写过的一篇文章:
《简历写了会Kafka,面试官90%会让你讲讲acks参数对消息持久化的影响》。
参考文章