RabbitMQ

MQ的作用

消息队列是用来
解耦:微服务之间的调用,可以通过消息队列进行
异步:微服务之间的调用,放到消息队列即可,可以继续做其他事情,消息队列仅能保证数据的最终一致性
削峰:并发量高时容易使服务器宕机,通过消息队列可限流进行请求处理

AMQP和JMS

  • AMQP是一种高级消息队列协议(Advanced Message Queuing Protocol),AMQP从不和API层进行限定,而是直接定义网络交换的数据格式
  • JMS是java消息服务(JavaMessage Service)应用程序接口,是一个java平台关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信

AMQP和JMS的区别

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
  • JMS限定了必须使用java语言;AMQP只是协议,不规定实现方式因此是跨语言的
  • JMS规定了两种消息模式(1对1:点对点模式;1对多:发布订阅模式);AMQP的消息模式更加丰富

安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 拉取镜像
docker pull rabbitmq:3.12-management

# -d 参数:后台运行 Docker 容器
# --name 参数:设置容器名称
# -p 参数:映射端口号,格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问,15672供后台管理界面访问
# -v 参数:卷映射目录
# -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.12-management

pom文件

org.springframework.boot spring-boot-starter-parent 3.1.5 //amqp场景启动器 org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-test

yml配置文件

spring:
rabbitmq:
host: 192.168.6.128 //访问ip
port: 5672 //访问端口,是基于amqp协议的
username: guest //账号
password: 123456 //密码
virtual-host: / //虚拟主机
publisher-confirm-type: CORRELATED # 交换机的确认
publisher-returns: true # 队列的确认
listener:
simple:
acknowledge-mode: manual # 把消息确认模式改为手动确认
prefetch: 100 #消费者端限流,配合手动确认(自动确认无法限流)
logging:
level:
com.atguigu.mq.config.MQProducerAckConfig: info //日志级别info

操作方法

rabbit控制台可视化界面操作

访问rabbit的控制台端口操作

程序中使用注解方式操作

@RabbitListener( //Rabbit监听注解
bindings={ //绑定交换机和队列注解
@QueueBinding( //队列绑定注解
value=@Queue( //队列结构注解
name=”my_queue”, //队列名称
durable=”true”, //是否支持持久化
exclusive=”false”, //是否为排他队列
autoDelete=”false”,//自动删除是否开启
declare=”true” //存在时不创建直接使用
),
arguments={
@Argument(name=””,value-“”) //配置参数
}
exchange=@Exchange(
name=”my_exchange”, //交换机名称
durable=”true”, //是否支持持久化
type=”ExchangeTypes.DIRECT”, //交换机类型
autoDelete=”false”, //自动删除是否开启
internal=”false”, //是否为内部交换机,只支持rabbit内部使用
declare=”true” //存在时不创建直接使用
),
key={
“my_queue”,”my_queue2” //为队列设置可key
}
)
}
)

程序中使用配置类方式操作

传递参数

Message 消息的二进制文件
Channel 信道(通信通道),基本用来手动确认ack,他是发命令和收回包的接口

简单模式使用

简单模式生产者

pom文件

pom文件

yml文件

yml配置文件

API部分

package com.server;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class testRabbitMQ {

public final static String EXCHANGE_DERECT="";    //使用的交换机
public final static String ROUTING_KEY_SIMPLE="queue";  //使用的队列

@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void testProduct(){
    rabbitTemplate.convertAndSend(EXCHANGE_DERECT,ROUTING_KEY_SIMPLE,"hello,world,rabbit...");
}

}

简单模式消费者

pom文件

pom文件

yml文件

yml配置文件

API部分

package com.server;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQListener {

@RabbitListener(queues = "queue")   //监听消息队列,queue队列
public void getListener(String str, Message message, Channel channel) {
    System.out.println(str);  //接收消息字符串
    byte[] body = message.getBody();  //接收消息对象(字节码文件)
    System.out.println(new String(body));  //转换为字符串输出
    MessageProperties messageProperties = message.getMessageProperties();//获取信息属性
    System.out.println("messageProperties"+messageProperties);
}

}

工作模式使用(多开启几个消费者提高吞吐量,队列处理的消息是不同的,默认是轮询策略)

工作模式生产者

pom文件

pom文件

yml文件

yml配置文件

