Spark (44): Calling spark-submit.sh from Java (supporting both --deploy-mode client and cluster) and obtaining the applicationId

I have previously covered submitting a Spark job through the YARN API, where the submission call returns the applicationId; see "Spark 2.3 (40): How to schedule a Spark app from Java through the YARN API, and use the appId to monitor the job, kill it, and fetch its logs".

But I prefer the approach described in this article: calling the spark-submit.sh shell script from Java and scraping the applicationId from spark-submit.sh's console output. The Hadoop API approach requires a properly configured environment, and the packages to pull in differ across Hadoop versions.

How to call a shell command from Java:

To run a shell command from Java, use

    Process p = Runtime.getRuntime().exec(cmd);  // cmd is a String or a String[]

Runtime.exec() spawns a native process and returns an instance of a Process subclass, which can be used to control the process and query its state. Because a child process created by Runtime.exec has no terminal or console of its own, its standard I/O (stdin, stdout, stderr) is redirected to the parent process through

    p.getOutputStream(),
    p.getInputStream(),
    p.getErrorStream()

The caller must use these streams to feed input to the child process and to read its output.
    Example: Runtime.getRuntime().exec("ls")
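For instance, a minimal sketch of reading a child's stdout (the command ls is only an example; note that reading a single stream like this is exactly the pattern that can block, as discussed next):

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class ExecDemo {
    public static void main(String[] args) throws Exception {
        // Spawn the child process; "ls" is just an illustrative command.
        Process p = Runtime.getRuntime().exec(new String[] { "ls" });
        // From the parent's point of view, getInputStream() is the child's stdout.
        BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));
        String line;
        while ((line = br.readLine()) != null) {
            System.out.println(line);
        }
        System.out.println("exit code: " + p.waitFor());
    }
}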

  • One more thing to watch for: Runtime.getRuntime().exec() can stall (block).

Runtime.getRuntime().exec() leaves it to the caller to drain stdout and stderr, and there is no way to know in advance which of the two the child will write to first. If you read them in the wrong order, a read may block forever.
    For example: you read stdout first, but the child writes to stderr first; the child keeps waiting for its stderr pipe to be drained before it ever writes to stdout, so both sides block.

  • The fix:

Drain stdout and stderr concurrently, on two separate threads.

Reference code:

import java.util.*;
import java.io.*;

class StreamGobbler extends Thread {
    InputStream is;
    String type;

    StreamGobbler(InputStream is, String type) {
        this.is = is;
        this.type = type;
    }

    public void run() {
        try {
            InputStreamReader isr = new InputStreamReader(is);
            BufferedReader br = new BufferedReader(isr);
            String line = null;
            while ((line = br.readLine()) != null)
                System.out.println(type + ">" + line);
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
    }
}

public class ExecRunner {
    public static void main(String args[]) {
        if (args.length < 1) {
            System.out.println("USAGE: java ExecRunner <cmd>");
            System.exit(1);
        }
        try {
            String osName = System.getProperty("os.name");
            String[] cmd;
            if (osName.equals("Windows NT")) {
                cmd = new String[] { "cmd.exe", "/C", args[0] };
            } else if (osName.equals("Windows 95")) {
                cmd = new String[] { "command.com", "/C", args[0] };
            } else {
                // On Unix-like systems, split the command line into tokens.
                StringTokenizer st = new StringTokenizer(args[0], " ");
                cmd = new String[st.countTokens()];
                int token = 0;
                while (st.hasMoreTokens()) {
                    cmd[token++] = st.nextToken();
                }
            }
            Runtime rt = Runtime.getRuntime();
            System.out.println("Execing " + String.join(" ", cmd));
            Process proc = rt.exec(cmd);
            // Drain stderr and stdout on separate threads to avoid blocking.
            StreamGobbler errorGobbler = new StreamGobbler(proc.getErrorStream(), "ERROR");
            StreamGobbler outputGobbler = new StreamGobbler(proc.getInputStream(), "OUTPUT");
            // kick them off
            errorGobbler.start();
            outputGobbler.start();
            // Wait for the child process to exit.
            int exitVal = proc.waitFor();
            System.out.println("ExitValue: " + exitVal);
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }
}
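As an aside: if you do not need stdout and stderr separately, ProcessBuilder.redirectErrorStream(true) merges stderr into stdout, so a single reader thread is enough and the two-gobbler pattern above becomes unnecessary. A minimal sketch, with an illustrative command:

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class MergedExec {
    public static void main(String[] args) throws Exception {
        ProcessBuilder pb = new ProcessBuilder("sh", "-c", "ls /tmp");
        // Merge stderr into stdout: one stream, one reader, no drain-order deadlock.
        pb.redirectErrorStream(true);
        Process proc = pb.start();
        BufferedReader br = new BufferedReader(new InputStreamReader(proc.getInputStream()));
        String line;
        while ((line = br.readLine()) != null) {
            System.out.println(line);
        }
        System.out.println("exit code: " + proc.waitFor());
    }
}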

Implementing the spark-submit.sh invocation in Java

The spark-submit submission script, submit_test.sh:

#!/bin/sh
jarspath=''
for file in `ls /home/dx/djj/sparkjars/*.jar`
do
  jarspath=${file},$jarspath
done
# strip the trailing comma
jarspath=${jarspath%?}
echo $jarspath

/home1/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--class com.dx.test.BroadcastTest \
--properties-file ./conf/spark-properties-mrs.conf \
--jars $jarspath \
--num-executors 10 \
--executor-memory 3G \
--executor-cores 1 \
--driver-memory 2G \
--driver-java-options "-XX:+TraceClassPaths" \
./test.jar $1 $2 $3 $4

Note: to test the two YARN submission modes, change the --deploy-mode flag:

cluster mode: --deploy-mode cluster \

client  mode: --deploy-mode client \

To obtain the applicationId from spark-submit, we have to filter it out of spark-submit's console output (that is, the stdout and stderr of the Process object). If you have ever submitted a Spark job with spark-submit.sh, you will have noticed that the applicationId is printed on screen during submission.

  • In YARN client mode (--deploy-mode client), the applicationId shows up at this point in the spark-submit.sh output:
// :: INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@215a34b4{/static,null,AVAILABLE,@Spark}
// :: INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2e380628{/,null,AVAILABLE,@Spark}
// :: INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1eaf1e62{/api,null,AVAILABLE,@Spark}
// :: INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@652ab8d9{/jobs/job/kill,null,AVAILABLE,@Spark}
// :: INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@51e0301d{/stages/stage/kill,null,AVAILABLE,@Spark}
// :: INFO client.RMProxy: Connecting to ResourceManager at vm10.60.0..com.cn/10.60.0.11:
[Opened /usr/java/jdk1..0_152/jre/lib/jce.jar]
[Opened /usr/java/jdk1..0_152/jre/lib/charsets.jar]
19/04/02 11:40:24 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0829
  • In YARN cluster mode (--deploy-mode cluster), the applicationId shows up at this point in the output:
// :: INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
// :: INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
// :: INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
// :: INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
// :: INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
// :: INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
// :: INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
// :: INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
// :: INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
// :: INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:32 INFO yarn.Client: Application report for application_1548381669007_0828 (state: RUNNING)
// :: INFO yarn.Client: Application report for application_1548381669007_0828 (state: RUNNING)

The function that filters out the applicationId is implemented as follows:

/**
 * @param line one line of the process's stdout/stderr output.
 */
private String filterApplicationId(String line, boolean isCluster) {
    String applicationId = null;
    line = line.toLowerCase();
    // --deploy-mode client
    // 19/02/15 17:43:35 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0051
    // 19/04/01 14:13:57 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0781
    boolean isIndexSparkOwnLog = line.indexOf("INFO impl.YarnClientImpl: Submitted application ".toLowerCase()) != -1;
    // --deploy-mode cluster
    // 19/04/01 14:13:59 INFO yarn.Client: Application report for application_1548381669007_0781 (state: ACCEPTED)
    // 19/04/01 14:13:59 INFO yarn.Client: Application report for application_1548381669007_0781 (state: RUNNING)
    boolean isIndexSparkOwn2Log = line.indexOf("INFO yarn.Client: Application report for ".toLowerCase()) != -1;
    boolean isIndexSparkRunningLog = line.indexOf("(state: RUNNING)".toLowerCase()) != -1;

    if (isIndexSparkOwnLog || (isIndexSparkOwn2Log && isIndexSparkRunningLog)) {
        if (isIndexSparkOwnLog && false == isCluster) {
            int idx = line.indexOf("INFO impl.YarnClientImpl: Submitted application ".toLowerCase());
            applicationId = line.substring(idx + "INFO impl.YarnClientImpl: Submitted application ".length());
        } else if (isIndexSparkOwn2Log && true == isCluster) {
            int idx = line.indexOf("INFO yarn.Client: Application report for ".toLowerCase());
            applicationId = line.substring(idx + "INFO yarn.Client: Application report for ".length());
            if (line.indexOf("(state: RUNNING)".toLowerCase()) != -1) {
                applicationId = applicationId.replace(" (state: RUNNING)".toLowerCase(), "");
            }
        }
    }

    if (applicationId != null && applicationId.startsWith("application_")) {
        System.out.println("====================================Index of applicationId:" + applicationId);
        System.out.println("====================================Index of applicationId:Complete ...");
    }

    return applicationId;
}

If a line matches, the applicationId is returned; if nothing matches, the function returns null.
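Since a YARN applicationId always has the form application_<clusterTimestamp>_<sequence>, a regular expression can make this extraction more robust than indexOf/substring arithmetic. A sketch of an alternative drop-in member (hypothetical, not the code used in this article; filterApplicationIdByRegex is an assumed name, and it requires import java.util.regex.*):

// Hypothetical regex-based variant of filterApplicationId.
private static final Pattern APP_ID = Pattern.compile("application_\\d+_\\d+");

private String filterApplicationIdByRegex(String line, boolean isCluster) {
    // client mode: the id follows "Submitted application";
    // cluster mode: it appears in "Application report for ... (state: RUNNING)".
    boolean clientHit = !isCluster && line.contains("Submitted application");
    boolean clusterHit = isCluster && line.contains("Application report for")
            && line.contains("(state: RUNNING)");
    if (clientHit || clusterHit) {
        Matcher m = APP_ID.matcher(line);
        if (m.find()) {
            return m.group();
        }
    }
    return null;
}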

The task that consumes the process's stdout/stderr streams is defined as follows:

class StreamFilterTask implements Callable<String> {
    private InputStream inputStream;
    private ConcurrentLinkedQueue<String> queue;
    private String streamType = null;
    private boolean isCluster;

    private StreamFilterTask() {
    }

    public StreamFilterTask(InputStream inputStream, ConcurrentLinkedQueue<String> queue, String streamType,
            boolean isCluster) {
        this.inputStream = inputStream;
        this.queue = queue;
        this.streamType = streamType;
        this.isCluster = isCluster;
    }

    @Override
    public String call() throws Exception {
        BufferedReader br = null;
        try {
            br = new BufferedReader(new InputStreamReader(inputStream));
            String line = null;
            while ((line = br.readLine()) != null) {
                System.out.println(line);

                // Keep at most the latest 1000 lines in the queue, dropping older ones.
                // (size() traverses the whole collection, so prefer isEmpty() when possible.)
                if (this.streamType.equalsIgnoreCase("error")) {
                    if (queue.size() > 1000) {
                        // Retrieve and remove the head of the queue (null if the queue is empty).
                        queue.poll();
                    }
                    // Insert at the tail; the queue is unbounded, so offer() never returns false.
                    queue.offer(line);
                }

                String applicationId = filterApplicationId(line, isCluster);
                if (applicationId != null && applicationId.startsWith("application_")) {
                    return applicationId;
                }
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        } finally {
            if (br != null) {
                try {
                    br.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        return null;
    }

    /**
     * @param line one line of the process's stdout/stderr output.
     */
    private String filterApplicationId(String line, boolean isCluster) {
        String applicationId = null;
        line = line.toLowerCase();
        // --deploy-mode client
        // 19/02/15 17:43:35 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0051
        // 19/04/01 14:13:57 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0781
        boolean isIndexSparkOwnLog = line.indexOf("INFO impl.YarnClientImpl: Submitted application ".toLowerCase()) != -1;
        // --deploy-mode cluster
        // 19/04/01 14:13:59 INFO yarn.Client: Application report for application_1548381669007_0781 (state: ACCEPTED)
        // 19/04/01 14:13:59 INFO yarn.Client: Application report for application_1548381669007_0781 (state: RUNNING)
        boolean isIndexSparkOwn2Log = line.indexOf("INFO yarn.Client: Application report for ".toLowerCase()) != -1;
        boolean isIndexSparkRunningLog = line.indexOf("(state: RUNNING)".toLowerCase()) != -1;

        if (isIndexSparkOwnLog || (isIndexSparkOwn2Log && isIndexSparkRunningLog)) {
            if (isIndexSparkOwnLog && false == isCluster) {
                int idx = line.indexOf("INFO impl.YarnClientImpl: Submitted application ".toLowerCase());
                applicationId = line.substring(idx + "INFO impl.YarnClientImpl: Submitted application ".length());
            } else if (isIndexSparkOwn2Log && true == isCluster) {
                int idx = line.indexOf("INFO yarn.Client: Application report for ".toLowerCase());
                applicationId = line.substring(idx + "INFO yarn.Client: Application report for ".length());
                if (line.indexOf("(state: RUNNING)".toLowerCase()) != -1) {
                    applicationId = applicationId.replace(" (state: RUNNING)".toLowerCase(), "");
                }
            }
        }

        if (applicationId != null && applicationId.startsWith("application_")) {
            System.out.println("====================================Index of applicationId:" + applicationId);
            System.out.println("====================================Index of applicationId:Complete ...");
        }

        return applicationId;
    }
}

The SubmitSpark class definition:

This class runs the script with Process, filters the applicationId out of the Process's stdout/stderr, and uses Process.waitFor(timeout, TimeUnit) to cap the maximum wait time.

class SubmitSpark {
    public String submit(String filePath, long timeoutMinutes, String charsetName) {
        String applicationId = null;
        String command = filePath;
        boolean isCluster = false;

        // Scan the submit script to see whether it uses --deploy-mode cluster.
        BufferedReader bufferedReader = null;
        try {
            bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), charsetName));
            String line = null;
            while ((line = bufferedReader.readLine()) != null) {
                if (line.toLowerCase().indexOf("--deploy-mode cluster") != -1) {
                    isCluster = true;
                    break;
                }
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (bufferedReader != null) {
                try {
                    bufferedReader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        StringTokenizer st = new StringTokenizer(command, " ");
        String[] cmd = new String[st.countTokens()];
        int token = 0;
        while (st.hasMoreTokens()) {
            cmd[token++] = st.nextToken();
        }

        Runtime rt = Runtime.getRuntime();
        System.out.println("Execing " + command);
        Process proc = null;
        try {
            proc = rt.exec(cmd);
            ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
            ExecutorService executor = Executors.newFixedThreadPool(2);

            // Store each worker's result in a Future. Do NOT call get() here:
            // it would block until the corresponding worker finishes.
            // any output?
            Future<String> futureInput = executor.submit(new StreamFilterTask(proc.getInputStream(), queue, "input",
                    isCluster));
            // any error message?
            Future<String> futureError = executor.submit(new StreamFilterTask(proc.getErrorStream(), queue, "error",
                    isCluster));

            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ",begin proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);");
            // Bound the wait on the child process.
            boolean exitVal = proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
            System.out.println("exitVal:" + exitVal);
            proc.destroyForcibly();
            System.out.println("proc.isAlive():" + proc.isAlive());
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ",end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);");

            // Whether --deploy-mode is cluster or client, the applicationId is read from
            // getErrorStream(), so as soon as the submission succeeds it is returned; if it
            // never succeeds (e.g. insufficient resources), we wait until the timeout expires.
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",begin applicationId = futureError.get();:");
            if (futureError.get() != null) {
                applicationId = futureError.get();
            }
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",end applicationId = futureError.get();:"
                    + applicationId);

            // In cluster mode this would block, so this block must stay commented out:
            // System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",begin applicationId = futureInput.get();:");
            // if (futureInput.get() != null) {
            //     applicationId = futureInput.get();
            // }
            // System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",end applicationId = futureInput.get();:"
            //         + applicationId);
            //
            // kill the child process (optional; see the notes below):
            // System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",begin getting process Id");
            // long pid = -1;
            // try {
            //     Class<?> clazz = Class.forName("java.lang.UNIXProcess");
            //     Field field = clazz.getDeclaredField("pid");
            //     field.setAccessible(true);
            //     pid = (Integer) field.get(proc);
            // } catch (Throwable e) {
            //     e.printStackTrace();
            // }
            // System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",end getting process Id:" + pid);
            //
            // System.out.println("proc.isAlive():" + proc.isAlive());
            // String[] killCmd = { "sh", "-c", "kill -9 " + pid };
            // Runtime.getRuntime().exec(killCmd).waitFor();

            System.out.println("Complete:" + applicationId);
        } catch (Throwable t) {
            t.printStackTrace();
        } finally {
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ",begin if (proc != null && proc.isAlive())");
            if (proc != null && proc.isAlive()) {
                proc.destroyForcibly();
                System.out.println("proc.isAlive():" + proc.isAlive());
            }
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ",end if (proc != null && proc.isAlive())");
        }
        return applicationId;
    }
}

Notes:

1) Do not set the timeout too short. If it is too short, the launcher is killed before the job has even been submitted to YARN, so the submission is aborted and no applicationId can be returned.

2) Even after submit() above has returned an applicationId, the launcher (java -cp test.jar com.dx.test.Submit) does not exit, because the child process's stdout/stderr streams are in fact still open.

3) Even with the kill-process code above enabled, closing stdout/stderr still does not make java -cp test.jar com.dx.test.Submit exit. With that code enabled, after the Process object is destroyed (and provided the job has already reached YARN), the program catches the resulting stream errors, but the Spark application on YARN is not killed (in yarn-client mode the application on YARN also keeps running until the Java launcher is terminated by hand; in yarn-cluster mode, once the job is on YARN, killing the Java program cannot stop it at all). So the kill-process code is optional; a Java 9+ alternative for obtaining the PID is sketched below.
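Incidentally, the java.lang.UNIXProcess reflection hack in the commented-out block is only needed on Java 8 and earlier; from Java 9 on, Process.pid() returns the native PID directly. A minimal sketch of the same kill step on Java 9+ (the sleep command only stands in for a real child process):

import java.util.concurrent.TimeUnit;

public class PidDemo {
    public static void main(String[] args) throws Exception {
        // Spawn a long-running child just for demonstration.
        Process proc = Runtime.getRuntime().exec(new String[] { "sleep", "60" });
        // Java 9+: no reflection needed to obtain the native PID.
        long pid = proc.pid();
        System.out.println("child pid: " + pid);
        // Same kill -9 approach as the commented-out code above.
        Runtime.getRuntime().exec(new String[] { "sh", "-c", "kill -9 " + pid }).waitFor();
        proc.waitFor(5, TimeUnit.SECONDS);
        System.out.println("proc.isAlive(): " + proc.isAlive());
    }
}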

Test:

package com.dx.test;

public class Submit {
    public static void main(String[] args) {
        String filePath = "./submit_test.sh";
        String charsetName = "utf-8";
        // The timeout is varied across the tests below (2 minutes, then 5).
        long timeoutMinutes = 2;

        SubmitSpark submitSpark = new SubmitSpark();
        String applicationId = submitSpark.submit(filePath, timeoutMinutes, charsetName);
        System.out.println("return the applicationId:" + applicationId);
    }
}

Timeout set to 2 minutes

  • In YARN --deploy-mode client mode (timeout set to 2 minutes), the run fails as follows:
// :: INFO client.RMProxy: Connecting to ResourceManager at vm10.60.0..com.cn/10.60.0.11:
[Opened /usr/java/jdk1..0_152/jre/lib/jce.jar]
exitVal:false
proc.isAlive():false
-- ::,end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
-- ::,begin fetching applicationId:
java.io.IOException: Stream closed
at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:)
at java.io.BufferedInputStream.read(BufferedInputStream.java:)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:)
at java.io.InputStreamReader.read(InputStreamReader.java:)
at java.io.BufferedReader.fill(BufferedReader.java:)
at java.io.BufferedReader.readLine(BufferedReader.java:)
at java.io.BufferedReader.readLine(BufferedReader.java:)
at com.dx.test.StreamFilterTask.call(Submit.java:)
at com.dx.test.StreamFilterTask.call(Submit.java:)
at java.util.concurrent.FutureTask.run(FutureTask.java:)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)
at java.lang.Thread.run(Thread.java:)
java.io.IOException: Stream closed
at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:)
at java.io.BufferedInputStream.read(BufferedInputStream.java:)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:)
at java.io.InputStreamReader.read(InputStreamReader.java:)
at java.io.BufferedReader.fill(BufferedReader.java:)
at java.io.BufferedReader.readLine(BufferedReader.java:)
at java.io.BufferedReader.readLine(BufferedReader.java:)
at com.dx.test.StreamFilterTask.call(Submit.java:)
at com.dx.test.StreamFilterTask.call(Submit.java:)
at java.util.concurrent.FutureTask.run(FutureTask.java:)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)
at java.lang.Thread.run(Thread.java:)
-- ::,end fetching applicationId:null
-- ::,begin getting process Id
-- ::,end getting process Id:
proc.isAlive():false
Complete:null
-- ::,begin if (proc != null && proc.isAlive())
-- ::,end if (proc != null && proc.isAlive())
return the applicationId:null

Note: this test was run with the kill-process code enabled; the result is the same with it disabled.

1) The applicationId is not obtained, even though the job may already have been submitted to YARN [though in this particular run the output shows the launcher was killed before the job ever reached YARN].
2) The console then sits blocked; pressing CTRL+C to end java -cp ./test.jar com.dx.test.Submit also kills the Spark application on YARN.

  • In YARN --deploy-mode cluster mode (timeout set to 2 minutes), the run fails as follows:
// :: INFO yarn.Client: Uploading resource file:/home1/boco/duanjiajun/sparkjars/bcprov-jdk15on-1.52.jar -> hdfs://vm10.60.0.11.com.cn:8020/user/boco/.sparkStaging/application_1548381669007_0816/bcprov-jdk15on-1.52.jar
exitVal:false
proc.isAlive():false
-- ::,end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
-- ::,begin fetching applicationId:
java.io.IOException: Stream closed
at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:)
at java.io.BufferedInputStream.read(BufferedInputStream.java:)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:)
at java.io.InputStreamReader.read(InputStreamReader.java:)
at java.io.BufferedReader.fill(BufferedReader.java:)
at java.io.BufferedReader.readLine(BufferedReader.java:)
at java.io.BufferedReader.readLine(BufferedReader.java:)
at com.dx.test.StreamFilterTask.call(Submit.java:)
at com.dx.test.StreamFilterTask.call(Submit.java:)
at java.util.concurrent.FutureTask.run(FutureTask.java:)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)
at java.lang.Thread.run(Thread.java:)
-- ::,end fetching applicationId:null
-- ::,begin getting process Id
-- ::,end getting process Id:
proc.isAlive():false
Complete:null
-- ::,begin if (proc != null && proc.isAlive())
-- ::,end if (proc != null && proc.isAlive())
return the applicationId:null

Note: this test was run with the kill-process code enabled; the result is the same with it disabled.

1) The applicationId is not obtained, even though the job may already have been submitted to YARN [though in this particular run the output shows the launcher was killed before the job ever reached YARN].
2) The console then sits blocked; pressing CTRL+C to end java -cp ./test.jar com.dx.test.Submit does NOT kill the Spark application on YARN if the job has already been submitted.

Timeout set to 5 minutes

  • --deploy-mode cluster, with the timeout set to 5 minutes:
// :: INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
// :: INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
// :: INFO yarn.Client: Application report for application_1548381669007_0828 (state: RUNNING)
====================================Index of applicationId:application_1548381669007_0828
====================================Index of applicationId:Complete ...
exitVal:false
proc.isAlive():true
-- ::,end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
-- ::,begin applicationId = futureError.get();:
-- ::,end applicationId = futureError.get();:application_1548381669007_0828
Complete:application_1548381669007_0828
-- ::,begin if (proc != null && proc.isAlive())
-- ::,end if (proc != null && proc.isAlive())
return the applicationId:application_1548381669007_0828
^Cbash-4.1$

Manually killing the launcher process at this point does not terminate the Spark application on YARN.

  • --deploy-mode client, with the timeout set to 5 minutes:
// :: INFO impl.YarnClientImpl: Submitted application application_1548381669007_0829
====================================Index of applicationId:application_1548381669007_0829
====================================Index of applicationId:Complete ...
the value is :
root
|-- timestamp: timestamp (nullable = true)
|-- value: long (nullable = true)
|-- int_id: long (nullable = true)

root
|-- int_id: string (nullable = false)
|-- job_result: string (nullable = true)

Query started: a82ad759-8b14-4d58-93a3-8bed7617dd9c
-------------------------------------------
Batch:
-------------------------------------------
listener...application_1548381669007_0829
+------+----------+
|int_id|job_result|
+------+----------+
| | null|
| | ,|
| | ,|
| | ,|
| | ,|
| | ,|
| | ,|
| | ,|
| | ,|
| | ,|
| | ,|
| | null|
| | null|
| | null|
| | null|
| | null|
| | ,|
| | ,|
| | ,|
| | ,|
+------+----------+
only showing top 20 rows
...
listener...application_1548381669007_0829
Query made progress: {
"id" : "a82ad759-8b14-4d58-93a3-8bed7617dd9c",
"runId" : "a53447f1-056e-4d84-b27e-7007829bc1e2",
"name" : null,
"timestamp" : "2019-04-02T03:43:10.001Z",
"batchId" : ,
"numInputRows" : ,
"inputRowsPerSecond" : 100.0,
"processedRowsPerSecond" : 1584.7860538827258,
"durationMs" : {
"addBatch" : ,
"getBatch" : ,
"getOffset" : ,
"queryPlanning" : ,
"triggerExecution" : ,
"walCommit" :
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateSource[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=64]",
"startOffset" : ,
"endOffset" : ,
"numInputRows" : ,
"inputRowsPerSecond" : 100.0,
"processedRowsPerSecond" : 1584.7860538827258
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@58975f19"
}
}
the value is :
Trigger accumulator value:
Load count accumulator value:
exitVal:false
proc.isAlive():false
-- ::,end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
-- ::,begin applicationId = futureError.get();:
-- ::,end applicationId = futureError.get();:application_1548381669007_0829
Complete:application_1548381669007_0829
-- ::,begin if (proc != null && proc.isAlive())
-- ::,end if (proc != null && proc.isAlive())
return the applicationId:application_1548381669007_0829
java.io.IOException: Stream closed
at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:)
at java.io.BufferedInputStream.read(BufferedInputStream.java:)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:)
at java.io.InputStreamReader.read(InputStreamReader.java:)
at java.io.BufferedReader.fill(BufferedReader.java:)
at java.io.BufferedReader.readLine(BufferedReader.java:)
at java.io.BufferedReader.readLine(BufferedReader.java:)
at com.dx.test.StreamFilterTask.call(Submit.java:)
at com.dx.test.StreamFilterTask.call(Submit.java:)
at java.util.concurrent.FutureTask.run(FutureTask.java:)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)
at java.lang.Thread.run(Thread.java:)
# The exception above is caught and printed by the program; it does not terminate it.
^Cbash-4.1$

Manually killing the launcher process at this point DOES terminate the Spark application on YARN.

Reference: "Calling shell scripts from Java, and how to solve the blocking problem"
