RabbitMQ入门笔记

MQ引言

何为MQ?

MQ(Message Queue) : 翻译为 消息队列 ,通过典型的 生产者消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为 消息中间件 通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

MQ的作用

消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。

应用解耦

场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口

image-20210406232705211

这种做法有一个缺点:

当库存系统出现故障时,订单就会失败。

订单系统和库存系统高耦合—>引入消息队列

image-20210406233036598

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
  • 库存系统:订阅下单的消息,获取下单消息,进行库操作。 就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。

流量削峰

如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。

使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。

场景: 秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。

作用:

  • 可以控制活动人数,超过此一定阀值的订单直接丢弃
  • 可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)

image-20210407092232882

  1. 用户发送请求,服务器接收后,首先写入消息队列,若加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面。

  2. 秒杀业务根据消息队列中的请求信息,再做后续处理。

消息分发

多个服务对数据感兴趣,只需监听同一类消息即可处理。

image-20210407092801130

例如A产生数据,B对数据感兴趣。如果没有消息队列A每次处理完需要调用一下B服务。过了一段时间C对数据也感性,A就需要改代码,调用B服务,调用C服务。只要有服务需要,A服务都要改动代码,没有便捷性。

image-20210407093359894

加入消息队列后,A只需发送一次信息,B对消息感兴趣,只需监听信息。同样的C感兴趣,也只需监听信息。A服务作为基础服务则不须改动。

异步消息

场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种 1. 串行方式  2. 并行方式

  • 串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这样就产生一个问题,邮件,短信并不是用户必须要等待其响应,这就浪费了大部分时间到响应请求上。

image-20210407095734009

  • 并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端。并行方式相较于串行能略微提高处理的时间。

image-20210407100711498

  • 消息队列:假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行方式使用时间100ms。虽然并行已经提高处理时间,但是,前面说过,邮件和短信的响应对正常访问网站无影响,客户端没有必要等其发送完成才显示注册成功,应该是写入数据库后即刻返回。

    消息队列: 引入消息队列后,把发送邮件,短信这些不必要等待响应的业务逻辑异步处理

    image-20210407101316780

    由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列处理后,响应时间是串行的3倍,是并行的2倍。

    常用MQ的对比

    # 1.ActiveMQ
    	ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。它是一个完全支持JMS规范的的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎!
    
    # 2.Kafka
    	Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
    
    # 3.RocketMQ
    	RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
    
    # 4.RabbitMQ
    	RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

    RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。

RabbitMQ引言

RabbitMQ

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移则是构建在开放电信平台框架上的。所有主要的编程语言均有与其代理接口通讯的客户端库。

image-20210407103237748

AMQP

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

AMQP在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。

131925_mGta_1759553.png

RabbitMQ的安装(CentOS 7.x)

通过Docker

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

–rm(关闭容器时删除容器)

通过依赖包

资源包:https://wws.lanzous.com/b0261684d 密码:702u

资源包包括Erlang语言依赖、socat内存管理以及RabbitMQ服务

依次安装三个依赖包即可完成安装

  1. 将三个安装包全部拖入CentOS 7中,执行rpm命令依次进行安装
rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm
  1. 安装成功后,将RabbitMQ默认的配置文件模板拷贝一份至其目录下
cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
  1. 使用vim对配置文件进行编辑,开放访问权限
vim /etc/rabbitmq/rabbitmq.config

image-20210407111624188

  1. 开启RabbitMQ可视化管理插件
rabbitmq-plugins enable rabbitmq_management
  1. 开启服务,并通过访问 IP:15672 进入管理页面
systemctl start rabbitmq-server

systemctl status rabbitmq-server

image-20210407112254962

image-20210407112315496

至此,RabbitMQ安装完毕

RabbitMQ支持的消息模型

image-20210407113101359

image-20210407113119982

需要在项目中使用需提前加入RabbitMQ依赖

<!--  引入rabbitmq的相关依赖  -->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.7.2</version>
    </dependency>

Hello World模型

image-20210407113754874

  • P (Provider) :生产者,也就是要发送消息的程序
  • C (Consumer) :消费者—>消息的接受者,会一直等待消息到来。
  • Queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

实现代码

Provider.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;
import utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 提供者
 *
 * @author PlutoWu
 * @date 2021/04/06
 */
public class Provider {

