很早就听说celeryfabric这两个神器,最近同时在做两个自己的小项目,更新比较频繁,一遍一遍手动部署太过僵硬,所以才真正用到了这俩好东西。

celery

在用到celery之前,如果有定时任务这样的需求,我一般都是直接写crontab,但是如果单个任务时间消耗较长的话,使用celery就可以让长时间消耗的任务异步执行,避免程序主线程的阻塞。另外一点,celery结合flower这个celery监控工具,能够让你看到自己的任务执行状况。

celery作为一个分布式任务调度模块,它拥有独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务,是否有要处理的任务则取决于中间人(Broker)在客户端和职程间斡旋。Broker从客户端向队列添加消息,之后Broker把消息派送给Worker。 在真正使用的时候,也不会有多少难度。

  1. 安装celery
1
pip install celery
  1. 选择一个中间人,我直接用redis,比较方便
  2. 新建celery app,douyu_app.py文件
1
2
3
4
5
6
7
8
9
from celery import Celery
import celery_douyu_config	# celery配置


app = Celery('douyuapp', include=['task_douyu'])	# task_douyu为celery任务
app.config_from_object('celery_douyu_config')

if __name__ == '__main__':
    app.start()
  1. 新建任务task_douyu.py文件
1
2
3
4
5
6
7
8
from douyu_app import app
from collector.danmu.CDouyu import Douyu


@app.task
def SAVE_DOUYU_DATA():
    douyu = Douyu()
    douyu.getRoomInfos()
  1. celery配置celery_douyu_config.py文件
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from celery.schedules import crontab

BROKER_URL = 'redis://127.0.0.1:6379/2'		# 任务发布队列
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/3'		# 任务执行结果存储

CELERY_TIMEZONE = 'Asia/Shanghai' 	# 时区选择

CELERYBEAT_SCHEDULE = {								
    'SAVE_DOUYU_ROOMINFO': { 	# 任务名称
        'task': 'task_douyu.SAVE_DOUYU_DATA', 	# 具体任务 task_douyu.SAVE_DOUYU_DATA 是一个真正要执行的任务
        'schedule': crontab(minute=[40]), 	# 任务计划时间,每小时40分的时候执行
    }
}
  1. 运行
1
celery -A douyu_app worker -B --loglevel=info

这里的-B主要用于celery定时任务,通过heartbeat来通知Worker是否有任务需要执行。

我这里只是简单的用了一下celery,并且只用到了定时任务,celery作为一款分布式任务调度工具,肯定会有更多更厉害的用法,留待以后用到的时候再研究。

fabric

之所以用到fabric还是因为自己比较懒,前边说到最近在频繁更新两个小项目,这两个项目都有向外提供api数据服务的部分,所以每次修改代码,运行测试,提交代码,ssh到远程主机pull最新代码,杀死原先服务进程,nohup最新服务。这个链路还是比较长的,并且有个项目长期的爬取网络数据,然后存储在mongodb中,自己的低配云主机硬盘有限,需要不定期地将数据迁移出来,所以想到了fabric这个好东西,应该可以满足自己的需求,自动化完成上述整个任务链路。

  1. 安装
1
pip install fabric
  1. 在项目根目录创建fabfile.py(默认使用这个名字,如果想要自定义,在运行的时候fab -f xxx.py也是可以的)
  2. 配置远程主机 fabric是通过ssh的方式登录到远程主机的,可以通过用户名/密码的方式或者是通过密钥的形式,因为也就三台主机,也都是自己的机器,我选择使用密钥登陆。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# 配置~/.ssh/config
Host host_name
	HostName ip or domain
	User user 
	IdentityFile ~/.ssh/id_rsa

# 在fabfile中增加相关配置
env.use_ssh_config = True
env.hosts = ['al', 'al1']  # 这里的名字就是config中的host_name
env.roledefs = {		   # 不同的主机要是有不同的任务,可以通过指定role来进行区分
    'douyu': ['al'],	   
    'other': ['al1'],
}
  1. 添加运行测试
1
2
3
4
5
6
@task
def run_test():
    with settings(warn_only=True):
        result = local('python tester.py', capture=True)
    if result.failed and not confirm("Tests failed. Continue anyway?"):
        abort("Aborting at user request.")
  1. 提交代码
1
2
3
4
5
@runs_once
@task
def pre_deploy():
    local('git add -A && git commit')
    local('git push origin master && git push tx master')
  1. 更新代码,发布新的服务
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
def deploy_code(cmd):
    code_dir = '~/git/Hotroom'
    with settings(warn_only=True):
        if run("ls {}".format(code_dir)).failed:
            run("git clone hhttps://github.com/love3forever/Hotroom.git {}".format(code_dir))
    with cd(code_dir):
        run("git pull")
        run("sudo pip install -r ./requirement.txt")
        with cd('./hotroom'):
            pids = run(
                "ps -ef | grep celery | grep -v grep | awk '{print $2}'")
            if pids:
                pid_list = pids.split('\r\n')
                for i in pid_list:
                    with settings(warn_only=True):
                        run('kill -9 %s' % i)
            run('pwd')
            run("(nohup celery -A {} worker -B --loglevel=error >& /dev/null < /dev/null &) && sleep 1".format(cmd))
            run('echo deployed')
  1. 备份还原数据库,清空数据库
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def dump(db):
    stamp = datetime.now().strftime('%y_%m_%d_%H_%M')
    cd_dir = '~/tars/dump/'
    dump_dir = '~/tars/dump/dump_{}'.format(stamp)
    dump_tar = 'dump_{}.tar.gz'.format(stamp)
    local_dir = '/Users/eclipse/Downloads/mongo34/dump/'
    local_tar_dir = '/Users/eclipse/Downloads/mongo34/dump/dump_{}'.format(
        stamp)
    local_dump_dir = '/Users/eclipse/Downloads/mongo34/dump/dump_{}.tar.gz'.format(
        stamp)
    with settings(warn_only=True):
        run('mkdir -p {}'.format(dump_dir))
        run('mongodump -d {} -c Roominfo -o {} --gzip'.format(db, dump_dir))
    with cd(cd_dir):
        run('ls')
        run('tar -zcvf {} {}'.format(dump_tar, 'dump_{}'.format(stamp)))
    with settings(warn_only=True):
        get('~/tars/dump/{}'.format(dump_tar), local_dir)
        run('echo current db:{}'.format(db))
        run('mongo')
        local('mkdir -p {}'.format(local_tar_dir))
        local('tar -zxvf {} -C {}'.format(local_dump_dir, local_dir))
        local('/Users/eclipse/Downloads/mongo34/bin/mongorestore --dir={} \
            --gzip'.format(local_tar_dir))

小结

目前都只用到了这两个库的一点点功能,相信在以后的工作中会让这俩库有更多的作用!