Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì

 Hôm nay rãnh rỗi ngồi chém gió về Metrics, Tracing, Logging qua ví dụ cụ thể, qua đây sẽ giúp hình dung ra nó là cái gì, điều này sẽ giúp ích cho bạn khá nhiều.


Nói về ba phương thức này thực sự nó rất là nhiều, xem như nó là một keyword để bạn tìm hiểu sau, mục đích của bài viết này giúp bạn có cái nhìn khái quát hơn, hình dung ra một bức tranh khái quát và tổng thể hơn.

Metrics

Như câu chuyện xóm tôi mùa UEFA Euro 2024, bọn trộm chó và trộm gà đá có một chiếc xe Exciter dùng đi ăn trộm. Bọn trộm muốn biết chiếc xe tụi nó chạy ok không, nó sẽ theo dõi các chỉ số như: 

Tốc độ xe khi chạy bao nhiêu, sau một cuốc thì tốn bao nhiêu xăng, khi bị người dân rượt đuổi thì chạy nhanh động cơ, nhông sên đĩa sẽ co giãn nhiều không, máy như thế nào... 

Tương tự trong phần mềm cũng vậy, Metrics giúp theo dõi số lượng request lớn hay nhỏ, CPU xử lý nhanh hay chậm, thời gian phản hồi bao lâu, hệ thống sẽ sữ lý như thế nào... 

Đó là nhiệm vụ cơ bản của Metrics, giúp bạn theo dõi các metrics để phát hiện một Metrics nào đó bất thường, bạn có thể biết rằng có ổn không và cần phải kiểm tra hay không. Thông qua đó bạn có thể xác định được nguyên nhân và đưa ra chách khắc phục. 

Thông thường theo dõi metrics theo thời gian, bạn có thể xác định các điểm tắt nghẽn và tối ưu hóa hệ thống để hoạt động hiệu quả hơn. Dữ liệu metrics giúp bạn dự đoán nhu cầu trong tương lai và lên kế hoạch nâng cấp hệ thống phù hợp.


Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì


Tracing :

Như ví dụ trên, ngôi nhà của bạn bao gồm hàng rào xung quanh bao bọc kỹ càng lưới B40 ... như một hệ thống của bạn. Xung quanh nhà bạn setup các chuồng gà đá, chuồng chó, khu vực chó đi tới đi lui, những khu vực này xem như một service trong một hệ thống.

Bọn trộm chó, trộm gà khi nhảy vào hàng rào nha bạn trước tiên bọn nó sẽ đi tìm từng ngõ ngách xung quanh nhà để đi tìm chuồng nào nhốt gà, chuồng nào nhốt chó ... Khi nó đi tới nơi nào giống như người dùng tiếp cận một service, tracing cũng giống như bạn gắn camera để  giám sát biết rằng thằng trộm nó đi tới chuồng gà lúc mấy giờ, tới chuồng chó lúc mấy giờ, lúc nó đi, nó bắt con gà, con chó nào ...

Thông qua đây bạn sẽ biết được thời gian, hành vi nó như thế nào, lục lọi tìm những gì ... Thông qua tracing ta có thể phân tích được hành vì như thằng này nó thích tập trung bắt nhiều gà trước hay bắt chó trước... Tương tự như phần mềm ta sẽ biết được service nào dùng nhiều nhất, service nào xử lý lâu nhất hoặc cái nào có vấn đề... Tóm lại Tracing sẽ giúp bạn theo dõi toàn bộ hành trình của request yêu cầu về hệ thống, từ dich vụ này đến dịch vụ khác trên hệ thống và ghi lại chi tiết điều đó.

Logging :

Có nhiệm vụ lưu lại các sự kiện xảy ra trong hệ thống của bạn, giống như bọ trộm gà trộm chó đến chuồng gà , chuôngg chó là một sự kiện, cạy khoá chuồng gà, chuồng chó là một sự kiện nó sẽ lưu lại thông tin cụ thể. Mọi thứ được ghi lại cụ thể theo từng tiến trình, quá trình thao tác nhằm mục đích xác định các hành vi, thao tác đã xảy ra...

Logging là quá trình ghi lại các sự kiện xảy ra trong hệ thống phần mềm để hỗ trợ giám sát, phân tích, và khắc phục sự cố. Thông qua bon trộm gà trômh chó  chúng ta có thể thấy cách logging giúp theo dõi các cách thức hoạt động và cung cấp dữ liệu cần thiết để duy trì và cải thiện hệ thống

