Agenda文档与源码学习

Agenda定时任务学习

Agenda配置

  • database(url, [collectionName]):指定数据库URL,如果没有给出集合名字,则默认使用“agendaJobs”
  • mongo(dbInstance):使用一个已存在的mongoDB本地客户端实例
  • name(name):用来设置“lastModifiedBy”字段内容,如果使用多进程任务,并且想看是哪个任务队列最后一次执行这个任务,该字段会很有用。
  • processEvery(interval):用来指定Agenda查询数据库的频率,看哪些任务需要执行。Agenda内部使用“setTimeout”字段保证误差在3ms左右。降低频率将导致更少的数据库查询,但更多的作业存储在内存中。另外值得注意的是,如果作业队列关闭,存储在内存中的任何尚未运行的作业都将被锁定,这意味着您可能必须等待锁定过期。默认值是'5秒'。
  • maxConcurrency(number):最大任务同时执行数,默认值20.
  • defaultConcurrency(number):特定任务在某时刻同时执行数,默认值5.
  • lockLimit(number):同时最多可以被锁住的任务数量,默认值为0,表示无限制
  • defaultLockLimit(number):特定任务在某时刻默认上锁的数量,默认值为0,表示无限制
  • defaultLockLifetime(number):指定默认锁定时长,默认值10分钟,任务当在默认锁定时长之前完成的话会被解锁,这个锁对任务崩溃或超时会很有用。
  • sort(query):指定查找并锁定下一个任务的查找顺序,默认值{ nextRunAt: 1, priority: -1 }

一些问题

任务执行的顺序是什么?
任务默认遵循带优先级的先进先出执行顺序。不过也修改sort参数来改变执行规则。

lockLimit和maxConcurrency的区别是什么?
agenda会将任务一个接一个的上锁,并在mongodb数据库中设置lockedAt参数,并创建Job类实例缓存到_lockedJobs数组中,这个过程默认没有限制,但可以使用lockLimit进行限制,如果所有任务都需要被执行,则agenda会尝试上锁所有任务,并将任务从_lockedJobs数组中发到_runningJobs数组中,这些任务运行后会执行用户的代码,这可以通过maxConcurrency参数进行限制。
但如果许多任务在同一时间执行,你或许会发现他们甚至没有被加载进来,因为他们会尽可能多地给任务上锁,尽管没有足够的并发来处理他们。里有调整lockLimit和maxConcurrency参数来解决这个问题。

源码

保存任务

agenda/save-job.js
如果任务已经存在则更新数据库中的信息,如果不存在则插入一条记录。(其他无关代码省略)

module.exports = async function(job) {
  try {
    // 保存任务信息,并设置最后修改者为当前agenda队列进程
    const props = job.toJSON();
    props.lastModifiedBy = this._name;
    // 获取当前时间并设置默认查询选项
    const now = new Date();
    const protect = {};
    let update = {$set: props};
    // 如果当前任务已经有id,则修改相关属性,例如谁最后一次修改的它,并返回
    if (id) {
      const result = await this._collection.findOneAndUpdate(
        {_id: id},
        update,
        {returnOriginal: false}
      );
      return processDbResult(job, result);
    }
    // 否则,直接插入一条
    const result = await this._collection.insertOne(props);
    return processDbResult(job, result);
  } catch (error) {
    throw error;
  }
};
//一个判断任务是否应该立即执行的方法
const processDbResult = (job, result) => {
  let res = result.ops ? result.ops : result.value;
  if (res) {
    // 如果是个数组,则只取第一个
    if (Array.isArray(res)) {
      res = res[0];
    }
    job.attrs.nextRunAt = res.nextRunAt;
    // 如果当前时间大于任务下一次应该执行的时间,则立即执行任务
    if (job.attrs.nextRunAt && job.attrs.nextRunAt < this._nextScanAt) {
      processJobs.call(this, job);
    }
  }

  // Return the Job instance
  return job;
};

查找任务

agenda/find-and-lock-next-job.js

  • JOB_PROCESS_WHERE_QUERY:查找要运行的任务,条件:(name=任务名字,disabled字段不为true)&&((lockedAt字段不为null,nextRunAt(下次运行时间)字段小于等于_nextScanAt下次扫描时间(当前时间+扫描间隔时间)||(lockedAt字段小于等于lockDeadline锁过期时间(当前时间-锁的时长))
  • JOB_PROCESS_SET_QUERY:给任务上锁,设置lockedAt字段为当前时间。
  • JOB_RETURN_QUERY:设置返回内容,返回更新后的文档并排序。
const JOB_PROCESS_WHERE_QUERY = {
  $and: [{
    name: jobName,
    disabled: {$ne: true}
  }, {
    $or: [{
      lockedAt: {$eq: null},
      nextRunAt: {$lte: this._nextScanAt}
    }, {
      lockedAt: {$lte: lockDeadline}
    }]
  }]
};
const JOB_PROCESS_SET_QUERY = {$set: {lockedAt: now}};
const JOB_RETURN_QUERY = {returnOriginal: false, sort: this._sort};
const result = await this._collection.findOneAndUpdate(JOB_PROCESS_WHERE_QUERY, JOB_PROCESS_SET_QUERY, JOB_RETURN_QUERY);
上一篇:shell 脚本实战 四


下一篇:自定义org agend view时org-agenda-skip-function变量的小结