hale
发布于 2023-08-26 / 97 阅读 / 0 评论 / 0 点赞

docker部署RocketMQ并且在.net中的简单使用

官网: 快速开始 | RocketMQ (apache.org)

部署

docker部署

拉取镜像

docker pull apache/rocketmq:latest

创建网络

docker network create rocketmq

部署nameserver

docker run -d --name namesrv -p 9876:9876 --network rocketmq -v /docker/rocketmq/namesrv/logs:/root/logs  apache/rocketmq:latest  sh mqnamesrv

部署broker

docker run -d --name broker -p 10911:10911 -p 10909:10909 -v /docker/rocketmq/broker/logs:/root/logs --network rocketmq --env "NAMESRV_ADDR=namesrv:9876"  apache/rocketmq:latest sh mqbroker -n namesrv:9876 -c ../conf/broker.conf

RocketMQ 控制台

docker pull apacherocketmq/rocketmq-dashboard:latest

docker run -d -p 8080:8080 --name rocketmq-dash --network rocketmq -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=namesrv:9876" apacherocketmq/rocketmq-dashboard

修改broker.conf文件

docker exec -it namesrv bash

cd ../conf

vi broker.conf

添加

brokerIP1=127.0.0.1

docker-compose部署

version: '3.3'
services:
 mqnamesrv:
  container_name: namesrv
  ports:
   - '9876:9876'
  network_mode: rocketmq
  volumes:
   - '/docker/rocketmq/namesrv/logs:/root/logs'
  command: sh mqnamesrv
  image: apache/rocketmq

 mqbroker:
  container_name: broker
  ports:
   - '10911:10911'
   - '10909:10909'
  volumes:
   - '/docker/rocketmq/broker/logs:/root/logs'
  network_mode: rocketmq
  environment:
   - 'NAMESRV_ADDR=namesrv:9876'
  command: sh mqbroker -n namesrv:9876 -c ../conf/broker.conf
  image: apache/rocketmq

 rocketmq-dashboard:
  ports:
   - '7000:8080'
  container_name: rocketmq-dash
  network_mode: rocketmq
  environment:
   - 'JAVA_OPTS=-Drocketmq.config.namesrvAddr=namesrv:9876'
  image: apacherocketmq/rocketmq-dashboard
#docker-compose基本命令
启动服务
docker-compose -f docker-compose.yml up -d

停止服务
docker-compose -f docker-compose.yml stop

停止并删除服务
docker-compose -f docker-compose.yml down

问题,如果修改了brokerIP1为宿主机IP则dashboard中显示无法连接到集群, 如果不修改,则通过程序无法连接到broker集群

在.net中的使用

Nuget包:NewLife.RocketMQ

#生产者

internal class NewLifeProducker
{
    public static void Send()
    {
        var rocketServer = "127.0.0.1:9876";
        var topic = "mytopic";
        var mq = new Producer
        {
            //Topic = topic,
            NameServerAddress = rocketServer,
            //Log = XTrace.Log,
        };
        mq.Start();
        var mqTopic = mq.Topic;
        if (string.IsNullOrEmpty(mqTopic))
        {
            mq.CreateTopic(topic, 2);
        }

        for (var i = 0; i < 10; i++)
        {
            var str = "学无先后达者为师" + i;
            //var str = Rand.NextString(1337);

            var sr = mq.Publish(str, "TagA");
        }

        Console.WriteLine("完成");
        mq.Dispose();
        Console.ReadLine();

    }
}

#消费者

internal class NewLifeConsumer
{
    public static void Resolve()
    {
        Console.WriteLine("消息接收测试");
        //测试消费消息
        var consumer = new NewLife.RocketMQ.Consumer
        {
            //Topic = "mytopic",
            Group = "CID_ONSAPI_OWNER",
            NameServerAddress = "127.0.0.1:9876",
            //设置每次接收消息只拉取一条信息
            BatchSize = 1,
            //FromLastOffset = true,
            //SkipOverStoredMsgCount = 0,
            //BatchSize = 20,
            //Log = NewLife.Log.XTrace.Log,
        };
        consumer.OnConsume = (q, ms) =>
        {
            string mInfo = $"BrokerName={q.BrokerName},QueueId={q.QueueId},Length={ms.Length}";
            Console.WriteLine(mInfo);
            foreach (var item in ms.ToList())
            {
                string msg = $"消息:msgId={item.MsgId},key={item.Keys},产生时间【{item.BornTimestamp.ToDateTime()}】,内容>{item.Body.ToStr()}";
                Console.WriteLine(msg);
            }
            //   return false;//通知消息队:不消费消息
            return true;		//通知消息队:消费了消息
        };

        consumer.Start();
        Console.ReadLine();
    }
}

参考:

https://blog.csdn.net/weixin_44606481/article/details/129758920

https://www.cnblogs.com/lijunyzzZ/p/14687315.html


评论