OneFlow: 初始化环境

简介

暑期参加了开源之夏的活动:为 OneFlow 添加新的前端语言。功能已经基本做好了,不过我写的代码有点矬,所以读读 OneFlow 的源代码,看看别人是如何思考、如何设计的。在写这篇文章之前,反复阅读了 OneFlow 的研发工程师写的三篇文章[1],对整体有了一定的把握。在看这篇文章之前,强烈建议将 [1] 读个几遍。这篇文章引用了 OneFlow 内部大量源代码,直接贴上来,并且简单写写思考的笔记。这篇文章作为读源码系列的开篇,后续将会写大概 12 篇左右的源代码解读。你可以将这一系列的源代码解读文章看作 [1] 的展开,详细写写实现的细节,所以免不了贴上大量代码,毕竟代码就是一种很清晰清楚的表达形式。

初始化

import oneflow as flow 背后执行了一些操作:初始化物理环境,初始化默认的 Session,设置运行模式,注册结束时调用的函数。接着是 env 初始化,scope 初始化,session 初始化

从系统状态改变的角度来看,初始化对整个系统的状态做了哪些改变呢?在默认初始化阶段(import oneflow),对 OneFlow 有影响的初始化动作是,初始化默认的 Session。其实只做了两件事情,获取 id,然后用这个 id 去注册 session。之后进行的一系列 env 初始化,scope 初始化,session 初始化,将创建各种全局变量,已保存用户的配置,运行环境的信息等。

在阅读这篇文章的过程中,要抓住两点,什么时候进行了初始化?初始化做了哪些操作?

默认初始化 Session

调用 NewSessionId 获取一个 id,用这个 id 去注册一个 session

# python/oneflow/compatible/single_client/__init__.py: 53
session_context.OpenDefaultSession(
    session_util.Session(oneflow._oneflow_internal.NewSessionId())
)

# python/oneflow/compatible/single_client/framework/session_util.py: 52
class Session(object):
    def __init__(self, sess_id):
        # ...
        self.sess_ = oneflow._oneflow_internal.RegsiterSession(sess_id)
        # ...

NewSessionId: 使用静态 atomic 类型的整数,每次调用的时候自增 1。基本上可以确定是一个唯一的 id。

// oneflow/core/job/session.cpp: 24
int64_t NewSessionId() {
  static std::atomic<int64_t> counter(0);
  return counter++;
}

RegsiterSession: 创建一个 Session 对象,接下来就是加锁,将 id 及其对应的 session 放入 map 中,设置默认的 session id。注意到下面的 default session id 用的是一个 vector,为什么呢?应该是允许注册多个默认的 session。这个代码片段的最后面,获取默认的 session,查询了两次 map,可以优化一下。之前社区的导师给我 review 的时候,指出了我的这个问题,所以看代码的时候,也会再次注意到。

// oneflow/core/framework/session_util.cpp: 88
Maybe<Session> RegsiterSession(int64_t id) {
  std::shared_ptr<Session> sess = std::make_shared<Session>(id);
  std::unique_lock<std::mutex> lock(*GlobalSessionUtilMutex());
  auto* id2session_map = GlobalId2SessionMap();
  CHECK_OR_RETURN(id2session_map->find(id) == id2session_map->end());
  (*id2session_map)[id] = sess;
  JUST(SetDefaultSessionId(id));
  return id2session_map->at(id);
}

// oneflow/core/framework/session_util.cpp: 22
namespace {

std::mutex* GlobalSessionUtilMutex() {
  static std::mutex global_id2session_map_mutex;
  return &global_id2session_map_mutex;
}

HashMap<int64_t, std::shared_ptr<Session>>* GlobalId2SessionMap() {
  static HashMap<int64_t, std::shared_ptr<Session>> id2session_map;
  return &id2session_map;
}

std::vector<int64_t>* RegsiteredSessionIds() {
  static std::vector<int64_t> default_sess_id;
  return &default_sess_id;
}

Maybe<void> SetDefaultSessionId(int64_t val) {
  std::vector<int64_t>* ids = RegsiteredSessionIds();
  ids->push_back(val);
  return Maybe<void>::Ok();
}

}  // namespace

