数据中台之数据血缘的具体实现

目录

前言

前置知识

Hive Hook

?Neo4j基础入门

数据血缘数据结构设计

数据录入测试代码

主逻辑实现

参考文章


前言

数据中台之元数据管理系统的搭建一文中我们提到了自己实现数据血缘,本文就这个话题进行详细展开。如果采集后的血缘存储在mysql等传统数据中,随着采集sql的增多很快会出现性能瓶颈,而且不易查询。所以本文基于hive hook和图数据库neo4j来实现数据血缘。基本思路是在hive的执行引擎操作完成之后利用hook进行拦截,判断是否是有效的生成血缘的sql语句,是的话记录发送一条kafka消息,并且在消费kafka消息时入库到mysql和neo4j中。mysql也存一份是为了进行数据备份,防止neo4j宕机引起的数据丢失,同时可以起到对重复提交的sql进行过滤。

?

前置知识

Hive Hook

其实Apache Atlas对于Hive的元数据管理,使用的是Hive的Hooks

Pre-semantic-analyzer hooks:在Hive在查询字符串上运行语义分析器之前调用。

Post-semantic-analyzer hooks:在Hive在查询字符串上运行语义分析器之后调用。

Pre-driver-run hooks:在driver执行查询之前调用。

Post-driver-run hooks:在driver执行查询之后调用。

Pre-execution hooks:在执行引擎执行查询之前调用。请注意,这个目的是此时已经为Hive准备了一个优化的查询计划。

Post-execution hooks:在查询执行完成之后以及将结果返回给用户之前调用。

Failure-execution hooks:当查询执行失败时调用。

下面看下我们将会用到的核心类?

ExecuteWithHookContext可以实现3种类型的hook,分别是

pre-execution 在执行引擎执行查询之前调用

post-execution 在执行引擎执行查询之后调用

execution-failure 在执行引擎执行查询失败之后调用

public class MyExecuteWithHookContext implements ExecuteWithHookContext {


    public void run(HookContext hookContext) throws Exception {

        if(hookContext.getHookType()==HookContext.HookType.PRE_EXEC_HOOK){

        }else if(hookContext.getHookType()==HookContext.HookType.POST_EXEC_HOOK){
 

        }else if(hookContext.getHookType()==HookContext.HookType.ON_FAILURE_HOOK){


        }

 
    }
 
}

?Neo4j基础入门

Neo4j是一个高性能的,NOSQL图形数据库,它将结构化数据存储在网络上而不是表中。它是一个嵌入式的基于磁盘的、具备完全的事务特性的Java持久化引擎,但是它将结构化数据存储在网络(从数学角度叫做图)上而不是表中。Neo4j也可以被看作是一个高性能的图引擎,该引擎具有成熟数据库的所有特性。程序员工作在一个面向对象的、灵活的网络结构下而不是严格、静态的表中——但是他们可以享受到具备完全的事务特性、企业级的数据库的所有好处。

下面介绍一些基础概念名词(见图1)

1. 标签(Label)

Neo4j中,一个节点可以有一个以上的标签,从现实世界的角度去看,一个标签可以认为节点的某个类别,比如BOOKMOVIE等等。

2. 节点(Node)

节点是指一个实实在在的对象,这个对象可以有好多的标签,表示对象的种类,也可以有好多的属性,描述其特征,节点与节点之间还可以形成多个有方向(或者没有方向)的关系。

3. 关系(Relationship)

用来描述节点与节点之间的关系,关系可以拥有属性。

4. 属性(Property)

描述节点的特性,采用的是Key-Value结构,可以随意设定来描述节点的特征。

常用的查询语法(CQL)

1.查找指定节点、指定属性、指定关系的节点、关系

# MATCH 匹配命令
# return 后面的别名p还可以利用as 设置指定的返回值名称,如 p as userName

match (p:PERSON {name:"Mask"})-[r]-(n) return p,r,n

2. 对查找结果进行排序order by,并限制返回条数 limit

order by关键字与SQL里面是一样的操作,后面跟上需要根据排序的关键字,limit的操作是指定输出前几条
# 这里利用order by来指定返回按照Person.name来排序
# limit 表示只返回前3条数据
match(p:Person) return p order by p.name limit 3

?3.删除节点delete命令

# 删除指定条件的节点 # 先通过匹配关键字match找到匹配元素,然后通过delete关键字指定删除 
match(p:PERSON {name:"teacher_wange"}) delete p 
# 删除节点和节点相关的关系 
match (p:Person {name:"lisi"})-[r]-() delete p,r