API部分

package com.server;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class testRabbitMQ {

public final static String EXCHANGE_DERECT="";    //使用的交换机
public final static String ROUTING_KEY_WORK="work_queue";  //使用的队列

@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void testWorkProduct(){
    for(int i=1;i<=10;i++){
        rabbitTemplate.convertAndSend(EXCHANGE_DERECT,ROUTING_KEY_WORK,"hello,world,rabbit i="+i);
    }
}

}

工作模式消费者

pom文件

pom文件

yml文件

yml配置文件

API部分

package com.server;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkListener {

@RabbitListener(queues = "work_queue")
public void getListener1(String str, Message message, Channel channel) {
    System.out.println("user1"+str);
}

@RabbitListener(queues = "work_queue")
public void getListener2(String str, Message message, Channel channel) {
    System.out.println("user2"+str);
}

}

广播模式使用(需要自己创建交换机,消息发送给每个绑定的队列,每个队列得到的消息是相同的)

广播模式生产者

pom文件

pom文件

yml文件

yml配置文件

API部分

package com.server;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class testRabbitMQ {
public final static String EXCHANGE_FANOUT=”my_fanout”; //使用的交换机
public final static String ROUTING_KEY_SIMPLE=””; //使用的队列

@Autowired
RabbitTemplate rabbitTemplate;

//广播模式
@Test
public void testFanoutProduct(){
    rabbitTemplate.convertAndSend(ROUTING_KEY_fanout,"","hello,world,rabbit..");
}

}

广播模式消费者

pom文件

pom文件

yml文件

yml配置文件

API部分

package com.server;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutListener {

@RabbitListener(queues = "fanout01")
public void getListener1(String str, Message message, Channel channel) {
    System.out.println("fanout01"+str);
}

@RabbitListener(queues = "fanout02")
public void getListener2(String str, Message message, Channel channel) {
    System.out.println("fanout02"+str);
}

}

路由模式使用(发送给指定的key的队列)

路由模式生产者

pom文件

pom文件

yml文件

yml配置文件

API部分

package com.server;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class testRabbitMQ {
public final static String EXCHANGE_ROUTE=”my_route”; //使用的交换机
public final static String ROUTING_KEY_ROUTE=”route_test”; //使用的队列

@Autowired
RabbitTemplate rabbitTemplate;

//路由模式
@Test
public void testRouteProduct(){
    rabbitTemplate.convertAndSend(EXCHANGE_ROUTE,ROUTING_KEY_ROUTE,"hello,world,rabbit..");
}

}

路由模式消费者

pom文件

pom文件

yml文件

yml配置文件

API部分

package com.server;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RouteListener {

@RabbitListener(queues = "route01")
public void getListener1(String str, Message message, Channel channel) {
    System.out.println("route01"+str);
}

}

topics模式使用(通配符匹配)

* 匹配一个词
# 匹配零个或多个词

topics模式生产者

pom文件

pom文件

yml文件

yml配置文件

API部分

public final static String EXCHANGE_TOPICS=”my_topics”;
public final static String ROUTING_KEY_TOPICS01=”item.a.data”;
public final static String ROUTING_KEY_TOPICS02=”item.data.b”;
public final static String ROUTING_KEY_TOPICS03=”c.item.data”;
//topics模式
@Test
public void testTopicsProduct(){
rabbitTemplate.convertAndSend(EXCHANGE_TOPICS,ROUTING_KEY_TOPICS01,”hello,world,rabbit..aaaaa”);
rabbitTemplate.convertAndSend(EXCHANGE_TOPICS,ROUTING_KEY_TOPICS02,”hello,world,rabbit..bbbbb”);
rabbitTemplate.convertAndSend(EXCHANGE_TOPICS,ROUTING_KEY_TOPICS03,”hello,world,rabbit..ccccc”);

}

topics消费者

pom文件

pom文件

yml文件

yml配置文件

API部分

package com.server;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicsListener {

@RabbitListener(queues = "topics01")
public void getListener1(String str, Message message, Channel channel) {
    System.out.println("topics01"+str);
}

@RabbitListener(queues = "topics02")
public void getListener2(String str, Message message, Channel channel) {
    System.out.println("topics02"+str);
}

}

消息的可靠性投递

