我们经常关注的是Kafka消费者的消费策略,但是又会在某些特殊情况下要求自定义生产者的分区策略:如按序消费:将所有数据写入一个分区中。又或者默认的策略满足我们的业务需求。
值得一提的是,FlinkKafkaProducer(这里版本13.0.2,更早期的版本如1.10.0中是带版本号的FlinkKafkaProducer10等),和Kafka(这里版本kafka_2.11 1.1.1)的api中的KafkaProducer写入分区策略并不相同。 即Flink写kafka使用的机制与原生接口的写入方式是有差别的。
默认分区策略强调一下:Flink写kafka使用的机制与原生接口的写入方式是有差别的。
Kafka原生API默认是DefaultPartitioner:org.apache.kafka.clients.producer.internals.DefaultPartitioner:org
如果既没有partition值又没有key值的情况下,Kafka采用Sticky Partition (黏性分区器),会随机选择1个分区, 并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。
如果存在key值的情况下,则根据key值取余分区数决定写入分区。
涵括还包括共三个实现类:
指定使用:
props.put("partitioner.class", Partitioner.class);//或者 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG , RoundRobinPartitioner.class); Flink的producerAPI从构造函数来看,可以看出默认的是FlinkFixedPartitioner:org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner
并行度%分区数量=0,表示并行度是kafkatopic分区数的多倍,数据的写入每个分区数据量是均衡的;并行度%分区数量≠0,那么数据量势必会在每个分区上的数据量产生倾斜;FlinkFixedPartitioner是根据Flink运行子任务的并行度进行分区数的取余写入的。parallelInstanceId来自另外一个open方法,暂时略过,等下再提及。
实操场景 先建立主题分区kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic partitionTest
分区数据获取kafka-run-class.bat kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic partitionTest
消费打印样例举例,本地机器四核CPU,故此dem程序默认四并行度,写入四个分区。最好是并行度和写的分区数一一致。
1> 测试数据:6
1> 测试数据:14
3> 测试数据:2
3> 测试数据:10
2> 测试数据:0
2> 测试数据:8
4> 测试数据:60
4> 测试数据:68
查询主题的offset偏移量:
这显然不是我们想要的情况,我需要他们均匀分布满足我的要求,所以进行自定义分区器的编写。
自定义Flink分区策略实际上在运行过程中,发现的的确确每一个并行度都是一个子任务线程(完整的起一模一样的producer)。
参考kafka原生和flink的默认分区器:
open:重写方法:接受并行度和id;partition:重写方法:分区逻辑方法,核心;nextValue:根据方法:原子类单调递增,computeIfAbsent如果没有则创建变量。 /** * @author cbry * @version 1.0.0 * @ClassName FlinkRoundRobinPartitioner * @createTime 2023年06月02日 11:56:00 * 轮询写入topic的分区,参考org.apache.kafka.producer.RoundRobinPartitioner * 以及(默认)org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; * * @Desc 轮询的是并行度(默认机器CPU核数)子线程的写入。 */public class FlinkRoundRobinPartitioner extends FlinkKafkaPartitioner { private static final long serialVersionUID = -3785323215233858777L; private int parallelInstanceId; private final ConcurrentMap topicCounterMap = new ConcurrentHashMap(); public FlinkRoundRobinPartitioner() { } /** * 构建子线程创建时候执行,只执行一次相当于赋值线程名称标识:并行度 */ @Override public void open(int parallelInstanceId, int parallelInstances) { System.out.println("写入分区:open"); Preconditions.checkArgument(parallelInstanceId >= 0, "Id of this subtask cannot be negative."); Preconditions.checkArgument(parallelInstances > 0, "Number of subtasks must be larger than 0."); this.parallelInstanceId = parallelInstanceId; } @Override public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) { //并行度从1开始算 , 分区从0开始算 //int writePartiton = this.nextValue(targetTopic) % partitions.length; //System.out.println("并行度" + parallelInstanceId + "写入分区:" + writePartiton); return this.nextValue(targetTopic) % partitions.length; } /** * 越界问题:Integer.MAX_VALUE + 1 = Integer.MIX_VALUE; */ private int nextValue(String topic) { AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> { System.out.println("写入分区:初始化"); return new AtomicInteger(0); }); return counter.getAndIncrement(); }}我们可以明显的看到,单个并行度(子线程producer)是按轮询顺序写的,这得益于我们的原子类。
传几个数据看看:
主程调用溯源FlinkKafkaProducer构造源码可以得带:
FlinkKafkaProducer producer = new FlinkKafkaProducer("partitionTest", new SimpleStringSchema(), properties , Optional.of(new FlinkRoundRobinPartitioner())); 写在末尾主程的模拟数据源可以参考:Flink的常见算子和实例代码,自己修改定义。
实际上这也是生产上经常需要做的自定义操作之一。