OK, bạn cứ hiểu nôm na như vậy, hôm nay sẽ dùng những thứ sau để thực hiện bài này :

Code: Tôi sẽ dùng nestjs để tạo ra api mẫu để client gởi request

Metrics: Sẽ dùng Prometheus Server và thư viện @willsoto/nestjs-prometheus cho nestjs

Tracing: Sẽ dùng thư viện OpenTelemetry và Jaeger xem thông tin ở local

Logging: Sẽ dùng Loki server để lưu logs, và loki có api hỗ trợ post metho, quá đây sẽ post logs đẩy lên Loki.

Monitoring : Tổng hợp mọi thứ lại để xem trực quan trên Grafana, thông qua Grafana sẽ chia sẽ mọi người sử dụng tổng hợp mọi thứ ở trên để trực quan hơn.


Phần code : 

Tối có tạo project nestjs đơn giản sau, gọi phương thức Get với 2 api /tromcho và /tromga


Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì

Trong code tôi sử dụng libs opentelemetry(Otel) để tracing và exporter thông tin đến jaeger


"@opentelemetry/exporter-jaeger": "^1.25.0",
"@opentelemetry/instrumentation-express": "^0.40.1",
"@opentelemetry/instrumentation-http": "^0.52.0",
"@opentelemetry/instrumentation-nestjs-core": "^0.38.0",
"@opentelemetry/resources": "^1.25.0",
"@opentelemetry/sdk-node": "^0.52.0",
"@opentelemetry/sdk-trace-base": "^1.25.0",
"@opentelemetry/semantic-conventions": "^1.25.0",


import { SimpleSpanProcessor } from "@opentelemetry/sdk-trace-base";
import { NodeSDK } from "@opentelemetry/sdk-node";
import * as process from "process";
import { HttpInstrumentation } from "@opentelemetry/instrumentation-http";
import { ExpressInstrumentation } from "@opentelemetry/instrumentation-express";
import { NestInstrumentation } from "@opentelemetry/instrumentation-nestjs-core";
import { Resource } from "@opentelemetry/resources";
import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions";
import { JaegerExporter } from "@opentelemetry/exporter-jaeger";
require("dotenv").config();

const jaegerExporter = new JaegerExporter({
  endpoint: `http://${process.env.JAEGER_EXPORT_URL}/api/traces`,
});

const traceExporter = jaegerExporter;

export const otelSDK = new NodeSDK({
  resource: new Resource({
    [SemanticResourceAttributes.SERVICE_NAME]: `nestjs-otel`,
  }),
  spanProcessor: new SimpleSpanProcessor(traceExporter),
  instrumentations: [
    new HttpInstrumentation(),
    new ExpressInstrumentation(),
    new NestInstrumentation(),
  ],
});

process.on("SIGTERM", () => {
  otelSDK
    .shutdown()
    .then(
      () => console.log("SDK shut down successfully"),
      (err) => console.log("Error shutting down SDK", err)
    )
    .finally(() => process.exit(0));
});

Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì



Sử dụng libs nestjs-prometheus để tạo metrics theo dõi server app để đẩy lên Prometheus

"@willsoto/nestjs-prometheus": "^6.0.1",


import { Controller, Get, Res } from "@nestjs/common";
import { PrometheusController } from "@willsoto/nestjs-prometheus";

@Controller("metrics")
export class PromController extends PrometheusController {
  @Get()
  metrics(@Res({ passthrough: true }) response: Response) {
    return super.index(response);
  }
}


Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì


Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì



Trên Loki có một api cho sử dụng post method và sử dụng axios để đẩy logs lên Loki, ở đây sẽ bắt 2 sự kiện, thông tin của controller, có nghĩa mọi thông tin của controller sẽ bắt logs khi gọi controller và đẩy nó vào Loki.

Cái thứ 2 sẽ custom logs, dùng nó định nghĩa cho một logs nào đó cần dùng, thường như service bắt lỗi hay gì đó cần dùng lưu logs sẽ dùng nó.

