八、Kafka 的 Consumer API

bridge
2022-01-27 / 0 评论 / 0 点赞 / 1,000 阅读 / 13,678 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-01-27,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

1.消费者API

  • Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故
    不用担心数据丢失问题。
  • 由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
  • 所以 offset 的维护是 Consumer 消费数据是必须考虑的问题。

2.消费者和消费者群组

  • kafka消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。
  • 假设主题T1有4个分区,我们创建了消费者C1,它是群组G1里唯一的消费者,我们用它订阅主题T1。消费者C1将收到主题T1全部的4个分区的消息,如下

image

  • 如果在群组G1里新增一个消费者C2,那么每个消费者将分别从两个分区接收消息。如下

image

  • 如果群组G1有4个消费者,那么每个消费者可以分配到一个分区

image

  • 如果我们往群组里添加更多的消费者,超过主题的分区数量,那么有一部分消费者就会被闲置,不会接收到任何消息,如下图

image

  • 往群组里增加消费者是横向伸缩消费能力的主要方式。kafka消费者经常会做一些高延迟的操作,比如把数据写到数据库或HDFS,或者使用数据进行比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,可以增加更多的消费者,让它们分担负载,每个消费者只处理部分分区的消息,这就是横向伸缩的主要手段。我们有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者。但是要注意,不能让消费者的数量超过主题分区的数量,多余的消费者会被闲置。
  • 多个应用程序从同一个主题读取数据,kafka设计的主要目标之一,就是要让kafka主题里的数据能够满足企业各种应用场景的需求。在这些场景里,每个应用程序可以获取到所有的消息,而不只是其中的一部分,只要保证每个应用程序有自己的消费者群组,就可以让它们获取到主题的所有消息。
  • 横向伸缩kafka消费者和消费者群组并不会对性能造成负面影响。

image

3.消费者群组和分区再均衡

  • 群组里的消费者共同读取主题的分区。一个新的消费者加入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其它消费者来读取。在主题发生变化时,比如添加了新的分区,会发生分区重分配。
  • 分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。再均衡非常重要,它为消费者群组带来了高可用性和伸缩性(我们可以放心的添加或移除消费者)。
  • 再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用,所以我们不希望发生这样的行为。当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
  • 消费者通过向被指派为群组协调器的broker(不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮训消息或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。
  • 如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。
  • 分配分区的过程:
    • 当消费者要加入群组时,它会向群组协调器发送一个joinGroup请求。第一个加入群组的消费者将成为”群主“。群主从协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的),并负责给每一个消费者分配分区。它使用一个实现了PartitionAssignor接口的类来决定哪些分区应该被分配给哪个消费者。群主把分配情况列表发送给群组协调器,协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。

