Celery的使用(2)---从配置开始

admin 2021年1月21日12:50:16评论252 views字数 4303阅读14分20秒阅读模式
摘要

上文中我们将配置直接设置在了tasks.py中:这样的做法适合于小项目,如果想规范化的话还是建议单独存放在一个配置文件中。


前言

上文中我们将配置直接设置在了tasks.py中:

tasks.py from celery import Celery app = Celery('tasks', broker='redis://:password@host:port/database_num',backend='redis://:password@host:port/database_num') #第一个次数‘tasks’是当前模块的名字,第二个参数是broker代理的url,这里选用了redis

这样的做法适合于小项目,如果想规范化的话还是建议单独存放在一个配置文件中。

开始配置

配置文件化的好处是容易管理,可以增加复用率,不然每个tasks.py文件中你都写一遍冗长的redis、MQ的url,一方面麻烦,另一方面容易泄露一些敏感字段。

配置文件化

这时我们可以在与tasks.py 同级的目录下新建一个celeryconfig.py的文件,文件名并不是指定的,可以任意取,但是最好能一眼就能看出它的功能。
打开celeryconfig.py,我们开始将tasks.py中的broker和backend信息搬到配置文件中,接下介绍一下用到的两个配置,有些配置celery4.x和老版本有一些区别,有区别的在后面已注释:

broker的url
CELERY_BROKER_URL='redis://:password@host:port/database_num' ##4.x之前用这个 BROKER_URL = 'redis://:password@host:port/database_num' ##4.x之后用这个
接受结果的url
CELERY_RESULT_BACKEND = 'redis://:password@host:port/database_num'

此时,celeryconfig.py文件内容如下:

CELERY_BROKER_URL='redis://:password@host:port/database_num' CELERY_RESULT_BACKEND = 'redis://:password@host:port/database_num'

相应的我们将tasks.py中的内容也做一些调整:

from celery import Celery app = Celery("tasks") app.config_from_object('celeryconfig', namespace='CELERY')

可以看出,配置信息已经没有在task.py中出现,内容已减少,看着没那么乱了,创建celery实例也只用传递一个参数,与此同时,我们引入了一个新的函数:

app.config_from_object(self, obj,silent=False, force=False, namespace=None) #obj:即为我们配置文件或者是一个带有配置的对象; #silent:为True时导入发生错误会被忽略; #force:为True时回强制立刻读取配置文件,默认只有需要的时候回被调用; #namespace:命名;

这个函数可以使我们很容易的就读取配置文件。配置文件所在目录不同导入方式也可以不同:

app.config_from_object('func_app.celeryconfig') #func_app为celeryconfig所在目录名称

这种情况适用于tasks.py与不在一个目录下,如果都在一个目录下则直接引入即可,不必加前面的func_app,还有一种方式:

##这里假设celeryconfig 在func_app 下且tasks.py不在func_app 下。 from func_app import celeryconfig  app.config_from_object(celeryconfig)

需要注意的是,调用config_from_object函数将重置前面的所有配置,你如果想添加额外的设置则需要在这个函数调用后添加。

前面说到config_from_object中的obj可以为配置对象,这里举一个小例,如果你确实需要这么做那么这个例子可以作为一个参考:

from celery import Celery  app = Celery("tasks")  class Config:     enable_utc = True#启动时区     timezone = 'Europe/London' #设置时区  app.config_from_object(Config)#Config为类,上面我们用的是配置文件,这里使用类

从环境变量中读取配置

还可以从环境变量中读取配置,这里使用到了下面这个函数:

 app.config_from_envvar()
import os from celery import Celery  #: Set default configuration module name os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')  app = Celery() app.config_from_envvar('CELERY_CONFIG_MODULE')

作为一个了解吧,很少会用到这种方式,大多数应该还是采用了文件或者类的方式。

检查配置

如果你想把配置打印出来作为调试信息,同时还希望能屏蔽一些敏感的信息,可以使用Celery提供的这几个API:
humanize()函数:

app.conf.humanize(with_defaults=False, censored=True)

这个方法默认情况下只会以字符串返回你配置的内容,但是你可以通过设置with_defaults来返回所有默认配置,以下是执行后返回的内容。

