Whoosy's Blog

藏巧于拙 用晦而明 寓清于浊 以屈为伸

0%

基于celery + rabbitmq实现订阅发布设计模式

rabbitmq四种交换机介绍

RabbitMQ作为一个消息队列提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全可靠。
消息(Message)由Client发送,RabbitMQ接收到消息之后通过交换机转发到对应的队列上面。Worker会从队列中获取未被读取的数据处理。
RabbitMQ消息模式的核心理念是:生产者没有直接发送任何消费到队列。实际上,生产者都不知道这个消费是发送给哪个队列的。
相反,生产者只能发送消息给交换机,交换机是非常简单的。一方面它接受生产者的消息,另一方面向队列推送消息。交换机必须清楚的知道如何处理接收到的消息。附加一个特定的队列吗?附加多个队列?或者是否丢弃?这些规则通过交换机的类型进行定义。

有四种不同的交换机类型:

  • 直连交换机:Direct exchange

  • 扇形交换机:Fanout exchange

  • 主题交换机:Topic exchange

  • 首部交换机: hearers exchange

    扇形交换机

    扇形交换机是最基本的交换机类型,它所能做的事情非常简单———广播消息。扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要“思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。
    node -v

    直连交换机

    直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个routing_key,当消息被发送的时候,需要指定一个binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个binding_key也是支持应用到多个队列中的。
    这样当一个交换机绑定多个队列,就会被送到对应的队列去处理。
    node -v

​ 适用场景:有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理 高优先级的队列。

主题交换机

​ 直连交换机的routing_key方案非常简单,如果我们希望一条消息发送给多个队列,那么这个交换机需要绑定上非常多的routing_key,假设每个交换机上都绑定一堆的routing_key连接到各个队列上。那么消息的管理就会异常地困难。

​ 所以RabbitMQ提供了一种主题交换机,发送到主题交换机上的消息需要携带指定规则的routing_key,主题交换机会根据这个规则将数据发送到对应的(多个)队列上。

​ 主题交换机的routing_key需要有一定的规则,交换机和队列的binding_key需要采用*.#.*.....的格式,每个部分用.分开,其中:

  • “*” 表示一个单词

    • “#” 表示任意数量(零个或多个)单词

假设有一条消息的routing_keyfast.rabbit.white,那么带有这样binding_key的几个队列都会接收这条消息:

1. fast..
2. ..white
3. fast.#
4. ......

下面这张图对主题交换机描述非常准确:

node -v

当一个队列的绑定键为#的时候,这个队列将会无视消息的路由键,接收所有的消息。

首部交换机

​ 首部交换机是忽略routing_key的一种路由方式。路由器和交换机路由的规则是通过Headers信息来交换的,这个有点像HTTPHeaders。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个Hash的数据结构,消息发送的时候,会携带一组hash数据结构的信息,当Hash的内容匹配上的时候,消息就会被写入队列。

绑定交换机队列的时候,Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all,这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)。

订阅发布模式Publish/Subscribe

我们之前用的队列都是一个消息只能被一个消费者消费,那么如果我想发一个消息能被多个消费者消费,这时候怎么办? 这时候我们就得用到了消息中的发布订阅模型。

类似微信订阅号发布文章消息就可以广播给所有的接受者。(订阅者)

发布者通过exchange(交换机)发送消息 ,这时候我们要获取消息 就需要队列绑定到交换机上,交换机根据规则把消息发送到相应的队列 , 消费者才能获取队列的消息。

node -v

解读:
1. 1个生产者,多个消费者
2. 每一个消费者都有自己的一个队列
3. 生产者没有将消息直接发送到队列,而是发送到了交换机(转发器)
4. 每个队列都要绑定到交换机
5. 生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的

基于celery+rabbitmq实现订阅发布模型

扇形交换机实现方式

​ 在celery中,扇形交换机被定义为广播路由的形式进行分发消息, 下面两张图诠释了rabbitmq本身的扇形交换机工作方式与celery广播路由的工作方式。

node -v

node -v

在RabbitMq中,你可以绑定多个队列到同一个扇形交换机来实现消息广播。

而celery使用rabbitmq做broker时,自己又实现的一套广播机制:通过一个创建一个广播Queue,来广播消息至订阅此队列的所有消费者。

实现具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from celery import Celery
from kombu.common import Broadcast

app = Celery(__name__, broker="amqp://admin:ODWB6IaRqI@192.168.199.142:5672/pubsub_stage")

app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {
'my_task': {
'queue': 'broadcast_tasks',
'exchange': 'broadcast_tasks'
}
}

app.send_task("upunit", queue='broadcast_tasks', kwargs={"unit_id": 1})

追踪celery内部实现发现:

​ celery通过广播queue,又对订阅此队列的不同worker创建不同的队列(以当前广播队列名称为前缀,拼接uuid为整个子队列名称, 如下图),这样就实现了每个worker对应不同队列,而这些队列又与广播队列有关联,所以当有消息时会首先传播至广播队列,然后分发给所有与其关联的队列进行消息传递。

node -v

主题交换机实现方式

​ 交换机中topic就是传说中的基于主题形式发布任务。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似binding key与routing key相匹配的Queue中。可以看出和上面那个区别的地方,这里面不是强匹配。它引入了两个通配符#*前者匹配多个单词(可以为0),后者匹配一个单词。

​ 体现在celery中就是如下配置:​

1
2
3
4
5
6
7
8
9
10
CELERY_QUEUES = (
Queue('for_adds',Exchange('for_adds',type='topic'), routing_key='*.task.*'),
Queue('for_send_emails', Exchange('for_adds',type='topic'), routing_key='*.*.email'),
Queue('add', Exchange('for_adds',type='topic'), routing_key='*.add'),
)
CELERY_ROUTES = {
'celery_test.tasks.add': {'exchange':'for_adds','routing_key':'q.task.email'},
'celery_test.tasks.send_mail': {'exchange':'for_adds','routing_key':'a.task.e'},
'celery_test.tasks.adds': {'exchange':'for_adds','routing_key':'b.add'},
}

当消息名称的routing_key为q.task.email, 会被主题交换机接受,并且匹配到for_adds队列,这样只有此队列接受到相应的任务,而其他对应不接收此任务。