    // 生产消息
    @Test
    public void testSendMessage() throws IOException, TimeoutException {

        // 通过工具类获取连接对象
        Connection connection = RabbitMQUtils.getConnection();

        // 获取连接中通道
        Channel channel = connection.createChannel();

        // 通道绑定对应消息队列
        // 参数1:队列名称 如果队列不存在自动创建
        // 参数2:用来定义队列特性是否要持久化 true 持久化队列 false 不持久化
        // 参数3:exclusive 是否独占队列 true 独占队列 false 不独占
        // 参数4:autoDelete:是否消费完成后自动删除队列 true 自动删除 false 不自动删除
        // 参数5:额外附加参数
        channel.queueDeclare("hello",true,false,true,null);

        // 发布信息
        // 参数1:交换机名称 参数2:队列名称 参数3:传递消息额外设置 参数4:消息的具体内容
        channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());

        // 调用工具类
        RabbitMQUtils.closeConnectionAndChanel(channel,connection);

    }

}

Consumer.java

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 *
 * @author PlutoWu
 * @date 2021/04/06
 */
public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {

        // 通过工具类获取连接对象
        Connection connection = RabbitMQUtils.getConnection();

        // 获取连接中通道
        Channel channel = connection.createChannel();

        // 通道绑定对应消息队列
        // 参数1:队列名称 如果队列不存在自动创建
        // 参数2:用来定义队列特性是否要持久化 true 持久化队列 false 不持久化
        // 参数3:exclusive 是否独占队列 true 独占队列 false 不独占
        // 参数4:autoDelete:是否消费完成后自动删除队列 true 自动删除 false 不自动删除
        // 参数5:额外附加参数
        channel.queueDeclare("hello",true,false,true,null);

        // 消费信息
        // 参数1:队列名称 消费队列信息
        // 参数2:开启消息的自动确认机制
        // 参数3:消费时的回调接口
        channel.basicConsume("hello",true,new DefaultConsumer(channel){

            @Override // 最后一个参数:消息队列中取出的信息
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("new String(body) = " + new String(body));
            }
        });
    }
}

工具类 RabbitMQUtils.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 兔子mqutils
 *
 * @author PlutoWu
 * @date 2021/04/06
 */
public class RabbitMQUtils {

    private static final ConnectionFactory connectionFactory;

    static {
        // 重量级资源 类加载只执行一次
        // 创建连接工厂
        connectionFactory = new ConnectionFactory();
        // 设置连接rabbitmq的主机
        connectionFactory.setHost("192.168.150.128");
        // 设置端口号
        connectionFactory.setPort(5672);
        //设置连接虚拟主机
        connectionFactory.setVirtualHost("/ems");
        // 设置访问虚拟主机的用户和密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("ems");
    }

