Whoosy's Blog

藏巧于拙 用晦而明 寓清于浊 以屈为伸

0%

celery(五):如何在应用程序中调用celery任务

编码不易,转载请注意出处!

task.s.delay()task.delay()

假定您的Celery任务和API源代码位于同一项目中。在这种情况下,您只需将任务导入API模块并异步调用即可。

1
2
3
4
5
6
7
8
9
from flask import Flask, jsonify
from tasks import fetch_data

app = Flask(__name__)

@app.route('/', methods=['POST'])
def index():
fetch_data.s(url=request.json['url']).delay()
return jsonify({'url': request.json['url']}), 201

只有在Celery中注册了fetch_data任务,此方法才起作用。当Celery和API源代码不在同一项目中时,或者当您有多个Celery微服务并且需要从一个Celery微服务调用另一个Celery微服务中的任务时,您需要使用另外方式去调用任务。

app.send_task()

Celery应用程序实例具有send_task方法,可用于通过其任务名称调用此任务。

1
2
3
4
5
6
7
8
9
from flask import Flask, jsonify
from worker import app as celery_app

app = Flask(__name__)

@app.route('/', methods=['POST'])
def index():
celery_app.send_task('fetch_data', kwargs={'url': request.json['url']}, queue='fetch')
return jsonify({'url': request.json['url']}), 201

我们只需要传入任务名称,和任务需要的参数即可成功发送此任务,并由相应的worker进行处理。此方式可适用于当Celery和API源代码不在同一项目中。

app.signature()

在我们的第一个示例中,Celery任务中的.s(…)方法创建了一个称为Celery Signature对象。 Celery签名本质上封装了单个Celery任务调用的参数,关键字参数和执行选项。

因此,另一种策略是通过Celery应用程序的签名方法显式创建Signature对象,并传递任务名称及其参数。 然后,可以通过delay()方法以与标准示例中相同的方式异步执行生成的Signature对象。

1
2
3
4
5
6
7
8
9
from flask import Flask, jsonify
from worker import app as celery_app

app = Flask(__name__)

@app.route('/', methods=['POST'])
def index():
celery_app.signature('fetch_data', kwargs={'url': request.json['url']).delay()
return jsonify({'url': request.json['url']}), 201

总结

Celery不需要清楚任务的执行代码即可调用它。直接通过celery_app.send_task(…)或通过创建一个签名对象celery_app.signature(…)来调用其任务名称,该任务等同于调用task.s(…)。

您最终选择哪种方法会成为一个重要问题,我们将在另一篇博客文章中介绍。