flink-connector-starrocks: writing to StarRocks with Flink SQL

I. Code Development

1. Example code for writing to Doris

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DorisSqlTest2 {
    private  static final String DORIS_SQL =  "CREATE TABLE doris_test (\n"+
            " order_number STRING,\n" +
            " order_key STRING,\n" +
            " stock_order STRING,\n" +
            " stock_code STRING,\n" +
            " intermediary STRING,\n" +
            " intermediary_name STRING,\n" +
            " intermediary_accountid STRING,\n" +
            " intermediary_phone STRING,\n" +
            " canal_type STRING\n" +
            " ) WITH (\n"+
            "   'connector' = 'doris',\n"+
            "   'doris.host' = '192.168.6.143',\n"+
            "   'doris.port' = '8030',\n"+
            "   'database-name' = 'example_db',\n"+
            "   'table-name' = 'assure_orders2',\n"+
            "   'username' = 'root',\n"+
            "   'password' = 'root',\n"+
            "   'max.batch' = '500'\n"+
            " )";
 
 
 
    private static final String DATA_GEN =  "CREATE TABLE datagen (\n" +
            " id STRING,\n" +
            " name STRING,\n" +
            " user_age INT,\n" +
            " user_other STRING,\n" +
            " ts AS localtimestamp\n" +
            ") WITH (\n" +
            " 'connector' = 'datagen',\n" +
            " 'rows-per-second'='500',\n" +
            " 'fields.id.length'='7',\n" +
            " 'fields.user_age.min'='1',\n" +
            " 'fields.user_age.max'='100',\n" +
            " 'fields.name.length'='2',\n" +
            " 'fields.user_other.length'='10'\n" +
            ")";
 
 
 
    private  static final String KAFKA_SQL = "CREATE TABLE kafka_test (\n" +
           " order_number STRING,\n" +
            " order_key STRING,\n" +
            " stock_order STRING,\n" +
            " stock_code STRING,\n" +
            " intermediary STRING,\n" +
            " intermediary_name STRING,\n" +
            " intermediary_accountid STRING,\n" +
            " intermediary_phone STRING,\n" +
            " canal_type STRING\n" +
            ") WITH (\n" +
            " 'connector' = 'kafka',\n" +
            " 'topic' = 'dwd.rds_core.plateform_stable.assure_orders',\n" +
            " 'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',\n" +
            " 'properties.group.id' = 'testGroup',\n" +
            " 'format' = 'json',\n" +
            " 'scan.startup.mode' = 'earliest-offset'\n" +
            ")";
 
    public static void main(String[] args) {
 
//        System.out.println("DORIS_SQL = " + DORIS_SQL);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env);
        bsTableEnv.executeSql(DORIS_SQL);
        bsTableEnv.executeSql(DATA_GEN);
        bsTableEnv.executeSql(KAFKA_SQL);
 
        bsTableEnv.executeSql("insert into doris_test select order_number,order_key,stock_order,stock_code,intermediary,intermediary_name,intermediary_accountid,intermediary_phone,canal_type from kafka_test");
//        bsTableEnv.executeSql("insert into kafkaTable select id,name from datagen");
    }
}

2. Registering the DorisDynamicTableSourceFactory

Create the corresponding resource path and create the factory class; a sketch of the registration file is shown below.
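
The original shows this step as a screenshot. A minimal sketch, assuming Flink 1.11+, where table factories are discovered through Java SPI: add a provider-configuration file under src/main/resources/META-INF/services named org.apache.flink.table.factories.Factory, containing the fully qualified name of the factory class.

# src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
org.apache.flink.connector.doris.table.DorisDynamicTableSourceFactory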

3. DorisDynamicTableSourceFactory implementation

package org.apache.flink.connector.doris.table;
 
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;
 
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
 
/**
 * @program: flink-neiwang-dev
 * @description: 1
 * @author: Mr.Wang
 * @create: 2020-11-12 14:04
 **/
public class DorisDynamicTableSourceFactory implements DynamicTableSinkFactory {
    // the connector identifier, used as 'connector' = 'doris' in the DDL
    public static final String IDENTIFIER = "doris";
 
    public static final ConfigOption<String> DORIS_HOST = ConfigOptions
            .key("doris.host")
            .stringType()
            .noDefaultValue()
            .withDescription("the doris database url.");
 
 
    public static final ConfigOption<String> DORIS_HTTP_PORT = ConfigOptions
            .key("doris.port")
            .stringType()
            .noDefaultValue()
            .withDescription("the doris database url.");
 
 
    public static final ConfigOption<String> DATABASE_NAME = ConfigOptions
            .key("database-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the database name.");
 
 
    public static final ConfigOption<String> TABLE_NAME = ConfigOptions
            .key("table-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc table name.");
    public static final ConfigOption<String> USERNAME = ConfigOptions
            .key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc user name.");
    public static final ConfigOption<String> PASSWORD = ConfigOptions
            .key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc password.");
 
    public static final ConfigOption<Long> WINDOW_TIME = ConfigOptions
            .key("window")
            .longType()
            .defaultValue(10000L)
            .withDescription("window time.");
 
    public static final ConfigOption<Integer> MAX_BATCH_NUMBER = ConfigOptions
            .key("max.batch")
            .intType()
            .defaultValue(500)
            .withDescription("batch max number.");
 
 
    public static final ConfigOption<String> LOCAL = ConfigOptions
            .key("local")
            .stringType()
            .noDefaultValue()
            .withDescription("run in local or not.");
 
 
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ReadableConfig readableConfig = helper.getOptions();
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
 
        // create the DorisDynamicTableSink, passing in the options and the physical schema
        return new DorisDynamicTableSink(readableConfig, physicalSchema);
 
    }
 
    @Override
    public String factoryIdentifier() {
        // return the connector identifier
        return IDENTIFIER;
    }
 
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
 
 
        // required options
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(DORIS_HOST);
        requiredOptions.add(DORIS_HTTP_PORT);
        requiredOptions.add(DATABASE_NAME);
        requiredOptions.add(TABLE_NAME);
        requiredOptions.add(USERNAME);
        requiredOptions.add(PASSWORD);
        requiredOptions.add(MAX_BATCH_NUMBER);
        return requiredOptions;
 
    }
 
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        // optional options
        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
        optionalOptions.add(LOCAL);
        return optionalOptions;
    }
}
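
The factory above returns a DorisDynamicTableSink, which the original post does not list. A minimal sketch of what it could look like, assuming it only forwards the options and the physical schema into the DorisSinkFunction shown next via SinkFunctionProvider, and assuming the sink function sits in the same package:

package org.apache.flink.connector.doris.table;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;

public class DorisDynamicTableSink implements DynamicTableSink {

    private final ReadableConfig options;
    private final TableSchema schema;

    public DorisDynamicTableSink(ReadableConfig options, TableSchema schema) {
        this.options = options;
        this.schema = schema;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // accept whatever changelog mode the planner produces
        return requestedMode;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // hand the field names and data types over to the sink function
        DorisSinkFunction sinkFunction = new DorisSinkFunction(
                options,
                schema.getFieldDataTypes(),
                schema.getFieldNames());
        return SinkFunctionProvider.of(sinkFunction);
    }

    @Override
    public DynamicTableSink copy() {
        return new DorisDynamicTableSink(options, schema);
    }

    @Override
    public String asSummaryString() {
        return "Doris Table Sink";
    }
}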

Next comes the corresponding data sink function.

4. DorisSinkFunction implementation

import com.alibaba.fastjson.JSONObject;
import com.sjb.flink.dwd.test.dorisDev_2021_03_01.DorisStreamLoadLocalTest2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
 
 
public class DorisSinkFunction extends RichSinkFunction<RowData> {
    private String[] fieldNames;
    private DataType[] fieldDataTypes;
    private ReadableConfig options;
    private int aa = 0; // simple record counter used only for progress logging
    private DorisStreamLoadLocalTest2 dorisStreamLoad;
    private DataTransferRunner dataTransferRunner;
    private transient JSONObject json;
 
 
    public static final int TIMEOUT = 60000;
 
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
 
 
        this.dorisStreamLoad = new DorisStreamLoadLocalTest2(options);
 
        Integer MAX_BATCH_NUMBER = options.get(DorisDynamicTableSourceFactory.MAX_BATCH_NUMBER);
        this.dataTransferRunner = new DataTransferRunner(MAX_BATCH_NUMBER, dorisStreamLoad);
        Thread thread = new Thread(dataTransferRunner);
        thread.start();
    }
 
    public DorisSinkFunction(ReadableConfig readableConfig, DataType[] dataType, String[] fieldNamesArray) {
        this.options = readableConfig;
        this.fieldDataTypes = dataType;
        this.fieldNames = fieldNamesArray;
 
    }
 
 
    @Override
    public void invoke(RowData value, Context context) throws Exception {
 
        try {
            json = new JSONObject();
            for (int i = 0; i < fieldNames.length; i++) {
                String dataType = fieldDataTypes[i].toString();
                String fieldName = fieldNames[i];
 
                if (dataType.toLowerCase().equals("int")) {
                    json.put(fieldName, value.getInt(i));
                } else if (dataType.toLowerCase().equals("bigint")) {
                    // Flink SQL BIGINT maps to a Java long
                    json.put(fieldName, value.getLong(i));
 
                } else if (dataType.toLowerCase().equals("double")) {
                    json.put(fieldName, value.getDouble(i));
                } else if (dataType.toLowerCase().equals("float")) {
                    json.put(fieldName, value.getFloat(i));
                } else {
                    GenericRowData rowData = (GenericRowData) value;
                    if (null != rowData.getField(i)) {
                        json.put(fieldName, rowData.getField(i).toString());
 
                    }
                }
 
            }
 
            aa++;
            if (aa % 1000 ==0){
                System.out.println("计数  = " + aa);
            }
 
 
            dataTransferRunner.sendData(json);
 
            if (dataTransferRunner.hasException()) {
                // the background writer thread has hit an exception
                System.out.println("%%%%%%%%%%%%%%%%%%%%%%% writer thread exception...");
                Exception exception = dataTransferRunner.getException();
                exception.printStackTrace();
 
            }
 
        } catch (Exception ex) {
            ex.printStackTrace();
            throw new Exception(ex.getMessage() + ", exception, task failed, time: " + System.currentTimeMillis());
     /*       String clazzName = this.getClass().getName();
            String class_name = clazzName.substring(clazzName.lastIndexOf('.') + 1, clazzName.lastIndexOf('$'));
            String content = "[" + class_name + "], test exception: {" + ex.getMessage() + "}, task failed, time: " + System.currentTimeMillis();
//                DingProd.sendDingRabotTest(content);
            throw new Exception(ex.getMessage() + ", exception, task failed, time: " + System.currentTimeMillis());*/
        }
 
    }
 
 
}

Notes:

1. Create objects inside the open() method; do any object initialization there.

2. This relates to Flink's underlying serialization. The pitfall we ran into: the job runs fine locally, but on the cluster it fails with serialization errors (a minimal sketch of the safe pattern follows after these notes).

3. A blocking queue is used here.
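
Regarding note 2, a minimal sketch of the serialization-safe pattern (not the original code; class and field names are illustrative): keep non-serializable members transient and build them in open(), so only the configuration travels with the serialized function.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

public class ExampleSink extends RichSinkFunction<RowData> {
    // configuration set in the constructor; the concrete Configuration object is serializable
    private final ReadableConfig options;
    // non-serializable resource: transient, never shipped with the serialized function
    private transient CloseableHttpClient client;

    public ExampleSink(ReadableConfig options) {
        this.options = options;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // created on the TaskManager after deserialization, so it never has to be serialized
        client = HttpClients.createDefault();
    }

    @Override
    public void invoke(RowData value, Context context) throws Exception {
        // use client here
    }
}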

5. Queue class implementation

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.sjb.flink.dwd.test.dorisDev_2021_03_01.DorisStreamLoadLocalTest2;
 
 
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
 
public class DataTransferRunner implements Runnable, Serializable {
 
    private static final long serialVersionUID = 1L;
    private volatile boolean continued = true;
    private Exception exception = null;
    private Integer BATCH_SIZE = 500;
    // created in the constructor so the capacity follows the configured max.batch
    private final LinkedBlockingQueue<JSONObject> linkedBlockingQueue;
    private final List<JSONObject> tmpList;
 
    private DorisStreamLoadLocalTest2 dorisStreamLoad;
    private JSONArray jsonArray;
 
    public DataTransferRunner(Integer maxBatch, DorisStreamLoadLocalTest2 dorisStreamLoad) {
        this.dorisStreamLoad = dorisStreamLoad;
        this.BATCH_SIZE = maxBatch;
        this.linkedBlockingQueue = new LinkedBlockingQueue<JSONObject>(BATCH_SIZE * 3);
        this.tmpList = new ArrayList<JSONObject>(BATCH_SIZE);
    }
 
    @Override
    public void run() {
 
        while (this.continued) {
            try {
                tmpList.clear();
                this.linkedBlockingQueue.drainTo(tmpList, BATCH_SIZE);
                while (this.continued && tmpList.isEmpty()) {
                    // the queue is empty: sleep briefly, then try to drain again
                    Thread.sleep(200);
                    this.linkedBlockingQueue.drainTo(tmpList, BATCH_SIZE);
                }
 
                jsonArray = JSONArray.parseArray(JSON.toJSONString(tmpList));
                dorisStreamLoad.batchInsertDoris(jsonArray.toJSONString());
 
            } catch (final Exception exception) {
                System.out.println("进入异常.............");
                this.exception = exception;
                break;
            }
        }
    }
 
 
    public void sendData(JSONObject value) throws InterruptedException {
 
        int offerTryCount = 0;
        // the writer thread cannot keep up and the queue is full; each offer() blocks for up to
        // 1 second, and we keep retrying until the element is accepted
        while (!linkedBlockingQueue.offer(value, 1000, TimeUnit.MILLISECONDS)) {
 
            if (++offerTryCount % 10 == 0) {
                // we have waited roughly 10 seconds; record an exception so invoke() can report it
                this.exception = new RuntimeException("downstream cannot keep up, waited about 10 seconds, further handling required");
 
            }
        }
    }
 
    public boolean hasException() {
        return this.exception != null;
    }
 
    public Exception getException() {
        return this.exception;
    }
 
}

6. Doris Stream Load code. This part is fairly simple; just pay attention to the parameters that are passed in.

 
 
import com.sjb.common.exceptions.BizException;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.doris.table.DorisDynamicTableSourceFactory;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpVersion;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
 
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
 
 
public class DorisStreamLoadLocalTest2 implements Serializable {
    private static final long serialVersionUID = 1L;
 
 
    private static final String CHARSET = "UTF-8";
 
    private static final Integer HTTP_OK = 200;
 
    private static final String CONTENT_TYPE_NAME = "Content-Type";
 
    private static final String CONTENT_TYPE_JSON = "application/json;charset=UTF-8";
 
    private static final String CONTENT_TYPE_XML = "text/xml;charset=UTF-8";
 
    private static final String CONTENT_TYPE_FORM = "application/x-www-form-urlencoded;charset=UTF-8";
 
    private static final String ACCEPT_NAME = "Accept";
    private static final String ACCEPT = "application/json;charset=UTF-8";
    private static ReadableConfig options;
 
    // keep this timeout fairly long
    public static final int TIMEOUT = 60000;
    private static String loadUrl;
 
    private static HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(String method) {
                    return true;
                }
            });
 
    private static RequestConfig requestConfig = RequestConfig.custom()
            .setSocketTimeout(TIMEOUT)
            .setConnectTimeout(TIMEOUT)
            .setConnectionRequestTimeout(TIMEOUT)
            .build();
 
 
    public DorisStreamLoadLocalTest2() {
 
    }
 
    public DorisStreamLoadLocalTest2(ReadableConfig ReadableConfig) {
        options = ReadableConfig;
 
        String DORIS_HOST = options.get(DorisDynamicTableSourceFactory.DORIS_HOST);
        String DORIS_HTTP_PORT = options.get(DorisDynamicTableSourceFactory.DORIS_HTTP_PORT);
        String DATABASE_NAME = options.get(DorisDynamicTableSourceFactory.DATABASE_NAME);
        String TABLE_NAME = options.get(DorisDynamicTableSourceFactory.TABLE_NAME);
 
        loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
                DORIS_HOST,
                DORIS_HTTP_PORT,
                DATABASE_NAME,
                TABLE_NAME);
    }
 
 
    public Boolean batchInsertDoris(String content) {
 
        CloseableHttpClient client = httpClientBuilder.build();
 
//        String DORIS_HOST = "dev-hadoop-ct7-6-143";
//        String DORIS_HTTP_PORT = "8030";
//        String DATABASE_NAME = "example_db";
//        String TABLE_NAME = "assure_orders2";
//        String USERNAME ="root";
//        String PASSWORD = "root";
 
        String loadResult = "";
        CloseableHttpResponse response = null;
 
 
        try {
 
 
            HttpPut httpPut = new HttpPut(loadUrl);
            StringEntity entity = new StringEntity(content, "UTF-8");
            httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
            String USERNAME = options.get(DorisDynamicTableSourceFactory.USERNAME);
            String PASSWORD = options.get(DorisDynamicTableSourceFactory.PASSWORD);
            httpPut.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USERNAME, PASSWORD));
            httpPut.setHeader("strip_outer_array", "true");
            httpPut.setHeader("format", "json");
            httpPut.setHeader("merge_type", "MERGE");
            httpPut.setHeader("delete", "canal_type=\"DELETE\"");
            httpPut.setProtocolVersion(HttpVersion.HTTP_1_0);
            httpPut.addHeader(CONTENT_TYPE_NAME, CONTENT_TYPE_FORM);
            httpPut.addHeader(ACCEPT_NAME, ACCEPT);
//            httpPut.addHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE);
 
 
            // set the request config and the request body
            httpPut.setConfig(requestConfig);
            httpPut.setEntity(entity);
            response = client.execute(httpPut);
            HttpEntity httpEntity = response.getEntity();
 
            // read the response body
            if (httpEntity != null) {
                loadResult = EntityUtils.toString(httpEntity);
            }
            final int statusCode = response.getStatusLine().getStatusCode();
 
            EntityUtils.consume(httpEntity);
 
            if (statusCode != HTTP_OK) {
 
                System.out.println("loadResult = " + loadResult);
                throw new BizException("stream load failed, loadResult=" + loadResult);
            } else {
                if (loadResult.contains("OK") && loadResult.contains("Success")) {
                    // remove this log in production
                    System.out.println("stream load succeeded, loadResult = " + loadResult);
                    return true;
//                    throw new BizException("[DorisStreamLoadProd] error, message: [ 1111], location: [" + 1111 + "]");
                } else {
                    System.out.println("stream load failed, loadResult=" + loadResult);
                    throw new BizException("stream load failed, loadResult=" + loadResult);
 
                }
            }
 
 
        } catch (Exception ex) {
            // join the stack trace elements into a single readable string
            String error_info = StringUtils.join(ex.getStackTrace(), "\n");
            throw new BizException("[DorisStreamLoadLocalTest2] error, message: [" + ex.toString() + "], stack trace: [" + error_info + "]");
 
        } finally {
 
            try {
                if (null != response) {
                    response.close();
                }
                if (null != client) {
                    client.close();
                }
 
 
            } catch (Exception ex) {
                String error_info = StringUtils.join(ex.getStackTrace(), "\n");
                throw new BizException("[DorisStreamLoadProd] failed to close the response, message: [" + ex.toString() + "], stack trace: [" + error_info + "]");
            }
 
        }
 
 
    }
 
 
    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }
  
 
}
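
As a quick local sanity check (not part of the original post), the loader can be driven directly with a hand-built Configuration. The host, credentials and table below are simply the example values from the DDL in part one, and the class name is just illustrative:

import org.apache.flink.configuration.Configuration;

public class StreamLoadSmokeTest {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("doris.host", "192.168.6.143");
        conf.setString("doris.port", "8030");
        conf.setString("database-name", "example_db");
        conf.setString("table-name", "assure_orders2");
        conf.setString("username", "root");
        conf.setString("password", "root");

        DorisStreamLoadLocalTest2 loader = new DorisStreamLoadLocalTest2(conf);

        // a one-row JSON array: the request sets strip_outer_array=true, so an array is expected
        String payload = "[{\"order_number\":\"0001\",\"canal_type\":\"INSERT\"}]";
        System.out.println("load ok = " + loader.batchInsertDoris(payload));
    }
}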

II. Packaging Details

1. When running on the cluster, the following factory class cannot be found:

org.apache.flink.connector.doris.table.DorisDynamicTableSourceFactory

Solution: add the ServicesResourceTransformer to the maven-shade-plugin in the pom, so that the META-INF/services entries used by Flink's factory discovery are carried into the shaded jar:

<transformers>
    <transformer
            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>

The complete build section of the pom then looks like this:
<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <artifactSet>
                                <excludes>
                                    <exclude>junit:junit</exclude>
                                </excludes>
                            </artifactSet>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
 
                        </configuration>
 
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Place the jar into the corresponding directory of the Flink cluster.

III. Summary

The idea is to imitate the Flink MySQL/JDBC connector: under the hood, batches are written into Doris with Doris's Stream Load. The reason the code uses a queue:

1. If the upstream source is bounded and no data arrives for a while, the last batch would otherwise never be written to Doris. With a queue and a background thread polling it in a loop, the write is still triggered once the wait time expires even when no new data comes in, and backlog (data piling up in the queue) also has to be taken into account.

 