1.消息队列的交换机和队列收到消息应该给服务端返回一个ack确认
2.消息队列中的交换机和队列的消息需要持久化,避免宕机消息丢失
3.消息处理失败,不应该返回确认,应当重试

解决方案1 发送时的确认策略

确认模式(confirm) 消息队列收到消息返回ack:true/false,false重试
回退模式(return) 消息队列找不到消息对应的队列,回退给生产者

yml配置文件部分

spring:
rabbitmq:
host: 192.168.6.104
port: 5672
username: guest
password: 123456
virtual-host: /
publisher-confirm-type: CORRELATED # 交换机的确认
publisher-returns: true # 队列的确认
logging:
level:
com.atguigu.mq.config.MQProducerAckConfig: info //日志级别info

api部分

注册交换机确认模式和队列回退模式

package com.server.config;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component //注册为bean
@Slf4j //日志采集
public class RabbitMQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { //实现消息确认模式接口(ConfirmCallback),实现回退模式接口(ReturnsCallback)

@Autowired
private RabbitTemplate rabbitTemplate;    //rabbit模板对象
@PostConstruct    //bean注册时额外的初始化操作
public void init() {
    rabbitTemplate.setConfirmCallback(this);   //将当前类注入到模板中
    rabbitTemplate.setReturnsCallback(this);   //将当前类注入到模板中
}

@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {//处理消息确认的方法,包含关联数据,消息状态,状态信息
    if(b){
        System.out.println("发送成功");
    }else{
        System.out.println("发送失败,找不到交换机");
    }
}

@Override
public void returnedMessage(ReturnedMessage returnedMessage) {//处理返回模式的方法,包含消息和退回原因
    System.out.println("找不到队列");
}

}

测试

package com.server;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class testRabbitMQ {
public final static String EXCHANGE_ROUTE=”my_route”; //使用的交换机
public final static String ROUTING_KEY_ROUTE=”route_test”; //使用的队列

@Autowired
RabbitTemplate rabbitTemplate;

//测试故障(正确示例)
@Test
public void testErrorProduct(){
    rabbitTemplate.convertAndSend(EXCHANGE_ROUTE,ROUTING_KEY_ROUTE,"hello,world,rabbit..");
}
//测试故障(错误示例1)
@Test
public void testErrorProduct1(){
    rabbitTemplate.convertAndSend("123",ROUTING_KEY_ROUTE,"hello,world,rabbit..");
}
//测试故障(错误示例2)
@Test
public void testErrorProduct2(){
    rabbitTemplate.convertAndSend(EXCHANGE_ROUTE,"123","hello,world,rabbit..");
}
//测试故障(错误示例3)
@Test
public void testErrorProduct3(){
    rabbitTemplate.convertAndSend("123","123","hello,world,rabbit..");
}

}

解决方案2 持久化

设置交换机和队列为持久化

解决方案3 重试机制

消费者处理消息异常,返回ack=false,requeue=true
ack=false表示不确认,requeue=true表示让消息回到队列,为false则不回队列

yml部分

spring:
rabbitmq:
host: 192.168.6.104
port: 5672
username: guest
password: 123456
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 把消息确认模式改为手动确认
logging:
level:
com.atguigu.mq.config.MQProducerAckConfig: info //日志级别info

API部分

package com.server;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ErrorListener {

@RabbitListener(  //Rabbit监听注解
        bindings={    //绑定交换机和队列注解
                @QueueBinding(   //队列绑定注解
                        value=@Queue(  //队列结构注解
                                name="my_queue"   //队列名称
                        ),
                        exchange=@Exchange(
                        name="my_exchange", //交换机名称
                        type=ExchangeTypes.DIRECT //交换机类型
                ),
                        key={
                        "my_queue","my_queue2"  //为队列设置可key
                }
                )
        }
)
public void getListener1(String str, Message message, Channel channel)throws IOException {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();  //获取消息唯一标识
    try{
        channel.basicAck(deliveryTag,false);     //手动确认消息,deliveryTag是消息的唯一标识,false表示不批量处理,true为批量处理,会批量处理掉唯一标志之前的消息
    }catch (Exception e){
        Boolean reDelivered = message.getMessageProperties().getRedelivered(); //获取是否是重复投递
        if(!reDelivered){
            channel.basicReject(deliveryTag,true);  //拒绝并放回队列,可以重新处理
        }else{
            channel.basicNack(deliveryTag,false,false);  //丢弃或放到死信队列
        }
    }
}

}

