GeoSpark是一种用于大规模空间数据处理的集群计算系统。GeoSpark通过一组开箱即用(out-of-the-box)的空间弹性分布式数据集(SRDDs)扩展了 Apache Spark,可以跨机器高效地加载、处理、分析和展示大规模空间数据。
准备工作
- Windows 和 Spark
- IDEA
-
GeoSpark支持Java和Scala两种开发语言,本次选择Java进行开发。
GeoSpark
参考https://github.com/jiayuasu/GeoSparkTemplateProject,下载项目到本地。
GeoSpark-Viz Java项目构建
cd ./geospark-viz/java
mvn clean install
由于对项目自带数据生成的图片不太满意,这里将map.shp数据解析成polygon.csv,并修改Java代码中加载的面数据配置文件:
ConfFile= new FileInputStream(resourcePath+"babylon.polygon2.properties");
通过buildChoroplethMap统计面内的点数生成分级统计图;修改buildScatterPlot和
buildHeatMap的输入数据为点数据,分别生成散点图和热力图。
完整代码:
package example;
import com.vividsolutions.jts.geom.Envelope;
import com.vividsolutions.jts.geom.Polygon;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.storage.StorageLevel;
import org.datasyslab.geospark.enums.FileDataSplitter;
import org.datasyslab.geospark.enums.GridType;
import org.datasyslab.geospark.enums.IndexType;
import org.datasyslab.geospark.formatMapper.EarthdataHDFPointMapper;
import org.datasyslab.geospark.spatialOperator.JoinQuery;
import org.datasyslab.geospark.spatialRDD.PointRDD;
import org.datasyslab.geospark.spatialRDD.PolygonRDD;
import org.datasyslab.geospark.spatialRDD.RectangleRDD;
import org.datasyslab.geosparkviz.core.ImageGenerator;
import org.datasyslab.geosparkviz.core.ImageStitcher;
import org.datasyslab.geosparkviz.core.RasterOverlayOperator;
import org.datasyslab.geosparkviz.core.Serde.GeoSparkVizKryoRegistrator;
import org.datasyslab.geosparkviz.extension.visualizationEffect.ChoroplethMap;
import org.datasyslab.geosparkviz.extension.visualizationEffect.HeatMap;
import org.datasyslab.geosparkviz.extension.visualizationEffect.ScatterPlot;
import org.datasyslab.geosparkviz.utils.ColorizeOption;
import org.datasyslab.geosparkviz.utils.ImageType;
import java.awt.*;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
// TODO: Auto-generated Javadoc
/**
* The Class Example.
*/
public class Example2019 {
/** The spark context. */
static JavaSparkContext sparkContext;
/** The prop. */
static Properties prop;
/** The Point input location. */
static String PointInputLocation;
/** The Point offset. */
static Integer PointOffset;
/** The Point splitter. */
static FileDataSplitter PointSplitter;
/** The Point num partitions. */
static Integer PointNumPartitions;
/** The Rectangle input location. */
static String RectangleInputLocation;
/** The Rectangle offset. */
static Integer RectangleOffset;
/** The Rectangle splitter. */
static FileDataSplitter RectangleSplitter;
/** The Rectangle num partitions. */
static Integer RectangleNumPartitions;
/** The Polygon input location. */
static String PolygonInputLocation;
/** The Polygon offset. */
static Integer PolygonOffset;
/** The Polygon splitter. */
static FileDataSplitter PolygonSplitter;
/** The Polygon num partitions. */
static Integer PolygonNumPartitions;
/** The Line string input location. */
static String LineStringInputLocation;
/** The Line string offset. */
static Integer LineStringOffset;
/** The Line string splitter. */
static FileDataSplitter LineStringSplitter;
/** The Line string num partitions. */
static Integer LineStringNumPartitions;
/** The US main land boundary. */
static Envelope USMainLandBoundary;
/** The earthdata input location. */
static String earthdataInputLocation;
/** The earthdata num partitions. */
static Integer earthdataNumPartitions;
/** The HDF increment. */
static int HDFIncrement = 5;
/** The HDF offset. */
static int HDFOffset = 2;
/** The HDF root group name. */
static String HDFRootGroupName = "MOD_Swath_LST";
/** The HDF data variable name. */
static String HDFDataVariableName = "LST";
/** The HDF data variable list. */
static String[] HDFDataVariableList = {"LST","QC","Error_LST","Emis_31","Emis_32"};
/** The HD fswitch XY. */
static boolean HDFswitchXY = true;
/** The url prefix. */
static String urlPrefix = "";
/**
* Builds the scatter plot.
*
* @param outputPath the output path
* @return true, if successful
*/
public static boolean buildScatterPlot(String outputPath)
{
try{
PointRDD spatialRDD = new PointRDD(sparkContext, PointInputLocation, PointOffset, PointSplitter, false, PointNumPartitions, StorageLevel.MEMORY_ONLY());
//PolygonRDD spatialRDD = new PolygonRDD(sparkContext, PolygonInputLocation, PolygonSplitter, false, PolygonNumPartitions, StorageLevel.MEMORY_ONLY());
ScatterPlot visualizationOperator = new ScatterPlot(1000,600,USMainLandBoundary,false);
visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true);
visualizationOperator.Visualize(sparkContext, spatialRDD);
ImageGenerator imageGenerator = new ImageGenerator();
imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, outputPath, ImageType.PNG);
// visualizationOperator = new ScatterPlot(1000,600,USMainLandBoundary,false,-1,-1,false,true);
// visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true);
// visualizationOperator.Visualize(sparkContext, spatialRDD);
// imageGenerator = new ImageGenerator();
// imageGenerator.SaveVectorImageAsLocalFile(visualizationOperator.vectorImage, outputPath,ImageType.SVG);
//
// visualizationOperator = new ScatterPlot(1000,600,USMainLandBoundary,false,-1,-1,true,true);
// visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true);
// visualizationOperator.Visualize(sparkContext, spatialRDD);
// imageGenerator = new ImageGenerator();
// imageGenerator.SaveVectorImageAsLocalFile(visualizationOperator.distributedVectorImage, outputPath+"-distributed",ImageType.SVG);
//
}
catch(Exception e)
{
e.printStackTrace();
return false;
}
return true;
}
/**
* Builds the heat map.
*
* @param outputPath the output path
* @return true, if successful
*/
public static boolean buildHeatMap(String outputPath)
{
try{
PointRDD spatialRDD = new PointRDD(sparkContext, PointInputLocation, PointOffset, PointSplitter, false, PointNumPartitions, StorageLevel.MEMORY_ONLY());
HeatMap visualizationOperator = new HeatMap(1000,600,USMainLandBoundary,false,5);
visualizationOperator.Visualize(sparkContext, spatialRDD);
ImageGenerator imageGenerator = new ImageGenerator();
imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, outputPath,ImageType.PNG);
}
catch(Exception e)
{
e.printStackTrace();
return false;
}
return true;
}
/**
* Builds the choropleth map.
*
* @param outputPath the output path
* @return true, if successful
*/
public static boolean buildChoroplethMap(String outputPath)
{
try{
PointRDD spatialRDD = new PointRDD(sparkContext, PointInputLocation, PointOffset, PointSplitter, false, PointNumPartitions, StorageLevel.MEMORY_ONLY());
PolygonRDD queryRDD = new PolygonRDD(sparkContext, PolygonInputLocation, PolygonSplitter, false, PolygonNumPartitions, StorageLevel.MEMORY_ONLY());
spatialRDD.spatialPartitioning(GridType.RTREE);
queryRDD.spatialPartitioning(spatialRDD.grids);
spatialRDD.buildIndex(IndexType.RTREE,true);
JavaPairRDD<Polygon,Long> joinResult = JoinQuery.SpatialJoinQueryCountByKey(spatialRDD,queryRDD,true,false);
long start = System.currentTimeMillis();
ChoroplethMap visualizationOperator = new ChoroplethMap(1000,600,USMainLandBoundary,false);
visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.RED, true);
visualizationOperator.Visualize(sparkContext, joinResult);
ScatterPlot frontImage = new ScatterPlot(1000,600,USMainLandBoundary,false);
frontImage.CustomizeColor(0, 0, 0, 255, Color.GREEN, true);
frontImage.Visualize(sparkContext, queryRDD);
RasterOverlayOperator overlayOperator = new RasterOverlayOperator(visualizationOperator.rasterImage);
overlayOperator.JoinImage(frontImage.rasterImage);
ImageGenerator imageGenerator = new ImageGenerator();
//imageGenerator.SaveRasterImageAsLocalFile(frontImage.rasterImage, outputPath,ImageType.PNG);
imageGenerator.SaveRasterImageAsLocalFile(overlayOperator.backRasterImage, outputPath,ImageType.PNG);
//imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, outputPath,ImageType.PNG);
//ImageStitcher.stitchImagePartitionsFromLocalFile(outputPath, 1000,600,0,4, 4);
System.out.println("散点图生成完成,共耗时" + (System.currentTimeMillis() - start) + "ms");
}
catch(Exception e)
{
e.printStackTrace();
return false;
}
return true;
}
/**
* Parallel filter render no stitch.
*
* @param outputPath the output path
* @return true, if successful
*/
public static boolean parallelFilterRenderNoStitch(String outputPath)
{
try{
PointRDD spatialRDD = new PointRDD(sparkContext, PointInputLocation, PointOffset, PointSplitter, false, PointNumPartitions, StorageLevel.MEMORY_ONLY());
HeatMap visualizationOperator = new HeatMap(1000,600,USMainLandBoundary,false,2,4,4,true,true);
visualizationOperator.Visualize(sparkContext, spatialRDD);
ImageGenerator imageGenerator = new ImageGenerator();
imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, outputPath,ImageType.PNG);
}
catch(Exception e)
{
e.printStackTrace();
return false;
}
return true;
}
/**
* Parallel filter render stitch.
*
* @param outputPath the output path
* @return true, if successful
*/
public static boolean parallelFilterRenderStitch(String outputPath)
{
try{
PointRDD spatialRDD = new PointRDD(sparkContext, PointInputLocation, PointOffset, PointSplitter, false, PointNumPartitions, StorageLevel.MEMORY_ONLY());
HeatMap visualizationOperator = new HeatMap(1000,600,USMainLandBoundary,false,2,4,4,true,true);
visualizationOperator.Visualize(sparkContext, spatialRDD);
ImageGenerator imageGenerator = new ImageGenerator();
imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, outputPath,ImageType.PNG);
ImageStitcher.stitchImagePartitionsFromLocalFile(outputPath, 1000,600,0,4, 4);
}
catch(Exception e)
{
e.printStackTrace();
return false;
}
return true;
}
/**
* Earthdata visualization.
*
* @param outputPath the output path
* @return true, if successful
*/
public static boolean earthdataVisualization(String outputPath)
{
try {
EarthdataHDFPointMapper earthdataHDFPoint = new EarthdataHDFPointMapper(HDFIncrement,HDFOffset,HDFRootGroupName,
HDFDataVariableList,HDFDataVariableName,HDFswitchXY,urlPrefix);
PointRDD spatialRDD = new PointRDD(sparkContext, earthdataInputLocation, earthdataNumPartitions, earthdataHDFPoint,StorageLevel.MEMORY_ONLY());
ScatterPlot visualizationOperator = new ScatterPlot(1000,600,spatialRDD.boundaryEnvelope,ColorizeOption.EARTHOBSERVATION,false,false);
visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.BLUE, true);
visualizationOperator.Visualize(sparkContext, spatialRDD);
ImageGenerator imageGenerator = new ImageGenerator();
imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, outputPath, ImageType.PNG);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* The main method.
*
* @param args the arguments
* @throws IOException Signals that an I/O exception has occurred.
*/
public static void main(String[] args) throws IOException {
long start = System.currentTimeMillis();
Logger.getLogger("org").setLevel(Level.WARN);
Logger.getLogger("akka").setLevel(Level.WARN);
SparkConf sparkConf = new SparkConf().setAppName("GeoSparkVizDemo").setMaster("local[*]").set("spark.serializer", KryoSerializer.class.getName())
.set("spark.kryo.registrator", GeoSparkVizKryoRegistrator.class.getName());
sparkContext = new JavaSparkContext(sparkConf);
prop = new Properties();
String resourcePath = "src/test/resources/";
String demoOutputPath = "target/demo";
FileInputStream ConfFile= new FileInputStream(resourcePath+"babylon.point.properties");
prop.load(ConfFile);
String scatterPlotOutputPath = System.getProperty("user.dir")+"/"+demoOutputPath + "/scatterplot";
String heatMapOutputPath = System.getProperty("user.dir")+"/"+demoOutputPath+"/heatmap";
String choroplethMapOutputPath = System.getProperty("user.dir")+"/"+demoOutputPath+"/choroplethmap";
String parallelFilterRenderStitchOutputPath = System.getProperty("user.dir")+"/"+demoOutputPath+"/parallelfilterrenderstitchheatmap";
String earthdataScatterPlotOutputPath = System.getProperty("user.dir")+"/"+demoOutputPath+"/earthdatascatterplot";
PointInputLocation = System.getProperty("user.dir")+"/"+resourcePath+prop.getProperty("inputLocation");
PointOffset = Integer.parseInt(prop.getProperty("offset"));;
PointSplitter = FileDataSplitter.getFileDataSplitter(prop.getProperty("splitter"));
PointNumPartitions = Integer.parseInt(prop.getProperty("numPartitions"));
ConfFile= new FileInputStream(resourcePath+"babylon.rectangle.properties");
prop.load(ConfFile);
RectangleInputLocation = System.getProperty("user.dir")+"/"+resourcePath+prop.getProperty("inputLocation");
RectangleOffset = Integer.parseInt(prop.getProperty("offset"));
RectangleSplitter = FileDataSplitter.getFileDataSplitter(prop.getProperty("splitter"));
RectangleNumPartitions = Integer.parseInt(prop.getProperty("numPartitions"));
ConfFile= new FileInputStream(resourcePath+"babylon.polygon2.properties");
prop.load(ConfFile);
PolygonInputLocation = System.getProperty("user.dir")+"/"+resourcePath+prop.getProperty("inputLocation");
PolygonOffset = Integer.parseInt(prop.getProperty("offset"));
PolygonSplitter = FileDataSplitter.getFileDataSplitter(prop.getProperty("splitter"));
PolygonNumPartitions = Integer.parseInt(prop.getProperty("numPartitions"));
ConfFile= new FileInputStream(resourcePath+"babylon.linestring.properties");
prop.load(ConfFile);
LineStringInputLocation = System.getProperty("user.dir")+"/"+resourcePath+prop.getProperty("inputLocation");
LineStringOffset = Integer.parseInt(prop.getProperty("offset"));
LineStringSplitter = FileDataSplitter.getFileDataSplitter(prop.getProperty("splitter"));
LineStringNumPartitions = Integer.parseInt(prop.getProperty("numPartitions"));
USMainLandBoundary = new Envelope(-126.790180,-64.630926,24.863836,50.000);
earthdataInputLocation = System.getProperty("user.dir")+"/src/test/resources/modis/modis.csv";
earthdataNumPartitions = 5;
HDFIncrement=5;
HDFOffset=2;
HDFRootGroupName = "MOD_Swath_LST";
HDFDataVariableName = "LST";
HDFswitchXY = true;
urlPrefix = System.getProperty("user.dir")+"/src/test/resources/modis/";
if(buildScatterPlot(scatterPlotOutputPath)&&buildHeatMap(heatMapOutputPath)
&&buildChoroplethMap(choroplethMapOutputPath)&¶llelFilterRenderStitch(parallelFilterRenderStitchOutputPath+"-stitched")
&¶llelFilterRenderNoStitch(parallelFilterRenderStitchOutputPath)&&earthdataVisualization(earthdataScatterPlotOutputPath))
{
System.out.println("散点图生成完成,共耗时" + (System.currentTimeMillis() - start) + "ms");
System.out.println("All GeoSparkViz Demos have passed.");
}
else
{
System.out.println("GeoSparkViz Demos failed.");
}
sparkContext.stop();
}
}
项目可视化一览
参考资料:
https://www.jianshu.com/p/1a531de087df
https://www.helplib.com/GitHub/article_127813