    // 定义提供连接对象的方法
    public static Connection getConnection() {
        try {
            // 获取连接对象
            return connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    // 关闭通道和关闭连接的工具方法
    public static void closeConnectionAndChanel(Channel channel, Connection connection) {
        try {
            if (channel != null) channel.close();
            if (connection != null) connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

测试结果

image-20210407123500251

成功接受生产者提供的信息,测试完毕。

Work Queues 模型

img

Work queues,也被称为(Task queues),任务模型。

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。

队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

实现代码

Provider.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
 * 提供者
 *
 * @author PlutoWu
 * @date 2021/04/06
 */
public class Provider {

    public static void main(String[] args) throws IOException {

        // 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        // 获取通道对象
        Channel channel = connection.createChannel();

        // 通过通道声明队列
        channel.queueDeclare("work",true,false,false,null );

        for (int i = 1; i <= 20; i++) {
            // 生产消息
            channel.basicPublish("","work",null,(i + " =====> hello work queue").getBytes());
        }

        // 关闭资源
        RabbitMQUtils.closeConnectionAndChanel(channel, connection);

    }

}

Consumer1.java

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
 * consumer1
 *
 * @author PlutoWu
 * @date 2021/04/06
 */
public class Consumer1 {

    public static void main(String[] args) throws IOException {

        // 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        // 获取通道对象
        final Channel channel = connection.createChannel();

        channel.basicQos(1); // 每一次只能消费一个消息

        // 通过通道声明队列
        // 参数1:队列名称 参数2:消息自动确认 true 消费者自动向rabbitmq确认消息消费 false 不会自动确认消费
        channel.queueDeclare("work",true,false,false,null );

        channel.basicConsume("work",false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("消费者-1:" + new String(body));
                // 手动确认 参数1:手动确认消息标识 参数2:false 每次确认一个
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

依次类推写出Consumer2.java,同时启动并测试即可观察到如下结果

测试结果

正常开启(开启自动确认并无休眠)即可观察到两个消费者遵循轮询消费

image-20210407124915144

image-20210407124937591

在两个消费者均加入每次只能消费一条信息的限制,且在Consumer1.java中加入休眠并启用手动确认可实现抢占式获取信息

image-20210407124503181

image-20210407124520218

Fanout 模型

image-20210407125357499

Fanout 模型(也称 Publish/Subscribe 模型)是一种广播模型,也就是一个生产者可以发一个消息,进行广播,让多个消费者消费同一个消息。

在广播模式下,消息发送流程是这样的:

  • 可以有多个消费者
  • 每个消费者有自己的queue(队列)
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 交换机把消息发送给绑定过的所有队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

实现代码

Provider.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
 * 提供者
 *
 * @author PlutoWu
 * @date 2021/04/06
 */
public class Provider {

    public static void main(String[] args) throws IOException {

        // 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        // 获取通道对象
        Channel channel = connection.createChannel();

        // 通过通道声明指定的交换机     // 参数1:交换机名称 参数2:交换机类型 fanout 广播类型
        channel.exchangeDeclare("logs","fanout");

        // 发送消息
        channel.basicPublish("logs","work",null, " =====> fanout type queue".getBytes());

        // 关闭资源
        RabbitMQUtils.closeConnectionAndChanel(channel, connection);

    }

}

Consumer1.java

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
 * consumer1
 *
 * @author PlutoWu
 * @date 2021/04/06
 */
public class Consumer1 {

    public static void main(String[] args) throws IOException {

        // 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        // 获取通道对象
        final Channel channel = connection.createChannel();

        // 通道绑定交换机
        channel.exchangeDeclare("logs","fanout");

        // 临时队列
        String queue = channel.queueDeclare().getQueue();

        // 绑定交换机和队列
        channel.queueBind(queue,"logs","");

        // 消费消息
        channel.basicConsume(queue,true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-1:" + new String(body));
            }
        });
    }
}

依次类推Consumer2以及Consumer3

测试结果

image-20210407144020032

image-20210407144045936

image-20210407144101427

三个消费者均能接收到同一个交换机的信息,测试完毕

Routing 模型

image-20210407144304560

在 Fanout 模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。

这时就要用到 Direct 类型的 Exchange 。

Direct 模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 Routing Key(路由key)
  • 消息的发送方在 向 Exchange 发送消息时,也必须指定消息的 Routing Key
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routing Key 与消息的 Routing Key 完全一致,才会接收到消息

图解:

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个 Routing Key 。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与 Routing Key 完全匹配的队列
  • C1:消费者,其所在队列指定了需要 Routing Key 为 orange 的消息
  • C2:消费者,其所在队列指定了需要 Routing Key 为 black 、green 的消息

实现代码

Provider.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
 * 提供者
 *
 * @author PlutoWu
 * @date 2021/04/06
 */
public class Provider {

    public static void main(String[] args) throws IOException {

        String exchangeName = "logs_direct";

        // 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        // 获取通道对象
        Channel channel = connection.createChannel();

        // 通过通道声明指定的交换机     // 参数1:交换机名称 参数2:交换机类型 direct 路由模式
        channel.exchangeDeclare(exchangeName,"direct");

        // 发送消息
        String routingKey = "error";
        channel.basicPublish(exchangeName,routingKey,null, ("这是基于direct模型发布的基于route key:[" + routingKey + "]发送的消息").getBytes());

        // 关闭资源
        RabbitMQUtils.closeConnectionAndChanel(channel, connection);

    }

}

Consumer1.java

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
 * consumer1
 *
 * @author PlutoWu
 * @date 2021/04/06
 */
public class Consumer1 {

    public static void main(String[] args) throws IOException {

        String exchangeName = "logs_direct";

        // 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        // 获取通道对象
        final Channel channel = connection.createChannel();

        // 通道绑定交换机
        channel.exchangeDeclare(exchangeName,"direct");

        // 临时队列
        String queue = channel.queueDeclare().getQueue();

        // 绑定交换机和队列
        channel.queueBind(queue,exchangeName,"error");

        // 消费消息
        channel.basicConsume(queue,true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-1:" + new String(body));
            }
        });
    }
}

以此类推得出Consumer2的代码,对绑定的交换机以及队列做出略微修改

// 绑定交换机和队列
        channel.queueBind(queue,exchangeName,"info");
        channel.queueBind(queue,exchangeName,"error");
        channel.queueBind(queue,exchangeName,"warning");

测试结果

发送Routing Key为error的信息时,由于两者都绑定了此Key所以都能接受到信息

image-20210407145223014

image-20210407145238424

而发送Routing Key为info的信息时,只有绑定了此Key的消费者2才能接收到信息

image-20210407145449087

Topics 模型

image-20210407145605180

Topics 类型的 ExchangeDirect 相比,都是可以根据 Routing Key 把消息路由到不同的队列。

只不过Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符。

这种模型 Routing Key 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

# 通配符
		*   匹配不多不少恰好1个词
		#   匹配一个或多个词
# 如:
		audit.#    匹配audit.irs.corporate或者 audit.irs 等
    	audit.*    只能匹配 audit.irs

实现代码

Provider.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
 * 提供者
 *
 * @author PlutoWu
 * @date 2021/04/06
 */
public class Provider {

    public static void main(String[] args) throws IOException {

        String exchangeName = "topics";

        // 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        // 获取通道对象
        Channel channel = connection.createChannel();

        // 通过通道声明指定的交换机     // 参数1:交换机名称 参数2:交换机类型 direct 路由模式
        channel.exchangeDeclare(exchangeName,"topic");

        // 发送消息
        String routingKey = "save.user.delete";
        channel.basicPublish(exchangeName,routingKey,null, ("这是topic动态路由模型,route key:[" + routingKey + "]发送的消息").getBytes());

        // 关闭资源
        RabbitMQUtils.closeConnectionAndChanel(channel, connection);

    }

}

Consumer1.java****

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
 * consumer1
 *
 * @author PlutoWu
 * @date 2021/04/06
 */
public class Consumer1 {

    public static void main(String[] args) throws IOException {

        String exchangeName = "topics";

        // 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        // 获取通道对象
        final Channel channel = connection.createChannel();

        // 通道绑定交换机
        channel.exchangeDeclare(exchangeName,"topic");

        // 临时队列
        String queue = channel.queueDeclare().getQueue();

        // 绑定交换机和队列 动态通配符形式 routingKey
        channel.queueBind(queue,exchangeName,"user.*");

        // 消费消息
        channel.basicConsume(queue,true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-1:" + new String(body));
            }
        });
    }
}

Consumer2.java

// 绑定交换机和队列 动态通配符形式 routingKey
        channel.queueBind(queue,exchangeName,"user.#");

测试结果

image-20210407150234072

image-20210407150244812

若Routing Key为user.delete.map,则只有通配符为#的消费者2能接收到信息

image-20210407150346970

RPC 模型

image-20210407150810242

Work Queues 模型中,我们学习了如何使用工作队列在多个工作人员之间分配耗时的任务。

但是,如果我们需要在远程计算机上运行功能并等待结果怎么办?那就可以用 RPC 模型(远程过程调用)。

我们将使用 RabbitMQ 构建 RPC 系统:客户端和可伸缩 RPC 服务器。由于我们没有值得分配的耗时任务,因此我们将创建一个虚拟 RPC 服务,该服务返回斐波那契数。

图解

  • 对于 RPC 请求,客户端发送一条消息,该消息具有两个属性:replyTo(设置为仅为该请求创建的匿名互斥队列)和 correlationId(设置为每个请求的唯一值)。
  • 该请求被发送到 rpc_queue 队列。
  • RPC 工作程序(又名:服务器)正在等待该队列上的请求。出现请求时,它会使用 replyTo 字段中的队列来完成工作,并将消息和结果发送回客户端。
  • 客户端等待答复队列中的数据。出现消息时,它将检查 correlationId 属性。如果它与请求中的值匹配,则将响应返回给应用程序。

消息的属性

image-20200922232442807

实现代码

RPCClient.java

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class RPCClient {

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue"; // 发送请求的队列名称

    public static void main(String[] args) throws IOException, InterruptedException {
        // 初始化 RPCClient
        RPCClient rpcClient = new RPCClient();
        rpcClient.connection = RabbitMQUtils.getConnection();
        rpcClient.channel = rpcClient.connection.createChannel();
        // 发送 request 请求信息
        for (int i = 0; i < 5; i++) {
            String i_str = Integer.toString(i);
            System.out.println("现在客户端希望计算 fic("+i+")");
            String response = rpcClient.call(i_str);
            System.out.println("现在计算出来 fic("+i+") = "+ response);
        }
        // 关闭资源
        close(rpcClient);
    }

    // 发送请求,希望调用远程的函数
    public String call(String message) throws IOException, InterruptedException {
        // 定义一个相关ID
        final String correlationId = UUID.randomUUID().toString();
        // 回调队列
        String replyQueueName = channel.queueDeclare().getQueue();
        // 定义消息属性
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties
                .Builder()
                .correlationId(correlationId)  // 设置相关ID
                .replyTo(replyQueueName)       // 设置回调队列
                .build();

        // 存储相应信息
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        // 发送消息
        channel.basicPublish("",requestQueueName,basicProperties,message.getBytes("UTF-8"));

        // 消费消息
        String ctag = channel.basicConsume(replyQueueName, true
                // 参数3:服务器端传过来的回调对象
                , new DeliverCallback() {
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        if (message.getProperties().getCorrelationId().equals(correlationId)) {
                            response.offer(new String(message.getBody(), "UTF-8"));
                        }
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String consumerTag) throws IOException {

                    }
                });
        String result = response.take();
        channel.basicCancel(ctag);
        return result;
    }

    // 关闭资源
    public static void close(RPCClient rpcClient){
        RabbitMQUtils.closeConnectionAndChanel(rpcClient.channel,rpcClient.connection);
    }

}

RPCServer.java

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

public class RPCServer {

