Fork me on GitHub

RabbitMQ系列五topic模式

topic通配符模式:topic模式也称为主题模式,其实他相对于routing模式最大的好处就是他多了一种匹配模式的路由,怎么理解匹配呢,其实就相当于我们之前正则的.*这种,不过他的匹配机制可能不是这种(其实除了匹配规则外,他的作用就和routing模式一样 ),而他的工作流程图如下:

绑定键binding key也必须是这种形式。以特定路由键发送的消息将会发送到所有绑定键与之匹配的队列中。但绑定键有两种特殊的情况:

*(星号)仅代表一个单词

#(井号)代表任意个单词

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Sender
{
private static final String EXCHANGE_NAME="testtopic";

public static void main(String[] args) throws Exception
{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
channel.basicPublish(EXCHANGE_NAME,"key.1.2",null,"topic 模式消息".getBytes());
channel.basicPublish(EXCHANGE_NAME,"abc.1.2",null,"topic 模式消息".getBytes());
channel.close();
connection.close();
}
}

消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Recver1
{
//定义交换机
private static final String EXCHANGE_NAME="testtopic";

public static void main(String[] args) throws Exception
{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("testtopicqueue1",false,false,false,null);
//把队列绑定到交换机上面
//参数3 标记绑定到交换机的时候会指定一个标记,只有和它一样标记的消息才会被当前消费者收到
channel.queueBind("testtopicqueue1",EXCHANGE_NAME,"key.*");
//如果要消费多个消息,只需要再执行一次即可
channel.queueBind("testtopicqueue1",EXCHANGE_NAME,"abc.#");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println("消费者1:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("testtopicqueue1",false,consumer);
}
}

消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Recver2
{
//定义交换机
private static final String EXCHANGE_NAME="testtopic";

public static void main(String[] args) throws Exception
{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("testtopicqueue2",false,false,false,null);
//把队列绑定到交换机上面
//参数3 标记绑定到交换机的时候会指定一个标记,只有和它一样标记的消息才会被当前消费者收到
channel.queueBind("testtopicqueue2",EXCHANGE_NAME,"key.#");
//如果要消费多个消息,只需要再执行一次即可
channel.queueBind("testtopicqueue2",EXCHANGE_NAME,"abc.#");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println("消费者2:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("testtopicqueue2",false,consumer);
}
}

测试结果

1
2
3
4
5
1、
消费者1:topic 模式消息
2、
消费者2:topic 模式消息
消费者2:topic 模式消息