一, Code Development
1. Example code for writing to Doris
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DorisSqlTest2 {

    private static final String DORIS_SQL = "CREATE TABLE doris_test (\n" +
            "  order_number STRING,\n" +
            "  order_key STRING,\n" +
            "  stock_order STRING,\n" +
            "  stock_code STRING,\n" +
            "  intermediary STRING,\n" +
            "  intermediary_name STRING,\n" +
            "  intermediary_accountid STRING,\n" +
            "  intermediary_phone STRING,\n" +
            "  canal_type STRING\n" +
            ") WITH (\n" +
            "  'connector' = 'doris',\n" +
            "  'doris.host' = '192.168.6.143',\n" +
            "  'doris.port' = '8030',\n" +
            "  'database-name' = 'example_db',\n" +
            "  'table-name' = 'assure_orders2',\n" +
            "  'username' = 'root',\n" +
            "  'password' = 'root',\n" +
            "  'max.batch' = '500'\n" +
            ")";

    private static final String DATA_GEN = "CREATE TABLE datagen (\n" +
            "  id STRING,\n" +
            "  name STRING,\n" +
            "  user_age INT,\n" +
            "  user_other STRING,\n" +
            "  ts AS localtimestamp\n" +
            ") WITH (\n" +
            "  'connector' = 'datagen',\n" +
            "  'rows-per-second' = '500',\n" +
            "  'fields.id.length' = '7',\n" +
            "  'fields.user_age.min' = '1',\n" +
            "  'fields.user_age.max' = '100',\n" +
            "  'fields.name.length' = '2',\n" +
            "  'fields.user_other.length' = '10'\n" +
            ")";

    private static final String KAFKA_SQL = "CREATE TABLE kafka_test (\n" +
            "  order_number STRING,\n" +
            "  order_key STRING,\n" +
            "  stock_order STRING,\n" +
            "  stock_code STRING,\n" +
            "  intermediary STRING,\n" +
            "  intermediary_name STRING,\n" +
            "  intermediary_accountid STRING,\n" +
            "  intermediary_phone STRING,\n" +
            "  canal_type STRING\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'dwd.rds_core.plateform_stable.assure_orders',\n" +
            "  'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',\n" +
            "  'properties.group.id' = 'testGroup',\n" +
            "  'format' = 'json',\n" +
            "  'scan.startup.mode' = 'earliest-offset'\n" +
            ")";

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        // StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env);

        bsTableEnv.executeSql(DORIS_SQL);
        bsTableEnv.executeSql(DATA_GEN);
        bsTableEnv.executeSql(KAFKA_SQL);
        bsTableEnv.executeSql("insert into doris_test select order_number, order_key, stock_order, stock_code,"
                + " intermediary, intermediary_name, intermediary_accountid, intermediary_phone, canal_type"
                + " from kafka_test");
        // bsTableEnv.executeSql("insert into kafkaTable select id,name from datagen");
    }
}
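Note that this job relies on merge_type=MERGE stream loads with a delete condition on canal_type (see section 6), which on the Doris side requires the Unique Key data model. The original does not show the Doris DDL; a plausible sketch for example_db.assure_orders2, where the column types, key choice, and bucketing are all assumptions:

-- Hypothetical Doris-side DDL; MERGE loads with a delete condition need the Unique Key model.
CREATE TABLE example_db.assure_orders2 (
    order_number VARCHAR(64),
    order_key VARCHAR(64),
    stock_order VARCHAR(64),
    stock_code VARCHAR(64),
    intermediary VARCHAR(64),
    intermediary_name VARCHAR(128),
    intermediary_accountid VARCHAR(64),
    intermediary_phone VARCHAR(32),
    canal_type VARCHAR(16)
)
UNIQUE KEY(order_number)
DISTRIBUTED BY HASH(order_number) BUCKETS 10
PROPERTIES ("replication_num" = "1");  -- single replica for a dev environment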
2. Specifying DorisDynamicTableSourceFactory
Create the corresponding package path, then create the class. Flink also has to be able to discover the factory, as shown below.
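The class alone is not enough: Flink discovers connector factories via Java SPI, so the factory's fully qualified name must also be listed in a services file on the classpath. Assuming Flink 1.11+, where the new factory stack is discovered through org.apache.flink.table.factories.Factory, the file would be:

# src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
org.apache.flink.connector.doris.table.DorisDynamicTableSourceFactory

Section 二 below deals with keeping this file intact when the jar is shaded.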
3. DorisDynamicTableSourceFactory implementation
package org.apache.flink.connector.doris.table;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;

import java.util.HashSet;
import java.util.Set;

/**
 * @program: flink-neiwang-dev
 * @description: Doris table factory. Despite the "Source" in its name, this is
 *               a sink factory -- it implements DynamicTableSinkFactory.
 * @author: Mr.Wang
 * @create: 2020-11-12 14:04
 **/
public class DorisDynamicTableSourceFactory implements DynamicTableSinkFactory {

    //todo the connector identifier is "doris"
    public static final String IDENTIFIER = "doris";

    public static final ConfigOption<String> DORIS_HOST = ConfigOptions
            .key("doris.host")
            .stringType()
            .noDefaultValue()
            .withDescription("the doris fe host.");

    public static final ConfigOption<String> DORIS_HTTP_PORT = ConfigOptions
            .key("doris.port")
            .stringType()
            .noDefaultValue()
            .withDescription("the doris fe http port.");

    public static final ConfigOption<String> DATABASE_NAME = ConfigOptions
            .key("database-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the database name.");

    public static final ConfigOption<String> TABLE_NAME = ConfigOptions
            .key("table-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc table name.");

    public static final ConfigOption<String> USERNAME = ConfigOptions
            .key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc user name.");

    public static final ConfigOption<String> PASSWORD = ConfigOptions
            .key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc password.");

    public static final ConfigOption<Long> WINDOW_TIME = ConfigOptions
            .key("window")
            .longType()
            .defaultValue(10000L)
            .withDescription("window time.");

    public static final ConfigOption<Integer> MAX_BATCH_NUMBER = ConfigOptions
            .key("max.batch")
            .intType()
            .defaultValue(500)
            .withDescription("batch max number.");

    public static final ConfigOption<String> LOCAL = ConfigOptions
            .key("local")
            .stringType()
            .noDefaultValue()
            .withDescription("run in local or not.");

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ReadableConfig readableConfig = helper.getOptions();
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
        //todo hand the options and physical schema to DorisDynamicTableSink
        return new DorisDynamicTableSink(readableConfig, physicalSchema);
    }

    @Override
    public String factoryIdentifier() {
        //todo the identifier matched against 'connector' = 'doris'
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        //todo required options
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(DORIS_HOST);
        requiredOptions.add(DORIS_HTTP_PORT);
        requiredOptions.add(DATABASE_NAME);
        requiredOptions.add(TABLE_NAME);
        requiredOptions.add(USERNAME);
        requiredOptions.add(PASSWORD);
        requiredOptions.add(MAX_BATCH_NUMBER);
        return requiredOptions;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        //todo optional options
        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
        optionalOptions.add(LOCAL);
        return optionalOptions;
    }
}
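The factory returns a DorisDynamicTableSink, which the original does not show. A minimal sketch of what it plausibly looks like, wiring the options and physical schema through to the sink function; the ChangelogMode choice here is an assumption (deletes are encoded in the canal_type column and handled by the MERGE load, not by RowKind):

package org.apache.flink.connector.doris.table;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;

public class DorisDynamicTableSink implements DynamicTableSink {

    private final ReadableConfig options;
    private final TableSchema schema;

    public DorisDynamicTableSink(ReadableConfig options, TableSchema schema) {
        this.options = options;
        this.schema = schema;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // accept inserts and changelog rows; deletes are resolved downstream
        // via the canal_type column and the Stream Load MERGE delete condition
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .build();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        DataType[] fieldDataTypes = schema.getFieldDataTypes();
        String[] fieldNames = schema.getFieldNames();
        // hand the options and the physical schema to the RichSinkFunction below
        return SinkFunctionProvider.of(new DorisSinkFunction(options, fieldDataTypes, fieldNames));
    }

    @Override
    public DynamicTableSink copy() {
        return new DorisDynamicTableSink(options, schema);
    }

    @Override
    public String asSummaryString() {
        return "Doris Table Sink";
    }
}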
Next comes the corresponding data sink function.
4. DorisSinkFunction implementation
import com.alibaba.fastjson.JSONObject;
import com.sjb.flink.dwd.test.dorisDev_2021_03_01.DorisStreamLoadLocalTest2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.doris.table.DorisDynamicTableSourceFactory;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

public class DorisSinkFunction extends RichSinkFunction<RowData> {

    private String[] fieldNames;
    private DataType[] fieldDataTypes;
    private ReadableConfig options;
    private int rowCount = 0;  // simple counter for progress logging
    private DorisStreamLoadLocalTest2 dorisStreamLoad;
    private DataTransferRunner dataTransferRunner;
    private transient JSONObject json;

    public static final int TIMEOUT = 60000;

    public DorisSinkFunction(ReadableConfig readableConfig, DataType[] dataType, String[] fieldNamesArray) {
        this.options = readableConfig;
        this.fieldDataTypes = dataType;
        this.fieldNames = fieldNamesArray;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // non-serializable members are built here, on the TaskManager,
        // rather than in the constructor (see the notes below)
        this.dorisStreamLoad = new DorisStreamLoadLocalTest2(options);
        Integer maxBatchNumber = options.get(DorisDynamicTableSourceFactory.MAX_BATCH_NUMBER);
        this.dataTransferRunner = new DataTransferRunner(maxBatchNumber, dorisStreamLoad);
        Thread thread = new Thread(dataTransferRunner);
        thread.start();
    }

    @Override
    public void invoke(RowData value, Context context) throws Exception {
        try {
            // convert the RowData into a JSON object, field by field
            json = new JSONObject();
            for (int i = 0; i < fieldNames.length; i++) {
                String dataType = fieldDataTypes[i].toString();
                String fieldName = fieldNames[i];
                if (dataType.equalsIgnoreCase("int")) {
                    json.put(fieldName, value.getInt(i));
                } else if (dataType.equalsIgnoreCase("long")) {
                    json.put(fieldName, value.getLong(i));
                } else if (dataType.equalsIgnoreCase("double")) {
                    json.put(fieldName, value.getDouble(i));
                } else if (dataType.equalsIgnoreCase("float")) {
                    json.put(fieldName, value.getFloat(i));
                } else {
                    GenericRowData rowData = (GenericRowData) value;
                    if (null != rowData.getField(i)) {
                        json.put(fieldName, rowData.getField(i).toString());
                    }
                }
            }

            rowCount++;
            if (rowCount % 1000 == 0) {
                System.out.println("row count = " + rowCount);
            }

            // hand the record to the background flush thread (may block under backpressure)
            dataTransferRunner.sendData(json);

            if (dataTransferRunner.hasException()) {
                //todo the flush thread has failed
                System.out.println("%%%%%%%%%%%%%%%%%%%%%%% flush thread failed...");
                Exception exception = dataTransferRunner.getException();
                exception.printStackTrace();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            // optionally notify an alerting robot (e.g. DingTalk) here before failing the task
            throw new Exception(ex.getMessage() + ", exception, task failed at: " + System.currentTimeMillis());
        }
    }
}
Notes:
1. Whatever objects can be constructed in open() should be; do all object initialization inside the open() method.
2. This is tied to Flink's serialization of function instances: the sink is serialized on the client and shipped to the cluster. The pitfall we hit was that the job ran fine locally but failed on the cluster with serialization errors (see the pattern sketch after these notes).
3. A blocking queue is used here to hand records to the background flush thread.
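To make notes 1 and 2 concrete: anything that cannot be serialized (the HTTP-backed loader, the flush thread) should be declared transient and built in open(), which runs on the TaskManager after deserialization. A minimal sketch of the pattern, with a hypothetical class name:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;

// Pattern sketch: the function instance is serialized on the client, so
// non-serializable members are transient and rebuilt per task in open().
public class SerializationSafeSink extends RichSinkFunction<RowData> {

    private final ReadableConfig options;                  // serializable: safe to ship
    private transient DorisStreamLoadLocalTest2 loader;    // rebuilt in open()
    private transient DataTransferRunner runner;

    public SerializationSafeSink(ReadableConfig options) {
        this.options = options;                            // constructor runs on the client
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // open() runs on the TaskManager, after deserialization
        loader = new DorisStreamLoadLocalTest2(options);
        runner = new DataTransferRunner(500, loader);
        new Thread(runner, "doris-flush").start();
    }

    @Override
    public void invoke(RowData value, Context context) throws Exception {
        // convert and enqueue, as in DorisSinkFunction above
    }
}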
5. Queue class implementation
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.sjb.flink.dwd.test.dorisDev_2021_03_01.DorisStreamLoadLocalTest2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DataTransferRunner implements Runnable, Serializable {

    private static final long serialVersionUID = 1L;

    private volatile boolean continued = true;
    private Exception exception = null;
    private Integer BATCH_SIZE = 500;
    private final LinkedBlockingQueue<JSONObject> linkedBlockingQueue;
    private final List<JSONObject> tmpList;
    private DorisStreamLoadLocalTest2 dorisStreamLoad;
    private JSONArray jsonArray;

    public DataTransferRunner(Integer maxBatch, DorisStreamLoadLocalTest2 dorisStreamLoad) {
        this.dorisStreamLoad = dorisStreamLoad;
        this.BATCH_SIZE = maxBatch;
        // size the queue and buffer from maxBatch here; initializing them at the
        // field declarations would use the default 500 before the constructor runs
        this.linkedBlockingQueue = new LinkedBlockingQueue<JSONObject>(BATCH_SIZE * 3);
        this.tmpList = new ArrayList<JSONObject>(BATCH_SIZE);
    }

    @Override
    public void run() {
        while (this.continued) {
            try {
                tmpList.clear();
                this.linkedBlockingQueue.drainTo(tmpList, BATCH_SIZE);
                while (this.continued && tmpList.isEmpty()) {
                    //todo queue is empty: sleep, then drain again, so a trailing
                    // partial batch is still flushed once the source goes quiet
                    Thread.sleep(200);
                    this.linkedBlockingQueue.drainTo(tmpList, BATCH_SIZE);
                }
                jsonArray = JSONArray.parseArray(JSON.toJSONString(tmpList));
                dorisStreamLoad.batchInsertDoris(jsonArray.toJSONString());
            } catch (final Exception exception) {
                System.out.println("flush thread hit an exception...");
                this.exception = exception;
                break;
            }
        }
    }

    public void sendData(JSONObject value) throws InterruptedException {
        int offerTryCount = 0;
        //todo if the downstream cannot keep up, offer() blocks for up to 1 second per
        // attempt; after every 10 failed attempts (~10 seconds) an exception is recorded
        // for the sink to pick up, while this loop keeps retrying the enqueue
        while (!linkedBlockingQueue.offer(value, 1000, TimeUnit.MILLISECONDS)) {
            if (++offerTryCount % 10 == 0) {
                this.exception = new RuntimeException("downstream cannot keep up; waited about 10 seconds, escalate");
            }
        }
    }

    public boolean hasException() {
        return this.exception != null;
    }

    public Exception getException() {
        return this.exception;
    }
}
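As a design note, the 200 ms sleep-and-drain loop is what bounds flush latency: once the source goes quiet, a partial batch waits at most a few hundred milliseconds before being loaded. A hypothetical local exercise of the runner (the Configuration setup is an assumption; it must carry the doris.* options the loader reads):

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.doris.table.DorisDynamicTableSourceFactory;

public class DataTransferRunnerDemo {
    public static void main(String[] args) throws Exception {
        // Configuration implements ReadableConfig; fill in real connection values
        Configuration config = new Configuration();
        config.set(DorisDynamicTableSourceFactory.DORIS_HOST, "192.168.6.143");
        config.set(DorisDynamicTableSourceFactory.DORIS_HTTP_PORT, "8030");
        config.set(DorisDynamicTableSourceFactory.DATABASE_NAME, "example_db");
        config.set(DorisDynamicTableSourceFactory.TABLE_NAME, "assure_orders2");
        config.set(DorisDynamicTableSourceFactory.USERNAME, "root");
        config.set(DorisDynamicTableSourceFactory.PASSWORD, "root");

        DorisStreamLoadLocalTest2 loader = new DorisStreamLoadLocalTest2(config);
        DataTransferRunner runner = new DataTransferRunner(500, loader);
        new Thread(runner, "doris-flush").start();

        JSONObject row = new JSONObject();
        row.put("order_number", "TEST-0001");
        row.put("canal_type", "INSERT");
        runner.sendData(row);  // blocks in 1-second slices when the queue is full
    }
}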
6. The Doris Stream Load code. This part is fairly simple; just be careful with how the parameters are passed.
import com.sjb.common.exceptions.BizException;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.doris.table.DorisDynamicTableSourceFactory;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpVersion;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class DorisStreamLoadLocalTest2 implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final String CHARSET = "UTF-8";
    private static final Integer HTTP_OK = 200;
    private static final String CONTENT_TYPE_NAME = "Content-Type";
    private static final String CONTENT_TYPE_JSON = "application/json;charset=UTF-8";
    private static final String CONTENT_TYPE_XML = "text/xml;charset=UTF-8";
    private static final String CONTENT_TYPE_FORM = "application/x-www-form-urlencoded;charset=UTF-8";
    private static final String ACCEPT_NAME = "Accept";
    private static final String ACCEPT = "application/json;charset=UTF-8";

    private static ReadableConfig options;
    // keep this generous: loading a large batch can take a while
    public static final int TIMEOUT = 60000;
    private static String loadUrl;

    // the FE answers a stream load with a 307 redirect to a BE,
    // so the client must be willing to follow redirects for PUT
    private static HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(String method) {
                    return true;
                }
            });

    private static RequestConfig requestConfig = RequestConfig.custom()
            .setSocketTimeout(TIMEOUT)
            .setConnectTimeout(TIMEOUT)
            .setConnectionRequestTimeout(TIMEOUT)
            .build();

    public DorisStreamLoadLocalTest2() {
    }

    public DorisStreamLoadLocalTest2(ReadableConfig readableConfig) {
        options = readableConfig;
        String DORIS_HOST = options.get(DorisDynamicTableSourceFactory.DORIS_HOST);
        String DORIS_HTTP_PORT = options.get(DorisDynamicTableSourceFactory.DORIS_HTTP_PORT);
        String DATABASE_NAME = options.get(DorisDynamicTableSourceFactory.DATABASE_NAME);
        String TABLE_NAME = options.get(DorisDynamicTableSourceFactory.TABLE_NAME);
        loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
                DORIS_HOST, DORIS_HTTP_PORT, DATABASE_NAME, TABLE_NAME);
    }

    public Boolean batchInsertDoris(String content) {
        CloseableHttpClient client = httpClientBuilder.build();
        String loadResult = "";
        CloseableHttpResponse response = null;
        try {
            HttpPut httpPut = new HttpPut(loadUrl);
            StringEntity entity = new StringEntity(content, "UTF-8");
            httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
            String USERNAME = options.get(DorisDynamicTableSourceFactory.USERNAME);
            String PASSWORD = options.get(DorisDynamicTableSourceFactory.PASSWORD);
            httpPut.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USERNAME, PASSWORD));
            // stream load options: the payload is a JSON array, and rows with
            // canal_type = "DELETE" are treated as deletes (MERGE load)
            httpPut.setHeader("strip_outer_array", "true");
            httpPut.setHeader("format", "json");
            httpPut.setHeader("merge_type", "MERGE");
            httpPut.setHeader("delete", "canal_type=\"DELETE\"");
            httpPut.setProtocolVersion(HttpVersion.HTTP_1_0);
            httpPut.addHeader(CONTENT_TYPE_NAME, CONTENT_TYPE_FORM);
            httpPut.addHeader(ACCEPT_NAME, ACCEPT);
            httpPut.setConfig(requestConfig);
            httpPut.setEntity(entity);

            response = client.execute(httpPut);
            HttpEntity httpEntity = response.getEntity();
            if (httpEntity != null) {
                loadResult = EntityUtils.toString(httpEntity);
            }
            final int statusCode = response.getStatusLine().getStatusCode();
            EntityUtils.consume(httpEntity);
            if (statusCode != 200) {
                System.out.println("loadResult = " + loadResult);
                throw new BizException("stream load failed, loadResult=" + loadResult);
            } else {
                if (loadResult.contains("OK") && loadResult.contains("Success")) {
                    //todo remove this log line in production
                    System.out.println("stream load succeeded, loadResult = " + loadResult);
                    return true;
                } else {
                    System.out.println("stream load failed, loadResult=" + loadResult);
                    throw new BizException("stream load failed, loadResult=" + loadResult);
                }
            }
        } catch (Exception ex) {
            // join the stack trace into a newline-separated string
            String error_info = StringUtils.join(ex.getStackTrace(), "\n");
            throw new BizException("[DorisStreamLoadLocalTest2] error: [" + ex.toString()
                    + "], stack trace: [" + error_info + "]");
        } finally {
            try {
                if (null != response) {
                    response.close();
                }
                if (null != client) {
                    client.close();
                }
            } catch (Exception ex) {
                String error_info = StringUtils.join(ex.getStackTrace(), "\n");
                throw new BizException("[DorisStreamLoadProd] failed to close response: [" + ex.toString()
                        + "], stack trace: [" + error_info + "]");
            }
        }
    }

    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }
}
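For reference, the contains("OK") / contains("Success") check above matches the JSON body Doris returns for a stream load. An abridged example of the shape, with illustrative values:

{
    "TxnId": 1003,
    "Label": "...",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 500,
    "NumberLoadedRows": 500,
    "NumberFilteredRows": 0,
    "LoadTimeMs": 102
}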
二, Packaging Details
1. When running on the cluster, the following factory class cannot be found:
org.apache.flink.connector.doris.table.DorisDynamicTableSourceFactory
Solution: Flink's SPI discovery reads the merged META-INF/services files, and by default the shade plugin lets one jar's file overwrite another's instead of merging them. Add the ServicesResourceTransformer to the pom:
<transformers>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <encoding>UTF-8</encoding>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <artifactSet>
                            <excludes>
                                <exclude>junit:junit</exclude>
                            </excludes>
                        </artifactSet>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
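After mvn clean package, it is worth verifying that the merge actually happened before deploying; the jar name below is hypothetical:

# list the merged services entries in the shaded jar
jar tf target/flink-doris-connector-1.0.jar | grep META-INF/services
# print the factory file and confirm it lists DorisDynamicTableSourceFactory
unzip -p target/flink-doris-connector-1.0.jar META-INF/services/org.apache.flink.table.factories.Factory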
Then place the jar into the appropriate directory on the Flink cluster.
三, Summary
The idea is to mimic the flink-mysql-connector: under the hood, data is written to Doris in batches via Doris's Stream Load. The reason my code uses a queue is:
1. If the upstream source is bounded, or simply goes quiet for a stretch, the last partial batch would never reach Doris if flushing were triggered only by batch size. So a queue is used: a background thread loops and drains it, and when no new data arrives, the elapsed-time check triggers the write on its own. The backlog problem also has to be considered, which is why the queue is bounded and enqueueing blocks.