Real-time synchronization with PostgreSQL + Debezium + Kafka
# Preparing the PostgreSQL database

## Download the source package

```
$ wget https://ftp.postgresql.org/pub/source/v12.7/postgresql-12.7.tar.gz
```

## Install dependencies

```
$ yum install -y gcc gcc-c++ make readline readline-devel systemd systemd-devel zlib zlib-devel openssl openssl-devel
```

## Compile and install

```
$ tar -zxf postgresql-12.7.tar.gz
$ cd postgresql-12.7
$ ./configure --prefix=/opt/module/pg-12.7 --with-openssl --with-systemd
$ make world
$ make install-world
```

## Set environment variables

```sh
export PGDATA=/opt/module/pg-12.7/data
export LD_LIBRARY_PATH=/opt/module/pg-12.7/lib:$LD_LIBRARY_PATH
```

## Configure the PostgreSQL database

Logical decoding requires `wal_level = logical`, at least one free replication slot, and a `pg_hba.conf` entry that allows replication connections.

```
$ cd /opt/module/pg-12.7/
$ mkdir data
$ bin/initdb -D $PGDATA -U postgres
$ bin/pg_ctl -D /opt/module/pg-12.7/data -l logfile start
$ bin/psql -U postgres
postgres=# \l
                                  List of databases
   Name    |  Owner   | Encoding |   Collate   |    Ctype    |   Access privileges
-----------+----------+----------+-------------+-------------+-----------------------
 postgres  | postgres | UTF8     | zh_CN.UTF-8 | zh_CN.UTF-8 |
 template0 | postgres | UTF8     | zh_CN.UTF-8 | zh_CN.UTF-8 | =c/postgres          +
           |          |          |             |             | postgres=CTc/postgres
 template1 | postgres | UTF8     | zh_CN.UTF-8 | zh_CN.UTF-8 | =c/postgres          +
           |          |          |             |             | postgres=CTc/postgres

$ vim data/postgresql.conf
listen_addresses = '*'
wal_level = logical
max_wal_senders = 2
max_replication_slots = 1

$ vim data/pg_hba.conf
host    replication    all    0.0.0.0/0    trust

$ bin/pg_ctl -D /opt/module/pg-12.7/data -l logfile restart
```

## Prepare test data

```
$ bin/psql -U postgres
postgres=# CREATE USER yxz WITH PASSWORD '123456';
postgres=# CREATE DATABASE yxz_db OWNER yxz;
postgres=# ALTER USER yxz REPLICATION;
postgres=# \q
$ bin/psql -U yxz -d yxz_db
yxz_db=> CREATE SCHEMA yxz AUTHORIZATION yxz;
yxz_db=> CREATE TABLE yxz.student(id int8,name varchar(32));
yxz_db=> INSERT INTO yxz.student VALUES(1,'Lisa');
yxz_db=> select * from yxz.student;
 id | name
----+------
  1 | Lisa
(1 row)
```

# Deploying Kafka

## Install Kafka

```
$ tar -xzf kafka_2.11-2.4.1.tgz -C /opt/module
$ cd /opt/module/kafka_2.11-2.4.1/
$ vim config/server.properties
zookeeper.connect=localhost:2181/kafka
```

## Start ZooKeeper and Kafka

```
$ nohup bin/zookeeper-server-start.sh config/zookeeper.properties > /dev/null 2>&1 &
$ jps
128038 QuorumPeerMain
130649 Jps
$ bin/zookeeper-shell.sh localhost:2181 ls -R /
```

```
$ bin/kafka-server-start.sh -daemon config/server.properties
$ jps
3237 Kafka
128038 QuorumPeerMain
3274 Jps
```

# Installing the PostgreSQL connector

## Download and extract

```
$ wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.9.5.Final/debezium-connector-postgres-1.9.5.Final-plugin.tar.gz
$ mkdir -p /opt/module/debezium/connector
$ tar -zxf debezium-connector-postgres-1.9.5.Final-plugin.tar.gz -C /opt/module/debezium/connector
```

## Configure Kafka Connect

```
$ vim config/connect-distributed.properties
bootstrap.servers=localhost:9092
group.id=connect-yxz
plugin.path=/opt/module/debezium/connector
```

## Restart Kafka

```
$ bin/kafka-server-stop.sh
$ bin/kafka-server-start.sh -daemon config/server.properties
```

## Start Kafka Connect

```
$ bin/connect-distributed.sh -daemon config/connect-distributed.properties
$ jps
13146 ConnectDistributed
```

## Check that Kafka Connect is working

```
$ curl -H "Accept:application/json" localhost:8083
{"version":"2.4.1","commit":"c57222ae8cd7866b","kafka_cluster_id":"ytb9XqzEREimiN_D3ZJI2w"}
```

## List the registered connectors

```
$ curl -H "Accept:application/json" localhost:8083/connectors/
[]
```

## Connector configuration

```json
{
  "name": "yxz-postgresql-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "yxz",
    "database.password": "123456",
    "database.dbname" : "yxz_db",
    "database.server.name": "yxz",
    "table.include.list": "public.inventory",
    "plugin.name": "pgoutput"
  }
}
```

## Register the connector

```
curl -X POST localhost:8083/connectors -H "Content-Type:application/json" -d '{
  "name": "yxz-postgresql-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "yxz",
    "database.password": "123456",
    "database.dbname" : "yxz_db",
    "database.server.name": "yxz",
    "table.include.list": "public.inventory",
    "plugin.name": "pgoutput"
  }
}'
```

## Get connector information

```
curl -H "Accept:application/json" localhost:8083/connectors/yxz-postgresql-connector
curl -H "Accept:application/json" localhost:8083/connectors/yxz-postgresql-connector/config
```

## Update the connector configuration

The configuration registered above only captures `public.inventory`, so the `yxz.student` table created earlier is not covered. Replacing the table filter with `schema.include.list` captures every table in the `public` and `yxz` schemas:

```
curl -X PUT localhost:8083/connectors/yxz-postgresql-connector/config -H "Content-Type:application/json" -d '{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.hostname": "localhost",
  "database.port": "5432",
  "database.user": "yxz",
  "database.password": "123456",
  "database.dbname" : "yxz_db",
  "database.server.name": "yxz",
  "schema.include.list": "public,yxz",
  "plugin.name": "pgoutput"
}'
```

## Restart the connector

```
curl -X POST "localhost:8083/connectors/yxz-postgresql-connector/restart?includeTasks=true"
```
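
The walkthrough stops once the connector is restarted, so a natural follow-up is to confirm that changes to `yxz.student` actually reach Kafka. The sketch below is one possible way to do that, not part of the original procedure. It assumes Debezium's default topic naming of `<database.server.name>.<schema>.<table>`, the Kafka Connect REST endpoint on `localhost:8083`, and the Kafka install path used earlier. The function names (`topic_name`, `connector_status`, `consume_changes`) and the `CONNECT_URL` and `KAFKA_HOME` variables are hypothetical helpers introduced only for illustration.

```shell
#!/usr/bin/env bash
# Hypothetical helper functions for checking the pipeline; the names are
# illustrative and not part of Kafka or Debezium.

# Assumed defaults, taken from the hosts and paths used in this walkthrough.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"
KAFKA_HOME="${KAFKA_HOME:-/opt/module/kafka_2.11-2.4.1}"

# Build the change topic name, assuming Debezium's default naming:
# <database.server.name>.<schema>.<table>
topic_name() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# Print the connector and task state as JSON via the Kafka Connect REST API.
connector_status() {
  curl -s -H "Accept:application/json" "${CONNECT_URL}/connectors/$1/status"
}

# Print every change event on a table's topic, starting from the oldest one.
# Arguments: server name, schema, table. Runs until interrupted with Ctrl+C.
consume_changes() {
  "${KAFKA_HOME}/bin/kafka-console-consumer.sh" \
    --bootstrap-server localhost:9092 \
    --topic "$(topic_name "$1" "$2" "$3")" \
    --from-beginning
}
```

After sourcing the file, `connector_status yxz-postgresql-connector` should show whether the connector and its task are running, and `consume_changes yxz yxz student` reads from the topic `yxz.yxz.student`. Inserting another row into `yxz.student` while the consumer is running is a simple way to see whether a new event arrives.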
vleity
May 17, 2025, 16:40