安装并启动

  1. 配置yum源

    [rabbitmq-erlang]
    name=rabbitmq-erlang
    baseurl=https://dl.bintray.com/rabbitmq/rpm/erlang/20/el/7
    gpgcheck=1
    gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
    repo_gpgcheck=0
    enabled=1
  2. 安装 yum install -y rabbitmq-server

  3. 启动 rabbitmq-server start &

  4. 安装管控台插件 rabbitmq-plugins enable rabbitmq_management

控制台

  1. 登录

    浏览器输入http://ip:15672/ 用户名guest 密码guest

命令行

  1. 关闭应用 rabbitmqctl stop_app
  2. 启动应用 rabbitmqctl start_app
  3. 节点状态 rabbitmqctl status
  4. 添加用户 rabbitmqctl add username password
  5. 列出所有用户 rabbitmqctl list_users
  6. 删除用户 rabbitmqctl delete_user username
  7. 修改密码 rabbitmqctl change_password username newpassword
  8. 列出用户权限rabbitmqctl list_user_permissions username
  9. 清除用户权限 rabbitmqctl clear_permissions -p vhostpath username
  10. 设置用户权限 rabbitmqctl set_permissions -p vhostpath username ".*"".*"".*"
  11. 创建虚拟主机rabbitmqctl add_vhost vhostpath
  12. 列出所有虚拟主机 rabbitmqctl list_vhost
  13. 列出虚拟主机上所有权限 rabbitmqctl list_permissions -p vhostpath
  14. 删除虚拟主机 rabbitmqctl delete_vhost vhostpath
  15. 查看所有队列信息 rabbitmqctl list_queues
  16. 清除队列中的消息 rabbitmqctl -p vhostpath purge_queue blue
  17. 移除所有数据(要在rabbitmqctl stop_app之后使用) rabbitmqctl reset
  18. 组成集群命令rabbitmqctl join_cluster <clusternode> [--ram]
  19. 查看集群状态rabbitmqctl cluster_status
  20. 修改集群节点的存储形式rabbitmqctl change_cluster_node_type disc|ram
  21. 摘除节点rabbitmqctl forget_cluster_node [offline]
  22. 修改节点名称rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2]

java客户端

生产者

package com.example.rabbitmq.quickstart;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Author: haoming
 * @Date: 2020/2/23 4:03 下午
 * @Version 1.0
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        //1.创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("152.136.233.203");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2.通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();
        //3.通过connection创建一个channel
        Channel channel = connection.createChannel();
        //4.通过channel发送数据
        for (int i = 0; i < 5; i++) {
            String msg = "hello RabbitMQ!";
            channel.basicPublish("","test001",null,msg.getBytes());
          	//如果未指定exchange,默认使用(AMQP default) exchange,这个exchange会根据routerKey找到同名队列
        }
        //5.关闭相关的连接
        channel.close();
        connection.close();

    }
}

消费者

package com.example.rabbitmq.quickstart;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author: haoming
 * @Date: 2020/2/23 4:04 下午
 * @Version 1.0
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        //1.创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("152.136.233.203");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2.通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();
        //3.通过connection创建一个channel
        Channel channel = connection.createChannel();
        //4.声明(创建)一个队列
        String queueName = "test001";
        channel.queueDeclare(queueName,true,false,false,null);
        //5.创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //6.设置channel
        channel.basicConsume(queueName,true,queueingConsumer);
        //7.获取消息
        while (true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("msg="+msg);
            //Envelope envelope = delivery.getEnvelope();
        }
    }
}

交换机

作用

接收消息,并根据路由键转发消息到绑定的队列。

