Spark learning -- Spark SQL

Spark SQL

Spark SQL is the Spark module for processing structured data. It provides a programming abstraction called DataFrame and acts as a distributed SQL query engine. In Hive, HiveQL is converted into MapReduce jobs and submitted to the cluster for execution, which greatly simplifies writing MapReduce programs; however, MapReduce execution is relatively slow. Spark SQL was created to address this: it converts SQL into RDD operations and submits them to the cluster for execution, which runs much faster.

SparkConf and SparkSession

SparkContext

Any Spark program starts with a SparkContext. Initializing a SparkContext requires a SparkConf object, which holds the various Spark cluster configuration parameters.
Once initialized, you can use the methods on the SparkContext object to create and manipulate RDDs and shared variables.

SparkContext creation method 1: using SparkConf

    val conf = new SparkConf().setMaster("local[*]").setAppName("sparkTest")
    val sc = SparkContext.getOrCreate(conf)

SparkContext creation method 2: using SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("testSparkSQL").getOrCreate()
    val sc = spark.sparkContext
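
Once the SparkContext exists, it can be used to create RDDs and shared variables, as mentioned above. A minimal sketch (the data and variable names are made up for illustration):

    //Create an RDD and a broadcast (read-only shared) variable from the SparkContext
    val numsRDD = sc.parallelize(1 to 10)
    val factor = sc.broadcast(3)
    //Use the broadcast value inside a transformation running on the executors
    println(numsRDD.map(_ * factor.value).collect().mkString(","))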

RDD, DataSet (DS), DataFrame (DF)

DataFrame and DataSet are two abstractions provided by Spark SQL. How do they differ from RDD?

Compared with an RDD, a DataSet knows the schema (column names and types) of its elements, while an RDD has no knowledge of the structure of the data it holds.
Compared with a DataSet, a DataFrame fixes the element type to Row, whereas the element type of a DataSet is a concrete type (such as a case class) determined by the actual data; in other words, a DataFrame is a Dataset[Row]. A minimal sketch of the three is shown below.
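
Assuming a SparkSession named spark and import spark.implicits._ as in the demos below, the sketch looks like this (the Person case class is made up for illustration):

    case class Person(name: String, age: Int)
    //RDD[(String, Int)]: Spark knows nothing about column names or types
    val rdd = spark.sparkContext.makeRDD(Seq(("Andy", 30), ("Justin", 19)))
    //Dataset[Person]: typed columns taken from the case class fields
    val ds = rdd.map(p => Person(p._1, p._2)).toDS()
    //DataFrame is just Dataset[Row]: columns are known, element type is fixed to Row
    val df = ds.toDF()
    df.printSchema()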

Convert RDD to DS

import org.apache.spark.sql.SparkSession

object CreateDataSetDemo {
  case class Point(label:String,x:Double,y:Double)
  case class Category(id:Long,name:String)
  def main(args: Array[String]): Unit = {
    //1. The first step is to create a SparkSession object
    val spark = SparkSession.builder().master("local[*]").appName("testSparkSQL").getOrCreate()
    val sc = spark.sparkContext
    //import implicit conversions (needed for toDS/toDF)
    import spark.implicits._
    //2. Create an RDD
    val pointRDD = sc.makeRDD(List(("bar",3.0,4.0),("foo",2.0,2.5)))
    val categoryRDD = sc.makeRDD(List((1,"foo"),(2,"bar")))
    //3. Use the RDDs to create DataSets
    val ds1 = pointRDD.toDS()
    ds1.show()
    println("------------------")
    val pointDS = pointRDD.map(x=>Point(x._1,x._2,x._3)).toDS()
    val categoryDS = categoryRDD.map(x=>Category(x._1,x._2)).toDS()
    pointDS.show()
    categoryDS.show()
    val df = pointDS.join(categoryDS,pointDS("label")===categoryDS("name"))
    df.show()
    spark.stop()
  }
}

(Converting to a DataFrame works the same way with toDF(); see the sketch below.)
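
A minimal sketch of the DataFrame version, reusing pointRDD and the Point case class from the demo above:

    //Tuples get default column names unless names are supplied to toDF
    val tupleDF = pointRDD.toDF("label", "x", "y")
    //Via the case class, column names come from the field names
    val pointDF = pointRDD.map(x => Point(x._1, x._2, x._3)).toDF()
    tupleDF.show()
    pointDF.show()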

Demo: load data, convert to DS

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object test01 extends App {
  case class Order(id:String,date:String,customerId:String,status:String)
  case class Product(id:String,categoryId:String,name:String,description:String,price:String,image:String)
  case class Customer(id:String,fname:String,lname:String,email:String,password:String,street:String,city:String,state:String,zipcode:String)
  case class Order_Item(id:String,order_id:String,product_id:String,quantity:String,subtotal:String,product_price:String)
  //1. The first step is to create a SparkSession object
  val spark = SparkSession.builder().master("local[*]").appName("testSparkSQL").getOrCreate()
  val sc = spark.sparkContext
  //import implicit conversions (needed for toDS/toDF)
  import spark.implicits._

  private val orderRDD: RDD[String] = sc.textFile("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\sparkSQL\\orders.csv")
  private val productRDD: RDD[String] = sc.textFile("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\sparkSQL\\products.csv")
  private val customerRDD: RDD[String] = sc.textFile("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\sparkSQL\\customers.csv")
  private val order_ItemRDD: RDD[String] = sc.textFile("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\sparkSQL\\order_items.csv")

  private val orderDS: Dataset[Order] = orderRDD.map(x => {
    val fields = x.split(",").map(y => y.replace("\"", ""))
    Order(fields(0), fields(1), fields(2), fields(3))
  }).toDS()
  private val productDS: Dataset[Product] = productRDD.map(x => {
    val fields = x.split(",").map(y => y.replace("\"", ""))
    Product(fields(0), fields(1), fields(2), fields(3), fields(4), fields(5))
  }).toDS()
  private val customerDS: Dataset[Customer] = customerRDD.map(x => {
    val fields = x.split(",").map(y => y.replace("\"", ""))
    Customer(fields(0), fields(1), fields(2), fields(3), fields(4), fields(5), fields(6), fields(7), fields(8))
  }).toDS()
  private val order_ItemDS: Dataset[Order_Item] = order_ItemRDD.map(x => {
    val fields = x.split(",").map(y => y.replace("\"", ""))
    Order_Item(fields(0), fields(1), fields(2), fields(3), fields(4), fields(5))
  }).toDS()
  order_ItemDS.show()
  customerDS.show()
  productDS.show()
  orderDS.show()
  spark.stop()
}

Read a JSON file and convert it to a DF

import org.apache.spark.sql.{DataFrame, SparkSession}

object CreateDataFrameDemo extends App {
  //1. The first step is to create a SparkSession object
  val spark = SparkSession.builder().master("local[*]").appName("testSparkSQL").getOrCreate()
  val sc = spark.sparkContext
  //import implicit conversions (needed for toDS/toDF and $-column syntax)
  import spark.implicits._
  //Read the json file through spark.read and generate a DataFrame
  private val jsontoDF: DataFrame = spark.read.json("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\sparkSQL\\people.json")
  jsontoDF.show()
  //print field information
  /**
    * root
    * |-- age: long (nullable = true)
    * |-- name: string (nullable = true)
    * i.e. field name, field type, and whether the field is nullable
    */
  jsontoDF.printSchema()
  //select query
  jsontoDF.select("name").show()
  jsontoDF.select("age").show()
  //query multiple fields
  jsontoDF.select(jsontoDF("name"),jsontoDF("age")).show()
  //age+1
  jsontoDF.select(jsontoDF("name"),jsontoDF("age")+1).show()
  //another form
  println("------------")
  jsontoDF.select($"name",$"age"+1).show()
  //Filter similar to where in sql
  jsontoDF.filter($"age">20).show()
  //group groupBy
  jsontoDF.groupBy("age").count().show()
  //Let's register the DataFrame as a table
  jsontoDF.createOrReplaceTempView("people")
  private val df2: DataFrame = spark.sql("select * from people where age>20")
  df2.show()

  jsontoDF.createOrReplaceGlobalTempView("p1")
  spark.newSession().sql("select * from global_temp.p1 where age>20").show()
  spark.stop()
}
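
A note on the two view registrations above: a view created with createOrReplaceTempView is scoped to the SparkSession that created it, while a global temp view is stored in the system database global_temp and is also visible to other sessions, which is why it must be queried with a qualified name. A minimal sketch:

    jsontoDF.createOrReplaceTempView("people")            //visible only in this session
    spark.sql("select * from people where age>20").show()
    jsontoDF.createOrReplaceGlobalTempView("p1")           //visible across sessions
    spark.newSession().sql("select * from global_temp.p1 where age>20").show()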

Convert RDD to DF through a case class

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameDemo02 extends App {
  case class People(name:String,age:Int)

  //1. The first step is to create a SparkSession object
  val spark = SparkSession.builder().master("local[*]").appName("testSparkSQL").getOrCreate()
  val sc = spark.sparkContext
  //import implicit conversions (needed for toDF)
  import spark.implicits._

  private val peopleRDD: RDD[Array[String]] = sc.textFile("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\sparkSQL\\people.txt")
    .map(_.split(","))
  //Convert the RDD to a DF via the case class
  private val peopleDF: DataFrame = peopleRDD.map(x=>People(x(0),x(1).trim.toInt)).toDF()
  peopleDF.printSchema()
  peopleDF.show()
  spark.stop()
}

Create DF by schema

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession, types}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

/**
  * @author WGY
  */
object createDataFrameBySchame extends App {
  //1. The first step is to create a SparkSession object
  val spark = SparkSession.builder().master("local[*]").appName("testSparkSQL").getOrCreate()
  val sc = spark.sparkContext
  //import implicit conversions
  import spark.implicits._

  private val textRDD: RDD[Array[String]] = sc.textFile("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\sparkSQL\\people.txt").map(_.split(","))

  //Define schema information name,age
  private val schema = StructType(Array(
    StructField("name", StringType, true),
    StructField("age", IntegerType, true)
  ))
  //Convert rdd to row type
  private val mapRDD: RDD[Row] = textRDD.map(x=>Row(x(0),x(1).trim.toInt))
  //Convert RDD to DataFrame
  private val df1: DataFrame = spark.createDataFrame(mapRDD,schema)

  df1.printSchema()
  df1.show()
  println("--------------")
  //DataFrame->RDD
  private val r1: RDD[Row] = df1.rdd
  println(r1.collect().mkString(" "))
  spark.stop()
}
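
Defining the schema programmatically is most useful when the column layout is only known at runtime. A minimal sketch, reusing mapRDD from the demo above (the column string is a made-up runtime input):

    //Build a StructType from a runtime description of the columns
    val columns = "name age"
    val dynamicSchema = StructType(columns.split(" ").map { field =>
      if (field == "age") StructField(field, IntegerType, nullable = true)
      else StructField(field, StringType, nullable = true)
    })
    val dynamicDF = spark.createDataFrame(mapRDD, dynamicSchema)
    dynamicDF.printSchema()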

Convert DF to RDD

/** people.json content:
  * {"name":"Michael"}
  * {"name":"Andy", "age":30}
  * {"name":"Justin", "age":19}
  */
val df = spark.read.json("file:///home/hadoop/data/people.json")
//Convert DF to RDD
df.rdd.collect
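
df.rdd yields an RDD[Row]; individual fields can be read back by position or by name. A minimal sketch, assuming the people.json content shown above:

    //Pull the name column out of each Row
    val nameRDD = df.rdd.map(row => row.getAs[String]("name"))
    println(nameRDD.collect().mkString(","))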

Using Spark SQL to manipulate external data sources

Read a parquet file via DF

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._

object TestParquet extends App {
  private val spark: SparkSession = SparkSession.builder().master("local[4]").appName("Demo2").getOrCreate()
  import spark.implicits._
  private val sc: SparkContext = spark.sparkContext

  private val rdd1: RDD[(String, String, Array[Int])] = sc.parallelize(List(
    ("zhangsan", "green", Array(3, 5, 6, 9)),
    ("lisi", "red", Array(3, 5, 6, 10)),
    ("wangwu", "black", Array(3, 5, 6, 11)),
    ("zhaoliu", "yellow", Array(3, 5, 6, 12)),
    ("wuqi", "green", Array(3, 5, 6, 13))
  ))

  private val structType = StructType(Array(
    StructField("name", StringType),
    StructField("color", StringType),
    StructField("numbers", ArrayType(IntegerType))
  ))
  private val rowRDD: RDD[Row] = rdd1.map(x=>Row(x._1,x._2,x._3))
  private val df: DataFrame = spark.createDataFrame(rowRDD,structType)
  //Read and write parquet files
  //df.write.parquet("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\user")
  private val parquetRDD: DataFrame = spark
    .read
    .parquet("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\user")
  parquetRDD.printSchema()
  parquetRDD.show()
  spark.stop()
}
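
The commented-out write above fails if the target directory already exists, so a save mode is usually specified. A minimal sketch (the output path is only a placeholder):

    //Write the DataFrame as parquet, overwriting any existing output
    df.write
      .mode("overwrite")
      .parquet("file:///tmp/user_parquet")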

Spark connecting to a database (SparkToMySQL)

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}
object SparkToMySQL extends App{
  private val spark: SparkSession = SparkSession.builder().master("local[2]").appName("mysql").getOrCreate()
  val url = "jdbc:mysql://hadoop1:3306/test"
  val tableName="student"//table name in mysql
  private val prop = new Properties()
  prop.setProperty("user","kb07")//username
  prop.setProperty("password","ok")//password
  prop.setProperty("driver","com.mysql.jdbc.Driver")//JDBC driver class
  //connect
  private val mysqlDF: DataFrame = spark.read.jdbc(url,tableName,prop)
  mysqlDF.printSchema()
  mysqlDF.show()
  spark.stop()
}
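
Writing a DataFrame back to MySQL uses the same url and properties; a minimal sketch (the target table name is hypothetical):

    //Append the DataFrame rows into a (hypothetical) MySQL table
    mysqlDF.write
      .mode("append")
      .jdbc(url, "student_copy", prop)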

Spark connecting to Hive (HiveOnSpark)

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}

object HiveOnSpark extends App {
  private val spark: SparkSession = SparkSession.builder()
    .master("local[4]")
    .appName("Demo2")
    .enableHiveSupport()
    .config("hive.metastore.uris","thrift://192.168.226.101:9083")
    .getOrCreate()
  import spark.implicits._
  private val sc: SparkContext = spark.sparkContext

  private val df1: DataFrame = spark.sql("select * from toronto")
  df1.printSchema()
  df1.show()
  spark.stop()
}
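
With Hive support enabled, a DataFrame can also be saved back as a Hive table; a minimal sketch (the table name is hypothetical):

    //Save the query result as a (hypothetical) Hive table
    df1.write
      .mode("overwrite")
      .saveAsTable("toronto_copy")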

Spark SQL built-in functions

package SQL0813

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object testInterFunction extends App {
  case class Student(id:Int, name:String,sex:String,age:Int)

  private val spark: SparkSession = SparkSession.builder().master("local[4]").appName("Demo2").getOrCreate()
  import spark.implicits._
  private val sc: SparkContext = spark.sparkContext
  //Import the packages needed by built-in functions
  import org.apache.spark.sql.functions._

  private val accessLog = Array(
    "2020-08-13,1",
    "2020-08-13,1",
    "2020-08-13,2",
    "2020-08-13,2",
    "2020-08-13,3",
    "2020-08-13,3",
    "2020-08-14,1",
    "2020-08-14,1",
    "2020-08-14,2",
    "2020-08-14,3",
    "2020-08-15,1",
    "2020-08-15,1",
    "2020-08-15,2",
    "2020-08-15,3"
  )
  private val accessLogRDD: RDD[Row] = sc.parallelize(accessLog).map(x => {
    val strings = x.split(",")
    Row(strings(0), strings(1).toInt)
  })

  private val structType = StructType(Array(
    StructField("day", StringType),
    StructField("userID", IntegerType)
  ))

  private val logDF: DataFrame = spark.createDataFrame(accessLogRDD,structType)
  logDF.printSchema()
  logDF.show()

  //Page views (pv) per day: agg aggregation
  logDF.groupBy("day").agg(count("userID").as("pv")).show()
  //Unique visitors (uv) per day: count after deduplication
  logDF.groupBy("day").agg(countDistinct("userID").as("uv")).show()
  //the same uv query expressed in SQL
  logDF.createOrReplaceTempView("logs")
  spark.sql(
    """
      |select day,count(distinct userID) as uv
      |from logs
      |group by day
    """.stripMargin
  ).show()
  
  private val stuDF: DataFrame = Seq(
    Student(1001, "zhangsan", "F", 20),
    Student(1002, "zhangsan1", "M", 18),
    Student(1003, "zhangsan2", "F", 20),
    Student(1004, "zhangsan3", "F", 20),
    Student(1005, "zhangsan4", "M", 20),
    Student(1006, "zhangsan5", "F", 20),
    Student(1007, "zhangsan6", "M", 20),
    Student(1008, "zhangsan7", "F", 25),
    Student(1009, "zhangsan8", "M", 20),
    Student(1010, "zhangsan9", "F", 20)
  ).toDF()
  stuDF.printSchema()
  stuDF.show()
  //Average age grouped by gender
  stuDF.groupBy("sex").agg(avg("age")).show()
  stuDF.groupBy("sex").avg("age").show()
  //Average, max and min age by gender
  stuDF.groupBy("sex").agg("age"->"avg","age"->"max","age"->"min").show()
  //Count by gender and age
  stuDF.groupBy("sex","age").count().show()
  //Sort by age (ascending, then descending)
  stuDF.sort("age").show()
  stuDF.sort($"age".desc).show()
  //Sort by age ascending, then by id descending
  stuDF.sort($"age",$"id".desc).show()
  //orderBy is an alias for sort
  stuDF.orderBy($"age".desc).show()
  
  spark.stop()

}
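
The pv and uv aggregations above can also be computed in one pass by handing several built-in functions to a single agg call; a minimal sketch reusing logDF from the demo above:

    //Page views and unique visitors per day in one aggregation
    logDF.groupBy("day")
      .agg(count("userID").as("pv"), countDistinct("userID").as("uv"))
      .show()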

Spark SQL custom functions (UDF)

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object TestUGFDemo extends App {
  private val spark: SparkSession = SparkSession.builder().master("local[4]").appName("Demo2").getOrCreate()
  import spark.implicits._
  private val sc: SparkContext = spark.sparkContext
  //Import the packages needed by built-in functions
  import org.apache.spark.sql.functions._
  private val info: RDD[String] = sc.textFile("file:///D:\\IntelliJ IDEA 2018.2.2\\Project\\testscala\\spark\\src\\main\\scala\\data\\sparkSQL\\hobbies.txt")
  private val hobbyDF: DataFrame = info.map(_.split("\t")).map(x=>Hobbies(x(0),x(1))).toDF()
  hobbyDF.createOrReplaceTempView("hobby")
  //Register the UDF function and use
  spark.udf.register("hobby_num",(s:String)=>s.split(",").size)
  spark.sql(
    """
      |select name,hobbies,hobby_num(hobbies) as hobbyNum
      |from hobby
    """.stripMargin
  ).show()
  spark.stop()

}
case class Hobbies(name:String, hobbies:String)
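
Besides spark.udf.register, which makes a UDF available to SQL, the same function can be wrapped with udf() from org.apache.spark.sql.functions and used directly in the DataFrame API; a minimal sketch reusing hobbyDF from the demo above:

    //The same hobby-counting logic as a DataFrame-API UDF
    import org.apache.spark.sql.functions.udf
    val hobbyNum = udf((s: String) => s.split(",").length)
    hobbyDF.select($"name", $"hobbies", hobbyNum($"hobbies").as("hobbyNum")).show()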