// oneflow/core/framework/session_util.cpp: 98
Maybe<Session> GetDefaultSession() {
  std::unique_lock<std::mutex> lock(*GlobalSessionUtilMutex());
  const auto& regsitered_ids = *(RegsiteredSessionIds());
  CHECK_GT_OR_RETURN(regsitered_ids.size(), 0);
  int64_t default_sess_id = regsitered_ids.back();
  auto* id2session_map = GlobalId2SessionMap();
  CHECK_OR_RETURN(id2session_map->find(default_sess_id) != id2session_map->end());
  return id2session_map->at(default_sess_id);
}

env 初始化

什么时候执行的 env 初始化?在 import oneflow as flow 的时候执行的。这里使用到了 enable_if 这个注解,如果处在 normal_mode 并且还没有执行过 env 初始化,那么就执行一次 env 初始化。

# python/oneflow/framework/env_util.py: 65
@enable_if.condition(hob.in_normal_mode & ~hob.env_initialized)
def env_init():
    global default_env_proto
    is_multi_client = oneflow._oneflow_internal.IsMultiClient()
    assert len(default_env_proto.machine) > 0
    CompleteEnvProto(default_env_proto, is_multi_client)
    c_api_util.InitEnv(default_env_proto, is_multi_client)
    if not is_multi_client:
        if oneflow._oneflow_internal.CurrentMachineId() == 0:
            scope_util.InitScopeStack()
        else:
            exit(0)
    return True

一个 env 初始化的配置例子如下:

machine {
  id: 0
  addr: "127.0.0.1"
}
ctrl_port: 50645
cpp_logging_conf {
}
ctrl_bootstrap_conf {
  master_addr {
    host: "127.0.0.1"
    port: 37043
  }
  rank: 0
  world_size: 1
}

python 将上面的 env_proto 传给 oneflow 底层进行环境初始化。Env 初始化的时候,会进行很多全局变量的初始化。

  • 首先初始化 log,OneFlow 使用的是 google logging。设置 log 路径,log 等级等。
  • 初始化 CUDA 版本信息
  • 保存环境信息 env_proto 到全局变量当中,初始化 ProcessCtx。
  • 根据配置,设置 RpcManager。如果只有一台机器,使用 local;如果有多台机器,使用 grpc。接着调用 RpcManager 的方法,执行一些初始化的操作。
  • 从 env_proto 中取出资源信息 ResourceDesc,保存到全局变量中。资源信息包括,有多少个机器,多少个 cpu,多少个 gpu。
  • 根据 ResourceDesc 初始化线程池。
  • 初始化 EagerJobBuildAndInferCtxMgr 全局变量。
  • 根据配置和环境变量,初始化 CommNet,可以选择基于 Epoll 的实现,也可以选择基于 IBVerbs 的实现。
  • 总结:在环境初始化阶段,根据配置,初始化全局变量。还有一些细节没有提及,请看下面的代码片段。
