Fork me on GitHub

RabbitMQ系列二work模式

本例中,一个生产者、两个消费者;一个消息只能被一个消费者获取。

生产者:向队列中发送100条消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Sender
{
private final static String QUEUE = "testwork";//队列的名字

public static void main(String[] args) throws Exception
{
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明队列 如果队列存在则什么都不做,如果不存在才创建
//参数1 队列的名字
//参数2 是否持久化队列,队列默认是在内存中的,如果 rabbitmq 重启就会丢失,如果true,则会保存在erlang自带的数据库中,重启后会重新读取
//参数3 是否排外,有两个作用,一当我们连接关闭后是否自动删除队列,二 是否私有当天前队列,如果私有了,其他通道b不可以访问当前队列,如果为true,一般是一个是一个队列只适用一个消费者的时候
//参数4 是否自动删除
//参数5 我们的一些其他参数
channel.queueDeclare(QUEUE,false,false,false,null);
//发送内容
for (int i = 0; i < 100; i++) {
channel.basicPublish("",QUEUE,null,("发送的消息"+i).getBytes());
}
//关闭连接
channel.close();
connection.close();
}
}

消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Recver1
{
private final static String QUEUE = "testwork";//队列的名字

public static void main(String[] args) throws Exception
{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE,false,false,false,null);

channel.basicQos(1);//设置1 告诉服务器在我没有确认当前消息完成之前,不要给我发新的消息

DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
//当我们收到消息后调用
System.out.println("消费者1 收到的内容是:"+new String(body));
//确认
try {
Thread.sleep(10);//模拟耗时
}catch (Exception e)
{
e.printStackTrace();
}

channel.basicAck(envelope.getDeliveryTag(),false);//参数2 false为确认收到消息,true为拒接受消息
}
};
//注册消费者,参数2 手动确认,代表我们收到消息后需要手动告诉服务器,我收到消息了
channel.basicConsume(QUEUE,false,consumer);
}
}

消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Recver2
{
private final static String QUEUE = "testwork";//队列的名字

public static void main(String[] args) throws Exception
{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE,false,false,false,null);

channel.basicQos(1);//设置1 告诉服务器在我没有确认当前消息完成之前,不要给我发新的消息

DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
//当我们收到消息后调用
System.out.println("消费者2 收到的内容是:"+new String(body));
try {
Thread.sleep(300);//模拟耗时
}catch (Exception e)
{
e.printStackTrace();
}
//确认
channel.basicAck(envelope.getDeliveryTag(),false);//参数2 false为确认收到消息,true为拒接受消息
}
};
//注册消费者,参数2 手动确认,代表我们收到消息后需要手动告诉服务器,我收到消息了
channel.basicConsume(QUEUE,false,consumer);
}
}

连接工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ConnectionUtil
{
public static Connection getConnection() throws Exception
{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");//设置server的地址
connectionFactory.setPort(5672);
connectionFactory.setUsername("test");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/test");
return connectionFactory.newConnection();//创建一个新的连接

}
}

测试结果:消费者1中设置线程睡眠时间10,消费者2中设置进程睡眠时间300,测试结果

1
2
1、消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
2、消费者1和消费者2获取到的消息的数量是相同的,一个是消费奇数号消息,一个是偶数。

但是,这样消费很不合理,因为消费者1线程睡眠时间比消费者2短,故应该比消费者2获取的消息多,RabbitMQ 默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即轮询(round-robin)分发消息。

如何按能力分配消息呢

联合使用 Qos 和 Acknowledge 就可以做到。basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1。消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列,从队列的视角去看,总是会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 参数就是用来限制这批未确认消息数量的。设为1时,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。

1
channel.basicQos(1);//设置1 告诉服务器在我没有确认当前消息完成之前,不要给我发新的消息

测试结果

1
2
1、消费者1收到96条消息
2、消费者2收到4条消息