Kafka_Consumer的七种消费场景
从0.10 开始,再无低等级消费者,消息均需通过ConsumerAPI:KafkaConsumer类进行消费,不同的消费场景,不一样的实现,官方提供了7种场景的解决方案:
- Automatic Offset Committing
- Manual Offset Control
- Manual Partition Assignment
- Storing Offsets Outside Kafka
- Controlling The Consumer’s Position
- Consumption Flow Control
- Multi-threaded Processing
下面就基于kafka2.0官方文档,一一拆解:
1. Automatic Offset Committing 自动确认Offset
1 | Properties props = new Properties(); |
说明:
bootstrap.servers
只是代表kafka的连接入口,只需要指定集群中的某一broker- Setting
enable.auto.commit
means that offsets are committed automatically with a frequency controlled by the config auto.commit.interval.ms. - 一旦consumer和kakfa集群建立连接,consumer会以心跳的方式来高速集群自己还活着,如果
session.timeout.ms
内心跳未到达服务器,服务器认为心跳丢失,会做rebalence
2. Manual Offset Control 手动控制Offset
如果consumer在获得数据后需要加入处理,数据完毕后才确认offset,需要程序来控制offset的确认。举个栗子:
consumer获得数据后,需要将数据持久化到DB中。自动确认offset的情况下,如果数据从kafka集群读出,就确认,但是持久化过程失败,就会导致数据丢失。我们就需要控制offset的确认。
1 | Properties props = new Properties(); |
还可以精细的控制对具体分区具体offset数据的确认:
1 | try { |
说明:
- 确认的offset为已接受数据最大offset+1。
3.Manual Partition Assignment 指定分区消费
可以向特定的分区订阅消息。但是会失去partion的负载分担。有几种场景可能会这么玩:
- 只需要获取本机磁盘的分区数据;
- 程序自己或者外部程序能够自己实现负载和错误处理。例如YARN/Mesos的介入,当consumer挂掉后,再启动一个consumer。
1 | String topic = "foo"; |
使用此模式,只需调用assign(Collection)
,而不是使用subscribe
订阅主题。
说明:
- 手动分区分配使用了consumer Group,因此consumer故障不会做reblance,即便消费者公用groupid。
- 不可以在同一consumer中混用topic订阅(例如subscribe)和手动分区分配(例如assign),。
4. Storing Offsets Outside Kafka 自主控制Offset
消费者可以自定义kafka的offset存储位置。该设计的主要目的是让消费者将数据和offset进行原子性的存储。这样可以避免上面提到的重复消费问题。
举例说明:
订阅特定分区。存储所获得的记录时,将每条记录的offset一起存储。保证数据和offset的存储是原子性的。当异步存储被异常打断时,凡已经存储的数据,都有有相应的offset记录。这种方式可以保证不会有数据丢失,也不会重复的从服务端读取。
如何配置实现:
- 关闭offset自动确认:enable.auto.commit=false;
- 从ConsumerRecord中获取offset,保存下来;
- Consumer重启时,调用seek(TopicPartition, long)重置在服务端的消费记录。
如果消费分区也是自定义的,这种方式用起来会很爽。如果分区是自动分配的,当分区发生reblance的时候,就要考虑清楚了。如果因为升级等原因,分区漂移到一个不会更新offset的consumer上,那就日了狗了。
该情况下:
- 原consumer需要监听分区撤销事件,并在撤销时确认好offset。接口:ConsumerRebalanceListener.onPartitionsRevoked(Collection);
- 新consumer监听分区分配事件,获取当前分区消费的offset。接口:ConsumerRebalanceListener.onPartitionsAssigned(Collection);
- consumer监听到 ConsumerRebalance事件,还没有处理或者持久化的缓存数据flush掉。
5. Controlling The Consumer’s Position 控制者消费位置
大多数情况下,服务端的Consumer的消费位置都是由客户端间歇性的确认。Kafka允许Consumer自己设置消费起点,达到的效果:
- 可以消费已经消费过的数据;
- 可以跳跃性的消费数据;
看下这样做的一些场景: - 对Consumer来说,数据具备时效性,只需要获取最近一段时间内的数据,就可以进行跳跃性的获取数据;
- 上面自己存offset的场景,重启后就需要从指定的位置开始消费。
接口上面已经提到过了,用seek(TopicPartition, long)
。
6. Consumption Flow Control 控制消费流
如果一个consumer同时消费多个分区,默认情况下,这多个分区的优先级是一样的,同时消费。Kafka提供机制,可以让暂停某些分区的消费,先获取其他分区的内容。场景举栗:
- 流式计算,consumer同时消费两个Topic,然后对两个Topic的数据做Join操作。但是这两个Topic里面的数据产生速率差距较大。Consumer就需要控制下获取逻辑,先获取慢的Topic,慢的读到数据后再去读快的。
- 同样多个Topic同时消费,但是Consumer启动是,本地已经存有了大量某些Topic数据。此时就可以优先去消费下其他的Topic。
调控的手段:让某个分区消费先暂停,时机到了再恢复,然后接着poll。接口:pause(TopicPartition…),resume(TopicPartition…)
7. Multi-threaded Processing 多线程处理模型
Kafka的Consumer的接口为非线程安全的。多线程共用IO,Consumer线程需要自己做好线程同步。
如果想立即终止consumer,唯一办法是用调用接口:wakeup(),使处理线程产生WakeupException。
1 | public class KafkaConsumerRunner implements Runnable { |
说明:
- KafkaConsumerRunner是runnable的,请自觉补脑多线程运行;
- 外部线程控制KafkaConsumerRunner线程的停止;
- 主要说的是多线程消费同一topic,而不是消费同一分区;
比较一下两种模型:
Consumer单线程模型
优点:实现容易;
优点:没有线程之间的协作。通常比下面的那种更快;
优点:单分区数据的顺序处理;
缺点:多个TCP连接,但是关系不大,kafka对自己的server自信满满;
缺点:太多的Request可能导致server的吞吐降低一丢丢;
缺点:consumer数量受到分区数量限制,一个consumer一个分区;
Consumer多线程模型
优点:一个consumer任意多的线程,线程数不用受到分区数限制;
缺点:如果有保序需求,自己要加控制逻辑;
缺点:该模型中如果手动offset,自己要加控制逻辑;
一种可行的解决办法:为每个分区分配独立的存储,获取的数据根据数据所在分区进行 hash存储。这样可以解决顺序消费,和offset的确认问题。
参考
https://blog.csdn.net/xianzhen376/article/details/51167742
http://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html