Spark core components, runtime architecture and RDD creation

Spark core components

Before explaining the Spark architecture, let's first look at several core components of Spark and understand what they do.
1. Application: Spark application
A user program built on Spark, consisting of the Driver code and the code that runs in the Executors on each node of the cluster. An Application is made up of one or more Jobs.
As shown in the figure:

2. Driver program: Driver
The Driver runs the main() function of the Application and creates the SparkContext. The SparkContext prepares Spark's runtime environment: it establishes communication with the Cluster Manager, applies for resources, allocates tasks and monitors them. After the Executors finish running, the Driver closes the SparkContext. The SparkContext is often used to stand for the Driver.

3. Cluster Manager: cluster management (resource management)
Responsible for acquiring resources on the cluster. Common resource managers are:
Standalone: Spark's native resource manager, where the Master is in charge of resources;
Mesos: the Mesos Master is responsible for resource management;
YARN: Hadoop YARN, where the ResourceManager is in charge of resources.
4. Worker Node: worker node (compute node)
Any node in the cluster that can run Application code. Each Worker Node can be thought of as an independent machine, similar to a NodeManager node in YARN. When we start Spark in Standalone mode or YARN mode, this node needs to be started. In Standalone mode, we list the hostname of each node in the slaves file when configuring Spark. In YARN mode this node is the NodeManager node; in Mesos mode it is the Mesos Slave node.

5. Executor: executor
A process started for an Application on a Worker Node. The process runs Tasks and stores data in memory or on disk. Each Application has its own set of Executors.
6. Task
The unit of work sent to an Executor process.
7. Job
A parallel computation made up of multiple Tasks, triggered by a Spark Action; one Application usually generates multiple Jobs (a small sketch follows this list).
8. Stage
Each Job is divided into groups of Tasks; each group is a TaskSet, also called a Stage.
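
To make the Job, Stage and Task relationship concrete, here is a minimal sketch (assuming a SparkContext named sc already exists, as in the examples later in this article). Transformations such as map are lazy; only an action such as count() actually triggers a Job:

// Minimal sketch: assumes an existing SparkContext called sc
val nums    = sc.parallelize(1 to 100, 4)  // an RDD with 4 partitions
val squares = nums.map(n => n * n)         // transformation: nothing runs yet
val total   = squares.count()              // action: triggers one Job with one Stage of 4 Tasks
println(total)                             // 100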

It can also be understood as follows:

Spark runtime architecture

Now that we understand the core components of Spark, let's take a look at its runtime architecture. As shown in the figure:

1. The Driver Program creates the SparkContext, which establishes communication with the Cluster Manager, applies to it for resources, and handles task allocation and monitoring;
2. The Cluster Manager is responsible for allocating and managing the resources needed on the Worker Nodes. As mentioned when introducing the core components, three resource schedulers are commonly used:
Spark native (Standalone): the Master;
YARN: the ResourceManager;
Mesos: the Mesos Master.
3. The SparkContext divides each Job into several Stages and distributes the resulting Tasks to different Worker Nodes, where Executors perform the computation;
4. The Executors on each Worker Node serve different Applications, and data cannot be shared between them;
5. Spark moves the computation to the data rather than moving the data to the computation: when a Job is divided into Tasks, the Driver sends each Task's computation to the node where that Task's data resides, and the Tasks run in parallel, which improves efficiency.
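
As an illustration of how an Application selects its Cluster Manager, the master URL passed to SparkConf determines which resource manager is used (the host names and ports below are placeholders, not real addresses):

import org.apache.spark.SparkConf

// Sketch only: host names and ports are placeholders
val standaloneConf = new SparkConf().setMaster("spark://master-host:7077") // Standalone: Spark's own Master
val yarnConf       = new SparkConf().setMaster("yarn")                     // YARN: the ResourceManager manages resources
val mesosConf      = new SparkConf().setMaster("mesos://mesos-host:5050")  // Mesos: the Mesos Master manages resources
val localConf      = new SparkConf().setMaster("local[2]")                 // local mode: no cluster manager, 2 threads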

RDD

RDD (Resilient Distributed Dataset): the core of Spark and its main data abstraction; it is the basic unit of computation in Spark.
Simple explanation:
An RDD is a collection whose data items are split into multiple partitions, stored in memory on the cluster's Worker Nodes, and operated on in parallel.
More detailed interpretation:
An RDD is an interface for data transformation;
An RDD points to data stored in HDFS, Cassandra, HBase, etc., or cached (memory, memory + disk, disk only, etc.), or it recomputes its partitions from other RDDs when a failure occurs or the cache is evicted.
Distributed dataset:
An RDD is a read-only, partitioned collection of records, and its partitions are distributed across different nodes of the cluster;
An RDD does not store the actual data, it only describes the data and the operations on it;
Resilience (elasticity):
An RDD is stored in memory by default; when memory is insufficient, Spark automatically writes it to disk;
Fault tolerance:
Using the data lineage, partitions can be automatically recovered after node failures.
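The cache options mentioned above (memory, memory + disk, disk only) can also be chosen explicitly with persist. A minimal sketch, assuming an existing SparkContext called sc:

import org.apache.spark.storage.StorageLevel

// Sketch: assumes an existing SparkContext called sc; an RDD's storage level
// can only be set once, so pick one of the levels shown in the comments.
val data = sc.parallelize(1 to 1000)
data.persist(StorageLevel.MEMORY_AND_DISK)   // keep in memory, spill to disk when memory is insufficient
// other options: StorageLevel.MEMORY_ONLY (the default used by cache()), StorageLevel.DISK_ONLY
data.count()        // an action materializes and caches the data
data.unpersist()    // release the cached partitions
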
Characteristics of RDD

  • A list of partitions (splits); each Task processes one partition;
  • A compute function for each partition that calculates the data in that partition;
  • A list of dependencies on other RDDs (wide and narrow dependencies);
  • A partitioner that determines which partition a piece of data (key-value) is allocated to;
  • A list of preferred locations, which assigns each computation to the location where the data block it processes is stored.
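
Several of these properties can be inspected directly on an RDD. A minimal sketch, assuming an existing SparkContext called sc:

// Sketch: assumes an existing SparkContext called sc
val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)), 3)
val reduced = pairs.reduceByKey(_ + _)

println(reduced.partitions.length)   // number of partitions
println(reduced.dependencies)        // dependencies on parent RDDs (a ShuffleDependency here)
println(reduced.partitioner)         // Some(HashPartitioner) after reduceByKey
println(reduced.preferredLocations(reduced.partitions(0)))  // preferred locations of partition 0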

DAG: directed acyclic graph

Like the RDD, the DAG is a core abstraction provided by Spark; it reflects the dependencies between RDDs:
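
The lineage that forms the DAG can be printed with toDebugString. A minimal sketch, assuming an existing SparkContext called sc:

// Sketch: assumes an existing SparkContext called sc
val lines  = sc.parallelize(Seq("hello world", "hello spark"))
val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

// toDebugString prints the RDD lineage; the shuffle introduced by reduceByKey
// is where the DAG is cut into separate Stages.
println(counts.toDebugString)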

RDD creation

We create a Spark project using Maven in IDEA.
For the creation of Maven, please refer to my previous article:
Interaction between Hadoop and Java
1. Add the required dependencies:

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.11.8</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>log4j</groupId>
  <artifactId>log4j</artifactId>
  <version>1.2.17</version>
</dependency>
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-api</artifactId>
  <version>1.7.21</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

2. After the dependencies are downloaded, create a new folder named scala under the main folder


3. Mark the scala folder as Sources Root

4. Set up the Scala SDK



5. After creating a Scala file under the scala folder, we can start writing code:

RDD creation method 1: parallelize

1. Create a wordCount object to demonstrate a word count.

import org.apache.spark.{SparkConf, SparkContext}

object wordCount {
  def main(args: Array[String]): Unit = {
    //local[2] is the number of threads (partitions); setAppName("wordCount") names the Application
    val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount")
    val sc = SparkContext.getOrCreate(conf)
    //Create an RDD
    val rdd1 = sc.parallelize(List("hello world","hello scala","hello spark","scala"))
    //Word count: the words are separated by spaces, so split on " "
    rdd1.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect.foreach(println)
  }
}
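
With the space-separated input above, the output should contain the following pairs (the order may vary between runs):

(hello,3)
(world,1)
(scala,2)
(spark,1)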

When we run this, the console also shows a lot of INFO log messages, which makes the results hard to read. We can suppress these messages through log4j.


2. Create a resources folder in the project:

Locate the log4j properties file under the following path in External Libraries:

Copy the log4j file into the resources folder you just created, rename it to log4j.properties, and modify the following setting:

Change rootCategory=INFO to rootCategory=ERROR


When you run the program again, you will find that the INFO messages are gone:
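
As an alternative (not used in this article, but available in the SparkContext API), the log level can also be set programmatically right after the SparkContext is created:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount")
val sc = SparkContext.getOrCreate(conf)
sc.setLogLevel("ERROR")  // suppress INFO/WARN messages for this application only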

RDD creation method 2: makeRDD

Above we used the parallelize method to create an RDD; another way is makeRDD. Before using it, let's look at the source of these two methods:

def parallelize[T](seq: scala.Seq[T], numSlices: scala.Int = { /* compiled code */ })
    (implicit evidence$1: scala.reflect.ClassTag[T]): org.apache.spark.rdd.RDD[T] = { /* compiled code */ }

def makeRDD[T](seq: scala.Seq[T], numSlices: scala.Int = { /* compiled code */ })
    (implicit evidence$2: scala.reflect.ClassTag[T]): org.apache.spark.rdd.RDD[T] = { /* compiled code */ }

The two methods are essentially the same (makeRDD with a Seq simply calls parallelize); only the names differ, so I won't demonstrate it separately here.

Partition settings

In addition to specifying the number of partitions through setMaster(), we can also set it when creating the RDD.
Here the number of partitions is 2:

val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount")
val sc = SparkContext.getOrCreate(conf)
val rdd2 = sc.makeRDD(List("hello world","hello scala","hello spark","scala"))
rdd2.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect.foreach(println)
//partitions
val partition = rdd2.partitions
println("The number of partitions is: " + partition.length)


When creating the RDD, add 3 as the second argument to set the number of partitions to 3:

val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount")
val sc = SparkContext.getOrCreate(conf)
val rdd2 = sc.makeRDD(List("hello world","hello scala","hello spark","scala"),3)
rdd2.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect.foreach(println)
//partitions
val partition = rdd2.partitions
println("The number of partitions is: " + partition.length)

The number of partitions becomes 3.
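
To see how the elements are distributed across the 3 partitions, glom() can collect each partition into an array. A minimal sketch continuing from the code above:

// Sketch: rdd2 is the 3-partition RDD created above
rdd2.glom().collect().zipWithIndex.foreach { case (part, i) =>
  println(s"partition $i: ${part.mkString(", ")}")
}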

RDD creation method 3: textFile

Creating locally
We can also create an RDD by loading a file.
File contents:

hello world
hello scala
hello spark
scala

Put the file into the IDEA project.

import org.apache.spark.{SparkConf, SparkContext}

object wordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount")
    val sc = SparkContext.getOrCreate(conf)
    //Relative path
    val words = sc.textFile("data/word.txt")
    //Absolute path
    //val words = sc.textFile("C:/data/java/Scala/SparkDemo/data/word.txt")
    words.collect.foreach(println)
  }
}
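
The same word-count pipeline used earlier can then be applied to the file-based RDD, for example by adding the following line inside main:

// Continuing from the code above: words was created with sc.textFile(...)
words.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)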


Creating from HDFS
Upload the file to HDFS and use its path there:

import org.apache.spark.{SparkConf, SparkContext}

object wordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount")
    val sc = SparkContext.getOrCreate(conf)
    //Path to the file on HDFS
    val wordsHdfs = sc.textFile("hdfs://hadoop01:9000/spark/test/word.txt")
    wordsHdfs.collect.foreach(println)
  }
}

The local and HDFS approaches are actually the same; only the paths differ.

When loading, we can also pass the parent directory, which reads all the files in that directory:

val wordsHdfs = sc.textFile("hdfs://hadoop01:9000/spark/test/")

We can also read only the files we want by using a wildcard pattern:

val wordsHdfs = sc.textFile("hdfs://hadoop01:9000/spark/test/*.txt")

Creating an RDD in Linux

Creating an RDD in Linux is much simpler than in IDEA: enter the Spark shell and type the creation statement directly:

val rdd = sc.textFile("/data/hello/hello1.txt")
//equivalent to the above: the file:// prefix can be omitted
val rdd2 = sc.textFile("file:///data/hello/hello1.txt")
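
A quick check in the shell, continuing from the RDD created above:

// In spark-shell, sc is provided automatically
rdd.count()   // number of lines in the file
rdd.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).collect.foreach(println)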
