程序员社区

对RocketMQ的见解以及与SpringBoot集成和配置可视化控制台

RocketMQ

RocketMQ是一个分布式消息传递和流媒体平台,是一款分布式、队列模型的消息中间件。具有低延迟,高性能和可靠性,万亿级容量和灵活的可扩展性。阿里开源的消息中间件,已经捐献给了 Apache 。
Github地址:https://github.com/apache/rocketmq/
官方样例代码:https://github.com/apache/rocketmq/tree/master/example
官方网站:https://rocketmq.apache.org/
使用文档:https://rocketmq.apache.org/docs/quick-start/

具体特点:

1.能够保证严格的消息顺序
2.提供丰富的消息拉取模式
3.高效的订阅者水平扩展能力
4.实时的消息订阅机制
5.亿级消息堆积能力

部署RocketMQ

1.下载源码

官方下载地址:https://www.apache.org/dyn/closer.cgi?path=rocketmq
官方推荐使用的镜像地址:http://mirror.bit.edu.cn/apache/rocketmq

对RocketMQ的见解以及与SpringBoot集成和配置可视化控制台插图
image.png

这里选择下载了最新的RocketMQ4.3.2的源码

对RocketMQ的见解以及与SpringBoot集成和配置可视化控制台插图1
image.png

2.1解压并用maven构建二进制文件

构建命令:mvn -Prelease-all -DskipTests clean install -U

对RocketMQ的见解以及与SpringBoot集成和配置可视化控制台插图2
image.png

RocketMQ中需要打包构建的模块已经全部完成,对应的target文件夹在distribution模块中

2.2或者可以直接下载bin-release解压得到二进制文件

对RocketMQ的见解以及与SpringBoot集成和配置可视化控制台插图3
image.png

这种方式不需要下载源码和用maven构建

3.启动

1.启动 NameServer

进入target/apache-rocketmq/bin目录
window中命令:start mqnamesrv.cmd
linux中命令nohup sh bin/mqnamesrv &

注:出现以下提示,也就是没有在环境变量中配置ROCKETMQ_HOME的值

对RocketMQ的见解以及与SpringBoot集成和配置可视化控制台插图4
image.png

解决方法:配置环境变量
变量名:ROCKETMQ_HOME
变量值:E:\rocketMQ\rocketmq-all-4.3.2\distribution\target\apache-rocketmq
(变量值不一定要跟我的完全一样,distribution\target\apache-rocketmq这个基本是一样的)

再次执行启动命令

对RocketMQ的见解以及与SpringBoot集成和配置可视化控制台插图5
image.png

成功后会弹出提示框,此框勿关闭

启动BROKER

window中命令:start mqbroker.cmd -n localhost:9876
linux中命令nohup sh bin/mqbroker -n localhost:9876 &

对RocketMQ的见解以及与SpringBoot集成和配置可视化控制台插图6
image.png

成功后会弹出提示框,此框勿关闭

RocketMQ的核心概念图

对RocketMQ的见解以及与SpringBoot集成和配置可视化控制台插图7
image.png

Producer消息生产者

将业务应用程序系统生成的消息发送给Broker,RocketMQ通过轮询所有队列的方式来确定消息被发送到哪一个队列(负载均衡策略)
三种发送消息的方式:同步,异步和单向传输
源码解析

public class DefaultMQProducer extends ClientConfig implements MQProducer {
    /**
     * 以同步方式发送消息。仅当发送过程完全完成时,此方法才会返回。
     */
      @Override
    public SendResult send(Message msg) throws MQClientException, 
              RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg);
    }
     /**
     * 以同步方式发送消息。可指定发送超时时间
     */
    @Override
    public SendResult send(Message msg,long timeout) throws MQClientException, 
              RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, timeout);
    }
    /**
     * 以异步方式发送消息。此方法立即返回。发送完成后,将执行sendCallback。
     */
      @Override
    public void send(Message msg,SendCallback sendCallback) throws MQClientException, 
              RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, sendCallback);
    }
     /**
     * 以异步方式发送消息。可指定发送超时时间
     */
    @Override
    public void send(Message msg, SendCallback sendCallback, long timeout)
              throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
    }
    /**
     * 以单向传输的方式发送消息。具有最大的吞吐量但存在消息丢失的风险。不等待服务器回应且
    没有回调函数触发,即只发送请求不等待应答。 此方式发送消息的过程耗时非常短,一般在微秒级别。
     */
    @Override
    public void sendOneway(Message msg) throws MQClientException, 
                            RemotingException, InterruptedException {
        this.defaultMQProducerImpl.sendOneway(msg);
    }
}

