博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
通过Bottledwater同步PostgreSQL中的数据变化到Kafka消息队列
阅读量:7225 次
发布时间:2019-06-29

本文共 6067 字,大约阅读时间需要 20 分钟。

当我们遇到需要捕获数据库中数据变化的时候,总是会想到通过消息队列来实现该需求,通过把数据变化发布到消息队列,来完成系统上下游的解耦。关心这些数据变化的应用可以从消息队列上获取这些数据。

Bottledwater-pg是针对PostgreSQL数据库的一种消息生产者,可以将PostgreSQL数据库的数据写入confluent Kafka,从而实时的分享给消息订阅者。支持PostgreSQL 9.4以及以上版本,支持全量快照,以及持续解析数据WAL日志中的增量数据并写入Kafka。每一张数据库表为一个topic。数据在使用decode从WAL取出后,使用Avro将数据格式化(通常格式化为JSON)再写入Kafka。

Bottledwater-pg有docker、源码编译、Ubuntu三种使用方式,本文以源码编译方式说明如何部署。

一. 环境说明

  • 源端

IP:10.19.100.23

操作系统:RHEL6.3

数据库:PostgreSQL-9.4.11

  • 目的端

IP:10.19.100.21

操作系统:RHEL6.3

Kafka:2.10-0.10.2.0

二. 源端配置(10.19.100.23)

Bottledwater-pg依赖以下软件包:

  • avro-c > =1.8.0
  • jansson
  • libcurl
  • librdkafka > =0.9.1

Bottledwater-pg可选以下软件包:

  • libsnappy
  • boost

另外编译要求较高的cmake版本,操作系统自带的cmake会出现编译错误,本文使用:

  • cmake-3.8.0

2.0 卸载系统自带PG包

root用户操作)

 RedHat根据安装模式可能自带8.4版本的PostgreSQL,请务必卸载,否则会对编译造成影响

# rpm -qa|grep postgrespostgresql-libs-8.4.11-1.el6_2.x86_64postgresql-devel-8.4.11-1.el6_2.x86_64postgresql-8.4.11-1.el6_2.x86_64# rpm -e postgresql-devel-8.4.11-1.el6_2.x86_64# rpm -e postgresql-8.4.11-1.el6_2.x86_64# rpm -e postgresql-libs-8.4.11-1.el6_2.x86_64

2.1 编译安装cmake

root用户操作)

# cd /opt# tar zxvf cmake-3.8.0.tar.gz# cd cmake-3.8.0# ./bootstrap# make# make install# cmake -versioncmake version 3.8.0CMake suite maintained and supported by Kitware (kitware.com/cmake).

2.2 编译安装jansson

root用户操作)

# cd /opt# tar jxvf jansson-2.9.tar.bz2# cd jansson-2.9# ./configure# make# make install# ls /usr/local/lib/pkgconfigjansson.pc#export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH

2.3 编译安装avro

root用户操作)

# cd /opt# yum install -y xz-*# yum install -y zlib-devel.x86_64# tar zxvf avro-src-1.8.1.tar.gz# cd avro-src-1.8.1/lang/c# mkdir build# cd build# cmake .. -DCMAKE_INSTALL_PREFIX=/opt/avro -DCMAKE_BUILD_TYPE=Release -DTHREADSAFE=true# make# make test# make install

导入库文件

# vi /etc/ld.so.conf/opt/avro/lib# ldconfig

配置临时环境变量

# export LD_LIBRARY_PATH=/opt/avro/lib:$LD_LIBRARY_PATH# export PKG_CONFIG_PATH=/opt/avro/lib/pkgconfig:/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH

2.4 安装libcurl

root用户操作)

# yum install -y libcurl-devel.x86_64

2.5 编译安装librdkafka

root用户操作)

# cd /opt# unzip librdkafka-master.zip# cd librdkafka-master# ./configure# make# make install# ls /usr/local/lib/pkgconfigjansson.pc  rdkafka.pc  rdkafka++.pc

2.6 添加引用库

root用户操作)

# vi /etc/ld.so.conf.d/bottledwater.conf/opt/avro/lib/usr/local/lib/PostgreSQL/9.4/lib# ldconfig

2.7 编译安装bottledwater-pg

root用户操作)

# chown -R postgres:postgres /opt/

postgres用户操作)

配置环境变量

$ vi ~/.bash_profile# .bash_profile# Get the aliases and functionsif [ -f ~/.bashrc ]; then        . ~/.bashrcfi# User specific environment and startup programsexport PG_HOME=/PostgreSQL/9.4export LD_LIBRARY_PATH=/opt/avro/lib:$LD_LIBRARY_PATHexport PKG_CONFIG_PATH=/opt/avro/lib/pkgconfig:/usr/local/lib/pkgconfig:$PKG_CONFIG_PATHPATH=$PG_HOME/bin:$PATH:$HOME/binexport PATH

准备安装包

$ unzip bottledwater-pg-master.zip$ cd bottledwater-pg-master

这里不知道是操作系统环境的问题还是开源软件本身的问题,需要修改源码包里的2处Makefile才能通过编译。

  • 修改client/Makefile
PG_CFLAGS = -I$(shell pg_config --includedir) -I$(shell pg_config --includedir-server) -I$(shell pg_config --pkgincludedir)
  • 修改kafka/Makefile