4.消费者的常用配置

  • 1.fetch.min.bytes
    • 该属性指定了消费者从服务器获取记录的最小字节数。broker在收到消费者的数据请求时,如果可用的数据量小于fetch.min.bytes指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。这样可以降低消费者和broker的工作负载,因为它们在主题不是很活跃的时候就不需要来来回回地处理消息。如果没有很多可用的数据,但消费者的CPU使用率却很高,可以将此属性值设置的比默认值大。如果消费者的数量较多,把该属性值的值设置的大一点可以降低broker的工作负载。
  • 2.fetch.max.wait.ms
    • 该属性指定broker返回消息的等待时间,默认是500ms。如果没有足够的数据流入kafka,消费者获取最小数据量的要求就得不到满足,最终导致500ms的延迟。如果要降低潜在的延迟(为了满足SLA),可以把该参数值设置的小一些。如果fetch.max.wait.ms被设为100ms,并且fetch.min.bytes被设为1MB,kafka在收到消费者的请求后,要么返回1MB的数据,要么在100ms后返回可用的数据,只要有一个条件满足了,就会立马返回。
  • 3.max.partition.fetch.bytes
    • 该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是1MB。KafkaConsumer.poll()方法从每个分区里返回的记录最多不超过max.partition.fetch.bytes指定的字节。如果一个主题有20个分区和5个消费者,那么每个消费者需要至少4MB的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。
    • max.partition.fetch.bytes的值必须比broker能够接收的最大消息的字节数(max.message.size)大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。
    • 在设置此值时,还需要考虑消费者处理数据的时间。消费者需要频繁的调用poll()方法来避免会话过期和发生分区的再均衡,如果单次调用poll()返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。出现这种情况,可以把max.partition.fetch.bytes改小,或者延长会话过期时间。
  • 4.session.timeout.ms
    • 该属性值指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是3s。如果消费者没有在session.timeout.ms指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其它消费者。heartbeat.interval.ms指定了poll()方法向协调器发送心跳的频率,session.timeout.ms则指定了消费者可以多久不发送心跳。所以,一般需要同时修改这两个属性,heartbeat.interval.ms必须比session.timeout.ms小,一般是session.timeout.ms的三分之一。
    • session.timeout.ms调小:可以更快地检测和恢复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。
    • session.timeout.ms调大:可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。
  • 5.auto.offset.reset
    • 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。它的默认值是latest,偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。另一个值是earliest,偏移量无效的情况下,消费者将从起始位置读取分区的记录。
  • 6.enable.auto.commit
    • 该属性指定了消费者是否自动提交偏移量,默认值是true。为了尽量避免出现重复数据和数据丢失,可以把它设为false,由自己控制何时提交偏移量。如果把它设为true,还可以通过配置auto.commit.interval.ms属性来控制提交的频率。
  • 7.partition.assignment.strategy
    • 分区会被分配给群组里的消费者。PartitionAssignor根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。kafka有两个默认的分配策略
    • Range(默认):该策略会把主题的若干个连续的分区分配给消费者。假设消费者C1和C2同时订阅了主题T1和主题T2,并且每个主题有3个分区。那么消费者C1有可能分配到这两个主题的分区0和分区1,四个分区;而消费者C2分配到这两个主题的分区2,两个分区。因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要使用了Range策略,而且分区数量无法被消费者数量整除,就会出现这种情况。
    • org.apache.kafka.clients.consumer.RangeAssignor
    • RoundRobin:该策略把主题的所有分区逐个分配给消费者。如果使用RoundRobin策略来给消费者C1和消费者C2分配分区,那么消费者C1将分到主题T1的分区0和分区2以及T2主题的分区1;消费者C2将分配到主题T1的分区1以及主体T2的分区0和分区2.一般来说,如果所有消费者都订阅相同的主题,RoundRobin策略会给所有消费者分配相同数量的分区(最多差一个分区)。
    • org.apache.kafka.clients.consumer.RoundRobinAssignor
  • 8.client.id
    • 该属性可以是任意字符串,broker用它来标记从客户端发送过来的消息,通常被用在日志、度量指标和配额里。
  • 9.max.poll.records
    • 该属性用于控制单次调用poll()方法能够返回的记录数量,可以控制在轮询里需要处理的数据量。
  • 10.receive.buffer.bytes和send.buffer.bytes
    • socket在读写数据时用到的TCP缓冲区也可以设置大小。如果它们被设为-1,就使用操作系统的默认值。如果生产者或消费者与broker处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

5.消费者API

  • 自动提交offset方式
    • 轮询不只是获取数据那么简单。在第一次调用新消费者的poll方法时,它会负责查找GroupCoordinator,然后加入群组,接收分配的分区。如果发生了再均衡,整个过程也是在轮询期间进行的。心跳也是从轮询里发出的。所以,需要确保在轮询期间所做的任何处理工作尽可能快的完成。
    • 在同一个群组里,我们无法让一个线程运行多个消费者,也无法让多个线程安全地共享一个消费者。按照规则,一个消费者使用一个线程。如果要在同一个消费者群组里运行多个消费者,需要让每个消费者运行在自己的线程里。
  • 手动提交 offset
    • 虽然自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API。
    • 手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步提交)。两者的相同点是,都会将本次 poll的一批数据最高的偏移量提交;不同点是,commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。
    • 同步提交 offset
      • 由于同步提交 offset 有失败重试机制,故更加可靠;
    • 异步提交 offset
      • 虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下;
private static Properties props = new Properties();

static {
    //指定kafka server的地址,集群配多个,中间,逗号隔开
    props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
    //指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
    props.put("group.id", "test");
    //指定消息key和消息体的序列化方式
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
}

public static void main(String[] args) {

}

/**
 * 自动提交偏移量
 */
public static void commitAuto() {
    //设置自动提交offset
    props.put("enable.auto.commit", "true");
    //自动提交 offset 的时间间隔
    props.put("auto.commit.interval.ms", "1000");
    Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Collections.singleton("test-topic"));
    log.info("consumer start");
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            log.info("topic={}, partition={}, offset={}, key={}, value={}", record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
    }
}

/**
 * 同步提交偏移量
 */
public static void commitSelfSync() {
    //设置自动提交offset
    props.put("enable.auto.commit", "false");
    //自动提交 offset 的时间间隔
    Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Collections.singleton("test-topic"));
    log.info("consumer start");
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            log.info("topic={}, partition={}, offset={}, key={}, value={}", record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        //同步手动提交偏移量,只要没有发生不可恢复的错误,commitSync方法会一直尝试直至提交成功。
        consumer.commitSync();
    }
}

/**
 * 异步提交偏移量
 */
