RabbitMQ
RabbitMQ
神奇海螺MQ的作用
消息队列是用来
解耦:微服务之间的调用,可以通过消息队列进行
异步:微服务之间的调用,放到消息队列即可,可以继续做其他事情,消息队列仅能保证数据的最终一致性
削峰:并发量高时容易使服务器宕机,通过消息队列可限流进行请求处理
AMQP和JMS
- AMQP是一种高级消息队列协议(Advanced Message Queuing Protocol),AMQP从不和API层进行限定,而是直接定义网络交换的数据格式
- JMS是java消息服务(JavaMessage Service)应用程序接口,是一个java平台关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信
AMQP和JMS的区别
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
- JMS限定了必须使用java语言;AMQP只是协议,不规定实现方式因此是跨语言的
- JMS规定了两种消息模式(1对1:点对点模式;1对多:发布订阅模式);AMQP的消息模式更加丰富
安装
1 | |
pom文件
yml配置文件
spring:
rabbitmq:
host: 192.168.6.128 //访问ip
port: 5672 //访问端口,是基于amqp协议的
username: guest //账号
password: 123456 //密码
virtual-host: / //虚拟主机
publisher-confirm-type: CORRELATED # 交换机的确认
publisher-returns: true # 队列的确认
listener:
simple:
acknowledge-mode: manual # 把消息确认模式改为手动确认
prefetch: 100 #消费者端限流,配合手动确认(自动确认无法限流)
logging:
level:
com.atguigu.mq.config.MQProducerAckConfig: info //日志级别info
操作方法
rabbit控制台可视化界面操作
访问rabbit的控制台端口操作
程序中使用注解方式操作
@RabbitListener( //Rabbit监听注解
bindings={ //绑定交换机和队列注解
@QueueBinding( //队列绑定注解
value=@Queue( //队列结构注解
name=”my_queue”, //队列名称
durable=”true”, //是否支持持久化
exclusive=”false”, //是否为排他队列
autoDelete=”false”,//自动删除是否开启
declare=”true” //存在时不创建直接使用
),
arguments={
@Argument(name=””,value-“”) //配置参数
}
exchange=@Exchange(
name=”my_exchange”, //交换机名称
durable=”true”, //是否支持持久化
type=”ExchangeTypes.DIRECT”, //交换机类型
autoDelete=”false”, //自动删除是否开启
internal=”false”, //是否为内部交换机,只支持rabbit内部使用
declare=”true” //存在时不创建直接使用
),
key={
“my_queue”,”my_queue2” //为队列设置可key
}
)
}
)
程序中使用配置类方式操作
传递参数
Message 消息的二进制文件
Channel 信道(通信通道),基本用来手动确认ack,他是发命令和收回包的接口
简单模式使用
简单模式生产者
pom文件
yml文件
API部分
package com.server;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class testRabbitMQ {
public final static String EXCHANGE_DERECT=""; //使用的交换机
public final static String ROUTING_KEY_SIMPLE="queue"; //使用的队列
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void testProduct(){
rabbitTemplate.convertAndSend(EXCHANGE_DERECT,ROUTING_KEY_SIMPLE,"hello,world,rabbit...");
}
}
简单模式消费者
pom文件
yml文件
API部分
package com.server;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQListener {
@RabbitListener(queues = "queue") //监听消息队列,queue队列
public void getListener(String str, Message message, Channel channel) {
System.out.println(str); //接收消息字符串
byte[] body = message.getBody(); //接收消息对象(字节码文件)
System.out.println(new String(body)); //转换为字符串输出
MessageProperties messageProperties = message.getMessageProperties();//获取信息属性
System.out.println("messageProperties"+messageProperties);
}
}
工作模式使用(多开启几个消费者提高吞吐量,队列处理的消息是不同的,默认是轮询策略)
工作模式生产者
pom文件
yml文件
API部分
package com.server;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class testRabbitMQ {
public final static String EXCHANGE_DERECT=""; //使用的交换机
public final static String ROUTING_KEY_WORK="work_queue"; //使用的队列
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void testWorkProduct(){
for(int i=1;i<=10;i++){
rabbitTemplate.convertAndSend(EXCHANGE_DERECT,ROUTING_KEY_WORK,"hello,world,rabbit i="+i);
}
}
}
工作模式消费者
pom文件
yml文件
API部分
package com.server;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class WorkListener {
@RabbitListener(queues = "work_queue")
public void getListener1(String str, Message message, Channel channel) {
System.out.println("user1"+str);
}
@RabbitListener(queues = "work_queue")
public void getListener2(String str, Message message, Channel channel) {
System.out.println("user2"+str);
}
}
广播模式使用(需要自己创建交换机,消息发送给每个绑定的队列,每个队列得到的消息是相同的)
广播模式生产者
pom文件
yml文件
API部分
package com.server;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class testRabbitMQ {
public final static String EXCHANGE_FANOUT=”my_fanout”; //使用的交换机
public final static String ROUTING_KEY_SIMPLE=””; //使用的队列
@Autowired
RabbitTemplate rabbitTemplate;
//广播模式
@Test
public void testFanoutProduct(){
rabbitTemplate.convertAndSend(ROUTING_KEY_fanout,"","hello,world,rabbit..");
}
}
广播模式消费者
pom文件
yml文件
API部分
package com.server;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class FanoutListener {
@RabbitListener(queues = "fanout01")
public void getListener1(String str, Message message, Channel channel) {
System.out.println("fanout01"+str);
}
@RabbitListener(queues = "fanout02")
public void getListener2(String str, Message message, Channel channel) {
System.out.println("fanout02"+str);
}
}
路由模式使用(发送给指定的key的队列)
路由模式生产者
pom文件
yml文件
API部分
package com.server;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class testRabbitMQ {
public final static String EXCHANGE_ROUTE=”my_route”; //使用的交换机
public final static String ROUTING_KEY_ROUTE=”route_test”; //使用的队列
@Autowired
RabbitTemplate rabbitTemplate;
//路由模式
@Test
public void testRouteProduct(){
rabbitTemplate.convertAndSend(EXCHANGE_ROUTE,ROUTING_KEY_ROUTE,"hello,world,rabbit..");
}
}
路由模式消费者
pom文件
yml文件
API部分
package com.server;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RouteListener {
@RabbitListener(queues = "route01")
public void getListener1(String str, Message message, Channel channel) {
System.out.println("route01"+str);
}
}
topics模式使用(通配符匹配)
* 匹配一个词
# 匹配零个或多个词
topics模式生产者
pom文件
yml文件
API部分
public final static String EXCHANGE_TOPICS=”my_topics”;
public final static String ROUTING_KEY_TOPICS01=”item.a.data”;
public final static String ROUTING_KEY_TOPICS02=”item.data.b”;
public final static String ROUTING_KEY_TOPICS03=”c.item.data”;
//topics模式
@Test
public void testTopicsProduct(){
rabbitTemplate.convertAndSend(EXCHANGE_TOPICS,ROUTING_KEY_TOPICS01,”hello,world,rabbit..aaaaa”);
rabbitTemplate.convertAndSend(EXCHANGE_TOPICS,ROUTING_KEY_TOPICS02,”hello,world,rabbit..bbbbb”);
rabbitTemplate.convertAndSend(EXCHANGE_TOPICS,ROUTING_KEY_TOPICS03,”hello,world,rabbit..ccccc”);
}
topics消费者
pom文件
yml文件
API部分
package com.server;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicsListener {
@RabbitListener(queues = "topics01")
public void getListener1(String str, Message message, Channel channel) {
System.out.println("topics01"+str);
}
@RabbitListener(queues = "topics02")
public void getListener2(String str, Message message, Channel channel) {
System.out.println("topics02"+str);
}
}
消息的可靠性投递
1.消息队列的交换机和队列收到消息应该给服务端返回一个ack确认
2.消息队列中的交换机和队列的消息需要持久化,避免宕机消息丢失
3.消息处理失败,不应该返回确认,应当重试
解决方案1 发送时的确认策略
确认模式(confirm) 消息队列收到消息返回ack:true/false,false重试
回退模式(return) 消息队列找不到消息对应的队列,回退给生产者
yml配置文件部分
spring:
rabbitmq:
host: 192.168.6.104
port: 5672
username: guest
password: 123456
virtual-host: /
publisher-confirm-type: CORRELATED # 交换机的确认
publisher-returns: true # 队列的确认
logging:
level:
com.atguigu.mq.config.MQProducerAckConfig: info //日志级别info
api部分
注册交换机确认模式和队列回退模式
package com.server.config;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component //注册为bean
@Slf4j //日志采集
public class RabbitMQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { //实现消息确认模式接口(ConfirmCallback),实现回退模式接口(ReturnsCallback)
@Autowired
private RabbitTemplate rabbitTemplate; //rabbit模板对象
@PostConstruct //bean注册时额外的初始化操作
public void init() {
rabbitTemplate.setConfirmCallback(this); //将当前类注入到模板中
rabbitTemplate.setReturnsCallback(this); //将当前类注入到模板中
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {//处理消息确认的方法,包含关联数据,消息状态,状态信息
if(b){
System.out.println("发送成功");
}else{
System.out.println("发送失败,找不到交换机");
}
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {//处理返回模式的方法,包含消息和退回原因
System.out.println("找不到队列");
}
}
测试
package com.server;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class testRabbitMQ {
public final static String EXCHANGE_ROUTE=”my_route”; //使用的交换机
public final static String ROUTING_KEY_ROUTE=”route_test”; //使用的队列
@Autowired
RabbitTemplate rabbitTemplate;
//测试故障(正确示例)
@Test
public void testErrorProduct(){
rabbitTemplate.convertAndSend(EXCHANGE_ROUTE,ROUTING_KEY_ROUTE,"hello,world,rabbit..");
}
//测试故障(错误示例1)
@Test
public void testErrorProduct1(){
rabbitTemplate.convertAndSend("123",ROUTING_KEY_ROUTE,"hello,world,rabbit..");
}
//测试故障(错误示例2)
@Test
public void testErrorProduct2(){
rabbitTemplate.convertAndSend(EXCHANGE_ROUTE,"123","hello,world,rabbit..");
}
//测试故障(错误示例3)
@Test
public void testErrorProduct3(){
rabbitTemplate.convertAndSend("123","123","hello,world,rabbit..");
}
}
解决方案2 持久化
设置交换机和队列为持久化
解决方案3 重试机制
消费者处理消息异常,返回ack=false,requeue=true
ack=false表示不确认,requeue=true表示让消息回到队列,为false则不回队列
yml部分
spring:
rabbitmq:
host: 192.168.6.104
port: 5672
username: guest
password: 123456
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 把消息确认模式改为手动确认
logging:
level:
com.atguigu.mq.config.MQProducerAckConfig: info //日志级别info
API部分
package com.server;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ErrorListener {
@RabbitListener( //Rabbit监听注解
bindings={ //绑定交换机和队列注解
@QueueBinding( //队列绑定注解
value=@Queue( //队列结构注解
name="my_queue" //队列名称
),
exchange=@Exchange(
name="my_exchange", //交换机名称
type=ExchangeTypes.DIRECT //交换机类型
),
key={
"my_queue","my_queue2" //为队列设置可key
}
)
}
)
public void getListener1(String str, Message message, Channel channel)throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag(); //获取消息唯一标识
try{
channel.basicAck(deliveryTag,false); //手动确认消息,deliveryTag是消息的唯一标识,false表示不批量处理,true为批量处理,会批量处理掉唯一标志之前的消息
}catch (Exception e){
Boolean reDelivered = message.getMessageProperties().getRedelivered(); //获取是否是重复投递
if(!reDelivered){
channel.basicReject(deliveryTag,true); //拒绝并放回队列,可以重新处理
}else{
channel.basicNack(deliveryTag,false,false); //丢弃或放到死信队列
}
}
}
}
保证消息百分比投递成功
发消息到队列前记录到数据库此条消息的状态,然后向消息队列发送消息,成功返回将数据库消息状态改为1,因为某种原因,消息队列并未返回ack,生产者会定时查询数据库中状态为0的消息,并重新发送,定时器到达次数限制后将数据库中消息状态改为2,表示消息不可投递
消息幂等性
在数据库中定义版本号字段,当修改成功后版本号发生变化,后续的其他修改都不会匹配成功版本号,导致执行失败
设置队列超时时间
1.在rabbit控制台创建队列时指定超时时间参数和值
2.在程序中创建队列时指定超时时间参数和值
设置消息超时时间
1 | |
死信队列
死信队列结构
死信交换机和死信队列和正常交换机正常队列一样
x-dead-letter-exchange 正常队列指定死信交换机
x-dead-letter-routing-key 正常队列指定死信队列的key
消息进入死信队列的情况
1.队列溢出消息进入死信队列
2.信息超时进入死信队列
3.拒收且不将消息放回队列会进入死信队列
延时消息
1 | |
安装插件
1 | |
启用插件
1 | |
使用插件
创建延迟交换机 类型选择x-delayed-message,指定x-delayed-type参数(发送类型)
API部分
消费者
1 | |
生产者
1 | |


