Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)

 Hôm nay rãnh rỗi sẽ viết một bài liên quan đến Debezium và Kafka, mục đích của bài này là sử dụng Source connector và Sink connector để đẩy dữ liều từ MSSQL sang PostgreSQL theo thời gian thực.

Trước tiên thực hiện bài này nên đọc qua trước Debezium và Kafka gì, mục đích của nó là gì tại đây:

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)
https://debezium.io/
Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


https://debezium.io/


Còn đối với tôi thì đơn giản là Debezium là công cụ dùng để lắng nghe các thay đổi dữ liệu từ các cơ sở dữ liệu như thêm xoá sửa ... (Change Data Capture (CDC))

Kafka là một nền tảng phân tán dùng để xử lý các luồng dữ liệu theo thời gian thực.

Ở bài viết này tôi sẽ cài đặt và cấu hình trên VM, tôi sẽ dùng openSUSE Leap 15, tại vì sao tôi lại cài trên này ? Đơn giản khi bạn chịu khó làm thủ công đến khi bạn hiểu rõ bản chất của nó sau này sẽ hỗ trợ bạn khá là nhiều.

Bài lab này sẽ có 3 server sau :

openSUSE Leap Debezium và Kafka : 192.168.1.6 - Dùng để cài Leap Debezium và Kafka

MSSQL: 192.168.1.3 - Chứa data chính

PostgreSQL : 192.168.1.5 - Chứa data lấy từ data MSSQL


Tôi có kịch bản như sau : Bên phần server MSSQL nó phục vụ cho một công việc nào đó ABC trong đó có một table tên là dbo.Aircraft nó phục vụ cho một công việc nào đó, giờ câu hỏi đặt ra là có một bên dùng PostgreSQL muốn có bảng public.Aircraft với nội dụng giống vậy và nó được cập nhật theo thời gian thực, có nghĩa là bên MSSQL làm gì thay đổi gì ở table này thì bên PostgreSQL cũng thay đổi theo và nó phải chạy theo thời gian thực. 

Mô hình như sau :

MSSQL ==connector===> Debezium và Kafka ===Sink==> PostgreSQL


Cài đặt Debezium và Kafka:

Java Openjdk : Debezium và Kafka chạy trên nền java nên đầu tiên cần phải cài java jdk trước    

sudo zypper install java-11-openjdk


Kafka : 

Hãy theo dõi link bên dưới để tim kiếm phiên bản muốn cài.

https://archive.apache.org/dist/kafka/

Hiện tại kafka_2.12-3.7.0 là mới nhất nên sẽ download bản này về cài, mới nhất ở hiện tại, sau khi kéo về xong , giải nén ra và cho vào /usr/local/kafka nơi này sẽ là nơi chúng ta làm việc chính.

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


wget https://archive.apache.org/dist/kafka/3.7.0/kafka_2.12-3.7.0.tgz
tar -xzf kafka_2.12-3.7.0.tgz
mv kafka_2.12-3.7.0 /usr/local/kafka


Zookeeper :

Sau khi tải các gói về đầy đủ thì bên trong Kafka bao gồm luôn cả ZookeeperKafka sử dụng Zookeeper để quản lý thông tin về các brokers, topics và Partition. Zookeeper giúp duy trì tính nhất quán và đồng bộ giữa các brokers trong cụm nên sau khi đã tải về nên cần chạy đúng các quy trình sau.

Đầu tiên để lên được kafka cần chạy Zookeeper trước, sau khi nó chạy hoàn tất sẽ tiến hành chạy Kafka, chỉ cần 2 bước này thì đã hoàn tất quá trình để  có được server Kafka.

Chạy Zookeeper trước : 

cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)



Chỉnh sửa vài thông tin cho Kafka, ở đây tôi muốn dùng bằng IP nên sẽ chỉnh sửa một số thứ như sau :
Thói quen của tôi hay chỉ định IP cho server cụ thể nên sẽ fix cứng nó là IP. Và port mặc định nó sẽ chạy 9092.
cd /usr/local/kafka
vi config/server.properties
listeners=PLAINTEXT://192.168.1.6:9092

Sau khi chỉnh sửa xong bắt đầu chạy Kafka

cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties


Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


Debezium : 

Đây là phần quan trọng và khá là thú vị, trong Debezium nó có hỗ trợ nhiều phần, ở đây tôi dùng Kafka nên cần sử dụng phần Debezium Connectors.