属性

  • Name 交换机名称
  • Type 交换机类型 direct topic fanout headers(以下图片参考https://www.cnblogs.com/stefan-liu/p/5315809.html)
    • direct(使用比较多)
      • 所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。
      • Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouterKey必须完全匹配才能完全接收,否则该消息会被抛弃。
    • Topic
      • 所有发送到Topic Exchange的消息被转发到所有关心RouterKey中指定Topic的Queue上。
      • Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic。
    • fanout
      • 不处理路由键,只需要简单的将队列绑定到交换机上。
      • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
      • 转发消息是最快的。
  • Durability 是否需要持久化,true未持久化。
  • Auto Delete 当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange。
  • Internal 当前Exchange是否用于RabbitMQ内部使用,默认为false。
  • Arguments 扩展参数,用于扩展AMQP自制定化使用。

模糊匹配

  • 符号 “#” 匹配一个或多个词

    “log.#” 能够匹配到“log.info.oa”

  • 符号 “*” 匹配不多不少一个词

    “log.*” 只会匹配到 “log.erro”

binding

作用

Exchange 和 Exchang、Queue之间的连接关系。

Binding中可以包含RoutingKey或者参数。

消息队列

作用

存储消息数据

属性

  • Durability:是否持久化,Durable:是;Transient:否。
  • Auto delete:yes:表示当最后一个监听移除之后,该Queue会自动被删除。

Message

作用

服务器和应用程序之间传送的数据。

本质上就是一段数据,由Properties和Payload(Body)组成。

属性

  • Delivery mode 送达模式 :持久化非持久化

  • Headers 自定义属性

  • 其他属性:

    • content_type
    • content_encoding
    • priority 优先级
    • correlation_id 消息唯一id(消息幂等)
    • reply_to
    • expiration 消息到期时间
    • message_id 消息的id
    • timestamp
    • type
    • user_id
    • app_id
    • cluster_id
    Map<String, Object> headers = new HashMap<>();
    headers.put("my1", "111");
    headers.put("my2", "222");
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
      .deliveryMode(2)
      .contentEncoding("UTF-8")
      .expiration("10000")
      .headers(headers)
      .build();

Virtual host 虚拟主机

作用

虚拟地址,用于进行逻辑隔离,最上层的消息路由。

一个vhost里面可以有若干个exchange和queue。

同一个vhost里面不能有相同名称的exchange和queue。

RabbitMQ架构图

消息丢失

生产端的可靠性投递

  • 保障消息的成功发出
  • 保障MQ节点成功接收
  • 发送端收到MQ节点确认应答
  • 完善的消息补偿机制

解决方案

  • 消息落库,对消息状态进行打标

    1. 业务数据入库,消息入库,消息状态为0
    2. 发送消息到MQ
    3. 消息确认回调
    4. 更新消息状态为1
    5. 定时任务拉取一段时间内状态一直为0的消息,进行重发
    6. 超过规定的重发次数仍然没有成功,更新消息状态为2,人工介入
  • 消息的延迟投递,做二次确认,回调检查(高并发)

    1. 业务数据入库
    2. 发送两条消息,其中一条正常发送(队列0),其中一条延迟发送(队列1)
    3. 消费者消费消息
    4. 消费者发送消费确认消息到MQ(队列2)
    5. callback服务消费确认消息(队列2),并将确认结果入库
    6. callback服务消费延迟发送的消息(队列1),确认此消息是否成功,如果未成功,通知生产者重发

重复消费

消费端-幂等性保障

消费端实现幂等性,就意味着我们的消息永远不会消费多次,即使我们收到了多条一样的消息

幂等性解决方案

  1. 唯一ID+指纹码机制 ,利用数据库主键去重
    • select count(1) from TABLE where id = 唯一id+指纹码
    • 好处:实现简单;坏处:高并发下有数据库写入的性能瓶颈
    • 解决方案:根据ID进行分库分表
  2. 利用Redis的原子性去实现
    • 需要考虑的问题
      • 是否要进行数据的落库,如果要落库的话,要解决的问题是数据库和缓存如何做到原子性
      • 如果不进行落库,都存储到缓存中,如何设置定时同步的策略

确认机制

Confirm确认消息

生产端代码
//指定我们的消息投递模式: 消息的确认模式 
channel.confirmSelect();
//添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
  @Override
  public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    System.err.println("-------no ack!-----------");
  }

  @Override
  public void handleAck(long deliveryTag, boolean multiple) throws IOException {
    System.err.println("-------ack!-----------");
  }
});

Return消息

在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key找不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener

配置项

Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息

