Fork me on GitHub

RabbitMQ系列三发布订阅模式

发布订阅模式:就是将消息发送给不同类型的消费者。做到发布一次,消费多个。比如用户注册手机和邮箱,注册完会向邮箱和手机发送注册结果,利用MQ实现业务异步处理,如果是用工作队列的话,就会声明一个注册信息队列。注册完成之后生产者会向队列提交一条注册数据,消费者取出数据同时向邮箱以及手机号发送两条消息。但是实际上邮箱和手机号信息发送实际上是不同的业务逻辑,不应该放在一块处理。这个时候就可以利用发布/订阅模式将消息发送到转换机(EXCHANGE),声明两个不同的队列(邮箱、手机),并绑定到交换机。这样生产者只需要发布一次消息,两个队列都会接收到消息发给对应的消费者。

发布者(生产者)—>(交换机)—>两个队列—>两个消费者

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Sender
{
//定义交换机
private final static String EXCHANGE_NAME="testexchange";

public static void main(String[] args) throws Exception
{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机,类型是fanout(发布订阅模式)
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//发布订阅模式,因为消息是先发到交换机中,而交换机没有保存功能,如果没有消费者,消费会丢失
channel.basicPublish(EXCHANGE_NAME,"",null,"发布订阅模式的消息".getBytes());
channel.close();
connection.close();
}
}

消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Recver1
{
//定义交换机
private final static String EXCHANGE_NAME="testexchange";

public static void main(String[] args) throws Exception
{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("testpubqueue1",false,false,false,null);
//把队列绑定到交换机上面
channel.queueBind("testpubqueue1",EXCHANGE_NAME,"");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println("消费者1:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("testpubqueue1",false,consumer);
}
}

消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Recver2
{
//定义交换机
private final static String EXCHANGE_NAME="testexchange";

public static void main(String[] args) throws Exception
{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("testpubqueue2",false,false,false,null);
//把队列绑定到交换机上面
channel.queueBind("testpubqueue2",EXCHANGE_NAME,"");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
System.out.println("消费者2:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("testpubqueue2",false,consumer);
}
}

运行结果

1
2
1、消费者1:发布订阅模式的消息
2、消费者2:发布订阅模式的消息

总结:

  1. 该模式下生产者并不是直接操作队列,而是将数据发送给交换机,由交换机将数据发送给与之绑定的队列。从运行结果中可以看到,两中类型的消费者(Email,Phone)都收到相同数量的消息。
  2. 该模式必须声明交换机,并且设置模式:channel.exchangeDeclare(EXCHANGE_NAME, “fanout”)  fanout指分发模式(将每一条消息都发送到与交换机绑定的队列。
  3. 队列必须绑定交换机:channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “”);