Sedona spatial data visualization source code analysis

This article was published automatically through MetaWeblog. Original text and updated version: https://extendswind.top/posts/technical/sedona_spatial_big_data_visualization

The spatial data visualization process in Sedona (formerly GeoSpark) is not very complicated: it mainly maps each spatial object into the corresponding raster space, much like a vector-to-raster conversion. Let's go straight to the code:


// A slineRDD object of type LineStringRDD has been created through Sedona

import org.apache.sedona.viz.core.ImageGenerator
import org.apache.sedona.viz.extension.visualizationEffect.ScatterPlot
import org.apache.sedona.viz.utils.ImageType
import java.awt.Color

// Create the visualization operator (10000 x 10000 pixels over the boundary of the dataset)
var visualizationOperator = new ScatterPlot(10000, 10000, slineRDD.boundaryEnvelope, false)
visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.blue, true)

// Run the actual visualization pipeline
visualizationOperator.Visualize(sc, slineRDD)

// Save the result as a PNG image
var imageGenerator = new ImageGenerator
imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, "/home/sparkl/visual", ImageType.PNG)

The actual visualization work happens in the visualizationOperator.Visualize(sc, slineRDD) function, which consists of three steps: Rasterize, Colorize and RenderImage. The details are as follows:

public boolean Visualize(JavaSparkContext sparkContext, SpatialRDD spatialRDD)
            throws Exception
{
  // Rasterize returns a JavaPairRDD<Pixel, Double>.
  // Each Pixel holds the x, y position of one cell of the raster image; the Double is the value assigned to that cell.
  this.Rasterize(sparkContext, spatialRDD, true);

  // Normalize each value and assign a color based on it
  this.Colorize();
  this.RenderImage(sparkContext);
  return true;
}

Rasterize

The Rasterize function is the key vector-to-raster conversion. It first uses the flatMapToPair operator to compute the pixel positions and pixel values that correspond to each spatial object, and then filters out the pixels that fall outside the raster. In Sedona's implementation, the value ignores the attributes of the spatial object; each pixel is simply assigned 1.0.

protected JavaPairRDD<Pixel, Double> Rasterize(JavaSparkContext sparkContext,
        SpatialRDD spatialRDD, boolean useSparkDefaultPartition)
{
    JavaRDD<Object> rawSpatialRDD = spatialRDD.rawSpatialRDD;
    // rawSpatialRDD holds the spatial objects. flatMapToPair computes the pixels covered by
    // each object and emits them as <Pixel, Double> key-value pairs.
    JavaPairRDD<Pixel, Double> spatialRDDwithPixelId = rawSpatialRDD.flatMapToPair(new PairFlatMapFunction<Object, Pixel, Double>(){
        @Override
        public Iterator<Tuple2<Pixel, Double>> call(Object spatialObject) throws Exception {
            // Handle each geometry type separately
            if (spatialObject instanceof Point) {
                return RasterizationUtils.FindPixelCoordinates(resolutionX, resolutionY, datasetBoundary, (Point) spatialObject, colorizeOption, reverseSpatialCoordinate).iterator();
            }
            else if (spatialObject instanceof Polygon) {
                return RasterizationUtils.FindPixelCoordinates(resolutionX, resolutionY, datasetBoundary, (Polygon) spatialObject, reverseSpatialCoordinate).iterator();
            }
            else if (spatialObject instanceof LineString) {
                // Compute the <pixel position, pixel value> pairs covered by this spatial object; the result is a list of pairs
                return RasterizationUtils.FindPixelCoordinates(resolutionX, resolutionY, datasetBoundary, (LineString) spatialObject, reverseSpatialCoordinate).iterator();
            }
            else {
                // Only the three geometry types above are supported
                throw new Exception("[Sedona-VizViz][Rasterize] Unsupported spatial object types. Sedona-VizViz only supports Point, Polygon, LineString");
            }
        }
    });
	
    // Drop pixels that fall outside the raster (arguably this could be handled directly in the previous step)
    spatialRDDwithPixelId = spatialRDDwithPixelId.filter(new Function<Tuple2<Pixel, Double>, Boolean>()
    {
        @Override
        public Boolean call(Tuple2<Pixel, Double> pixelCount)
                throws Exception
        {
            return !(pixelCount._1().getX() < 0) && !(pixelCount._1().getX() > resolutionX) && !(pixelCount._1().getY() < 0) && !(pixelCount._1().getY() > resolutionY);
        }
    });

    this.distributedRasterCountMatrix = spatialRDDwithPixelId;
    return this.distributedRasterCountMatrix;
}
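
The range check in the filter above can be tried in isolation. The following is a minimal sketch, not Sedona code: RangeFilterSketch, keptByRasterizeFilter and keptByRenderCheck are hypothetical names that restate, for a single axis and with plain int values, the two conditions quoted in this article (the filter in Rasterize and the bounds check that appears later in RenderImage). As quoted, the Rasterize filter still lets a coordinate equal to the resolution through, and the check in RenderImage is the one that drops it.

```java
class RangeFilterSketch {
    // Condition of the filter in Rasterize, restated for one axis:
    // a pixel is kept unless its coordinate is < 0 or > resolution.
    static boolean keptByRasterizeFilter(int coordinate, int resolution) {
        return !(coordinate < 0) && !(coordinate > resolution);
    }

    // Bounds check inside RenderImage, restated for one axis:
    // a pixel is skipped when its coordinate is < 0 or >= resolution.
    static boolean keptByRenderCheck(int coordinate, int resolution) {
        return !(coordinate < 0 || coordinate >= resolution);
    }

    public static void main(String[] args) {
        int resolution = 100;
        for (int c : new int[] {-1, 0, 99, 100, 101}) {
            System.out.println(c + " -> Rasterize filter keeps: "
                    + keptByRasterizeFilter(c, resolution)
                    + ", RenderImage check keeps: "
                    + keptByRenderCheck(c, resolution));
        }
    }
}
```

Only the value 100 is treated differently by the two checks in this run.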

To compute the pixels, the LineString is split into segments, each defined by two consecutive vertices. For each segment, the raster position of both end points is computed first, and then the pixels crossed by the line connecting those two raster positions are computed.


public static List<Tuple2<Pixel, Double>> FindPixelCoordinates(int resolutionX, int resolutionY, Envelope datasetBoundary, LineString spatialObject, boolean reverseSpatialCoordinate) {
    List<Tuple2<Pixel, Double>> result = new ArrayList<Tuple2<Pixel, Double>>();
    for (int i = 0; i < spatialObject.getCoordinates().length - 1; i++) {
        Tuple2<Integer, Integer> pixelCoordinate1 = null;
        Tuple2<Integer, Integer> pixelCoordinate2 = null;
        try {
            // Raster positions of the two end points of this segment
            pixelCoordinate1 = FindOnePixelCoordinate(resolutionX, resolutionY, datasetBoundary, spatialObject.getCoordinates()[i], reverseSpatialCoordinate);
            pixelCoordinate2 = FindOnePixelCoordinate(resolutionX, resolutionY, datasetBoundary, spatialObject.getCoordinates()[i + 1], reverseSpatialCoordinate);
        }
        catch (Exception e) {
            // This line segment is out of boundary, Should be ignored.
            continue;
        }
        // Compute the pixels crossed by the line between the two raster positions and append them to result
        result.addAll(FindPixelCoordinates(resolutionX, resolutionY, pixelCoordinate1, pixelCoordinate2, reverseSpatialCoordinate));
    }
    return result;
}

// Calculate the position of a point in a raster image
public static Tuple2<Integer, Integer> FindOnePixelCoordinate(int resolutionX, int resolutionY, Envelope datasetBoundaryOriginal, Coordinate spatialCoordinateOriginal, boolean reverseSpatialCoordinate) {
    Coordinate spatialCoordinate;
    Envelope datasetBoundary;
    if (reverseSpatialCoordinate) {
        spatialCoordinate = new Coordinate(spatialCoordinateOriginal.y, spatialCoordinateOriginal.x);
        datasetBoundary = new Envelope(datasetBoundaryOriginal.getMinY(), datasetBoundaryOriginal.getMaxY(), datasetBoundaryOriginal.getMinX(), datasetBoundaryOriginal.getMaxX());
    }
    else {
        spatialCoordinate = spatialCoordinateOriginal;
        datasetBoundary = datasetBoundaryOriginal;
    }
    Double pixelXDouble = ((spatialCoordinate.x - datasetBoundary.getMinX()) / (datasetBoundary.getMaxX() - datasetBoundary.getMinX())) * resolutionX;
    Double xRemainder = (spatialCoordinate.x - datasetBoundary.getMinX()) % (datasetBoundary.getMaxX() - datasetBoundary.getMinX());
    Double pixelYDouble = ((spatialCoordinate.y - datasetBoundary.getMinY()) / (datasetBoundary.getMaxY() - datasetBoundary.getMinY())) * resolutionY;
    Double yRemainder = (spatialCoordinate.y - datasetBoundary.getMinY()) % (datasetBoundary.getMaxY() - datasetBoundary.getMinY());
    int pixelX = pixelXDouble.intValue();
    int pixelY = pixelYDouble.intValue();
    if (xRemainder == 0.0 && pixelXDouble != 0.0) {
        pixelX--;
    }
    if (pixelX >= resolutionX) {
        pixelX--;
    }
    if (yRemainder == 0.0 && pixelYDouble != 0) {
        pixelY--;
    }
    if (pixelY >= resolutionY) {
        pixelY--;
    }
    return new Tuple2<Integer, Integer>(pixelX, pixelY);
}

// Compute the pixels crossed by the line segment between two raster positions
public static List<Tuple2<Pixel, Double>> FindPixelCoordinates(int resolutionX, int resolutionY, Tuple2<Integer, Integer> pixelCoordinate1, Tuple2<Integer, Integer> pixelCoordinate2, boolean reverseSpatialCoordinate) {
    // This function uses Bresenham's line algorithm to plot pixels touched by a given line segment.
    int x1 = pixelCoordinate1._1;
    int y1 = pixelCoordinate1._2;
    int x2 = pixelCoordinate2._1;
    int y2 = pixelCoordinate2._2;
    int dx = x2 - x1;
    int dy = y2 - y1;
    int ux = dx > 0 ? 1 : -1; // x direction
    int uy = dy > 0 ? 1 : -1; // y direction
    int x = x1, y = y1;
    int eps = 0; //cumulative errors
    dx = Math.abs(dx);
    dy = Math.abs(dy);
    List<Tuple2<Pixel, Double>> result = new ArrayList<Tuple2<Pixel, Double>>();
    if (dx > dy) {
        for (x = x1; x != x2; x += ux) {
            try {
                Pixel newPixel = new Pixel(x, y, resolutionX, resolutionY);
                result.add(new Tuple2<Pixel, Double>(newPixel, 1.0));
            }
            catch (Exception e) {
                // This spatial object is outside the given dataset boundary. It is ignored here.
            }
            eps += dy;
            if ((eps << 1) >= dx) {  // x moves one step (ux) per iteration; y moves one step (uy) once 2 * eps >= dx
                y += uy;
                eps -= dx;
            }
        }
    }
    else {
        for (y = y1; y != y2; y += uy) {
            try {
                Pixel newPixel = new Pixel(x, y, resolutionX, resolutionY);
                result.add(new Tuple2<Pixel, Double>(newPixel, 1.0));
            }
            catch (Exception e) {
                // This spatial object is outside the given dataset boundary. It is ignored here.
            }
            eps += dx;
            if ((eps << 1) >= dy) {
                x += ux;
                eps -= dy;
            }
        }
    }
    return result;
}
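
To make the two helper steps easier to follow, here is a minimal, self-contained sketch that repeats their arithmetic without Spark or Sedona types. LineRasterSketch, toPixel and trace are hypothetical names used only for illustration: toPixel restates the single-axis arithmetic of FindOnePixelCoordinate, and trace restates the stepping loop of the pixel-to-pixel FindPixelCoordinates overload, returning plain int pairs instead of Pixel objects.

```java
import java.util.ArrayList;
import java.util.List;

class LineRasterSketch {
    // Map one coordinate to a pixel index on one axis (same arithmetic as FindOnePixelCoordinate).
    static int toPixel(double value, double min, double max, int resolution) {
        double scaled = (value - min) / (max - min) * resolution;
        double remainder = (value - min) % (max - min);
        int pixel = (int) scaled;
        if (remainder == 0.0 && scaled != 0.0) {
            pixel--; // a value exactly on the upper boundary belongs to the last pixel
        }
        if (pixel >= resolution) {
            pixel--;
        }
        return pixel;
    }

    // Step from (x1, y1) towards (x2, y2) with the integer error term used in the listing.
    static List<int[]> trace(int x1, int y1, int x2, int y2) {
        int dx = x2 - x1;
        int dy = y2 - y1;
        int ux = dx > 0 ? 1 : -1; // step direction along x
        int uy = dy > 0 ? 1 : -1; // step direction along y
        int x = x1, y = y1, eps = 0;
        dx = Math.abs(dx);
        dy = Math.abs(dy);
        List<int[]> result = new ArrayList<>();
        if (dx > dy) {
            for (x = x1; x != x2; x += ux) {
                result.add(new int[] {x, y});
                eps += dy;
                if ((eps << 1) >= dx) {
                    y += uy;
                    eps -= dx;
                }
            }
        } else {
            for (y = y1; y != y2; y += uy) {
                result.add(new int[] {x, y});
                eps += dx;
                if ((eps << 1) >= dy) {
                    x += ux;
                    eps -= dy;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(toPixel(2.5, 0.0, 10.0, 100));  // 25
        System.out.println(toPixel(10.0, 0.0, 10.0, 100)); // 99
        for (int[] p : trace(0, 0, 4, 2)) {
            System.out.println(p[0] + "," + p[1]); // 0,0 then 1,1 then 2,1 then 3,2
        }
    }
}
```

Two details are visible in this sketch. The loop condition stops before the second end point, so the segment from (0,0) to (4,2) yields four pixels and (4,2) itself is not emitted; inside a LineString that pixel is normally the start of the next segment. A segment whose two end points fall into the same pixel yields no pixels at all.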

Colorize

The Rasterize step above produces the distributedRasterCountMatrix RDD: the key is a Pixel holding the pixel position, and the value is a Double representing the number of points (an Integer would arguably do just as well). Colorize uses a mapValues operation to normalize each value and convert it to a color value of type Integer.


  this.distributedRasterColorMatrix = this.distributedRasterCountMatrix.mapValues(new Function<Double, Integer>()
  {

      @Override
      public Integer call(Double pixelCount)
              throws Exception
      {
          Double currentPixelCount = pixelCount;
          if (currentPixelCount > maxWeight) {
              currentPixelCount = maxWeight;
          }
          Double normalizedPixelCount = (currentPixelCount - minWeight) * 255 / (maxWeight - minWeight);
          Integer pixelColor = EncodeToRGB(normalizedPixelCount.intValue());
          return pixelColor;
      }
  });
  //logger.debug("[Sedona-VizViz][Colorize]output count "+this.distributedRasterColorMatrix.count());
  logger.info("[Sedona-VizViz][Colorize][Stop]");
  return true;
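
The normalization inside the mapValues function can be checked with a few numbers. This is a minimal sketch under stated assumptions: ColorizeSketch and normalize are hypothetical names, minWeight and maxWeight are passed in as plain arguments, and the final EncodeToRGB step is left out because its body is not shown in this article.

```java
class ColorizeSketch {
    // Clamp the count to maxWeight, then scale it linearly into 0..255,
    // following the arithmetic of the mapValues function in Colorize.
    static int normalize(double pixelCount, double minWeight, double maxWeight) {
        double clamped = Math.min(pixelCount, maxWeight);
        double normalized = (clamped - minWeight) * 255 / (maxWeight - minWeight);
        return (int) normalized; // truncates towards zero, like Double.intValue()
    }

    public static void main(String[] args) {
        System.out.println(normalize(0.0, 0.0, 10.0));  // 0
        System.out.println(normalize(5.0, 0.0, 10.0));  // 127
        System.out.println(normalize(20.0, 0.0, 10.0)); // 255
        System.out.println(normalize(1.0, 1.0, 1.0));   // 0
    }
}
```

The last call shows an edge case of this arithmetic: when maxWeight equals minWeight the division is 0.0 / 0.0, which is NaN in Java and converts to 0. Whether Sedona ever reaches that case depends on how minWeight and maxWeight are set, which is outside the code quoted here.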

RenderImage

The RenderImage function converts the <Pixel, Integer> key-value pairs obtained in the previous step into an image, stored as an ImageSerializableWrapper. When parallelRenderImage is false (the default), each partition is rendered in parallel into its own full-size image, producing <Integer, ImageSerializableWrapper> pairs that all share the same key (1 in the code below); reduceByKey then overlays all of these images to obtain the final result.

When parallelRenderImage is true, the distributedRasterColorMatrix from the previous step is first spatially partitioned so that each partition corresponds to one region of the result image, and the regions are then rendered in parallel. The result is stored in distributedRasterImage.

With parallelRenderImage set to false, the shuffle sends the full-size image produced by every partition to a single node to be overlaid, so a large result image causes a lot of shuffle IO. With the parameter set to true, the shuffle comes mainly from spatially partitioning the original data, so the outcome depends more heavily on how well the spatial partitioning works.
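
In the parallel branch of RenderImage, each pixel is written at a position relative to its own sub-image rather than to the full raster. The sketch below restates only that index arithmetic, taken from the setRGB call of the parallel branch; TileCoordinateSketch and toLocal are hypothetical names, and tileWidth and tileHeight stand in for partitionIntervalX and partitionIntervalY. How Sedona assigns pixels to partitions is not covered here.

```java
class TileCoordinateSketch {
    // Convert a global pixel position into the position inside its sub-image.
    // The y axis is flipped because image row 0 is the top row.
    static int[] toLocal(int x, int y, int tileWidth, int tileHeight) {
        int localX = x % tileWidth;
        int localY = (tileHeight - 1) - y % tileHeight;
        return new int[] {localX, localY};
    }

    public static void main(String[] args) {
        int[] local = toLocal(250, 130, 100, 100);
        System.out.println(local[0] + "," + local[1]); // 50,69
    }
}
```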


protected boolean RenderImage(JavaSparkContext sparkContext) throws Exception {
    if (this.parallelRenderImage == true) {
        if (this.hasBeenSpatialPartitioned == false) {
            this.spatialPartitioningWithoutDuplicates();
            this.hasBeenSpatialPartitioned = true;
        }
        this.distributedRasterImage = this.distributedRasterColorMatrix.mapPartitionsToPair(
                new PairFlatMapFunction<Iterator<Tuple2<Pixel, Integer>>, Integer, ImageSerializableWrapper>()
                {
                    @Override
                    public Iterator<Tuple2<Integer, ImageSerializableWrapper>> call(Iterator<Tuple2<Pixel, Integer>> currentPartition)
                            throws Exception
                    {
                        BufferedImage imagePartition = new BufferedImage(partitionIntervalX, partitionIntervalY, BufferedImage.TYPE_INT_ARGB);
                        Tuple2<Pixel, Integer> pixelColor = null;
                        while (currentPartition.hasNext()) {
                            //Render color in this image partition pixel-wise.
                            pixelColor = currentPartition.next();
                            if (pixelColor._1().getX() < 0 || pixelColor._1().getX() >= resolutionX || pixelColor._1().getY() < 0 || pixelColor._1().getY() >= resolutionY) {
                                pixelColor = null;
                                continue;
                            }
                            imagePartition.setRGB((int) pixelColor._1().getX() % partitionIntervalX, (partitionIntervalY - 1) - (int) pixelColor._1().getY() % partitionIntervalY, pixelColor._2);
                        }
                        List<Tuple2<Integer, ImageSerializableWrapper>> result = new ArrayList<Tuple2<Integer, ImageSerializableWrapper>>();
                        if (pixelColor == null) {
                            // No pixels in this partition. Skip this subimage
                            return result.iterator();
                        }
                        logger.info("[Sedona-VizViz][Render]add a image partition into result set " + pixelColor._1().getCurrentPartitionId());
                        result.add(new Tuple2<Integer, ImageSerializableWrapper>(pixelColor._1().getCurrentPartitionId(), new ImageSerializableWrapper(imagePartition)));
                        return result.iterator();
                    }
                });
    }
    else if (this.parallelRenderImage == false) {
        // Draw full size image in parallel
        this.distributedRasterImage = this.distributedRasterColorMatrix.mapPartitionsToPair(
                new PairFlatMapFunction<Iterator<Tuple2<Pixel, Integer>>, Integer, ImageSerializableWrapper>()
                {
                    @Override
                    public Iterator<Tuple2<Integer, ImageSerializableWrapper>> call(Iterator<Tuple2<Pixel, Integer>> currentPartition)
                            throws Exception
                    {
                        BufferedImage imagePartition = new BufferedImage(resolutionX, resolutionY, BufferedImage.TYPE_INT_ARGB);
                        Tuple2<Pixel, Integer> pixelColor = null;
                        while (currentPartition.hasNext()) {
                            //Render color in this image partition pixel-wise.
                            pixelColor = currentPartition.next();
                            if (pixelColor._1().getX() < 0 || pixelColor._1().getX() >= resolutionX || pixelColor._1().getY() < 0 || pixelColor._1().getY() >= resolutionY) {
                                pixelColor = null;
                                continue;
                            }
                            imagePartition.setRGB((int) pixelColor._1().getX(), (resolutionY - 1) - (int) pixelColor._1().getY(), pixelColor._2);
                        }
                        List<Tuple2<Integer, ImageSerializableWrapper>> result = new ArrayList<Tuple2<Integer, ImageSerializableWrapper>>();
                        if (pixelColor == null) {
                            // No pixels in this partition. Skip this subimage
                            return result.iterator();
                        }
                        result.add(new Tuple2<Integer, ImageSerializableWrapper>(1, new ImageSerializableWrapper(imagePartition)));
                        return result.iterator();
                    }
                });
        // Merge images together using reduce

        this.distributedRasterImage = this.distributedRasterImage.reduceByKey(new Function2<ImageSerializableWrapper, ImageSerializableWrapper, ImageSerializableWrapper>()
        {
            @Override
            public ImageSerializableWrapper call(ImageSerializableWrapper image1, ImageSerializableWrapper image2)
                    throws Exception
            {
                // The combined image should be a full size image
                BufferedImage combinedImage = new BufferedImage(resolutionX, resolutionY, BufferedImage.TYPE_INT_ARGB);
                Graphics graphics = combinedImage.getGraphics();
                graphics.drawImage(image1.image, 0, 0, null);
                graphics.drawImage(image2.image, 0, 0, null);
                return new ImageSerializableWrapper(combinedImage);
            }
        });
        List<Tuple2<Integer, ImageSerializableWrapper>> imageList = this.distributedRasterImage.collect();
        this.rasterImage = imageList.get(0)._2().image;
    }
    logger.info("[Sedona-VizViz][RenderImage][Stop]");
    return true;
}
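
The non-parallel branch relies on the fact that every partition draws into a transparent full-size image, so overlaying the images with drawImage keeps the pixels of each one. The following is a minimal sketch of that idea using only java.awt, without Spark: RenderMergeSketch, renderPartition and merge are hypothetical names, and the pixel lists stand in for the contents of two partitions.

```java
import java.awt.Graphics;
import java.awt.image.BufferedImage;

class RenderMergeSketch {
    // Render the pixels of one partition into a transparent full-size image.
    // Each entry is {x, y, argb}; y is flipped so that y = 0 ends up in the bottom row.
    static BufferedImage renderPartition(int width, int height, int[][] pixels) {
        BufferedImage image = new BufferedImage(width, height, BufferedImage.TYPE_INT_ARGB);
        for (int[] p : pixels) {
            image.setRGB(p[0], (height - 1) - p[1], p[2]);
        }
        return image;
    }

    // Overlay two full-size images, as the reduceByKey function in the listing does.
    static BufferedImage merge(BufferedImage image1, BufferedImage image2) {
        BufferedImage combined = new BufferedImage(
                image1.getWidth(), image1.getHeight(), BufferedImage.TYPE_INT_ARGB);
        Graphics graphics = combined.getGraphics();
        graphics.drawImage(image1, 0, 0, null);
        graphics.drawImage(image2, 0, 0, null);
        graphics.dispose();
        return combined;
    }

    public static void main(String[] args) {
        int blue = 0xFF0000FF; // opaque blue
        int red = 0xFFFF0000;  // opaque red
        BufferedImage a = renderPartition(4, 4, new int[][] {{0, 0, blue}});
        BufferedImage b = renderPartition(4, 4, new int[][] {{3, 3, red}});
        BufferedImage merged = merge(a, b);
        System.out.println(Integer.toHexString(merged.getRGB(0, 3))); // pixel from the first image
        System.out.println(Integer.toHexString(merged.getRGB(3, 0))); // pixel from the second image
        System.out.println(Integer.toHexString(merged.getRGB(1, 1))); // untouched pixel stays transparent
    }
}
```

Each intermediate image in this approach has the full resolution regardless of how few pixels it holds, which is why the article points to shuffle IO as the cost of the non-parallel branch.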

Tags: Hadoop gis

Posted by lisa99 on Wed, 12 Oct 2022 12:02:28 +0300