保证消息百分比投递成功

发消息到队列前记录到数据库此条消息的状态,然后向消息队列发送消息,成功返回将数据库消息状态改为1,因为某种原因,消息队列并未返回ack,生产者会定时查询数据库中状态为0的消息,并重新发送,定时器到达次数限制后将数据库中消息状态改为2,表示消息不可投递

消息幂等性

在数据库中定义版本号字段,当修改成功后版本号发生变化,后续的其他修改都不会匹配成功版本号,导致执行失败

设置队列超时时间

1.在rabbit控制台创建队列时指定超时时间参数和值
2.在程序中创建队列时指定超时时间参数和值

设置消息超时时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.server;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class testRabbitMQ {

public final static String EXCHANGE_ROUTE="my_route"; //使用的交换机
public final static String ROUTING_KEY_ROUTE="route_test"; //使用的队列

@Autowired
RabbitTemplate rabbitTemplate;

//设置消息超时时间
@Test
public void testTimeoutProduct3(){
MessagePostProcessor messagePostProcessor=(Message message)->{
message.getMessageProperties().setExpiration("5000");
return message;
};
rabbitTemplate.convertAndSend(EXCHANGE_ROUTE,ROUTING_KEY_ROUTE,"hello,world,rabbit..",messagePostProcessor);
}
}

死信队列

死信队列结构

死信交换机和死信队列和正常交换机正常队列一样
x-dead-letter-exchange 正常队列指定死信交换机
x-dead-letter-routing-key 正常队列指定死信队列的key

消息进入死信队列的情况

1.队列溢出消息进入死信队列
2.信息超时进入死信队列
3.拒收且不将消息放回队列会进入死信队列

延时消息

1
2
3
4
5
6
延迟队列插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
延迟期限:最多延迟两天
一些问题:
1.最多延迟两天
2.主要是创建延迟交换机实现的,延迟转发消息给队列,由于延迟发送给队列,如果开启回退模式,每次都会调用我们的回退处理程序,我们的回退处理程序需要判定是否是延迟消息
3.插件版本和rabbit版本需要一致

安装插件

1
2
3
4
下载插件,并移动到容器目录中
wget https://github.com/rabbitmq/rabbitmq-delayed-message-
exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez
mv rabbitmq_delayed_message_exchange-3.12.0.ez /var/lib/docker/volumes/rabbitmq-plugin/_data

启用插件

1
2
3
4
5
6
7
8
登录进入容器内部
docker exec -it rabbitmq /bin/bash
rabbitmq-plugins命令所在目录已经配置到$PATH环境变量中了,可以直接调用
rabbitmq-pluginsIenable rabbitmq_delayed_message_exchange
退出Docker容器
exit
重启Docker容器
docker restart rabbitmq

使用插件

创建延迟交换机 类型选择x-delayed-message,指定x-delayed-type参数(发送类型)

API部分
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.server;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class RelayListener {

@RabbitListener( //Rabbit监听注解
bindings={ //绑定交换机和队列注解
@QueueBinding( //队列绑定注解
value=@Queue( //队列结构注解
name="queue_delay" //队列名称
),
exchange=@Exchange(
name="exchange_delay", //交换机名称
type= "x-delayed-message", //交换机类型
arguments = {
@Argument(name = "x-delay-type",value = "direct") //配置参数
}
),
key={
"my_queue","my_queue2" //为队列设置可key
}
)
}
)
public void getListener1(String str, Message message, Channel channel) throws IOException {
System.out.println("延迟队列delay...");
}
}
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.server;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class testRabbitMQ {
public final static String EXCHANGE_ROUTE="my_route"; //使用的交换机
public final static String ROUTING_KEY_ROUTE="route_test"; //使用的队列

@Autowired
RabbitTemplate rabbitTemplate;

//延迟发送消息
@Test
public void testDelayProduct3(){
MessagePostProcessor messagePostProcessor=(Message message)->{
message.getMessageProperties().setExpiration("10000");
return message;
};
rabbitTemplate.convertAndSend(EXCHANGE_ROUTE,ROUTING_KEY_ROUTE,"rabbit.."+new Date(),messagePostProcessor);
}
}