关于异步发送,需要实现异步发送回调接口SendCallback,在执行消息的异步发送时,应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理

public interface SendCallback {
    void onSuccess(final SendResult sendResult);
    void onException(final Throwable e);
}

详细写法推荐参考:https://help.aliyun.com/document_detail/29547.html

Producer Group生产者组

如果其中一个生产者在事务之后崩溃,则代理可以联系同一生产者组的不同生产者实例以提交或回滚事务。一个应用只有一个生产者组,避免不必要的实例初始化,每个生产者发送逻辑一致,producerGroup可以共用一个队列

生产者端的负载均衡
生产者发送时,会自动轮询当前所有可发送的broker,一条消息发送成功,下次换另外一个broker发送,以达到消息平均落到所有的broker

Consumer消息消费者

消费者从Broker那里获取消息并将其提供给应用程序。
从用户应用的角度来看,提供了两种类型的消费者:

1.PullConsumer(拉模式)

概念:consumer主动去向broker拉取消息。
取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue

PullConsumer具体使用可以参考
https://blog.csdn.net/gwd1154978352/article/details/80884741

2.PushConsumer(推模式)

概念:broker主动去向consumer推送消息。
Push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListenerconsumeMessage()来消费,对用户而言,感觉消息是被推送过来的。

关系:PUSH模式实际上在内部还是使用的PULL方式实现的,通过PULL不断地轮询Broker获取消息,当不存在新消息时,Broker会挂起PULL请求,直到有新消息产生才取消挂起,返回新消息

PushConsumer具体使用主要有三步

1.订阅主题以消费订阅,主要用到subscribe()

   @Override
   public void subscribe(String topic, String subExpression) throws MQClientException {
        this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
    }

2.设置Push的消费策略,用到setConsumeFromWhere()

public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
        this.consumeFromWhere = consumeFromWhere;
    }

ConsumeFromWhere是一个枚举类

public enum ConsumeFromWhere {
  //默认策略,从该队列最尾开始消费,即跳过历史消息
    CONSUME_FROM_LAST_OFFSET,
  //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
    CONSUME_FROM_FIRST_OFFSET,
  //从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
    CONSUME_FROM_TIMESTAMP,
}

3.进行消息的逻辑处理(消费)
3.1注册MessageListener监听器,监听器有两种,分别是
有序消费MessageListenerOrderly
并发消费MessageListenerConcurrently
MessageListenerConcurrentlyMessageListenerOrderly均继承于 MessageListener

public interface MessageListenerConcurrently extends MessageListener {
    ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
        final ConsumeConcurrentlyContext context);
  }
public interface MessageListenerOrderly extends MessageListener {
    ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
        final ConsumeOrderlyContext context);
  }

区别是:
1.并发消费的消费速度要比有序消费更快。
2.并发消费模式不会无限消费,而且消费失败后不会马上再消费。
有序消费模式要慎重地处理异常,只要消费次数达到一定次数,那么就直接返回ConsumeOrderlyStatus.SUCCESS

(推荐使用并发消费)

    protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    @Override
    public void registerMessageListener(MessageListenerConcurrently messageListener) {
        this.messageListener = messageListener;
        this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
    }
    @Override
    public void registerMessageListener(MessageListenerOrderly messageListener) {
        this.messageListener = messageListener;
        this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
    }

注:被transient修饰的成员属性变量不被序列化

DefaultMQPushConsumerImplPushConsumer消费者具体实现类,用于设置拉取消息后的回调类

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    private MessageListener messageListenerInner;
    public void registerMessageListener(MessageListener messageListener) {
        this.messageListenerInner = messageListener;
    }
  //省略部分源码
}

3.2在对应的回调类中写消息处理逻辑
简单来说就是实现MessageListenerConcurrently或者MessageListenerOrderly回调接口的方法
处理模板

consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 
                              ConsumeConcurrentlyContext context) {
                    //具体逻辑
                }
            });
consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, 
                              ConsumeOrderlyContext context) {
                    //具体逻辑
                }
            });

consumeMessage()的参数说明

参数List<MessageExt> msgs是一个消息列表
参数ConsumeConcurrentlyContext context用于消费的事务控制,可以用于设置是否自动提交,消息队列和当前​​队列的暂停时间

ConsumeOrderlyContext源码