    // 定义队列名称
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    // 计算斐波那契数列
    public static int fic(int a){
        if (a == 0) return 0;
        if (a == 1) return 1;
        return fic(a-1) + fic(a-2);
    }

    public static void main(String[] args) throws IOException {
        // 调用自行封装的工具类获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        // 获取通道
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
        // 清除队列
        channel.queuePurge(RPC_QUEUE_NAME);
        // 每次处理一条信息
        channel.basicQos(1);
        // 定义一个监听器
        final Object monitor = new Object();
        // 定义回调信息
        DeliverCallback deliverCallback = new DeliverCallback() {
            /**
             * @param consumerTag 消费者标签,可以与消费者建立联系
             * @param message     消费者发送过来的消息(消息属性、消息封装体、消息没人)
             */
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                // 定义信息属性
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(message.getProperties().getCorrelationId()) // 指明关联ID
                        .build();
                // 定义相应信息
                String response = "";
                // 解析request信息
                try{
                    String s = new String(message.getBody(),"UTF-8");
                    int i = Integer.parseInt(s);
                    System.out.println("正在计算 fic("+i+")");
                    response += fic(i);
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    // 发布 response 消息
                    channel.basicPublish("",message.getProperties().getReplyTo(),basicProperties,response.getBytes("UTF-8"));
                    // 手动确认信息
                    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                    // RabbitMQ 消费者工作线程通知 RPC 服务器所有者线程
                    synchronized (monitor){
                        /**
                         * notify()
                         *
                         * 唤醒处于等待的线程
                         */
                        monitor.notify();
                    }
                }
            }
        };
        // 消费客户端发送过来的请求消息
        channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, new CancelCallback() {
            // 等待并准备好使用来自RPC客户端的消息
            @Override
            public void handle(String consumerTag) throws IOException {
                while (true){
                    synchronized (monitor){
                        try {
                            /**
                             * wait()
                             *
                             * 使得当前线程立刻停止运行,处于等待状态(WAIT),
                             * 并将当前线程置入锁对象的等待队列中,
                             * 直到被通知(notify)或被中断为止。
                             */
                            monitor.wait();
                        }catch (InterruptedException e){
                            e.printStackTrace();
                        }
                    }
                }
            }
        });
    }
}