数据血缘数据结构设计

neo4j

import org.neo4j.ogm.annotation.*;

@RelationshipEntity(type = "blood")
public class Blood {

    public Blood() {// 从 Neo4j API 2.0.5开始需要无参构造函数
    }


    @Id
    @GeneratedValue
    private Long id;
    private String name;
    private String fromKey;
    private String toKey;
    private String fromTable;
    private String toTable;

    @StartNode // 关系开始的节点
    private TableNode formNode;
    @EndNode // 关系目标的节点
    private TableNode toNode;


    public Blood(String fromKey, String toKey, String fromTable, String toTable) {
        this.fromKey = fromKey;
        this.toKey = toKey;
        this.fromTable = fromTable;
        this.toTable = toTable;
        this.name=fromTable+"->"+toTable;
    }

   //get and set
   ...
}
import org.neo4j.ogm.annotation.GeneratedValue;
import org.neo4j.ogm.annotation.Id;
import org.neo4j.ogm.annotation.NodeEntity;
import java.io.Serializable;

@NodeEntity
public class TableNode implements Serializable {
    @Id
    @GeneratedValue
    private Long id;
    private String tableName;

    public TableNode( ) {// 从 Neo4j API 2.0.5开始需要无参构造函数
    }
  
    //get and set
     ...
 
}

mysql

public class DataBlood {

    private Long id;
    private String fromTable;
    private String toTable;
    private String fromKey;
    private String toKey;
    private String sqls;

     //get and set
     ...
}

数据录入测试代码

<properties>
	  <ogm.properties>ogm-bolt.properties</ogm.properties>
</properties>


<dependency>
	 <groupId>mysql</groupId>
	 <artifactId>mysql-connector-java</artifactId>
	 <version>5.1.46</version>
</dependency>


<dependency>
	 <groupId>org.mybatis</groupId>
	 <artifactId>mybatis</artifactId>
	 <version>3.5.3</version>
</dependency>

<!-- Neo4j OGM -->
<dependency>
	<groupId>org.neo4j</groupId>
	<artifactId>neo4j-ogm-core</artifactId>
	<version>${neo4j.ogm.version}</version>
</dependency>

<dependency>
	<groupId>org.neo4j</groupId>
	<artifactId>neo4j-ogm-api</artifactId>
	<version>${neo4j.ogm.version}</version>
</dependency>

<dependency>
	<groupId>org.neo4j</groupId>
	<artifactId>neo4j-ogm-bolt-driver</artifactId>
	<version>${neo4j.ogm.version}</version>
</dependency>

<dependency>
	<groupId>org.neo4j</groupId>
	<artifactId>neo4j-ogm-test</artifactId>
	<version>${neo4j.ogm.version}</version>
	<scope>test</scope>
	<exclusions>
		<exclusion>
			<groupId>org.neo4j</groupId>
			<artifactId>neo4j-security-enterprise</artifactId>
		</exclusion>
	</exclusions>
</dependency>
import com.test.mapper.DataBloodMapper;
import com.test.po.Blood;
import com.test.po.DataBlood;
import com.test.po.TableNode;
import com.test.util.JsonUtil;
import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;
import org.junit.Before;
import org.junit.Test;
import org.neo4j.ogm.config.*;
import org.neo4j.ogm.session.Session;
import org.neo4j.ogm.session.SessionFactory;

