How can I run multiple jobs in a single SparkContext from different threads in PySpark?

From the Spark documentation, under Scheduling Within an Application:

Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users).

I have found example code for this in Scala and Java.
Can someone give an example of how to do it with PySpark?

Solution:

I ran into the same question, so I put together a small self-contained example. It creates multiple threads with Python's threading module and submits several Spark jobs concurrently.

Note that by default Spark runs jobs in FIFO order: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application. In the example below, I switch it to FAIR scheduling.

# Prereqs:
# set
# spark.dynamicAllocation.enabled         true
# spark.shuffle.service.enabled           true
# spark.scheduler.mode                    FAIR
# in spark-defaults.conf

import threading
from pyspark import SparkContext, SparkConf

def task(sc, i):
  # Each thread triggers its own Spark action (count), i.e. its own job
  print(sc.parallelize(range(i*10000)).count())

def run_multiple_jobs():
  conf = SparkConf().setMaster('local[*]').setAppName('appname')
  # Set scheduler to FAIR: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
  conf.set('spark.scheduler.mode', 'FAIR')
  sc = SparkContext(conf=conf)
  for i in range(4):
    t = threading.Thread(target=task, args=(sc, i))
    t.start()
    print('spark task', i, 'has started')


run_multiple_jobs()

Output:

spark task 0 has started
spark task 1 has started
spark task 2 has started
spark task 3 has started
30000
0 
10000
20000
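
If you also want each thread's jobs routed to its own FAIR pool, the same scheduling doc describes setting the local property spark.scheduler.pool on the submitting thread. Below is a minimal sketch of that variant, assuming the same local setup as above; the pool names (pool_0, pool_1, ...) are placeholders of my own, and the threads are joined so the driver waits for all jobs before stopping the context.

import threading
from pyspark import SparkContext, SparkConf

def task_in_pool(sc, i):
  # Route this thread's jobs to a named FAIR pool (pool name is a placeholder)
  sc.setLocalProperty('spark.scheduler.pool', 'pool_%d' % i)
  try:
    print(sc.parallelize(range(i*10000)).count())
  finally:
    # Unset the local property so the thread no longer pins jobs to the pool
    sc.setLocalProperty('spark.scheduler.pool', None)

def run_multiple_jobs_in_pools():
  conf = SparkConf().setMaster('local[*]').setAppName('appname')
  conf.set('spark.scheduler.mode', 'FAIR')
  sc = SparkContext(conf=conf)
  threads = [threading.Thread(target=task_in_pool, args=(sc, i)) for i in range(4)]
  for t in threads:
    t.start()
  for t in threads:
    t.join()  # wait for every job to finish before stopping the context
  sc.stop()

run_multiple_jobs_in_pools()

With FAIR mode and no fairscheduler.xml, Spark creates the named pools on demand with default settings, which is usually enough to keep one long job from starving the others.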