测试结果

image-20210407151813017

image-20210407151725294

image-20200923112003146

参考:https://blog.csdn.net/Hedon954/article/details/108802009?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-5.control&dist_request_id=&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-5.control

Spring Boot 整合 RabbitMQ

首先引入RabbitMQ依赖

<!--    引入rabbitmq集成的依赖     -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

其次配置Application.yml

spring:
  application:
    name: rabbitmq-springboot
  rabbitmq:
    host: 192.168.150.128
    port: 5672
    username: ems
    password: ems
    virtual-host: /ems

Hello World 模型

代码实现

HelloConsumer.java

package cn.plutowu.hello;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 *
 * @author PlutoWu
 * @date 2021/04/06
 */
@Component  // 持久化 非独占 默认
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class HelloConsumer {
	
    /**
 	* @RabbitListener:申明监听的队列
	*	@Queue:具体指明哪一个队列及其相应的属性
 	* @RabbitHandler:指定收到消息时的回调方法
 	*/
    @RabbitHandler
    public void receive(String message) {
        System.out.println("hello message = " + message);
    }

}

Test 片段

package cn.plutowu.test;

import cn.plutowu.RabbitmqSpringbootApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {

    // 注入rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    // hello world
    @Test
    public void testHello() {
        rabbitTemplate.convertAndSend("hello","hello world");
    }

}