// oneflow/core/job/env_global_objects_scope.cpp: 128
Maybe<void> EnvGlobalObjectsScope::Init(const EnvProto& env_proto) {
  InitLogging(env_proto.cpp_logging_conf());
#ifdef WITH_CUDA
  InitGlobalCudaDeviceProp();
#endif
  Global<EnvDesc>::New(env_proto);
  Global<ProcessCtx>::New();
  // Avoid dead lock by using CHECK_JUST instead of JUST. because it maybe be blocked in
  // ~CtrlBootstrap.
  if (Global<ResourceDesc, ForSession>::Get()->enable_dry_run()) {
#ifdef RPC_BACKEND_LOCAL
    LOG(INFO) << "using rpc backend: dry-run";
    Global<RpcManager>::SetAllocated(new DryRunRpcManager());
#else
    static_assert(false, "requires rpc backend dry-run to dry run oneflow");
#endif  // RPC_BACKEND_LOCAL
  } else if ((env_proto.machine_size() == 1 && env_proto.has_ctrl_bootstrap_conf() == false)
             || (env_proto.has_ctrl_bootstrap_conf()
                 && env_proto.ctrl_bootstrap_conf().world_size() == 1)) /*single process*/ {
#ifdef RPC_BACKEND_LOCAL
    LOG(INFO) << "using rpc backend: local";
    Global<RpcManager>::SetAllocated(new LocalRpcManager());
#else
    static_assert(false, "requires rpc backend local to run oneflow in single processs");
#endif  // RPC_BACKEND_LOCAL
  } else /*multi process, multi machine*/ {
#ifdef RPC_BACKEND_GRPC
    LOG(INFO) << "using rpc backend: gRPC";
    Global<RpcManager>::SetAllocated(new GrpcRpcManager());
#else
    UNIMPLEMENTED() << "to run distributed oneflow, you must enable at least one multi-node rpc "
                       "backend by adding cmake argument, for instance: -DRPC_BACKEND=GRPC";
#endif  // RPC_BACKEND_GRPC
  }
  CHECK_JUST(Global<RpcManager>::Get()->CreateServer());
  CHECK_JUST(Global<RpcManager>::Get()->Bootstrap());
  CHECK_JUST(Global<RpcManager>::Get()->CreateClient());
  Global<ResourceDesc, ForEnv>::New(GetDefaultResource(env_proto),
                                    GlobalProcessCtx::NumOfProcessPerNode());
  Global<ResourceDesc, ForSession>::New(GetDefaultResource(env_proto),
                                        GlobalProcessCtx::NumOfProcessPerNode());
  Global<device::NodeDeviceDescriptorManager>::SetAllocated(
      new device::NodeDeviceDescriptorManager());
  if (Global<ResourceDesc, ForEnv>::Get()->enable_debug_mode()) {
    Global<device::NodeDeviceDescriptorManager>::Get()->DumpSummary("devices");
  }
  Global<ThreadPool>::New(Global<ResourceDesc, ForSession>::Get()->ComputeThreadPoolSize());
#ifdef WITH_CUDA
  Global<EagerNcclCommMgr>::New();
  Global<CudnnConvAlgoCache>::New();
#endif
  Global<vm::VirtualMachineScope>::New(Global<ResourceDesc, ForSession>::Get()->resource());
  Global<EagerJobBuildAndInferCtxMgr>::New();
  if (!Global<ResourceDesc, ForSession>::Get()->enable_dry_run()) {
#ifdef __linux__
    Global<EpollCommNet>::New();
    Global<Transport>::New();
    if (Global<ResourceDesc, ForSession>::Get()->process_ranks().size() > 1) {
#ifdef WITH_RDMA
      if (CommNetIBEnabled()) {
        Global<IBVerbsCommNet>::New();
        Global<CommNet>::SetAllocated(Global<IBVerbsCommNet>::Get());
      } else {
        Global<CommNet>::SetAllocated(Global<EpollCommNet>::Get());
      }
#else
      Global<CommNet>::SetAllocated(Global<EpollCommNet>::Get());
#endif  // WITH_RDMA
    }
#endif  // __linux__
  }
  return Maybe<void>::Ok();
}

// oneflow/core/job/env_global_objects_scope.cpp: 70
int32_t GetDefaultCpuDeviceNum() { return std::thread::hardware_concurrency(); }

int32_t GetDefaultGpuDeviceNum() {
#ifndef WITH_CUDA
  return 0;
#else
  int device_count = 0;
  cudaGetDeviceCount(&device_count);
  return device_count;
#endif
}

Resource GetDefaultResource(const EnvProto& env_proto) {
  Resource resource;
  if (env_proto.has_ctrl_bootstrap_conf()) {
    resource.set_machine_num(GlobalProcessCtx::NodeSize());
  } else {
    resource.set_machine_num(env_proto.machine_size());
  }
  resource.set_cpu_device_num(GetDefaultCpuDeviceNum());
  resource.set_gpu_device_num(GetDefaultGpuDeviceNum());
  return resource;
}

scope 初始化

原来的 single client 只有 master 需要 scope 初始化,现在的 multi client 每个机器都要进行 scope 初始化。scope 初始化过程有点骚,将一个 Python 函数传给 C++ 去执行,并且使用 nonlocal 来将值传给外部。

# python/oneflow/framework/scope_util.py: 100
def InitScopeStack():
    job_conf = job_conf_cfg.JobConfigProto()
    job_conf.mutable_predict_conf()
    job_conf.set_job_name("")
    scope = MakeInitialScope(job_conf, "cpu", ["0:0"], None, is_mirrored=False)
    oneflow._oneflow_internal.InitGlobalScopeStack(scope)

# python/oneflow/framework/scope_util.py: 86
def MakeInitialScope(job_conf, device_tag, machine_device_ids, hierarchy, is_mirrored):
    scope = None

    def BuildInitialScope(builder):
        nonlocal scope
        session_id = session_ctx.GetDefaultSession().id
        scope = builder.BuildInitialScope(
            session_id, job_conf, device_tag, machine_device_ids, hierarchy, is_mirrored
        )

    oneflow._oneflow_internal.deprecated.LogicalRun(BuildInitialScope)
    return scope

