Flink出来已经好几年了,现在release版本已经发布到1.10.0(截止2023-05-05),统一了批处理和流处理,很多大公司也都用到生实际务中,跑得也很high。这些大家都知道,但是当我开始考虑怎么在工作中落地flink的时候,我不知道怎么入手。公司比较小,目前没有实时计算,但是etl任务跑得比较慢,效率上有些跟不上。我的思路是想先试着用Flink来处理一些离线任务,看看能不能提升效率,同时为落地实时计算做准备。全网找了半天资料,文章倒是很多,包括一些付费资源,大部分的实例代码都跑不通,真的是跑不通。当然有部分原因是因为我对flink了解太少,但是完整的跑通除了word count之外的代码不应该是一件比较麻烦的事。
功能说明1.生成json格式数据写入kafka topic1
2.消费topic1中的消息,写入topic2
目的很简单,如果要落地到具体业务免不了需要做多次的数据处理,Flink虽说是可以做批处理,但是支持得最好的还是流数据,确切的说是kafka的数据,跑通了这个流程,实际上Flink的落地就只差业务逻辑了,现在有Flink SQL,实现业务逻辑也是分分钟的事。
代码其实只有4个文件
代码语言:javascript复制├── flink-learn-kafka-sink.iml├── pom.xml└── src ├── main │ ├── java │ │ └── org │ │ └── apache │ │ └── flink │ │ └── learn │ │ ├── Sink2Kafka.java │ │ ├── model │ │ │ └── FamilyMemberTemperatureRecord.java │ │ └── utils │ │ ├── GsonUtil.java │ │ └── KafkaGenDataUtil.java │ └── resources └── test └── javapom依赖
代码语言:javascript复制 UTF-8 1.10.0 1.8 2.11 ${java.version} ${java.version} com.google.code.gson gson 2.8.5 org.apache.flink flink-connector-kafka-0.11_${scala.binary.version} ${flink.version} org.apache.kafka kafka-clients 0.11.0.2model
新冠肺炎影响身边每一个人,举了一个测体温记录测例子
代码语言:java复制package org.apache.flink.learn.model;public class FamilyMemberTemperatureRecord { private int id; // 测量次数 private String name; // 姓名 private String temperature; // 体温 private String measureTime; // 测量时间 public FamilyMemberTemperatureRecord(int id, String name, String temperature, String measureTime) { this.id = id; this.name = name; this.temperature = temperature; this.measureTime = measureTime; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getTemperature() { return temperature; } public void setTemperature(String temperature) { this.temperature = temperature; } public String getMeasureTime() { return measureTime; } public void setMeasureTime(String measureTime) { this.measureTime = measureTime; }}json工具类
将对象解析为json格式的数据发给kafka
代码语言:java复制package org.apache.flink.learn.utils;import com.google.gson.Gson;import java.nio.charset.Charset;/** * Desc: json工具类 * Created by suddenly on 2023-05-05 */ public class GsonUtil { private final static Gson gson = new Gson(); public static T fromJson(String value, Class type) { return gson.fromJson(value, type); } public static String toJson(Object value) { return gson.toJson(value); } public static byte[] toJSONBytes(Object value) { return gson.toJson(value).getBytes(Charset.forName("UTF-8")); }}数据生成工具类
代码语言:java复制package org.apache.flink.learn.utils;import org.apache.flink.learn.model.FamilyMemberTemperatureRecord;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.commons.lang3.RandomUtils;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Properties;/** * Desc: 生成数据,写到kafka中 * Created by suddenly on 2023-05-05 */ public class KafkaGenDataUtil { private static final String broker_list = "localhost:9092"; private static final String topic = "tempeature-source"; // 数据源topic public static void genDataToKafka() throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", broker_list); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer producer = new KafkaProducer(props); try { for (int i = 1; i