Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)
Hôm nay rãnh rỗi sẽ viết một bài liên quan đến Debezium và Kafka, mục đích của bài này là sử dụng Source connector và Sink connector để đẩy dữ liều từ MSSQL sang PostgreSQL theo thời gian thực.
Trước tiên thực hiện bài này nên đọc qua trước Debezium và Kafka gì, mục đích của nó là gì tại đây:
https://debezium.io/
https://debezium.io/
Còn đối với tôi thì đơn giản là Debezium là công cụ dùng để lắng nghe các thay đổi dữ liệu từ các cơ sở dữ liệu như thêm xoá sửa ... (Change Data Capture (CDC))
Kafka là một nền tảng phân tán dùng để xử lý các luồng dữ liệu theo thời gian thực.
Ở bài viết này tôi sẽ cài đặt và cấu hình trên VM, tôi sẽ dùng openSUSE Leap 15, tại vì sao tôi lại cài trên này ? Đơn giản khi bạn chịu khó làm thủ công đến khi bạn hiểu rõ bản chất của nó sau này sẽ hỗ trợ bạn khá là nhiều.
Bài lab này sẽ có 3 server sau :
openSUSE Leap Debezium và Kafka : 192.168.1.6 - Dùng để cài Leap Debezium và Kafka
MSSQL: 192.168.1.3 - Chứa data chính
PostgreSQL : 192.168.1.5 - Chứa data lấy từ data MSSQL
Tôi có kịch bản như sau : Bên phần server MSSQL nó phục vụ cho một công việc nào đó ABC trong đó có một table tên là dbo.Aircraft nó phục vụ cho một công việc nào đó, giờ câu hỏi đặt ra là có một bên dùng PostgreSQL muốn có bảng public.Aircraft với nội dụng giống vậy và nó được cập nhật theo thời gian thực, có nghĩa là bên MSSQL làm gì thay đổi gì ở table này thì bên PostgreSQL cũng thay đổi theo và nó phải chạy theo thời gian thực.
Mô hình như sau :
MSSQL ==connector===> Debezium và Kafka ===Sink==> PostgreSQL
Cài đặt Debezium và Kafka:
Java Openjdk : Debezium và Kafka chạy trên nền java nên đầu tiên cần phải cài java jdk trước
sudo zypper install java-11-openjdk
Kafka :
Hãy theo dõi link bên dưới để tim kiếm phiên bản muốn cài.
https://archive.apache.org/dist/kafka/
Hiện tại kafka_2.12-3.7.0 là mới nhất nên sẽ download bản này về cài, mới nhất ở hiện tại, sau khi kéo về xong , giải nén ra và cho vào /usr/local/kafka nơi này sẽ là nơi chúng ta làm việc chính.
wget https://archive.apache.org/dist/kafka/3.7.0/kafka_2.12-3.7.0.tgz tar -xzf kafka_2.12-3.7.0.tgz mv kafka_2.12-3.7.0 /usr/local/kafka
Zookeeper :
Sau khi tải các gói về đầy đủ thì bên trong Kafka bao gồm luôn cả Zookeeper, Kafka sử dụng Zookeeper để quản lý thông tin về các brokers, topics và Partition. Zookeeper giúp duy trì tính nhất quán và đồng bộ giữa các brokers trong cụm nên sau khi đã tải về nên cần chạy đúng các quy trình sau.
Đầu tiên để lên được kafka cần chạy Zookeeper trước, sau khi nó chạy hoàn tất sẽ tiến hành chạy Kafka, chỉ cần 2 bước này thì đã hoàn tất quá trình để có được server Kafka.
Chạy Zookeeper trước :
cd /usr/local/kafka bin/zookeeper-server-start.sh config/zookeeper.properties
cd /usr/local/kafka vi config/server.properties listeners=PLAINTEXT://192.168.1.6:9092
Sau khi chỉnh sửa xong bắt đầu chạy Kafka
cd /usr/local/kafka bin/kafka-server-start.sh config/server.properties
Debezium :
Đây là phần quan trọng và khá là thú vị, trong Debezium nó có hỗ trợ nhiều phần, ở đây tôi dùng Kafka nên cần sử dụng phần Debezium Connectors.
Nói về Debezium Connectors thì bạn cần hiểu nó dùng để theo dõi database thông qua change data capture (CDC), nhiệm vụ chính của nó là như vậy.
Một lưu ý có hai lựa chọn như sau :
Connect Standalone : Một là nếu bạn muốn chạy connect-standalone.properties thì sẽ không cần bận tâm gì nhiều, chỉ cần tải các plugin sau về và copy nó vào Kafka lib, lưu ý là connect-standalone nhé. Khi bạn tải về copy bỏ vào là xem như bạn đã giải quyết xong phần này và chạy lệnh sau là đã hoàn thành.
Làm như thế nào là chạy một lần không sử dụng được plugin.path , plugin.path là gì ? ý tôi muốn phân ra từng cụm các plugins để dễ quản lý, muốn dùng plugin nào tách biệt ra để dễ quản lý.
Phần connect-standalone này thì tôi không giải quyết được, chỉ copy mọi thứ vào Kafka libs để chạy nên không dùng được plugin.path, vì khi cấu hình plugin.path mọi thứ ok nhưng chỉ phần connect ok còn sink thì vẫn bị 500.
wget https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.7.6/confluentinc-kafka-connect-jdbc-10.7.6.zip
unzip confluentinc-kafka-connect-jdbc-10.7.6.zip
cp -r confluentinc-kafka-connect-jdbc-10.7.6/* /usr/local/kafka/libs
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/2.6.2.Final/debezium-connector-sqlserver-2.6.2.Final-plugin.tar.gz
tar -zxf debezium-connector-sqlserver-2.6.2.Final-plugin.tar.gz
cp -r debezium-connector-sqlserver/* /usr/local/kafka/libs
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.6.2.Final/debezium-connector-postgres-2.6.2.Final-plugin.tar.gz
tar -zxf debezium-connector-postgres-2.6.2.Final-plugin.tar.gz
cp -r debezium-connector-postgres/* /usr/local/kafka/libs
Ở trên là đang copy 3 plugins mssql, postgress connector và jdbc connector bao gồm sink vào libs kafka, đơn giản là vậy thôi.
cd /usr/local/kafka vi config/connect-standalone.properties edit : bootstrap.servers=192.168.1.6:9092 bin/connect-standalone.sh config/connect-standalone.properties
Connect Distributed :
Để giải quyết vấn đề quản lý được các plugins và sử dụng plugin.path tôi cần dùng connect-distributed, nó có nghĩa là nếu tôi muốn dùng connector nào thì tôi bỏ nó vào một folder, sau này cần nâng cấp phiên bản hay gì tôi chỉ vào đó giải quyết, vì file khá là nhiều nếu ném tất cả vào kafka libs thì đối với bản thân tôi thấy khá là kì kì, tôi không muốn tiếp cận vấn đề như vậy.
Giải quyết việc này khá đơn giản như sau:
Đầu tiền trong dự án Kafka tôi sẽ tạo một folder connector, tôi sẽ kéo các plugin cần thiết vào đó ví dụ như debezium-connector-postgres để kết nối postgress, hoặc debezium-connector-sqlserver để kết nối mssql hoặc confluentinc-kafka-connect-jdbc-10.7.6 để sink data tới postgress ... vân vân.
Ở bài nào này chỉ cần confluentinc-kafka-connect-jdbc với debezium-connector-sqlserver là đủ nhưng tôi sẽ thêm debezium-connector-postgres để nó có tính tường minh dễ hiểu hơn.
cd /usr/local/kafka vi config/connect-distributed.properties
edit bootstrap.servers=192.168.1.6:9092 Add: plugin.path=/usr/local/kafka/plugins,
Trong connect-distributed.properties chỉ cần chỉnh sửa lại thông tin và thêm dòng plugin.path như trên, vào việc thứ 2 là tôi muốn các plugin theo sự quản lý của tôi, mọi thứ phân ra rõ ràng để vào thư mục connector trong dự án.
Lưu ý phần này libs là mặc định củ của kafka libs và không thêm bất cứ cái gì, ở đây xem như bước đầu tiên và hay bỏ qua cách làm trên của connect-standalone.
wget https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.7.6/confluentinc-kafka-connect-jdbc-10.7.6.zip
unzip confluentinc-kafka-connect-jdbc-10.7.6.zip
cp -r confluentinc-kafka-connect-jdbc-10.7.6/lib/* /usr/local/kafka/connector/confluentinc-kafka-connect-jdbc
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/2.6.2.Final/debezium-connector-sqlserver-2.6.2.Final-plugin.tar.gz
tar -zxf debezium-connector-sqlserver-2.6.2.Final-plugin.tar.gz
cp -r debezium-connector-sqlserver/* /usr/local/kafka/connector/debezium-connector-sqlserver
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.6.2.Final/debezium-connector-postgres-2.6.2.Final-plugin.tar.gz
tar -zxf debezium-connector-postgres-2.6.2.Final-plugin.tar.gz
cp -r debezium-connector-postgres/* /usr/local/kafka/connector/debezium-connector-postgres
Sau khi mọi thứ đã copy tách biệt ra rồi, cái quan trọng là sử dụng Symbolic link để link plugin với các thư mục trên.
ln -s /usr/local/kafka/connector/confluentinc-kafka-connect-jdbc /usr/local/kafka/plugins/
ln -s /usr/local/kafka/connector/debezium-connector-sqlserver//usr/local/kafka/plugins/
ln -s /usr/local/kafka/connector/debezium-connector-postgres/ /usr/local/kafka/plugins/
Vậy là xong và chậy lệnh sau là hoàn thành
cd /usr/local/kafka bin/connect-distributed.sh config/connect-distributed.properties
Đối với cách này nó sẽ tạo cho bạn connect-cluster, khi nào đăng ký connect thì có thể vào web như trên để kiểm tra, có nghĩa là chưa đăng ký nó sẽ rỗng.
Sau khi mọi thứ tôi chắc chắn đã chạy ngon lành rồi thì sẽ cần config service để sau này có thể chạy nó lên không cần phải mở terminal để chạy từng cái một nưa.
sudo vi /etc/systemd/system/zookeeper.service [Unit] Description= Zookeeper After=network.target [Service] User=sa Group=users ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties LimitNOFILE=10240 Restart=on-failure RestartSec=5s [Install] WantedBy=multi-user.target sudo systemctl enable zookeeper.service sudo systemclt start zookeeper.service
sudo vi /etc/systemd/system/kafka.service [Unit] Description= Kafka After=network.target [Service] User=sa Group=users ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties LimitNOFILE=10240 Restart=on-failure RestartSec=5s [Install] WantedBy=multi-user.target sudo systemclt enable kafka.service sudo systemclt start kafka.service
sudo vi /etc/systemd/system/debezium-standalone-kafka.service [Unit] Description= Debezium Standalone Kafka After=network.target [Service] User=sa Group=users ExecStart=/usr/local/kafka/bin/connect-standalone.sh /usr/local/kafka/config/connect-standalone.properties LimitNOFILE=10240 Restart=on-failure RestartSec=5s [Install] WantedBy=multi-user.target sudo systemctl enable debezium-standalone-kafka.service
sudo systemclt start debezium-standalone-kafka.service
sudo vi /etc/systemd/system/debezium-distributed-kafka.service [Unit] Description= Debezium Kafka After=network.target [Service] User=sa Group=users ExecStart=/usr/local/kafka/bin/connect-distributed.sh /usr/local/kafka/config/connect-distributed.properties LimitNOFILE=10240 Restart=on-failure RestartSec=5s [Install] WantedBy=multi-user.target sudo systemctl enable debezium-connect-distributed-kafka.service sudo systemclt start debezium-connect-distributed-kafka.service
EXEC sys.sp_cdc_enable_db;
EXEC sys.sp_cdc_disable_db;
EXEC sys.sp_cdc_enable_db; EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'Aircraft', @supports_net_changes = 0; @role_name = NULL;
{
"name": "inventory-connector",
"config": {
"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max" : "1",
"topic.prefix" : "sqlserver",
"database.hostname" : "192.168.1.3",
"database.port" : "1433",
"database.user" : "sa",
"database.password" : "dev@@123",
"database.names" : "OPERATION",
"schema.history.internal.kafka.bootstrap.servers" : "192.168.1.6:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory",
"database.encrypt": "false"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.1.6:8083/connectors/ -d @sql.json
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "sqlserver.OPERATION.dbo.Aircraft",
"connection.url": "jdbc:postgresql://192.168.1.5:5432/OPERATION",
"connection.user": "root",
"connection.password": "root",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.fields": "RegistrationID",
"pk.mode": "record_key",
"table.name.format": "Aircraft"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.1.6:8083/connectors/ -d @pos.json