Contents
Kafka
一、介绍
1、重要概念
- Broker: Kafka 的基本单元,负责存储和传递消息。一个 Kafka 集群由多个 Broker 组成。
- Topic: 消息的分类,每条消息都属于一个特定的 Topic。用户可以通过 Topic 来组织和管理消息。
- Producer: 消息的生产者,负责将消息发送到指定的 Topic。
- Consumer: 消息的消费者,从 Kafka 中读取消息的客户端。
- Consumer Group: 一组消费者,能够共同消费同一 Topic 的消息。每条消息只能被同一 Consumer Group 中的一个 Consumer 消费,但可以被多个不同的 Consumer Group 消费。
- Partition: 每个 Topic 可以分为多个 Partition,Partition 内部消息有序存储,并且可以分布在不同的 Broker 上,以实现负载均衡和高可用性。
2、使用场景
Kafka 的应用场景非常广泛,包括但不限于:
- 日志收集: 用于集中收集和处理来自不同服务的日志信息。
- 消息系统: 解耦生产者和消费者,支持异步通信。
- 用户活动追踪: 记录用户在网站或应用上的行为,以便进行实时分析。
- 运营指标监控: 收集和分析系统性能指标,实现实时监控。
- 流处理: 通过 Kafka 构建实时数据管道,与大数据处理框架(如 Spark 和 Flink)结合使用。
1)解藕
比如用于注册成功后,需要给用户发送邮件;如果用户注册的同时还需要处理给用户发送邮件等其它繁琐操作,一来代码会非常混乱(耦合太高)、二来接口流程会特别长(都放到一个接口里处理,响应会很慢)
2)缓冲(销峰平谷)
比如电商网站,遇到促销搞活动这种流量都会很高,而我们服务器的处理能力是有限的,如果把整个消息的消费都放在一起处理,会拖慢整个服务;我们常用的做法是,把一些耗时操作可以放到消息队列中,服务器按自己的节奏去处理消息,不致于一瞬间压垮服务器。
3、使用
1)安装
version: '3.8'
services:
kafka:
image: registry.cn-hangzhou.aliyuncs.com/serialt/cp-kafka:7.7.8
container_name: kafka
hostname: kafka
ports:
- "9092:9092"
volumes:
- kafka_data:/var/lib/kafka/data
environment:
# ================= KRaft 模式 =================
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
# ================= 监听器配置 =================
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.16.1.53:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
# ================= 单节点开发配置 =================
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
kafka-ui:
image: registry.cn-hangzhou.aliyuncs.com/serialt/kafka-ui:v1.4.2
container_name: kafka-ui
depends_on:
- kafka
ports:
- "8089:8080"
environment:
DYNAMIC_CONFIG_ENABLED: "true"
KAFKA_CLUSTERS_0_NAME: confluent-local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
volumes:
kafka_data:
2)go 连接
生产者
package main
import (
"fmt"
"log"
"time"
"github.com/IBM/sarama"
)
func main() {
// Kafka 地址
brokers := []string{"172.16.1.53:9092"}
topic := "test-topic"
// 配置
config := sarama.NewConfig()
config.Version = sarama.V3_6_0_0
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 5
config.Producer.Partitioner = sarama.NewHashPartitioner
// 创建同步生产者
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
log.Fatalf("创建生产者失败: %v", err)
}
defer producer.Close()
// 发送消息
for i := 0; i < 5; i++ {
msg := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(fmt.Sprintf("key-%d", i)),
Value: sarama.StringEncoder(fmt.Sprintf("Hello Kafka %d", i)),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Printf("消息发送失败: %v", err)
continue
}
fmt.Printf("消息发送成功: partition=%d offset=%d\n", partition, offset)
time.Sleep(time.Second)
}
}
消费者
package main
import (
"context"
"fmt"
"log"
"github.com/IBM/sarama"
)
type ConsumerGroupHandler struct{}
func (ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
fmt.Println("消费者启动...")
return nil
}
func (ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
fmt.Println("消费者关闭...")
return nil
}
func (ConsumerGroupHandler) ConsumeClaim(
session sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim,
) error {
for message := range claim.Messages() {
fmt.Printf("收到消息: topic=%s partition=%d offset=%d key=%s value=%s\n",
message.Topic,
message.Partition,
message.Offset,
string(message.Key),
string(message.Value),
)
session.MarkMessage(message, "")
}
return nil
}
func main() {
brokers := []string{"172.16.1.53:9092"}
topic := "test-topic"
groupID := "test-group"
config := sarama.NewConfig()
config.Version = sarama.V3_6_0_0
config.Consumer.Offsets.Initial = sarama.OffsetOldest
consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)
if err != nil {
log.Fatalf("创建消费者组失败: %v", err)
}
defer consumerGroup.Close()
handler := ConsumerGroupHandler{}
ctx := context.Background()
for {
err := consumerGroup.Consume(ctx, []string{topic}, handler)
if err != nil {
log.Fatalf("消费失败: %v", err)
}
}
}