官网: 快速开始 | RocketMQ (https://rocketmq.apache.org/)
部署
docker部署
拉取镜像
docker pull apache/rocketmq:latest
创建网络
docker network create rocketmq
部署nameserver
docker run -d --name namesrv -p 9876:9876 --network rocketmq -v /docker/rocketmq/namesrv/logs:/root/logs apache/rocketmq:latest sh mqnamesrv
部署broker
docker run -d --name broker -p 10911:10911 -p 10909:10909 -v /docker/rocketmq/broker/logs:/root/logs --network rocketmq --env "NAMESRV_ADDR=namesrv:9876" apache/rocketmq:latest sh mqbroker -n namesrv:9876 -c ../conf/broker.conf
RocketMQ 控制台
docker pull apacherocketmq/rocketmq-dashboard:latest
docker run -d -p 8080:8080 --name rocketmq-dash --network rocketmq -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=namesrv:9876" apacherocketmq/rocketmq-dashboard
修改broker.conf文件
docker exec -it broker bash
cd ../conf
vi broker.conf
添加
brokerIP1=127.0.0.1
保存后重启 broker 容器使配置生效: docker restart broker
docker-compose部署
# Compose definition for a single-node RocketMQ stack: name server, broker
# and dashboard. Every service joins the network named "rocketmq", which is
# created beforehand with `docker network create rocketmq`.
version: '3.3'
services:
  # Name server, published on host port 9876.
  mqnamesrv:
    container_name: namesrv
    ports:
      - '9876:9876'
    network_mode: rocketmq
    volumes:
      - '/docker/rocketmq/namesrv/logs:/root/logs'
    command: sh mqnamesrv
    image: apache/rocketmq
  # Broker; reaches the name server through its container name "namesrv".
  mqbroker:
    container_name: broker
    ports:
      - '10911:10911'
      - '10909:10909'
    volumes:
      - '/docker/rocketmq/broker/logs:/root/logs'
    network_mode: rocketmq
    environment:
      - 'NAMESRV_ADDR=namesrv:9876'
    command: sh mqbroker -n namesrv:9876 -c ../conf/broker.conf
    image: apache/rocketmq
  # Web dashboard; container port 8080 is published on host port 7000.
  rocketmq-dashboard:
    ports:
      - '7000:8080'
    container_name: rocketmq-dash
    network_mode: rocketmq
    environment:
      - 'JAVA_OPTS=-Drocketmq.config.namesrvAddr=namesrv:9876'
    image: apacherocketmq/rocketmq-dashboard
#docker-compose基本命令
启动服务
docker-compose -f docker-compose.yml up -d
停止服务
docker-compose -f docker-compose.yml stop
停止并删除服务
docker-compose -f docker-compose.yml down
问题: 如果将 brokerIP1 修改为宿主机 IP, 则 dashboard 中显示无法连接到集群; 如果不修改, 则程序无法连接到 broker 集群。
在 .NET 中的使用
Nuget包:NewLife.RocketMQ
#生产者
/// <summary>
/// Demo producer that publishes ten sample messages through NewLife.RocketMQ.
/// </summary>
internal class NewLifeProducker
{
    /// <summary>
    /// Starts a producer against the name server at 127.0.0.1:9876, creates the
    /// "mytopic" topic when the producer reports an empty topic, publishes ten
    /// messages tagged "TagA", releases the producer and then blocks on
    /// <see cref="Console.ReadLine"/>.
    /// </summary>
    public static void Send()
    {
        var rocketServer = "127.0.0.1:9876";
        var topic = "mytopic";
        // NOTE(review): Topic is not assigned here, so which topic Publish targets
        // depends on the library default - confirm against NewLife.RocketMQ.
        var mq = new Producer
        {
            //Topic = topic,
            NameServerAddress = rocketServer,
            //Log = XTrace.Log,
        };

        try
        {
            mq.Start();

            if (string.IsNullOrEmpty(mq.Topic))
            {
                // Second argument is presumably the queue count - TODO confirm.
                mq.CreateTopic(topic, 2);
            }

            for (var i = 0; i < 10; i++)
            {
                var str = "学无先后达者为师" + i;
                //var str = Rand.NextString(1337);
                mq.Publish(str, "TagA");
            }

            Console.WriteLine("完成");
        }
        finally
        {
            // Release the producer even when Start/CreateTopic/Publish throws.
            mq.Dispose();
        }

        Console.ReadLine();
    }
}
#消费者
/// <summary>
/// Demo consumer that prints every message received through NewLife.RocketMQ.
/// </summary>
internal class NewLifeConsumer
{
    /// <summary>
    /// Starts a consumer in group "CID_ONSAPI_OWNER" against the name server at
    /// 127.0.0.1:9876, writes each received message to the console, and keeps
    /// running until <see cref="Console.ReadLine"/> returns.
    /// </summary>
    public static void Resolve()
    {
        Console.WriteLine("消息接收测试");

        // Consumer used for the receive test.
        var consumer = new NewLife.RocketMQ.Consumer
        {
            //Topic = "mytopic",
            Group = "CID_ONSAPI_OWNER",
            NameServerAddress = "127.0.0.1:9876",
            // Pull only one message per batch.
            BatchSize = 1,
            //FromLastOffset = true,
            //SkipOverStoredMsgCount = 0,
            //BatchSize = 20,
            //Log = NewLife.Log.XTrace.Log,
        };

        consumer.OnConsume = (q, ms) =>
        {
            string mInfo = $"BrokerName={q.BrokerName},QueueId={q.QueueId},Length={ms.Length}";
            Console.WriteLine(mInfo);

            // Iterate the batch directly; no intermediate list copy is needed.
            foreach (var item in ms)
            {
                string msg = $"消息:msgId={item.MsgId},key={item.Keys},产生时间【{item.BornTimestamp.ToDateTime()}】,内容>{item.Body.ToStr()}";
                Console.WriteLine(msg);
            }

            // Returning true reports the batch as consumed; returning false
            // would report it as not consumed.
            return true;
        };

        try
        {
            consumer.Start();
            Console.ReadLine();
        }
        finally
        {
            // Release the consumer once the demo ends or Start fails,
            // mirroring the producer's Dispose call.
            consumer.Dispose();
        }
    }
}
参考:
https://blog.csdn.net/weixin_44606481/article/details/129758920