RabbitMQ基础用例汇总

组件

一、 Connection (连接)

RabbitMQ 的 socket 的长链接,它封装了 socket 协议相关部分逻辑

二、 Channel (信道)

建立在 Connection 连接之上的一种轻量级的连接,我们大部分的业务操作是在 Channel 这个接口中完成的,包括

  1. 交换机的声明 exchangeDeclare、
  2. 定义队列的声明 queueDeclare、
  3. 队列的绑定 queueBind、
  4. 发布消息 basicPublish、
  5. 消费消息 basicConsume 等。

如果把 Connection 比作一条光纤电缆的话,那么 Channel 信道就比作成光纤电缆中的其中一束光纤。一个 Connection 上可以创建任意数量的 Channel


Channel 中的方法

  • 声明交换器
channel.exchangeDeclare(
    exchange : "ExchangeName",//交换器的名字
    type: "direct",// 模式
    durable: false, // 持久化
    autoDelete: false, //自动删除
    arguments:null 参数
);
  • 声明队列
channel.queueDeclare(
    queue: 队列的名称,
    durable: flase //是否持久化,false: 队列在内存中,服务器挂掉后,队列就没了;true: 服务器重启后,队列将会重新生成。注意:只是队列持久化,不代表队列中的消息持久化!!!!
    exclusive: 是否专属,专属的范围针对的是连接,一个连接下面的多个信道是可见的。对于其他连接是不可见的。连接断开后,该队列会被删除。
    autoDelete: 是否自动删除,    
    arguments: 队列参数
);
  • 发送消息
channel.basicPublish(
    exchange: "name", //交换机名称
    routingKey: "key", //路由键
    basicProperties: null, //该条消息的配置
    body: Encoding.Default.GetBytes(msg) //消息字节数组
);
  • 接收消息
channel.BasicGet(
    queue: QueueName, //队列名称
    autoAck: true //是否自动确认
);
  • 消费者设置
channel.basicConsume(
    queue:QUEUE_NAME, 
    autoAck:true, //是否自动ack,如果不自动ack,需要使用   channel.ack、channel.nack、channel.basicReject
    consumer // 消费者
);
  • 设置只处理一条消息
channel.basicQos(
    0, // prefetchSize:0 
    1, prefetchCount:1 , 告诉 RabbitMQ, 不要同时给一个消费者推送多于 1 条消息,即一旦有 1 个消息还没有 ack (确认),则该消费者将 block 掉,直到有消息确认
    false //global:true\false 是否将上面设置应用于 channel,简单点说,就是上面限制是 channel 级别的还是 consumer 级别
);
  • 自动应答
channel.basicAck(
    e.DeliveryTag, // deliveryTag : e.DeliveryTag, 该消息的标记,ulong 类型.
    false //multiple:是否批量.true: 将一次性确认所有小于 deliveryTag 的消息. 
);

三、 Exchange (交换器)

1. RoutingKey

用自己的话说,这个 RoutingKey 就是表示生产者在发送消息时,交换器需要根据传入的 key 去根据队列和交换器绑定时用的 bindingKey 去匹配,这个 RoutingKey 生产者告诉交换器根据什么去分发;

  • fanout 的 routingKey 失去作用,交换器会把所有的消息发送到绑定的队列中;

2. Exchange 的创建

  • direct direct 可以使用默认的 Exchange,不需要声明,但需要指定消息发送到那个队列,路由 key 就是队列的名称
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  • fanout

fanout 的 routing key 是无效的,会把消息发送到所有的绑定队列

channel.exchangeDeclare(EXCHANGE_NAME, "fanout")
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
  • topic
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

注意:使用 fanout 和 topic 需要先声明 交换器类型;需要将 exchange 个 queue 绑定;

四、 Queue (队列)

1. 队列的声明

channel.queueDeclare(
queue: QueueName, //队列名称
durable: false, //队列是否持久化.false:队列在内存中,服务器挂掉后,队列就没了;true:服务器重启后,队列将会重新生成.注意:只是队列持久化,不代表队列中的消息持久化!!!!
exclusive: false, //队列是否专属,专属的范围针对的是连接,也就是说,一个连接下面的多个信道是可见的.对于其他连接是不可见的.连接断开后,该队列会被删除.注意,不是信道断开,是连接断开.并且,就算设置成了持久化,也会删除.
autoDelete: true, //如果所有消费者都断开连接了,是否自动删除.如果还没有消费者从该队列获取过消息或者监听该队列,那么该队列不会删除.只有在有消费者从该队列获取过消息后,该队列才有可能自动删除(当所有消费者都断开连接,不管消息是否获取完)
arguments: null //队列的配置
);

2. 队列详细参数(arguments)

    1. 消息生存时间 TTL 消息的存活时间,过了这个时间就会根据配置,到死信队列中
    2. 队列生存时间 Exp
    3. 队列在指定的时间内没有被使用 (访问) 就会被删除
    4. 如果有消费者订阅,则不会被删除
    5. 队列最大消息数量 lim 队列可以容纳的消息的最大条数。超出后根据溢出策略进行处理
    6. 队列消息最大容量 limB 队列可以容纳的消息的最大字节数,超出后同消息最大数
    7. 溢出策略 Ovfl 队列中的消息溢出时,如何处理这些消息。要么丢弃队列头部的消息,要么拒绝接收后面生产者发送过来的所有消息,默认是丢弃之前的消息
    8. 死信队列交换器 DXL (dead-letter-exchange) 死信交换机的名称,当队列中的消息的生存期到了,或者因长度限制被丢弃时,消息会被推送到 (绑定到) 这台交换机 (的队列中), 而不是直接丢掉.
    9. 私信队列 routingKey DLK (dead-letter-routing-key) 死信队列的 routingKey
    10. 优先级 Pri 设置该队列中的消息的优先级最大值。发布消息的时候,可以指定消息的优先级,优先级高的先被消费
  • 注意 一旦声明队列参数创建,不可以修改;只能删除重新创建了

3. binding key

简单的理解为,交换器需要根据 key 值来给我发送消息,根据 bindingKey 的规则去匹配 RoutingKey,匹配上了就发送到队列,仅对 direct 和 topic 模式有效;

五、消息 (measage)

channel.basicPublish(
       exchange: "test_exchange",
       routingKey: "",
       mandatory: false,
       basicProperties: null,
       body: Encoding.Default.GetBytes(msg)
);
  • exchange: 交换机名称
  • routingKey: 路由 key
  1. 若模式为 fanout 模式,就会忽略 key,所有绑定的队列都会收到消息
  2. 该值仅对 direct 和 topic 模式有效
  • mandatory: 消息处理
  1. 当为 true 时,exchange 根据模式和 routKey 无法找到 queue 时,将消息返回给生产者;
  2. 当为 false 时, broker 直接丢弃消息
  • basicProperties: 消息的基本属性
展开阅读全文

本文系作者在时代Java发表,未经许可,不得转载。

如有侵权,请联系nowjava@qq.com删除。

编辑于

关注时代Java

关注时代Java