使用amazonriver同步PG数据到Kafka

最近生产上,DW有实时同步数据的需求,这里我们研究了下社区的开源组件,最终选择了amazonriver这个哈罗单车开源的小工具来实现。


前提条件:

1、上游的PG主库要开启逻辑复制

2、表必须有主键


部署方案:

项目地址:https://github.com/hellobike/amazonriver

amazonriver的安装部署很简单,根据官方github操作即可。

我这里的部署路径是: /usr/local/amazonriver

root@node1:/usr/local/amazonriver
# tree
├── adb-tb1.json
├── amazonriver


cat adb-tb1.json 配置文件如下:

{
    "pg_dump_path": "/usr/pgsql-13/bin/pg_dump",
    "subscribes": [{
        "dump": false,
        "slotName": "dts_adb_tb1",
        "pgConnConf": {
            "host": "192.168.2.4",
            "port": 5432,
            "database": "adb",
            "user": "dts",
            "password": "dts"
        },
        "rules": [
            {
                "table": "tb1",
                "pks": ["id"],
                "topic": "rti_p_adb_tb1"
            }
        ],
        "kafkaConf": {
            "addrs": ["dev-kafka-01:9092"]
        },
        "retry": -1
    }],
    "prometheus_address": ":8212"
}


cat /etc/supervisord.d/adb-tb1.ini

[program:amazonriver-adb-tb1]
command=/usr/local/amazonriver/amazonriver -config /usr/local/amazonriver/adb-tb1.json
directory=/usr/local/amazonriver
redirect_stderr=true
stdout_logfile=/tmp/supervisord/%(program_name)s.log
stdout_logfile_maxbytes=100MB
stdout_logfile_backups=2
stderr_logfile=/tmp/supervisord/%(program_name)s.log
stderr_logfile_maxbytes=100MB
stderr_logfile_backups=2
process_name=%(program_name)s
numprocs=1
numprocs_start=0
autostart=true
autorestart=unexpected
startsecs=10
startretries=999
exitcodes=0,2
stopsignal=TERM
stopwaitsecs=10
stopasgroup=false
killasgroup=false


我这里使用的是supervisord控制的,直接起来即可。


注意事项:  

1、Amazonriver的配置文件中,注意下面这个参数:

  "retry": -1

这样可保证在amazonriver人工重启或者异常重启的时候数据不丢失(但是会出现下游数据多得情况),下游数据多,需要自行去重下。

可以看这个ISSUE:https://github.com/hellobike/amazonriver/issues/27


2、PG中的注意事项

如果某个amazonriver进程不在用了,意味着它创建的对应的复制槽也不再用了,记得及时删除掉,不然wallog会越积越多,把磁盘空间打满。


包装成一个api接口,方便使用:


使用amazonriver同步PG数据到Kafka










上一篇:DB-非阻塞事务创建索引


下一篇:PostgreSQL 学习笔记