一、背景
最近项目中使用Flink消费kafka消息,并将消费的消息存储到mysql中,看似一个很简单的需求,在网上也有很多flink消费kafka的例子,但看了一圈也没看到能解决重复消费的问题的文章,于是在flink官网中搜索此类场景的处理方式,发现官网也没有实现flink到mysql的Exactly-Once例子,但是官网却有类似的例子来解决端到端的仅一次消费问题。这个现成的例子就是FlinkKafkaProducer011这个类,它保证了通过FlinkKafkaProducer011发送到kafka的消息是Exactly-Once的,主要的实现方式就是继承了TwoPhaseCommitSinkFunction这个类,关于TwoPhaseCommitSinkFunction这个类的作用可以先看上一篇文章https://blog.51cto.com/simplelife/2401411。
二、实现思想
这里简单说下这个类的作用就是实现这个类的方法:beginTransaction、preCommit、commit、abort,达到事件(preCommit)预提交的逻辑(当事件进行自己的逻辑处理后进行预提交,如果预提交成功之后才进行真正的(commit)提交,如果预提交失败则调用abort方法进行事件的回滚操作),结合flink的checkpoint机制,来保存topic中partition的offset。
达到的效果我举个例子来说明下:比如checkpoint每10s进行一次,此时用FlinkKafkaConsumer011实时消费kafka中的消息,消费并处理完消息后,进行一次预提交数据库的操作,如果预提交没有问题,10s后进行真正的插入数据库操作,如果插入成功,进行一次checkpoint,flink会自动记录消费的offset,可以将checkpoint保存的数据放到hdfs中,如果预提交出错,比如在5s的时候出错了,此时Flink程序就会进入不断的重启中,重启的策略可以在配置中设置,当然下一次的checkpoint也不会做了,checkpoint记录的还是上一次成功消费的offset,本次消费的数据因为在checkpoint期间,消费成功,但是预提交过程中失败了,注意此时数据并没有真正的执行插入操作,因为预提交(preCommit)失败,提交(commit)过程也不会发生了。等你将异常数据处理完成之后,再重新启动这个Flink程序,它会自动从上一次成功的checkpoint中继续消费数据,以此来达到Kafka到Mysql的Exactly-Once。
三、具体实现代码三个类
1、StreamDemoKafka2Mysql.java
package com.fwmagic.flink.streaming; import com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema; import org.apache.kafka.clients.consumer.ConsumerConfig; import java.util.Properties; /** * 消费kafka消息,sink(自定义)到mysql中,保证kafka to mysql的Exactly-Once */ @SuppressWarnings("all") public class StreamDemoKafka2Mysql { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度,为了方便测试,查看消息的顺序,这里设置为1,可以更改为多并行度 env.setParallelism(1); //checkpoint设置 //每隔10s进行启动一个检查点【设置checkpoint的周期】 env.enableCheckpointing(10000); //设置模式为:exactly_one,仅一次语义 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //确保检查点之间有1s的时间间隔【checkpoint最小间隔】 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); //检查点必须在1分钟之内完成,或者被丢弃【checkpoint超时时间】 env.getCheckpointConfig().setCheckpointTimeout(60000); //同一时间只允许进行一次检查点 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //表示一旦Flink程序被cancel后,会保留checkpoint数据,以便根据实际需要恢复到指定的checkpoint env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //设置statebackend,将检查点保存在hdfs上面,默认保存在内存中。这里先保存到本地 env.setStateBackend(new FsStateBackend("file:///Users/temp/cp/")); //设置kafka消费参数 Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hd1:9092,hd2:9092,hd3:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group1"); //kafka分区自动发现周期 props.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3000"); /*SimpleStringSchema可以获取到kafka消息,JSONKeyValueDeserializationSchema可以获取都消息的key,value,metadata:topic,partition,offset等信息*/ // FlinkKafkaConsumer011<String> kafkaConsumer011 = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props); FlinkKafkaConsumer011<ObjectNode> kafkaConsumer011 = new FlinkKafkaConsumer011<>("demo123", new JSONKeyValueDeserializationSchema(true), props); //加入kafka数据源 DataStreamSource<ObjectNode> streamSource = env.addSource(kafkaConsumer011); //数据传输到下游 streamSource.addSink(new MySqlTwoPhaseCommitSink()).name("MySqlTwoPhaseCommitSink"); //触发执行 env.execute(StreamDemoKafka2Mysql.class.getName()); } }
2、MySqlTwoPhaseCommitSink.java
package com.fwmagic.flink.sink; import com.fwmagic.flink.util.DBConnectUtil; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.Timestamp; import java.text.SimpleDateFormat; import java.util.Date; /** * 自定义kafka to mysql,继承TwoPhaseCommitSinkFunction,实现两阶段提交。 * 功能:保证kafak to mysql 的Exactly-Once */ public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<ObjectNode, Connection, Void> { public MySqlTwoPhaseCommitSink() { super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE); } /** * 执行数据入库操作 * @param connection * @param objectNode * @param context * @throws Exception */ @Override protected void invoke(Connection connection, ObjectNode objectNode, Context context) throws Exception { System.err.println("start invoke......."); String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); System.err.println("===>date:" + date + " " + objectNode); String value = objectNode.get("value").toString(); String sql = "insert into `t_test` (`value`,`insert_time`) values (?,?)"; PreparedStatement ps = connection.prepareStatement(sql); ps.setString(1, value); ps.setTimestamp(2, new Timestamp(System.currentTimeMillis())); //执行insert语句 ps.execute(); //手动制造异常 if(Integer.parseInt(value) == 15) System.out.println(1/0); } /** * 获取连接,开启手动提交事物(getConnection方法中) * @return * @throws Exception */ @Override protected Connection beginTransaction() throws Exception { System.err.println("start beginTransaction......."); String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true"; Connection connection = DBConnectUtil.getConnection(url, "root", "123456"); return connection; } /** * 预提交,这里预提交的逻辑在invoke方法中 * @param connection * @throws Exception */ @Override protected void preCommit(Connection connection) throws Exception { System.err.println("start preCommit......."); } /** * 如果invoke执行正常则提交事物 * @param connection */ @Override protected void commit(Connection connection) { System.err.println("start commit......."); DBConnectUtil.commit(connection); } /** * 如果invoke执行异常则回滚事物,下一次的checkpoint操作也不会执行 * @param connection */ @Override protected void abort(Connection connection) { System.err.println("start abort rollback......."); DBConnectUtil.rollback(connection); } }
3、DBConnectUtil.java
package com.fwmagic.flink.util; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; public class DBConnectUtil { /** * 获取连接 * * @param url * @param user * @param password * @return * @throws SQLException */ public static Connection getConnection(String url, String user, String password) throws SQLException { Connection conn = null; try { Class.forName("com.mysql.jdbc.Driver"); } catch (ClassNotFoundException e) { e.printStackTrace(); } conn = DriverManager.getConnection(url, user, password); //设置手动提交 conn.setAutoCommit(false); return conn; } /** * 提交事物 */ public static void commit(Connection conn) { if (conn != null) { try { conn.commit(); } catch (SQLException e) { e.printStackTrace(); } finally { close(conn); } } } /** * 事物回滚 * * @param conn */ public static void rollback(Connection conn) { if (conn != null) { try { conn.rollback(); } catch (SQLException e) { e.printStackTrace(); } finally { close(conn); } } } /** * 关闭连接 * * @param conn */ public static void close(Connection conn) { if (conn != null) { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } }
四、代码测试
为了方便发送消息,我用一个定时任务每秒发送一个数字,1~20,在1-10秒发送数字为1-10,成功消费数据并将数据入库,在15秒的时候,手动造一个异常,此时数据库中应该只有1-10的数据,10-15的数据并不会插入到数据库中(因为预提交已经失败,不会进行真正的提交)
五、完整代码地址:https://gitee.com/fang_wei/fwmagic-flink