生产端代码
channel.addReturnListener(new ReturnListener() {
		@Override
		public void handleReturn(int replyCode, String replyText, String exchange,
				String routingKey, BasicProperties properties, byte[] body) throws IOException {
			
			System.err.println("---------handle  return----------");
			System.err.println("replyCode: " + replyCode);
			System.err.println("replyText: " + replyText);
			System.err.println("exchange: " + exchange);
			System.err.println("routingKey: " + routingKey);
			System.err.println("properties: " + properties);
			System.err.println("body: " + new String(body));
		}
	});
	//mandatory设置为true
	channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());

消费端自定义监听

public class Consumer {
	public static void main(String[] args) throws Exception {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("ip");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		String exchangeName = "test_consumer_exchange";
		String routingKey = "consumer.#";
		String queueName = "test_consumer_queue";
		
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
		channel.queueDeclare(queueName, true, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);
		channel.basicConsume(queueName, true, new MyConsumer(channel));
	}
}

public class MyConsumer extends DefaultConsumer {
	public MyConsumer(Channel channel) {
		super(channel);
	}
	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.err.println("-----------consume message----------");
		System.err.println("consumerTag: " + consumerTag);
		System.err.println("envelope: " + envelope);
		System.err.println("properties: " + properties);
		System.err.println("body: " + new String(body));
	}
}

消费端限流

RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置qos的值)未被确认前,不进行消费新的消息

autoACK 一定要设置成false.不自动签收。

//void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
//prefetchSize:0
//prefetchCount:告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该cousumer将block掉,直到有消息ack
//global:是否将上面设置应用于channel,就是上面的限制是channel级别还是consumer级别
//prefetchSize、global  RabbitMQ还没有实现
//1 限流方式  第一件事就是 autoAck设置为 false
//消费端
channel.basicQos(0, 1, false);
channel.basicConsume(queueName, false, new MyConsumer(channel));

public class MyConsumer extends DefaultConsumer {
	private Channel channel ;
	public MyConsumer(Channel channel) {
		super(channel);
		this.channel = channel;
	}
	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.err.println("-----------consume message----------");
		System.err.println("consumerTag: " + consumerTag);
		System.err.println("envelope: " + envelope);
		System.err.println("properties: " + properties);
		System.err.println("body: " + new String(body));
    //手动ack
		channel.basicAck(envelope.getDeliveryTag(), false);
	}
}

消费端ack与重回队列

重回队列

对没有处理成功的消息,把消息重新传递给Broker

一般在实际的应用中,都会关闭重回队列,设置为false

public class MyConsumer extends DefaultConsumer {
	private Channel channel ;
	public MyConsumer(Channel channel) {
		super(channel);
		this.channel = channel;
	}
	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.err.println("-----------consume message----------");
		System.err.println("body: " + new String(body));
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		if((Integer)properties.getHeaders().get("num") == 0) {
      //nack,处理失败,重回队列
			channel.basicNack(envelope.getDeliveryTag(), false, true);
		} else {
			channel.basicAck(envelope.getDeliveryTag(), false);
		}
	}
}

死信队列(DLX Dead-Letter-Exchange)

定义

  • 当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另外一个Exchange,这个Exchange就是DLX
  • DLX也是一个正常的Exchange

消息变成死信的几种情况

  • 消息被拒绝(basic.reject/basic.nack)并且requeue=false
  • 消息TTL过期
  • 队列达到最大长度
public class Consumer {
	public static void main(String[] args) throws Exception {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("ip");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		// 这就是一个普通的交换机 和 队列 以及路由
		String exchangeName = "test_dlx_exchange";
		String routingKey = "dlx.#";
		String queueName = "test_dlx_queue";
		
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
		
		Map<String, Object> agruments = new HashMap<String, Object>();
		agruments.put("x-dead-letter-exchange", "dlx.exchange");
		//这个agruments属性,要设置到声明队列上
		channel.queueDeclare(queueName, true, false, false, agruments);
		channel.queueBind(queueName, exchangeName, routingKey);
		
		//要进行死信队列的声明:
		channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
		channel.queueDeclare("dlx.queue", true, false, false, null);
		channel.queueBind("dlx.queue", "dlx.exchange", "#");
		channel.basicConsume(queueName, true, new MyConsumer(channel));
	}
}