@myecho
2016-11-22T21:53:21.000000Z
字数 1291
阅读 957
Kafka
在producer端需要配置如下的两个参数,才能使用消息压缩功能在使用kafkacat时则只需要指定-z snappy即可,通过看源代码发现,其读到-z opt后会自动的进行这两个kafka参数的配置。另外一点我们需要注意kafka的消息压缩的功能,如下是官方文档解释:
A set of messages can be compressed and represented as one compressed
message.
是的,并不是单独压缩一条Message,而是会压缩许多Message,具体是多少条压缩在一起有待探究。
注意还有一点需要我们注意的是,在发送Message时仍然会受到参数message.max.bytes的限制,因此当你使用kafkacat发送时,不要将消息存储文件中再进行发送,因为一个文件会被当作一条消息发送,直接被卡掉,不会进行压缩再进行发送,可以将文件以tail -f 然后再接入管道发送到kafcat,即可完成消息被压缩的过程。
压缩后的消息格式如下所示
前一个字节标识版本,第二个字节标识使用的压缩算法,后四个字节存储CRC校验码。
另外,参数message.max.bytes在kafka端也有配置,所以在一条压缩消息到达kafka时,也会判断压缩消息是否大于了参数值。
kafka这里主要牵扯到如何对压缩的消息进行offset的计算的问题。看两个地方的描述,
上述描述出自Linkedin的一篇技术文章,这段描述介绍了kafka0.8对压缩消息的处理是,当发现是压缩消息,首先解压缩,然后压缩消息就成了一系列的正常消息,然后再进行offset的处理,然后再将压缩消息存储到disk上。
这里提到kafka consumer在读取时,会将压缩的消息当作一个消息来处理进行读取,这里有点疑问?感觉和在kafka上处理offset的方式有点不一致?
kafka Consumer对压缩消息的处理是近乎透明的,不需要用户自己手动完成解压缩的过程,Consumer会根据其头部的压缩算法自动完成这个过程,并且将原始消息送达。
我们可以看到,光靠kafka的消息压缩机制无法完成我们的需求,必须通过手动在消息发送和消息接受端加入消息解压缩机制来满足,不过kafka消息压缩的机制可能就是为了提高Kafka的吞吐量而产生的?