Weixl 的个人博客

贼拉正经的个人博客

  menu
22 文章
0 浏览
2 当前访客
ღゝ◡╹)ノ❤️

RabbitMQ 消息中间件

RabbitMQ 消息中间件

RabbitMQ的简介

消息队列中间件是分布式系统中重要的组件。

主要解决应用解耦,异步消息,流量削锋,消息通讯等问题,实现高性能,高可用,可伸缩和最终一致性 架构

三个常用的MQ : ActiveMQ ,RabbitMQ,Kafka。

安全性比较: ActiveMQ > RabbitMQ > Kafka

性能比较:ActiveMQ < RabbitMQ < Kafka

RabbitMQ 和 ActiveMQ的区别

RabbitMQ在设计上比ActiveMQ多了一个 交换器。 生产者将消息 传递给 交换器,在由交换器,传递给消息队列。消费者再从消息队列中拿取消息。

  • RabbitMQ的默认端口 15672

RabbitMQ的架构图

2.png

主要概念

RabbitMQ Server : MQ服务,维护一条从Producer到Consumer的路线。保证数据能够按照指定方式进行传输。

producer:消息生产者,如图,A,B,C生产者连接RabbitMQ服务后,将消息传递给Exchange交换器,在由交换器通过 RouthingKey传递给具体的 Queue 队列。

Consumer:消息消费者。如图 1,2,3为三个消费者,连接RabbitMQ服务后,通过Queue获取到具体的消息,进行消费。

Exchange:交换器,生产者将消息发送到Exchange。有Exchange将消息路由到一个或者多个Queue中(或者丢弃)。Exchange并不存储消息。Exchange有direct,fanout,topic,headers四种类型。每种类型对应不同的路由规则。

Queue:(队列)是RabbitMQ的内部对象,用于存储消息。消息消费者就是通过订阅队列来获取消息进行消费。Queue会将消息负载均衡到多个消费者进行处理。使用轮转式。

Routing Key:生产者将消息发送给Exchange时,一般会指定一个routing Key,来指定这个消息的路由规则。routing key 需要与 Exchange Type(四种模式中的topic)和 binding key(对应的Queue队列key)一起使用。

RabbitMQ的安装与启动

windows安装

  1. 下载安装 Eralng , 十次方配套软件中带有了(以管理员方式运行)
  2. 下载并安装RabbitMQ。 十次方配套软件 提供有 rabbitmq-server-3.7.4.exe。双击安装,注意不要安装在包含中文和空格的目录下!安装后window服务中就存在rabbitMQ了,并且是启动状态。
  3. 安装图形化管理界面(插件)
    进入rabbitMQ安装目录的sbin目录,输入命令
    rabbitmq‐plugins enable rabbitmq_management
    
  4. 重新启动服务
  5. 打开浏览器访问:http://127.0.0.1:15672
    用户名和密码都为 gues
    最上侧的导航以此是:概览、连接、信道、交换器、队列、用户管理

docker环境安装

  1. 下载镜像

    docker pull rabbitmq:management
    
  2. 创建容器

    docker run -di --name=tensquare_rabbitmq -p 5671:5617 -p 5672:5672 -p
    4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 rabbitmq:management
    

RabbitMQ的三种模式

直接模式(Direct)

我们需要将消息发给唯一一个节点时使用这种模式,这是最简单的一种形式。

3.png

任何发送到Direct Exchange的消息都会被转发到RouteKey指定的Queue中。

  1. 一般情况下这种直接模式,会将Exchange 交换器设置为一个 空字符串“”
  2. 不需要将Exchange进行任何的绑定(Binding)操作。
  3. 需要一个RoutingKey值,写入要发送到的队列的名称。
  4. 如果没有对应的队列名,该消息会被抛弃。

创建方式

在RabbitMQ图形化界面中进行创建。

在Queue 队列中进行创建

6.png

Durability:是否做持久化 Durable(持久) transient(临时)

Auto delete : 是否自动删除

分裂模式(Fanout)

当我们需要将消息一次发给多个队列时,需要使用这种模式。如下图:

4.png

任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定的(Binding)的所有Queue上。

  1. 这种模式不需要RouteKey
  2. 提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue。
  3. 如果收到消息的Exchange没有与任何Queue绑定,消息会被抛弃。

