RabbitMQ入门笔记
MQ引言
何为MQ?
MQ
(Message Queue) : 翻译为消息队列
,通过典型的生产者
和消费者
模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为消息中间件
通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
MQ的作用
消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。
应用解耦
场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口
这种做法有一个缺点:
当库存系统出现故障时,订单就会失败。
订单系统和库存系统高耦合—>引入消息队列
订单系统:
用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。库存系统:
订阅下单的消息,获取下单消息,进行库操作。 就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。
流量削峰
如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。
使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。
场景: 秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:
- 可以控制活动人数,超过此一定阀值的订单直接丢弃
- 可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
用户发送请求,服务器接收后,首先写入消息队列,若加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面。
秒杀业务根据消息队列中的请求信息,再做后续处理。
消息分发
多个服务对数据感兴趣,只需监听同一类消息即可处理。
例如A产生数据,B对数据感兴趣。如果没有消息队列A每次处理完需要调用一下B服务。过了一段时间C对数据也感性,A就需要改代码,调用B服务,调用C服务。只要有服务需要,A服务都要改动代码,没有便捷性。
加入消息队列后,A只需发送一次信息,B对消息感兴趣,只需监听信息。同样的C感兴趣,也只需监听信息。A服务作为基础服务则不须改动。
异步消息
场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种 1. 串行方式 2. 并行方式
串行方式:
将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这样就产生一个问题,邮件,短信并不是用户必须要等待其响应,这就浪费了大部分时间到响应请求上。
并行方式:
将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端。并行方式相较于串行能略微提高处理的时间。
消息队列:
假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行方式使用时间100ms。虽然并行已经提高处理时间,但是,前面说过,邮件和短信的响应对正常访问网站无影响,客户端没有必要等其发送完成才显示注册成功,应该是写入数据库后即刻返回。消息队列: 引入消息队列后,把发送邮件,短信这些不必要等待响应的业务逻辑异步处理
由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列处理后,响应时间是串行的3倍,是并行的2倍。
常用MQ的对比
# 1.ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。它是一个完全支持JMS规范的的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎! # 2.Kafka Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。 # 3.RocketMQ RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。 # 4.RabbitMQ RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。
RabbitMQ引言
RabbitMQ
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移则是构建在开放电信平台框架上的。所有主要的编程语言均有与其代理接口通讯的客户端库。
AMQP
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
AMQP在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。
RabbitMQ的安装(CentOS 7.x)
通过Docker
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
–rm(关闭容器时删除容器)
通过依赖包
资源包:https://wws.lanzous.com/b0261684d 密码:702u
资源包包括Erlang语言依赖、socat内存管理以及RabbitMQ服务
依次安装三个依赖包即可完成安装
- 将三个安装包全部拖入CentOS 7中,执行rpm命令依次进行安装
rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm
- 安装成功后,将RabbitMQ默认的配置文件模板拷贝一份至其目录下
cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
- 使用vim对配置文件进行编辑,开放访问权限
vim /etc/rabbitmq/rabbitmq.config
- 开启RabbitMQ可视化管理插件
rabbitmq-plugins enable rabbitmq_management
- 开启服务,并通过访问 IP:15672 进入管理页面
systemctl start rabbitmq-server
systemctl status rabbitmq-server
至此,RabbitMQ安装完毕
RabbitMQ支持的消息模型
需要在项目中使用需提前加入RabbitMQ依赖
<!-- 引入rabbitmq的相关依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
Hello World模型
- P (Provider) :生产者,也就是要发送消息的程序
- C (Consumer) :消费者—>消息的接受者,会一直等待消息到来。
- Queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
实现代码
Provider.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;
import utils.RabbitMQUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 提供者
*
* @author PlutoWu
* @date 2021/04/06
*/
public class Provider {
// 生产消息
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 通过工具类获取连接对象
Connection connection = RabbitMQUtils.getConnection();
// 获取连接中通道
Channel channel = connection.createChannel();
// 通道绑定对应消息队列
// 参数1:队列名称 如果队列不存在自动创建
// 参数2:用来定义队列特性是否要持久化 true 持久化队列 false 不持久化
// 参数3:exclusive 是否独占队列 true 独占队列 false 不独占
// 参数4:autoDelete:是否消费完成后自动删除队列 true 自动删除 false 不自动删除
// 参数5:额外附加参数
channel.queueDeclare("hello",true,false,true,null);
// 发布信息
// 参数1:交换机名称 参数2:队列名称 参数3:传递消息额外设置 参数4:消息的具体内容
channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
// 调用工具类
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
}
}
Consumer.java
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*
* @author PlutoWu
* @date 2021/04/06
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 通过工具类获取连接对象
Connection connection = RabbitMQUtils.getConnection();
// 获取连接中通道
Channel channel = connection.createChannel();
// 通道绑定对应消息队列
// 参数1:队列名称 如果队列不存在自动创建
// 参数2:用来定义队列特性是否要持久化 true 持久化队列 false 不持久化
// 参数3:exclusive 是否独占队列 true 独占队列 false 不独占
// 参数4:autoDelete:是否消费完成后自动删除队列 true 自动删除 false 不自动删除
// 参数5:额外附加参数
channel.queueDeclare("hello",true,false,true,null);
// 消费信息
// 参数1:队列名称 消费队列信息
// 参数2:开启消息的自动确认机制
// 参数3:消费时的回调接口
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override // 最后一个参数:消息队列中取出的信息
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("new String(body) = " + new String(body));
}
});
}
}
工具类 RabbitMQUtils.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 兔子mqutils
*
* @author PlutoWu
* @date 2021/04/06
*/
public class RabbitMQUtils {
private static final ConnectionFactory connectionFactory;
static {
// 重量级资源 类加载只执行一次
// 创建连接工厂
connectionFactory = new ConnectionFactory();
// 设置连接rabbitmq的主机
connectionFactory.setHost("192.168.150.128");
// 设置端口号
connectionFactory.setPort(5672);
//设置连接虚拟主机
connectionFactory.setVirtualHost("/ems");
// 设置访问虚拟主机的用户和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
}
// 定义提供连接对象的方法
public static Connection getConnection() {
try {
// 获取连接对象
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
// 关闭通道和关闭连接的工具方法
public static void closeConnectionAndChanel(Channel channel, Connection connection) {
try {
if (channel != null) channel.close();
if (connection != null) connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
测试结果
成功接受生产者提供的信息,测试完毕。
Work Queues 模型
Work queues
,也被称为(Task queues
),任务模型。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。
队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
实现代码
Provider.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* 提供者
*
* @author PlutoWu
* @date 2021/04/06
*/
public class Provider {
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = RabbitMQUtils.getConnection();
// 获取通道对象
Channel channel = connection.createChannel();
// 通过通道声明队列
channel.queueDeclare("work",true,false,false,null );
for (int i = 1; i <= 20; i++) {
// 生产消息
channel.basicPublish("","work",null,(i + " =====> hello work queue").getBytes());
}
// 关闭资源
RabbitMQUtils.closeConnectionAndChanel(channel, connection);
}
}
Consumer1.java
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* consumer1
*
* @author PlutoWu
* @date 2021/04/06
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = RabbitMQUtils.getConnection();
// 获取通道对象
final Channel channel = connection.createChannel();
channel.basicQos(1); // 每一次只能消费一个消息
// 通过通道声明队列
// 参数1:队列名称 参数2:消息自动确认 true 消费者自动向rabbitmq确认消息消费 false 不会自动确认消费
channel.queueDeclare("work",true,false,false,null );
channel.basicConsume("work",false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("消费者-1:" + new String(body));
// 手动确认 参数1:手动确认消息标识 参数2:false 每次确认一个
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
依次类推写出Consumer2.java,同时启动并测试即可观察到如下结果
测试结果
正常开启(开启自动确认并无休眠)即可观察到两个消费者遵循轮询消费
在两个消费者均加入每次只能消费一条信息的限制,且在Consumer1.java中加入休眠并启用手动确认可实现抢占式获取信息
Fanout 模型
Fanout 模型(也称 Publish/Subscribe 模型)是一种广播模型,也就是一个生产者可以发一个消息,进行广播,让多个消费者消费同一个消息。
在广播模式下,消息发送流程是这样的:
- 可以有多个消费者
- 每个消费者有自己的queue(队列)
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
- 交换机把消息发送给绑定过的所有队列
- 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
实现代码
Provider.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* 提供者
*
* @author PlutoWu
* @date 2021/04/06
*/
public class Provider {
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = RabbitMQUtils.getConnection();
// 获取通道对象
Channel channel = connection.createChannel();
// 通过通道声明指定的交换机 // 参数1:交换机名称 参数2:交换机类型 fanout 广播类型
channel.exchangeDeclare("logs","fanout");
// 发送消息
channel.basicPublish("logs","work",null, " =====> fanout type queue".getBytes());
// 关闭资源
RabbitMQUtils.closeConnectionAndChanel(channel, connection);
}
}
Consumer1.java
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* consumer1
*
* @author PlutoWu
* @date 2021/04/06
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = RabbitMQUtils.getConnection();
// 获取通道对象
final Channel channel = connection.createChannel();
// 通道绑定交换机
channel.exchangeDeclare("logs","fanout");
// 临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queue,"logs","");
// 消费消息
channel.basicConsume(queue,true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1:" + new String(body));
}
});
}
}
依次类推Consumer2以及Consumer3
测试结果
三个消费者均能接收到同一个交换机的信息,测试完毕
Routing 模型
在 Fanout 模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。
这时就要用到 Direct
类型的 Exchange 。
在 Direct
模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 Routing Key(路由key)
- 消息的发送方在 向 Exchange 发送消息时,也必须指定消息的 Routing Key
- Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routing Key 与消息的 Routing Key 完全一致,才会接收到消息
图解:
- P:生产者,向Exchange发送消息,发送消息时,会指定一个 Routing Key 。
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与 Routing Key 完全匹配的队列
- C1:消费者,其所在队列指定了需要 Routing Key 为 orange 的消息
- C2:消费者,其所在队列指定了需要 Routing Key 为 black 、green 的消息
实现代码
Provider.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* 提供者
*
* @author PlutoWu
* @date 2021/04/06
*/
public class Provider {
public static void main(String[] args) throws IOException {
String exchangeName = "logs_direct";
// 获取连接对象
Connection connection = RabbitMQUtils.getConnection();
// 获取通道对象
Channel channel = connection.createChannel();
// 通过通道声明指定的交换机 // 参数1:交换机名称 参数2:交换机类型 direct 路由模式
channel.exchangeDeclare(exchangeName,"direct");
// 发送消息
String routingKey = "error";
channel.basicPublish(exchangeName,routingKey,null, ("这是基于direct模型发布的基于route key:[" + routingKey + "]发送的消息").getBytes());
// 关闭资源
RabbitMQUtils.closeConnectionAndChanel(channel, connection);
}
}
Consumer1.java
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* consumer1
*
* @author PlutoWu
* @date 2021/04/06
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
String exchangeName = "logs_direct";
// 获取连接对象
Connection connection = RabbitMQUtils.getConnection();
// 获取通道对象
final Channel channel = connection.createChannel();
// 通道绑定交换机
channel.exchangeDeclare(exchangeName,"direct");
// 临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queue,exchangeName,"error");
// 消费消息
channel.basicConsume(queue,true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1:" + new String(body));
}
});
}
}
以此类推得出Consumer2的代码,对绑定的交换机以及队列做出略微修改
// 绑定交换机和队列
channel.queueBind(queue,exchangeName,"info");
channel.queueBind(queue,exchangeName,"error");
channel.queueBind(queue,exchangeName,"warning");
测试结果
发送Routing Key为error的信息时,由于两者都绑定了此Key所以都能接受到信息
而发送Routing Key为info的信息时,只有绑定了此Key的消费者2才能接收到信息
Topics 模型
Topics
类型的 Exchange
与 Direct
相比,都是可以根据 Routing Key
把消息路由到不同的队列。
只不过Topic
类型 Exchange
可以让队列在绑定 Routing key
的时候使用通配符。
这种模型 Routing Key
一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
# 通配符
* 匹配不多不少恰好1个词
# 匹配一个或多个词
# 如:
audit.# 匹配audit.irs.corporate或者 audit.irs 等
audit.* 只能匹配 audit.irs
实现代码
Provider.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* 提供者
*
* @author PlutoWu
* @date 2021/04/06
*/
public class Provider {
public static void main(String[] args) throws IOException {
String exchangeName = "topics";
// 获取连接对象
Connection connection = RabbitMQUtils.getConnection();
// 获取通道对象
Channel channel = connection.createChannel();
// 通过通道声明指定的交换机 // 参数1:交换机名称 参数2:交换机类型 direct 路由模式
channel.exchangeDeclare(exchangeName,"topic");
// 发送消息
String routingKey = "save.user.delete";
channel.basicPublish(exchangeName,routingKey,null, ("这是topic动态路由模型,route key:[" + routingKey + "]发送的消息").getBytes());
// 关闭资源
RabbitMQUtils.closeConnectionAndChanel(channel, connection);
}
}
Consumer1.java****
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* consumer1
*
* @author PlutoWu
* @date 2021/04/06
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
String exchangeName = "topics";
// 获取连接对象
Connection connection = RabbitMQUtils.getConnection();
// 获取通道对象
final Channel channel = connection.createChannel();
// 通道绑定交换机
channel.exchangeDeclare(exchangeName,"topic");
// 临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定交换机和队列 动态通配符形式 routingKey
channel.queueBind(queue,exchangeName,"user.*");
// 消费消息
channel.basicConsume(queue,true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者-1:" + new String(body));
}
});
}
}
Consumer2.java
// 绑定交换机和队列 动态通配符形式 routingKey
channel.queueBind(queue,exchangeName,"user.#");
测试结果
若Routing Key为user.delete.map,则只有通配符为#的消费者2能接收到信息
RPC 模型
在 Work Queues
模型中,我们学习了如何使用工作队列在多个工作人员之间分配耗时的任务。
但是,如果我们需要在远程计算机上运行功能并等待结果怎么办?那就可以用 RPC
模型(远程过程调用)。
我们将使用 RabbitMQ
构建 RPC
系统:客户端和可伸缩 RPC
服务器。由于我们没有值得分配的耗时任务,因此我们将创建一个虚拟 RPC
服务,该服务返回斐波那契数。
图解
- 对于 RPC 请求,客户端发送一条消息,该消息具有两个属性:replyTo(设置为仅为该请求创建的匿名互斥队列)和 correlationId(设置为每个请求的唯一值)。
- 该请求被发送到 rpc_queue 队列。
- RPC 工作程序(又名:服务器)正在等待该队列上的请求。出现请求时,它会使用 replyTo 字段中的队列来完成工作,并将消息和结果发送回客户端。
- 客户端等待答复队列中的数据。出现消息时,它将检查 correlationId 属性。如果它与请求中的值匹配,则将响应返回给应用程序。
消息的属性
实现代码
RPCClient.java
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue"; // 发送请求的队列名称
public static void main(String[] args) throws IOException, InterruptedException {
// 初始化 RPCClient
RPCClient rpcClient = new RPCClient();
rpcClient.connection = RabbitMQUtils.getConnection();
rpcClient.channel = rpcClient.connection.createChannel();
// 发送 request 请求信息
for (int i = 0; i < 5; i++) {
String i_str = Integer.toString(i);
System.out.println("现在客户端希望计算 fic("+i+")");
String response = rpcClient.call(i_str);
System.out.println("现在计算出来 fic("+i+") = "+ response);
}
// 关闭资源
close(rpcClient);
}
// 发送请求,希望调用远程的函数
public String call(String message) throws IOException, InterruptedException {
// 定义一个相关ID
final String correlationId = UUID.randomUUID().toString();
// 回调队列
String replyQueueName = channel.queueDeclare().getQueue();
// 定义消息属性
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties
.Builder()
.correlationId(correlationId) // 设置相关ID
.replyTo(replyQueueName) // 设置回调队列
.build();
// 存储相应信息
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
// 发送消息
channel.basicPublish("",requestQueueName,basicProperties,message.getBytes("UTF-8"));
// 消费消息
String ctag = channel.basicConsume(replyQueueName, true
// 参数3:服务器端传过来的回调对象
, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
if (message.getProperties().getCorrelationId().equals(correlationId)) {
response.offer(new String(message.getBody(), "UTF-8"));
}
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
String result = response.take();
channel.basicCancel(ctag);
return result;
}
// 关闭资源
public static void close(RPCClient rpcClient){
RabbitMQUtils.closeConnectionAndChanel(rpcClient.channel,rpcClient.connection);
}
}
RPCServer.java
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
public class RPCServer {
// 定义队列名称
private static final String RPC_QUEUE_NAME = "rpc_queue";
// 计算斐波那契数列
public static int fic(int a){
if (a == 0) return 0;
if (a == 1) return 1;
return fic(a-1) + fic(a-2);
}
public static void main(String[] args) throws IOException {
// 调用自行封装的工具类获取连接对象
Connection connection = RabbitMQUtils.getConnection();
// 获取通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
// 清除队列
channel.queuePurge(RPC_QUEUE_NAME);
// 每次处理一条信息
channel.basicQos(1);
// 定义一个监听器
final Object monitor = new Object();
// 定义回调信息
DeliverCallback deliverCallback = new DeliverCallback() {
/**
* @param consumerTag 消费者标签,可以与消费者建立联系
* @param message 消费者发送过来的消息(消息属性、消息封装体、消息没人)
*/
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
// 定义信息属性
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties
.Builder()
.correlationId(message.getProperties().getCorrelationId()) // 指明关联ID
.build();
// 定义相应信息
String response = "";
// 解析request信息
try{
String s = new String(message.getBody(),"UTF-8");
int i = Integer.parseInt(s);
System.out.println("正在计算 fic("+i+")");
response += fic(i);
}catch (Exception e){
e.printStackTrace();
}finally {
// 发布 response 消息
channel.basicPublish("",message.getProperties().getReplyTo(),basicProperties,response.getBytes("UTF-8"));
// 手动确认信息
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
// RabbitMQ 消费者工作线程通知 RPC 服务器所有者线程
synchronized (monitor){
/**
* notify()
*
* 唤醒处于等待的线程
*/
monitor.notify();
}
}
}
};
// 消费客户端发送过来的请求消息
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, new CancelCallback() {
// 等待并准备好使用来自RPC客户端的消息
@Override
public void handle(String consumerTag) throws IOException {
while (true){
synchronized (monitor){
try {
/**
* wait()
*
* 使得当前线程立刻停止运行,处于等待状态(WAIT),
* 并将当前线程置入锁对象的等待队列中,
* 直到被通知(notify)或被中断为止。
*/
monitor.wait();
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
});
}
}
测试结果
Spring Boot 整合 RabbitMQ
首先引入RabbitMQ依赖
<!-- 引入rabbitmq集成的依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
其次配置Application.yml
spring:
application:
name: rabbitmq-springboot
rabbitmq:
host: 192.168.150.128
port: 5672
username: ems
password: ems
virtual-host: /ems
Hello World 模型
代码实现
HelloConsumer.java
package cn.plutowu.hello;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
*
* @author PlutoWu
* @date 2021/04/06
*/
@Component // 持久化 非独占 默认
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class HelloConsumer {
/**
* @RabbitListener:申明监听的队列
* @Queue:具体指明哪一个队列及其相应的属性
* @RabbitHandler:指定收到消息时的回调方法
*/
@RabbitHandler
public void receive(String message) {
System.out.println("hello message = " + message);
}
}
Test 片段
package cn.plutowu.test;
import cn.plutowu.RabbitmqSpringbootApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
// 注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
// hello world
@Test
public void testHello() {
rabbitTemplate.convertAndSend("hello","hello world");
}
}
测试结果
Work Queues 模型
代码实现
WorkConsumer.java
package cn.plutowu.work;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
*
* @author PlutoWu
* @date 2021/04/06
*/
@Component
public class WorkConsumer {
// 一个消费者
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive1(String message) {
System.out.println("work message1 = " + message);
}
// 一个消费者
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String message) {
System.out.println("work message2 = " + message);
}
}
Test 片段
// work
@Test
public void testWork() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work","work模型" + i);
}
}
测试结果
遵循了轮询规则,把信息分配到两个消费者上
Fanout 模型
代码实现
FanoutConsumer.java
package cn.plutowu.fanout;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
*
* @author PlutoWu
* @date 2021/04/06
*/
@Component
public class FanoutConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "logs", type = "fanout") // 绑定的交换机
)
})
public void receive1(String message) {
System.out.println("work message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "logs", type = "fanout") // 绑定的交换机
)
})
public void receive2(String message) {
System.out.println("work message2 = " + message);
}
}
Test 片段
// fanout 广播
@Test
public void testFanout() {
rabbitTemplate.convertAndSend("logs","","Fanout模型的message");
}
测试结果
再绑定同一个交换机的情况下,两个消费者均接收到此信息
Routing 模型
实现代码
RouteConsumer.java
package cn.plutowu.route;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author PlutoWu
* @date 2021/04/06
*/
@Component
public class RouteConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "directs", type = "direct"), // 绑定的交换机
key = {"info", "error", "warn"}
)
})
public void receive1(String message) {
System.out.println("route message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "directs", type = "direct"), // 绑定的交换机
key = {"error"}
)
})
public void receive2(String message) {
System.out.println("route message2 = " + message);
}
}
Test 片段
// route 路由模式
@Test
public void testRoute() {
rabbitTemplate.convertAndSend("directs","error","发送error的key的路由message");
}
测试结果
发送Routing Key —–> error 两个消费者皆可接收
发送Routing Key —–> info 只有消费者2能接收
Topics 模型
实现代码
TopicConsumer.java
package cn.plutowu.topic;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author PlutoWu
* @date 2021/04/06
*/
@Component
public class TopicConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "topics", type = "topic"), // 绑定的交换机
key = {"user.save", "user.*"}
)
})
public void receive1(String message) {
System.out.println("topic message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "topics", type = "topic"), // 绑定的交换机
key = {"order.#", "produce.#", "user.*"}
)
})
public void receive2(String message) {
System.out.println("topic message2 = " + message);
}
}
Test 片段
// topic 动态路由 订阅模式
@Test
public void testTopic() {
rabbitTemplate.convertAndSend("topics","user.save","user.save 路由消息");
}
测试结果
发送user.save均可接收
发送prodece.save只有消费者2可以接收
RabbitMQ集群
普通集群(副本集群)
默认情况下:RabbitMQ 代理操作所需的所有数据/状态都将跨所有节点复制。
这方面的一个例外是消息队列,默认情况下,==消息队列仅位于一个节点上==,尽管它们可以从所有节点看到和访问
架构图
核心解决问题: 当集群中某一时刻master节点宕机,可以对Quene中信息,进行备份。
集群搭建
# 0.集群规划
node1: 10.15.0.3 mq1 master 主节点
node2: 10.15.0.4 mq2 repl1 副本节点
node3: 10.15.0.5 mq3 repl2 副本节点
# 1.克隆三台机器主机名和ip映射
vim /etc/hosts加入:
10.15.0.3 mq1
10.15.0.4 mq2
10.15.0.5 mq3
node1: vim /etc/hostname 加入: mq1
node2: vim /etc/hostname 加入: mq2
node3: vim /etc/hostname 加入: mq3
# 2.三个机器安装rabbitmq,并同步cookie文件,在node1上执行:
scp /var/lib/rabbitmq/.erlang.cookie root@mq2:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie root@mq3:/var/lib/rabbitmq/
# 3.查看cookie是否一致:
node1: cat /var/lib/rabbitmq/.erlang.cookie
node2: cat /var/lib/rabbitmq/.erlang.cookie
node3: cat /var/lib/rabbitmq/.erlang.cookie
# 4.后台启动rabbitmq所有节点执行如下命令,启动成功访问管理界面:
rabbitmq-server -detached
# 5.在node2和node3执行加入集群命令:
1.关闭 rabbitmqctl stop_app
2.加入集群 rabbitmqctl join_cluster rabbit@mq1
3.启动服务 rabbitmqctl start_app
# 6.查看集群状态,任意节点执行:
rabbitmqctl cluster_status
# 7.如果出现如下显示,集群搭建成功:
Cluster status of node rabbit@mq3 ...
[{nodes,[{disc,[rabbit@mq1,rabbit@mq2,rabbit@mq3]}]},
{running_nodes,[rabbit@mq1,rabbit@mq2,rabbit@mq3]},
{cluster_name,<<"rabbit@mq1">>},
{partitions,[]},
{alarms,[{rabbit@mq1,[]},{rabbit@mq2,[]},{rabbit@mq3,[]}]}]
镜像集群
镜像队列机制就是将队列在三个节点之间设置主从关系,消息会在三个节点之间进行自动同步,且如果其中一个节点不可用,并不会导致消息丢失或服务不可用的情况,提升MQ集群的整体高可用性。
架构图
集群搭建
# 0.策略说明
rabbitmqctl set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition>
-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称
Pattern: queue的匹配模式(正则表达式)
Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes
all:表示在集群中所有的节点上进行镜像
exactly:表示在指定个数的节点上进行镜像,节点的个数由 ha-params 指定
nodes:表示在指定的节点上进行镜像,节点名称通过 ha-params 指定
ha-params:ha-mode模式需要用到的参数
ha-sync-mode:进行队列中消息的同步方式,有效值为 automatic 和 manual(默认)
priority:可选参数,policy 的优先级(数字越高,优先级越高)
# 1.查看当前策略
rabbitmqctl list_policies
# 2.添加策略
rabbitmqctl set_policy ha-all '^hello' '{"ha-mode":"all","ha-sync-mode":"automatic"}'
说明:策略正则表达式为 “^” 表示所有匹配所有队列名称 ^hello:匹配hello开头队列
# 3.删除策略
rabbitmqctl clear_policy ha-all
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!