Future<bool> MesosContainerizerProcess::__launch(
const ContainerID& containerId,
const ExecutorInfo& executorInfo,
const
string& directory,
const Option<string>& user,
const SlaveID& slaveId,
const PID<Slave>& slavePid,
bool checkpoint,
const list<Option<ContainerLaunchInfo>>& launchInfos)
{
……
// Prepare environment variables for the executor.
map<string, string> environment = executorEnvironment(
executorInfo,
directory,
slaveId,
slavePid,
checkpoint,
flags);
// Determine the root filesystem for the container. Only one
// isolator should return the container root filesystem.
Option<string> rootfs;
// Determine the executor launch command for the container.
// At most one command can be returned from docker runtime
// isolator if a docker image is specifed.
Option<CommandInfo> executorLaunchCommand;
Option<string> workingDirectory;
foreach (const Option<ContainerLaunchInfo>& launchInfo, launchInfos) {
if (launchInfo.isSome() && launchInfo->has_rootfs()) {
if (rootfs.isSome()) {
return Failure("Only one isolator should return the container rootfs");
} else {
rootfs = launchInfo->rootfs();
}
}
if (launchInfo.isSome() && launchInfo->has_environment()) {
foreach (const Environment::Variable& variable,
launchInfo->environment().variables()) {
const
string& name = variable.name();
const
string& value = variable.value();
if (environment.count(name)) {
VLOG(1) << "Overwriting environment variable '"
<< name << "', original: '"
<< environment[name] << "', new: '"
<< value << "', for container "
<< containerId;
}
environment[name] = value;
}
}
if (launchInfo.isSome() && launchInfo->has_command()) {
if (executorLaunchCommand.isSome()) {
return Failure("At most one command can be returned from isolators");
} else {
executorLaunchCommand = launchInfo->command();
}
}
if (launchInfo.isSome() && launchInfo->has_working_directory()) {
if (workingDirectory.isSome()) {
return Failure(
"At most one working directory can be returned from isolators");
} else {
workingDirectory = launchInfo->working_directory();
}
}
}
// TODO(jieyu): Consider moving this to 'executorEnvironment' and
// consolidating with docker containerizer.
environment["MESOS_SANDBOX"] =
rootfs.isSome() ? flags.sandbox_directory : directory;
// Include any enviroment variables from CommandInfo.
foreach (const Environment::Variable& variable,
executorInfo.command().environment().variables()) {
environment[variable.name()] = variable.value();
}
JSON::Array commandArray;
int namespaces = 0;
foreach (const Option<ContainerLaunchInfo>& launchInfo, launchInfos) {
if (!launchInfo.isSome()) {
continue;
}
// Populate the list of additional commands to be run inside the container
// context.
foreach (const CommandInfo& command, launchInfo->commands()) {
commandArray.values.emplace_back(JSON::protobuf(command));
}
// Process additional environment variables returned by isolators.
if (launchInfo->has_environment()) {
foreach (const Environment::Variable& variable,
launchInfo->environment().variables()) {
environment[variable.name()] = variable.value();
}
}
if (launchInfo->has_namespaces()) {
namespaces |= launchInfo->namespaces();
}
}
// TODO(jieyu): Use JSON::Array once we have generic parse support.
JSON::Object commands;
commands.values["commands"] = commandArray;
return logger->prepare(executorInfo, directory)
.then(defer(
self(),
[=](const ContainerLogger::SubprocessInfo& subprocessInfo)
-> Future<bool> {
// Use a pipe to block the child until it's been isolated.
int pipes[2];
// We assume this should not fail under reasonable conditions so
// we use CHECK.
CHECK(pipe(pipes) == 0);
// Prepare the flags to pass to the launch process.
MesosContainerizerLaunch::Flags launchFlags;
launchFlags.command = executorLaunchCommand.isSome()
? JSON::protobuf(executorLaunchCommand.get())
: JSON::protobuf(executorInfo.command());
launchFlags.sandbox = rootfs.isSome()
? flags.sandbox_directory
: directory;
// NOTE: If the executor shares the host filesystem, we should not
// allow them to 'cd' into an arbitrary directory because that'll
// create security issues.
if (rootfs.isNone() && workingDirectory.isSome()) {
LOG(WARNING) << "Ignore working directory '" << workingDirectory.get()
<< "' specified in container launch info for container "
<< containerId << " since the executor is using the "
<< "host filesystem";
} else {
launchFlags.working_directory = workingDirectory;
}
launchFlags.pipe_read = pipes[0];
launchFlags.pipe_write = pipes[1];
launchFlags.commands = commands;
// Fork the child using launcher.
vector<string> argv(2);
argv[0] = MESOS_CONTAINERIZER;
argv[1] = MesosContainerizerLaunch::NAME;
Try<pid_t> forked = launcher->fork(
containerId,
path::join(flags.launcher_dir, MESOS_CONTAINERIZER),
argv,
Subprocess::FD(STDIN_FILENO),
(local ? Subprocess::FD(STDOUT_FILENO)
: Subprocess::IO(subprocessInfo.out)),
(local ? Subprocess::FD(STDERR_FILENO)
: Subprocess::IO(subprocessInfo.err)),
launchFlags,
environment,
None(),
namespaces); // 'namespaces' will be ignored by PosixLauncher.
if (forked.isError()) {
return Failure("Failed to fork executor: " + forked.error());
}
pid_t pid = forked.get();
// Checkpoint the executor's pid if requested.
if (checkpoint) {
const
string& path = slave::paths::getForkedPidPath(
slave::paths::getMetaRootDir(flags.work_dir),
slaveId,
executorInfo.framework_id(),
executorInfo.executor_id(),
containerId);
LOG(INFO) << "Checkpointing executor's forked pid " << pid
<< " to '" << path << "'";
Try<Nothing> checkpointed =
slave::state::checkpoint(path, stringify(pid));
if (checkpointed.isError()) {
LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"
<< path << "': " << checkpointed.error();
return Failure("Could not checkpoint executor's pid");
}
}
// Monitor the executor's pid. We keep the future because we'll
// refer to it again during container destroy.
Future<Option<int>> status = process::reap(pid);
status.onAny(defer(self(), &Self::reaped, containerId));
containers_[containerId]->status = status;
return isolate(containerId, pid)
.then(defer(self(),
&Self::fetch,
containerId,
executorInfo.command(),
directory,
user,
slaveId))
.then(defer(self(), &Self::exec, containerId, pipes[1]))
.onAny(lambda::bind(&os::close, pipes[0]))
.onAny(lambda::bind(&os::close, pipes[1]));
}));
}