测试结果

image-20210407152945872

Work Queues 模型

代码实现

WorkConsumer.java

package cn.plutowu.work;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 *
 * @author PlutoWu
 * @date 2021/04/06
 */
@Component
public class WorkConsumer {

    // 一个消费者
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message) {
        System.out.println("work message1 = " + message);
    }

    // 一个消费者
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message) {
        System.out.println("work message2 = " + message);
    }

}

Test 片段

// work
    @Test
    public void testWork() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work","work模型" + i);
        }
    }

测试结果

image-20210407153310564

遵循了轮询规则,把信息分配到两个消费者上

Fanout 模型

代码实现

FanoutConsumer.java

package cn.plutowu.fanout;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 *
 * @author PlutoWu
 * @date 2021/04/06
 */
@Component
public class FanoutConsumer {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, // 创建临时队列
                    exchange = @Exchange(value = "logs", type = "fanout") // 绑定的交换机
            )
    })
    public void receive1(String message) {
        System.out.println("work message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, // 创建临时队列
                    exchange = @Exchange(value = "logs", type = "fanout") // 绑定的交换机
            )
    })
    public void receive2(String message) {
        System.out.println("work message2 = " + message);
    }

}

Test 片段

// fanout 广播
    @Test
    public void testFanout() {
            rabbitTemplate.convertAndSend("logs","","Fanout模型的message");
    }

测试结果

image-20210407153541651

再绑定同一个交换机的情况下,两个消费者均接收到此信息

Routing 模型

实现代码

RouteConsumer.java

package cn.plutowu.route;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author PlutoWu
 * @date 2021/04/06
 */
@Component
public class RouteConsumer {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, // 创建临时队列
                    exchange = @Exchange(value = "directs", type = "direct"), // 绑定的交换机
                    key = {"info", "error", "warn"}
            )
    })
    public void receive1(String message) {
        System.out.println("route message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, // 创建临时队列
                    exchange = @Exchange(value = "directs", type = "direct"), // 绑定的交换机
                    key = {"error"}
            )
    })
    public void receive2(String message) {
        System.out.println("route message2 = " + message);
    }

}

Test 片段

// route 路由模式
    @Test
    public void testRoute() {
        rabbitTemplate.convertAndSend("directs","error","发送error的key的路由message");
    }

测试结果

发送Routing Key —–> error 两个消费者皆可接收

image-20210407153757266

发送Routing Key —–> info 只有消费者2能接收

image-20210407153922909

Topics 模型

实现代码

TopicConsumer.java

package cn.plutowu.topic;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author PlutoWu
 * @date 2021/04/06
 */
@Component
public class TopicConsumer {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, // 创建临时队列
                    exchange = @Exchange(value = "topics", type = "topic"), // 绑定的交换机
                    key = {"user.save", "user.*"}
            )
    })
    public void receive1(String message) {
        System.out.println("topic message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, // 创建临时队列
                    exchange = @Exchange(value = "topics", type = "topic"), // 绑定的交换机
                    key = {"order.#", "produce.#", "user.*"}
            )
    })
    public void receive2(String message) {
        System.out.println("topic message2 = " + message);
    }

}

