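Introduction to Kafka
Kafka is a distributed streaming platform from the Apache Software Foundation: a high-throughput, durable, distributed publish-subscribe messaging system. It was originally developed at LinkedIn, is written in Scala and Java, was open-sourced in December 2010, and later became an Apache top-level project. It was designed to handle all the activity-stream data of a consumer-scale website, where activity means page views, searches, and other user actions.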
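Deployment options
Docker deployment
Images
Kafka 4.0 removed ZooKeeper mode, so the ZooKeeper-based setup described here needs a Kafka 3.x image; if latest resolves to 4.x, pull a 3.x tag of bitnami/kafka instead.
docker pull bitnami/kafka:latest
docker pull zookeeper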
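Network
Create a user-defined network so that the Kafka container can reach ZooKeeper by container name; inside a container, localhost refers to the container itself, not to the ZooKeeper container.
docker network create kafka-net
ZooKeeper
docker run -d --name zookeeper --network kafka-net -p 2181:2181 -e ALLOW_ANONYMOUS_LOGIN=yes zookeeper
Kafka
The Bitnami image reads broker settings from environment variables prefixed with KAFKA_CFG_.
docker run -d --name kafka --network kafka-net -p 9092:9092 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 -e ALLOW_PLAINTEXT_LISTENER=yes bitnami/kafka:latest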
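To check that the broker started by the commands above is usable, a topic can be created and listed from inside the container. This is a minimal sketch, assuming the container is named kafka as above and that the Bitnami image has kafka-topics.sh on its PATH; the helper names create_topic and list_topics are hypothetical and exist only for this illustration.

```shell
# Create a single-partition topic inside a Kafka container.
# $1 = container name, $2 = topic name (both supplied by the caller).
create_topic() {
  docker exec "$1" kafka-topics.sh --create --if-not-exists \
    --topic "$2" --partitions 1 --replication-factor 1 \
    --bootstrap-server localhost:9092
}

# List the topics known to the broker in the given container.
list_topics() {
  docker exec "$1" kafka-topics.sh --list --bootstrap-server localhost:9092
}

# Usage, once both containers are running:
#   create_topic kafka my-topic
#   list_topics kafka
```

Here localhost:9092 is resolved inside the Kafka container, so the check does not depend on the host port mapping.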
Docker Compose deployment
version: '3'
services:
  zookeeper:
    image: zookeeper
    container_name: zookeeper
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:latest
    container_name: kafka
    ports:
      - '9092:9092'
    environment:
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
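The Compose file above can be started and inspected roughly as follows. This sketch assumes the file is saved as docker-compose.yml in the current directory and that the Compose v2 plugin (docker compose) is installed; with the older standalone binary the command is docker-compose instead. The helper names start_stack and kafka_logs are hypothetical.

```shell
# Start both services in the background, then show their state.
# Run from the directory that contains docker-compose.yml.
start_stack() {
  docker compose up -d && docker compose ps
}

# Print the most recent log lines of the kafka service.
# $1 = number of lines to show.
kafka_logs() {
  docker compose logs --tail "$1" kafka
}

# Usage:
#   start_stack
#   kafka_logs 50
```

Checking the broker log after startup is a quick way to see whether Kafka managed to connect to zookeeper:2181.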
Kubernetes (k8s) deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: zookeeper-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - name: zookeeper
          image: zookeeper
          ports:
            - containerPort: 2181
          env:
            - name: ALLOW_ANONYMOUS_LOGIN
              value: "yes"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
        - name: kafka
          image: bitnami/kafka:latest
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_CFG_ADVERTISED_LISTENERS
              value: "PLAINTEXT://localhost:9092"
            - name: KAFKA_CFG_ZOOKEEPER_CONNECT
              value: "zookeeper:2181"
            - name: ALLOW_PLAINTEXT_LISTENER
              value: "yes"
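The Kafka Deployment above connects to zookeeper:2181, which only resolves inside the cluster if a Service named zookeeper exists, and the manifests do not define one. The sketch below shows one possible way to apply the manifests, create that Service, and reach the broker from a workstation. It assumes both Deployments are saved in a single file (the name kafka.yaml is made up here) and that kubectl points at the target cluster; the helper names are hypothetical.

```shell
# Apply the two Deployments. $1 = path to the manifest file.
deploy_kafka() {
  kubectl apply -f "$1"
}

# Create a ClusterIP Service named "zookeeper" in front of the ZooKeeper
# Deployment, so that the host name zookeeper:2181 resolves in the cluster.
expose_zookeeper() {
  kubectl expose deployment zookeeper-deployment --name zookeeper --port 2181
}

# Forward local port 9092 to the broker pod, which matches the advertised
# listener PLAINTEXT://localhost:9092. Runs until interrupted.
forward_kafka() {
  kubectl port-forward deployment/kafka-deployment 9092:9092
}

# Usage:
#   deploy_kafka kafka.yaml
#   expose_zookeeper
#   forward_kafka
```

Because the advertised listener is localhost:9092, this setup is only suitable for a client that reaches the broker through the port-forward; clients running in other pods would need a different advertised address.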
Using Kafka in .NET
Add the NuGet package
Confluent.Kafka
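From the command line, the package can be added with the .NET CLI. This is a small sketch, assuming the .NET SDK is installed; the helper name add_kafka_client and the project file name in the usage line are hypothetical.

```shell
# Add the Confluent.Kafka client library to a project.
# $1 = path to the .csproj file.
add_kafka_client() {
  dotnet add "$1" package Confluent.Kafka
}

# Usage:
#   add_kafka_client TestLibrary.csproj
```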
A simple example
using System;
using System.Threading;
using Confluent.Kafka;

namespace TestLibrary.MQ
{
    public class KafkaTest
    {
        public static void Test()
        {
            var bootstrapServers = "127.0.0.1:9092";
            var topic = "my-topic";

            // The producer and the consumer each get their own configuration;
            // consumer-only settings such as GroupId do not belong on a producer.
            var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers };
            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = bootstrapServers,
                GroupId = "my-consumer-group",
                // Start from the earliest offset when the group has no committed offset.
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            // Create a producer instance and send one message.
            using (var producer = new ProducerBuilder<Null, string>(producerConfig).Build())
            {
                Console.WriteLine($"Sending messages to topic {topic}...");
                string message = "Hello, Kafka!";
                // Block until the broker acknowledges the message.
                var sendResult = producer.ProduceAsync(topic, new Message<Null, string> { Value = message }).GetAwaiter().GetResult();
                Console.WriteLine($"Sent message: {message}, partition: {sendResult.Partition}, offset: {sendResult.Offset}");
            }
            Console.WriteLine("Producer closed.");

            // Create a consumer instance.
            using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
            {
                // Subscribe to the topic.
                consumer.Subscribe(topic);
                Console.WriteLine($"Consuming messages from topic {topic}...");

                // Cancel the token on Ctrl+C instead of terminating the process.
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) =>
                {
                    e.Cancel = true;
                    cts.Cancel();
                };

                try
                {
                    while (true)
                    {
                        try
                        {
                            // Blocks until a message arrives or the token is cancelled.
                            var consumeResult = consumer.Consume(cts.Token);
                            Console.WriteLine($"Received message: {consumeResult.Value}");
                        }
                        catch (ConsumeException ex)
                        {
                            Console.WriteLine($"Consume error: {ex.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Thrown by Consume when the user presses Ctrl+C; leave the loop.
                }
                finally
                {
                    // Leave the consumer group cleanly.
                    consumer.Close();
                }
            }
            Console.WriteLine("Consumer closed.");
        }
    }
}
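To confirm independently of the .NET consumer that the message reached the broker, the topic can be read with the console consumer that ships with Kafka. This sketch assumes the Docker setup from earlier, with a container named kafka and kafka-console-consumer.sh on the image's PATH; the helper name peek_topic is hypothetical.

```shell
# Read messages from a topic, starting at the earliest offset, and exit
# after a fixed number of messages.
# $1 = container name, $2 = topic name, $3 = number of messages to read.
peek_topic() {
  docker exec "$1" kafka-console-consumer.sh --topic "$2" \
    --from-beginning --max-messages "$3" \
    --bootstrap-server localhost:9092
}

# Usage, after running KafkaTest.Test() at least once:
#   peek_topic kafka my-topic 1
```

The console consumer uses its own consumer group, so reading the topic this way does not affect the offsets of my-consumer-group.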