public class ConsumeOrderlyContext {
    private final MessageQueue messageQueue;
    private boolean autoCommit = true;
    private long suspendCurrentQueueTimeMillis = -1;

    public ConsumeOrderlyContext(MessageQueue messageQueue) {
        this.messageQueue = messageQueue;
    }

    public boolean isAutoCommit() {
        return autoCommit;
    }

    public void setAutoCommit(boolean autoCommit) {
        this.autoCommit = autoCommit;
    }

    public MessageQueue getMessageQueue() {
        return messageQueue;
    }

    public long getSuspendCurrentQueueTimeMillis() {
        return suspendCurrentQueueTimeMillis;
    }

    public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
        this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
    }
}

这里重点说明下返回值,consumeMessage有两种返回值:成功消费和失败稍后重试
枚举类ConsumeConcurrentlyStatus源码,并发消费返回值

public enum ConsumeConcurrentlyStatus {
    //成功消费
    CONSUME_SUCCESS,
    //失败稍后重试
    RECONSUME_LATER;
}

枚举类ConsumeOrderlyStatus源码,顺序消费返回值

public enum ConsumeOrderlyStatus {
    //成功消费
    SUCCESS,
    //失败稍后重试
    SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

两者具体的实现方法可以参考
https://blog.csdn.net/qq_36804701/article/details/81481343

4.调用start()方法启动consumer

Consumer Group消费者组

与之前提到的生产者组类似,完全相同角色的消费者被组合在一起并命名为消费者组。

Message Model消息模型

1.集群消费方式 (聚类模型)
一个ConsumerGroup中的Consumer实例平均分摊消费生产者发送的消息
例如某个Topic有九条消息,其中一个Consumer Group有三个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息,Consumer不指定消费方式的话默认是集群消费的,适用于大部分消息的业务
2.广播消费方式 (广播模型)
一条消息被多个Consumer消费,几十这些Consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer消费一次,广播消费中的ConsumerGroup概念可以认为在消息划分层面没有意义,适用于一些分发消息的场景,比如我订单下单成功了,需要通知财务系统,客服系统等等这种分发的场景,可以通过修改Consumer中的MessageModel来设置消费方式为广播消费
原文:https://blog.csdn.net/weixin_41098980/article/details/79880957

Broker

消息中转角色,负责存储消息,转发消息。是RocketMQ系统的主要组成部分。接收从生产者发送的消息,存储它们并准备处理来自消费者的拉取请求。还存储与消息相关的元数据,包括消费者组,消耗进度偏移和主题/队列信息。
Broker是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic信息注册到NameServer,顺带一提底层的通信和连接都是基于Netty实现的

NameServer

NameServer的作用是注册中心,类似于Zookeeper,但又有区别于它的地方。每个NameServer节点互相之间是独立的,没有任何信息交互,也就不存在任何的选主或者主从切换之类的问题,因此NameServerZookeeper相比更轻量级

Topic

主题是生产者传递消息和消费者提取消息的类别。
主题与生产者和消费者的关系非常松散。具体来说,一个主题可能有零个,一个或多个生成器向它发送消息; 相反,制作人可以发送不同主题的消息。从消费者的角度来看,主题可以由零个,一个或多个消费者群体订阅。类似地,消费者组可以订阅一个或多个主题,只要该组的实例保持其订阅一致即可。

Tags

标记,换句话说,子主题,为用户提供了额外的灵活性。TagsTopic下的次级消息类型(注:Tags也支持TagA || TagB这样的表达式),可以在同一个Topic下基于Tags进行消息过滤。对于标记,来自同一业务模块的具有不同目的的消息可以具有相同的主题和不同的标记。标签有助于保持代码的清晰和连贯,而标签也可以方便RocketMQ提供的查询系统。
RocketMQ支持给在发送的时候给topictag,同一个topic的消息虽然逻辑管理是一样的。但是消费topic1的时候,如果你订阅的时候指定的是tagA,那么tagB的消息将不会投递。

Message信息

消息是要传递的信息。消息必须有一个主题,可以将其解释为您要发送给的邮件地址

MessageQueueMessageQueueSelector

消息还可以具有可选标记和额外的键 - 值对。
例如,您可以为消息设置业务密钥,并在代理服务器上查找消息以诊断开发期间的问题。
生产者的send方法不同于以往

    @Override
    public SendResult send(Message msg, MessageQueue mq)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, mq);
    }
    @Override
    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, selector, arg);
    }

