本例中,一个生产者、两个消费者;一个消息只能被一个消费者获取。
生产者:向队列中发送100条消息。
1 | public class Sender |
消费者1
1 | public class Recver1 |
消费者2
1 | public class Recver2 |
连接工具类
1 | public class ConnectionUtil |
测试结果:消费者1中设置线程睡眠时间10,消费者2中设置进程睡眠时间300,测试结果
1 | 1、消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。 |
但是,这样消费很不合理,因为消费者1线程睡眠时间比消费者2短,故应该比消费者2获取的消息多,RabbitMQ 默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即轮询(round-robin)分发消息。
如何按能力分配消息呢
联合使用 Qos 和 Acknowledge 就可以做到。basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1。消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列,从队列的视角去看,总是会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 参数就是用来限制这批未确认消息数量的。设为1时,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。
1 | channel.basicQos(1);//设置1 告诉服务器在我没有确认当前消息完成之前,不要给我发新的消息 |
测试结果
1 | 1、消费者1收到96条消息 |