OneFlow: 启动 Session

前言

前面在初始化 Session 的时候,通过 CurJobAddOp 将 Op 加入到计算图当中,实际上只是将 Op 加入到 Job 里面,而 Job 只是一个 Protobuf Message 罢了。如果用户定义了多个 Job,那么这些 Job 就会构成一个 JobSet。用户将算子添加完之后,就会调用 Complete 对计算图 (其实就是 Job) 进行优化改写。接下来就是启动 Session,启动 Session 的时候进行了什么重要的事情呢?这篇文章就来分析一下。

  • 结论:启动 Session 的时候将逻辑上的 Job 编译为物理上的 Plan,启动 Runtime 去执行 Plan。

流程回顾

在 Session 初始化的时候,我们可以看到先调用了 InitLazyGlobalSession,然后调用 compiler.Compile 将 Op 逐个加入计算图,接下来就是启动 Session,调用 StartLazyGlobalSession。StartLazyGlobalSession 背后做了什么操作呢?

# python/oneflow/compatible/single_client/framework/session_util.py: 183
def Init(self):
    assert self.status_ is SessionStatus.OPEN
    self.status_ = SessionStatus.RUNNING
    if not oneflow._oneflow_internal.IsEnvInited():
        flow.env.init()
    _TryCompleteConfigProto(self.config_proto)
    self.resource_ = self.config_proto.resource
    if not oneflow._oneflow_internal.EagerExecutionEnabled():
        c_api_util.InitLazyGlobalSession(self.config_proto)
        for (job_name, func_desc) in self.job_name2function_desc_.items():
            compiler.Compile(self, func_desc, self.config_proto)
            self.existed_module_names_ = set()
        self.job_name2var_name2var_blob_ = dict()
        assert len(self.job_name2function_desc_.items()) > 0
        oneflow._oneflow_internal.StartLazyGlobalSession()
        self.inter_user_job_info_ = c_api_util.GetInterUserJobInfo()
        self.UpdateInfo4InterfaceOp()
        if not config_util.api_legacy_model_io_enabled():
            check_point_v2.Init()
    else:
        self.eager_config_proto_ctx_ = oneflow._oneflow_internal.LogicalConfigProtoContext(
            str(self.config_proto)
        )
    return self

StartLazyGlobalSession

回想一下,我们进入这个方法之前的状态,我们有一个 JobBuildAndInferCtxMgr,里面存有 JobSet。用户定义一个 Job,就 JobSet 就多一个 Job。这个 Job 的状态是什么样子的呢?这个 Job 是用户定义的 Job 函数转化过来的,并且经过了 CurJobBuildAndInferCtx_Complete 优化改写了。

StartLazyGlobalSession 启动 Session,背后做了什么呢?带着问题单步调试跟踪进去看一看。

  • 在 StartLazyGlobalSession 中获取 JobSet,从 JobBuildAndInferCtxMgr 直接拿到。这个细节很重要,JobSet 是承接上一个部分的线索。其实第二篇文章分析 Python 端构图的时候,没有深入 CurJobAddOp 去,因为里面涉及到了 SBP 的推导等。JobSet 是一个 Protobuf message,它的成员是可重复的 Job。JobSet 有 LazyJobBuildAndInferCtxMgr 进行管理,在打开一个 JobBuildAndInferCtx 的时候,会在 JobSet 中新增一个 Job,然后将 Job 传给 JobBuildAndInferCtx。
  • StartLazyGlobalSession 中最重要的操作是创建一个全局的 Oneflow 对象,然后使用 JobSet 去初始化这个 Oneflow 对象。JobSet 会这个过程中编译成 Plan,然后启动 Runtime。
// oneflow/api/python/session/session.h: 88
inline Maybe<void> StartLazyGlobalSession() {
  CHECK_NOTNULL_OR_RETURN(Global<SessionGlobalObjectsScope>::Get()) << "session not found";
  CHECK_OR_RETURN(GlobalProcessCtx::IsThisProcessMaster());
  const JobSet& job_set = Global<LazyJobBuildAndInferCtxMgr>::Get()->job_set();
  if (Global<ResourceDesc, ForSession>::Get()->enable_debug_mode()) {
    TeePersistentLogStream::Create("job_set.prototxt")->Write(job_set);
  }
  if (job_set.job().empty()) { return Error::JobSetEmptyError() << "no function defined"; }
  CHECK_ISNULL_OR_RETURN(Global<Oneflow>::Get());
  Global<CtrlClient>::Get()->PushKV("session_job_set", job_set);
  Global<const InterJobReuseMemStrategy>::New(job_set.inter_job_reuse_mem_strategy());
  Global<Oneflow>::New();
  JUST(Global<Oneflow>::Get()->Init(job_set));
  return Maybe<void>::Ok();
}

