如何通过API构建自动补数据工具

Dataphin版本:2.9.2及以上

需开通OpenAPI模块:Dataphin-OpenAPI(运维)


Dataphin平台提供了补数据功能,当需要补数据时,用户可手动对某个节点及其下游的节点的特定业务日期的补数据。在实际场景中,有些上游的数据到达的时间晚于预期时间,比如门店的数据延迟几天或者一个月的时间才收集上报,或者上游的数据错误需进行更正时,就需要进行补数据操作。此类操作重复度高,且由于补数据的时间较长带来较大的运维成本。通过Open API的运维模块就可以根据特定的业务场景开发适用于特定业务场景的个性化的补数据或运维工具。


以下的例子是创建批量补数据工具的API调用的基本步骤:

  1. 根据需要补数据的节点的特点查询节点:ListNode
  2. 选择需要补数据的节点的下游节点,可利用QueryDagFromPhysicalNode查询下游节点
  3. 选择需要补数据的节点及其下游节点和需要补数据的业务日期,创建补数据工作流:CreatePhysicalNodeSupplement
  4. 查询补数据流下的每个业务日期下对应的DagRun的运行状态:ListSupplementDagrun
  5. 查询补数据工作流下的业务日期的补数据实例并获取实例状态:ListSupplementInstance


请注意,批量补数据时,若不想影响线上正常的周期调度任务,您需要控制补数据实例的生成和运行的频次或避开高峰期以控制系统压力。

以下为样例代码

/* import 省略... */
/**
 * 搜索节点,对节点及下游补数据,并查看补数据实例,搜索时假设不确定是哪一个节点,
 * 即不指定node_id,对返回的结果取第一个节点补数据(为方便示例,假设该节点存在下游)
 *
 * @date 2021-05-17 17:22
 */
