编码不易,转载请注意出处!
在阅读本文章之前,您需要对python logging
模块了解,并清楚什么是root logger
Python logging模块可以自定义日志消息。比如,您可能希望将日志消息写入屏幕、文件和日志管理服务(如Papertrail)。
在这种情况下,你将要添加三个日志处理程序到你的应用程序的根日志中去:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import logging
logger = logging.getLogger() formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s [%(lineno)d]')
sh = logging.StreamHandler() sh.setFormatter(formatter) logger.addHandler(sh)
fh = logging.FileHandler('logs.log') fh.setFormatter(formatter) logger.addHandler(fh)
slh = logging.handlers.SysLogHandler(address=('logsN.papertrailapp.com', '...')) slh.setFormatter(formatter) logger.addHandler(slh)
|
在Celery中添加自定义日志记录处理程序是有难度的,你必须阅读源码。 因为Celery文档对其并没有描述,而且StackOverflow的回答大部分都是错误的,各种各样的博客文章都互抄一篇。
在这篇文章中,我将向您展示三种替代策略来配置Celery日志记录器,并说明每种策略的工作方式。
为什么celery日志配置没有一个很规范的文档说明,原因并不是Celery,因为Python日志记录系统不支持Celery支持的所有并发设置:eventlet,greenlets,prefork(子处理),线程等。因此,我们要基于Celery特定去设置。
除了配置之外,Celery在celery.utils.log
中提供了一个特殊的get_task_logger
函数。这将返回一个从名为celery.task
的特殊日志记录器,该日志记录器会自动获取任务名称和唯一ID作为日志的一部分,该日志记录器是把任务作为记录单元。
Celery文档建议使用get_task_logger
,但是我推荐使用python标准的logging.getLogger(__name__)
方式来获取日志记录器,因为我们在使用Celery时或在web应用程序中调用celery task时,task的任务日志是作为celery日志中的一部分存在的,所以我们只需要记录celery的日志即可,而没有必要为每一个任务模块都设置一个日志记录器。
1.增强Celery root logger
我们需要设置全局记录器(而非任务记录器)后接受日志并发送到指定位置,用于增强日志记录配置。Celery提供一个after_setup_logger
信号,该信号在Celery中设置记录器后触发。
信号中定义的一些参数会传递到logger对象,我们可以自定义日志handler添加到该记录器。
请注意,此信号after_setup_logger
会设置我们定义的logger以及celery root logger,自定义logger会收集相关日志,并发送到指定位置,最后还会传送至root logger中。
如果我们想只配置自定义的logger,而不想root logger发生改变,那么请使用第二种策略。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| import os import logging from celery import Celery from celery.signals import after_setup_logger
for f in ['./broker/out', './broker/processed']: if not os.path.exists(f): os.makedirs(f)
logger = logging.getLogger(__name__)
app = Celery('app') app.conf.update({ 'broker_url': 'filesystem://', 'broker_transport_options': { 'data_folder_in': './broker/out', 'data_folder_out': './broker/out', 'data_folder_processed': './broker/processed' }})
@after_setup_logger.connect def setup_loggers(logger, *args, **kwargs): formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh = logging.FileHandler('logs.log') fh.setFormatter(formatter) logger.addHandler(fh)
slh = logging.handlers.SysLogHandler(address=('logsN.papertrailapp.com', '...')) slh.setFormatter(formatter) logger.addHandler(slh)
@app.task() def add(x, y): result = x + y logger.info(f'Add: {x} + {y} = {result}') return result
if __name__ == '__main__': task = add.s(x=2, y=3).delay() print(f'Started task: {task}')
|
这样,我们自定义了日志记录器logger
, 使其有自己的handler和format,但是日志最后依然会被传送到celery root logger中。
2.只配置自定义的root logger
您可以通过使用setup_logging信号来只配置自己定义的logger,而不对root logger进行任何设置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| import os import logging from celery import Celery from celery.signals import setup_logging
app = Celery('app') app.conf.update({ 'broker_url': 'filesystem://', 'broker_transport_options': { 'data_folder_in': './broker/out', 'data_folder_out': './broker/out', 'data_folder_processed': './broker/processed' }})
for f in ['./broker/out', './broker/processed']: if not os.path.exists(f): os.makedirs(f)
logger = logging.getLogger(__name__)
@setup_logging.connect def setup_loggers(*args, **kwargs): logger = logging.getLogger() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
sh = logging.StreamHandler() sh.setFormatter(formatter) logger.addHandler(sh)
fh = logging.FileHandler('logs.log') fh.setFormatter(formatter) logger.addHandler(fh)
slh = logging.handlers.SysLogHandler(address=('logsN.papertrailapp.com', '...')) slh.setFormatter(formatter) logger.addHandler(slh)
@app.task() def add(x, y): result = x + y logger.info(f'Add: {x} + {y} = {result}') return result
if __name__ == '__main__': task = add.s(x=2, y=3).delay() print(f'Started task: {task}')
|
3.覆盖celery root logger
最后一种解决方案是让Celery获取root logger并进行配置,之后覆盖它。
默认情况下,Celery会在root logger中删除所有先前配置的处理程序。如果要自定义日志记录处理并不想使celery存在root logger,则可以通过设置worker_hijack_root_logger = False来禁用此行为。这使您可以收回对root logger的控制权。
请谨慎使用此方法,因为您将完全负责Python日志记录与Celery设置兼容(eventlets, greenlets, prefork (subprocessing), threads etc)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| import os import logging from celery import Celery from celery.signals import setup_logging
app = Celery('app') app.conf.update({ 'broker_url': 'filesystem://', 'broker_transport_options': { 'data_folder_in': './broker/out', 'data_folder_out': './broker/out', 'data_folder_processed': './broker/processed' }, 'worker_hijack_root_logger': False } )
for f in ['./broker/out', './broker/processed']: if not os.path.exists(f): os.makedirs(f)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger()
sh = logging.StreamHandler() sh.setFormatter(formatter) logger.addHandler(sh)
fh = logging.FileHandler('logs.log') fh.setFormatter(formatter) logger.addHandler(fh)
slh = logging.handlers.SysLogHandler(address=('logsN.papertrailapp.com', '...')) slh.setFormatter(formatter) logger.addHandler(slh)
logger = logging.getLogger(__name__)
@app.task() def add(x, y): result = x + y logger.info(f'Add: {x} + {y} = {result}') return result
if __name__ == '__main__': task = add.s(x=2, y=3).delay() print(f'Started task: {task}')
|
总结
在此博客文章中,我向您介绍了三种自定义Celery记录器的策略。从对Celery logger微调到不使用Celery root logger最后完全覆盖root logger。
我的建议是使用第二种处理方式,自定义我们的logger,不改变root logger。这样,相关日志我们可以自定义输出位置,并不影响celery本身的日志记录。