PG_CFLAGS = -I$(shell pg_config --includedir) -I$(shell pg_config --includedir-server) -I$(shell pg_config --pkgincludedir)LDFLAGS=-L/usr/lib64 $(CURL_LDFLAGS) $(PG_LDFLAGS) $(KAFKA_LDFLAGS) $(AVRO_LDFLAGS) $(JSON_LDFLAGS)

编译并安装bottledwater-pg

$ make$ make install

安装完成后会自动在PostgreSQL数据库扩展包目录下生成扩展库文件和扩展库控制文件。

$ ls /PostgreSQL/9.4/lib/postgresql/bottledwater*/PostgreSQL/9.4/lib/postgresql/bottledwater.so$ ls /PostgreSQL/9.4/share/postgresql/extension/bottledwater*/PostgreSQL/9.4/share/postgresql/extension/bottledwater--0.1.sql/PostgreSQL/9.4/share/postgresql/extension/bottledwater.control

2.7 数据库配置

试验环境中PostgreSQL已有数据库mas,用户mas,密码mas。

Mas库中有一张表:

mastest(col1 numeric primary key,col2 character varying(10));
  • 修改数据库配置文件
$ vi /PostgreSQL/9.4/data/postgresql.confmax_worker_processes = 8        # 至少为8wal_level = logical             # 至少为logical,可以更高max_wal_senders = 8             # 至少为8wal_keep_segments = 256         # 至少为256max_replication_slots = 4       # 至少为4
  • 修改数据库白名单配置文件

  *这部分权限可能过大,可以精简

$ vi /PostgreSQL/9.4/data/pg_hba.conflocal   replication     all                                     trusthost    replication     all             127.0.0.1/32            trusthost    replication     all             0.0.0.0/0               md5host    all             postgres        10.19.100.23/32         trust
  • 重启数据库并对要监控的库创建bottledwater扩展
$ pg_ctl restart -D /PostgreSQL/9.4/data/ -l /PostgreSQL/9.4/data/pglog.log -m fast$ psql -U postgres -d mas -c "create extension bottledwater;"Password for user postgres:CREATE EXTENSION

三. 测试同步

  • 在源端(10.19.100.23)启动bottledwater

 

$ cd /opt/bottledwater-pg-master/kafka$ ./bottledwater --postgres=postgres://postgres:123456@10.19.100.23:5432/mas --broker=10.19.100.21:9092 -f json[INFO] Writing messages to Kafka in JSON format[INFO] Created replication slot "bottledwater", capturing consistent snapshot "00000718-1".INFO:  bottledwater_export: Table mas.mastest is keyed by index mastest_pkey[INFO] Snapshot complete, streaming changes from 0/1749F58.
  • 向源端数据库写入数据
mas=> insert into mastest values(1,'A');INSERT 0 1mas=> insert into mastest values(2,'B');INSERT 0 1mas=> update mastest set col2='C' where col1=2;UPDATE 1mas=> delete from mastest where col1=1;DELETE 1
  • 目的端查看消息事件
# bin/kafka-topics.sh --list --zookeeper localhost:2181mas.mastest# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mas.mastest --from-beginning --property print.key=true{
"col1": {
"double": 1.0}} {
"col1": {
"double": 1.0}, "col2": {
"string": "A"}}{
"col1": {
"double": 2.0}} {
"col1": {
"double": 2.0}, "col2": {
"string": "B"}}{
"col1": {
"double": 2.0}} {
"col1": {
"double": 2.0}, "col2": {
"string": "C"}}{
"col1": {
"double": 1.0}} null

说明:

1. 目的端Kafka配置文件中,Listener的配置不能用默认值localhost

2. 每一张PostgreSQL中的表都会被做为一个topic,每个topic是自动创建的不需要人工干预,即使后期新建的表也是如此。

3. 消息事件以“主键列 + 变更后的所有列”作为消息内容

4. 主从复制模式下,slave不能启动bottledwater,因为备库不产生wal日志

转载于:https://www.cnblogs.com/aegis1019/p/9051462.html

你可能感兴趣的文章
Atlassian是怎样进行持续交付的?且听 Steve Smith一一道来
查看>>
Web Storage相关
查看>>
[PHP内核探索]PHP中的哈希表
查看>>
Apache-drill Architechture
查看>>
WordPress 5.2 Beta 3 发布,要求 PHP 5.6.20 以上版本
查看>>
通通连起来——无处不在的流
查看>>
互联网+时代,看云计算如何改变传统行业
查看>>
ZFS ARC & L2ARC zfs-$ver/module/zfs/arc.c
查看>>
c++类默认拷贝构造函数---浅复制
查看>>
2019年最火热的Golang项目
查看>>
可实现RSSD云硬盘120万IOPS的SPDK IO路径优化实践
查看>>
Vue项目部署遇到的坑(你肯定会遇到!)
查看>>
资源分享计划第三期 0511
查看>>
awk 文本处理
查看>>
【JSConf EU 2018】主题总结 (部分主题已有中文文章)
查看>>
JavaScript面向对象名词详解
查看>>
Java设计模式学习 - 责任链模式
查看>>
JVM,DVM,ART
查看>>
webgl滤镜--会呼吸的痛
查看>>
用Go语言实现微信支付SDK
查看>>