// oneflow/core/job/job_build_and_infer_ctx_mgr.h: 38
class JobBuildAndInferCtxMgr {
 public:
  // ...
  const JobSet& job_set() const { return job_set_; }
  // ...
}
  • 在 Oneflow 全局对象在初始化的过程中,调用 CompileJobsAndPushMergedPlan 将 Job 编译为 MergedPlan。如果不是 Master 节点,那么不会进行编译,会调用 PullPlan 从 Master 拉取 Plan。最后使用 Plan 初始化 Runtime。
// oneflow/core/job/oneflow.cpp: 1005
Maybe<void> Oneflow::Init(const oneflow::JobSet& job_set) {
  OF_PROFILER_RANGE_GUARD("Oneflow::Init");
  // Runtime
  OF_PROFILER_RANGE_PUSH("CompileJobsAndPushMergedPlan");
  JUST(CompileJobsAndPushMergedPlan(job_set.job()));
  OF_PROFILER_RANGE_POP();  // CompileJobsAndPushMergedPlan
  double start = GetCurTime();
  PullPlan("merged_plan", &plan_);
  LOG(INFO) << " PullPlan merged_plan time: " << (GetCurTime() - start) / 1e9 << " seconds.\n";
  if (GlobalProcessCtx::IsThisProcessMaster()) {
    runtime_buffers_scope_.reset(new RuntimeBuffersScope(plan_.job_confs()));
  }
  OF_PROFILER_RANGE_PUSH("new Runtime");
  if (Global<ResourceDesc, ForSession>::Get()->enable_dry_run()) {
    LOG(ERROR) << "this is dry run, exiting";
    exit(0);
  }

  HashMap<std::string, Blob*> variable_op_name2eager_blob;
  runtime_.reset(new Runtime(plan_, variable_op_name2eager_blob));
  OF_PROFILER_RANGE_POP();  // new Runtime
  return Maybe<void>::Ok();
}

编译 Job 为 MergedPlan

  • CompileJobsAndPushMergedPlan 输入是 PbRfp,它是 Protobuf Repeated Field Ptr 的意思,可以认为这个函数的输入就是一个 Job 数组。这个方法,只由 Master 节点执行,即由 Master 生成 Plan。
// oneflow/core/job/oneflow.cpp: 985
Maybe<void> CompileJobsAndPushMergedPlan(const PbRpf<Job>& job_confs) {
  if (GlobalProcessCtx::IsThisProcessMaster()) {
    Plan plan;
    JUST(CompileJobsAndMergePlans(job_confs, plan));
    double start = GetCurTime();
    // push op_attribute_info
    OpAttributeInfo op_attribute_info;
    *op_attribute_info.mutable_job_id2op_attribute_ref_table() =
        plan.job_id2op_attribute_ref_table();
    Global<CtrlClient>::Get()->PushKV("op_attribute_info", op_attribute_info);
    // push plan
    PushPlan("merged_plan", std::move(plan));
    LOG(INFO) << " PushPlan merged_plan time: " << (GetCurTime() - start) / 1e9 << " seconds.\n";
  }
  OF_SESSION_BARRIER();
  return Maybe<void>::Ok();
}
  • CompileJobsAndMergePlans 在上面的流程中,仅仅只是不断调用方法,实际上什么都还没干呢。下面这个方法,开始干活了。不过这篇文章暂时不深入细节,重要的是先理清楚流程。

下面的代码有点长,这个方法主要做的事情有:

  • 添加 Model IO Job
  • 添加 Push Job 和 Pull Job
  • CompileCurJobOnMaster 逐个编译 Job,MergeSubPlan 将 Job 合并
  • Job 之间的内存复用和内存共享
  • FinishGlobalCriticalSectionDesc 划分临界区
  • MainJob 的生成、编译、链接