LogicalRun: 这个函数的输入是一个 function,从 python 拿过来的 BuildInitialScope 函数就是一个 function。

// oneflow/core/framework/instructions_builder.cpp: 1616
Maybe<void> LogicalRun(const std::function<Maybe<void>(InstructionsBuilder*)>& Build) {
  if (JUST(GlobalMultiClientEnv())) {
    // NOTE(chengcheng): in Multi-Client LogicalRun will degenerate directly to PhysicalRun,
    //   because each rank will process instructions ONLY from itself, NOT the master.
    return PhysicalRun(Build);
  }

  const std::shared_ptr<vm::LogicalIdGenerator> id_generator =
      std::make_shared<vm::LogicalIdGenerator>();
  std::shared_ptr<Session> sess = JUST(GetDefaultSession());
  const auto& instruction_list = sess->instruction_list();
  const auto& eager_symbol_list = sess->eager_symbol_list();
  InstructionsBuilder instructions_builder(id_generator, instruction_list.get(),
                                           eager_symbol_list.get(), _ReleaseLogicalObject);
  JUST(Build(&instructions_builder));
  JUST(Global<vm::EagerOneflow>::Get()->RunLogicalInstruction(
      instructions_builder.mut_instruction_list(), instructions_builder.eager_symbol_list()));
  return Maybe<void>::Ok();
}

// oneflow/core/framework/instructions_builder.cpp: 149
Maybe<void> _ReleaseLogicalObject(compatible_py::Object* obj) {
  JUST(LogicalRun([&obj](InstructionsBuilder* build) -> Maybe<void> {
    JUST(build->DeleteObject(obj));
    return Maybe<void>::Ok();
  }));
  return Maybe<void>::Ok();
}

在 Python 传给 C++ 的函数里面,调用了 BuildInitialScope 来构建 scope。这个方法返回值就是一个 scope,保存了运行时的配置。

// oneflow/core/framework/instructions_builder.cpp: 498
Maybe<Scope> InstructionsBuilder::BuildInitialScope(
    int64_t session_id, const std::shared_ptr<cfg::JobConfigProto>& job_conf,
    const std::string& device_tag, const std::vector<std::string>& machine_device_ids,
    const std::shared_ptr<Shape>& hierarchy, bool is_mirrored) {
  std::shared_ptr<cfg::ScopeProto> scope_proto = std::make_shared<cfg::ScopeProto>();
  scope_proto->set_session_id(session_id);
  std::shared_ptr<JobDesc> job_conf_sym = JUST(GetJobConfSymbol(job_conf));
  scope_proto->set_job_desc_symbol_id(JUST(job_conf_sym->symbol_id()));
  std::shared_ptr<cfg::ParallelConf> parallel_conf =
      JUST(MakeParallelConf(device_tag, machine_device_ids, hierarchy));
  std::shared_ptr<ParallelDesc> device_parallel_desc_sym =
      JUST(GetParallelDescSymbol(parallel_conf));
  scope_proto->set_device_parallel_desc_symbol_id(JUST(device_parallel_desc_sym->symbol_id()));
  parallel_conf = JUST(MakeParallelConf("cpu", machine_device_ids, hierarchy));
  std::shared_ptr<ParallelDesc> host_parallel_desc_sym = JUST(GetParallelDescSymbol(parallel_conf));
  scope_proto->set_host_parallel_desc_symbol_id(JUST(host_parallel_desc_sym->symbol_id()));
  if (is_mirrored) {
    scope_proto->mutable_opt_mirrored_parallel_conf()->mutable_mirrored_parallel();
  } else {
    scope_proto->mutable_opt_mirrored_parallel_conf()->clear_mirrored_parallel();
  }
  return GetScopeSymbol(scope_proto);
}

// oneflow/core/framework/instructions_builder.cpp: 314
Maybe<Scope> InstructionsBuilder::GetScopeSymbol(
    const std::shared_ptr<cfg::ScopeProto>& scope_proto) {
  if (JUST(HasSymbol<cfg::ScopeProto>(*scope_proto))) {
    return GetSymbol<cfg::ScopeProto, Scope>(*scope_proto);
  }
  int64_t symbol_id = JUST(NewSymbolId4Scope(scope_proto));
  JUST(AddSymbol<cfg::ScopeProto, ScopeProto, Scope>(symbol_id, *scope_proto));
  return GetSymbol<cfg::ScopeProto, Scope>(*scope_proto);
}