Nói về Debezium Connectors  thì bạn cần hiểu nó dùng để theo dõi database  thông qua change data capture (CDC), nhiệm vụ chính của nó là như vậy.

Một lưu ý có hai lựa chọn như sau :

Connect Standalone : Một là nếu bạn muốn chạy connect-standalone.properties thì sẽ không cần bận tâm gì nhiều, chỉ cần tải các plugin sau về và copy nó vào Kafka lib, lưu ý là connect-standalone nhé. Khi bạn tải về copy bỏ vào là xem như bạn đã giải quyết xong phần này và chạy lệnh sau là đã hoàn thành.

Làm như thế nào là chạy một lần không sử dụng được plugin.path , plugin.path là gì ? ý tôi muốn phân ra từng cụm các plugins để dễ quản lý, muốn dùng plugin nào tách biệt ra để dễ quản lý. 

Phần connect-standalone này thì tôi không giải quyết được, chỉ copy mọi thứ vào Kafka libs để chạy nên không dùng được plugin.path,  khi cấu hình plugin.path mọi thứ ok nhưng chỉ phần connect ok còn sink thì vẫn bị 500.


wget https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.7.6/confluentinc-kafka-connect-jdbc-10.7.6.zip
unzip confluentinc-kafka-connect-jdbc-10.7.6.zip
cp -r confluentinc-kafka-connect-jdbc-10.7.6/* /usr/local/kafka/libs

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/2.6.2.Final/debezium-connector-sqlserver-2.6.2.Final-plugin.tar.gz
tar -zxf debezium-connector-sqlserver-2.6.2.Final-plugin.tar.gz
cp -r debezium-connector-sqlserver/* /usr/local/kafka/libs

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.6.2.Final/debezium-connector-postgres-2.6.2.Final-plugin.tar.gz
tar -zxf debezium-connector-postgres-2.6.2.Final-plugin.tar.gz
cp -r debezium-connector-postgres/* /usr/local/kafka/libs


Ở trên là đang copy 3 plugins mssql, postgress connector và jdbc connector bao gồm sink vào libs kafka, đơn giản là vậy thôi.


cd /usr/local/kafka
vi config/connect-standalone.properties
edit : bootstrap.servers=192.168.1.6:9092

bin/connect-standalone.sh config/connect-standalone.properties
Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


Connect Distributed : 

Để giải quyết vấn đề quản lý được các plugins và sử dụng plugin.path tôi cần dùng connect-distributed, nó có nghĩa là nếu tôi muốn dùng connector nào thì tôi bỏ nó vào một folder, sau này cần nâng cấp phiên bản hay gì tôi chỉ vào đó giải quyết, vì file khá là nhiều nếu ném tất cả vào kafka libs thì đối với bản thân tôi thấy khá là kì kì, tôi không muốn tiếp cận vấn đề như vậy.

Giải quyết việc này khá đơn giản như sau:

Đầu tiền trong dự án Kafka tôi sẽ tạo một folder connector, tôi sẽ kéo các plugin cần thiết vào đó ví dụ như debezium-connector-postgres để kết nối postgress,  hoặc debezium-connector-sqlserver để kết nối mssql hoặc confluentinc-kafka-connect-jdbc-10.7.6 để sink data tới postgress ...  vân vân.

Ở bài nào này chỉ cần confluentinc-kafka-connect-jdbc với debezium-connector-sqlserver là đủ nhưng tôi sẽ thêm debezium-connector-postgres để nó có tính tường minh dễ hiểu hơn.

cd /usr/local/kafka
vi config/connect-distributed.properties
edit bootstrap.servers=192.168.1.6:9092 Add: plugin.path=/usr/local/kafka/plugins,

Trong connect-distributed.properties chỉ cần chỉnh sửa lại thông tin và thêm dòng plugin.path như trên, vào việc thứ 2 là tôi muốn các plugin theo sự quản lý của tôi, mọi thứ phân ra rõ ràng để vào thư mục connector trong dự án.

Lưu ý phần này libs là mặc định củ của kafka libs và không thêm bất cứ cái gì, ở đây xem như bước đầu tiên và hay bỏ qua cách làm trên của connect-standalone.

wget https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.7.6/confluentinc-kafka-connect-jdbc-10.7.6.zip
unzip confluentinc-kafka-connect-jdbc-10.7.6.zip
cp -r confluentinc-kafka-connect-jdbc-10.7.6/lib/* /usr/local/kafka/connector/confluentinc-kafka-connect-jdbc
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/2.6.2.Final/debezium-connector-sqlserver-2.6.2.Final-plugin.tar.gz tar -zxf debezium-connector-sqlserver-2.6.2.Final-plugin.tar.gz cp -r debezium-connector-sqlserver/* /usr/local/kafka/connector/debezium-connector-sqlserver
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.6.2.Final/debezium-connector-postgres-2.6.2.Final-plugin.tar.gz tar -zxf debezium-connector-postgres-2.6.2.Final-plugin.tar.gz cp -r debezium-connector-postgres/* /usr/local/kafka/connector/debezium-connector-postgres

Sau khi mọi thứ đã copy tách biệt ra rồi, cái quan trọng là sử dụng Symbolic link để link plugin với các thư mục trên.

ln -s /usr/local/kafka/connector/confluentinc-kafka-connect-jdbc /usr/local/kafka/plugins/
ln -s /usr/local/kafka/connector/debezium-connector-sqlserver//usr/local/kafka/plugins/
ln -s /usr/local/kafka/connector/debezium-connector-postgres/ /usr/local/kafka/plugins/

Vậy là xong và chậy lệnh sau là hoàn thành

cd /usr/local/kafka
bin/connect-distributed.sh config/connect-distributed.properties

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)

Đối với cách này nó sẽ tạo cho bạn connect-cluster, khi nào đăng ký connect thì có thể vào web như trên để kiểm tra, có nghĩa là chưa đăng ký nó sẽ rỗng.

Sau khi mọi thứ tôi chắc chắn đã chạy ngon lành rồi thì sẽ cần config service để sau này có thể chạy nó lên không cần phải mở terminal để chạy từng cái một nưa.

Service Zookeeper:

sudo vi /etc/systemd/system/zookeeper.service

[Unit]
Description= Zookeeper
After=network.target

[Service]
User=sa
Group=users
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
LimitNOFILE=10240
Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target


sudo systemctl enable zookeeper.service
sudo systemclt start zookeeper.service

Service Kafka:

sudo vi /etc/systemd/system/kafka.service

[Unit]
Description= Kafka
After=network.target

[Service]
User=sa
Group=users
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
LimitNOFILE=10240
Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target


sudo systemclt enable kafka.service
sudo systemclt start kafka.service

Service Debezium Kafka Standalone : (Lưu ý chỉ chọn 1 trong 2 giữa Standalone và Distributed, 2 cái này sẽ chung một port 8083)

sudo vi /etc/systemd/system/debezium-standalone-kafka.service

[Unit]
Description= Debezium Standalone Kafka
After=network.target

[Service]
User=sa
Group=users
ExecStart=/usr/local/kafka/bin/connect-standalone.sh /usr/local/kafka/config/connect-standalone.properties
LimitNOFILE=10240
Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target


sudo systemctl enable debezium-standalone-kafka.service
sudo systemclt start debezium-standalone-kafka.service

Service Debezium Kafka Distributed :

sudo vi /etc/systemd/system/debezium-distributed-kafka.service

[Unit]
Description= Debezium Kafka
After=network.target

[Service]
User=sa
Group=users
ExecStart=/usr/local/kafka/bin/connect-distributed.sh /usr/local/kafka/config/connect-distributed.properties
LimitNOFILE=10240
Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target


sudo systemctl enable debezium-connect-distributed-kafka.service
sudo systemclt start debezium-connect-distributed-kafka.service

Như vậy là đã hoàn thành xong phần cài đặt và cấu hình Debezium và Kafka , tuy nó dài vậy nhưng khi thao tác nếu yêu cầu mạng download nhanh thì chưa tới 5p đã xong.


Microsoft SQL Server : 

Đây là phần data source, nơi để nhận data đưa nó vào Kafka, phần này thì câu hỏi bạn đặt ra đầu tiên sẽ như sau:

Hey, mày đang dùng MSSQL phiên bản nào vậy, nếu Microsoft SQL Server Express thì nghỉ chơi nhé, tao không làm bài này được đâu. Nếu cài bản cao hơn thì, Hey, mày có enable SQL Server Agent lên chưa, nếu chưa hãy enable nó lên nhé.

Tiếp theo bạn cần phải hiểu change data capture (CDC) là gì nếu bạn đang nắm phần MSSQL, nó đơn giản khái quát như sau : 
Dùng để thu thập dữ liệu thay đổi sử dụng SQL Server Agent để ghi nhật ký  thêm, xoá, sửa trong table, để làm chi, nó giúp bạn xác định data của bạn có thay đổi không, nếu nó thay đổi thì sẽ kích hoạt cập nhật lại data mới nhất từ MSSQL sang PostgreSQL.

Các lệnh cơ bản sau để giúp bạn bật tắt CDC:
Enable CDC:
EXEC sys.sp_cdc_enable_db;
Disable CDC
EXEC sys.sp_cdc_disable_db;
Giờ tôi có database tên : OPERATION và table là Aircraft tôi sẽ tiến hành như sau:

  EXEC sys.sp_cdc_enable_db;

  EXEC sys.sp_cdc_enable_table 
   @source_schema = N'dbo', 
   @source_name = N'Aircraft', 
   @supports_net_changes = 0;
   @role_name = NULL;
Cái chú ý ở đây là @supports_net_changes , tuỳ vào mục đích và hệ thống hiện tại của bạn chọn sao cho phù hợp.

1: Hiểu nôm na như là một bảng tóm tắt thông tin, chỉ cập nhật thông tin thực tế của dữ liệu, ví dụ bạn làm gì làm nó sẽ hiển thị kết quả thực và trong các phạm vi được chỉ định.
0: Nó sẽ ghi lại tất cả các thông tin bạn thêm , xoá sửa, chi tiết từng cái một, tuỳ vào mục đích của bạn cần dùng là gì, ở đây tôi sẽ để là 0, nếu thêm xoá sửa gì thì sẽ cập nhật hết.

Sau khi chạy lệnh trên sẽ có kết quả giống bên dưới, đối với CDC kết hợp với kafka này, khi đăng ký trên kafka lần đầu tiên khi nhận topics nó sẽ nhận hết trong các table, nó sẽ chụp lại 1 bản sao, còn theo dõi thì chỉ phụ thuộc vào cdc bật trên bảng nào.

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)

PostgreSQL Server: Cái này đơn giản là cung cấp cho một user và password có đủ quyền đọc, ghi, tạo table..., còn lại sẽ tự giải quyết cho bạn.

Quay lại phần Kafka, đăng ký connector như sau: 

Hiện tại đang dùng connect-distributed , đối với standalone cũng vậy chỉ cần cú pháp sau.

Tạo file đăng ký connector MSSQL

{
    "name": "inventory-connector",
    "config": {
        "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max" : "1",
        "topic.prefix" : "sqlserver",
        "database.hostname" : "192.168.1.3",
        "database.port" : "1433",
        "database.user" : "sa",
        "database.password" : "dev@@123",
        "database.names" : "OPERATION",
        "schema.history.internal.kafka.bootstrap.servers" : "192.168.1.6:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory",
        "database.encrypt": "false"
    }
}

Sau khi tạo file chạy đăng ký

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://192.168.1.6:8083/connectors/ -d @sql.json

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


Như vậy đã có các topics tạo ra, như đã nói ở trên nó đã tạo ra các topics như vậy và khi nào có CDC bạn mới theo dõi real time được. Tương tự với postgres cũng tương tự như trên làm tương tự nha trên.

Tạo file đăng ký connector PostgreSQL:


{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "sqlserver.OPERATION.dbo.Aircraft",
        "connection.url": "jdbc:postgresql://192.168.1.5:5432/OPERATION",
        "connection.user": "root",
        "connection.password": "root",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "auto.create": "true",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "pk.fields": "RegistrationID",
        "pk.mode": "record_key",
        "table.name.format": "Aircraft"
    }
}

Sau khi chạy xong nó sẽ tự hộ trợ tạo table và kiểu dữ liệu luôn cho bạn, để đảm bảo kiểu dữ liệu từ MSSQL sang PostgreSQL sẽ phù hợp.

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://192.168.1.6:8083/connectors/ -d @pos.json

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)



Thành quả sẽ như sau, nếu bạn có thay đổi bất cứ gì từ MSSQL thì bên PostgreSQL cũng sẽ thay đổi theo, quá trình này sẽ real time.



Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)



Nhãn: ,