Whoosy's Blog

藏巧于拙 用晦而明 寓清于浊 以屈为伸

0%

celery(四):celery中的动态任务路由

编码不易,转载请注意出处!

在上一篇博客文章中,我们研究了自定义队列和任务路由。我们如果想要让某个任务路由到某个队列,配置任务路由是必要的,此方法对于简单的设置非常有效,但对于需要将许多Celery任务路由到许多不同队列的应用程序和微服务而言,明显配置要复杂的多。

实现动态路由

步骤1:Celery task_routes配置

celery配置中的task_routes参数支持传入一个自定义类,这个类里面要实现route_for_task函数,此类官方文档上叫做路由器,动态路由的核心就在于此。

celery 配置
1
2
3
4
5
6
7
8
9
10
app = Celery(__name__)
app.conf.update({
'broker_url': os.environ['CELERY_BROKER_URL'],
'imports': (
'tasks',
),
'task_routes': ('task_router.TaskRouter',), # 此自定义类的路径
'task_serializer': 'json',
'result_serializer': 'json',
'accept_content': ['json']})
自定义类(路由器)

根据上面的task_routes值,我们需要在模块task_router.py中定义自定义的TaskRouter类。方法route_for_task传递任务名作为它的第一个参数,并且返回值(dict)一定要与手动任务路由设置的dict完全相同

1
2
3
4
5
6
7
class TaskRouter:
def route_for_task(self, task, *args, **kwargs):
if ':' not in task:
return {'queue': 'default'}

namespace, _ = task.split(':')
return {'queue': namespace}

我的想法是基于任务名称去做动态路由,而且我们的任务名称要遵循queue:taskname形式。

定义celery任务
1
2
3
4
5
6
7
@app.task(bind=True, name='feeds:fetch_bitcoin_price_index')  # name参数定义了此任务的 任务名称:队列名称
def fetch_bitcoin_price_index(self, start_date, end_date):
...

@app.task(bind=True, name='filters:calculate_moving_average')
def calculate_moving_average(self, args, window):
...
启动命令
1
2
~$: celery worker --app=worker.app --hostname=worker.feeds@%h --queues=feeds
~$: celery worker --app=worker.app --hostname=worker.filters@%h --queues=filters

这样,当celery worker启动时,根据imports参数找到导入的相关任务。加载路由配置task_routes,根据任务名称feedsfilters返回队列名称fetch_bitcoin_price_indexcalculate_moving_average,实现任务动态路由到相关队列。

总结

在此博客文章中,我向您介绍了如何配置Celery以使用自定义任务路由器路由任务。当在多个队列和worker中使用许多任务时,此解决方案可以很好地进行扩展。