// oneflow/core/framework/symbol_storage_util.h: 24
template<typename SymbolConfT>
Maybe<bool> HasSymbol(const SymbolConfT& symbol_conf) {
  const auto& id_cache = *JUST(GlobalMaybe<symbol::IdCache<SymbolConfT>>());
  return id_cache.Has(symbol_conf);
}

template<typename SymbolConfT, typename SymbolT>
Maybe<SymbolT> GetSymbol(const SymbolConfT& symbol_conf) {
  const auto& id_cache = *JUST(GlobalMaybe<symbol::IdCache<SymbolConfT>>());
  const auto& symbol_storage = *Global<symbol::Storage<SymbolT>>::Get();
  int64_t symbol_id = JUST(id_cache.Get(symbol_conf));
  const auto& ptr = JUST(symbol_storage.MaybeGetPtr(symbol_id));
  JUST(ptr->symbol_id());
  return ptr;
}

// TODO(hanbibin): the second template arg will be moved after symbol_storage is refactored
template<typename SymbolConfT, typename SymbolPbT, typename SymbolT>
Maybe<void> AddSymbol(int64_t symbol_id, const SymbolConfT& symbol_conf) {
  SymbolPbT symbol_pb;
  symbol_conf.ToProto(&symbol_pb);
  JUST(Global<symbol::Storage<SymbolT>>::Get()->Add(symbol_id, symbol_pb));
  auto* id_cache = JUST(GlobalMaybe<symbol::IdCache<SymbolConfT>>());
  CHECK_OR_RETURN(!id_cache->Has(symbol_conf));
  JUST(id_cache->FindOrCreate(symbol_conf, [&symbol_id]() -> Maybe<int64_t> { return symbol_id; }));
  return Maybe<void>::Ok();
}

Scope 究竟是什么东西呢?Scope 其实是对 scope_proto 的封装,下面是 Scope 多封装的一些属性,可以看到 scope_proto_ 作为 Scope 的一个类成员而存在。

// oneflow/core/job/scope.h: 80
int64_t auto_increment_id_;
Maybe<int64_t> symbol_id_;
const ScopeProto scope_proto_;
std::shared_ptr<JobDesc> job_desc_;
Symbol<PlacementScope> placement_scope_;
std::shared_ptr<Scope> parent_scope_symbol_;

scope 的 protobuf 定义如下:

message ScopeProto {
  required int64 job_desc_symbol_id = 20;
  required int64 device_parallel_desc_symbol_id = 30;
  required int64 host_parallel_desc_symbol_id = 40; 
  optional bool enable_cpu_alternative_op = 41 [default = true];
  required OptMirroredParallel opt_mirrored_parallel_conf = 50;
  repeated string scope_op_name_prefixes = 60;
  optional int64 parent_scope_symbol_id = 70;
  required int64 session_id = 80;
  map<string, AttrValue> attr_name2attr_value = 90;
  optional string calculation_pass_name = 100 [default = "forward_pass"];
}

scope 构建完成之后,调用 InitGlobalScopeStack,它其实是个 lambda 函数,负责去调用 InitThreadLocalScopeStack,将 scope 压栈。

// oneflow/api/python/framework/scope_util.cpp: 26
m.def("InitGlobalScopeStack", [](const std::shared_ptr<Scope>& scope) {
  return InitThreadLocalScopeStack(scope).GetOrThrow();
});

// oneflow/core/framework/scope_util.cpp: 23
std::list<std::shared_ptr<Scope>>* ThreadLocalScopeStack() {
  thread_local static std::list<std::shared_ptr<Scope>> scope_stack;
  return &scope_stack;
}

// oneflow/core/framework/scope_util.cpp: 36
Maybe<void> InitThreadLocalScopeStack(const std::shared_ptr<Scope>& scope) {
  auto* scope_stack = ThreadLocalScopeStack();
  scope_stack->clear();
  scope_stack->emplace_back(scope);
  return Maybe<void>::Ok();
}

Session 初始化

在 InferenceSession,Session 的初始化在创建的时候就进行了。在 OneFlow 中,使用 @flow.global_function 修饰一个函数,表示一个 job。这个 job 函数第一次被调用的时候,将会触发 session 的初始化。这个触发的过程是怎么样的呢?下面来仔细看看。

