知方号

知方号

Flink从Kafka到Kafka

Flink从Kafka到Kafka

为什么要写这篇文章?

Flink出来已经好几年了,现在release版本已经发布到1.10.0(截止2023-05-05),统一了批处理和流处理,很多大公司也都用到生实际务中,跑得也很high。这些大家都知道,但是当我开始考虑怎么在工作中落地flink的时候,我不知道怎么入手。公司比较小,目前没有实时计算,但是etl任务跑得比较慢,效率上有些跟不上。我的思路是想先试着用Flink来处理一些离线任务,看看能不能提升效率,同时为落地实时计算做准备。全网找了半天资料,文章倒是很多,包括一些付费资源,大部分的实例代码都跑不通,真的是跑不通。当然有部分原因是因为我对flink了解太少,但是完整的跑通除了word count之外的代码不应该是一件比较麻烦的事。

功能说明

1.生成json格式数据写入kafka topic1

2.消费topic1中的消息,写入topic2

目的很简单,如果要落地到具体业务免不了需要做多次的数据处理,Flink虽说是可以做批处理,但是支持得最好的还是流数据,确切的说是kafka的数据,跑通了这个流程,实际上Flink的落地就只差业务逻辑了,现在有Flink SQL,实现业务逻辑也是分分钟的事。

代码

其实只有4个文件

代码语言:javascript复制├── flink-learn-kafka-sink.iml├── pom.xml└── src ├── main │   ├── java │   │   └── org │   │   └── apache │   │   └── flink │   │   └── learn │   │   ├── Sink2Kafka.java │   │   ├── model │   │   │   └── FamilyMemberTemperatureRecord.java │   │   └── utils │   │   ├── GsonUtil.java │   │   └── KafkaGenDataUtil.java │   └── resources └── test └── java

pom依赖

代码语言:javascript复制 UTF-8 1.10.0 1.8 2.11 ${java.version} ${java.version} com.google.code.gson gson 2.8.5 org.apache.flink flink-connector-kafka-0.11_${scala.binary.version} ${flink.version} org.apache.kafka kafka-clients 0.11.0.2

model

新冠肺炎影响身边每一个人,举了一个测体温记录测例子

代码语言:java复制package org.apache.flink.learn.model;public class FamilyMemberTemperatureRecord { private int id; // 测量次数 private String name; // 姓名 private String temperature; // 体温 private String measureTime; // 测量时间 public FamilyMemberTemperatureRecord(int id, String name, String temperature, String measureTime) { this.id = id; this.name = name; this.temperature = temperature; this.measureTime = measureTime; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getTemperature() { return temperature; } public void setTemperature(String temperature) { this.temperature = temperature; } public String getMeasureTime() { return measureTime; } public void setMeasureTime(String measureTime) { this.measureTime = measureTime; }}

json工具类

将对象解析为json格式的数据发给kafka

代码语言:java复制package org.apache.flink.learn.utils;import com.google.gson.Gson;import java.nio.charset.Charset;/** * Desc: json工具类 * Created by suddenly on 2023-05-05 */ public class GsonUtil { private final static Gson gson = new Gson(); public static T fromJson(String value, Class type) { return gson.fromJson(value, type); } public static String toJson(Object value) { return gson.toJson(value); } public static byte[] toJSONBytes(Object value) { return gson.toJson(value).getBytes(Charset.forName("UTF-8")); }}

数据生成工具类

代码语言:java复制package org.apache.flink.learn.utils;import org.apache.flink.learn.model.FamilyMemberTemperatureRecord;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.commons.lang3.RandomUtils;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Properties;/** * Desc: 生成数据,写到kafka中 * Created by suddenly on 2023-05-05 */ public class KafkaGenDataUtil { private static final String broker_list = "localhost:9092"; private static final String topic = "tempeature-source"; // 数据源topic public static void genDataToKafka() throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", broker_list); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer producer = new KafkaProducer(props); try { for (int i = 1; i

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至lizi9903@foxmail.com举报,一经查实,本站将立刻删除。