python定时任务模块-crontab

python定时任务模块-crontab

特点:使用linux系统自带的定时管理模块,简单配置,灵活使用,轻松上手,可以解决开发过程中的定时需求问题.

使用:

安装:pip install python-crontab
导入:from crontab import CronTab
实例化对象:cron = CronTab(user=True)
此时的crontab用户是当前用户,定时任务的增删改操作都是基于当前用户的.
新增定时任务:
              job = cron.new(command=command_line)
              # 设置任务执行周期
              job.setall(time_str)
              # 给任务添加一个标识,给任务设置comment,这样就可以根据comment查询
              job.set_comment(comment_name)
              # 将crontab写入配置文件
              cron.write_to_user() # 指定用户,写入指定用户下的crontab任务
删除定时任务:
            # 按照名字进行指定定时任务删除
            cron.remove_all(comment=comment_name)
完整例子:
          class Crontab_Update(object):

            def __init__(self):
                # 创建当前用户的crontab,当然也可以创建其他用户的,但得有足够权限
                self.cron = CronTab(user=True)

            def add_crontab_job(self, command_line, time_str, comment_name):
                # 创建任务
                job = self.cron.new(command=command_line)
                # 设置任务执行周期
                job.setall(time_str)
                # 给任务添加一个标识,给任务设置comment,这样就可以根据comment查询
                job.set_comment(comment_name)
                # 将crontab写入配置文件
                self.cron.write_to_user() # 指定用户,写入指定用户下的crontab任务

            def del_crontab_jobs(self, comment_name):
                # 根据comment查询,当时返回值是一个生成器对象,
                # 不能直接根据返回值判断任务是否存在,
                # 如果只是判断任务是否存在,可直接遍历my_user_cron.crons
                # 返回所有的定时任务,返回的是一个列表
                # 按comment清除定时任务
                # self.cron.remove_all(comment=comment_name)
                # 按comment清除多个定时任务,一次write即可
                self.cron.remove_all(comment=comment_name)
                self.cron.remove_all(comment=comment_name+ ' =')
                self.cron.write_to_user() # 指定用户,删除指定用户下的crontab任务

            def del_all_crontab_jobs(self):
                # 指定用户,删除指定用户下所有的crontab任务
                self.cron.remove_all()
                self.cron.write_to_user()


      def get_period(period):
          ret = None
          if period == '5min':
              ret = '*/5 * * * *'
          elif period == 'hour':
              ret = '0 * * * *'
          elif period == 'day':
              ret = '0 8 * * *'

          return ret

      def update_crontab(clu_nid, add_nid, period, del_nid):
          command_line = EXEC_ENV + EXEC_FILE + clu_nid

          # 创建一个实例
          crontab_update = Crontab_Update()
          # 调用函数新增一个crontab任务
          try:
              if del_nid == '*':
                  crontab_update.del_all_crontab_jobs()
              elif del_nid:
                  crontab_update.del_crontab_jobs(del_nid)
              if add_nid:
                  period_time = get_period(period)
                  if not period_time:
                      return None
                  crontab_update.add_crontab_job(command_line, period_time, add_nid)

          except Exception as err:
              logger.error(err)
上一篇:2021-04-28


下一篇:hbase simple权限一