public class SupplementTest {
    private static final String ENV = "PROD";
    private static final Long testProjectId = 1022928L;
    // 实际使用请通过AK/endpoint自行创建API client
    private DataphinAcsClient client = LocalDataphinAcsClient.getTestEnvClient();
    public void supplementTest() throws ClientException {
        //Step1: 根据需要补数据的节点的特点查询节点
        NodeOverview nodeOverview = getOneNode();
        //Step2: 获取需要补数据的下游节点,此处假设获取下两层的下游节点
        int downStreamDepth,
        List<String> downstreamNodes = getDownNode(nodeOverview.getNodeId(), downStreamDepth);
        //Step3:选择需要补数据的节点及其下游节点和需要补数据的业务日期,创建补数据工作流
        String flowId = createSupplementFlow(nodeOverview, downstreamNodes, "2021-05-11", "2021-05-15");
        //Step4: 查询补数据流下的每个业务日期下对应的DagRun的运行状态
        List<ScheduleDagrun> dagRunList = listSupplementDagrun(flowId);
        //Step5: 查询补数据工作流下的业务日期的补数据实例并获取实例状态
        //查看 2021-05-11 的 补数据实例
        List<Instance> instances = Lists.newArrayList();
        dagRunList.stream().filter(d -> d.getBizDate().equals("2021-05-11"))
            .findFirst()
            .ifPresent(d -> instances.addAll(listInstanceByDagRun(d.getDagrunId())));
        // 查看实例的状态
        instances.forEach(ins -> System.out.println(ins.getStatus()));
    }
    /**
     * 列出 dagRun下面的实例
     *
     * @param dagRunId dagRunId
     * @return dagRun下面的实例列表
     */
    private List<Instance> listInstanceByDagRun(String dagRunId) {
        ListSupplementInstanceRequest request = new ListSupplementInstanceRequest();
        request.setDagRunId(dagRunId);
        request.setEnv(ENV);
        try {
            ListSupplementInstanceResponse response = client.getAcsResponse(request);
            if(null == response.getData() || response.getData().size() == 0) {
                throw new RuntimeException("not found any instance by dagrun id " + dagRunId);
            }
            return response.getData();
        } catch (ClientException e) {
            throw new RuntimeException(e.getMessage());
        }
    }
    /**
     * 列出补数据的 dagrun,当前例子中,正常应该有5个
     *
     * @param flowId 补数据的工作流的ID
     * @return dagrun 列表
     */
    private List<ScheduleDagrun> listSupplementDagrun(String flowId) throws ClientException {
        ListSupplementDagrunRequest request = new ListSupplementDagrunRequest();
        request.setEnv(ENV);
        request.setFlowId(flowId);
        ListSupplementDagrunResponse response = client.getAcsResponse(request);
        if(response.getData() == null || response.getData().size() == 0) {
            throw new RuntimeException("not found any dagrun by flow id " + flowId);
        }
        return response.getData();
    }
    /**
     * 创建补数据工作流
     *
     * @param nodeOverview    补数据起始节点
     * @param downStreamDepth 补数据的层级深度
     * @return 补数据的工作流ID
     */
    private String createSupplementFlow(NodeOverview nodeOverview, List<String> downstreamNodes, String minPartition, String maxPartition) throws ClientException {
        
        CreatePhysicalNodeSupplementRequest createNodeSupplementRequest = new CreatePhysicalNodeSupplementRequest();
        NodeSupplementCommand nodeSupplementCommand = new NodeSupplementCommand();
        nodeSupplementCommand.setName("open_api_test_20210508_" + System.currentTimeMillis());
        nodeSupplementCommand.setProjectId(Long.toString(testProjectId));
        nodeSupplementCommand.setMinPartition(minPartition);
        nodeSupplementCommand.setMaxPartition(maxPartition);
        nodeSupplementCommand.setParallelism(1);
        nodeSupplementCommand.setSupplementNodeId(nodeOverview.getNodeId());
        //包含全部下游,或者这里可以选择部分下游
        nodeSupplementCommand.setIncludedNodeIdList(downstreamNodes);
        createNodeSupplementRequest.setNodeSupplementCommand(nodeSupplementCommand);
        createNodeSupplementRequest.setEnv(ENV);
        CreatePhysicalNodeSupplementResponse createNodeSupplementResponse = client.getAcsResponse(
            createNodeSupplementRequest);
        return createNodeSupplementResponse.getFlowId();
    }
    /**
     * 查询节点的下游
     *
     * @param startNodeId     起始节点
     * @param downStreamDepth 下游层级深度
     * @return 下游节点的ID列表
     */
    private List<String> getDownNode(String startNodeId, int downStreamDepth) throws ClientException {
        QueryDagFromPhysicalNodeRequest queryNodeDagDownStreamRequest = new QueryDagFromPhysicalNodeRequest();
        NodeDagQueryCommand nodeDagQueryDownStreamCommand = new NodeDagQueryCommand();
        nodeDagQueryDownStreamCommand.setStartNodeId(startNodeId);
        //只搜索向下两层的节点
        nodeDagQueryDownStreamCommand.setDownStreamDepth(downStreamDepth);
        nodeDagQueryDownStreamCommand.setUpStreamDepth(0);
        queryNodeDagDownStreamRequest.setNodeDagQueryCommand(nodeDagQueryDownStreamCommand);
        queryNodeDagDownStreamRequest.setEnv(ENV);
        QueryDagFromPhysicalNodeResponse queryNodeDagDownStreamResponse =
            client.getAcsResponse(queryNodeDagDownStreamRequest);
        NodeDagInfo nodeDagDownStreamInfo = queryNodeDagDownStreamResponse.getNodeDagInfo();
        List<LogicalNodeInfo> downNodes = nodeDagDownStreamInfo.getNodes();
        if(null == downNodes) {
            return Lists.newArrayListWithCapacity(0);
        }
        return downNodes.stream().map(n -> n.getBasicInfo().getNodeId().getId()).collect(Collectors.toList());
    }
    /**
     * 列出节点列表
     */
    private NodeOverview getOneNode() throws ClientException {
        ListNodesRequest.PageParam pageParam = new PageParam();
        pageParam.setPageNum(1);
        pageParam.setPageSize(20);
        ListNodesRequest listNodesRequest = new ListNodesRequest();
        ListNodesRequest.NodeQueryCommand nodeQueryCommand = new NodeQueryCommand();
        nodeQueryCommand.setNodeBizType("SCRIPT");
        nodeQueryCommand.setNodeScheduleType("NORMAL");
        listNodesRequest.setNodeQueryCommand(nodeQueryCommand);
        listNodesRequest.setPageParam(pageParam);
        listNodesRequest.setProjectId(testProjectId);
        listNodesRequest.setEnv(ENV);
        ListNodesResponse listNodesResponse = client.getAcsResponse(listNodesRequest);
        PagedData overviewPagedData = listNodesResponse.getPagedNodes();
        List<NodeOverview> nodeOverviewList = overviewPagedData.getData();
        if(null == nodeOverviewList || nodeOverviewList.size() == 0) {
            throw new RuntimeException("not found any node");
        }
        return nodeOverviewList.get(0);
    }
}


点击这里查看Dataphin OpenAPI概览。

上一篇:2020阿里云上云采购季个人新用户专场云服务器配置价格汇总


下一篇:Ext.Ajax.request和formPanel.getForm().submit两种提交方法的异同