10年开发大牛用3000字带你彻底搞懂RabbitMQ!真的牛!

B站影视 2024-12-16 21:33 2

摘要:RabbitMQ是一款流行的开源消息队列系统。相比后起之秀Kafka,RabbitMQ在设计和实现上更显传统和精致。RabbitMQ构建在高并发、高可靠语言平台Erlang上,具有出色的性能及卓越的可靠性,这两个主要原因让RabbitMQ获得了广泛的应用。Ra

RabbitMQ是一款流行的开源消息队列系统。相比后起之秀Kafka,RabbitMQ在设计和实现上更显传统和精致。RabbitMQ构建在高并发、高可靠语言平台Erlang上,具有出色的性能及卓越的可靠性,这两个主要原因让RabbitMQ获得了广泛的应用。Rabbit最初应用于金融系统这种对性能和可靠性都非常严苛的场景,从这一事实我们就能略微感受到RabbitMQ的性能和可靠性是多么强大。

RabbitMQ严格遵守了AMQP标准,其实现的架构也是非常清晰的。按照AMQP标准定义,RabbitMQ实现了一系列的组件,其中对理解RabbitMQ整体架构非常重要的组件包括以下几种。

·Broker:RabbitMQ的服务节点,多个Broker能够组建为集群。

·Vhost:虚拟主机,是对Broker的逻辑划分,可以实现诸如资源隔离和用户权限隔离的功能.

·Exchange:消息交换器,用于将消息按照设定的规则路由到一个或多个队列.

·Queue:消息队列,用于暂存由Exchange投递的消息.

·Binding:绑定,相当于Exchange的路由表,将Exchange和Queue按照设定的路由规则绑定起来.

·Routin.Key:路由主键,Exchange在执行路由规则时使用的主键,是一个消息头.

·BindingKey:指定哪些RoutingKey会被路由到相应Exchange绑定的Queue中。

·Producer:消息生产者,指发送消息到Broker的客户端程序.

·Consumer:消息消费者,指从Broker读取消息的客户端程序。

·Connection:与RabbitMQ服务器的连接。·Channel:消息通道,构建于Connection上的通道,是与Exchange或Queue的连接,一个Connection上可以构建多个Channel。

图8-7展示了RabbitMQ的工作原理。当消息生产者往Broker发送消息时,先与Exchange之间建立Channel。当消息经由Channel被发送到Exchange后,再由Exchange根据Binding的规则和消息头包含的Routin.Key,将消息转发到相应的Queue。当消费者读取消息时,先建立起与Queue之间的Channel,然后消费者就可以通过Channel从Queue中读取消息了。

public class RabbitMQProducerExample {

private static final Logger logger =

LoggerFactory.getLogger(RabbitMQProducerExample.class);

public static void main(String args) throws Exception {

ConnectionFactory factory = new ConnectionFactory;

factory.setHost("localhost");

factory.setPort(5672);

factory.setUsername("admin");

factory.setPassword("admin");

factory.setVirtualHost("/"); //从'http://127.0.0.1:15672/#/vhosts'查看vhost

名字

Connection connection = factory.newConnection;

Channel channel = connection.createChannel;

String exchangeName = "exchange001";

channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,

false, false, new HashMap);

String queueName = "queue001";

channel.queueDeclare(queueName, true, false, false, null);

String routingKey = "routingkey001";

channel.queueBind(queueName, exchangeName, routingKey);

int samples = 1000000;

int productNumber = 5;

for (int i = 0; i

String productId = String.format("product_%d", RandomUtils.nextInt

(0, productNumber));

String event = JSONObject.toJSONString(new Event(productId,

System.currentTimeMillis));

channel.basicPublish(exchangeName, routingKey, null, event.getbytes

(Charsets.UTF_8));

logger.info(String.format("send event[%s]", event));

Tools.sleep(1000);

}

}

}

public class RabbitMQConsumerExample {

private static final Logger logger =

LoggerFactory.getLogger(RabbitMQProducerExample.class); public static void main(String args) throws Exception {

ConnectionFactory factory = new ConnectionFactory;

factory.setHost("localhost");

factory.setPort(5672);

factory.setUsername("admin");

factory.setPassword("admin");

factory.setVirtualHost("/"); //从'http://127.0.0.1:15672/#/vhosts'查看vhost

名字

Connection connection = factory.newConnection;

Channel channel = connection.createChannel;

String exchangeName = "exchange001";

channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,

false, false, new HashMap);

String queueName = "queue001";

channel.queueDeclare(queueName, true, false, false, null);

String routingKey = "routingkey001";

channel.queueBind(queueName, exchangeName, routingKey);

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte body) throws IOException {

String event = new String(body, Charsets.UTF_8);

logger.info(String.format("receive exchange[%s], routingKey[%s],

event[%s]",

envelope.getExchange, envelope.getRoutingKey, event));

}

};

channel.basicConsume(queueName, true, consumer);

}

}

从消费者和生产者的代码可以看到,这两者都通过exchangeDeclare、queueDeclare和queueBind创建并绑定了交换器和队列。初看起来,这是一个很奇怪的做法,因为怎么同时在消费者和生产者两端创建同样的资源呢?其实这是有原因的,考虑到消费者和生产者是相互独立的程序,它们谁先上线连接到RabbitMQ是不可预先知道的。为了减少消费者和生产者之间的耦合,我们也不应该对谁先连接到RabbitMQ做出任何要求和假设,所以将创建资源的程序在消费者和生产者的代码中都做一次是有必要的,这种做法也是RabbitMQ官方推荐的做法。当然,在实际开发中具体怎么做还是看业务场景的,如果有专门的模块管理消息路由器和队列,那么在消费者和生产者两端都不需要创建这些资源了。

在8.2节中,我们将Kafka在实时流计算系统中的作用定位为数据总线。既然已经有Kafka这么出色的数据总线了,那么RabbitMQ在实时流计算系统中又承担什么角色呢?

首先不能否认的是,虽然和Kafka比较起来,RabbitMQ的性能少了一个数量级,但RabbitMQ本身也是一个性能不错的消息中间件,所以在一些性能要求相对不是非常高的场景下,使用RabbitMQ做数据总线也并无不妥,毕竟RabbitMQ最初的应用场景就是金融系统领域。

在大数据领域,已经有诸如Kafka这样的高性能数据总线后,用RabbitMQ充当数据总线会略显性能不足。但是RabbitMQ遵循定义良好的AMQP实现,具有高度的严谨性,数据丢失概率更低,路由灵活,支持事务,也有更好的实时性。这些特性让RabbitMQ非常适用于配置系统,充当配置系统中配置总线的角色。

下面我们就以RabbitMQ在Spring Cloud Config中的应用为例来看看它是如何充当Spring Cloud微服务系统的配置总线的。

在图8-8中,当向Config Server发送/bus/refresh请求时,ConfigServer就通过RabbitMQ总线,将刷新配置的命令发布到每一个微服务实例上去。当各个微服务实例收到这条消息后,就会从Config Server重新获取配置,并刷新本地配置。如此就完成了微服务系统动态更新配置的过程。

来源:大数据架构师

相关推荐