Whoosy's Blog

藏巧于拙 用晦而明 寓清于浊 以屈为伸

0%

celery(三):简单的任务路由配置

编码不易,转载请注意出处!

默认情况下,Celery将所有任务路由到单个队列(celery)中,并且所有worker都监听此默认队列。 合理使用Celery队列,您就可以自定义控制哪些Celery worker 处理哪些任务。 如果您有慢任务和快任务,并且希望慢任务不干扰快任务,这会很有用。 或者,如果您需要将任务从一个微服务发送到另一种微服务(订阅发布)。

配置一个task_routes

可以为每个任务配置队列,那么将会把此任务发送到相应的队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
app = Celery(__name__)
app.conf.update({
'broker_url': os.environ['CELERY_BROKER_URL'],
'imports': (
'tasks',
),
'task_routes': {
'fetch_bitcoin_price_index': {'queue': 'feeds'},
'calculate_moving_average': {'queue': 'filters'}
},
'task_serializer': 'json',
'result_serializer': 'json',
'accept_content': ['json']})

任务fetch_bitcoin_price_index就会被路由到feeds队列中,任务calculate_moving_average 将会发送到filters队列中。

celery worker订阅某些队列

启动一个celery worker,并且使其监听feeds,filters队列

1
~$: celery worker --app=worker.app --hostname=worker.feeds@%h --queues=feeds,filters

--queues参数指定监听队列,请注意队列之间要以”,”隔开。此worker启动后,就能处理feeds和filters队列里面的所有任务。

总结

在此篇文章中,我向您简单介绍了如何配置celery以将任务路由到专用队列中,以及如何使celery worker订阅某些队列。
为此,您需要将每个任务都订阅相应的task_routes,这种设置方式仅适用于简单程序的任务设置。然而,对于许多具有复杂的celery任务的应用程序或微服务/Docker环境(其中多个服务通过一个消息代理进行通讯),它的伸缩性不是很好。
在之后的文章中,我将以此为您介绍有关动态任务路由的所有信息,并为您介绍基于celery-rabbitmq实现多个服务之间的订阅发布机制,这是一种可扩展的解决方案。