Sedona's (formerly GeoSpark) spatial data visualization process is not complicated: it essentially maps each spatial object onto the corresponding raster space, much like a vector-to-raster conversion. Let's go straight to the code:
// A LineStringRDD named slineRDD has already been created through Sedona
import org.apache.sedona.viz.core.ImageGenerator
import org.apache.sedona.viz.extension.visualizationEffect.ScatterPlot
import org.apache.sedona.viz.utils.ImageType
import java.awt.Color

// Create the visualization operator
var visualizationOperator = new ScatterPlot(10000, 10000, slineRDD.boundaryEnvelope, false)
visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.blue, true)
// Run the actual visualization
visualizationOperator.Visualize(sc, slineRDD)
// Save the result as a PNG image
var imageGenerator = new ImageGenerator
imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, "/home/sparkl/visual", ImageType.PNG)
The actual visualization work happens in the visualizationOperator.Visualize(sc, slineRDD) call, which is divided into three steps: Rasterize, Colorize and RenderImage. The implementation looks like this:
public boolean Visualize(JavaSparkContext sparkContext, SpatialRDD spatialRDD)
        throws Exception
{
    // Produces a JavaPairRDD<Pixel, Double>:
    // each Pixel corresponds to one pixel of the raster image (its x, y position),
    // and the Double is the value attached to that pixel
    this.Rasterize(sparkContext, spatialRDD, true);
    // Normalize the pixel values and map each value to a color
    this.Colorize();
    this.RenderImage(sparkContext);
    return true;
}
Rasterize
The Rasterize function performs the core vector-to-raster conversion. It first computes, through a flatMapToPair operator, the pixel positions and pixel values corresponding to each spatial object, and then filters out the pixels that fall outside the raster. In Sedona's implementation the value does not take the attributes of the spatial entity into account: every pixel is simply assigned 1.0.
protected JavaPairRDD<Pixel, Double> Rasterize(JavaSparkContext sparkContext,
        SpatialRDD spatialRDD, boolean useSparkDefaultPartition)
{
    JavaRDD<Object> rawSpatialRDD = spatialRDD.rawSpatialRDD;
    // rawSpatialRDD contains the spatial objects; the flatMapToPair operator computes
    // the pixels covered by each object and yields <Pixel, Double> key-value pairs
    JavaPairRDD<Pixel, Double> spatialRDDwithPixelId = rawSpatialRDD.flatMapToPair(new PairFlatMapFunction<Object, Pixel, Double>()
    {
        @Override
        public Iterator<Tuple2<Pixel, Double>> call(Object spatialObject)
                throws Exception
        {
            // Dispatch on the geometry type
            if (spatialObject instanceof Point) {
                return RasterizationUtils.FindPixelCoordinates(resolutionX, resolutionY, datasetBoundary, (Point) spatialObject, colorizeOption, reverseSpatialCoordinate).iterator();
            }
            else if (spatialObject instanceof Polygon) {
                return RasterizationUtils.FindPixelCoordinates(resolutionX, resolutionY, datasetBoundary, (Polygon) spatialObject, reverseSpatialCoordinate).iterator();
            }
            else if (spatialObject instanceof LineString) {
                // Compute the <pixel position, pixel value> pairs in the raster image
                // covered by this spatial object, producing a list of pixels
                return RasterizationUtils.FindPixelCoordinates(resolutionX, resolutionY, datasetBoundary, (LineString) spatialObject, reverseSpatialCoordinate).iterator();
            }
            else {
                // Only the three geometry types above are supported
                throw new Exception("[Sedona-Viz][Rasterize] Unsupported spatial object types. Sedona-Viz only supports Point, Polygon, LineString");
            }
        }
    });
    // Remove pixels that fall outside the raster
    // (could this check be folded directly into the previous step?)
    spatialRDDwithPixelId = spatialRDDwithPixelId.filter(new Function<Tuple2<Pixel, Double>, Boolean>()
    {
        @Override
        public Boolean call(Tuple2<Pixel, Double> pixelCount)
                throws Exception
        {
            return !(pixelCount._1().getX() < 0) && !(pixelCount._1().getX() > resolutionX)
                    && !(pixelCount._1().getY() < 0) && !(pixelCount._1().getY() > resolutionY);
        }
    });
    this.distributedRasterCountMatrix = spatialRDDwithPixelId;
    return this.distributedRasterCountMatrix;
}
To compute the pixels for a LineString, the geometry is split into two-point line segments. For each segment, the raster position of each endpoint is computed first, and then the intermediate pixels crossed by the segment connecting those two raster positions are filled in.
public static List<Tuple2<Pixel, Double>> FindPixelCoordinates(int resolutionX, int resolutionY,
        Envelope datasetBoundary, LineString spatialObject, boolean reverseSpatialCoordinate)
{
    List<Tuple2<Pixel, Double>> result = new ArrayList<Tuple2<Pixel, Double>>();
    for (int i = 0; i < spatialObject.getCoordinates().length - 1; i++) {
        Tuple2<Integer, Integer> pixelCoordinate1 = null;
        Tuple2<Integer, Integer> pixelCoordinate2 = null;
        try {
            // Raster positions of the two endpoints
            pixelCoordinate1 = FindOnePixelCoordinate(resolutionX, resolutionY, datasetBoundary, spatialObject.getCoordinates()[i], reverseSpatialCoordinate);
            pixelCoordinate2 = FindOnePixelCoordinate(resolutionX, resolutionY, datasetBoundary, spatialObject.getCoordinates()[i + 1], reverseSpatialCoordinate);
        }
        catch (Exception e) {
            // This line segment is out of the boundary and should be ignored.
            continue;
        }
        // Connect the two raster positions with a line segment, compute the other
        // pixels the segment passes through, and add them to the result
        result.addAll(FindPixelCoordinates(resolutionX, resolutionY, pixelCoordinate1, pixelCoordinate2, reverseSpatialCoordinate));
    }
    return result;
}

// Compute the position of a single point in the raster image
public static Tuple2<Integer, Integer> FindOnePixelCoordinate(int resolutionX, int resolutionY,
        Envelope datasetBoundaryOriginal, Coordinate spatialCoordinateOriginal, boolean reverseSpatialCoordinate)
{
    Coordinate spatialCoordinate;
    Envelope datasetBoundary;
    if (reverseSpatialCoordinate) {
        spatialCoordinate = new Coordinate(spatialCoordinateOriginal.y, spatialCoordinateOriginal.x);
        datasetBoundary = new Envelope(datasetBoundaryOriginal.getMinY(), datasetBoundaryOriginal.getMaxY(), datasetBoundaryOriginal.getMinX(), datasetBoundaryOriginal.getMaxX());
    }
    else {
        spatialCoordinate = spatialCoordinateOriginal;
        datasetBoundary = datasetBoundaryOriginal;
    }
    Double pixelXDouble = ((spatialCoordinate.x - datasetBoundary.getMinX()) / (datasetBoundary.getMaxX() - datasetBoundary.getMinX())) * resolutionX;
    Double xRemainder = (spatialCoordinate.x - datasetBoundary.getMinX()) % (datasetBoundary.getMaxX() - datasetBoundary.getMinX());
    Double pixelYDouble = ((spatialCoordinate.y - datasetBoundary.getMinY()) / (datasetBoundary.getMaxY() - datasetBoundary.getMinY())) * resolutionY;
    Double yRemainder = (spatialCoordinate.y - datasetBoundary.getMinY()) % (datasetBoundary.getMaxY() - datasetBoundary.getMinY());
    int pixelX = pixelXDouble.intValue();
    int pixelY = pixelYDouble.intValue();
    if (xRemainder == 0.0 && pixelXDouble != 0.0) { pixelX--; }
    if (pixelX >= resolutionX) { pixelX--; }
    if (yRemainder == 0.0 && pixelYDouble != 0) { pixelY--; }
    if (pixelY >= resolutionY) { pixelY--; }
    return new Tuple2<Integer, Integer>(pixelX, pixelY);
}

// Compute the pixels a line segment passes through, given its two endpoint pixels
public static List<Tuple2<Pixel, Double>> FindPixelCoordinates(int resolutionX, int resolutionY,
        Tuple2<Integer, Integer> pixelCoordinate1, Tuple2<Integer, Integer> pixelCoordinate2, boolean reverseSpatialCoordinate)
{
    // This function uses Bresenham's line algorithm to plot the pixels touched by a given line segment.
    int x1 = pixelCoordinate1._1;
    int y1 = pixelCoordinate1._2;
    int x2 = pixelCoordinate2._1;
    int y2 = pixelCoordinate2._2;
    int dx = x2 - x1;
    int dy = y2 - y1;
    int ux = dx > 0 ? 1 : -1; // step direction along x
    int uy = dy > 0 ? 1 : -1; // step direction along y
    int x = x1, y = y1;
    int eps = 0; // accumulated error
    dx = Math.abs(dx);
    dy = Math.abs(dy);
    List<Tuple2<Pixel, Double>> result = new ArrayList<Tuple2<Pixel, Double>>();
    if (dx > dy) {
        for (x = x1; x != x2; x += ux) {
            try {
                Pixel newPixel = new Pixel(x, y, resolutionX, resolutionY);
                result.add(new Tuple2<Pixel, Double>(newPixel, 1.0));
            }
            catch (Exception e) {
                // This spatial object is out of the given dataset boundary. It is ignored here.
            }
            eps += dy;
            if ((eps << 1) >= dx) {
                // x advances by one step each iteration; y advances by one step once 2*eps >= dx
                y += uy;
                eps -= dx;
            }
        }
    }
    else {
        for (y = y1; y != y2; y += uy) {
            try {
                Pixel newPixel = new Pixel(x, y, resolutionX, resolutionY);
                result.add(new Tuple2<Pixel, Double>(newPixel, 1.0));
            }
            catch (Exception e) {
                // This spatial object is out of the given dataset boundary. It is ignored here.
            }
            eps += dx;
            if ((eps << 1) >= dy) {
                x += ux;
                eps -= dy;
            }
        }
    }
    return result;
}
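To make the linear scaling in FindOnePixelCoordinate more concrete, here is a minimal standalone sketch of the same mapping; the boundary, resolution and sample coordinate are made-up values for illustration, not anything taken from Sedona:

// Hypothetical example: map a coordinate into a 10 x 10 raster over the square [0, 100] x [0, 100]
public class PixelMappingSketch {
    public static void main(String[] args) {
        double minX = 0, maxX = 100, minY = 0, maxY = 100; // assumed dataset boundary
        int resolutionX = 10, resolutionY = 10;            // assumed raster size
        double x = 25.3, y = 71.8;                         // assumed input coordinate

        // Same linear scaling idea as FindOnePixelCoordinate: fraction of the extent times the resolution
        int pixelX = (int) (((x - minX) / (maxX - minX)) * resolutionX); // 2
        int pixelY = (int) (((y - minY) / (maxY - minY)) * resolutionY); // 7

        System.out.println("pixel = (" + pixelX + ", " + pixelY + ")"); // pixel = (2, 7)
    }
}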
Colorize
The Rasterize step of the Visualize function above produces the distributedRasterCountMatrix RDD, whose key is the Pixel (the pixel's position) and whose value is a Double representing the number of objects hitting that pixel (arguably a plain Integer would do just as well). Colorize uses a mapValues operation to normalize this value and map it to a color encoded as an Integer.
this.distributedRasterColorMatrix = this.distributedRasterCountMatrix.mapValues(new Function<Double, Integer>()
{
    @Override
    public Integer call(Double pixelCount)
            throws Exception
    {
        Double currentPixelCount = pixelCount;
        if (currentPixelCount > maxWeight) {
            currentPixelCount = maxWeight;
        }
        // Normalize the count into [0, 255] and encode it as an RGB color
        Double normalizedPixelCount = (currentPixelCount - minWeight) * 255 / (maxWeight - minWeight);
        Integer pixelColor = EncodeToRGB(normalizedPixelCount.intValue());
        return pixelColor;
    }
});
//logger.debug("[Sedona-Viz][Colorize] output count " + this.distributedRasterColorMatrix.count());
logger.info("[Sedona-Viz][Colorize][Stop]");
return true;
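For intuition, here is a small sketch of the normalization above with hypothetical bounds (minWeight and maxWeight are fields maintained by the visualization operator; the numbers below are assumptions, not Sedona defaults):

// Standalone sketch of the clip-and-normalize step used by Colorize (bounds are assumed values)
public class NormalizationSketch {
    public static void main(String[] args) {
        double minWeight = 0.0, maxWeight = 800.0; // assumed bounds, not Sedona defaults
        double[] counts = {1.0, 200.0, 800.0, 1500.0};
        for (double pixelCount : counts) {
            double clipped = Math.min(pixelCount, maxWeight);           // clip to maxWeight
            int normalized = (int) ((clipped - minWeight) * 255 / (maxWeight - minWeight));
            // e.g. 200 hits -> 63, anything at or above 800 hits saturates at 255
            System.out.println(pixelCount + " -> " + normalized);
        }
    }
}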
RenderImage
The RenderImage function converts the <Pixel, Integer> key-value pairs from the previous step into an image, returned in the form of an ImageSerializableWrapper. The parallelRenderImage parameter defaults to false; in that case each partition renders a full-size image containing only its own pixels, producing <Integer, ImageSerializableWrapper> pairs that all share the same constant key (1 in the code below), and a reduceByKey then overlays all the partial images into the final result.
When parallelRenderImage is true, the distributedRasterColorMatrix from the previous step is first spatially partitioned so that each partition corresponds to one tile of the result image, and the tiles are rendered in parallel; the resulting sub-images are stored in distributedRasterImage.
With parallelRenderImage set to false, the shuffle ships the full-size image produced by every partition to a single node for the overlay step, which generates a lot of shuffle IO when the result image is large. With the parameter set to true, the shuffle instead comes mainly from spatially repartitioning the original data, which places higher demands on the quality of the spatial partitioning.
protected boolean RenderImage(JavaSparkContext sparkContext)
        throws Exception
{
    if (this.parallelRenderImage == true) {
        if (this.hasBeenSpatialPartitioned == false) {
            this.spatialPartitioningWithoutDuplicates();
            this.hasBeenSpatialPartitioned = true;
        }
        this.distributedRasterImage = this.distributedRasterColorMatrix.mapPartitionsToPair(
                new PairFlatMapFunction<Iterator<Tuple2<Pixel, Integer>>, Integer, ImageSerializableWrapper>()
                {
                    @Override
                    public Iterator<Tuple2<Integer, ImageSerializableWrapper>> call(Iterator<Tuple2<Pixel, Integer>> currentPartition)
                            throws Exception
                    {
                        // Each spatial partition renders only its own tile of the final image
                        BufferedImage imagePartition = new BufferedImage(partitionIntervalX, partitionIntervalY, BufferedImage.TYPE_INT_ARGB);
                        Tuple2<Pixel, Integer> pixelColor = null;
                        while (currentPartition.hasNext()) {
                            // Render color in this image partition pixel-wise.
                            pixelColor = currentPartition.next();
                            if (pixelColor._1().getX() < 0 || pixelColor._1().getX() >= resolutionX || pixelColor._1().getY() < 0 || pixelColor._1().getY() >= resolutionY) {
                                pixelColor = null;
                                continue;
                            }
                            imagePartition.setRGB((int) pixelColor._1().getX() % partitionIntervalX, (partitionIntervalY - 1) - (int) pixelColor._1().getY() % partitionIntervalY, pixelColor._2);
                        }
                        List<Tuple2<Integer, ImageSerializableWrapper>> result = new ArrayList<Tuple2<Integer, ImageSerializableWrapper>>();
                        if (pixelColor == null) {
                            // No pixels in this partition. Skip this sub-image
                            return result.iterator();
                        }
                        logger.info("[Sedona-Viz][Render] add an image partition into the result set " + pixelColor._1().getCurrentPartitionId());
                        result.add(new Tuple2<Integer, ImageSerializableWrapper>(pixelColor._1().getCurrentPartitionId(), new ImageSerializableWrapper(imagePartition)));
                        return result.iterator();
                    }
                });
    }
    else if (this.parallelRenderImage == false) {
        // Each partition draws a full-size image containing only its own pixels
        this.distributedRasterImage = this.distributedRasterColorMatrix.mapPartitionsToPair(
                new PairFlatMapFunction<Iterator<Tuple2<Pixel, Integer>>, Integer, ImageSerializableWrapper>()
                {
                    @Override
                    public Iterator<Tuple2<Integer, ImageSerializableWrapper>> call(Iterator<Tuple2<Pixel, Integer>> currentPartition)
                            throws Exception
                    {
                        BufferedImage imagePartition = new BufferedImage(resolutionX, resolutionY, BufferedImage.TYPE_INT_ARGB);
                        Tuple2<Pixel, Integer> pixelColor = null;
                        while (currentPartition.hasNext()) {
                            // Render color in this image partition pixel-wise.
                            pixelColor = currentPartition.next();
                            if (pixelColor._1().getX() < 0 || pixelColor._1().getX() >= resolutionX || pixelColor._1().getY() < 0 || pixelColor._1().getY() >= resolutionY) {
                                pixelColor = null;
                                continue;
                            }
                            imagePartition.setRGB((int) pixelColor._1().getX(), (resolutionY - 1) - (int) pixelColor._1().getY(), pixelColor._2);
                        }
                        List<Tuple2<Integer, ImageSerializableWrapper>> result = new ArrayList<Tuple2<Integer, ImageSerializableWrapper>>();
                        if (pixelColor == null) {
                            // No pixels in this partition. Skip this sub-image
                            return result.iterator();
                        }
                        result.add(new Tuple2<Integer, ImageSerializableWrapper>(1, new ImageSerializableWrapper(imagePartition)));
                        return result.iterator();
                    }
                });
        // Merge the partial images together using reduceByKey
        this.distributedRasterImage = this.distributedRasterImage.reduceByKey(new Function2<ImageSerializableWrapper, ImageSerializableWrapper, ImageSerializableWrapper>()
        {
            @Override
            public ImageSerializableWrapper call(ImageSerializableWrapper image1, ImageSerializableWrapper image2)
                    throws Exception
            {
                // The combined image should be a full-size image
                BufferedImage combinedImage = new BufferedImage(resolutionX, resolutionY, BufferedImage.TYPE_INT_ARGB);
                Graphics graphics = combinedImage.getGraphics();
                graphics.drawImage(image1.image, 0, 0, null);
                graphics.drawImage(image2.image, 0, 0, null);
                return new ImageSerializableWrapper(combinedImage);
            }
        });
        List<Tuple2<Integer, ImageSerializableWrapper>> imageList = this.distributedRasterImage.collect();
        this.rasterImage = imageList.get(0)._2().image;
    }
    logger.info("[Sedona-Viz][RenderImage][Stop]");
    return true;
}
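The reduceByKey merge works because every partial image is a transparent ARGB canvas with only its own pixels set, so drawing two of them onto a blank canvas with the default source-over composite simply overlays them. A minimal standalone illustration of that idea (pixel positions and colors below are arbitrary):

import java.awt.Color;
import java.awt.Graphics;
import java.awt.image.BufferedImage;

// Sketch of overlaying two transparent partial images, the same trick used by the reduceByKey step above
public class OverlaySketch {
    public static void main(String[] args) {
        BufferedImage a = new BufferedImage(100, 100, BufferedImage.TYPE_INT_ARGB);
        BufferedImage b = new BufferedImage(100, 100, BufferedImage.TYPE_INT_ARGB);
        a.setRGB(10, 10, Color.BLUE.getRGB()); // pixel rendered by "partition" a
        b.setRGB(20, 20, Color.RED.getRGB());  // pixel rendered by "partition" b

        BufferedImage combined = new BufferedImage(100, 100, BufferedImage.TYPE_INT_ARGB);
        Graphics g = combined.getGraphics();
        g.drawImage(a, 0, 0, null); // fully transparent pixels of a leave the canvas untouched
        g.drawImage(b, 0, 0, null);
        g.dispose();

        System.out.println(combined.getRGB(10, 10) == Color.BLUE.getRGB()); // true
        System.out.println(combined.getRGB(20, 20) == Color.RED.getRGB());  // true
    }
}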