hale
发布于 2023-08-12 / 39 阅读 / 0 评论 / 0 点赞

Kafka的简单使用

kafka简介:

Kafka是Apache旗下的一款分布式流媒体平台,Kafka是一种高吞吐量、持久性、分布式的发布订阅的消息队列系统。 它最初由LinkedIn(领英)公司发布,使用Scala语言编写,与2010年12月份开源,成为Apache的顶级子项目。 它主要用于处理消费者规模网站中的所有动作流数据。动作指(网页浏览、搜索和其它用户行动所产生的数据)。

部署方式

docker部署

镜像images

docker pull bitnami/kafka:latest
docker pull zookeeper

zookeeper

docker run -d --name zookeeper -p 2181:2181 -e ALLOW_ANONYMOUS_LOGIN=yes zookeeper

kafka

docker run -d --name kafka -p 9092:9092 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_ZOOKEEPER_CONNECT=localhost:2181 -e ALLOW_PLAINTEXT_LISTENER=yes bitnami/kafka:latest

docker-compose部署

version: '3'
services:
  zookeeper:
    image: zookeeper
    container_name: zookeeper
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  
  kafka:
    image: bitnami/kafka:latest
    container_name: kafka
    ports:
      - '9092:9092'
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

k8s部署

apiVersion: apps/v1
kind: Deployment
metadata:
  name: zookeeper-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - name: zookeeper
          image: zookeeper
          ports:
            - containerPort: 2181
          env:
            - name: ALLOW_ANONYMOUS_LOGIN
              value: "yes"

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
        - name: kafka
          image: bitnami/kafka:latest
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "PLAINTEXT://localhost:9092"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: "zookeeper:2181"
            - name: ALLOW_PLAINTEXT_LISTENER
              value: "yes"

在.net中的使用

引用Nuget包

Confluent.Kafka

一个简单的示例

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace TestLibrary.MQ
{
    public class KafkaTest
    {
        public static void Test()
        {
            var bootstrapServers = "127.0.0.1:9092";
            var topic = "my-topic";

            var config = new ConsumerConfig
            {
                BootstrapServers = bootstrapServers,
                GroupId = "my-consumer-group",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            // 创建一个生产者实例
            //var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers };
            using (var producer = new ProducerBuilder<Null, string>(config).Build())
            {
                Console.WriteLine($"开始向主题 {topic} 发送消息...");

                string message = "Hello, Kafka!";
                var sendResult = producer.ProduceAsync(topic, new Message<Null, string> { Value = message }).GetAwaiter().GetResult();
                Console.WriteLine($"发送消息: {message}, 分区: {sendResult.Partition}, 偏移量: {sendResult.Offset}");
            }

            Console.WriteLine("生产者已关闭。");

            // 创建一个消费者实例
            using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                // 订阅主题
                consumer.Subscribe(topic);

                Console.WriteLine($"开始消费来自主题 {topic} 的消息...");

                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) =>
                {
                    e.Cancel = true;
                    cts.Cancel();
                };

                try
                {
                    while (true)
                    {
                        try
                        {
                            var consumeResult = consumer.Consume(cts.Token);
                            Console.WriteLine($"接收到消息: {consumeResult.Value}");
                        }
                        catch (ConsumeException ex)
                        {
                            Console.WriteLine($"消费异常: {ex.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // 当用户按下Ctrl+C或者取消键时,退出循环
                }
                finally
                {
                    consumer.Close();
                }
            }
            Console.WriteLine("消费者已关闭。");

        }
    }
}


评论