import axios from "axios";
import {
  Injectable,
  NestInterceptor,
  ExecutionContext,
  CallHandler,
  Logger,
} from "@nestjs/common";
import { Observable } from "rxjs";
import { tap } from "rxjs/operators";
import { v4 as uuidv4 } from "uuid";
require("dotenv").config();
@Injectable()
export class LoggingInterceptor implements NestInterceptor {
  private readonly logger = new Logger(LoggingInterceptor.name);
  httpService: any;

  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    if (context.getType() === "http") {
      return this.logHttpCall(context, next);
    }
  }

  private logHttpCall(context: ExecutionContext, next: CallHandler) {
    const request = context.switchToHttp().getRequest();
    const userAgent = request.get("user-agent") || "";
    const { ip, method, url, body, query, params } = request;
    const correlationKey = uuidv4();
    const userId = request.user?.id || 'anonymous';
    const className = context.getClass().name;
    const handlerName = context.getHandler().name;

    this.logger.log(
      `[${correlationKey}] ${userAgent} ${ip} ${method} ${url} ${JSON.stringify(body)} ${JSON.stringify(query)} ${JSON.stringify(params)} ${userId}`
    );
    const now = Date.now();
    return next.handle().pipe(
      tap(async () => {
        const response = context.switchToHttp().getResponse();
        const { statusCode } = response;

        const logData = `[${correlationKey}] ${userAgent} ${ip} ${method} ${url} ${JSON.stringify(body,)} ${JSON.stringify(query)} ${JSON.stringify(params)} ${userId}  ${Date.now() - now}ms`

        await this.pushToLoki(logData,className,handlerName);
      })
    );
  }

  private async pushToLoki(logData: any,contextStream: any, traceStream: any) {
    try {
      const lokiData = {
        streams: [
          {
            stream: {
              env: "nestjs",
              context: contextStream,
              trace: traceStream,
            },
            values: [[(Date.now() * 1e6).toString(), logData]],
          },
        ],
      };

      await axios.post(`http://${process.env.LOKI_SERVER_URL}/loki/api/v1/push`, lokiData);
      this.logger.log("Log sent to Loki server successfully.");
    } catch (error) {
      this.logger.error("Error sending log to Loki server:", error);
    }
  }
}

import { Injectable, Logger } from "@nestjs/common";
import axios from "axios";
require("dotenv").config();
@Injectable()
export class LokiLogger {
  constructor(private readonly logger: Logger) {}

  error(message: any, trace?: string, context?: string) {
    this.sendToLoki("error", message, trace, context);
  }

  warn(message: any, context?: string) {
    this.sendToLoki("warn", message, undefined, context);
  }

  log(message: any, context?: string) {
    this.sendToLoki("info", message, undefined, context);
  }

  private async sendToLoki(
    level: string,
    message: any,
    trace?: string,
    context?: string
  ) {
    try {
      const lokiData = {
        streams: [
          {
            stream: {
              env: "nestjs",
              level: level,
              context: context,
              trace: trace,
            },
            values: [[(Date.now() * 1e6).toString(), message]],
          },
        ],
      };

      await axios.post(`http://${process.env.LOKI_SERVER_URL}/loki/api/v1/push`, lokiData);
      this.logger.log(`[${level}] Log sent to Loki server successfully.`);
    } catch (error) {
      this.logger.error("Error sending log to Loki server:", error);
    }
  }
}

Tất cả ở trên từ Jarger đến prometheus mục đích test local để xem logic đúng không, giờ cần phải đẩy lên Grafana để quản lý tập trung, vì nó bao gồm cả user/pass đăng nhập nên việc này sẽ thuận tiện hơn.

Data sources: Chọn đúng loại data sources cần dùng, như bài này sẽ dùng như sau, cứ add và cung cấp đúng thông tin sẽ nhận


Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì

Dashboards:  Hiện tại sử dụng Dashboards ID : 11159 có tên NodeJS Application Dashboard thấy cũng khá ok.

Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì


Sau khi đã tích hợp Logging và Tracing thì Explore sẽ như sau


Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì

Và Tracing nó sẽ như sau

Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì

Đây là link project demo, có tham khảo link sau:


https://github.com/tieukhachngao/Metrics-Tracing-Logging


Nhãn:

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)

 Hôm nay rãnh rỗi sẽ viết một bài liên quan đến Debezium và Kafka, mục đích của bài này là sử dụng Source connector và Sink connector để đẩy dữ liều từ MSSQL sang PostgreSQL theo thời gian thực.

