数据治理平台系统文章:
DataHub: 现代数据栈的元数据平台系列之一
什么是datahub
DataHub是为现代数据栈【Modern Data Stack】构建的第三代元数据平台,支持数据发现、协作、治理和端到端可观察性。DataHub采用模型优先的理念,专注于解锁不同工具和系统之间的互操作性。
DataHub架构图
通过Datahub的架构图可以清晰的了解Datahub的架构组成。
主要分为三部分:
- Datahub frontend作为前端的页面展示,基于React框架研发
- Datahub serving来提供后端的存储服务。后端开发语言为Python,存储基于MySQL、ES、Neo4J
- Datahub ingestion则用于抽取元数据信息。提供了如下2种方式,这对于元数据的获取非常的灵活。
- 基于API元数据主动拉取方式,
- 基于Kafka的实时元数据获取方式。
DataHub组成部分
Metadata Store
- 负责存储构成元数据图的 Entities 和 Aspects
- Entities: 实体是元数据图中的主节点,由类型、唯一标识符和元数据属性组【即Aspect】 构成
- Aspects:一个Aspect是描述实体的特定方面的属性集合
- 它们是DataHub中写操作的最小原子单位,与同一实体相关联的多个Aspect可以独立更新。
- Aspect可以在实体之间共享,如Ownership【表示实体Entity所属的用户和组】
- 这包括提供用于摄取元数据、按主键获取元数据、搜索实体和获取实体间关系的API
- 它由以下2个部分组成:
- a Spring Java Service 负责提供Rest.li API
- MySQL, Elasticsearch, & Kafka 负责存储和索引
Metadata Models
- 元数据模型是定义构成元数据图的实体和方面的形状的schemas,以及它们之间的关系。
- schema使用PDL定义的,PDL是一种建模语言,在序列化为JSON时,其形式与Protobuf非常相似。PDL的使用参见pdl_schema
- 实体代表一个特定的元数据资产类,比如数据集、仪表盘、数据管道等等
- 实体的每个实例都由一个称为urn的唯一标识符标识
- Aspects表示附加到实体实例的相关数据描述,如针对实体实例的descriptions, tags
Ingestion Framework
Ingestion框架是一个模块化的、可扩展的Python库,用于从外部源系统(如kafka、mysql)提取元数据,将其转换为DataHub的元数据模型,并通过Kafka或直接使用元数据存储Rest API将其写入DataHub。
元数据Ingestion Plugin支持的列表详见metadata-ingestion plugin,针对mysql、hive、kafka、postgre等,还可以通过sqlalchemy支持所有jdbc源
使用示例:datahub ingest -c mysql_to_datahub_rest.yml
GraphQL API
GraphQL API提供了一个强类型的、面向实体的API,它使得与构成元数据图的实体的交互变得简单,包括添加和删除标签、所有者、链接和更多的元数据实体!
如何使用参见GraphQL API getting-started
User Interface
DataHub提供了一个React UI,包括一组不断发展的特性,使发现、管理和调试你的数据资产变得简单和愉快。
提供的功能包括如下:
- End-to-end Search and Discovery
- 通过跨平台、数据集、管道、图表和仪表板跟踪数据血缘,轻松理解数据的端到端旅程
- 当您浏览数据血缘图时,快速获取有关实体的上下文
- DataHub提供数据集概要分析和使用统计信息,让你对数据集的准确性和相关性获得信心
- 健壮的文档和标记
- 通过API和/或DataHub UI获取和维护数据实体相关的知识(文档等等)
- 通过API和/或DataHub UI创建和定义新Tag
- Browse and search specific tags to fast-track discovery across entities
- 数据治理能力
- 快速为用户和/或用户组分配资产所有权
- 使用策略管理细粒度访问控制
- Metadata quality & usage analytics
- DataHub is a Platform for Developer
- DataHub是一个API和流优先的平台,允许开发人员实现一个为他们特定的数据栈量身定制的实例。
- 不断增长的灵活集成模型集允许推送和拉入元数据
- 快速启动和运行的无代码元数据模型扩展。
- 支持众多数据来源(Dataset Sources)
- Identity Management
- 支持BI Tools、ETL / EL、Workflow Orchestration、Data Observability、ML Platform
快速安装部署
准备安装环境
需要提前安装 docker,jq,docker-compose。同时保证系统的python版本为 Python 3.6+。
针对系统的要求至少满足: 2 CPUs, 8GB RAM, 2GB Swap area, and 10GB disk space.
- 安装docker、docker-compose可以参见以下文档:
centos7.2上部署docker、docker-compose的步骤
Docker安装
Docker-Compose安装 - python的安装请参考:使用PyFlink, 如何在 zeppelin 里高效的开发 PyFlink Job? 中的
准备conda环境
部分
安装与启动datahub
安装
pip install --upgrade pip wheel setuptools
pip uninstall datahub acryl-datahub || true
pip install --upgrade acryl-datahub
datahub version
如果能正常显示datahub的版本信息,则说明安装成功!
启动
因为要下载docker image,所以会经过漫长的下载过程,需要耐心等待。本人安装时,主要碰到如下错误:
- 碰到端口冲突导致的某一个服务不能启动,则需要将占用端口的程序停止,然后重新执行命令
- 因为网络的问题导致的错误,直接重新执行命令
datahub docker quickstart
- 查看docker启动情况:
docker ps --format "table {{.ID}}\t{{.Names}}\t{{.Ports}}\t{{.Status}}"
- WEB访问: http://ip:9002/, 用户名 / 密码:datahub / datahub
摄取元数据到Datahub
元数据摄取使用的是插件架构,你仅需要安装所需的插件。
安装摄取元数据插件
摄取MySQL源的元数据,需要安装如下插件:pip install 'acryl-datahub[mysql]'
检查安装情况:datahub check plugins
可以看到mysql 后面的 (disabled) 没有了,
配置摄取的recipe
recipe一个配置文件,它告诉摄取脚本从哪里提取数据(源)和把数据放在哪里(接收),示例如下:
`vim mysql_to_datahub_rest.yml``
source:
type: mysql
config:
host_port: '172.25.21.123:3306'
username: videoweb
password: suntek
database: md_data_dictionary
include_views: false
profiling:
enabled: false
transformers:
-
type: simple_add_dataset_tags
config:
tag_urns:
- 'urn:li:tag:172.25.21.123'
sink:
type: datahub-rest
config:
server: 'http://172.25.21.22:8080'
运行元数据摄取
datahub ingest -c mysql_to_datahub_rest.yml
运行后,会打元数据摄取报告,如下:
Source (mysql) report:
{'entities_profiled': 0,
'failures': {},
'filtered': [],
'query_combiner': None,
'soft_deleted_stale_entities': [],
'tables_scanned': 1836,
'views_scanned': 0,
'warnings': {},
'workunit_ids': ['zyt_test.person',
......
'zyt_test.user'],
'workunits_produced': 1836}
Sink (datahub-rest) report:
{'downstream_end_time': datetime.datetime(2022, 1, 28, 11, 14, 49, 4127),
'downstream_start_time': datetime.datetime(2022, 1, 28, 11, 14, 17, 910894),
'downstream_total_latency_in_seconds': 31.093233,
'failures': [],
'records_written': 1836,
'warnings': []}
Pipeline finished successfully
导入样例数据
摄取样例元数据,请从终端运行以下CLI命令:datahub docker ingest-sample-data
运行结果如下:
Source (file) report:
{'failures': {},
'warnings': {},
'workunit_ids': ['file:///tmp/tmpnpyfcgho.json:0',
'file:///tmp/tmpnpyfcgho.json:1',
'file:///tmp/tmpnpyfcgho.json:2',
......
'file:///tmp/tmpnpyfcgho.json:83',
'file:///tmp/tmpnpyfcgho.json:84',
'file:///tmp/tmpnpyfcgho.json:85'],
'workunits_produced': 86}
Sink (datahub-rest) report:
{'downstream_end_time': datetime.datetime(2022, 1, 28, 12, 55, 37, 412504),
'downstream_start_time': datetime.datetime(2022, 1, 28, 12, 55, 36, 319103),
'downstream_total_latency_in_seconds': 1.093401,
'failures': [],
'records_written': 86,
'warnings': []}
Pipeline finished successfully
查看元数据
在摄取元数据后,刷新datahub页面,能正常的查看元数据信息:
根据Tag查看元数据
填坑记录
port冲突导致一些服务不能启动
datahub采用docker-compose、docker安装时,默认会占用如下端口,如果被占用,会导致相应的服务不能启动,导致安装失败
- datahub-frontend-react 9002
- datahub-gms 8080
- schema-registry 8081
- mysql 3306
- kafka broker 9092
- zookeeper 2181
- elasticsearch 9200
通过如下命令,查看docker使用port的情况docker ps --format "table {{.ID}}\t{{.Names}}\t{{.Ports}}\t{{.Status}}"
安装acryl-datahub[mysql]时报错
在使用pip安装acryl-datahub[mysql] ingestion插件时,最后报:Cannot uninstall ‘ruamel-yaml’. It is a distutils installed project
安装命令如下:pip install 'acryl-datahub[mysql]'
解决方案参见: fail-to-install-great-expectation-error-cannot-uninstall-ruamel-yaml
将安装命令调整为:pip install 'acryl-datahub[mysql]' --ignore-installed ruamel.yaml
执行datahub docker ingest-sample-data报错
解决步骤如下:
- 找到python模块的安装位置:
pip show acryl_datahub
, 如/opt/anaconda3/lib/python3.7/site-packages cd /opt/anaconda3/lib/python3.7/site-packages
vi datahub/cli/docker.py
- 将
issues = check_local_docker_containers()
修改为issues = False
,即跳过docker container检查
def ingest_sample_data(path: Optional[str]) -> None:
"""Ingest sample data into a running DataHub instance."""
if path is None:
click.echo("Downloading sample data...")
with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as tmp_file:
path = str(pathlib.Path(tmp_file.name))
# Download the bootstrap MCE file from GitHub.
mce_json_download_response = requests.get(GITHUB_BOOTSTRAP_MCES_URL)
mce_json_download_response.raise_for_status()
tmp_file.write(mce_json_download_response.content)
click.echo(f"Downloaded to {path}")
# Verify that docker is up.
issues = check_local_docker_containers()