Singer 学习三 使用Singer进行mongodb 2 postgres 数据转换

Singer 可以方便的进行数据的etl 处理,我们可以处理的数据可以是api 接口,也可以是数据库数据,或者
是文件
备注: 测试使用docker-compose 运行&&提供数据库内容,使用virtualenv && python 3.5 以及以上

环境准备

  • docker-compose 文件
 
version: "3"
services:
  gogs-service:
    image: gogs/gogs
    ports:
      - "10022:22"
      - "10080:3000"
  mongodb:
    image: mongo:3.4
    ports:
    - "27017:27017"
  mysql:
    image: mysql:5.7.16
    ports:
      - 3306:3306
    command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
    environment:
      MYSQL_ROOT_PASSWORD: dalongrong
      MYSQL_DATABASE: gogs
      MYSQL_USER: gogs
      MYSQL_PASSWORD: dalongrong
      TZ: Asia/Shanghai
  postgres:
    image: postgres:9.6.11
    ports:
    - "5432:5432"
    environment:
    - "POSTGRES_PASSWORD:dalong"
 
 
  • postgres target 配置
    target.json
 
{
    "host": "localhost",
    "port": 5432,
    "dbname": "postgres",
    "user": "postgres",
    "password": "postgres",
    "schema": "public"
}
 
 
  • 创建mongodb virtualenv
 
virtualenv mongodb  
source ./mongodb/bin/activate
git clone https://github.com/singer-io/tap-mongodb.git
cd ap-mongodb  && pip install .
 
 
  • 创建mongodb tap 配置
  • mongodb tap 配置文件
    格式如下:
 
{
    "host": "localhost",
    "port": "27017",
    "dbname": "usersapp"
}
 
  • 添加mongodb 数据
    Singer 学习三 使用Singer进行mongodb  2 postgres 数据转换
  • mongodb discover 获取collection 信息
 
  ./mongodb/bin/tap-mongodb -c mongo.json --discover > usersapp.json
  • 修改同步配置&&schema properties
    修改usersapp.json 文件
 
{
  "streams": [
    {
      "table_name": "loginusers",
      "stream": "loginusers",
      "metadata": [
        {
          "breadcrumb": [],
          "metadata": {
            "database-name": "usersapp",
            "row-count": 3,
+ "selected": true,
+ "replication-method": "FULL_TABLE",
+ "custom-select-clause": "_id,name,age"
          }
        }
      ],
      "tap_stream_id": "usersapp-loginusers",
      "schema": {
        "type": "object",
+ "properties": {
+ "name": {
+ "inclusion": "available",
+ "maxLength": 255,
+ "type": [
+ "null",
+ "string"
+ ]
+ },
+ "age": {
+ "inclusion": "available",
+ "maxLength": 255,
+ "type": [
+ "null",
+ "number"
+ ]
+ },
+ "type": {
+ "inclusion": "available",
+ "maxLength": 255,
+ "type": [
+ "null",
+ "string"
+ ]
+ },
+ "_id": {
+ "inclusion": "available",
+ "maxLength": 255,
+ "type": [
+ "null",
+ "string"
+ ]
+ }
+ }
+ }
+ }
+ ]
}
 
 
  • 说明
    注意因为mongodb is schemaless 我们需要配置同步的信息,不然运行会报错,官方文档写的不是很清晰,参考
    上边的usersapp.json 内容

运行&&效果

  • 运行
 
./mongodb/bin/tap-mongodb -c mongo.json --properties usersapp.json | ./postgres/bin/target-po
stgres -c target.json
 
  • 效果
INFO Starting full table replication for table usersapp.loginusers
INFO METRIC: {"type": "counter", "metric": "record_count", "value": 0, "tags": {}}
INFO METRIC: {"type": "timer", "metric": "job_duration", "value": 0.012291193008422852, "tags": {"job_type": "sync_table", "database": "user
sapp", "table": "loginusers", "status": "succeeded"}}
INFO Table 'loginusers' does not exist. Creating... CREATE TABLE public.loginusers ("_id" character varying, "age" numeric, "name" character
 varying, "type" character varying, PRIMARY KEY ("_id"))
INFO Loading 3 rows into 'loginusers'
INFO COPY loginusers_temp ("_id", "age", "name", "type") FROM STDIN WITH (FORMAT CSV, ESCAPE '\')
INFO UPDATE 0
INFO INSERT 0 3
{"currently_syncing": null, "bookmarks": {"usersapp-loginusers": {"initial_full_table_complete": true}}}
 
 Singer 学习三 使用Singer进行mongodb  2 postgres 数据转换

参考资料

https://github.com/singer-io/tap-gitlab
https://github.com/rongfengliang/singer-mysql2postges-demo

上一篇:实战项目中Java heap space错误的解决


下一篇:Python-模块