Trước tiên thực hiện bài này nên đọc qua trước Debezium và Kafka gì, mục đích của nó là gì tại đây:

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)
https://debezium.io/
Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


https://debezium.io/


Còn đối với tôi thì đơn giản là Debezium là công cụ dùng để lắng nghe các thay đổi dữ liệu từ các cơ sở dữ liệu như thêm xoá sửa ... (Change Data Capture (CDC))

Kafka là một nền tảng phân tán dùng để xử lý các luồng dữ liệu theo thời gian thực.

Ở bài viết này tôi sẽ cài đặt và cấu hình trên VM, tôi sẽ dùng openSUSE Leap 15, tại vì sao tôi lại cài trên này ? Đơn giản khi bạn chịu khó làm thủ công đến khi bạn hiểu rõ bản chất của nó sau này sẽ hỗ trợ bạn khá là nhiều.

Bài lab này sẽ có 3 server sau :

openSUSE Leap Debezium và Kafka : 192.168.1.6 - Dùng để cài Leap Debezium và Kafka

MSSQL: 192.168.1.3 - Chứa data chính

PostgreSQL : 192.168.1.5 - Chứa data lấy từ data MSSQL


Tôi có kịch bản như sau : Bên phần server MSSQL nó phục vụ cho một công việc nào đó ABC trong đó có một table tên là dbo.Aircraft nó phục vụ cho một công việc nào đó, giờ câu hỏi đặt ra là có một bên dùng PostgreSQL muốn có bảng public.Aircraft với nội dụng giống vậy và nó được cập nhật theo thời gian thực, có nghĩa là bên MSSQL làm gì thay đổi gì ở table này thì bên PostgreSQL cũng thay đổi theo và nó phải chạy theo thời gian thực. 

Mô hình như sau :

MSSQL ==connector===> Debezium và Kafka ===Sink==> PostgreSQL


Cài đặt Debezium và Kafka:

Java Openjdk : Debezium và Kafka chạy trên nền java nên đầu tiên cần phải cài java jdk trước    

sudo zypper install java-11-openjdk


Kafka : 

Hãy theo dõi link bên dưới để tim kiếm phiên bản muốn cài.

https://archive.apache.org/dist/kafka/

Hiện tại kafka_2.12-3.7.0 là mới nhất nên sẽ download bản này về cài, mới nhất ở hiện tại, sau khi kéo về xong , giải nén ra và cho vào /usr/local/kafka nơi này sẽ là nơi chúng ta làm việc chính.

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


wget https://archive.apache.org/dist/kafka/3.7.0/kafka_2.12-3.7.0.tgz
tar -xzf kafka_2.12-3.7.0.tgz
mv kafka_2.12-3.7.0 /usr/local/kafka


Zookeeper :

Sau khi tải các gói về đầy đủ thì bên trong Kafka bao gồm luôn cả ZookeeperKafka sử dụng Zookeeper để quản lý thông tin về các brokers, topics và Partition. Zookeeper giúp duy trì tính nhất quán và đồng bộ giữa các brokers trong cụm nên sau khi đã tải về nên cần chạy đúng các quy trình sau.

Đầu tiên để lên được kafka cần chạy Zookeeper trước, sau khi nó chạy hoàn tất sẽ tiến hành chạy Kafka, chỉ cần 2 bước này thì đã hoàn tất quá trình để  có được server Kafka.

Chạy Zookeeper trước : 

cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)



Chỉnh sửa vài thông tin cho Kafka, ở đây tôi muốn dùng bằng IP nên sẽ chỉnh sửa một số thứ như sau :
Thói quen của tôi hay chỉ định IP cho server cụ thể nên sẽ fix cứng nó là IP. Và port mặc định nó sẽ chạy 9092.
cd /usr/local/kafka
vi config/server.properties
listeners=PLAINTEXT://192.168.1.6:9092

Sau khi chỉnh sửa xong bắt đầu chạy Kafka

cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties


Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


Debezium : 

Đây là phần quan trọng và khá là thú vị, trong Debezium nó có hỗ trợ nhiều phần, ở đây tôi dùng Kafka nên cần sử dụng phần Debezium Connectors.