public static void commitSelfAsync() {
    //设置自动提交offset
    props.put("enable.auto.commit", "false");
    //自动提交 offset 的时间间隔
    Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Collections.singleton("test-topic"));
    log.info("consumer start");
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            log.info("topic={}, partition={}, offset={}, key={}, value={}", record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        //异步提交
        consumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception != null) {
                    System.err.println("Commit failed for" +
                            offsets);
                }
            }
        });
    }
}

6.自定义存储 offset

  • Kafka 0.9 版本之前,offset 存储在 zookeeper,0.9 版本及之后,默认将 offset 存储在 Kafka的一个内置的 topic 中。除此之外,Kafka 还可以选择自定义存储 offset。 offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalace。
  • 当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。
  • 消费者发生 Rebalance之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset位置继续消费。
  • 要实现自定义存储 offset,需要借助ConsumerRebalanceListener,以下为示例代码,其中提交和获取 offset 的方法,需要根据所选的 offset 存储系统自行实现。
//方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里读取了。
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
//方法会在重新分配分区之后和消费者开始读取消息之前被调用。
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
示例代码
private static Properties props = new Properties();

static {
    //指定kafka server的地址,集群配多个,中间,逗号隔开
    props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
    //指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
    props.put("group.id", "test");
    //指定消息key和消息体的序列化方式
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
}

public static void main(String[] args) {
    commitSelfSyncRebalance();
}

/**
 * 同步提交偏移量
 */
public static void commitSelfSyncRebalance() {
    log.info("consumer start1");

    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    //设置自动提交offset
    props.put("enable.auto.commit", "false");
    //自动提交 offset 的时间间隔
    Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Collections.singleton("test-topic"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            for (TopicPartition partition : partitions) {
                log.info("onPartitionsAssigned: [{}]",partition);
                consumer.seek(partition, 300);//定位到最近提交的 offset 位置继续消费
            }
        }
    });
    log.info("consumer start2");
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            log.info("topic={}, partition={}, offset={}, key={}, value={}", record.topic(), record.partition(), record.offset(), record.key(), record.value());
            offsets.put(new TopicPartition("test-topic",record.partition()),new OffsetAndMetadata(record.offset()));
        }
        //同步手动提交偏移量,只要没有发生不可恢复的错误,commitSync方法会一直尝试直至提交成功。
        consumer.commitSync(offsets);
    }
}

7.拦截器

拦截器原理
拦截器案例
  • 实现消费者拦截器接口
    • 在消息后面增加":MyConsumerInterceptor"
@Slf4j
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
//        log.info("onConsume");
        Map<TopicPartition, List<ConsumerRecord<String, String>>> map = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> recordsList = records.records(partition);
            List<ConsumerRecord<String, String>> newRecordsList = new ArrayList<>();
            for (ConsumerRecord<String, String> item : recordsList) {
                newRecordsList.add(new ConsumerRecord<>(item.topic(), item.partition(), item.offset(), item.key(), item.value() + ":MyConsumerInterceptor"));
            }
            map.put(partition, newRecordsList);
        }
        return new ConsumerRecords<>(map);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        
    }

    @Override
    public void close() {
        log.info("close");
    }

    @Override
    public void configure(Map<String, ?> configs) {
        log.info("configure: [{}]", configs);
    }
}
  • 在消费者配置自定义拦截器
@Slf4j
public class MyConsumer {

    private static Properties props = new Properties();

    static {
        //指定kafka server的地址,集群配多个,中间,逗号隔开
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
        //指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        //指定消息key和消息体的序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {
        commitSelfSyncInterceptor();
    }
    
    /**
     * 同步提交偏移量
     */
    public static void commitSelfSyncInterceptor() {
        log.info("commitSelfSyncInterceptor start");

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        //设置自动提交offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // 2 构建拦截链
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.liguanqiao.study.kafka.MyConsumerInterceptor");
        //设置拦截器
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);
        //自动提交 offset 的时间间隔
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singleton("test-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                for (TopicPartition partition : partitions) {
                    log.info("onPartitionsAssigned: [{}]", partition);
                    consumer.seek(partition, 300);//定位到最近提交的 offset 位置继续消费
                }
            }
        });
        log.info("commitSelfSyncInterceptor start2");
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                log.info("topic={}, partition={}, offset={}, key={}, value={}", record.topic(), record.partition(), record.offset(), record.key(), record.value());
                offsets.put(new TopicPartition("test-topic", record.partition()), new OffsetAndMetadata(record.offset()));
            }
            //同步手动提交偏移量,只要没有发生不可恢复的错误,commitSync方法会一直尝试直至提交成功。
            consumer.commitSync(offsets);
        }
    }
}
0

评论区