Contents

Kafka

一、介绍

1、重要概念

  • Broker: Kafka 的基本单元,负责存储和传递消息。一个 Kafka 集群由多个 Broker 组成。
  • Topic: 消息的分类,每条消息都属于一个特定的 Topic。用户可以通过 Topic 来组织和管理消息。
  • Producer: 消息的生产者,负责将消息发送到指定的 Topic。
  • Consumer: 消息的消费者,从 Kafka 中读取消息的客户端。
  • Consumer Group: 一组消费者,能够共同消费同一 Topic 的消息。每条消息只能被同一 Consumer Group 中的一个 Consumer 消费,但可以被多个不同的 Consumer Group 消费。
  • Partition: 每个 Topic 可以分为多个 Partition,Partition 内部消息有序存储,并且可以分布在不同的 Broker 上,以实现负载均衡和高可用性。

2、使用场景

Kafka 的应用场景非常广泛,包括但不限于:

  • 日志收集: 用于集中收集和处理来自不同服务的日志信息。
  • 消息系统: 解耦生产者和消费者,支持异步通信。
  • 用户活动追踪: 记录用户在网站或应用上的行为,以便进行实时分析。
  • 运营指标监控: 收集和分析系统性能指标,实现实时监控。
  • 流处理: 通过 Kafka 构建实时数据管道,与大数据处理框架(如 Spark 和 Flink)结合使用。

1)解藕

比如用于注册成功后,需要给用户发送邮件;如果用户注册的同时还需要处理给用户发送邮件等其它繁琐操作,一来代码会非常混乱(耦合太高)、二来接口流程会特别长(都放到一个接口里处理,响应会很慢)

2)缓冲(销峰平谷)

比如电商网站,遇到促销搞活动这种流量都会很高,而我们服务器的处理能力是有限的,如果把整个消息的消费都放在一起处理,会拖慢整个服务;我们常用的做法是,把一些耗时操作可以放到消息队列中,服务器按自己的节奏去处理消息,不致于一瞬间压垮服务器。

3、使用

1)安装

version: '3.8'

services:
  kafka:
    image: registry.cn-hangzhou.aliyuncs.com/serialt/cp-kafka:7.7.8
    container_name: kafka
    hostname: kafka
    ports:
      - "9092:9092"
    volumes:
      - kafka_data:/var/lib/kafka/data
    environment:
      # ================= KRaft 模式 =================
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093

      # ================= 监听器配置 =================
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.16.1.53:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT

      # ================= 单节点开发配置 =================
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  kafka-ui:
    image: registry.cn-hangzhou.aliyuncs.com/serialt/kafka-ui:v1.4.2
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8089:8080"
    environment:
      DYNAMIC_CONFIG_ENABLED: "true"
      KAFKA_CLUSTERS_0_NAME: confluent-local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092

volumes:
  kafka_data:

2)go 连接

生产者

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	// Kafka 地址
	brokers := []string{"172.16.1.53:9092"}
	topic := "test-topic"

	// 配置
	config := sarama.NewConfig()
	config.Version = sarama.V3_6_0_0
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Return.Successes = true
	config.Producer.Retry.Max = 5
	config.Producer.Partitioner = sarama.NewHashPartitioner

	// 创建同步生产者
	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		log.Fatalf("创建生产者失败: %v", err)
	}
	defer producer.Close()

	// 发送消息
	for i := 0; i < 5; i++ {
		msg := &sarama.ProducerMessage{
			Topic: topic,
			Key:   sarama.StringEncoder(fmt.Sprintf("key-%d", i)),
			Value: sarama.StringEncoder(fmt.Sprintf("Hello Kafka %d", i)),
		}

		partition, offset, err := producer.SendMessage(msg)
		if err != nil {
			log.Printf("消息发送失败: %v", err)
			continue
		}

		fmt.Printf("消息发送成功: partition=%d offset=%d\n", partition, offset)
		time.Sleep(time.Second)
	}
}

消费者

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/IBM/sarama"
)

type ConsumerGroupHandler struct{}

func (ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
	fmt.Println("消费者启动...")
	return nil
}

func (ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
	fmt.Println("消费者关闭...")
	return nil
}

func (ConsumerGroupHandler) ConsumeClaim(
	session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim,
) error {
	for message := range claim.Messages() {
		fmt.Printf("收到消息: topic=%s partition=%d offset=%d key=%s value=%s\n",
			message.Topic,
			message.Partition,
			message.Offset,
			string(message.Key),
			string(message.Value),
		)
		session.MarkMessage(message, "")
	}
	return nil
}

func main() {
	brokers := []string{"172.16.1.53:9092"}
	topic := "test-topic"
	groupID := "test-group"

	config := sarama.NewConfig()
	config.Version = sarama.V3_6_0_0
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)
	if err != nil {
		log.Fatalf("创建消费者组失败: %v", err)
	}
	defer consumerGroup.Close()

	handler := ConsumerGroupHandler{}
	ctx := context.Background()

	for {
		err := consumerGroup.Consume(ctx, []string{topic}, handler)
		if err != nil {
			log.Fatalf("消费失败: %v", err)
		}
	}
}