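Singer makes ETL processing straightforward. The data source can be an API, a database, or a file.
Note: this test uses docker-compose to run the databases and provide their contents, plus virtualenv with Python 3.5 or later.
Environment setup
- docker-compose file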
version: "3"
services:
  gogs-service:
    image: gogs/gogs
    ports:
      - "10022:22"
      - "10080:3000"
  mongodb:
    image: mongo:3.4
    ports:
      - "27017:27017"
  mysql:
    image: mysql:5.7.16
    ports:
      - "3306:3306"
    command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
    environment:
      MYSQL_ROOT_PASSWORD: dalongrong
      MYSQL_DATABASE: gogs
      MYSQL_USER: gogs
      MYSQL_PASSWORD: dalongrong
      TZ: Asia/Shanghai
  postgres:
    image: postgres:9.6.11
    ports:
      - "5432:5432"
    environment:
      - "POSTGRES_PASSWORD=dalong"
- postgres target configuration
target.json (the password must match POSTGRES_PASSWORD in the docker-compose file)
{
  "host": "localhost",
  "port": 5432,
  "dbname": "postgres",
  "user": "postgres",
  "password": "dalong",
  "schema": "public"
}
- Create the mongodb virtualenv
virtualenv mongodb
source ./mongodb/bin/activate
git clone https://github.com/singer-io/tap-mongodb.git
cd tap-mongodb && pip install .
- Create the mongodb tap configuration
- mongodb tap configuration file (mongo.json)
The format is as follows:
{
  "host": "localhost",
  "port": "27017",
  "dbname": "usersapp"
}
- Add mongodb data
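The post does not show the documents that were loaded. A minimal sketch of one way to seed the collection, assuming pymongo is installed and the MongoDB container from the compose file is reachable on localhost:27017. The helper names and the three documents are hypothetical; only the database name (usersapp), the collection name (loginusers) and the field names (name, age, type) come from the configuration used later in this post.

```python
def build_sample_users():
    """Return three sample documents (hypothetical values, not the author's data)."""
    return [
        {"name": "alice", "age": 30, "type": "admin"},
        {"name": "bob", "age": 25, "type": "user"},
        {"name": "carol", "age": 41, "type": "user"},
    ]


def seed(uri="mongodb://localhost:27017", db_name="usersapp", collection="loginusers"):
    """Insert the sample documents and return how many were written."""
    # Imported lazily so this module can be loaded without pymongo installed.
    from pymongo import MongoClient

    client = MongoClient(uri)
    try:
        result = client[db_name][collection].insert_many(build_sample_users())
        return len(result.inserted_ids)
    finally:
        client.close()
```

Calling seed() once from a Python shell inside the virtualenv should be enough for the discover step to see a non-empty collection.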
- Run mongodb discover to get the collection information
./mongodb/bin/tap-mongodb -c mongo.json --discover > usersapp.json
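Before editing the discover output it can help to see which streams it contains and whether any is already selected. A small sketch, assuming the file has the layout shown in this post (a top-level "streams" list whose stream-level metadata entry has an empty breadcrumb); the function names are illustrative only.

```python
import json


def load_catalog(path="usersapp.json"):
    """Read a discover output file into a dict."""
    with open(path) as fh:
        return json.load(fh)


def list_streams(catalog):
    """Return (tap_stream_id, selected) pairs for every stream in the catalog."""
    result = []
    for stream in catalog.get("streams", []):
        selected = False
        for entry in stream.get("metadata", []):
            # The stream-level metadata entry is the one with an empty breadcrumb.
            if entry.get("breadcrumb") == []:
                selected = bool(entry.get("metadata", {}).get("selected", False))
        result.append((stream.get("tap_stream_id"), selected))
    return result
```

For example, list_streams(load_catalog()) reports every stream as unselected until the catalog is edited.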
- Edit the sync configuration and schema properties
Edit the usersapp.json file (lines prefixed with + are additions):
  {
    "streams": [
      {
        "table_name": "loginusers",
        "stream": "loginusers",
        "metadata": [
          {
            "breadcrumb": [],
            "metadata": {
              "database-name": "usersapp",
              "row-count": 3,
+             "selected": true,
+             "replication-method": "FULL_TABLE",
+             "custom-select-clause": "_id,name,age"
            }
          }
        ],
        "tap_stream_id": "usersapp-loginusers",
        "schema": {
          "type": "object",
+         "properties": {
+           "name": {
+             "inclusion": "available",
+             "maxLength": 255,
+             "type": [
+               "null",
+               "string"
+             ]
+           },
+           "age": {
+             "inclusion": "available",
+             "maxLength": 255,
+             "type": [
+               "null",
+               "number"
+             ]
+           },
+           "type": {
+             "inclusion": "available",
+             "maxLength": 255,
+             "type": [
+               "null",
+               "string"
+             ]
+           },
+           "_id": {
+             "inclusion": "available",
+             "maxLength": 255,
+             "type": [
+               "null",
+               "string"
+             ]
+           }
+         }
+       }
+     }
+   ]
  }
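- Notes
Because MongoDB is schemaless, the sync information has to be configured by hand, otherwise the run fails with an error. The official documentation is not very clear on this point; use the usersapp.json content above as a reference.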
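The manual edit of usersapp.json can also be scripted. The sketch below mirrors the keys added by hand in this post (selected, replication-method, custom-select-clause, and a properties map with inclusion, maxLength and type); the function name and its parameters are hypothetical, and it has only been reasoned about against the catalog layout shown here, not against every tap-mongodb version.

```python
import copy


def select_stream(catalog, tap_stream_id, properties, select_columns,
                  replication_method="FULL_TABLE"):
    """Return a patched copy of `catalog` with one stream selected.

    `properties` maps column name -> JSON-schema type name, e.g. {"age": "number"}.
    `select_columns` is the list of columns for the custom select clause.
    """
    patched = copy.deepcopy(catalog)
    for stream in patched["streams"]:
        if stream["tap_stream_id"] != tap_stream_id:
            continue
        for entry in stream["metadata"]:
            # Only the stream-level entry (empty breadcrumb) carries these keys.
            if entry["breadcrumb"] == []:
                entry["metadata"].update({
                    "selected": True,
                    "replication-method": replication_method,
                    "custom-select-clause": ",".join(select_columns),
                })
        schema = stream.setdefault("schema", {"type": "object"})
        schema["properties"] = {
            name: {
                "inclusion": "available",
                "maxLength": 255,
                "type": ["null", json_type],
            }
            for name, json_type in properties.items()
        }
    return patched
```

The patched dict can then be written back with json.dump and passed to the tap through --properties.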
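Run and results
- Run (the command expects target-postgres to be installed in a second virtualenv named postgres)
./mongodb/bin/tap-mongodb -c mongo.json --properties usersapp.json | ./postgres/bin/target-postgres -c target.json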
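The same tap-to-target pipe can be driven from Python, which is convenient when the sync should run from a scheduler. This is only a sketch built on the standard subprocess module; run_pipeline is a hypothetical helper, not part of Singer.

```python
import subprocess


def run_pipeline(tap_cmd, target_cmd):
    """Pipe the stdout of `tap_cmd` into `target_cmd` and return the target's stdout."""
    tap = subprocess.Popen(tap_cmd, stdout=subprocess.PIPE)
    target = subprocess.Popen(target_cmd, stdin=tap.stdout, stdout=subprocess.PIPE)
    # Close our copy of the pipe so the tap is notified if the target exits early.
    tap.stdout.close()
    out, _ = target.communicate()
    tap_rc = tap.wait()
    if tap_rc != 0 or target.returncode != 0:
        raise RuntimeError(
            "pipeline failed: tap=%s target=%s" % (tap_rc, target.returncode)
        )
    return out.decode("utf-8")
```

With the paths from this post, the call would be run_pipeline(["./mongodb/bin/tap-mongodb", "-c", "mongo.json", "--properties", "usersapp.json"], ["./postgres/bin/target-postgres", "-c", "target.json"]).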
- Output
INFO Starting full table replication for table usersapp.loginusers
INFO METRIC: {"type": "counter", "metric": "record_count", "value": 0, "tags": {}}
INFO METRIC: {"type": "timer", "metric": "job_duration", "value": 0.012291193008422852, "tags": {"job_type": "sync_table", "database": "usersapp", "table": "loginusers", "status": "succeeded"}}
INFO Table 'loginusers' does not exist. Creating... CREATE TABLE public.loginusers ("_id" character varying, "age" numeric, "name" character varying, "type" character varying, PRIMARY KEY ("_id"))
INFO Loading 3 rows into 'loginusers'
INFO COPY loginusers_temp ("_id", "age", "name", "type") FROM STDIN WITH (FORMAT CSV, ESCAPE '\')
INFO UPDATE 0
INFO INSERT 0 3
{"currently_syncing": null, "bookmarks": {"usersapp-loginusers": {"initial_full_table_complete": true}}}
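The last line of the output is the state emitted at the end of the run. A short sketch for picking it out of captured output and checking the bookmark shown above; both function names are illustrative, and the sketch assumes the state is the only line that starts with a JSON object.

```python
import json


def last_state(target_output):
    """Return the last JSON object line in the captured output, or None."""
    state = None
    for line in target_output.splitlines():
        line = line.strip()
        if not line.startswith("{"):
            continue  # skip log lines such as "INFO ..."
        try:
            state = json.loads(line)
        except ValueError:
            continue
    return state


def full_table_done(state, tap_stream_id):
    """True if the state records a completed initial full-table sync for the stream."""
    bookmarks = (state or {}).get("bookmarks", {})
    return bool(bookmarks.get(tap_stream_id, {}).get("initial_full_table_complete"))
```

Saving this state to a file lets a later run be checked against it, for example full_table_done(state, "usersapp-loginusers").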
References
https://github.com/singer-io/tap-gitlab
https://github.com/rongfengliang/singer-mysql2postges-demo