Nói về Debezium Connectors  thì bạn cần hiểu nó dùng để theo dõi database  thông qua change data capture (CDC), nhiệm vụ chính của nó là như vậy.

Một lưu ý có hai lựa chọn như sau :

Connect Standalone : Một là nếu bạn muốn chạy connect-standalone.properties thì sẽ không cần bận tâm gì nhiều, chỉ cần tải các plugin sau về và copy nó vào Kafka lib, lưu ý là connect-standalone nhé. Khi bạn tải về copy bỏ vào là xem như bạn đã giải quyết xong phần này và chạy lệnh sau là đã hoàn thành.

Làm như thế nào là chạy một lần không sử dụng được plugin.path , plugin.path là gì ? ý tôi muốn phân ra từng cụm các plugins để dễ quản lý, muốn dùng plugin nào tách biệt ra để dễ quản lý. 

Phần connect-standalone này thì tôi không giải quyết được, chỉ copy mọi thứ vào Kafka libs để chạy nên không dùng được plugin.path,  khi cấu hình plugin.path mọi thứ ok nhưng chỉ phần connect ok còn sink thì vẫn bị 500.


wget https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.7.6/confluentinc-kafka-connect-jdbc-10.7.6.zip
unzip confluentinc-kafka-connect-jdbc-10.7.6.zip
cp -r confluentinc-kafka-connect-jdbc-10.7.6/* /usr/local/kafka/libs

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/2.6.2.Final/debezium-connector-sqlserver-2.6.2.Final-plugin.tar.gz
tar -zxf debezium-connector-sqlserver-2.6.2.Final-plugin.tar.gz
cp -r debezium-connector-sqlserver/* /usr/local/kafka/libs

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.6.2.Final/debezium-connector-postgres-2.6.2.Final-plugin.tar.gz
tar -zxf debezium-connector-postgres-2.6.2.Final-plugin.tar.gz
cp -r debezium-connector-postgres/* /usr/local/kafka/libs


Ở trên là đang copy 3 plugins mssql, postgress connector và jdbc connector bao gồm sink vào libs kafka, đơn giản là vậy thôi.


cd /usr/local/kafka
vi config/connect-standalone.properties
edit : bootstrap.servers=192.168.1.6:9092

bin/connect-standalone.sh config/connect-standalone.properties
Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


Connect Distributed : 

Để giải quyết vấn đề quản lý được các plugins và sử dụng plugin.path tôi cần dùng connect-distributed, nó có nghĩa là nếu tôi muốn dùng connector nào thì tôi bỏ nó vào một folder, sau này cần nâng cấp phiên bản hay gì tôi chỉ vào đó giải quyết, vì file khá là nhiều nếu ném tất cả vào kafka libs thì đối với bản thân tôi thấy khá là kì kì, tôi không muốn tiếp cận vấn đề như vậy.

Giải quyết việc này khá đơn giản như sau:

Đầu tiền trong dự án Kafka tôi sẽ tạo một folder connector, tôi sẽ kéo các plugin cần thiết vào đó ví dụ như debezium-connector-postgres để kết nối postgress,  hoặc debezium-connector-sqlserver để kết nối mssql hoặc confluentinc-kafka-connect-jdbc-10.7.6 để sink data tới postgress ...  vân vân.

Ở bài nào này chỉ cần confluentinc-kafka-connect-jdbc với debezium-connector-sqlserver là đủ nhưng tôi sẽ thêm debezium-connector-postgres để nó có tính tường minh dễ hiểu hơn.

cd /usr/local/kafka
vi config/connect-distributed.properties
edit bootstrap.servers=192.168.1.6:9092 Add: plugin.path=/usr/local/kafka/plugins,

Trong connect-distributed.properties chỉ cần chỉnh sửa lại thông tin và thêm dòng plugin.path như trên, vào việc thứ 2 là tôi muốn các plugin theo sự quản lý của tôi, mọi thứ phân ra rõ ràng để vào thư mục connector trong dự án.

Lưu ý phần này libs là mặc định củ của kafka libs và không thêm bất cứ cái gì, ở đây xem như bước đầu tiên và hay bỏ qua cách làm trên của connect-standalone.

wget https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.7.6/confluentinc-kafka-connect-jdbc-10.7.6.zip
unzip confluentinc-kafka-connect-jdbc-10.7.6.zip
cp -r confluentinc-kafka-connect-jdbc-10.7.6/lib/* /usr/local/kafka/connector/confluentinc-kafka-connect-jdbc
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/2.6.2.Final/debezium-connector-sqlserver-2.6.2.Final-plugin.tar.gz tar -zxf debezium-connector-sqlserver-2.6.2.Final-plugin.tar.gz cp -r debezium-connector-sqlserver/* /usr/local/kafka/connector/debezium-connector-sqlserver
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.6.2.Final/debezium-connector-postgres-2.6.2.Final-plugin.tar.gz tar -zxf debezium-connector-postgres-2.6.2.Final-plugin.tar.gz cp -r debezium-connector-postgres/* /usr/local/kafka/connector/debezium-connector-postgres

Sau khi mọi thứ đã copy tách biệt ra rồi, cái quan trọng là sử dụng Symbolic link để link plugin với các thư mục trên.

ln -s /usr/local/kafka/connector/confluentinc-kafka-connect-jdbc /usr/local/kafka/plugins/
ln -s /usr/local/kafka/connector/debezium-connector-sqlserver//usr/local/kafka/plugins/
ln -s /usr/local/kafka/connector/debezium-connector-postgres/ /usr/local/kafka/plugins/

Vậy là xong và chậy lệnh sau là hoàn thành

cd /usr/local/kafka
bin/connect-distributed.sh config/connect-distributed.properties

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)

Đối với cách này nó sẽ tạo cho bạn connect-cluster, khi nào đăng ký connect thì có thể vào web như trên để kiểm tra, có nghĩa là chưa đăng ký nó sẽ rỗng.

Sau khi mọi thứ tôi chắc chắn đã chạy ngon lành rồi thì sẽ cần config service để sau này có thể chạy nó lên không cần phải mở terminal để chạy từng cái một nưa.

Service Zookeeper:

sudo vi /etc/systemd/system/zookeeper.service

[Unit]
Description= Zookeeper
After=network.target

[Service]
User=sa
Group=users
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
LimitNOFILE=10240
Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target


sudo systemctl enable zookeeper.service
sudo systemclt start zookeeper.service

Service Kafka:

sudo vi /etc/systemd/system/kafka.service

[Unit]
Description= Kafka
After=network.target

[Service]
User=sa
Group=users
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
LimitNOFILE=10240
Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target


sudo systemclt enable kafka.service
sudo systemclt start kafka.service

Service Debezium Kafka Standalone : (Lưu ý chỉ chọn 1 trong 2 giữa Standalone và Distributed, 2 cái này sẽ chung một port 8083)

sudo vi /etc/systemd/system/debezium-standalone-kafka.service

[Unit]
Description= Debezium Standalone Kafka
After=network.target

[Service]
User=sa
Group=users
ExecStart=/usr/local/kafka/bin/connect-standalone.sh /usr/local/kafka/config/connect-standalone.properties
LimitNOFILE=10240
Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target


sudo systemctl enable debezium-standalone-kafka.service
sudo systemclt start debezium-standalone-kafka.service

Service Debezium Kafka Distributed :

sudo vi /etc/systemd/system/debezium-distributed-kafka.service

[Unit]
Description= Debezium Kafka
After=network.target

[Service]
User=sa
Group=users
ExecStart=/usr/local/kafka/bin/connect-distributed.sh /usr/local/kafka/config/connect-distributed.properties
LimitNOFILE=10240
Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target


sudo systemctl enable debezium-connect-distributed-kafka.service
sudo systemclt start debezium-connect-distributed-kafka.service

Như vậy là đã hoàn thành xong phần cài đặt và cấu hình Debezium và Kafka , tuy nó dài vậy nhưng khi thao tác nếu yêu cầu mạng download nhanh thì chưa tới 5p đã xong.


Microsoft SQL Server : 

Đây là phần data source, nơi để nhận data đưa nó vào Kafka, phần này thì câu hỏi bạn đặt ra đầu tiên sẽ như sau:

Hey, mày đang dùng MSSQL phiên bản nào vậy, nếu Microsoft SQL Server Express thì nghỉ chơi nhé, tao không làm bài này được đâu. Nếu cài bản cao hơn thì, Hey, mày có enable SQL Server Agent lên chưa, nếu chưa hãy enable nó lên nhé.

Tiếp theo bạn cần phải hiểu change data capture (CDC) là gì nếu bạn đang nắm phần MSSQL, nó đơn giản khái quát như sau : 
Dùng để thu thập dữ liệu thay đổi sử dụng SQL Server Agent để ghi nhật ký  thêm, xoá, sửa trong table, để làm chi, nó giúp bạn xác định data của bạn có thay đổi không, nếu nó thay đổi thì sẽ kích hoạt cập nhật lại data mới nhất từ MSSQL sang PostgreSQL.

Các lệnh cơ bản sau để giúp bạn bật tắt CDC:
Enable CDC:
EXEC sys.sp_cdc_enable_db;
Disable CDC
EXEC sys.sp_cdc_disable_db;
Giờ tôi có database tên : OPERATION và table là Aircraft tôi sẽ tiến hành như sau:

  EXEC sys.sp_cdc_enable_db;

  EXEC sys.sp_cdc_enable_table 
   @source_schema = N'dbo', 
   @source_name = N'Aircraft', 
   @supports_net_changes = 0;
   @role_name = NULL;
Cái chú ý ở đây là @supports_net_changes , tuỳ vào mục đích và hệ thống hiện tại của bạn chọn sao cho phù hợp.

1: Hiểu nôm na như là một bảng tóm tắt thông tin, chỉ cập nhật thông tin thực tế của dữ liệu, ví dụ bạn làm gì làm nó sẽ hiển thị kết quả thực và trong các phạm vi được chỉ định.
0: Nó sẽ ghi lại tất cả các thông tin bạn thêm , xoá sửa, chi tiết từng cái một, tuỳ vào mục đích của bạn cần dùng là gì, ở đây tôi sẽ để là 0, nếu thêm xoá sửa gì thì sẽ cập nhật hết.

Sau khi chạy lệnh trên sẽ có kết quả giống bên dưới, đối với CDC kết hợp với kafka này, khi đăng ký trên kafka lần đầu tiên khi nhận topics nó sẽ nhận hết trong các table, nó sẽ chụp lại 1 bản sao, còn theo dõi thì chỉ phụ thuộc vào cdc bật trên bảng nào.

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)

PostgreSQL Server: Cái này đơn giản là cung cấp cho một user và password có đủ quyền đọc, ghi, tạo table..., còn lại sẽ tự giải quyết cho bạn.

Quay lại phần Kafka, đăng ký connector như sau: 

Hiện tại đang dùng connect-distributed , đối với standalone cũng vậy chỉ cần cú pháp sau.

Tạo file đăng ký connector MSSQL

{
    "name": "inventory-connector",
    "config": {
        "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max" : "1",
        "topic.prefix" : "sqlserver",
        "database.hostname" : "192.168.1.3",
        "database.port" : "1433",
        "database.user" : "sa",
        "database.password" : "dev@@123",
        "database.names" : "OPERATION",
        "schema.history.internal.kafka.bootstrap.servers" : "192.168.1.6:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory",
        "database.encrypt": "false"
    }
}

Sau khi tạo file chạy đăng ký

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://192.168.1.6:8083/connectors/ -d @sql.json

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


Như vậy đã có các topics tạo ra, như đã nói ở trên nó đã tạo ra các topics như vậy và khi nào có CDC bạn mới theo dõi real time được. Tương tự với postgres cũng tương tự như trên làm tương tự nha trên.

Tạo file đăng ký connector PostgreSQL:


{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "sqlserver.OPERATION.dbo.Aircraft",
        "connection.url": "jdbc:postgresql://192.168.1.5:5432/OPERATION",
        "connection.user": "root",
        "connection.password": "root",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "auto.create": "true",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "pk.fields": "RegistrationID",
        "pk.mode": "record_key",
        "table.name.format": "Aircraft"
    }
}

Sau khi chạy xong nó sẽ tự hộ trợ tạo table và kiểu dữ liệu luôn cho bạn, để đảm bảo kiểu dữ liệu từ MSSQL sang PostgreSQL sẽ phù hợp.

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://192.168.1.6:8083/connectors/ -d @pos.json

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)



Thành quả sẽ như sau, nếu bạn có thay đổi bất cứ gì từ MSSQL thì bên PostgreSQL cũng sẽ thay đổi theo, quá trình này sẽ real time.



Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)



Nhãn: ,