Test 片段

// topic 动态路由 订阅模式
    @Test
    public void testTopic() {
        rabbitTemplate.convertAndSend("topics","user.save","user.save 路由消息");
    }

测试结果

发送user.save均可接收

image-20210407190939641

发送prodece.save只有消费者2可以接收

image-20210407191250129

RabbitMQ集群

普通集群(副本集群)

默认情况下:RabbitMQ 代理操作所需的所有数据/状态都将跨所有节点复制。

这方面的一个例外是消息队列,默认情况下,==消息队列仅位于一个节点上==,尽管它们可以从所有节点看到和访问

架构图

image-20200924224004549

核心解决问题: 当集群中某一时刻master节点宕机,可以对Quene中信息,进行备份。

集群搭建

# 0.集群规划
	node1: 10.15.0.3  mq1  master 主节点
	node2: 10.15.0.4  mq2  repl1  副本节点
	node3: 10.15.0.5  mq3  repl2  副本节点

# 1.克隆三台机器主机名和ip映射
	vim /etc/hosts加入:
		 10.15.0.3 mq1
    	10.15.0.4 mq2
    	10.15.0.5 mq3
	node1: vim /etc/hostname 加入:  mq1
	node2: vim /etc/hostname 加入:  mq2
	node3: vim /etc/hostname 加入:  mq3

# 2.三个机器安装rabbitmq,并同步cookie文件,在node1上执行:
	scp /var/lib/rabbitmq/.erlang.cookie root@mq2:/var/lib/rabbitmq/
	scp /var/lib/rabbitmq/.erlang.cookie root@mq3:/var/lib/rabbitmq/

# 3.查看cookie是否一致:
	node1: cat /var/lib/rabbitmq/.erlang.cookie 
	node2: cat /var/lib/rabbitmq/.erlang.cookie 
	node3: cat /var/lib/rabbitmq/.erlang.cookie 

# 4.后台启动rabbitmq所有节点执行如下命令,启动成功访问管理界面:
	rabbitmq-server -detached 

# 5.在node2和node3执行加入集群命令:
	1.关闭       rabbitmqctl stop_app
	2.加入集群    rabbitmqctl join_cluster rabbit@mq1
	3.启动服务    rabbitmqctl start_app

# 6.查看集群状态,任意节点执行:
	rabbitmqctl cluster_status

# 7.如果出现如下显示,集群搭建成功:
	Cluster status of node rabbit@mq3 ...
	[{nodes,[{disc,[rabbit@mq1,rabbit@mq2,rabbit@mq3]}]},
	{running_nodes,[rabbit@mq1,rabbit@mq2,rabbit@mq3]},
	{cluster_name,<<"rabbit@mq1">>},
	{partitions,[]},
	{alarms,[{rabbit@mq1,[]},{rabbit@mq2,[]},{rabbit@mq3,[]}]}]

镜像集群

镜像队列机制就是将队列在三个节点之间设置主从关系,消息会在三个节点之间进行自动同步,且如果其中一个节点不可用,并不会导致消息丢失或服务不可用的情况,提升MQ集群的整体高可用性。

架构图

image-20200925082814401

集群搭建

# 0.策略说明
	rabbitmqctl set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern>  <definition>
	-p Vhost: 可选参数,针对指定vhost下的queue进行设置
	Name:     policy的名称
	Pattern: queue的匹配模式(正则表达式)
	Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
           		  ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes
                        all:表示在集群中所有的节点上进行镜像
                        exactly:表示在指定个数的节点上进行镜像,节点的个数由 ha-params 指定
                        nodes:表示在指定的节点上进行镜像,节点名称通过 ha-params 指定
            	  ha-params:ha-mode模式需要用到的参数
                ha-sync-mode:进行队列中消息的同步方式,有效值为 automatic 和 manual(默认)
                priority:可选参数,policy 的优先级(数字越高,优先级越高)
# 1.查看当前策略
	rabbitmqctl list_policies

# 2.添加策略
	rabbitmqctl set_policy ha-all '^hello' '{"ha-mode":"all","ha-sync-mode":"automatic"}' 
	说明:策略正则表达式为 “^” 表示所有匹配所有队列名称  ^hello:匹配hello开头队列

# 3.删除策略
	rabbitmqctl clear_policy ha-all

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!