Fork me on GitHub

RabbitMQ系列四route路由模式

路由模式:这种模式添加了一个路由键,生产者发布消息的时候添加路由键,消费者绑定队列到交换机时添加键值,这样就可以接收到需要接收的消息。这个模式和发布订阅模式差不多,只不过在发布的时候添加routing key,用户如果拥有这个key才能消费信息。

举个日志处理例子:系统需要针对日志做分析,首先所有的日志级别的日志都需要保存,其次error日志级别的日志需要单独做处理。这时就可以使用路由模式来处理了,声明交换机使用路由模式,每个日志级别的日志对应一个路由(error,info,warning)。声明一个保存日志队列用于接受所有日志,绑定交换机并绑定所有路由。声明第二个队列用于处理error级别日志,绑定交换机且只绑定error路由。以下是代码讲解。(先运行两个消费者,在运行生产者。如果没有提前将队列绑定到交换机,那么直接运行生产者的话,消息是不会发到任何队列里的

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Sender
{
private final static String EXCHANGE_NAME="testroute";

public static void main(String[] args) throws Exception
{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//定义路由格式的交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
channel.basicPublish(EXCHANGE_NAME,"key1",null,"路由消息key1".getBytes());
channel.basicPublish(EXCHANGE_NAME,"key2",null,"路由消息key2".getBytes());
channel.basicPublish(EXCHANGE_NAME,"key3",null,"路由消息key3".getBytes());
channel.close();
connection.close();
}
}

消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Recver1
{
//定义交换机
private final static String EXCHANGE_NAME="testroute";

public static void main(String[] args) throws Exception
{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("testroutequeue1",false,false,false,null);
//把队列绑定到交换机上面
//参数3 标记绑定到交换机的时候会指定一个标记,只有和它一样标记的消息才会被当前消费者收到
channel.queueBind("testroutequeue1",EXCHANGE_NAME,"key1");
//如果要消费多个消息,只需要再执行一次即可
channel.queueBind("testroutequeue1",EXCHANGE_NAME,"key2");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println("消费者1:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("testroutequeue1",false,consumer);
}
}

消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Recver2
{
//定义交换机
private final static String EXCHANGE_NAME="testroute";

public static void main(String[] args) throws Exception
{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("testroutequeue2",false,false,false,null);
//把队列绑定到交换机上面
//参数3 标记绑定到交换机的时候会指定一个标记,只有和它一样标记的消息才会被当前消费者收到
channel.queueBind("testroutequeue2",EXCHANGE_NAME,"key1");
//如果要消费多个消息,只需要再执行一次即可
channel.queueBind("testroutequeue2",EXCHANGE_NAME,"key3");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println("消费者2:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("testroutequeue2",false,consumer);
}
}

测试结果

1
2
3
4
5
6
1、
消费者1:路由消息key1
消费者1:路由消息key2
2、
消费者2:路由消息key1
消费者2:路由消息key3

总结:

  1. 两个队列消费者设置的路由不一样,接收到的消息就不一样。路由模式下,决定消息向队列推送的主要取决于路由,而不是交换机了。
  2. 该模式必须设置交换机,且声明路由模式:channel.exchangeDeclare(EXCHANGE_NAME, “direct”);