// oneflow/core/job/oneflow.cpp: 912
Maybe<void> CompileJobsAndMergePlans(const PbRpf<Job>& job_confs, Plan& plan) {
  std::vector<std::shared_ptr<Job>> jobs(job_confs.size());
  FOR_RANGE(int, i, 0, jobs.size()) { jobs.at(i).reset(new Job(job_confs.Get(i))); }
  if (jobs.size() > 1) { CheckNonDistributeOptimizerAvailable(jobs); }
  HashMap<std::string, ParallelBlobConf> var_op_name2parallel_blob_conf;
  FilterOpName2ParallelBlobConf({OperatorConf::kVariableConf}, jobs,
                                &var_op_name2parallel_blob_conf);
  auto AppendJob = [&](Job* job) {
    JobDesc job_desc(job->job_conf(), jobs.size());
    CHECK(!job_desc.Bool("__is_user_function__"));
    jobs.emplace_back(new Job(*job));
  };

  if (Global<ResourceDesc, ForSession>::Get()->resource().enable_legacy_model_io()) {
    if (Global<ResourceDesc, ForSession>::Get()->resource().enable_model_io_v2()) {
      MakeModelIoV2Jobs(jobs, var_op_name2parallel_blob_conf, AppendJob);
    } else {
      MakeModelIoJobs(jobs, var_op_name2parallel_blob_conf, AppendJob);
    }
  }
  std::vector<std::shared_ptr<Job>> function_jobs;
  function_jobs.reserve(jobs.size());
  FOR_RANGE(int, i, 0, jobs.size()) {
    JobDesc job_desc(jobs.at(i)->job_conf(), i);
    if (job_desc.Bool("__is_user_function__")) { function_jobs.push_back(jobs.at(i)); }
  }
  HashMap<std::string, ParallelBlobConf> push_op_name2parallel_blob_conf;
  FilterOpName2ParallelBlobConf({OperatorConf::kInputConf}, function_jobs,
                                &push_op_name2parallel_blob_conf);
  HashMap<std::string, ParallelBlobConf> pull_op_name2parallel_blob_conf;
  FilterOpName2ParallelBlobConf({OperatorConf::kReturnConf}, function_jobs,
                                &pull_op_name2parallel_blob_conf);
  for (const auto& pair : push_op_name2parallel_blob_conf) {
    auto push_job = std::make_shared<Job>();
    MakePushJob(std::string("System-Push-") + pair.first, pair.first, pair.second, push_job.get());
    jobs.emplace_back(push_job);
  }
  for (const auto& pair : pull_op_name2parallel_blob_conf) {
    auto pull_job = std::make_shared<Job>();
    MakePullJob(std::string("System-Pull-") + pair.first, pair.first, pair.second, pull_job.get());
    jobs.emplace_back(pull_job);
  }

  std::vector<Plan> sub_plans(jobs.size());
  FOR_RANGE(int64_t, i, 0, jobs.size()) {
    AddJobName2JobId(jobs.at(i)->job_conf().job_name(), i);
    auto scope = std::make_unique<GlobalJobDescScope>(jobs.at(i)->job_conf(), i);
    JUST(CompileCurJobOnMaster(jobs.at(i).get(), &sub_plans.at(i), true));
  }
  MergeSubPlan(&plan, std::move(sub_plans));
  InterJobMemSharingUtil::MergeMemReusedChunkBetweenUserJobs(function_jobs, &plan);
  InterJobMemSharingUtil::MergeMemSharedInterfaceMemBlockBetweenJobs(jobs, &plan);
  PlanUtil::SetForceInplaceMemBlock(&plan);
  FinishGlobalCriticalSectionDesc(plan, jobs.size());
  Plan main_plan;
  std::vector<std::map<int64_t, std::string>> identity_tick_op_names;
  {
    Job main_job;
    std::vector<ReentrantLockBackEdge> lock_back_edges;
    JUST(MakeMainJob(&main_job, &identity_tick_op_names, &lock_back_edges));
    AddJobName2JobId(main_job.job_conf().job_name(), jobs.size());
    JUST(CompileMainJob(&main_job, lock_back_edges, jobs.size(), &main_plan));
  }
  LinkMainPlan(&plan, std::move(main_plan), identity_tick_op_names);
  PlanUtil::CleanUselessMemBlockAndCheckValid(&plan);
  PlanUtil::DumpCtrlRegstInfoToPlan(&plan);
  if (Global<ResourceDesc, ForSession>::Get()->enable_debug_mode()) {
    TeePersistentLogStream::Create("merged_plan")->Write(plan);
    PlanUtil::ToDotFile(plan, "/dot/merged_plan.dot");
  }
  return Maybe<void>::Ok();
}

