我们知道对Openstack的各个组件(nova,cinder,neutron等)来说,跨组件交互时使用的是RestAPI相互调用公共接口,组件内部各个进程间通信时使用RPC消息通信,从而实现各组件、各进程之间的解耦。Openstack RPC(Remote Producer Call)机制基于AMQP(Advanced Message Queuing Protocol)协议,搭配各种消息服务器(RabbitMQ,Qpid等)实现各个组件内部进程间的消息传递。
AMQP
AMQP是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品、不同开发语言等条件的限制,实现异步通信。
- RPC.call:发送请求到消息队列,等待返回最终结果。
- RPC.cast:发送请求到消息队列,不需要等待最终返回的结果。

图2.AMQP模型

- Publisher:消息发送者,将消息发送到 Exchange。
- Message:由Header和Body组成,Header是由Publisher添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受(Routing Key)、优先级是多少等。而Body是真正需要传输的数据,它是对Server不可见的二进制数据流,在传输过程中不受影响……
- Exchange:接收发布应用程序发送的消息,并根据Routing Key和Binding信息、Exchange Type等参数将这些消息路由到消息队列。
- Binding:关联Exchange和Message Queue,提供路由规则。
- Message Queue:存储消息,直到这些消息被消费者安全处理完为止。
- Consumer:消息接受者,从 Message Queue 获取消息。
使用这个模型我们可以很容易的模拟出存储转发队列和主题订阅这些典型的消息中间件概念。
AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。一个AMQP服务器类似于邮件服务器,Exchange类似于消息传输代理(email里的概念),Message Queue类似于邮箱。Binding定义了每一个传输代理中的消息路由表,发布者将消息发给特定的传输代理,然后传输代理将这些消息路由到邮箱中,消费者从这些邮箱中取出消息。
AMQP模型中不同类型的Exchange对应不同的routing算法:
- Direct Exchange:Point-to-Point 消息模式,Direct Exchange 根据 Routing Key 进行精确匹配,只有对应的 Message Queue 会接收到消息。
- Topic Exchange:Publish-Subscribe(Pub-sub)消息模式,Topic Exchange 根据 Routing Key 进行模式匹配,只要符合模式匹配的 Message Queue 都会收到消息。
- Fanout Exchange:广播消息模式,Fanout Exchange 将消息转发到所有绑定的 Message Queue。
Openstack RPC
Openstack RPC实现了AMQP协议中的请求应答和数据处理等中间流程,并提供了几种发送AMQP消息的方法:
- RPC.call:发送请求到消息队列,等待返回最终结果。
- RPC.cast:发送请求到消息队列,不需要等待最终返回的结果。
图3.Openstack中的RPC流程

Openstack每个组件都会连接消息服务器,一个组件可能是一个消息发送者Invoker(如API、Scheduler),也可能是一个消息接收者Worker(如compute、volume、network)。Invoker发送消息有两种方式:同步调用rpc.call和异步调用rpc.cast,Worker接受并根据rpc.call的信息返回消息。
- Topic Publisher:该对象在进行rpc.call或rpc.cast时创建,每个对象都会连接同一个topic类型的交换器,消息发送完毕后对象被回收。
- Direct Publisher:该对象在进行rpc.call调用时创建,用于向消息发送者返回响应。该对象会根据接收到的消息属性连接一个direct类型的交换器。
- Direct Consumer:该对象在进行rpc.call调用时创建,用于接收响应消息。每一个对象都会通过一个队列连接一个direct类型的交换器(队列和交换器以UUID命名)。
- Topic Consumer:该对象在内部服务初始化时创建,在服务过程中一直存在。用于从队列中接收消息,调用消息属性中指定的函数。该对象通过一个共享队列或一个私有队列连接一个topic类型的交换器。每一个内部服务都有两个topic consumer,一个用于rpc.cast调用(此时连接的是binding-key为“topic”的共享队列);另一个用于rpc.call调用(此时连接的是binding-key为“topic.host”的私有队列)。
- Topic Exchange:topic类型交换器,每一个消息代理节点只有一个topic类型的交换器。
- Direct Exchange:direct类型的交换器,存在于rpc.call调用过程中,对于每一个rpc.call的调用,都会产生该对象的一个实例。
- Queue Element:消息队列。可以共享也可以私有。routing-key为“topic”的队列会在相同类型的服务中共享(如多个compute节点共享一个routing-key为“topic”的队列)。
name:control_exchange的名称由各个服务conf文件里的“control_exchange”项指定,默认名称为openstack。
[DEFAULT] |
|
control_exchange =openstack |
(String) The default exchange under which topics are scoped. May be overridden by an exchange name specified in the transport_url option. |
RPC Calls
图4.rpc.call消息流程