>>>app.conf.humanize(with_defaults=False, censored=True) CELERY_BROKER_URL: 'redis://:********@127.0.0.1:6379/1' CELERY_RESULT_BACKEND: 'redis://:********@127.0.0.1:6379/2'

可以看出它把用户名和密码字段加密了。

如果想以字典形式返回配置信息请使用:
table()函数

>>>app.conf.table(with_defaults=False, censored=True) {'CELERY_BROKER_URL': 'redis://:********@127.0.0.1:6379/1', 'CELERY_RESULT_BACKEND': 'redis://:********@127.0.0.1:6379/2'}

注意,celery并不能屏蔽所有敏感字段,它也只是依照规则使用正则表达式进行识别,如果你想为你的一些自定义特殊字段进行加密,应该使用celery规定的命名方式,如果你自定义配置里包含这些字符串则会被加密:

API, TOKEN, KEY, SECRET, PASS, SIGNATURE, DATABASE

其他常用配置

并发的worker数量,也是命令行-c指定的数目
事实上并不是worker数量越多越好,保证任务不堆积,加上一些新增任务的预留就可以了

CELERYD_CONCURRENCY = 20

celery worker每次去redis取任务的数量,默认值就是4

CELERYD_PREFETCH_MULTIPLIER = 4

每个worker执行了多少次任务后就会死掉,建议数量大一些

CELERYD_MAX_TASKS_PER_CHILD = 200

celery任务执行结果的超时时间

CELERY_TASK_RESULT_EXPIRES = 1200

单个任务的运行时间限制,否则会被杀死

CELERYD_TASK_TIME_LIMIT = 60

使用redis存储任务执行结果,默认不使用

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'

将任务结果使用’pickle’序列化成’json’格式
任务序列化方式

CELERY_TASK_SERIALIZER = 'pickle'

任务执行结果序列化方式

CELERY_RESULT_SERIALIZER = 'json'

也可以直接在Celery对象中设置序列化方式

app = Celery('tasks', broker='...', task_serializer='yaml')

指定任务序列化方式

CELERY_TASK_SERIALIZER = 'msgpack'

指定结果序列化方式

CELERY_RESULT_SERIALIZER = 'msgpack'

指定任务接受的序列化类型.

CELERY_ACCEPT_CONTENT = ['msgpack']

任务过期时间,celery任务执行结果的超时时间

CELERY_TASK_RESULT_EXPIRES = 24 * 60 * 60

任务发送完成是否需要确认,对性能会稍有影响

CELERY_ACKS_LATE = True

压缩方案选择,可以是zlib, bzip2,默认是发送没有压缩的数据

CELERY_MESSAGE_COMPRESSION = 'zlib'

规定完成任务的时间
在5s内完成任务,否则执行该任务的worker将被杀死,任务移交给父进程

CELERYD_TASK_TIME_LIMIT = 5

celery worker的并发数,默认是服务器的内核数目,也是命令行-c参数指定的数目

CELERYD_CONCURRENCY = 4

celery worker 每次去BROKER中预取任务的数量

CELERYD_PREFETCH_MULTIPLIER = 4

每个worker执行了多少任务就会死掉,默认是无限的

CELERYD_MAX_TASKS_PER_CHILD = 40

设置默认的队列名称,如果一个消息不符合其他的队列就会放在默认队列里面,如果什么都不设置的话,数据都会发送到默认的队列中

CELERY_DEFAULT_QUEUE = "default"

设置时区

CELERY_TIMEZONE = 'Asia/Shanghai'

启动时区设置

CELERY_ENABLE_UTC = True

限制任务的执行频率
下面这个就是限制tasks模块下的add函数,每秒钟只能执行10次

CELERY_ANNOTATIONS = {'tasks.add':{'rate_limit':'10/s'}}

或者限制所有的任务的刷新频率

CELERY_ANNOTATIONS = {'*':{'rate_limit':'10/s'}}

结束

以上就是配置相关的内容,更多资料请移步从今天开始种树

  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2021年1月21日12:50:16
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   Celery的使用(2)---从配置开始http://cn-sec.com/archives/248362.html

发表评论

匿名网友 填写信息