如何基于Flink将流式数据实时写入AnalyticDB for PostgreSQL

本文主要介绍如何通过 Flink 将流式数据实时写入 ADB PG中,并提供一个项目代码demo。

版本说明:
Flink 为社区1.7.2版本。
ADB PG为阿里云AnalyticDB for PostgreSQL 6.0版。

使用方法

使用 Flink 作为流处理计算引擎时,可以通过sink connector,将Flink中的数据写入到目标端 。
本文demo中以socketStream作为源端(data source),ADB PG作为目标端(data sink),并提供ADB PG sink connector代码样例 ,完成数据流式写入。
实际应用中,可能需要更改为对应的source connector ,并修改字段映射关系。

步骤一:在ADBPG目标库中建表

create table test(id int,name text);

步骤二:基于nc工具启动socket stream,并向9000端口写入数据

nc -l 9000
如何基于Flink将流式数据实时写入AnalyticDB for PostgreSQL

步骤三:启动 flink demo

bin/flink run -c Adb4PgSinkDemo /root/flinktest-1.0-SNAPSHOT-jar-with-dependencies.jar
如何基于Flink将流式数据实时写入AnalyticDB for PostgreSQL

步骤四:目标端ADB PG观察数据同步情况

如何基于Flink将流式数据实时写入AnalyticDB for PostgreSQL

使用参数和设置说明

source connector 设置

demo的源端采用socket stream,可以参照flink官网换为kafka/ES/Hadoop 等source connector。
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/
对于不同的source stream一个通用的要求是每条数据的格式为:各列以英文逗号为分隔符拼接成单行文本。
例如我们想写入一条id=1,name=hello的数据,格式应转换为:

1,hello

sink connector 设置

sink端需要传入primaryKeys(主键字段), fieldNames(全部字段名), types(全部字段类型)三个参数来向ADB PG sink connector传入数据元信息,并传入自己的host address(格式为jdbc:postgresql://host:port/dbName),tableName,userName,password来初始化一个Adb4PgTableSink。

ArrayList<String> primaryKeys = new ArrayList<String>(Arrays.asList(new String[]{"id"}));
ArrayList<String> fieldNames = new ArrayList<String>(Arrays.asList(new String[]{"id", "name"}));
ArrayList<Class<?>> types = new ArrayList<Class<?>>(Arrays.asList(new Class<?>[]{Integer.class, String.class}));
MySchema schema = new MySchema(primaryKeys, fieldNames, types);

DataStream<Row> messageStream = sourceStream.map(new InputMap(schema));

messageStream.addSink(new Adb4PgTableSink("jdbc:postgresql://gp-****.gpdb.rds.aliyuncs.com:3432/your-dbname", "your-tablename", "your-username", "your-password", schema, schema.getPrimaryKeys()));

附:demo代码

完整项目代码请参考 链接

demo 主流程代码如下

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.*;

public class Adb4PgSinkDemo extends RichSinkFunction<Row>{

    private transient static final Logger LOG = LoggerFactory.getLogger(Adb4PgSinkDemo.class);

    public static void main(String[] args) throws Exception {
        //设置环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //Source: 本地监听端口9000获取数据
        DataStream<String> sourceStream = env.socketTextStream("127.0.0.1", 9000);

        ArrayList<String> primaryKeys = new ArrayList<String>(Arrays.asList(new String[]{"id"}));
        ArrayList<String> fieldNames = new ArrayList<String>(Arrays.asList(new String[]{"id", "name"}));
        ArrayList<Class<?>> types = new ArrayList<Class<?>>(Arrays.asList(new Class<?>[]{Integer.class, String.class}));
        MySchema schema = new MySchema(primaryKeys, fieldNames, types);

        DataStream<Row> messageStream = sourceStream.map(new InputMap(schema));

        messageStream.addSink(new Adb4PgTableSink("jdbc:postgresql://gp-****.gpdb.rds.aliyuncs.com:3432/your-dbname", "your-tablename", "your-username", "password", schema, schema.getPrimaryKeys()));
        env.execute("Example");
    }

    // 对输入数据做map操作。
    public static class InputMap implements MapFunction<String, Row> {
        private static final long serialVersionUID = 1L;

        MySchema schema;

        public InputMap(MySchema schema) {
            this.schema = schema;
        }

        //@Override
        public Row map(String line) throws Exception {
            // normalize and split the line
            String[] arr = line.split(",");
            int columnLen = this.schema.getLength();
            if (arr.length == columnLen) {
                Row row = new Row(columnLen);
                for(int i = 0; i < columnLen; i++){
                    row.setField(i, arr[i]);
                }
                return row;
            }
            return null;
        }
    }

    public static class MySchema implements Serializable {
        private static final long serialVersionUID = 10L;
        private ArrayList<String> primaryKeys;
        private ArrayList<String> fieldNames;
        private ArrayList<Class<?>> types;
        private int length;
        public MySchema(ArrayList<String> primaryKeys, ArrayList<String> fieldNames, ArrayList<Class<?>> types){
            this.primaryKeys = primaryKeys;
            this.fieldNames = fieldNames;
            this.types = types;
            length = fieldNames.size();
        }
        public ArrayList<String> getPrimaryKeys() {
            return primaryKeys;
        }
        public int getLength(){
            return length;
        }
        public ArrayList<String> getFieldNames() {
            return fieldNames;
        }
        public ArrayList<Class<?>> getTypes() {
            return types;
        }
        public int getFieldIndex(String key) {
            for(int i=0 ; i < fieldNames.size();i++) {
                if (key.equalsIgnoreCase(fieldNames.get(i))) {
                    return i;
                }
            }
            return -1;
        }
    }

}
上一篇:每天五分钟linux(5)-rm


下一篇:阿里云RDS for PostgreSQL用户如何定制数据库参数