可以指定目标消息队列MessageQueue
消息队列选择器MessageQueueSelector(通过它可以获得自定义的目标消息队列以传递消息),Object arg是与消息队列选择器一起使用的参数。

具体使用可以参考:https://www.jianshu.com/p/53324ea2df92

Message Queue消息队列

主题被划分为一个或多个子主题消息队列
MessageQueue源码

public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
    private String topic;
    private String brokerName;
    private int queueId;
    //省略部分代码
}

关于RocketMQ——顺序消息和重复消息可参考
https://blog.csdn.net/gwd1154978352/article/details/80691916

进行与SpringBoot集成

下面写下测试代码

生产者

@Component
@Slf4j
public class RocketMQClient {
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQProducer() {
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        try {
            producer.start();
            Message message = new Message("TopicTest", "push", "发送消息----zhisheng-----".getBytes(RemotingHelper.DEFAULT_CHARSET));

            StopWatch stop = new StopWatch();
            stop.start();

            for (int i = 0; i < 50; i++) {
                SendResult result = producer.send(message);
                log.info("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
            }
            stop.stop();
            log.info("----------------发送50条消息耗时:" + stop.getTotalTimeMillis());
        } catch (Exception e) {
            log.warn("exception_content",e);
        } finally {
            producer.shutdown();
        }
    }
}

@PostConstruct注解用来修饰一个非静态的void()方法,而且这个方法不能有抛出异常声明。在服务器加载Servlet的时候运行,并且只会被服务器调用一次,会在构造函数之后,init()方法之前运行。
拓展:
@PreDestroy修饰的方法会在服务器卸载Servlet的时候运行,并且只会被服务器调用一次,会在destroy()方法之后,在Servlet被彻底卸载之前运行。

消费者

@Component
@Slf4j
public class RocketMQServer {
    @Value("${apache.rocketmq.consumer.PushConsumer}")
    private String consumerGroup;
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQPushConsumer() {
        //消费者的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

        //指定NameServer地址,多个地址以 ; 隔开
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            //订阅PushTopic下Tag为push的消息
            consumer.subscribe("TopicTest", "push");

            //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
            //如果非第一次启动,那么按照上次消费的位置继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {
                        log.info("messageExt: " + messageExt);//输出消息内容
                        String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                        log.info("消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);//输出消息内容
                    }
                } catch (Exception e) {
                    log.warn("exception_content",e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
            });
            consumer.start();
        } catch (Exception e) {
            log.warn("exception_content",e);
        }
    }
}

application.yml配置文件

apache:
  rocketmq:
   # 消费者的组名
    consumer:
      PushConsumer: PushConsumer
  # 生产者的组名
    producer:
      producerGroup: Producer
  # NameServer地址
    namesrvAddr: localhost:9876

多个namesrvAddr;隔开

代码参考于:http://www.54tianzhisheng.cn/2018/02/07/SpringBoot-RocketMQ/#%E5%90%AF%E5%8A%A8-Name-Server

启动测试,消息成功发送和消费

对RocketMQ的见解以及与SpringBoot集成和配置可视化控制台插图8
image.png

RocketMQ可视化监控界面

来源于RocketMQ有一个对其扩展的开源项目rocketmq-externals
Github地址:https://github.com/apache/rocketmq-externals
子模块rocketmq-console便是RocketMQ管理控制台项目。
官网说明对rocketmq-console的描述是A newly designed RocketMQ's console using spring-boot

搭建过程

1.把整个项目拉下来,进入rocketmq-console,找到application.properties,修改下配置

对RocketMQ的见解以及与SpringBoot集成和配置可视化控制台插图9
image.png

设置好rocketmq-console的启动端口和需要监听的RocketMQ服务端的ip地址和端口号

2.用maven构建rocketmq-console
执行命令:mvn clean package -Dmaven.test.skip=true

对RocketMQ的见解以及与SpringBoot集成和配置可视化控制台插图10
image.png

3.执行生成的jar
编译成功之后,Cmd进入target文件夹,执行java -jar rocketmq-console-ng-1.0.0.jar

对RocketMQ的见解以及与SpringBoot集成和配置可视化控制台插图11
image.png

访问http://localhost:8081就可以看到(挺漂亮的)控制台

对RocketMQ的见解以及与SpringBoot集成和配置可视化控制台插图12
image.png
赞(0) 打赏
未经允许不得转载:IDEA激活码 » 对RocketMQ的见解以及与SpringBoot集成和配置可视化控制台

相关推荐

  • 暂无文章

一个分享Java & Python知识的社区