启动

编译完成之后,就可以启动 Runtime 了。

启动 Runtime 主要做几件事情:

  • 所有需要 Plan 的全局对象,调用 AddPlan 将 Plan 传给他们
  • 分解 Plan 的 Task,每个 task 一个 actor,根据 task 上的 job_id 信息,创建 actor 的大小
  • 构建 RuntimeCtx,调用 HandoutTasks 分发 task,并且发送 ActorCmd::kConstructActor 启动 Actor。
  • 向所有 source_tasks 发送 ActorCmd::kStart 启动 actor。
// oneflow/core/job/runtime.cpp: 60
Runtime::Runtime(const Plan& plan, const HashMap<std::string, Blob*>& variable_op_name2eager_blob) {
  {
    // NOTE(chengcheng): All runtime Global objects AddPlan
    Global<RegstMgr>::Get()->AddPlan(plan, variable_op_name2eager_blob);
    Global<ThreadMgr>::Get()->AddPlan(plan);
    Global<RuntimeJobDescs>::Get()->AddPlan(plan);
    collective_boxing_executor_plan_token_ =
        Global<boxing::collective::CollectiveBoxingExecutor>::Get()->AddPlan(plan);
  }
  std::vector<const TaskProto*> source_tasks;
  std::vector<const TaskProto*> other_tasks;
  int64_t this_machine_task_num = 0;
  for (const TaskProto& task : plan.task()) {
    if (task.machine_id() != GlobalProcessCtx::Rank()) { continue; }
    if (!HasNonCtrlConsumedRegstDescId(task)) {
      source_tasks.push_back(&task);
    } else {
      other_tasks.push_back(&task);
    }
    auto it = job_id2actor_size_.find(task.job_id());
    if (it == job_id2actor_size_.end()) {
      auto emplace_ret_pair = job_id2actor_size_.emplace(task.job_id(), 0);
      CHECK(emplace_ret_pair.second);
      it = emplace_ret_pair.first;
    }
    it->second++;
    this_machine_task_num++;
  }
  RuntimeCtx* runtime_ctx = Global<RuntimeCtx>::Get();
  runtime_ctx->NewCounter("constructing_actor_cnt", this_machine_task_num);
  HandoutTasks(source_tasks);
  HandoutTasks(other_tasks);
  runtime_ctx->WaitUntilCntEqualZero("constructing_actor_cnt");
  LOG(INFO) << "Actors on this machine constructed";
  OF_SESSION_BARRIER();
  LOG(INFO) << "Actors on every machine constructed";
  for (auto pair : job_id2actor_size_) {
    runtime_ctx->NewCounter(GetRunningActorCountKeyByJobId(pair.first), pair.second);
  }
  SendCmdMsg(source_tasks, ActorCmd::kStart);
}

// oneflow/core/job/runtime.cpp: 43
void HandoutTasks(const std::vector<const TaskProto*>& tasks) {
  for (const TaskProto* task : tasks) {
    Global<ThreadMgr>::Get()->GetThrd(task->thrd_id())->AddTask(*task);
  }
  SendCmdMsg(tasks, ActorCmd::kConstructActor);
}

总结

总结一下 StartLazyGlobalSession,在进入这个方法之前,已经有 JobSet 了,这个 JobSet 是经过 CurJobBuildAndInferCtx_Complete 优化改写了。接下来进入 StartLazyGlobalSession,它会添加更多的 Job 用于模型 IO,用于推送输入、拉取输出,编译连接成 MergedPlan。有了 MergedPlan 之后,就可以带着这个 Plan 启动运行时,启动 Actor。

OneFlow: 启动 Session

上一篇:设计模式之-抽象工厂


下一篇:Qto_CurtainWallQuantities