创建方式

  1. 在queue中添加队列itheima 和kudingyu
  2. 在Exchange中新建交换器chuanzhi , 选择fanout模式7.png
  3. 将itcast 和itheima两个队列绑定到交换器chuanzhi
    8.png

9.png

10.png

主题模式

任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上

5.png

通过模糊匹配进行具体的队列寻找。比如 usa.abc.news 就可以同时发送给 红色 和 黄色两个Queue队列中。

  1. 主题模式,每个队列都有其关系的主题,所有的消息都带有一个标题(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列中。
  2. 需要RouteKey,提前绑定Exchange与Queue。
  3. #表示若干个关键字。 "" 表示一个关键字。如“log.”能与“log.warn”匹配,无法与与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。

创建方式

  1. 新建交换器,设置 topic模式
  2. 点击新建的交换器,添加Bindings。11.png
  3. 添加完规则的,列表如下:12.png

Springboot整合RabbitMQ

基本配置

  1. 添加依赖

    <!-- TODO RabbitMQ -->
    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  2. 添加yml配置文件

    spring:
      rabbitmq:
        host: 127.0.0.1
    

生产者

//进行 Rabbit 消息队列的生产 producer
@Autowired
private RabbitTemplate rabbitTemplate;

//直接模式
rabbitTemplate.convertAndSend("itcast" , "直接模式的测试");

//分裂模式
rabbitTemplate.convertAndSend("chuanzhi" , "" , "分裂模式测试");

//主题模式
rabbitTemplate.convertAndSend("topic" , "good.log" , "主题模式测试");

消费者

@RabbitListener(queues = "itcast") 注解来指定要监听哪一个Queue队列的消息。

当服务启动后,该监听就会一直执行中,一有消息就会进行消费。

@Component
@RabbitListener(queues = "itcast")
public class Consumer01 {

    //进行 消息的消费
    //对象的参数通过,生产者传递的消息类型,对应的赋值。 消息为String,收到也就用String来接收
    @RabbitHandler
    public void consumer(String message){
        System.out.println("itcast接收到的消息1111111111111111111:"+message);
    }
}

消费者Consumer消息丢失处理

  • 当A消费者处理消息时出现异常 , 消息并没有处理完毕,那么就将消息重新打回到队列中,让其他的消费者进行消费。

  • yml 配置文件:

    spring:
      application:
        name: rabbitmq-test
      rabbitmq:
        host: 127.0.0.1
        #消息确认机制 --- 是否开启手ack动确认模式
        listener:
          simple:
            acknowledge-mode: manual
          direct:
            acknowledge-mode: manual
    
  • 消费者1:

    @Component
    @RabbitListener(queues = "itcast")
    public class Consumer01 {
    
        //进行 消息的消费
        //对象的参数通过,生产者传递的消息类型,对应的赋值。 消息为String,收到也就用String来接收
        @RabbitHandler
        public void consumer(String text , Channel channel , Message message) throws IOException {
            //进行 消息丢失的处理解决
            try {
    //            进行睡眠
                Thread.sleep(3000);
                if (true){
                    throw new RuntimeException("我出错了");
                }
                System.out.println("itcast接收到的消息1111111111111111111:"+text);
    
                //上面为 消息处理的代码 。 如果处理成功,就 返回ACK,消息确认被消费
                channel.basicAck(message.getMessageProperties().getDeliveryTag() , false);
    
            }catch (Exception e){
                e.printStackTrace();
                //这里进行 消息的 失败返回确认
                // 业务处理失败后调用 , 会将消息。原封返回,RabbitMQ会找别的 消费者进行消费
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true); //fase拒绝多消费者,true确认返回队列
    //            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); //1对1的返回队列
            }
        }
    }
    
  • 消费者2:

    @Component
    @RabbitListener(queues = "itcast")
    public class Consumer04 {
        //进行 消息的消费
        @RabbitHandler
        public void consumer(String text , Channel channel , Message message) throws IOException {
            System.out.println("itcast接收到的消息2222222222222222:"+text);
            //在yml中已经配置,不再自动返回ACK,所以消费者要自己返回
            //上面为 消息处理的代码 。 如果处理成功,就 返回ACK,消息确认被消费 , false为手动确认
            channel.basicAck(message.getMessageProperties().getDeliveryTag() , false);
        }
    }
    

标题:RabbitMQ 消息中间件
作者:Weixl
地址:http://loveless.top/articles/2020/10/13/1602593419386.html