Published on

[Apache Kafka][02] - Kafka Quickstart

Authors
  • avatar
    Name
    David Nguyen
    Twitter
Table of Contents

1. Cài đặt Kafka

Đầu tiên các bạn download file cài đặt Kafka tại đây

Có khá nhiều phiên bản, tại thời điểm mình download mới nhất là phiên bản 3.7.0. Các bạn có thể download các phiên bản cũ hơn nhưng không nên cũ quá.

structured blocks

Ở mỗi phiên bản lại có nhiều lựa chọn để download, trường hợp này mình sẽ cài đặt trực tiếp trên máy nên chọn phiên bản kafka-3.7.0-src.tgz như trong ảnh. Các bạn lưu ý chọn đúng phiên bản phù hợp với hệ điều hành.

Sau khi download, mình tiến hành giải nén như sau:

$ tar -xzf kafka_2.13-3.7.0.tgz
$ cd kafka_2.13-3.7.0

2. Thiết lập môi trường Kafka

Lưu ý: Máy của các bạn phải cài đặt phiên bản từ Java 8 trở lên để chạy được Kafka.

Kafka có thể được thiết lập thông qua ZooKeeper hoặc KRaft.

2.1 - Thiết lập Kafka thông qua ZooKeeper.

Các bạn di chuyển vào thư mục vừa giải nén file vừa rồi, trong trường hợp của mình là kafka_2.13-3.7.0 và chạy câu lệnh sau:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

Sau đó mở thêm một cửa sổ dòng lệnh khác và chạy lệnh:

$ bin/kafka-server-start.sh config/server.properties

Đến đây chúng ta đã thành công thiết lập Kafka thông qua ZooKeeper.

2.2 - Thiết lập Kafka thông qua KRaft.

Với KRaft các bạn có thể thiết lập Kafka thông qua local scripts và những file đã download hoặc thông qua Docker image.

2.2.1 - Sử dụng các file đã download.

Các bạn chạy các lệnh sau:

Sinh ra Cluster UUID:

$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

Format thư mục logs:

$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

Thiết lập Kafka server:

$ bin/kafka-server-start.sh config/kraft/server.properties

2.2.2 - Sử dụng Docker image.

Tải Docker image.

$ docker pull apache/kafka:3.7.0

Chạy Kafka thông qua Docker container.

$ docker run -p 9092:9092 apache/kafka:3.7.0

Trên đây là nhũng cách để chúng ta thiết lập và chạy Kafka server. Bây giờ mình sẽ phần tiếp theo là tạo các topics trong Kafka.

3. Tạo Kafka Topics.

Như mình đã đề cập trong bài viết trước, Kafka là một distributed event streaming platform vì vậy khi làm việc với Kafka chúng ta có thể đọc, ghi, lưu trữ và xử lý các event (các bạn cũng có thể gọi là records, messages thay cho event trong tình huống phù hợp).

Vậy event là gì?

Event có thể là thông tin giao dịch (payment transactions), dữ liệu định vị (geolocation data), thông tin đơn hàng (shipping orders), các thông số cảm biến từ các thiết bị IoT...

Event sẽ được tổ chức (organized) và lưu trữ (stored) trong các topics.

Có thể so sánh topics tương tự như các thư mục (folders) trong khi events tương tự như các files trong folder đó.

Để tạo một topic các bạn chạy lệnh sau:

$ bin/kafka-topics.sh --create --topic test-events --bootstrap-server localhost:9092
Created topic test-events.

Vậy là một topic có tên test-events được tạo. Nếu các bạn muốn xem thông của topic vừa được tạo thì có thể chạy lệnh:

$ bin/kafka-topics.sh --describe --topic test-events --bootstrap-server localhost:9092
Topic: test-events	TopicId: T3QwQYOkSOO_U1a_ERVsoQ	PartitionCount: 1	ReplicationFactor: 1	Configs:
	Topic: test-events	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

4. Ghi events

Một Kafka client giao tiếp với Kafka brokers qua mạng để ghi hoặc đọc các events. Trong trường hợp ghi các events thì brokers sẽ lưu các events đó một cách an toàn trong khoảng thời gian được cấu hình hoặc thậm là mãi mãi.

Để ghi các events vào topic các bạn chạy lệnh sau:

$ bin/kafka-console-producer.sh --topic test-events --bootstrap-server localhost:9092       ✘ INT 3s
>I'm learning about the Kafka topics...
>And this is command to write some event to the topic test-events

Ở đây, mình đã ghi hai events vào topic test-events. Mặc định thì mỗi lần mình nhấn Enter sẽ được tính là một event. Các bạn có thể bấm Ctrl+C để dừng việc ghi events từ producer client.

5. Đọc events

Ghi được rồi, bây giờ chúng ta sẽ đọc các events vừa ghi. Bên trên để ghi chúng ta sử dụng một producer client, bây giờ để đọc chúng ta sử dụng một consumer client. Hai khái niệm Producer và Consumer mình cũng đã giải thích trong bài trước rồi.

Để đọc các events từ một topic các bạn dùng lệnh:

