RabbitMQ学习
安装并启动
配置yum源
[rabbitmq-erlang] name=rabbitmq-erlang baseurl=https://dl.bintray.com/rabbitmq/rpm/erlang/20/el/7 gpgcheck=1 gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc repo_gpgcheck=0 enabled=1
安装
yum install -y rabbitmq-server
启动
rabbitmq-server start &
安装管控台插件
rabbitmq-plugins enable rabbitmq_management
控制台
登录
浏览器输入
http://ip:15672/
用户名guest
密码guest
命令行
- 关闭应用
rabbitmqctl stop_app
- 启动应用
rabbitmqctl start_app
- 节点状态
rabbitmqctl status
- 添加用户
rabbitmqctl add username password
- 列出所有用户
rabbitmqctl list_users
- 删除用户
rabbitmqctl delete_user username
- 修改密码
rabbitmqctl change_password username newpassword
- 列出用户权限
rabbitmqctl list_user_permissions username
- 清除用户权限
rabbitmqctl clear_permissions -p vhostpath username
- 设置用户权限
rabbitmqctl set_permissions -p vhostpath username ".*"".*"".*"
- 创建虚拟主机
rabbitmqctl add_vhost vhostpath
- 列出所有虚拟主机
rabbitmqctl list_vhost
- 列出虚拟主机上所有权限
rabbitmqctl list_permissions -p vhostpath
- 删除虚拟主机
rabbitmqctl delete_vhost vhostpath
- 查看所有队列信息
rabbitmqctl list_queues
- 清除队列中的消息
rabbitmqctl -p vhostpath purge_queue blue
- 移除所有数据(要在rabbitmqctl stop_app之后使用)
rabbitmqctl reset
- 组成集群命令
rabbitmqctl join_cluster <clusternode> [--ram]
- 查看集群状态
rabbitmqctl cluster_status
- 修改集群节点的存储形式
rabbitmqctl change_cluster_node_type disc|ram
- 摘除节点
rabbitmqctl forget_cluster_node [offline]
- 修改节点名称
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2]
java客户端
生产者
package com.example.rabbitmq.quickstart;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Author: haoming
* @Date: 2020/2/23 4:03 下午
* @Version 1.0
*/
public class Producer {
public static void main(String[] args) throws Exception {
//1.创建一个ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("152.136.233.203");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2.通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3.通过connection创建一个channel
Channel channel = connection.createChannel();
//4.通过channel发送数据
for (int i = 0; i < 5; i++) {
String msg = "hello RabbitMQ!";
channel.basicPublish("","test001",null,msg.getBytes());
//如果未指定exchange,默认使用(AMQP default) exchange,这个exchange会根据routerKey找到同名队列
}
//5.关闭相关的连接
channel.close();
connection.close();
}
}
消费者
package com.example.rabbitmq.quickstart;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author: haoming
* @Date: 2020/2/23 4:04 下午
* @Version 1.0
*/
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建一个ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("152.136.233.203");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2.通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3.通过connection创建一个channel
Channel channel = connection.createChannel();
//4.声明(创建)一个队列
String queueName = "test001";
channel.queueDeclare(queueName,true,false,false,null);
//5.创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6.设置channel
channel.basicConsume(queueName,true,queueingConsumer);
//7.获取消息
while (true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("msg="+msg);
//Envelope envelope = delivery.getEnvelope();
}
}
}
交换机
作用
接收消息,并根据路由键转发消息到绑定的队列。
属性
- Name 交换机名称
- Type 交换机类型 direct topic fanout headers(以下图片参考https://www.cnblogs.com/stefan-liu/p/5315809.html)
- direct(使用比较多)
- 所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。
- Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouterKey必须完全匹配才能完全接收,否则该消息会被抛弃。
- Topic
- 所有发送到Topic Exchange的消息被转发到所有关心RouterKey中指定Topic的Queue上。
- Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic。
- fanout
- 不处理路由键,只需要简单的将队列绑定到交换机上。
- 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
- 转发消息是最快的。
- direct(使用比较多)
- Durability 是否需要持久化,true未持久化。
- Auto Delete 当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange。
- Internal 当前Exchange是否用于RabbitMQ内部使用,默认为false。
- Arguments 扩展参数,用于扩展AMQP自制定化使用。
模糊匹配
符号 “#” 匹配一个或多个词
“log.#” 能够匹配到“log.info.oa”
符号 “*” 匹配不多不少一个词
“log.*” 只会匹配到 “log.erro”
binding
作用
Exchange 和 Exchang、Queue之间的连接关系。
Binding中可以包含RoutingKey或者参数。
消息队列
作用
存储消息数据
属性
- Durability:是否持久化,Durable:是;Transient:否。
- Auto delete:yes:表示当最后一个监听移除之后,该Queue会自动被删除。
Message
作用
服务器和应用程序之间传送的数据。
本质上就是一段数据,由Properties和Payload(Body)组成。
属性
Delivery mode 送达模式 :持久化非持久化
Headers 自定义属性
其他属性:
- content_type
- content_encoding
- priority 优先级
- correlation_id 消息唯一id(消息幂等)
- reply_to
- expiration 消息到期时间
- message_id 消息的id
- timestamp
- type
- user_id
- app_id
- cluster_id
Map<String, Object> headers = new HashMap<>(); headers.put("my1", "111"); headers.put("my2", "222"); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("10000") .headers(headers) .build();
Virtual host 虚拟主机
作用
虚拟地址,用于进行逻辑隔离,最上层的消息路由。
一个vhost里面可以有若干个exchange和queue。
同一个vhost里面不能有相同名称的exchange和queue。
RabbitMQ架构图
消息丢失
生产端的可靠性投递
- 保障消息的成功发出
- 保障MQ节点成功接收
- 发送端收到MQ节点确认应答
- 完善的消息补偿机制
解决方案
消息落库,对消息状态进行打标
- 业务数据入库,消息入库,消息状态为0
- 发送消息到MQ
- 消息确认回调
- 更新消息状态为1
- 定时任务拉取一段时间内状态一直为0的消息,进行重发
- 超过规定的重发次数仍然没有成功,更新消息状态为2,人工介入
消息的延迟投递,做二次确认,回调检查(高并发)
- 业务数据入库
- 发送两条消息,其中一条正常发送(队列0),其中一条延迟发送(队列1)
- 消费者消费消息
- 消费者发送消费确认消息到MQ(队列2)
- callback服务消费确认消息(队列2),并将确认结果入库
- callback服务消费延迟发送的消息(队列1),确认此消息是否成功,如果未成功,通知生产者重发
重复消费
消费端-幂等性保障
消费端实现幂等性,就意味着我们的消息永远不会消费多次,即使我们收到了多条一样的消息
幂等性解决方案
- 唯一ID+指纹码机制 ,利用数据库主键去重
- select count(1) from TABLE where id = 唯一id+指纹码
- 好处:实现简单;坏处:高并发下有数据库写入的性能瓶颈
- 解决方案:根据ID进行分库分表
- 利用Redis的原子性去实现
- 需要考虑的问题
- 是否要进行数据的落库,如果要落库的话,要解决的问题是数据库和缓存如何做到原子性
- 如果不进行落库,都存储到缓存中,如何设置定时同步的策略
- 需要考虑的问题
确认机制
Confirm确认消息
生产端代码
//指定我们的消息投递模式: 消息的确认模式
channel.confirmSelect();
//添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------no ack!-----------");
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ack!-----------");
}
});
Return消息
在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key找不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener
配置项
Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息
生产端代码
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, BasicProperties properties, byte[] body) throws IOException {
System.err.println("---------handle return----------");
System.err.println("replyCode: " + replyCode);
System.err.println("replyText: " + replyText);
System.err.println("exchange: " + exchange);
System.err.println("routingKey: " + routingKey);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
});
//mandatory设置为true
channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
消费端自定义监听
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("ip");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_consumer_exchange";
String routingKey = "consumer.#";
String queueName = "test_consumer_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
消费端限流
RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置qos的值)未被确认前,不进行消费新的消息
autoACK 一定要设置成false.不自动签收。
//void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
//prefetchSize:0
//prefetchCount:告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该cousumer将block掉,直到有消息ack
//global:是否将上面设置应用于channel,就是上面的限制是channel级别还是consumer级别
//prefetchSize、global RabbitMQ还没有实现
//1 限流方式 第一件事就是 autoAck设置为 false
//消费端
channel.basicQos(0, 1, false);
channel.basicConsume(queueName, false, new MyConsumer(channel));
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
//手动ack
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
消费端ack与重回队列
重回队列
对没有处理成功的消息,把消息重新传递给Broker
一般在实际的应用中,都会关闭重回队列,设置为false
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("body: " + new String(body));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)properties.getHeaders().get("num") == 0) {
//nack,处理失败,重回队列
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
死信队列(DLX Dead-Letter-Exchange)
定义
- 当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另外一个Exchange,这个Exchange就是DLX
- DLX也是一个正常的Exchange
消息变成死信的几种情况
- 消息被拒绝(basic.reject/basic.nack)并且requeue=false
- 消息TTL过期
- 队列达到最大长度
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("ip");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 这就是一个普通的交换机 和 队列 以及路由
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_dlx_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//这个agruments属性,要设置到声明队列上
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);
//要进行死信队列的声明:
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}