不管是Android亦或者Java中或多或少须要调用底层的一些命令。运行一些參数;
此时我们须要用到Java的Process来创建一个子进程。之所以是子进程是由于此进程依赖于发起创建请求的进程,假设发起者被Kill那个子进程也将Kill。
对于Process相信使用过的朋友一定不会陌生,它具有例如以下特点:
1.创建简单
2.控制难
3.easy导致无法创建子进程
4.假设是多线程那么非常有可能造成内存溢出
以上现象假设你仅仅是偶尔使用一次,创建一个进程也许你什么都没有感觉到,可是假设你使用了多线程,进行了大量的创建,以上问题你都会遇到。
相关:[Android] ProcessBuilder与Runtime.getRuntime().exec分别创建进程的差别,[Android]
[Java] 分享 Process 运行命令行封装类
这两个星期一直在研究上面的问题,我要做的软件是在Android中进行TraceRoute,因为手机不可能全然Root所以不能採用JNI来发送ICMP请求的方式,终于仅仅能使用创建进程方式进行。详细实现思路是:使用PING命令来PING百度等地址,在PING命令中增加TTL,得到每一次的IP地址。当IP地址与目标IP地址符合时退出。而且还须要单独PING一次每一跳的延迟和丢包。
单线程:PING 百度 TTL=1 =》 得到IP,PING IP 得到延迟丢包。改变TTL,进行下一次PING。直到所得到的IP与目标(百度)一样时停止。
依照上面的思路一次须要创建两个子进程。一般到百度时TTL大约为12跳左右,所以就是2*12=24个子进程;假设是在单线程下简单明了,可是速度慢,整个过程大约须要1分钟左右。
多线程:同一时候发起3个线程进行3跳測试TTL=(1,2。3),測试完毕后測试下一批数据TTL=(4,5,6)。假设也是12跳的话。那么也是24个子进程,可是总体耗时将会为1/3.可见此时效率较高。
可是多线程须要考虑的是线程的同步问题,以及得到数据后的写入问题,这些赞不谈。仅仅谈进程问题。经过我的測试假如如今測试100个站点的TraceRoute数据,在上层控制一次測试4个站点,底层实现并发3个线程,此时在一定时间内将会同一时候存在3*4个进程。依照平均每一个站点12跳来算:12*2*100=240个子进程,须要的子线程为12*100=120个。
这个时候问题来了,假如如今程序子进程不正常了,遇到了一个一定的问题导致进程无法运行完毕。此时你的现象是:一个子进程卡住,随后创建的全部子进程都卡住。
假如最上层线程做了任务时间限制。那么到时间后将会尝试销毁,可是你会发现无法销毁,所持有的线程也不会销毁。
可是上层以为销毁掉了,然后继续进行下一批的数据測试。此时你的线程数量会逐渐添加,假设100任务下来你的线程也许会达到3*4*100=1200假设有前期没有这种情况那个就是一半:600个线程左右,假设后期还有任务将会继续添加可是却永远不会销毁。可是我们知道JVM的内存是有限的。所以此时将会出现内存溢出。
以上就是我遇到的问题,我最先改为了等待线程全然返回后再进行下一批数据測试。此时内存溢出是攻克了,可是任务却一直卡住在哪里了,永远也不走。
我就在想要解决这一的问题须要解决根本上的问题才行。经过研究我发如今程序创建了子进程后JVM将会创建一个子进程管理线程:“ProcessManager”:
正常情况下该线程状态为Native。可是假设创建大量子进程后有可能会出现此线程为Monitor状态,过一段时间后全部创建子进程的线程状态也将会变为Monitor状态,然后将一直死锁,后面创建线程也是继续死锁,无法继续。
通过查看ProcessManager源代码发现,当中启动了一个线程用于监听子进程状态,同一时候管理子进程,比方输出消息以及关闭子进程等操作,详细例如以下:
/**
* Copyright (C) 2007 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ package java.lang; import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.Arrays;
import java.util.logging.Logger;
import java.util.logging.Level; /***
* Manages child processes.
*
* <p>Harmony's native implementation (for comparison purposes):
* http://tinyurl.com/3ytwuq
*/
final class ProcessManager { /***
* constant communicated from native code indicating that a
* child died, but it was unable to determine the status
*/
private static final int WAIT_STATUS_UNKNOWN = -1; /***
* constant communicated from native code indicating that there
* are currently no children to wait for
*/
private static final int WAIT_STATUS_NO_CHILDREN = -2; /***
* constant communicated from native code indicating that a wait()
* call returned -1 and set an undocumented (and hence unexpected) errno
*/
private static final int WAIT_STATUS_STRANGE_ERRNO = -3; /***
* Initializes native static state.
*/
static native void staticInitialize();
static {
staticInitialize();
} /***
* Map from pid to Process. We keep weak references to the Process objects
* and clean up the entries when no more external references are left. The
* process objects themselves don't require much memory, but file
* descriptors (associated with stdin/out/err in this case) can be
* a scarce resource.
*/
private final Map<Integer, ProcessReference> processReferences
= new HashMap<Integer, ProcessReference>(); /*** Keeps track of garbage-collected Processes. */
private final ProcessReferenceQueue referenceQueue
= new ProcessReferenceQueue(); private ProcessManager() {
// Spawn a thread to listen for signals from child processes.
Thread processThread = new Thread(ProcessManager.class.getName()) {
@Override
public void run() {
watchChildren();
}
};
processThread.setDaemon(true);
processThread.start();
} /***
* Kills the process with the given ID.
*
* @parm pid ID of process to kill
*/
private static native void kill(int pid) throws IOException; /***
* Cleans up after garbage collected processes. Requires the lock on the
* map.
*/
void cleanUp() {
ProcessReference reference;
while ((reference = referenceQueue.poll()) != null) {
synchronized (processReferences) {
processReferences.remove(reference.processId);
}
}
} /***
* Listens for signals from processes and calls back to
* {@link #onExit(int,int)}.
*/
native void watchChildren(); /***
* Called by {@link #watchChildren()} when a child process exits.
*
* @param pid ID of process that exited
* @param exitValue value the process returned upon exit
*/
void onExit(int pid, int exitValue) {
ProcessReference processReference = null; synchronized (processReferences) {
cleanUp();
if (pid >= 0) {
processReference = processReferences.remove(pid);
} else if (exitValue == WAIT_STATUS_NO_CHILDREN) {
if (processReferences.isEmpty()) {
/**
* There are no eligible children; wait for one to be
* added. The wait() will return due to the
* notifyAll() call below.
*/
try {
processReferences.wait();
} catch (InterruptedException ex) {
// This should never happen.
throw new AssertionError("unexpected interrupt");
}
} else {
/**
* A new child was spawned just before we entered
* the synchronized block. We can just fall through
* without doing anything special and land back in
* the native wait().
*/
}
} else {
// Something weird is happening; abort!
throw new AssertionError("unexpected wait() behavior");
}
} if (processReference != null) {
ProcessImpl process = processReference.get();
if (process != null) {
process.setExitValue(exitValue);
}
}
} /***
* Executes a native process. Fills in in, out, and err and returns the
* new process ID upon success.
*/
static native int exec(String[] command, String[] environment,
String workingDirectory, FileDescriptor in, FileDescriptor out,
FileDescriptor err, boolean redirectErrorStream) throws IOException; /***
* Executes a process and returns an object representing it.
*/
Process exec(String[] taintedCommand, String[] taintedEnvironment, File workingDirectory,
boolean redirectErrorStream) throws IOException {
// Make sure we throw the same exceptions as the RI.
if (taintedCommand == null) {
throw new NullPointerException();
}
if (taintedCommand.length == 0) {
throw new IndexOutOfBoundsException();
} // Handle security and safety by copying mutable inputs and checking them.
String[] command = taintedCommand.clone();
String[] environment = taintedEnvironment != null ? taintedEnvironment.clone() : null;
SecurityManager securityManager = System.getSecurityManager();
if (securityManager != null) {
securityManager.checkExec(command[0]);
}
// Check we're not passing null Strings to the native exec.
for (String arg : command) {
if (arg == null) {
throw new NullPointerException();
}
}
// The environment is allowed to be null or empty, but no element may be null.
if (environment != null) {
for (String env : environment) {
if (env == null) {
throw new NullPointerException();
}
}
} FileDescriptor in = new FileDescriptor();
FileDescriptor out = new FileDescriptor();
FileDescriptor err = new FileDescriptor(); String workingPath = (workingDirectory == null)
? null
: workingDirectory.getPath(); // Ensure onExit() doesn't access the process map before we add our
// entry.
synchronized (processReferences) {
int pid;
try {
pid = exec(command, environment, workingPath, in, out, err, redirectErrorStream);
} catch (IOException e) {
IOException wrapper = new IOException("Error running exec()."
+ " Command: " + Arrays.toString(command)
+ " Working Directory: " + workingDirectory
+ " Environment: " + Arrays.toString(environment));
wrapper.initCause(e);
throw wrapper;
}
ProcessImpl process = new ProcessImpl(pid, in, out, err);
ProcessReference processReference
= new ProcessReference(process, referenceQueue);
processReferences.put(pid, processReference); /**
* This will wake up the child monitor thread in case there
* weren't previously any children to wait on.
*/
processReferences.notifyAll(); return process;
}
} static class ProcessImpl extends Process { /*** Process ID. */
final int id; final InputStream errorStream; /*** Reads output from process. */
final InputStream inputStream; /*** Sends output to process. */
final OutputStream outputStream; /*** The process's exit value. */
Integer exitValue = null;
final Object exitValueMutex = new Object(); ProcessImpl(int id, FileDescriptor in, FileDescriptor out,
FileDescriptor err) {
this.id = id; this.errorStream = new ProcessInputStream(err);
this.inputStream = new ProcessInputStream(in);
this.outputStream = new ProcessOutputStream(out);
} public void destroy() {
try {
kill(this.id);
} catch (IOException e) {
Logger.getLogger(Runtime.class.getName()).log(Level.FINE,
"Failed to destroy process " + id + ".", e);
}
} public int exitValue() {
synchronized (exitValueMutex) {
if (exitValue == null) {
throw new IllegalThreadStateException(
"Process has not yet terminated.");
} return exitValue;
}
} public InputStream getErrorStream() {
return this.errorStream;
} public InputStream getInputStream() {
return this.inputStream;
} public OutputStream getOutputStream() {
return this.outputStream;
} public int waitFor() throws InterruptedException {
synchronized (exitValueMutex) {
while (exitValue == null) {
exitValueMutex.wait();
}
return exitValue;
}
} void setExitValue(int exitValue) {
synchronized (exitValueMutex) {
this.exitValue = exitValue;
exitValueMutex.notifyAll();
}
} @Override
public String toString() {
return "Process[id=" + id + "]";
}
} static class ProcessReference extends WeakReference<ProcessImpl> { final int processId; public ProcessReference(ProcessImpl referent,
ProcessReferenceQueue referenceQueue) {
super(referent, referenceQueue);
this.processId = referent.id;
}
} static class ProcessReferenceQueue extends ReferenceQueue<ProcessImpl> { @Override
public ProcessReference poll() {
// Why couldn't they get the generics right on ReferenceQueue? :(
Object reference = super.poll();
return (ProcessReference) reference;
}
} static final ProcessManager instance = new ProcessManager(); /*** Gets the process manager. */
static ProcessManager getInstance() {
return instance;
} /*** Automatically closes fd when collected. */
private static class ProcessInputStream extends FileInputStream { private FileDescriptor fd; private ProcessInputStream(FileDescriptor fd) {
super(fd);
this.fd = fd;
} @Override
public void close() throws IOException {
try {
super.close();
} finally {
synchronized (this) {
if (fd != null && fd.valid()) {
try {
ProcessManager.close(fd);
} finally {
fd = null;
}
}
}
}
}
} /*** Automatically closes fd when collected. */
private static class ProcessOutputStream extends FileOutputStream { private FileDescriptor fd; private ProcessOutputStream(FileDescriptor fd) {
super(fd);
this.fd = fd;
} @Override
public void close() throws IOException {
try {
super.close();
} finally {
synchronized (this) {
if (fd != null && fd.valid()) {
try {
ProcessManager.close(fd);
} finally {
fd = null;
}
}
}
}
}
} /*** Closes the given file descriptor. */
private static native void close(FileDescriptor fd) throws IOException;
}
在当中有一个“native void watchChildren();”方法,此方法为线程主方法。详细实现能够看看JNI,在当中回调了方法:“void onExit(int pid, int exitValue);” 在方法中:
void onExit(int pid, int exitValue) {
ProcessReference processReference = null; synchronized (processReferences) {
cleanUp();
if (pid >= 0) {
processReference = processReferences.remove(pid);
} else if (exitValue == WAIT_STATUS_NO_CHILDREN) {
if (processReferences.isEmpty()) {
/**
* There are no eligible children; wait for one to be
* added. The wait() will return due to the
* notifyAll() call below.
*/
try {
processReferences.wait();
} catch (InterruptedException ex) {
// This should never happen.
throw new AssertionError("unexpected interrupt");
}
} else {
/**
* A new child was spawned just before we entered
* the synchronized block. We can just fall through
* without doing anything special and land back in
* the native wait().
*/
}
} else {
// Something weird is happening; abort!
throw new AssertionError("unexpected wait() behavior");
}
} if (processReference != null) {
ProcessImpl process = processReference.get();
if (process != null) {
process.setExitValue(exitValue);
}
}
}
此方法作用是删除子进程队列中子进程同一时候通知子进程ProcessImpl已完毕。
可是在方法:“watchChildren()”中假设出现System.in缓冲期满的情况那么进程将无法正常结束,它将一直等待缓冲区有空间存在。而缓冲区又是公共区间,假设一个出现等待那么兴许子进程也将所有等待。假设缓冲区无法清空。那么所有子进程将会所有死锁掉。
这就是导致子进程卡死的凶手。
知道问题关键点那么就会有人想办法解决,比如:
//...读取数据... process.waitFor(); //....再次读取
这种方式看似非常好,可是你有没有想过有些数据无法及时返回,所以在waitfor()之前读取非常有可能没有数据导致进行waitfor()等待,这时我们能够看看源代码:
public int waitFor() throws InterruptedException {
synchronized (exitValueMutex) {
while (exitValue == null) {
exitValueMutex.wait();
}
return exitValue;
}
}
void setExitValue(int exitValue) {
synchronized (exitValueMutex) {
this.exitValue = exitValue;
exitValueMutex.notifyAll();
}
}
这里能够看见假如没有退出值将会进行等待,直到通知发生,可是通知想要发生必需要靠“ProcessManager”线程来告诉你。
可是假如在等待过程中出现了大量的数据。导致System.IN满了,此时“ProcessManager”线程非常傻非常傻的进入了等待状态中,也将无法进行通知。而这边也就无法往下走。无法到达第二次读取,所以第二次读取就非常随机了,在大量数据下第二次读取基本上就是摆设。也就是说无法正常的运行,终于也将导致死锁。
解决的方法也非常easy。创建线程后我们能够创建一个线程来专门读取信息。直到“ProcessManager”线程通知结束的时候,才退出线程。
首先我们看看Process提供的“exitValue()”方法:
public int exitValue() {
synchronized (exitValueMutex) {
if (exitValue == null) {
throw new IllegalThreadStateException(
"Process has not yet terminated.");
} return exitValue;
}
}
可见在”exitValue“没有值时将会抛出异常而不会堵塞,所以能够得出:”exitValue()“与”waitfor()“都能够用于推断线程是否完毕。可是一个是堵塞的一个是不堵塞的方法。在线程中当然使用不堵塞的来完毕我们的工作:
/**
* 实例化一个ProcessModel
*
* @param process Process
*/
private ProcessModel(Process process) {
//init
this.process = process;
//get
out = process.getOutputStream();
in = process.getInputStream();
err = process.getErrorStream(); //in
if (in != null) {
isInReader = new InputStreamReader(in);
bInReader = new BufferedReader(isInReader, BUFFER_LENGTH);
} sbReader = new StringBuilder(); //start read thread
readThread();
} .................... //读取结果
private void read() {
String str;
//read In
try {
while ((str = bInReader.readLine()) != null) {
sbReader.append(str);
sbReader.append(BREAK_LINE);
}
} catch (Exception e) {
e.printStackTrace();
Logs.e(TAG, e.getMessage());
}
} /**
* 启动线程进行异步读取结果
*/
private void readThread() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
//
while (true) {
try {
process.exitValue();
//read last
read();
break;
} catch (IllegalThreadStateException e) {
read();
}
StaticFunction.sleepIgnoreInterrupt(300);
} //read end
int len;
if (in != null) {
try {
while ((len = in.read(BUFFER)) > 0) {
Logs.d(TAG, String.valueOf(len));
}
} catch (IOException e) {
e.printStackTrace();
Logs.e(TAG, e.getMessage());
}
} //close
close(); //done
isDone = true;
}
}); thread.setName("DroidTestAgent.Test.TestModel.ProcessModel:ReadThread");
thread.setDaemon(true);
thread.start(); }
当创建进程后把进程丢进我建立的类中实例化为一个进程管理类。随后启动线程,线程运行中调用进程的”exitValue()“,假设异常就进入读取数据,直到不异常时再次读取一次最后数据,随后退出循环,退出后还读取了一次底层的数据(这个事实上能够不用要,纯属心理作用!)。最后写入完毕标记。
当中”StaticFunction.sleepIgnoreInterrupt(300);“是我写的静态方法用于休眠等待而已。也就是Sleep。仅仅只是增加了try
catch。
当然光是读取IN流是不行的,还有Error流,这个时候就须要两个线程来完毕。一个也行。只是我为了简单採用了:ProcessBuilder类创建进程并重定向了错误流到IN流中。这样简化了操作。
而使用ProcessBuilder类须要注意的是同一个ProcessBuilder实例创建子进程的时候是须要进行线程同步操作的,由于假设并发操作将会导致进程參数错误等现象发生。所以建议加上线程相互排斥来实现。可是不建议反复创建ProcessBuilder实例。创建那么多实例,何不把全部子进程放在一个ProcessBuilder实例里边。降低内存消耗啊,手机伤不起啊。
有必要提出的是,当线程推断结束的时候,也就是退出值(exitvalue)有值得时候此时事实上在”ProcessManager“线程中已经杀掉了进程了。此时在进程中事实上没有此进程了。有的也就是运行后的数据流而已。所以正常结束情况下无需自己调用”destroy()“方法。调用后将会触发异常,说没有找到此进程。
public void destroy() {
try {
kill(this.id);
} catch (IOException e) {
Logger.getLogger(Runtime.class.getName()).log(Level.FINE,
"Failed to destroy process " + id + ".", e);
}
}
最终讲完了。累啊;
最后给大家分享我自己弄得一个类(ProcessModel)。大家喜欢就直接拿去,假设有好的建议希望大家提出来:
import com.droidtestagent.journal.Logs;
import com.droidtestagent.util.StaticFunction; import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; /**
* Create By Qiujuer
* 2014-08-05
* <p/>
* 运行命令行语句进程管理封装
*/
public class ProcessModel {
private static final String TAG = "ProcessModel";
//换行符
private static final String BREAK_LINE;
//错误缓冲
private static final byte[] BUFFER;
//缓冲区大小
private static final int BUFFER_LENGTH;
//创建进程时须要相互排斥进行
private static final Lock lock = new ReentrantLock();
//ProcessBuilder
private static final ProcessBuilder prc; final private Process process;
final private InputStream in;
final private InputStream err;
final private OutputStream out;
final private StringBuilder sbReader; private BufferedReader bInReader = null;
private InputStreamReader isInReader = null;
private boolean isDone; /**
* 静态变量初始化
*/
static {
BREAK_LINE = "\n";
BUFFER_LENGTH = 128;
BUFFER = new byte[BUFFER_LENGTH]; prc = new ProcessBuilder();
} /**
* 实例化一个ProcessModel
*
* @param process Process
*/
private ProcessModel(Process process) {
//init
this.process = process;
//get
out = process.getOutputStream();
in = process.getInputStream();
err = process.getErrorStream(); //in
if (in != null) {
isInReader = new InputStreamReader(in);
bInReader = new BufferedReader(isInReader, BUFFER_LENGTH);
} sbReader = new StringBuilder(); //start read thread
readThread();
} /**
* 运行命令
*
* @param params 命令參数 eg: "/system/bin/ping", "-c", "4", "-s", "100","www.qiujuer.net"
*/
public static ProcessModel create(String... params) {
Process process = null;
try {
lock.lock();
process = prc.command(params)
.redirectErrorStream(true)
.start();
} catch (IOException e) {
e.printStackTrace();
} finally {
//sleep 100
StaticFunction.sleepIgnoreInterrupt(100);
lock.unlock();
}
if (process == null)
return null;
return new ProcessModel(process);
} /**
* 通过Android底层实现进程关闭
*
* @param process 进程
*/
public static void kill(Process process) {
int pid = getProcessId(process);
if (pid != 0) {
try {
android.os.Process.killProcess(pid);
} catch (Exception e) {
try {
process.destroy();
} catch (Exception ex) {
//ex.printStackTrace();
}
}
}
} /**
* 获取进程的ID
*
* @param process 进程
* @return id
*/
public static int getProcessId(Process process) {
String str = process.toString();
try {
int i = str.indexOf("=") + 1;
int j = str.indexOf("]");
str = str.substring(i, j);
return Integer.parseInt(str);
} catch (Exception e) {
return 0;
}
} //读取结果
private void read() {
String str;
//read In
try {
while ((str = bInReader.readLine()) != null) {
sbReader.append(str);
sbReader.append(BREAK_LINE);
}
} catch (Exception e) {
e.printStackTrace();
Logs.e(TAG, e.getMessage());
}
} /**
* 启动线程进行异步读取结果
*/
private void readThread() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
//while to end
while (true) {
try {
process.exitValue();
//read last
read();
break;
} catch (IllegalThreadStateException e) {
read();
}
StaticFunction.sleepIgnoreInterrupt(300);
} //read end
int len;
if (in != null) {
try {
while ((len = in.read(BUFFER)) > 0) {
Logs.d(TAG, String.valueOf(len));
}
} catch (IOException e) {
e.printStackTrace();
Logs.e(TAG, e.getMessage());
}
} //close
close(); //done
isDone = true;
}
}); thread.setName("DroidTestAgent.Test.TestModel.ProcessModel:ReadThread");
thread.setDaemon(true);
thread.start(); } /**
* 获取运行结果
*
* @return 结果
*/
public String getResult() {
//waite process setValue
try {
process.waitFor();
} catch (Exception e) {
e.printStackTrace();
Logs.e(TAG, e.getMessage());
} //until startRead en
while (true) {
if (isDone)
break;
StaticFunction.sleepIgnoreInterrupt(100);
} //return
if (sbReader.length() == 0)
return null;
else
return sbReader.toString();
} /**
* 关闭全部流
*/
private void close() {
//close out
if (out != null) {
try {
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
//err
if (err != null) {
try {
err.close();
} catch (IOException e) {
e.printStackTrace();
}
}
//in
if (in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (isInReader != null) {
try {
isInReader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (bInReader != null) {
try {
bInReader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
} /**
* 销毁
*/
public void destroy() {
//process
try {
process.destroy();
} catch (Exception ex) {
kill(process);
}
}
}
想了想还是把代码托管到了GitHub上,方便以后分享其它的代码。
地址:Android Utils
很欢迎大家找出不足发表问题。