$ bin/kafka-console-consumer.sh --topic test-events --from-beginning --bootstrap-server localhost:9092
I'm learning about the Kafka topics...
And this is command to write some event to the topic test-events

Vậy là consumer client đã đọc được hai events mình ghi ở bước trước. Bây giờ các bạn có thể test thêm bằng cách ghi thêm một event và xem consumer client có đọc được event mới thêm không, ví dụ như sau:

$ bin/kafka-console-producer.sh --topic test-events --bootstrap-server localhost:9092
>Now I'm testing how to read event from Kafka topics
$ bin/kafka-console-consumer.sh --topic test-events --from-beginning --bootstrap-server localhost:9092              ✘ INT
I'm learning about the Kafka topics...
And this is command to write some event to the topic test-events
Now I'm testing how to read event from Kafka topics

Vậy là chúng ta đã biết làm sao để tạo ra một topics, làm sao để ghi và đọc events từ một topics cụ thể trong Kafka. Tiếp mình sẽ cùng các bạn sử dụng Kafka connect để import/export dữ liệu.

6. Import/Export dữ liệu thông qua Kafka Connect.

Trong thực tế dữ liệu đến từ rất nhiều nguồn khác nhau ví dụ từ các cơ sở dữ liệu quan hệ hoặc từ một hệ thống messenging truyền thống, cùng với đó là có rất nhiều ứng dụng, dịch vụ trong hệ thống tiêu thụ những nguồn dữ liệu đó.

Kafka Connect là một công cụ cho phép chúng ta liên tục ghi dữ liệu từ một hệ thống bên ngoài vào Kafka và ngược lại. Có thể nói Kafka Connect được sinh ra để giúp cho việc tương tác, tích hợp với các hệ thống bên ngoài dễ dàng hơn.

Trong bài viết này mình sẽ cùng các bạn thực hiện một ví dụ đơn giản sử dụng Kafka Connect để đọc dữ liệu từ một file nguồn (source file) vào topic và ghi dữ liệu từ topic đó ra một file đích (target file)

6.1. Cấu hình

Các bạn mở file: config/connect-standalone.properties và cập nhật tham số plugin.path như sau:

$ plugin.path=libs/connect-file-3.7.0.jar

Lưu ý: Ở đây mình sử dụng đường dẫn tương đối libs/connect-file-3.7.0.jar vì mình đang chạy Kafka ngay tại thư mục cài đặt (thư mục mình download ở phần 1). Tuy nhiên ở môi trường production các bạn nên sử dụng dường dẫn tuyệt đối tránh trường hợp không tìm thấy file.

Tiếp theo, tạo file nguồn (source file) để đọc dữ liệu và Kafka topic.

$ echo -e "message_1\nmessage_2\nmessage_3" > test.txt

Ở đây mình tạo một file .txt và thêm vào đó dữ liệu với ba message: message_1, message_2, message_3

6.2. Đọc/Ghi dữ liệu qua Kafka Connect.

Để khởi chạy các connectors sẽ có hai chế độ là standalonedistributed. Trong bài viết này mình sẽ chạy ở chế độ standalone, distributed thường chạy ở môi trường product.

$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

Có 3 tham số là 3 file trong câu lệnh trên:

  • config/connect-standalone.properties: là file dùng để cấu hình Kafka Connect, bao gồm các cấu hình chung.

  • config/connect-file-source.properties và config/connect-file-sink.properties : là các file cấu hình cho file nguồn (đọc dữ liệu) và file đích (ghi dữ liệu)

Các bạn có thể mở hai file này vào thay đổi thông tin cấu hình file nguồn, tên topic.

Sau khi chạy xong câu lệnh trên, các bạn có thể kiểm tra file đích xem đã được ghi dữ liệu vào chưa (file đích sẽ tự động được tạo ra theo cấu hình trong file config/connect-file-sink.properties)

$ more test.sink.txt
message_1
message_2
message_3

Vậy là dữ liệu đã được ghi vào file đích.

Lưu ý: Ở đây bản chất dữ liệu vẫn đươc ghi vào Kafka topic, trong trường hợp này topic là connect-test (định nghĩ trong file config/connect-file-source.properties) nên chúng ta hoàn toàn có thể đọc trực tiếp dữ liệu từ topic này thông qua các Kafka Consumer client.

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"message_1"}
{"schema":{"type":"string","optional":false},"payload":"message_2"}
{"schema":{"type":"string","optional":false},"payload":"message_3"}

Các bạn có thể thêm bất kỳ message nào vào file test.txt ban đầu, Kafka Connect sẽ ngay lập tức đọc dữ liệu, ghi vào connect-test topic, và ghi ra file test.sink.txt.

$ echo message_4 >> test.txt
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"message_1"}
{"schema":{"type":"string","optional":false},"payload":"message_2"}
{"schema":{"type":"string","optional":false},"payload":"message_3"}
{"schema":{"type":"string","optional":false},"payload":"message_4"}

7. Tham khảo

Hẹn gặp lại các bạn trong các bài viết tiếp theo.