在 python 的初始化脚本中有 import api_oneflow_function as global_function,所以实际上执行的是 api_oneflow_function。根据 eager 或者 lazy 模式,返回一个函数。

# python/oneflow/compatible/single_client/framework/function_util.py: 85
def api_oneflow_function(
    type: str = "predict", function_config: FunctionConfig = None
) -> Callable[[Callable], Callable]:
    """Creates a callable OneFlow global function from a Python function.

    For instance::

        @oneflow.compatible.single_client.global_function(flow.FunctionConfig())
        def train():
            # your model

    Args:
        function_config (FunctionConfig, optional): a `FunctionConfig` object. Defaults to FunctionConfig().

    Returns:
        Callable[[Callable], Callable]: a callable which is called to execute the compiled function
    """
    if isinstance(type, FunctionConfig):
        function_config = type
        print(
            "WARNING: flow.global_function(func_config) is deprecated. Please replace it with flow.global_function(type, func_config).\n            "
        )
        print(traceback.format_stack()[-2])
    else:
        assert type in ["train", "predict"]
        if function_config is None:
            function_config = FunctionConfig()
        if type == "train":
            function_config.function_desc.job_config_proto.mutable_train_conf()
        else:
            function_config.function_desc.job_config_proto.mutable_predict_conf()
    api = enable_if.unique([eager_oneflow_function, lazy_oneflow_function])
    return api(function_config)

下面以 lazy_oneflow_function 为例子,返回这个函数的条件是,normal 模式,静态图,且 session 还没有初始化。session 会这里面进行初始化,也就是用户第一次调用自己定义的 job 函数的时候,会进行 session 的初始化。Decorator 返回一个 Func 函数,这个 Func 会被赋值给用户定义的函数。于是,用户调用自己定义的 job 函数的时候,实际上执行的是 _RunLazyJob。

# python/oneflow/compatible/single_client/framework/function_util.py: 143
@enable_if.condition(
    hob.in_normal_mode & ~hob.eager_execution_enabled & ~hob.session_initialized
)
def lazy_oneflow_function(function_config=FunctionConfig()):
    assert isinstance(function_config, FunctionConfig)

    def Decorator(job_func):
        if not hasattr(job_func, "__oneflow_function_signature__"):
            job_func.__oneflow_function_signature__ = inspect.signature(job_func)
        oft_util.CheckGlobalFunctionAnnotation(job_func.__oneflow_function_signature__)
        sess = session_ctx.GetDefaultSession()

        @functools.wraps(job_func)
        def Func(*args, **kwargs):
            return _RunLazyJob(sess, job_func, *args, **kwargs)

        sess.AddJob(_CloneFunctionDesc(function_config.function_desc, job_func))
        for x in dir(job_func):
            if x.startswith("__oneflow_"):
                setattr(Func, x, getattr(job_func, x))
        return Func

    return Decorator

# python/oneflow/compatible/single_client/framework/function_util.py: 225
def _RunLazyJob(session, job_func, *args, **kwargs):
    return session.TryInit().LazyRun(job_func, *args, **kwargs)

_RunLazyJob 调用了 session 的 TryInit 去执行初始化,这个部分还涉及到构建编译用户的定义的 Job 函数,构建成计算图。这个部分先不展开,下一篇文章再具体分析。这里我们需要知道的时候,Session 的初始化。Session 的初始化调用了一个函数:InitLazyGlobalSession。InitLazyGlobalSession 将一些用户的配置传进去,比如多少个机器,多少个 cpu,多少个 gpu 等信息。StartLazyGlobalSession 为启动时调用的函数,这里咱们先不要关心它。

# python/oneflow/compatible/single_client/framework/session_util.py: 161
def TryInit(self):
    if self.status_ is SessionStatus.OPEN:
        self.Init()
    return self

def Init(self):
    assert self.status_ is SessionStatus.OPEN
    self.status_ = SessionStatus.RUNNING
    if not oneflow._oneflow_internal.IsEnvInited():
        flow.env.init()
    _TryCompleteConfigProto(self.config_proto)
    self.resource_ = self.config_proto.resource
    if not oneflow._oneflow_internal.EagerExecutionEnabled():
        c_api_util.InitLazyGlobalSession(self.config_proto)
        for (job_name, func_desc) in self.job_name2function_desc_.items():
            compiler.Compile(self, func_desc, self.config_proto)
            self.existed_module_names_ = set()
        self.job_name2var_name2var_blob_ = dict()
        assert len(self.job_name2function_desc_.items()) > 0
        oneflow._oneflow_internal.StartLazyGlobalSession()
        self.inter_user_job_info_ = c_api_util.GetInterUserJobInfo()
        self.UpdateInfo4InterfaceOp()
        if not config_util.api_legacy_model_io_enabled():
            check_point_v2.Init()
    else:
        self.eager_config_proto_ctx_ = oneflow._oneflow_internal.LogicalConfigProtoContext(
            str(self.config_proto)
        )
    return self

