知方号

知方号

FlinkKafkaProducer默认和自定义分区策略

FlinkKafkaProducer默认和自定义分区策略

前言

我们经常关注的是Kafka消费者的消费策略,但是又会在某些特殊情况下要求自定义生产者的分区策略:如按序消费:将所有数据写入一个分区中。又或者默认的策略满足我们的业务需求。

值得一提的是,FlinkKafkaProducer(这里版本13.0.2,更早期的版本如1.10.0中是带版本号的FlinkKafkaProducer10等),和Kafka(这里版本kafka_2.11 1.1.1)的api中的KafkaProducer写入分区策略并不相同。 即Flink写kafka使用的机制与原生接口的写入方式是有差别的。

默认分区策略

强调一下:Flink写kafka使用的机制与原生接口的写入方式是有差别的。

Kafka原生API

默认是DefaultPartitioner:org.apache.kafka.clients.producer.internals.DefaultPartitioner:org

如果既没有partition值又没有key值的情况下,Kafka采用Sticky Partition (黏性分区器),会随机选择1个分区, 并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。

如果存在key值的情况下,则根据key值取余分区数决定写入分区。

涵括还包括共三个实现类:

指定使用:

props.put("partitioner.class", Partitioner.class);//或者 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG , RoundRobinPartitioner.class); Flink的producerAPI

从构造函数来看,可以看出默认的是FlinkFixedPartitioner:org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner

并行度%分区数量=0,表示并行度是kafkatopic分区数的多倍,数据的写入每个分区数据量是均衡的;并行度%分区数量≠0,那么数据量势必会在每个分区上的数据量产生倾斜;

FlinkFixedPartitioner是根据Flink运行子任务的并行度进行分区数的取余写入的。parallelInstanceId来自另外一个open方法,暂时略过,等下再提及。

实操场景 先建立主题分区

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic partitionTest

分区数据获取

kafka-run-class.bat kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic partitionTest

消费打印

样例举例,本地机器四核CPU,故此dem程序默认四并行度,写入四个分区。最好是并行度和写的分区数一一致。

1> 测试数据:6

1> 测试数据:14

3> 测试数据:2

3> 测试数据:10

2> 测试数据:0

2> 测试数据:8

4> 测试数据:60

4> 测试数据:68

查询主题的offset偏移量:

这显然不是我们想要的情况,我需要他们均匀分布满足我的要求,所以进行自定义分区器的编写。

自定义Flink分区策略

实际上在运行过程中,发现的的确确每一个并行度都是一个子任务线程(完整的起一模一样的producer)。

参考kafka原生和flink的默认分区器:

open:重写方法:接受并行度和id;partition:重写方法:分区逻辑方法,核心;nextValue:根据方法:原子类单调递增,computeIfAbsent如果没有则创建变量。 /** * @author cbry * @version 1.0.0 * @ClassName FlinkRoundRobinPartitioner * @createTime 2023年06月02日 11:56:00 * 轮询写入topic的分区,参考org.apache.kafka.producer.RoundRobinPartitioner * 以及(默认)org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; * * @Desc 轮询的是并行度(默认机器CPU核数)子线程的写入。 */public class FlinkRoundRobinPartitioner extends FlinkKafkaPartitioner { private static final long serialVersionUID = -3785323215233858777L; private int parallelInstanceId; private final ConcurrentMap topicCounterMap = new ConcurrentHashMap(); public FlinkRoundRobinPartitioner() { } /** * 构建子线程创建时候执行,只执行一次相当于赋值线程名称标识:并行度 */ @Override public void open(int parallelInstanceId, int parallelInstances) { System.out.println("写入分区:open"); Preconditions.checkArgument(parallelInstanceId >= 0, "Id of this subtask cannot be negative."); Preconditions.checkArgument(parallelInstances > 0, "Number of subtasks must be larger than 0."); this.parallelInstanceId = parallelInstanceId; } @Override public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) { //并行度从1开始算 , 分区从0开始算 //int writePartiton = this.nextValue(targetTopic) % partitions.length; //System.out.println("并行度" + parallelInstanceId + "写入分区:" + writePartiton); return this.nextValue(targetTopic) % partitions.length; } /** * 越界问题:Integer.MAX_VALUE + 1 = Integer.MIX_VALUE; */ private int nextValue(String topic) { AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> { System.out.println("写入分区:初始化"); return new AtomicInteger(0); }); return counter.getAndIncrement(); }}

我们可以明显的看到,单个并行度(子线程producer)是按轮询顺序写的,这得益于我们的原子类。

传几个数据看看:

主程调用

溯源FlinkKafkaProducer构造源码可以得带:

FlinkKafkaProducer producer = new FlinkKafkaProducer("partitionTest", new SimpleStringSchema(), properties , Optional.of(new FlinkRoundRobinPartitioner())); 写在末尾

主程的模拟数据源可以参考:Flink的常见算子和实例代码,自己修改定义。

实际上这也是生产上经常需要做的自定义操作之一。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至lizi9903@foxmail.com举报,一经查实,本站将立刻删除。