- Topic Publisher向消息队列服务发送RPC请求,同时初始化一个Direct Consumer等待回复消息。
- Topic Exchange “control_exchange”根据routing key将消息分发到相应队列后,对应的Consumer接收消息并触发Worker任务。
- Worker任务完成后,初始化一个Direct Publisher向消息服务器发送一条回复消息。
- Direct Exchange将消息分派到对应的queue给等待的Direct Consumer接收处理。
RPC Casts
rpc.cast和call相比只是少了返回消息的部分,其余类似。
图4.rpc.cast消息流程
图4.rpc.cast消息流程

Notifications
Openstack中除了rpc.call和rpc.cast这两种用于组件内部各进程间消息处理的功能外,还可以发送相应资源的操作通知到notifications队列供计费和监控等组件使用。
Openstack中除了rpc.call和rpc.cast这两种用于组件内部各进程间消息处理的功能外,还可以发送相应资源的操作通知到notifications队列供计费和监控等组件使用。
要开启组件的notify功能,需要设置服务conf文件中的notify driver配置项,支持多种通知发送方式,messaging和messagingv2发送rpc消息通知到指定的“topics”,“driver”默认空不开启任何通知,“topics”不设定时仍使用control_exchange指定的topic。
[oslo_messaging_notifications] |
|
driver = [] |
(Multi-valued) The Drivers(s) to handle sending notifications. Possible values are messaging, messagingv2, routing, log, test, noop |
topics = notifications |
(List) AMQP topic used for OpenStack notifications. |
RPC.Notifier提供的通知消息有以下几种:
- audit:审计类消息
- info:正常操作消息
- warn:告警类操作消息
- error:错误类操作消息
- critical:严重错误
- sample:sample消息
以create_volume为例,配置rabbitmq及messagingv2开启通知时的流程如下:
图5.notify流程


- 服务初始化时启动的Consumer监听到队列消息,分析并触发Manager类的对应方法。
- Manager初始化并启动任务流flow。
- flow中的NotifyVolumeActionTask调用utils模块中的notify_about_xxx方法,该方法初始化oslo.messaging库中的Notifier类,并调用info方法。
- Notifier.info方法根据conf文件里的notify driver名(driver = messagingv2)和messaging driver配置(rpc_backend = rabbit)动态加载MessagingV2Driver类和RabbitDriver驱动(命名空间对应关系在项目代码文件setup.cfg中的entry points段指定)。
- RabbitDriver获取服务器连接并调用kombu库完成消息发送。
结束语
消息通讯机制是Openstack整个工作流程中非常重要的一部分,充分理解它的工作原理对认识和理解Openstack的设计理念,各组件间关系及组件内部处理过程等都有非常重要的意义。在实际使用中还可以结合具体环境搭配的不同消息服务器、命令行及可视化工具等进行概念对照以加深理解。
作者简介
窦锐元,烽火云计算高级软件工程师,五年Openstack相关开发设计工作经验,对Openstack整个生命周期的多种组织架构和部署方式有深刻理解,目前专注于存储特性设计开发和社区相关工作。