DataHub通过DataConnector流转到MaxCompute全链路测试

概述

前面通过博客:流数据同步DataConnector测试整理简要介绍了DataConnector的配置。下面通过一个示例,从maxcompute建表开始,介绍整个链路的实现。

实验目的

使用Datahub SDK写入数据到Topic,进而经过Dataconnector推送数据到maxcompute的数据表。

实验步骤

1、Dataworks建表SQL脚本

CREATE TABLE IF NOT EXISTS ods_log_tracker( ip STRING COMMENT 'client ip address', user STRING, accesstime string, method STRING COMMENT 'HTTP request type, such as GET POST...', url STRING, protocol STRING, status BIGINT COMMENT 'HTTP reponse code from server', byte_cnt BIGINT, referer STRING, agent STRING) PARTITIONED BY(dt STRING);

DataHub通过DataConnector流转到MaxCompute全链路测试

2、Datahub控制台创建Topic,并关联创建DataConnector

DataHub通过DataConnector流转到MaxCompute全链路测试
DataHub通过DataConnector流转到MaxCompute全链路测试

关于分区选择细节参考链接

3、创建效果
DataHub通过DataConnector流转到MaxCompute全链路测试

4、Java SDK发送信息到Topic

  • pom.xml
        <dependency>
            <groupId>com.aliyun.datahub</groupId>
            <artifactId>aliyun-sdk-datahub</artifactId>
            <version>2.13.0-public</version>
        </dependency>
  • Java Code Sample
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.DatahubClientBuilder;
import com.aliyun.datahub.client.auth.AliyunAccount;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.*;
import com.aliyun.datahub.client.http.HttpConfig;
import com.aliyun.datahub.client.model.*;
import java.util.ArrayList;
import java.util.List;

public class SendData1 {

    public static void main(String[] args) {

        // Endpoint以Region: 华北2为例,其他Region请按实际情况填写
        String endpoint = "http://dh-cn-beijing.aliyuncs.com";
        String accessId = "********";
        String accessKey = "********";
        String projectName = "odpsdemo";  // project项目名称
        String topicName = "ods_log_tracker";  // topic名称
        String shardId = "0";  // 分区ID
        // 创建DataHubClient实例
        DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
                .setDatahubConfig(
                        new DatahubConfig(endpoint,
                                // 是否开启二进制传输,服务端2.12版本开始支持
                                new AliyunAccount(accessId, accessKey), true))
                //专有云使用出错尝试将参数设置为       false
                // HttpConfig可不设置,不设置时采用默认值
                .setHttpConfig(new HttpConfig().setConnTimeout(10000))
                .build();

        // 写入Tuple型数据
        RecordSchema recordSchema = datahubClient.getTopic(projectName, topicName).getRecordSchema();
        // 生成100条数据
        List<RecordEntry> recordEntries = new ArrayList<>();
            for (int i = 0; i < 100; ++i) {
                RecordEntry recordEntry = new RecordEntry();
                TupleRecordData data = new TupleRecordData(recordSchema);
                data.setField("ip","ip");
                data.setField("user","user");
                data.setField("accesstime","accesstime");
                data.setField("method","method");
                data.setField("url","url");
                data.setField("protocol","protocol");
                data.setField("referer","referer");
                data.setField("agent","agent");
                data.setField("dt","dt");
                data.setField("status",1L);
                data.setField("byte_cnt",100L);

                recordEntry.setRecordData(data);
                recordEntry.setShardId(shardId);
                recordEntries.add(recordEntry);
            }
            try {
                // 服务端从2.12版本开始支持,之前版本请使用putRecords接口
                //datahubClient.putRecordsByShard(Constant.projectName, Constant.topicName, shardId, recordEntries);
                datahubClient.putRecords(projectName, topicName, recordEntries);
                System.out.println("write data successful");
            } catch (InvalidParameterException e) {
                System.out.println("invalid parameter, please check your parameter");
                System.exit(1);
            } catch (AuthorizationFailureException e) {
                System.out.println("AK error, please check your accessId and accessKey");
                System.exit(1);
            } catch (ResourceNotFoundException e) {
                System.out.println("project or topic or shard not found");
                System.exit(1);
            } catch (ShardSealedException e) {
                System.out.println("shard status is CLOSED, can not write");
                System.exit(1);
            } catch (DatahubClientException e) {
                System.out.println("other error");
                System.out.println(e);
                System.exit(1);
            }
    }
}
  • 写入Topic情况查看
    DataHub通过DataConnector流转到MaxCompute全链路测试

4、数据流转情况查看
DataHub通过DataConnector流转到MaxCompute全链路测试
DataHub通过DataConnector流转到MaxCompute全链路测试

参考链接

DataHub Java SDK介绍
DataWorks 创建业务流程

上一篇:阿里云新版人脸识别综述


下一篇:通过OSS直接上传数据到DSW实例