任务队列Celery

背景

天眼查爬虫,账号在爬取过程中偶尔会跳出验证码,由模型来做验证,但通过验证所占用的时间不算太短,所以考虑将该任务分发给其它进程来处理,任务队列刚好适用于该场景。

两种选择:

  • python-rq
  • celery

轻量的RQ会更适合这种相对简单轻松的任务,但由于当前开发环境在Windows上,而RQ的worker是fork模式,所以选择使用Celery,权当练习。

依赖

任务队列框架:Celery

消息队列(broker):RabbitMQ

结果存储(backend):Redis

安装环境

pip install celery==3.1.25

建议celery安装3.1.25,4.x版本在Windows系统下很不好用。我试了一下,最新的4.2.1版本的celery在使用rabbitmq作为broker时,启动报错提示无法立即完成一个非阻止性套接字操作,似乎没法解决。用redis作为broker可以正常启动,但接受任务时报错提示 not enough values to unpack (expected 3, got 0)。非要这么做的话把并发模式从默认的prefork改成eventlet协程。

pip install eventlet

并在celery启动命令后面加上 -P eventlet

pip install redis==2.10.6

如果安装3.1.25版本的celery,redis版本也必须降低。

然后下载RabbitMQ:https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.10/rabbitmq-server-3.7.10.exe

安装RabbitMQ前还需要下载安装Erlang:http://erlang.org/download/otp_win64_21.2.exe

安装成功后,可以启用RabbitMQ可视化的管理插件:

rabbitmq-plugins enable rabbitmq_management
net stop RabbitMQ && net start RabbitMQ

然后访问 http://localhost:15672/ 即可。

rabbitmq有一个默认用户guest,我们可以选择创建一个新用户和虚拟机:

rabbitmqctl add_user myuser mypassword
rabbitmqctl add_vhost myvhost
rabbitmqctl set_user_tags myuser mytag
rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

然后celery的broker便写成 amqp://myuser:mypassword@127.0.0.1:5672/myvhost

简单使用

# main.py
app = Celery(__name__, include=['tasks'], broker='amqp://myuser:mypassword@127.0.0.1:5672/myvhost', backend='redis://127.0.0.1:6379/1')
app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
)

或者引入配置文件celeryconfig.py

app = Celery(__name__, include=['tasks'])
app.config_from_object('celeryconfig')

celery的任务创建可以有多种写法,比如最常见的:

# tasks.py
@app.task
def doing(t):
    time.sleep(t)
    return '1'

或者定义一个基类:

class MyTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        pass


@app.task(base=MyTask, bind=True)
def doing(self, t):
    time.sleep(t)
    return self.request.id

或者继承自celery.Task,直接将任务写进类的方法里:

class MyTask(Task):
    def __init__(self):
        pass

    def on_success(self, retval, task_id, args, kwargs):
        pass

    def run(self, t):
        time.sleep(t)
        return self.request.id


doing = app.tasks[MyTask.name]

这里有一点值得一提,第三种写法中,类MyTask在定义的同时,解释器会自动创建一个实例,并调用 __init__() 做初始化,也就是说定义完这个类,在没有使用它创建实例的情况下直接运行,就会执行 __init__() 中的代码。查看源码发现,父类Task是由一个元类TaskType创建而来,所以在定义MyTask的时候,是通过TaskType.__new__()来创建的,它的创建行为受父类的元类所控制。

然后启动worker:

celery -A tasks worker -l info

然后执行 my_task.delay() 将任务发送给rabbitmq,再交由worker处理。处理返回的结果会存储在redis里,其实该任务场景并不关心结果,可以去掉backend

另外可以用flower来监控任务执行情况:

  1. pip install flower
  2. 指定broker并启动:celery flower --broker=amqp://myuser:mypassword@127.0.0.1:5672/myvhost
  3. 访问 http://localhost:5555/

最后附上一份常用的配置文件:

BROKER_URL = 'amqp://myuser:mypassword@127.0.0.1:5672/myvhost'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
# 指定任务序列化方式
CELERY_TASK_SERIALIZER = 'json'
# 指定结果序列化方式
CELERY_RESULT_SERIALIZER = 'json'
# 任务过期时间,celery任务执行结果的超时时间
CELERY_TASK_RESULT_EXPIRES = 60 * 20
# 指定任务接受的序列化类型
CELERY_ACCEPT_CONTENT = ["json"]
# 任务发送完成是否需要确认,这一项对性能有一点影响
CELERY_ACKS_LATE = True
# 压缩方案选择,可以是zlib, bzip2,默认是发送没有压缩的数据
CELERY_MESSAGE_COMPRESSION = 'zlib'
# 规定完成任务的时间
CELERYD_TASK_TIME_LIMIT = 5  # 在5s内完成任务,否则执行该任务的worker将被杀死,任务移交给父进程
# celery worker的并发数,默认是服务器的内核数目,也是命令行-c参数指定的数目
CELERYD_CONCURRENCY = 4
# celery worker 每次去rabbitmq预取任务的数量
CELERYD_PREFETCH_MULTIPLIER = 4
# 每个worker执行了多少任务就会死掉,默认是无限的
CELERYD_MAX_TASKS_PER_CHILD = 40
# 设置默认的队列名称,如果一个消息不符合其他的队列就会放在默认队列里面,如果什么都不设置的话,数据都会发送到默认的队列中
CELERY_DEFAULT_QUEUE = "default"
# 设置详细的队列
CELERY_QUEUES = {
    "default": {  # 这是上面指定的默认队列
        "exchange": "default",
        "exchange_type": "direct",
        "routing_key": "default"
    },
    "topicqueue": {  # 这是一个topic队列 凡是topictest开头的routing key都会被放到这个队列
        "routing_key": "topic.#",
        "exchange": "topic_exchange",
        "exchange_type": "topic",
    },
    "task_eeg": {  # 设置扇形交换机
        "exchange": "tasks",
        "exchange_type": "fanout",
        "binding_key": "tasks",
    },
}

参考:https://www.01hai.com/note/av125141

0

评论