接下来就将那些配置信息通过序列化传给 C++ 进行 session 的初始化,初始化的时候,主要是进行各种全局变量的创建。

// oneflow/api/python/session/session.h: 62
inline Maybe<void> InitLazyGlobalSession(const std::string& config_proto_str) {
  CHECK_NOTNULL_OR_RETURN(Global<EnvDesc>::Get()) << "env not found";
  CHECK_OR_RETURN(GlobalProcessCtx::IsThisProcessMaster());

  ClusterInstruction::MasterSendSessionStart();

  ConfigProto config_proto;
  CHECK_OR_RETURN(TxtString2PbMessage(config_proto_str, &config_proto))
      << "failed to parse config_proto: " << config_proto_str;
  FixCpuDeviceNum(&config_proto);
  Global<CtrlClient>::Get()->PushKV("config_proto", config_proto);

  CHECK_ISNULL_OR_RETURN(Global<SessionGlobalObjectsScope>::Get());
  Global<SessionGlobalObjectsScope>::SetAllocated(new SessionGlobalObjectsScope());
  JUST(Global<SessionGlobalObjectsScope>::Get()->Init(config_proto));
  LOG(INFO) << "NewGlobal " << typeid(SessionGlobalObjectsScope).name();
  return Maybe<void>::Ok();
}

// oneflow/core/job/session_global_objects_scope.cpp: 104
Maybe<void> SessionGlobalObjectsScope::Init(const ConfigProto& config_proto) {
  session_id_ = config_proto.session_id();
  Global<ResourceDesc, ForSession>::Delete();
  DumpVersionInfo();
  Global<ResourceDesc, ForSession>::New(config_proto.resource(),
                                        GlobalProcessCtx::NumOfProcessPerNode());
  Global<IDMgr>::New();
  if (GlobalProcessCtx::IsThisProcessMaster()) {
    Global<AvailableMemDesc>::New();
    if (Global<ResourceDesc, ForSession>::Get()->enable_dry_run()) {
      *Global<AvailableMemDesc>::Get() = GetDryRunAvailableMemDesc();
    } else {
      *Global<AvailableMemDesc>::Get() = GetAvailableMemDesc();
    }
    Global<JobName2JobId>::New();
    Global<CriticalSectionDesc>::New();
    Global<InterUserJobInfo>::New();
    Global<LazyJobBuildAndInferCtxMgr>::New();
    Global<JobSetCompileCtx>::New();
    Global<RuntimeBufferManagersScope>::New();
  }
  for (const std::string& lib_path : config_proto.load_lib_path()) { JUST(LoadLibrary(lib_path)); }
  {
    // NOTE(chengcheng): Init Global Runtime objects.
    Global<RuntimeCtx>::New();
    Global<MemoryAllocator>::New();
    Global<ChunkMgr>::New();
    Global<RegstMgr>::New();
    Global<ActorMsgBus>::New();
    Global<ThreadMgr>::New();
    Global<RuntimeJobDescs>::New();
    Global<summary::EventsWriter>::New();
    Global<boxing::collective::CollectiveBoxingExecutor>::New();
    Global<boxing::collective::CollectiveBoxingDeviceCtxPoller>::New();
  }

  return Maybe<void>::Ok();
}

结语

至此,初始化的工作已经基本完成,对于 InferenceSession 来说,接下来要做的是加载模型、编译模型,启动 session、加载权重。而对于一个 Job 函数来说,接下来要做的是将 Job 函数进行编译。InferenceSession 和 Job 函数做的事情,其实差不多,只不过一个从文件系统读取模型,一个从用户的 Job 函数中构建出模型。

参考

[1] https://zhuanlan.zhihu.com/p/337851255

OneFlow: 初始化环境

上一篇:Codeforces Round #740 (Div. 1, based on VK Cup 2021 - Final (Engine)) 题解


下一篇:Windows Azure初体验--功能介绍