import java.io.IOException;
import java.io.Reader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class HiveHookTest{


    private Session neo4jSession;


    private static SqlSession sqlSession;


    @Before
    public void init() throws IOException {


        neo4j/
        ConfigurationSource configurationSource = new ClasspathConfigurationSource("application.properties");

        Properties properties = configurationSource.properties();

        String uri = properties.get("spring.data.neo4j.uri") == null ? null : properties.get("spring.data.neo4j.uri").toString();
        String username = properties.get("spring.data.neo4j.username") == null ? null : properties.get("spring.data.neo4j.username").toString();
        String password = properties.get("spring.data.neo4j.password") == null ? null : properties.get("spring.data.neo4j.password").toString();

        Configuration config = new Configuration.Builder()
                .uri(uri)
                .credentials(username, password)
                .build();
        SessionFactory sessionFactory = new SessionFactory(config, "com.dataqin.po");
        neo4jSession = sessionFactory.openSession();


        mybatis/
        Reader reader = null;
        reader = Resources.getResourceAsReader("mybatis-config.xml");
        SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(reader);
        sqlSession = sqlSessionFactory.openSession();


    }


    @Test
    public void testSaveTableNode() throws Exception {


        TableNode rootTable = new TableNode("rootTable");
        TableNode tablename1 = new TableNode("tablename1");
        TableNode tablename2 = new TableNode("tablename2");
        TableNode tablename3 = new TableNode("tablename3");

        neo4jSession.save(rootTable);
        neo4jSession.save(tablename1);
        neo4jSession.save(tablename2);
        neo4jSession.save(tablename3);


        Blood c1 = new Blood("test.rootTable", "test.tablename1", "rootTable", "tablename1");
        c1.setFormNode(rootTable);
        c1.setToNode(tablename1);


        Blood c2 = new Blood("test.rootTable", "test.tablename2", "rootTable", "tablename2");
        c2.setFormNode(rootTable);
        c2.setToNode(tablename2);


        Blood c3 = new Blood("test.tablename2", "test.tablename3", "tablename2", "tablename3");
        c3.setFormNode(tablename2);
        c3.setToNode(tablename3);


        neo4jSession.save(c1);
        neo4jSession.save(c2);
        neo4jSession.save(c3);


    }



    @Test
    public void saveDataBlood() {

        DataBlood dataBlood = new DataBlood();
        dataBlood.setFromKey("test.rootTable");
        dataBlood.setToKey("test.tablename1");
        dataBlood.setFromTable("rootTable");
        dataBlood.setToTable("tablename1");
        dataBlood.setToKey("rootTable");
        dataBlood.setSqls("select * from a");


        // 获取UserMapper接口
        try {
            DataBloodMapper dataBloodMapper = sqlSession.getMapper(DataBloodMapper.class);
            System.out.println(dataBloodMapper.save(dataBlood));
            sqlSession.commit();
        } finally {
            // 关闭SqlSession
            sqlSession.close();
        }

    }

}

执行上述的testSaveTableNode后neo4j生成如下图

数据中台之数据血缘的具体实现图1

主逻辑实现

import com.dataqin.util.Constants;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.session.SessionState;


public class MyExecuteWithHookContext implements ExecuteWithHookContext {


    /**
     * ExecuteWithHookContext可以实现3种类型的hook,分别是
     * pre-execution 在执行引擎执行查询之前调用
     * post-execution 在执行引擎执行查询之后调用
     * execution-failure 在执行引擎执行查询失败之后调用
     * @param hookContext
     * @throws Exception
     */
    public void run(HookContext hookContext) throws Exception {

       if(hookContext.getHookType()==HookContext.HookType.POST_EXEC_HOOK){

            //采集sql的主逻辑
            System.out.println("***Hello from the hook !!***");
            String user = SessionState.getUserFromAuthenticator();
            System.out.println(user);
            SessionState session = SessionState.get();

            System.out.println("CMD  "+session.getCmd());
 
            if(StringUtils.isNotEmpty(session.getCmd())){
                //只有同时包含insert和select的sql语句需要采集
                session.setCmd(session.getCmd().toLowerCase());
                if(session.getCmd().contains(Constants.INSERT) && session.getCmd().contains(Constants.SELECT)){

                    //发送kafka消息通知消费端把数据入库到neo4j和mysql
                }
            }

 

        } 
 
    }
 
}

在hive配置文件中增加如下配置

<property>
    <name>hive.exec.post.hooks</name>
    <value>org.apache.atlas.hive.hook.HiveHook<value/>
</property>

1.通过Hook监听Hive的各种事件,比如创建表,修改表等,然后按同时包含insert和select的sql语句的数据推送到Kafka(防止数据产生过多未及时入库而造成的数据丢失)把上述代码打包丢入hive的lib包下就会在执行hive相关的产生数据血缘的语句时发送kafka消息。

2.写个项目专门消费kafka消息并存储数据血缘到neo4j和mysql中。

3.前端传入表名时后端调用如下代码查询neo4j获取血缘数据再按照和前端约定好的数据格式进行拼装返回给前端

@Test
public void findBloods() {
        Map param = new HashMap<String, String>();
        param.put("tableName", "rootTable");
        Iterable<Blood> cl2 = neo4jSession.query(Blood.class, "match (x)-[r]-(y) where r.fromTable={tableName} return x,r,y", param);
        for (Blood cl : cl2) {
            System.out.println(JsonUtil.toJson(cl));
        }
}

?

数据中台之数据血缘的具体实现

上一篇:17. Merge Two Binary Trees 融合二叉树


下一篇:四、分布式编程概述